<a href="https://colab.research.google.com/github/fvgm-spec/coderoad-de-bootcamp/blob/main/top_most_sold.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
## Intialize PySpark
!pip install pyspark
!pip install -q findspark



In [None]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, desc
import pyspark

findspark.init()
spark = SparkSession.builder.master("local[*]").appName("Spark Session").getOrCreate()

## Reading CVS fils for Products and Sales

In [None]:
products_df = (spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load('/content/drive/MyDrive/data/PRODUCTS_.csv')
  )

products_df.printSchema()
products_df.show()

root
 |-- gtin: long (nullable = true)
 |-- categoryParent: string (nullable = true)
 |-- colorLabel: integer (nullable = true)
 |-- modelLabel: string (nullable = true)
 |-- productLabelLong: string (nullable = true)
 |-- sizeLabel: string (nullable = true)
 |-- productCode: long (nullable = true)

+------------+------------------+----------+----------+--------------------+---------+------------+
|        gtin|    categoryParent|colorLabel|modelLabel|    productLabelLong|sizeLabel| productCode|
+------------+------------------+----------+----------+--------------------+---------+------------+
|795570602555|     710-LS WOVENS|       126|  4HRW7018|LS SNS DOBBY TEXT...|        M|795570602555|
|795570643275|         730-POLOS|        10|  4HRK7125|SS SLD 3BTN PLO R...|        S|795570643275|
|713610532624|        910-WOVENS|       996|  CUWFC21B|SS L/C DOBBY CAMP...|     3XLT|713610532624|
|795570983463|     710-LS WOVENS|       394|  4IRW7001|LS NON-IRON TWILL...|      XXL|795570983463|

In [None]:
sales_df = (spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load('/content/drive/MyDrive/data/SALES_.csv')
  )

sales_df.printSchema()
sales_df.show()

root
 |-- date: date (nullable = true)
 |-- site_code: integer (nullable = true)
 |-- sku: long (nullable = true)
 |-- sales_qty: integer (nullable = true)

+----------+---------+------------+---------+
|      date|site_code|         sku|sales_qty|
+----------+---------+------------+---------+
|2024-11-14|      524|795570789591|        1|
|2024-11-13|      549| 98593095718|        3|
|2024-11-14|      615| 84971953411|        1|
|2024-11-12|      524| 94833171273|        1|
|2024-11-14|      510|888987766696|        1|
|2024-11-13|      613|795570036558|        1|
|2024-11-12|      601| 15712078292|        1|
|2024-11-12|      543|795570790726|        1|
|2024-11-12|      613|795570930368|        1|
|2024-11-12|      523| 15712069351|        1|
|2024-11-13|      522| 15712078049|        1|
|2024-11-12|      566| 15712078285|        1|
|2024-11-12|      564| 84971909791|        1|
|2024-11-12|      656|889817491467|        1|
|2024-11-14|      549| 94833171679|        1|
|2024-11-13|   

In [None]:
# Print schema of dataframes to verify data types
print("Sales DataFrame Schema:")
sales_df.printSchema()

print("Products DataFrame Schema:")
products_df.printSchema()

Sales DataFrame Schema:
root
 |-- date: date (nullable = true)
 |-- site_code: integer (nullable = true)
 |-- sku: long (nullable = true)
 |-- sales_qty: integer (nullable = true)

Products DataFrame Schema:
root
 |-- gtin: long (nullable = true)
 |-- categoryParent: string (nullable = true)
 |-- colorLabel: integer (nullable = true)
 |-- modelLabel: string (nullable = true)
 |-- productLabelLong: string (nullable = true)
 |-- sizeLabel: string (nullable = true)
 |-- productCode: long (nullable = true)



In [None]:
# Clean the sales data - filter out null SKUs
valid_sales_df = sales_df.filter(col("sku").isNotNull())

In [None]:
# Aggregate sales data to get total sales quantity by SKU
sales_by_sku = valid_sales_df.groupBy("sku") \
    .agg(sum("sales_qty").alias("total_sales")) \
    .orderBy(desc("total_sales"))

In [None]:
# Show top sales results
print("Top SKUs by Sales Quantity:")
sales_by_sku.show(10)

Top SKUs by Sales Quantity:
+------------+-----------+
|         sku|total_sales|
+------------+-----------+
| 98593095718|         19|
| 94833167559|         14|
|888987484637|         13|
|795570226706|          9|
| 30672047225|          9|
|795570511062|          9|
| 98593212146|          8|
|795570226720|          8|
| 94833167542|          8|
| 15712078292|          8|
+------------+-----------+
only showing top 10 rows



In [None]:
# Join sales data with products data to get categories
sales_with_categories = sales_by_sku.join(
    products_df.select("productCode", "categoryParent"),
    sales_by_sku["sku"] == products_df["productCode"],
    "inner"
)

In [None]:
# Get the top 2 products with their categories
top_2_products = sales_with_categories.orderBy(desc("total_sales")).limit(2)

In [None]:
# Show final results
print("Top 2 Most Sold Products with Parent Categories:")
top_2_products.select("sku", "total_sales", "categoryParent").show()

# Display categories only
print("Parent Categories of Top 2 Most Sold Products:")
top_2_products.select("categoryParent").show()

Top 2 Most Sold Products with Parent Categories:
+------------+-----------+---------------+
|         sku|total_sales| categoryParent|
+------------+-----------+---------------+
| 94833167559|         14|  710-LS WOVENS|
|888987484637|         13|770-ACCESSORIES|
+------------+-----------+---------------+

Parent Categories of Top 2 Most Sold Products:
+---------------+
| categoryParent|
+---------------+
|  710-LS WOVENS|
|770-ACCESSORIES|
+---------------+

