In [1]:
!pip install pyspark



In [2]:
from pyspark import SparkContext,SparkConf
from datetime import datetime

In [3]:
# Configure a single-JVM local Spark application named "TestApp".
conf = SparkConf().setAppName("TestApp").setMaster("local")

# getOrCreate returns the already-running context when this cell is executed again;
# calling the SparkContext constructor a second time in the same kernel raises
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

In [4]:
# Load the products CSV as an RDD of raw text lines (header line included).
productsRdd = sc.textFile('/content/products.csv')

In [5]:
# Break each text line into a list of fields. This is a plain comma split, so it
# assumes no field contains an embedded or quoted comma.
productsListRdd = productsRdd.map(lambda line: line.split(','))

In [6]:
# Preview the first five parsed rows; the column-name row is still present at this stage.
productsListRdd.take(5)

[['product_id', 'product_name', 'category'],
 ['P0001', 'Product_1', 'Toys'],
 ['P0002', 'Product_2', 'Electronics'],
 ['P0003', 'Product_3', 'Electronics'],
 ['P0004', 'Product_4', 'Toys']]

In [45]:
# The first record is the column-name row; keep only the records that differ from it.
header_products = productsListRdd.first()
productsDataRdd = productsListRdd.filter(lambda fields: fields != header_products)

In [46]:
# Key each product by its id: (product_id, product_name). The category column is dropped.
productsPairRdd = productsDataRdd.map(lambda fields: (fields[0], fields[1]))

In [47]:
# Preview five (product_id, product_name) pairs.
productsPairRdd.take(5)

[('P0001', 'Product_1'),
 ('P0002', 'Product_2'),
 ('P0003', 'Product_3'),
 ('P0004', 'Product_4'),
 ('P0005', 'Product_5')]

In [14]:
# Load the sales CSV as an RDD of raw text lines (header line included).
salesRdd = sc.textFile('/content/sales.csv')

In [15]:
# Break each text line into a list of fields. This is a plain comma split, so it
# assumes no field contains an embedded or quoted comma.
salesListRdd = salesRdd.map(lambda line: line.split(','))

In [16]:
# Preview the first five parsed rows; the column-name row is still present at this stage.
salesListRdd.take(5)

[['sale_id', 'customer_id', 'product_id', 'quantity', 'price', 'sale_date'],
 ['1', '103', 'P0363', '3', '98.89', '2025-01-20'],
 ['2', '436', 'P0088', '5', '179.75', '2024-12-17'],
 ['3', '349', 'P0139', '1', '96.37', '2024-09-27'],
 ['4', '271', 'P0120', '3', '42.19', '2025-01-22']]

In [26]:
# The first record is the column-name row; keep only the records that differ from it.
header_sales = salesListRdd.first()
salesDataRdd = salesListRdd.filter(lambda fields: fields != header_sales)

In [61]:
def _month_product_quantity(sale):
    """Turn one parsed sales row into (YYYY-MM, (product_id, quantity)).

    Index 5 holds the sale date as YYYY-MM-DD, index 2 the product id and
    index 3 the quantity as a string.
    """
    sale_month = datetime.strptime(sale[5], "%Y-%m-%d").strftime("%Y-%m")
    return (sale_month, (sale[2], int(sale[3])))


monthlyCounts = salesDataRdd.map(_month_product_quantity)

In [62]:
# Preview five (month, (product_id, quantity)) records.
monthlyCounts.take(5)

[('2025-01', ('P0363', 3)),
 ('2024-12', ('P0088', 5)),
 ('2024-09', ('P0139', 1)),
 ('2025-01', ('P0120', 3)),
 ('2025-02', ('P0313', 5))]

In [64]:
# Re-key every sale by (month, product_id) and sum the quantities per key, giving
# ((month, product_id), total_quantity).
mPQuantity = (
    monthlyCounts
    .map(lambda rec: ((rec[0], rec[1][0]), rec[1][1]))
    .reduceByKey(lambda running_total, qty: running_total + qty)
)

In [65]:
# Preview five ((month, product_id), total_quantity) records.
mPQuantity.take(5)

[(('2025-01', 'P0363'), 283),
 (('2024-12', 'P0088'), 337),
 (('2024-09', 'P0139'), 338),
 (('2025-01', 'P0120'), 306),
 (('2025-02', 'P0313'), 269)]

In [67]:
# Re-key by month with a (total_quantity, product_id) value and keep the best seller
# for each month, giving (month, (total_quantity, product_id)).
#
# reduceByKey needs a commutative and associative function. max() on the tuples
# compares total_quantity first and falls back to product_id, so when two products
# tie on quantity the same one wins on every run, whatever the partitioning or
# merge order.
topSellingPPM = (
    mPQuantity
    .map(lambda x: (x[0][0], (x[1], x[0][1])))
    .reduceByKey(lambda a, b: max(a, b))
)

In [68]:
# Preview five (month, (total_quantity, product_id)) records, one per month.
topSellingPPM.take(5)

[('2025-01', (403, 'P0482')),
 ('2024-12', (417, 'P0067')),
 ('2024-09', (422, 'P0204')),
 ('2025-02', (371, 'P0267')),
 ('2024-10', (410, 'P0144'))]

In [74]:
# Re-key the monthly winners by product id so they can be joined with the product
# names: (product_id, (month, total_quantity)).
topPermonth = topSellingPPM.map(lambda rec: (rec[1][1], (rec[0], rec[1][0])))

In [75]:
# Join on product_id to attach the product name, giving
# (product_id, ((month, total_quantity), product_name)).
# NOTE(review): join() is an inner join, so a month whose top product_id has no row in
# the products data would silently disappear here — confirm that is acceptable.
joinedRDD = topPermonth.join(productsPairRdd)

In [76]:
# Flatten each joined record (product_id, ((month, total_quantity), product_name))
# into the report shape (month, product_name, total_quantity).
finalResultRdd = joinedRDD.map(
    lambda rec: (rec[1][0][0], rec[1][1], rec[1][0][1])
)

In [77]:
# Preview five (month, product_name, total_quantity) results.
finalResultRdd.take(5)

[('2025-01', 'Product_482', 403),
 ('2024-09', 'Product_204', 422),
 ('2025-02', 'Product_267', 371),
 ('2024-05', 'Product_393', 150),
 ('2024-08', 'Product_449', 390)]

In [78]:
# Write the (month, product_name, total_quantity) tuples as text files under this directory.
# NOTE(review): Spark normally refuses to write into an output directory that already
# exists, so re-running this cell will likely fail until the directory is removed — confirm.
finalResultRdd.saveAsTextFile('/content/top_selling_products_by_month')