**Costumer Spening Analysis**

In this project, we simulate collecting data from a business, storing in it Spark, and then using SparkSQL to get insight regarding specific requests from the client, which will be later used to analyze the data in order to find trends.

In [None]:
!pip install pyspark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

#Starting the spark session
spark = SparkSession.builder.appName("Costumer Sales Analysis").getOrCreate() #initialized a Spark session

#Simulating sales data, defining columns, and creating its Spark dataframe
sales_data = [
    ("2024-01-15",1,1,10,20.0,1),
    ("2024-03-14",2,4,23,25.0,2),
    ("2024-04-15",3,2,30,15.0,3),
    ("2024-05-16",4,1,32,45.0,4),
    ("2024-07-19",5,4,15,22.0,5),
]

sales_schema = ["Order_date", "Order_id", "Costumer_id", "Quantity_sold", "Price", "Product_Id"]
sales_df = spark.createDataFrame(sales_data, sales_schema)
sales_df.createOrReplaceTempView("Sales")

#Simulating costumer data, defining columns, and creating its Spark dataframe
costumer_data = [
    (1, "Costumer_A"),
    (2, "Costumer_B"),
    (3, "Costumer_C"),
    (4, "Costumer_D"),
]

costumer_data_schema = ["Costumer_ID", "Costumer_Name"]
costumer_data_df = spark.createDataFrame(costumer_data, costumer_data_schema)
costumer_data_df.createOrReplaceTempView("Costumers")

#Simulating products data, defining columns, and creating its Spark dataframe
products_data = [
    (1, "Product_A","Electronics"),
    (2, "Product_B","Furniture"),
    (3, "Product_C","Electronics"),
    (4, "Product_D","Home_Appliances"),
    (5, "Product_E", "Furniture"),
]

product_schema = ["Product_ID", "Product_Name", "Product_Category"]
product_data_df = spark.createDataFrame(products_data, product_schema)
product_data_df.createOrReplaceTempView("Products")

#Simulating caterogies data, defining columns, and creating its Spark dataframe
categories_data = [
    ("Electronics", "High-Tech _Gadgets"),
    ("Furniture", "Household_Items"),
    ("Home_Appliances","Utility_Goods")
]

categories_data_schema = ["Category", "Category_Description"]
categories_data_df = spark.createDataFrame(categories_data, categories_data_schema)
categories_data_df.createOrReplaceTempView("Categories")

#Joining tables
joined_data = spark.sql("""
  SELECT s.Order_date, s.Quantity_sold, s.Price,
  c.Costumer_Name,
  p.Product_Name, p.Product_Category,
  (s.Quantity_sold * s.Price) AS Total_Spent
  FROM Sales s
  JOIN Costumers c ON s.Costumer_Id = c.Costumer_Id
  JOIN Products p ON s.Product_Id = p.Product_Id
  JOIN Categories cat ON p.Product_Category = cat.category
""")
joined_data.createOrReplaceTempView("Joined_Sales")

#Caulculate total spending
costumer_spending = spark.sql("""
  SELECT Costumer_Name, SUM(Total_Spent) AS Total_spending
  FROM Joined_Sales
  GROUP BY Costumer_Name
  ORDER BY Total_spending DESC
""")

joined_data.createOrReplaceTempView("Joined_Sales")
costumer_spending.show()

#Top products for each category
top_products_by_category = spark.sql("""
  SELECT *
  FROM(
    SELECT Product_Category, Product_Name, SUM(Quantity_Sold) AS Total_Quantity_Sold,
      ROW_NUMBER() OVER (PARTITION BY Product_Category ORDER BY SUM(Quantity_Sold) DESC) AS rank
    From Joined_Sales
    Group BY Product_Category, Product_Name
    ) ranked_products
    WHERE rank = 1
""")
top_products_by_category.show()
top_products_by_category.createOrReplaceTempView("Top_Products")

#Filter to show only results for the 'Electronics' category
electronics_data = spark.sql("""
    SELECT Costumer_Name, Product_Name, Total_Spent
    FROM Joined_Sales
    WHERE Product_Category = 'Electronics'
    ORDER BY Total_Spent DESC
""")
electronics_data.show()


#Stopping Spark Session
spark.stop()

+-------------+--------------+
|Costumer_Name|Total_spending|
+-------------+--------------+
|   Costumer_A|        1640.0|
|   Costumer_D|         905.0|
|   Costumer_B|         450.0|
+-------------+--------------+

+----------------+------------+-------------------+----+
|Product_Category|Product_Name|Total_Quantity_Sold|rank|
+----------------+------------+-------------------+----+
|     Electronics|   Product_C|                 30|   1|
|       Furniture|   Product_B|                 23|   1|
| Home_Appliances|   Product_D|                 32|   1|
+----------------+------------+-------------------+----+

+-------------+------------+-----------+
|Costumer_Name|Product_Name|Total_Spent|
+-------------+------------+-----------+
|   Costumer_B|   Product_C|      450.0|
|   Costumer_A|   Product_A|      200.0|
+-------------+------------+-----------+

