In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType

In [2]:
# Reuse the active Spark session if one exists, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

# Load the raw take-away data from JSON; no explicit schema is supplied.
# NOTE(review): "inferSchema" is normally a CSV reader option and is
# presumably ignored by the JSON source — confirm before relying on it.
df = (
  spark.read
  .format("Json")
  .option("inferSchema", "true")
  .load("/FileStore/tables/TakeAway/data.json")
)
df.show()

In [3]:
# Print the schema of the loaded DataFrame to inspect its columns and types.
df.printSchema()

In [4]:
# One row per order: explode `orders`, then lift the order id and the basket
# out of the exploded element (assumes `orders` is an array of structs with
# `orderId` and `basket` fields — TODO confirm against the printed schema).
# The select keeps only customerId/orderId/basket, so no separate drop of the
# `orders` column is needed afterwards.
df1 = (
  df.withColumn("orders", F.explode("orders"))
  .select(
    "customerId",
    F.col("orders.orderId").alias("orderId"),
    F.col("orders.basket").alias("basket"),
  )
)
df1.show()

In [5]:
# One row per basket item: explode `basket`, then flatten the item's fields
# into top-level columns (assumes `basket` is an array of structs — TODO
# confirm against the printed schema). withColumn already names the exploded
# column "basket", so no extra alias is required on the explode expression.
df2 = (
  df1.withColumn("basket", F.explode("basket"))
  .select(
    "customerId",
    "orderId",
    F.col("basket.productId").alias("productId"),
    F.col("basket.productType").alias("productType"),
    F.col("basket.grossMerchandiseValueEur").alias("grossMerchandiseValueEur"),
  )
)
df2.show(truncate=False)

In [6]:
# Fractional rate added on top of the gross value, keyed by product type.
MARKUP_RATE_BY_PRODUCT_TYPE = {"beverage": 0.09, "hot food": 0.15}
# Rate applied to every product type not listed above (including a null type).
DEFAULT_MARKUP_RATE = 0.07


def netMerchCal(x, y):
  """Return the gross value `y` increased by the rate for product type `x`.

  Args:
    x: Product type. "beverage" uses 9%, "hot food" uses 15%; any other
      value (including None) falls back to 7%.
    y: Gross merchandise value, or None when the value is missing.

  Returns:
    `y + rate * y`, or None when `y` is None so that a null input yields a
    null result instead of raising TypeError inside the UDF.
  """
  if y is None:
    return None
  rate = MARKUP_RATE_BY_PRODUCT_TYPE.get(x, DEFAULT_MARKUP_RATE)
  # Keep the `y + rate * y` form so results match the original arithmetic exactly.
  return y + rate * y
  
# Expose the Python pricing function to Spark as a UDF that returns a double.
netMerch_udf = F.udf(netMerchCal, returnType=DoubleType())

# Add the computed value for every basket item as the `netMerch` column.
df3 = df2.withColumn(
  "netMerch",
  netMerch_udf("productType", "grossMerchandiseValueEur"),
)
df3.show(truncate=False)

In [7]:
# Sum `netMerch` over all basket items of each order and display the totals.
(
  df3
  .groupBy("orderId")
  .agg(F.sum("netMerch"))
  .show(truncate=False)
)

In [8]:
# Persist the per-item result as CSV. The save mode is set to "overwrite" so
# the notebook can be re-run from a fresh kernel: with the default mode the
# write fails once the result directory already exists.
(
  df3.write
  .format("csv")
  .mode("overwrite")
  .save("/FileStore/tables/TakeAway/result/")
)