In [0]:
# Import required libraries from PySpark
from pyspark.sql.types import StructField, StringType, IntegerType, DateType, StructType
from pyspark.sql.functions import year, month, quarter, desc, count


In [0]:
# Define schema for the product data
prod_schema = StructType([
    StructField('PROD_ID', StringType(), True),   # Product ID
    StructField('PROD_NAME', StringType(), True), # Product Name
    StructField('PRICE', StringType(), True)      # Price of the product
])

In [0]:
# Load product data from CSV file into a DataFrame with the specified schema
prod_df = spark.read.csv('/FileStore/tables/product.txt', header=False, schema=prod_schema)


In [0]:
# Display the first 5 rows of the product DataFrame
prod_df.head(5)

Out[84]: [Row(PROD_ID='1', PROD_NAME=' Margherita', PRICE=' 100'),
 Row(PROD_ID='2', PROD_NAME=' Pepperoni', PRICE=' 150'),
 Row(PROD_ID='3', PROD_NAME=' Cheese Pizza', PRICE=' 120'),
 Row(PROD_ID='4', PROD_NAME=' Meat Lover’s', PRICE=' 110'),
 Row(PROD_ID='5', PROD_NAME=' Veggie Delight', PRICE=' 80')]

In [0]:
# Define schema for the order data
orders_schema = StructType([
    StructField("PROD_ID", IntegerType(), True),    # Product ID
    StructField("CUST_ID", StringType(), True),     # Customer ID
    StructField("ORDER_DATE", DateType(), True),    # Order date
    StructField("LOCATION", StringType(), True),    # Order location
    StructField("ORDER_TYPE", StringType(), True) # Source of the order
])

In [0]:
# Load order data from CSV file into a DataFrame with the specified schema
orders_df = spark.read.csv('/FileStore/tables/order.txt', header=False, schema=orders_schema)


In [0]:
# Display the first 5 rows of the order DataFrame
orders_df.head(5)

Out[102]: [Row(PROD_ID=1, CUST_ID='A', ORDER_DATE=datetime.date(2023, 1, 1), LOCATION='Egypt', ORDER_TYPE='Delivery'),
 Row(PROD_ID=2, CUST_ID='A', ORDER_DATE=datetime.date(2022, 1, 1), LOCATION='Egypt', ORDER_TYPE='Delivery'),
 Row(PROD_ID=2, CUST_ID='A', ORDER_DATE=datetime.date(2023, 1, 7), LOCATION='Egypt', ORDER_TYPE='Delivery'),
 Row(PROD_ID=3, CUST_ID='A', ORDER_DATE=datetime.date(2023, 1, 10), LOCATION='Egypt', ORDER_TYPE='Restaurant'),
 Row(PROD_ID=3, CUST_ID='A', ORDER_DATE=datetime.date(2022, 1, 11), LOCATION='Egypt', ORDER_TYPE='Delivery')]

In [0]:
# Add additional columns to the order DataFrame for year, month, and quarter derived from the order date
orders_df = orders_df.withColumn('ORDER_YEAR', year(orders_df.ORDER_DATE))
orders_df = orders_df.withColumn('ORDER_MONTH', month(orders_df.ORDER_DATE))
orders_df = orders_df.withColumn('ORDER_QUARTER', quarter(orders_df.ORDER_DATE))


In [0]:
# Display the updated order DataFrame with the new columns
display(orders_df)

PROD_ID,CUST_ID,ORDER_DATE,LOCATION,ORDER_TYPE,ORDER_YEAR,ORDER_MONTH,ORDER_QUARTER
1,A,2023-01-01,Egypt,Delivery,2023,1,1
2,A,2022-01-01,Egypt,Delivery,2022,1,1
2,A,2023-01-07,Egypt,Delivery,2023,1,1
3,A,2023-01-10,Egypt,Restaurant,2023,1,1
3,A,2022-01-11,Egypt,Delivery,2022,1,1
3,A,2023-01-11,Egypt,Restaurant,2023,1,1
2,B,2022-02-01,Egypt,Delivery,2022,2,1
2,B,2023-01-02,Egypt,Delivery,2023,1,1
1,B,2023-01-04,Egypt,Restaurant,2023,1,1
1,B,2023-02-11,Egypt,Delivery,2023,2,1


In [0]:
# Join the product DataFrame with the order DataFrame on the product ID column
sales_df = orders_df.join(prod_df, "PROD_ID")

In [0]:
# Calculate total spending by each customer
customer_spend = (
    sales_df.groupBy("CUST_ID")  # Group by customer ID
    .agg({"PRICE": "SUM"})       # Aggregate sum of prices
    .withColumnRenamed("sum(PRICE)", "Total Cost") # Rename the aggregated column
    .orderBy(desc("Total Cost")) # Order by total cost in descending order
)
display(customer_spend)

