# Python-PySpark Coding Challenge
Purpose: Load purchases and products, combine, and summarize for reporting.

Written by: Scott Davis 2023-03-27

In [138]:
import pandas as pd

In [139]:
import pyspark
from pyspark.sql import SparkSession
# Reuse the kernel's active SparkSession if one exists; otherwise start a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

In [140]:
# Optional: list the working directory (IPython shell escape) to confirm that both
# input CSVs, CustomerPurchasesDataset.csv and ProductDetailsDataset.csv, are present.
!ls

CustomerPurchasesDataset.csv  ProductDetailsDataset.csv  sample_data


In [141]:
# Import raw data to see structure - optional

#dfCust = spark.read.csv("CustomerPurchasesDataset.csv", header = True, quote = "\'")
#dfProd = spark.read.csv("ProductDetailsDataset.csv", header = True, quote = "\'")

#dfCust.show()
#dfCust.printSchema()

#dfProd.show()
#dfProd.printSchema()

In [142]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

In [143]:
# Explicit column types for the two input CSVs, so values are typed on read
# (Spark's CSV reader otherwise loads every column as a string).
# All fields are nullable, which is StructType.add's default.
schemaCust = (
    StructType()
    .add('customer_id', IntegerType())
    .add('product_id', IntegerType())
    .add('purchase_amount', DoubleType())
)

schemaProd = (
    StructType()
    .add('product_id', IntegerType())
    .add('product_name', StringType())
    .add('unit_price', DoubleType())
)


In [144]:
# Read the raw CSVs using the explicit schemas defined above.
# - quote="'": treat the single quote (not Spark's default double quote) as the
#   field quote character, as in the original exploratory read.
# - enforceSchema=False: check each file's header row against the schema's field
#   names (by position). With the default (True), the header is ignored and the
#   schema is applied positionally, so a reordered or renamed CSV column would be
#   silently mislabeled instead of raising an error.
dfCust = spark.read.csv(
    "CustomerPurchasesDataset.csv",
    header=True,
    quote="'",
    schema=schemaCust,
    enforceSchema=False,
)
dfProd = spark.read.csv(
    "ProductDetailsDataset.csv",
    header=True,
    quote="'",
    schema=schemaProd,
    enforceSchema=False,
)

In [145]:
# Display each input DataFrame followed by its schema: purchases first, then products.
for frame in (dfCust, dfProd):
    frame.show()
    frame.printSchema()

+-----------+----------+---------------+
|customer_id|product_id|purchase_amount|
+-----------+----------+---------------+
|        101|      1001|           50.0|
|        101|      1002|           75.0|
|        102|      1001|          100.0|
|        103|      1003|           25.0|
|        104|      1004|          150.0|
+-----------+----------+---------------+

root
 |-- customer_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- purchase_amount: double (nullable = true)

+----------+------------+----------+
|product_id|product_name|unit_price|
+----------+------------+----------+
|      1001|   Product A|      10.0|
|      1002|   Product B|      15.0|
|      1003|   Product C|       5.0|
|      1004|   Product D|      20.0|
|      1005|   Product E|      30.0|
+----------+------------+----------+

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- unit_price: double (nullable = true)



## Problem 1
A PySpark DataFrame containing the joined data with the following columns:
 - customer_id
 - product_id
 - purchase_amount
 - product_name
 - unit_price

In [146]:
# Inner join purchases to product details on product_id.
# Joining on the column name (rather than an equality expression) keeps a single
# product_id column, and the select fixes the column order Problem 1 requires.
# Purchases whose product_id has no matching product row are dropped by the inner join.
dfJoin1 = (
    dfCust
    .join(dfProd, on='product_id', how='inner')
    .select('customer_id', 'product_id', 'purchase_amount', 'product_name', 'unit_price')
)
dfJoin1.show()

+-----------+----------+---------------+------------+----------+
|customer_id|product_id|purchase_amount|product_name|unit_price|
+-----------+----------+---------------+------------+----------+
|        102|      1001|          100.0|   Product A|      10.0|
|        101|      1001|           50.0|   Product A|      10.0|
|        101|      1002|           75.0|   Product B|      15.0|
|        103|      1003|           25.0|   Product C|       5.0|
|        104|      1004|          150.0|   Product D|      20.0|
+-----------+----------+---------------+------------+----------+



## Problem 2

A PySpark DataFrame containing the total revenue generated by each product with the following columns:
 - product_id
 - product_name
 - total_revenue

In [147]:
# Revenue per product: sum of purchase_amount over the joined purchases, keyed by
# product_id and product_name and listed in product_id order.
# Products with no purchases (e.g. 1005) do not appear, because the inner join
# in Problem 1 already excluded them.
dfJoin2 = (
    dfJoin1
    .groupBy('product_id', 'product_name')
    .agg({'purchase_amount': 'sum'})
    .withColumnRenamed('sum(purchase_amount)', 'total_revenue')
    .orderBy('product_id')
)
dfJoin2.show()

+----------+------------+-------------+
|product_id|product_name|total_revenue|
+----------+------------+-------------+
|      1001|   Product A|        150.0|
|      1002|   Product B|         75.0|
|      1003|   Product C|         25.0|
|      1004|   Product D|        150.0|
+----------+------------+-------------+



## Problem 3
A PySpark DataFrame containing the total revenue generated by each customer with the following columns:
 - customer_id
 - total_revenue


In [148]:
# Revenue per customer: sum of purchase_amount over the joined purchases,
# listed in customer_id order.
dfJoin3 = (
    dfJoin1
    .groupBy('customer_id')
    .agg({'purchase_amount': 'sum'})
    .withColumnRenamed('sum(purchase_amount)', 'total_revenue')
    .orderBy('customer_id')
)
dfJoin3.show()

+-----------+-------------+
|customer_id|total_revenue|
+-----------+-------------+
|        101|        125.0|
|        102|        100.0|
|        103|         25.0|
|        104|        150.0|
+-----------+-------------+



## Problem 4
A PySpark DataFrame containing the top 5 products by revenue with the following columns:
 - product_id
 - product_name
 - total_revenue


In [149]:
# Top 5 products by total_revenue (highest first), taken from the Problem 2 summary.
# product_id is an ascending secondary sort key so ties are broken deterministically.
# Product A and Product D both total 150.0; without a tie-breaker Spark does not
# guarantee their order, or which tied rows survive limit(5) once there are more
# than 5 products.
dfJoin4 = dfJoin2.orderBy(['total_revenue', 'product_id'], ascending=[False, True]).limit(5)
dfJoin4.show()

+----------+------------+-------------+
|product_id|product_name|total_revenue|
+----------+------------+-------------+
|      1001|   Product A|        150.0|
|      1004|   Product D|        150.0|
|      1002|   Product B|         75.0|
|      1003|   Product C|         25.0|
+----------+------------+-------------+



## Problem 5
A PySpark DataFrame containing the top 5 customers by revenue with the following columns:
 - customer_id
 - total_revenue


In [150]:
# Top 5 customers by total_revenue (highest first), taken from the Problem 3 summary.
# customer_id is an ascending secondary sort key so tied revenues are ordered
# deterministically, and limit(5) keeps a reproducible set of customers.
dfJoin5 = dfJoin3.orderBy(['total_revenue', 'customer_id'], ascending=[False, True]).limit(5)
dfJoin5.show()

+-----------+-------------+
|customer_id|total_revenue|
+-----------+-------------+
|        104|        150.0|
|        101|        125.0|
|        102|        100.0|
|        103|         25.0|
+-----------+-------------+

