<a href="https://colab.research.google.com/github/RamyaRamasubramaniyan/PysparkProjects/blob/main/Spark_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive so files under /content/drive/MyDrive are reachable.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT, force_remount=True)

In [None]:
# Install PySpark into the notebook runtime (IPython `!` shell escape, not plain Python).
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession
# Local-mode session named "Colab", with the Spark UI served on port 4050.
_builder = (SparkSession.builder
            .master("local")
            .appName("Colab")
            .config('spark.ui.port', '4050'))
spark = _builder.getOrCreate()

In [None]:
# Render the session summary (the notebook displays a cell's last expression).
spark

In [None]:
from datetime import datetime, date
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, DateType, DoubleType, BooleanType, DecimalType
from pyspark.sql.functions import lit, col, when, sum as sum_, max as max_, last, datediff, to_date
from pyspark.sql import Window

In [None]:
# Toy borrower rows, one tuple per row, in column order:
# (SNo, Borrower_Name, Library_Credits, Language, Has_Borrowed,
#  Borrowed_Books_Count, Book_Due_Date).
# Row SNo 5 appears twice as an exact duplicate, "Saskia" and "Danil" each
# appear under two SNos, and Tristan has no due date -- presumably to
# exercise distinct()/dropDuplicates() and null handling in later cells.
_jeroen_row = (5, "Jeroen", 270.05, "French", True, 4, date(2024, 5, 30))
sample_data = [
    (1, "Ramya", 2000.00, "Tamil", True, 2, date(2024, 1, 31)),
    (2, "Willem", 1000.53, "German", False, 0, date(2023, 12, 31)),
    (3, "Saskia", 100.00, "Dutch", True, 1, date(2024, 4, 21)),
    (4, "Bianca", 10.64, "English", True, 5, date(2024, 4, 21)),
    _jeroen_row,
    _jeroen_row,
    (6, "Saskia", 400.00, "Japanese", True, 2, date(2024, 3, 13)),
    (7, "Tristan", 5.00, "English", False, 0, None),
    (8, "Danil", 20.05, "Roman", True, 4, date(2024, 1, 30)),
    (9, "Danil", 80.10, "Thai", True, 2, date(2024, 2, 3)),
]
print(sample_data)

In [None]:
# (column name, Spark type, nullable) in the same order as the sample_data tuples.
_schema_columns = [
    ("SNo", IntegerType(), False),
    ("Borrower_Name", StringType(), False),
    ("Library_Credits", DoubleType(), True),
    ("Language", StringType(), True),
    ("Has_Borrowed", BooleanType(), True),
    ("Borrowed_Books_Count", IntegerType(), True),
    ("Book_Due_Date", DateType(), True),
]
sample_schema = StructType(
    [StructField(name, data_type, nullable) for name, data_type, nullable in _schema_columns]
)
print(sample_schema)

In [None]:
# Build the DataFrame from the in-memory rows using the explicit schema.
df = spark.createDataFrame(sample_data, sample_schema)

In [None]:
# Print up to 10 rows without truncating long cell values.
df.show(n=10, truncate=False)

In [None]:
# Column names in schema order (used as the CSV header by the commented-out writer below).
fields = list(df.columns)

In [None]:
# # csv writer using python
# import csv
# sample_header = fields
# with open("/content/drive/MyDrive/Pyspark/Library_Sample_Data.csv", newline='', mode='w+') as myfile:
#   wr = csv.writer(myfile, quoting=csv.QUOTE_ALL)
#   wr.writerow(sample_header)
#   wr.writerows(sample_data)

In [None]:
# csv writer using the DataFrame API: writes a *directory* of part-*.csv files
# (uncompressed unless e.g. compression="gzip" is passed), not a single file.
# df.write.csv("/content/Library_Sample_Data.csv")

In [None]:
# Total row count, including the repeated SNo 5 row.
df.count()

In [None]:
# Row count after removing exact duplicate rows (dropDuplicates with no subset == distinct).
df.dropDuplicates().count()

