In [1]:
!pip install pyspark



In [58]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from pyspark.sql.functions import month, year, quarter, count,countDistinct, sum, col,max

In [2]:
spark = SparkSession.builder \
    .appName("Sales Data Analysis") \
    .getOrCreate()


In [4]:
schema = StructType([
    StructField("product_id",IntegerType(),True),
    StructField("customer_id",StringType(),True),
    StructField("order_date",DateType(),True),
    StructField("location",StringType(),True),
    StructField("source_order",StringType(),True)
])
sales_df = spark.read.format("csv").option("inferschema","true").schema(schema).load("/content/sales.csv.txt")

In [11]:
sales_df.show(10)

+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|product_id|customer_id|order_date|location|source_order|order_year|order_month|order_quarter|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|         1|          A|2023-01-01|   India|      Swiggy|      2023|          1|            1|
|         2|          A|2022-01-01|   India|      Swiggy|      2022|          1|            1|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|          1|            1|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|          1|            1|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|          1|            1|
|         3|          A|2023-01-11|   India|  Restaurant|      2023|          1|            1|
|         2|          B|2022-02-01|   India|      Swiggy|      2022|          2|            1|
|         2|          B|2023-01-02|   India|      

In [7]:
sales_df = sales_df.withColumn("order_year", year(sales_df.order_date))
sales_df = sales_df.withColumn("order_month", month(sales_df.order_date))
sales_df = sales_df.withColumn("order_quarter", quarter(sales_df.order_date))

In [9]:
display(sales_df)

DataFrame[product_id: int, customer_id: string, order_date: date, location: string, source_order: string, order_year: int, order_month: int, order_quarter: int]

In [14]:
schema = StructType([
    StructField("product_id",IntegerType(),True),
    StructField("product_name",StringType(),True),
    StructField("price",StringType(),True),

])
menu_df = spark.read.format("csv").option("inferschema","true").schema(schema).load("/content/menu.csv.txt")

In [15]:
menu_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       PIZZA|  100|
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
|         6|       Pasta|  180|
+----------+------------+-----+



In [46]:
total_amount_spent = (
    sales_df.join(menu_df, "product_id")
    .groupBy("customer_id")
    .agg(sum("price").alias("total_spent"))
    .orderBy("customer_id")
)
total_amount_spent.show()

+-----------+-----------+
|customer_id|total_spent|
+-----------+-----------+
|          A|     4260.0|
|          B|     4440.0|
|          C|     2400.0|
|          D|     1200.0|
|          E|     2040.0|
+-----------+-----------+



In [47]:
total_product_sales = (
    sales_df.join(menu_df, "product_id")
    .groupBy("product_name")
    .agg(sum("price").alias("total_sales"))
    .orderBy("product_name")
)
total_product_sales.show()

+------------+-----------+
|product_name|total_sales|
+------------+-----------+
|     Biryani|      480.0|
|     Chowmin|     3600.0|
|        Dosa|     1320.0|
|       PIZZA|     2100.0|
|       Pasta|     1080.0|
|    sandwich|     5760.0|
+------------+-----------+



In [48]:
total_month_sales = (
    sales_df.join(menu_df, "product_id")
    .groupBy("order_month")
    .agg(sum("price").alias("total_sales"))
    .orderBy("order_month")
)
total_month_sales.show()

+-----------+-----------+
|order_month|total_sales|
+-----------+-----------+
|          1|     2960.0|
|          2|     2730.0|
|          3|      910.0|
|          5|     2960.0|
|          6|     2960.0|
|          7|      910.0|
|         11|      910.0|
+-----------+-----------+



In [50]:
total_year_sales = (
    sales_df.join(menu_df, "product_id")
    .groupBy("order_year")
    .agg(sum("price").alias("total_yearly_sales"))
    .orderBy("order_year")
)
total_year_sales.show()

+----------+------------------+
|order_year|total_yearly_sales|
+----------+------------------+
|      2022|            4350.0|
|      2023|            9990.0|
+----------+------------------+



In [52]:
total_quaterly_sales = (
      sales_df.join(menu_df,'product_id')
      .groupBy('order_quarter')
      .agg(sum('price').alias('total_quaterly_sales')).orderBy('order_quarter')

                      )

total_quaterly_sales.show()

+-------------+--------------------+
|order_quarter|total_quaterly_sales|
+-------------+--------------------+
|            1|              6600.0|
|            2|              5920.0|
|            3|               910.0|
|            4|               910.0|
+-------------+--------------------+



In [56]:
total_product_count = (
      sales_df.join(menu_df,'product_id')
      .groupBy('product_id','product_name')
      .agg(count("product_id")
      .alias('product_count'))
      .orderBy(col('product_count').desc())
      .drop("product_id")

                      )

total_product_count.show()

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|       PIZZA|           21|
|        Dosa|           12|
|     Biryani|            6|
|       Pasta|            6|
+------------+-------------+



In [36]:
customer_freq = (sales_df.filter(sales_df.source_order=='Restaurant').groupBy('customer_id').agg(countDistinct('order_date').alias('distinct_order_count'))
)

customer_freq.show()

+-----------+--------------------+
|customer_id|distinct_order_count|
+-----------+--------------------+
|          E|                   5|
|          B|                   6|
|          D|                   1|
|          C|                   3|
|          A|                   6|
+-----------+--------------------+



In [42]:
sales_country = (
    sales_df
    .join(menu_df, 'product_id')
    .groupBy('location')
    .agg(sum('price').alias('total_sales'))
    .orderBy('location')
)
sales_country.show()

+--------+-----------+
|location|total_sales|
+--------+-----------+
|   India|     4860.0|
|      UK|     7020.0|
|     USA|     2460.0|
+--------+-----------+



