In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, trim

# Customers: read with an explicit schema instead of inferring one.
customersSchema = StructType([
    StructField("Id", IntegerType(), False),
    StructField("LastName", StringType(), False),
    StructField("FirstName", StringType(), False),
])
customersDF = spark.read.schema(customersSchema).load("./BAKERY/customers.csv",
                                                       format="csv",
                                                       sep=",",
                                                       header="true")

# Strip surrounding whitespace from the name values.
customersDF = customersDF.withColumn("LastName", trim(col("LastName"))).withColumn("FirstName", trim(col("FirstName")))


def _read_bakery_csv(path):
    """Read a comma-separated BAKERY file that has a header row, inferring column types.

    Args:
        path: location of the CSV file.

    Returns:
        A Spark DataFrame whose column names are taken verbatim from the header row,
        including any leading spaces (e.g. " Ordinal", " Date").
    """
    return spark.read.load(path, format="csv", sep=",", inferSchema="true", header="true")


# goods.csv columns are used unmodified later (Id, Flavor, Food, Price), so no
# renaming/trimming is done here. NOTE(review): presumes its header has no padding.
goodsDF = _read_bakery_csv("./BAKERY/goods.csv")

# items.csv: replace the space-prefixed " Ordinal" / " Item" columns with trimmed,
# unprefixed "Ordinal" / "Item" columns.
itemsDF = _read_bakery_csv("./BAKERY/items.csv")
itemsDF = (itemsDF
           .withColumn("Ordinal", trim(col(" Ordinal")))
           .withColumn("Item", trim(col(" Item")))
           .drop(" Item", " Ordinal"))

# receipts.csv: same treatment for " Date" and " CustomerId".
# CustomerId is trimmed BEFORE the integer cast, like every other padded column:
# on Spark versions whose string->int cast does not strip whitespace, a padded value
# would otherwise become null and silently drop out of the customer joins.
receiptsDF = _read_bakery_csv("./BAKERY/receipts.csv")
receiptsDF = (receiptsDF
              .withColumn("Date", trim(col(" Date")))
              .withColumn("CustomerId", trim(col(" CustomerId")).cast(IntegerType()))
              .drop(" Date", " CustomerId"))

In [2]:
# Number of times each product (Item id) appears across all receipts.
productsCountDF = itemsDF.groupBy(itemsDF.Item).count().withColumnRenamed('count', 'Solds')

# Left join keeps products that never sold; their missing count is filled with 0.
# The fill value is an int so it matches the numeric 'Solds' column.
productsTotalDF = goodsDF.join(productsCountDF, productsCountDF.Item == goodsDF.Id, 'left').fillna({'Solds': 0})

# Collapse to one partition FIRST, then sort inside it. Sorting before
# repartition(1) does not guarantee a sorted file, because the repartition
# shuffle does not preserve row order.
# mode('overwrite') lets the cell be re-run without failing on an existing output dir.
(productsTotalDF
    .select(productsTotalDF.Id,
            productsTotalDF.Flavor,
            productsTotalDF.Food,
            productsTotalDF.Solds)
    .repartition(1)
    .sortWithinPartitions('Id')
    .write.mode('overwrite').format('csv').save('./ProductSales'))

In [3]:
# Receipts per (customer, date); keep only pairs with more than one receipt,
# i.e. days on which a customer made several purchases.
purchasesCountDF = receiptsDF.groupBy(receiptsDF.CustomerId, receiptsDF.Date).count().withColumnRenamed('count', 'NumPurchases')
purchasesCountDF = purchasesCountDF.filter(purchasesCountDF.NumPurchases > 1).drop(purchasesCountDF.NumPurchases)

# Each remaining Row is (CustomerId, Date); reduceByKey treats it as a key/value pair
# and concatenates all of a customer's dates into one ", "-separated string.
# toDF() names the result columns _1 (CustomerId) and _2 (dates).
# The order of dates inside the string depends on partitioning and is not guaranteed.
appendedDatesDF = purchasesCountDF.rdd.reduceByKey(lambda a, b: f"{a}, {b}").toDF()

# Attach customer names (inner join: ids missing from customersDF are dropped).
multiShoppersDF = appendedDatesDF.join(customersDF, appendedDatesDF._1 == customersDF.Id)

# Single partition first, then sort inside it, so the written file is ordered by Id
# (sorting before repartition(1) is not preserved by the shuffle).
# mode('overwrite') makes the cell re-runnable.
(multiShoppersDF
    .select(multiShoppersDF.Id,
            multiShoppersDF.FirstName,
            multiShoppersDF.LastName,
            multiShoppersDF._2)
    .repartition(1)
    .sortWithinPartitions('Id')
    .write.mode('overwrite').format('csv').save('./MultiShoppers'))

In [4]:
# Total price per receipt: join each line item to its product and sum the prices.
# Summing only "Price" yields the same "sum(Price)" column the original obtained
# by summing every numeric column and then selecting it.
receiptsTotalDF = itemsDF.join(goodsDF, itemsDF.Item == goodsDF.Id).groupBy(col("Reciept")).sum("Price")

# Total spent per customer: attach each receipt's total to its receipt row, then
# sum those totals per CustomerId (result column "sum(sum(Price))").
customersTotalDF = receiptsDF.join(receiptsTotalDF, receiptsTotalDF.Reciept == receiptsDF.RecieptNumber).groupBy(col("CustomerId")).sum("sum(Price)")

# Join customers info with their purchases total amount (inner join: customers
# without any matched receipt are not in the output).
customersTotalWithNamesDF = customersTotalDF.join(customersDF, customersTotalDF.CustomerId == customersDF.Id).withColumnRenamed("sum(sum(Price))", "Total amount")

# Single partition first, then sort inside it, so the written file is ordered by Id
# (sorting before repartition(1) is not preserved by the shuffle).
# mode('overwrite') makes the cell re-runnable.
(customersTotalWithNamesDF
    .select(customersTotalWithNamesDF.Id,
            customersTotalWithNamesDF.FirstName,
            customersTotalWithNamesDF.LastName,
            col('Total amount'))
    .repartition(1)
    .sortWithinPartitions('Id')
    .write.mode('overwrite').format('csv').save('./CustomersTotalBill'))