In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession  # Entry point to start our Spark application
from pyspark.sql import functions as F  # Used for data transformations
from pyspark.sql.types import (  # Explicit schema types for the sales DataFrame
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)
from pyspark.sql.window import Window  # Window functions for partitioning operations on data

In [3]:
# Initialize a SparkSession
# Start a SparkSession for this analysis, or reuse one already running in the kernel.
spark = (
    SparkSession.builder
    .appName("Customer Sales Analysis")
    .getOrCreate()
)

In [4]:
# Sample sales records, written column by column and zipped into row tuples
# ordered as (sale_date, store_id, product_id, quantity, price).
sales_data = list(
    zip(
        ["2024-01-15", "2024-03-14"],  # sale_date
        [1, 2],                        # store_id
        [1, 1],                        # product_id
        [10, 10],                      # quantity
        [20.0, 20.0],                  # price
    )
)

In [5]:
# Define the schema explicitly so Spark uses these types instead of inferring them.
# Field order matches the tuples in `sales_data`:
#   (sale_date, store_id, product_id, quantity, price)
# `price` is DoubleType (64-bit) rather than FloatType (32-bit): Python floats are
# doubles, and narrowing them to 32 bits silently loses precision for currency
# values (19.99 would be stored as 19.9899997...). `sale_date` stays an ISO-8601
# string, which still sorts chronologically as text.
schema = StructType([
    StructField("sale_date", StringType(), True),
    StructField("store_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
])

In [6]:
# Create DataFrame
# Build the sales DataFrame from the in-memory rows, enforcing the explicit schema.
sales_df = spark.createDataFrame(data=sales_data, schema=schema)

In [7]:
# Show DataFrame
# Preview the raw sales rows (Spark's default: first 20 rows, long values truncated).
sales_df.show(n=20, truncate=True)

+----------+--------+----------+--------+-----+
| sale_date|store_id|product_id|quantity|price|
+----------+--------+----------+--------+-----+
|2024-01-15|       1|         1|      10| 20.0|
|2024-03-14|       2|         1|      10| 20.0|
+----------+--------+----------+--------+-----+



In [8]:
# Add a calculated column for total sales (quantity * price)
# Revenue of each sale line: units sold times unit price.
sales_df = sales_df.withColumn(
    "total_sales",
    sales_df["quantity"] * sales_df["price"],
)
sales_df.show()

+----------+--------+----------+--------+-----+-----------+
| sale_date|store_id|product_id|quantity|price|total_sales|
+----------+--------+----------+--------+-----+-----------+
|2024-01-15|       1|         1|      10| 20.0|      200.0|
|2024-03-14|       2|         1|      10| 20.0|      200.0|
+----------+--------+----------+--------+-----+-----------+



In [9]:
# Aggregate total sales per store
# Aggregate total sales per store. Spark's groupBy makes no ordering guarantee,
# so sort by store_id to keep the displayed table reproducible across runs.
sales_by_store = (
    sales_df
    .groupBy("store_id")
    .agg(F.sum("total_sales").alias("total_store_sales"))
    .orderBy("store_id")
)
sales_by_store.show()

+--------+-----------------+
|store_id|total_store_sales|
+--------+-----------------+
|       1|            200.0|
|       2|            200.0|
+--------+-----------------+



In [10]:
# Calculate the average sales per product
# Average revenue per sale line for each product, i.e. the mean of `total_sales`
# over that product's rows. Sorted by product_id because groupBy output order
# is not guaranteed, which would make the displayed table vary between runs.
average_sales = (
    sales_df
    .groupBy("product_id")
    .agg(F.avg("total_sales").alias("average_product_sales"))
    .orderBy("product_id")
)
average_sales.show()

+----------+---------------------+
|product_id|average_product_sales|
+----------+---------------------+
|         1|                200.0|
+----------+---------------------+



In [11]:
# Sort sales by date
# Order sale lines chronologically; ISO-8601 date strings sort correctly as text.
sales_sorted = sales_df.sort(F.col("sale_date").asc())
sales_sorted.show()

+----------+--------+----------+--------+-----+-----------+
| sale_date|store_id|product_id|quantity|price|total_sales|
+----------+--------+----------+--------+-----+-----------+
|2024-01-15|       1|         1|      10| 20.0|      200.0|
|2024-03-14|       2|         1|      10| 20.0|      200.0|
+----------+--------+----------+--------+-----+-----------+



In [12]:
# Use a window function to calculate cumulative sales by store
# Running total of sales within each store, accumulated in sale_date order.
# The frame is Spark's default for an ordered window, written out explicitly:
# a RANGE frame from the start of the partition to the current row, so rows
# sharing the same sale_date within a store get the same cumulative value.
window_spec = (
    Window.partitionBy("store_id")
    .orderBy("sale_date")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)
sales_df = sales_df.withColumn(
    "cumulative_sales",
    F.sum("total_sales").over(window_spec),
)
sales_df.show()

+----------+--------+----------+--------+-----+-----------+----------------+
| sale_date|store_id|product_id|quantity|price|total_sales|cumulative_sales|
+----------+--------+----------+--------+-----+-----------+----------------+
|2024-01-15|       1|         1|      10| 20.0|      200.0|           200.0|
|2024-03-14|       2|         1|      10| 20.0|      200.0|           200.0|
+----------+--------+----------+--------+-----+-----------+----------------+