In [45]:
sales_source = (
    sales_df
    .join(menu_df, 'product_id')
    .groupBy('source_order')
    .agg(sum('price').alias('total_sales'))
    .orderBy('total_sales')
)
sales_source.show()

+------------+-----------+
|source_order|total_sales|
+------------+-----------+
|  Restaurant|     3090.0|
|      zomato|     4920.0|
|      Swiggy|     6330.0|
+------------+-----------+



In [57]:
clv = (
    sales_df.join(menu_df, "product_id")
    .groupBy("customer_id", "order_year")
    .agg(sum("price").alias("yearly_spent"))
    .orderBy("customer_id", "order_year")
)
clv.show()


+-----------+----------+------------+
|customer_id|order_year|yearly_spent|
+-----------+----------+------------+
|          A|      2022|      1620.0|
|          A|      2023|      2640.0|
|          B|      2022|      1260.0|
|          B|      2023|      3180.0|
|          C|      2022|       540.0|
|          C|      2023|      1860.0|
|          D|      2022|       600.0|
|          D|      2023|       600.0|
|          E|      2022|       330.0|
|          E|      2023|      1710.0|
+-----------+----------+------------+



In [59]:
recency_frequency = (
    sales_df.groupBy("customer_id")
    .agg(
        max("order_date").alias("last_order_date"),
        count("order_date").alias("order_count")
    )
    .orderBy("customer_id")
)
recency_frequency.show()

+-----------+---------------+-----------+
|customer_id|last_order_date|order_count|
+-----------+---------------+-----------+
|          A|     2023-06-11|         33|
|          B|     2023-11-11|         36|
|          C|     2023-06-11|         18|
|          D|     2023-11-06|         12|
|          E|     2023-11-11|         18|
+-----------+---------------+-----------+



In [60]:
top_products_by_source = (
    sales_df.join(menu_df, "product_id")
    .groupBy("source_order", "product_name")
    .agg(count("product_id").alias("sales_count"))
    .orderBy("source_order", col("sales_count").desc())
)
top_products_by_source.show()

+------------+------------+-----------+
|source_order|product_name|sales_count|
+------------+------------+-----------+
|  Restaurant|    sandwich|         18|
|  Restaurant|       PIZZA|          6|
|  Restaurant|        Dosa|          3|
|      Swiggy|     Chowmin|         18|
|      Swiggy|    sandwich|         15|
|      Swiggy|       PIZZA|         15|
|      Swiggy|        Dosa|          3|
|      zomato|    sandwich|         15|
|      zomato|        Dosa|          6|
|      zomato|     Biryani|          6|
|      zomato|       Pasta|          6|
|      zomato|     Chowmin|          6|
+------------+------------+-----------+



In [61]:
seasonal_trends = (
    sales_df.join(menu_df, "product_id")
    .groupBy("order_month", "product_name")
    .agg(sum("price").alias("monthly_sales"))
    .orderBy("order_month", "monthly_sales", ascending=False)
)
seasonal_trends.show()


+-----------+------------+-------------+
|order_month|product_name|monthly_sales|
+-----------+------------+-------------+
|         11|    sandwich|        360.0|
|         11|        Dosa|        220.0|
|         11|     Chowmin|        150.0|
|         11|       PIZZA|        100.0|
|         11|     Biryani|         80.0|
|          7|    sandwich|        360.0|
|          7|        Dosa|        220.0|
|          7|     Chowmin|        150.0|
|          7|       PIZZA|        100.0|
|          7|     Biryani|         80.0|
|          6|    sandwich|       1200.0|
|          6|     Chowmin|        900.0|
|          6|       PIZZA|        500.0|
|          6|       Pasta|        360.0|
|          5|    sandwich|       1200.0|
|          5|     Chowmin|        900.0|
|          5|       PIZZA|        500.0|
|          5|       Pasta|        360.0|
|          3|    sandwich|        360.0|
|          3|        Dosa|        220.0|
+-----------+------------+-------------+
only showing top

In [62]:
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

window_spec = Window.orderBy("order_year")

sales_growth = (
    sales_df.join(menu_df, "product_id")
    .groupBy("order_year")
    .agg(sum("price").alias("yearly_sales"))
    .withColumn("previous_sales", lag("yearly_sales").over(window_spec))
    .withColumn(
        "growth_rate",
        ((col("yearly_sales") - col("previous_sales")) / col("previous_sales") * 100).alias("growth_rate")
    )
)
sales_growth.show()


+----------+------------+--------------+------------------+
|order_year|yearly_sales|previous_sales|       growth_rate|
+----------+------------+--------------+------------------+
|      2022|      4350.0|          NULL|              NULL|
|      2023|      9990.0|        4350.0|129.65517241379308|
+----------+------------+--------------+------------------+



In [63]:
location_analysis = (
    sales_df.join(menu_df, "product_id")
    .groupBy("location", "product_name")
    .agg(sum("price").alias("location_sales"))
    .orderBy("location", col("location_sales").desc())
)
location_analysis.show()


+--------+------------+--------------+
|location|product_name|location_sales|
+--------+------------+--------------+
|   India|    sandwich|        2160.0|
|   India|     Chowmin|        1800.0|
|   India|       PIZZA|         900.0|
|      UK|    sandwich|        2880.0|
|      UK|     Chowmin|        1800.0|
|      UK|       PIZZA|         900.0|
|      UK|        Dosa|         660.0|
|      UK|       Pasta|         540.0|
|      UK|     Biryani|         240.0|
|     USA|    sandwich|         720.0|
|     USA|        Dosa|         660.0|
|     USA|       Pasta|         540.0|
|     USA|       PIZZA|         300.0|
|     USA|     Biryani|         240.0|
+--------+------------+--------------+