CUST_ID,Total Cost
B,4440.0
A,4260.0
C,2400.0
E,2040.0
D,1200.0


Databricks visualization. Run in Databricks to view.

In [0]:

# Calculate total spending by product category
spend_category_df = (
    sales_df.groupBy("PROD_NAME") # Group by product name
    .agg({"PRICE": "SUM"})        # Aggregate sum of prices
    .withColumnRenamed("sum(PRICE)", "Total Cost") # Rename the aggregated column
    .orderBy(desc("Total Cost"))  # Order by total cost in descending order
)
display(spend_category_df)

PROD_NAME,Total Cost
Cheese Pizza,5760.0
Pepperoni,3600.0
Margherita,2100.0
Meat Lover’s,1320.0
Hawaiian,1080.0
Veggie Delight,480.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Calculate total monthly revenue
monthly_revenue_df = (
    sales_df.groupBy("ORDER_MONTH") # Group by order month
    .agg({"PRICE": "SUM"})          # Aggregate sum of prices
    .withColumnRenamed("sum(PRICE)", "Total Cost") # Rename the aggregated column
    .orderBy(desc("Total Cost"))    # Order by total cost in descending order
)
display(monthly_revenue_df)

ORDER_MONTH,Total Cost
1,2960.0
6,2960.0
5,2960.0
2,2730.0
3,910.0
7,910.0
11,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Calculate yearly revenue
Year_revenue_df = (
    sales_df.groupBy("ORDER_YEAR")  # Group by order year
    .agg({"PRICE": "SUM"})          # Aggregate sum of prices
    .withColumnRenamed("sum(PRICE)", "YEARLY SALES") # Rename the aggregated column
    .orderBy(desc("YEARLY SALES"))  # Order by yearly sales in descending order
)
display(Year_revenue_df)

ORDER_YEAR,YEARLY SALES
2023,9990.0
2022,4350.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Calculate quarterly sales
quarter_sales_df = (
    sales_df.groupBy("ORDER_QUARTER") # Group by order quarter
    .agg({"PRICE": "SUM"})            # Aggregate sum of prices
    .withColumnRenamed("sum(PRICE)", "QUARTER SALES") # Rename the aggregated column
    .orderBy(desc("QUARTER SALES"))   # Order by quarterly sales in descending order
)
display(quarter_sales_df)

ORDER_QUARTER,QUARTER SALES
1,6600.0
2,5920.0
3,910.0
4,910.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Calculate monthly sales across different years
month_revenue_df = (
    sales_df.groupBy("ORDER_YEAR", "ORDER_MONTH") # Group by order year and month
    .agg({"PRICE": "sum"})                        # Aggregate sum of prices
    .withColumnRenamed("sum(PRICE)", "MONTH SALES") # Rename the aggregated column
    .orderBy(desc("MONTH SALES"))                 # Order by monthly sales in descending order
)
display(month_revenue_df)

ORDER_YEAR,ORDER_MONTH,MONTH SALES
2023,6,2240.0
2023,5,2240.0
2023,1,2240.0
2023,2,1680.0
2022,2,1050.0
2022,1,720.0
2022,5,720.0
2022,6,720.0
2023,7,530.0
2023,3,530.0


Databricks visualization. Run in Databricks to view.

In [0]:
# Identify top 5 most frequently sold products
FREQ_PROD = (
    sales_df.groupBy("PROD_NAME")    # Group by product name
    .agg(count(prod_df.PROD_ID).alias("FREQ_PROD")) # Count the frequency of product ID
    .orderBy("FREQ_PROD", ascending=False) # Order by frequency in descending order
    .limit(5)                         # Limit to top 5 products
)
display(FREQ_PROD)

PROD_NAME,FREQ_PROD
Cheese Pizza,48
Pepperoni,24
Margherita,21
Meat Lover’s,12
Hawaiian,6


In [0]:
# Calculate the frequency of purchases by each customer
CUST_SALES = (
    sales_df.groupBy("CUST_ID")  # Group by customer ID
    .agg(count(sales_df.CUST_ID).alias("FREQ_cust")) # Count the frequency of customer ID
    .orderBy("FREQ_cust", ascending=False) # Order by frequency in descending order
)
display(CUST_SALES)

CUST_ID,FREQ_cust
B,36
A,33
E,18
C,18
D,12


In [0]:
# Calculate the frequency of purchases by SOURCE OF ORDER
ORDER_TYPE = (
    sales_df.groupBy("ORDER_TYPE")  # Group by customer ID
    .agg(count(sales_df.ORDER_TYPE).alias("FREQ_ORDER_TYPE")) # Count the frequency of customer ID
    .orderBy("FREQ_ORDER_TYPE", ascending=False) # Order by frequency in descending order
)
display(ORDER_TYPE)

ORDER_TYPE,FREQ_ORDER_TYPE
Delivery,90
Restaurant,27


Databricks visualization. Run in Databricks to view.