## Prep steps:

In [1]:
! pip install pyspark

Defaulting to user installation because normal site-packages is not writeable


In [5]:
import shutil

from pyspark import SparkContext, SparkConf

# Local-mode configuration: "local[*]" runs Spark inside this machine.
conf = SparkConf().setAppName("regionSalesHomework").setMaster("local[*]")

# getOrCreate reuses an already-running context, so re-executing this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

Picked up JAVA_TOOL_OPTIONS: -XX:+UseContainerSupport -XX:ActiveProcessorCount=1
Picked up JAVA_TOOL_OPTIONS: -XX:+UseContainerSupport -XX:ActiveProcessorCount=1
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/06/25 19:00:05 WARN Utils: Your hostname, saradawner2-trng2224dat-1l4u3l62pul, resolves to a loopback address: 127.0.0.1; using 10.0.5.2 instead (on interface eth0)
25/06/25 19:00:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/25 19:00:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Homework:

### 1. Load the Dataset

In [44]:
# Read the raw sales file through the file:// scheme; each RDD element is one line of text.
region_sales_path = "file:////workspace/TRNG-2224-data-engineering/week1/datasets/region_sales.txt"
region_sales_raw = sc.textFile(region_sales_path)

In [43]:
def parse_region_record(line):
    """Parse one comma-separated line into a typed sales record.

    Args:
        line: text of the form ``product_id,region,category,amount``.

    Returns:
        ``(product_id, region, category, amount)`` with ``product_id`` and
        ``amount`` converted to ``int`` and surrounding whitespace removed
        from every field.

    Raises:
        IndexError: if the line has fewer than four fields.
        ValueError: if ``product_id`` or ``amount`` is not an integer.
    """
    fields = [field.strip() for field in line.split(",")]
    return (int(fields[0]), fields[1], fields[2], int(fields[3]))


region_records = (
    region_sales_raw
    .filter(lambda line: line.strip())  # skip blank lines instead of crashing on them
    .map(parse_region_record)
)

### 2. Create the broadcast variable and the accumulator

In [52]:
# Fractional discount per product category, keyed by the category name.
category_discounts = {
    "Electronics": 0.10,
    "Furniture": 0.15,
    "Clothing": 0.05,
    "Books": 0.20,
}

# Wrap the lookup table in a broadcast variable; readers access it via `.value`.
broadcast_category_discounts = sc.broadcast(category_discounts)

In [59]:
# Accumulator starting at 0; enrich_and_count adds 1 to it for every record
# whose discounted amount is below 300. Re-running this cell creates a fresh
# accumulator, i.e. resets the count.
below_300_count = sc.accumulator(0)

### 3. Calculate and return the RDD

In [54]:
def enrich_and_count(record):
    """Apply the category discount to one sales record and tally low amounts.

    Args:
        record: ``(product_id, region, category, original_amount)`` tuple.

    Returns:
        ``(product_id, region, category, original_amount, discounted_amount)``
        where ``discounted_amount = (1 - rate) * original_amount`` and ``rate``
        is looked up in ``broadcast_category_discounts`` by ``category``.

    Raises:
        KeyError: if ``category`` is not in the broadcast discount table.

    Side effects:
        Adds 1 to ``below_300_count`` when the discounted amount is strictly
        below 300. The increment happens on every call, so evaluating the
        same RDD more than once counts its records again.
    """
    product_id, region, category, original_amount = record
    discount_rate = broadcast_category_discounts.value[category]
    discounted_amount = (1 - discount_rate) * original_amount
    if discounted_amount < 300:
        below_300_count.add(1)
    return (product_id, region, category, original_amount, discounted_amount)

In [60]:
# cache() keeps the enriched records after the first action so that later
# actions on this RDD reuse them instead of re-running enrich_and_count, which
# would add to below_300_count a second time. (Best effort: a partition that
# is evicted from the cache is recomputed.)
enriched_region_records = region_records.map(enrich_and_count).cache()
enriched_region_records.collect()

[(2001, 'South', 'Electronics', 838, 754.2),
 (2002, 'West', 'Furniture', 471, 400.34999999999997),
 (2003, 'North', 'Electronics', 803, 722.7),
 (2004, 'West', 'Furniture', 174, 147.9),
 (2005, 'South', 'Clothing', 590, 560.5),
 (2006, 'North', 'Furniture', 937, 796.4499999999999),
 (2007, 'North', 'Electronics', 391, 351.90000000000003),
 (2008, 'West', 'Electronics', 961, 864.9),
 (2009, 'North', 'Electronics', 305, 274.5),
 (2010, 'East', 'Clothing', 213, 202.35),
 (2011, 'East', 'Electronics', 615, 553.5),
 (2012, 'South', 'Clothing', 573, 544.35),
 (2013, 'East', 'Clothing', 352, 334.4),
 (2014, 'West', 'Clothing', 768, 729.5999999999999),
 (2015, 'North', 'Electronics', 231, 207.9),
 (2016, 'East', 'Furniture', 217, 184.45),
 (2017, 'West', 'Clothing', 346, 328.7),
 (2018, 'North', 'Books', 375, 300.0),
 (2019, 'South', 'Electronics', 313, 281.7),
 (2020, 'West', 'Furniture', 903, 767.55),
 (2021, 'West', 'Clothing', 904, 858.8),
 (2022, 'North', 'Furniture', 812, 690.1999999999

### 4. Save enriched table and display accumulator result

In [56]:
# saveAsTextFile refuses to write into an existing directory, so remove any
# output left by a previous run to keep this cell re-runnable.
output_dir = "enriched_region_sales"
shutil.rmtree(output_dir, ignore_errors=True)

# Serialise each record as one comma-separated line; coalesce(1) reduces the
# RDD to a single partition before writing.
(
    enriched_region_records
    .map(lambda record: ",".join(map(str, record)))
    .coalesce(1)
    .saveAsTextFile(output_dir)
)

In [61]:
# Number of times enrich_and_count produced a discounted amount below 300
# since the accumulator was created.
below_300_count.value

25