In [None]:
# Keep one row per borrower name. Which of a name's rows survives is not
# guaranteed to be stable, so the Saskia/Danil rows shown may vary.
unique_df = df.dropDuplicates(["Borrower_Name"])
unique_df.orderBy(col("SNo")).show()
unique_df.count()

In [None]:
# Per (borrower, language) totals of library credits and borrowed books.
_total_columns = [
    sum_("Library_Credits").alias("Total_Library_Credits"),
    sum_("Borrowed_Books_Count").alias("Total_Borrowed_Books_Count"),
]
grouped_data = df.groupBy("Borrower_Name", "Language").agg(*_total_columns)
grouped_data.orderBy("Borrower_Name").show()

In [None]:
# Flag rows that are borrowed and past their due date, and fine them
# 5 per overdue day per borrowed book. A null due date makes the comparison
# null, which falls through to otherwise(): no flag and a zero fine.
today = datetime.now().date()
_is_overdue = (col("Book_Due_Date") < today) & (col("Has_Borrowed") == True)
_overdue_fine = datediff(lit(today), col("Book_Due_Date")) * 5 * col("Borrowed_Books_Count")

tranform_data = (
    df.withColumn("Due_Fine_Flag", when(_is_overdue, True).otherwise(False))
      .withColumn("Due_Fine_Amount",
                  when(_is_overdue, _overdue_fine).otherwise(lit(0)).cast(DecimalType(10, 2)))
)

tranform_data.orderBy("SNo").show()

In [None]:
# using groupby
# Totals per (borrower, language), keeping only groups with an overdue fine.
# max_ over the boolean flag is True when ANY row in the group is overdue and
# is deterministic; last() would return an arbitrary row's flag because row
# order after the groupBy shuffle is not guaranteed.
grouped_data = (tranform_data
                .groupBy("Borrower_Name", "Language")
                .agg(sum_("Library_Credits").alias("Total_Library_Credits"),
                     sum_("Borrowed_Books_Count").alias("Total_Borrowed_Books_Count"),
                     max_("Due_Fine_Flag").alias("Due_Fine_Flag"),
                     sum_("Due_Fine_Amount").alias("Total_Due_Fine_Amount")
                     )
                .filter(col("Due_Fine_Flag")))
grouped_data.orderBy("Borrower_Name").show()

In [None]:
#using window function
# Attach per-(borrower, language) totals to every row, then keep only the
# overdue rows. The window is deliberately unordered: with orderBy, Spark's
# default frame becomes "unbounded preceding .. current row", which would
# turn the Total_* columns into running (partial) sums instead of totals.
window = Window.partitionBy("Borrower_Name", "Language")
df_window = (tranform_data
             .withColumn("Total_Library_Credits", sum_("Library_Credits").over(window))
             .withColumn("Total_Borrowed_Books_Count", sum_("Borrowed_Books_Count").over(window))
             .withColumn("Total_Due_Fine_Amount", sum_("Due_Fine_Amount").over(window))
             ).filter(col("Due_Fine_Flag"))
df_window.orderBy("Borrower_Name").show()

In [None]:
#Creating a class
import abc
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, DateType, DoubleType, BooleanType, DecimalType
from pyspark.sql.functions import lit, col, when, sum as sum_, max as max_, last, datediff, to_date
from pyspark.sql import Window


class SparkSessionCreation:
    """Mixin that supplies the notebook's local SparkSession."""

    @staticmethod
    def _spark_session():
        """Return the running SparkSession, creating a local one if none exists.

        Static because it needs no instance state. As a plain function without
        ``self`` it raised TypeError whenever it was called through an instance.
        """
        return (SparkSession.builder
                .master("local")
                .appName("Colab")
                .config('spark.ui.port', '4050')
                .getOrCreate())


class LibraryManagementfunctions(SparkSessionCreation, abc.ABC):
    """Abstract pipeline interface; subclasses implement every step below."""

    def __init__(self):
        # Call the factory so self.spark is a SparkSession, not a method object.
        self.spark = self._spark_session()

    @abc.abstractmethod
    def run_pipeline(self):
        """Run the full pipeline and return the resulting DataFrame."""
        raise NotImplementedError

    @abc.abstractmethod
    def _due_fine_calculator(self, df):
        """Return ``df`` with due-fine columns added."""
        raise NotImplementedError

    @abc.abstractmethod
    def _due_fine_checker(self, df):
        """Return the overdue summary derived from ``df``."""
        raise NotImplementedError


class LibraryManagement(LibraryManagementfunctions):
    """Read the library CSV, compute overdue fines, and summarise them.

    Args:
        source_path: CSV file to read; a header row is expected.
        today: reference date for "overdue"; defaults to the current local date.
    """

    SOURCE_PATH = "/content/drive/MyDrive/Pyspark/Library_Sample_Data.csv"
    # Fine charged per overdue day, per borrowed book.
    FINE_PER_BOOK_PER_DAY = 5
    # Explicit schema: without it every CSV column is read as a string, so the
    # due-date comparison and the fine arithmetic would operate on text.
    SOURCE_SCHEMA = StructType([
        StructField("SNo", IntegerType(), False),
        StructField("Borrower_Name", StringType(), False),
        StructField("Library_Credits", DoubleType(), True),
        StructField("Language", StringType(), True),
        StructField("Has_Borrowed", BooleanType(), True),
        StructField("Borrowed_Books_Count", IntegerType(), True),
        StructField("Book_Due_Date", DateType(), True),
    ])

    def __init__(self, source_path=SOURCE_PATH, today=None):
        super().__init__()
        self.source_path = source_path
        # Stored on the instance: _due_fine_calculator reads self.today.
        self.today = today if today is not None else datetime.now().date()
        print(self.spark)

    def run_pipeline(self):
        """Read the source, add fine columns, and return the overdue summary."""
        return (self._read_source()
                .transform(self._due_fine_calculator)
                .transform(self._due_fine_checker))

    def _read_source(self):
        """Load the source CSV using SOURCE_SCHEMA."""
        return (self.spark.read
                .option("delimiter", ",")
                .option("header", "true")
                .schema(self.SOURCE_SCHEMA)
                .csv(self.source_path))

    def _due_fine_calculator(self, df):
        """Add ``Due_Fine_Flag`` and ``Due_Fine_Amount`` columns to ``df``.

        A row is overdue when it is borrowed and its due date is before
        ``self.today``; a null due date is never overdue. The fine is
        days overdue * FINE_PER_BOOK_PER_DAY * Borrowed_Books_Count, cast
        to DECIMAL(10, 2); non-overdue rows get 0.
        """
        is_overdue = (col("Book_Due_Date") < lit(self.today)) & (col("Has_Borrowed") == True)
        fine = (datediff(lit(self.today), col("Book_Due_Date"))
                * self.FINE_PER_BOOK_PER_DAY
                * col("Borrowed_Books_Count"))
        return (df
                .withColumn("Due_Fine_Flag", when(is_overdue, True).otherwise(False))
                .withColumn("Due_Fine_Amount",
                            when(is_overdue, fine).otherwise(lit(0)).cast(DecimalType(10, 2))))

    @staticmethod
    def _due_fine_checker(df):
        """Attach per-(borrower, language) totals and keep only overdue rows."""
        # Unordered window => whole-partition frame. With orderBy the default
        # frame is a running one and the Total_* columns would be partial sums.
        window = Window.partitionBy("Borrower_Name", "Language")
        return (df
                .withColumn("Total_Library_Credits", sum_("Library_Credits").over(window))
                .withColumn("Total_Borrowed_Books_Count", sum_("Borrowed_Books_Count").over(window))
                .withColumn("Total_Due_Fine_Amount", sum_("Due_Fine_Amount").over(window))
                .filter(col("Due_Fine_Flag")))

In [None]:
obj = LibraryManagement()
# run_pipeline() returns a lazy DataFrame. By default a bare DataFrame
# expression only renders its schema, so call show() to execute it and print
# the rows, ordered like the earlier summary cells.
obj.run_pipeline().orderBy("Borrower_Name").show()