# RDD

An RDD (Resilient Distributed Dataset) is the fundamental data structure of Apache Spark: a fault-tolerant, distributed collection of elements that can be operated on in parallel.

**Key Characteristics:**

- Immutable
- Lazily evaluated
- Fault tolerant (lost partitions are recomputed from lineage information)
- Partitioned across cluster nodes
- Can be cached in memory

### SparkContext and SparkConf


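`SparkContext` is the entry point for Spark functionality; it connects the application to the cluster and is used to create RDDs.

#### `SparkConf`

- Holds the configuration for a Spark application

**Common settings:**

- `setMaster("local[*]")` – run in local mode using all available cores
- `setAppName("RDDExample")` – set the application name

For example, the setup cell later in this notebook builds a `SparkConf` with both settings and passes it to `SparkContext`.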


### Transformations

Transformations create a new RDD from an existing one. They are lazy – not executed until an action is triggered.

| Transformation      | Description                                                   |
| ------------------- | ------------------------------------------------------------- |
| `map(func)`         | Returns a new RDD by applying `func` to each element          |
| `filter(func)`      | Keeps only the elements for which `func` returns true         |
| `flatMap(func)`     | Like `map`, but flattens each returned sequence into elements |
| `distinct()`        | Removes duplicates                                            |
| `union(rdd)`        | Combines two RDDs (duplicates are kept)                       |
| `groupByKey()`      | Groups the values that share a key                            |
| `reduceByKey(func)` | Aggregates the values that share a key using `func`           |
| `sortBy(func)`      | Sorts the RDD by the key that `func` computes                 |
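Laziness can be imitated in plain Python with generators, which may help build intuition. The sketch below is only an analogy and does not use Spark; `lazy_map`, `calls`, and the sample amounts are illustrative names and values, not part of the PySpark API.

```python
# Plain-Python analogy for lazy transformations (no Spark involved).
calls = []  # records every element the "map" step actually touches


def lazy_map(func, items):
    """Return a generator: nothing runs until something consumes it."""
    for item in items:
        calls.append(item)
        yield func(item)


amounts = [838, 471, 803, 174]

# "Transformations": only a description of the pipeline is built here.
doubled = lazy_map(lambda x: x * 2, amounts)
large = (x for x in doubled if x > 1000)
touched_before_action = len(calls)

# "Action": consuming the pipeline finally triggers the work.
result = list(large)

print(touched_before_action)  # 0
print(result)                 # [1676, 1606]
```

Spark behaves in a similar spirit: calls such as `map` and `filter` only record what should happen, and an action such as `collect()` makes the recorded steps run.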


### Actions

Actions trigger computation and return results or write data.

| Action                 | Description                                                            |
| ---------------------- | ---------------------------------------------------------------------- |
| `collect()`            | Returns all elements to the driver                                     |
| `count()`              | Returns the number of elements                                         |
| `first()`              | Returns the first element                                              |
| `take(n)`              | Returns the first `n` elements                                         |
| `reduce(func)`         | Aggregates elements with a commutative, associative binary function    |
| `saveAsTextFile(path)` | Writes the RDD as text files (one per partition) in the `path` folder  |
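One reason `reduce(func)` expects a commutative and associative function is that each partition is reduced separately and the partial results are then combined. A minimal plain-Python sketch of that idea follows, assuming a hypothetical `distributed_reduce` helper and a hand-made partition layout (in Spark the real layout is decided by the framework).

```python
from functools import reduce

# Hand-made partitions; the split is an assumption for illustration only.
partitions = [[838, 471], [803], [174, 590]]
flat = [x for part in partitions for x in part]


def distributed_reduce(func, parts):
    """Reduce every partition on its own, then reduce the partial results."""
    partials = [reduce(func, part) for part in parts if part]
    return reduce(func, partials)


# Addition is associative and commutative: the partitioning does not matter.
total = distributed_reduce(lambda a, b: a + b, partitions)
print(total, sum(flat))  # 2876 2876

# Subtraction is neither: the answer depends on how the data was split.
split_diff = distributed_reduce(lambda a, b: a - b, partitions)
sequential_diff = reduce(lambda a, b: a - b, flat)
print(split_diff, sequential_diff)  # -20 -1200
```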




Reference: [Spark RDD programming guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)


In [1]:
! pip install pyspark

# pyspark 4.0.0 - jdk 17 or above



In [None]:
! pip install pyspark==3.5.0

# pyspark 3.5 - jdk 11

In [1]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("regionSalesDemo").setMaster("local[*]")

sc = SparkContext(conf=conf)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/29 19:44:01 WARN Utils: Your hostname, codespaces-b78a22, resolves to a loopback address: 127.0.0.1; using 10.0.2.187 instead (on interface eth0)
25/07/29 19:44:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/29 19:44:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
sc.defaultParallelism

2

In [3]:
sc

In [13]:
sales_raw = sc.textFile("file:///workspaces/TRNG-2235-DB-Analyst/week1/datasets/region_sales_data.txt")

In [5]:
sales_raw.collect()

['2001,South,Electronics,838',
 '2002,West,Furniture,471',
 '2003,North,Electronics,803',
 '2004,West,Furniture,174',
 '2005,South,Clothing,590',
 '2006,North,Furniture,937',
 '2007,North,Electronics,391',
 '2008,West,Electronics,961',
 '2009,North,Electronics,305',
 '2010,East,Clothing,213',
 '2011,East,Electronics,615',
 '2012,South,Clothing,573',
 '2013,East,Clothing,352',
 '2014,West,Clothing,768',
 '2015,North,Electronics,231',
 '2016,East,Furniture,217',
 '2017,West,Clothing,346',
 '2018,North,Books,375',
 '2019,South,Electronics,313',
 '2020,West,Furniture,903',
 '2021,West,Clothing,904',
 '2022,North,Furniture,812',
 '2023,West,Furniture,590',
 '2024,North,Books,452',
 '2025,North,Books,697',
 '2026,East,Clothing,959',
 '2027,North,Books,661',
 '2028,South,Books,700',
 '2029,East,Clothing,832',
 '2030,North,Furniture,163',
 '2031,South,Books,413',
 '2032,North,Furniture,952',
 '2033,North,Books,240',
 '2034,East,Clothing,740',
 '2035,South,Clothing,264',
 '2036,West,Books,211',

In [14]:
records = sales_raw.map(lambda x:x.split(","))

records.take(4)

[['2001', 'South', 'Electronics', '838'],
 ['2002', 'West', 'Furniture', '471'],
 ['2003', 'North', 'Electronics', '803'],
 ['2004', 'West', 'Furniture', '174']]

In [15]:
records = records.map(lambda x:(int(x[0]), x[1], x[2], int(x[3])))
records.take(9)

[(2001, 'South', 'Electronics', 838),
 (2002, 'West', 'Furniture', 471),
 (2003, 'North', 'Electronics', 803),
 (2004, 'West', 'Furniture', 174),
 (2005, 'South', 'Clothing', 590),
 (2006, 'North', 'Furniture', 937),
 (2007, 'North', 'Electronics', 391),
 (2008, 'West', 'Electronics', 961),
 (2009, 'North', 'Electronics', 305)]

In [61]:
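# optional: build an RDD from a Python list
rdd = sc.parallelize([
    "2001,South,Ele,123",
    "2002,West,Furniture,345"
])

def to_pairs(line):
    # split once, then emit a (region, amount) and a (category, amount) pair
    _, region, category, amount = line.split(",")
    return [(region, int(amount)), (category, int(amount))]

# flatMap applies to_pairs and flattens the returned lists in one step
pairs = rdd.flatMap(to_pairs)

pairs.collect()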

[('South', 123), ('Ele', 123), ('West', 345), ('Furniture', 345)]
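The difference between `map` and `flatMap` in the cell above can be reproduced without Spark. The following is a plain-Python sketch, where `to_pairs` is an illustrative helper name and `itertools.chain.from_iterable` stands in for the flattening step.

```python
from itertools import chain

lines = ["2001,South,Ele,123", "2002,West,Furniture,345"]


def to_pairs(line):
    """Turn one CSV line into a (region, amount) and a (category, amount) pair."""
    _, region, category, amount = line.split(",")
    return [(region, int(amount)), (category, int(amount))]


# Like rdd.map(to_pairs): one list of pairs per input line (nested result).
mapped = [to_pairs(line) for line in lines]

# Like rdd.flatMap(to_pairs): the per-line lists are flattened by one level.
flat_mapped = list(chain.from_iterable(mapped))

print(mapped)
# [[('South', 123), ('Ele', 123)], [('West', 345), ('Furniture', 345)]]
print(flat_mapped)
# [('South', 123), ('Ele', 123), ('West', 345), ('Furniture', 345)]
```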

In [None]:
# total sales by category and region

category_sales = records.map(lambda x:((x[1]+x[2]), x[3]))
category_sales.collect()

[('SouthElectronics', 838),
 ('WestFurniture', 471),
 ('NorthElectronics', 803),
 ('WestFurniture', 174),
 ('SouthClothing', 590),
 ('NorthFurniture', 937),
 ('NorthElectronics', 391),
 ('WestElectronics', 961),
 ('NorthElectronics', 305),
 ('EastClothing', 213),
 ('EastElectronics', 615),
 ('SouthClothing', 573),
 ('EastClothing', 352),
 ('WestClothing', 768),
 ('NorthElectronics', 231),
 ('EastFurniture', 217),
 ('WestClothing', 346),
 ('NorthBooks', 375),
 ('SouthElectronics', 313),
 ('WestFurniture', 903),
 ('WestClothing', 904),
 ('NorthFurniture', 812),
 ('WestFurniture', 590),
 ('NorthBooks', 452),
 ('NorthBooks', 697),
 ('EastClothing', 959),
 ('NorthBooks', 661),
 ('SouthBooks', 700),
 ('EastClothing', 832),
 ('NorthFurniture', 163),
 ('SouthBooks', 413),
 ('NorthFurniture', 952),
 ('NorthBooks', 240),
 ('EastClothing', 740),
 ('SouthClothing', 264),
 ('WestBooks', 211),
 ('EastClothing', 103),
 ('NorthElectronics', 462),
 ('WestElectronics', 479),
 ('EastElectronics', 666),
 

In [23]:
# total sales by category

category_sales = records.map(lambda x:(x[2], x[3]))
category_sales.collect()

[('Electronics', 838),
 ('Furniture', 471),
 ('Electronics', 803),
 ('Furniture', 174),
 ('Clothing', 590),
 ('Furniture', 937),
 ('Electronics', 391),
 ('Electronics', 961),
 ('Electronics', 305),
 ('Clothing', 213),
 ('Electronics', 615),
 ('Clothing', 573),
 ('Clothing', 352),
 ('Clothing', 768),
 ('Electronics', 231),
 ('Furniture', 217),
 ('Clothing', 346),
 ('Books', 375),
 ('Electronics', 313),
 ('Furniture', 903),
 ('Clothing', 904),
 ('Furniture', 812),
 ('Furniture', 590),
 ('Books', 452),
 ('Books', 697),
 ('Clothing', 959),
 ('Books', 661),
 ('Books', 700),
 ('Clothing', 832),
 ('Furniture', 163),
 ('Books', 413),
 ('Furniture', 952),
 ('Books', 240),
 ('Clothing', 740),
 ('Clothing', 264),
 ('Books', 211),
 ('Clothing', 103),
 ('Electronics', 462),
 ('Electronics', 479),
 ('Electronics', 666),
 ('Clothing', 347),
 ('Electronics', 435),
 ('Clothing', 669),
 ('Electronics', 715),
 ('Electronics', 232),
 ('Electronics', 569),
 ('Books', 989),
 ('Clothing', 507),
 ('Clothing',

In [24]:
total_sales_by_category = category_sales.reduceByKey(lambda x,y:x+y)

In [25]:
total_sales_by_category.collect()

[('Furniture', 12832),
 ('Clothing', 14410),
 ('Electronics', 15593),
 ('Books', 10739)]
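A rough plain-Python picture of what `reduceByKey` computes: values are first combined per key inside each partition, and the per-partition results are then merged. The helper names (`combine_partition`, `reduce_by_key`) and the two-partition layout are assumptions made for this sketch, not Spark APIs.

```python
def combine_partition(pairs, func):
    """Combine the values of equal keys inside a single partition."""
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc


def reduce_by_key(partitions, func):
    """Merge the per-partition results into one value per key."""
    merged = {}
    for part in partitions:
        for key, value in combine_partition(part, func).items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged


# First six (category, amount) pairs of the dataset, split by hand.
partitions = [
    [("Electronics", 838), ("Furniture", 471), ("Electronics", 803)],
    [("Furniture", 174), ("Clothing", 590), ("Furniture", 937)],
]

totals = reduce_by_key(partitions, lambda x, y: x + y)
print(totals)  # {'Electronics': 1641, 'Furniture': 1582, 'Clothing': 590}
```

Because each partition is reduced locally before anything is merged, only one value per key and partition has to be moved, which is why `reduceByKey` is usually preferred over `groupByKey` followed by a sum.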

In [31]:
# average sales per category

average_sales_by_category = category_sales.mapValues(lambda x:(x,1)) \
                                .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1])) \
                                .mapValues(lambda x: x[0]/x[1])

average_sales_by_category.collect()

[('Furniture', 583.2727272727273),
 ('Clothing', 514.6428571428571),
 ('Electronics', 556.8928571428571),
 ('Books', 488.1363636363636)]
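The average cannot be computed by reducing the amounts directly, because an average of averages is generally wrong; carrying a `(sum, count)` pair per key avoids that. The three steps of the cell above can be traced in plain Python, here on the first five `(category, amount)` pairs only (a sketch without Spark, so the numbers differ from the full-dataset output).

```python
pairs = [
    ("Electronics", 838),
    ("Furniture", 471),
    ("Electronics", 803),
    ("Furniture", 174),
    ("Clothing", 590),
]

# Step 1, like mapValues(lambda x: (x, 1)): attach a count of 1 to every amount.
with_counts = [(key, (amount, 1)) for key, amount in pairs]

# Step 2, like reduceByKey(...): add sums and counts separately per key.
merged = {}
for key, (amount, n) in with_counts:
    if key in merged:
        total, count = merged[key]
        merged[key] = (total + amount, count + n)
    else:
        merged[key] = (amount, n)

# Step 3, like mapValues(lambda x: x[0] / x[1]): divide sum by count.
averages = {key: total / count for key, (total, count) in merged.items()}

print(merged)    # {'Electronics': (1641, 2), 'Furniture': (645, 2), 'Clothing': (590, 1)}
print(averages)  # {'Electronics': 820.5, 'Furniture': 322.5, 'Clothing': 590.0}
```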

In [32]:
# highest selling category

highest_selling_category = total_sales_by_category.max(key=lambda x: x[1])

highest_selling_category

('Electronics', 15593)

In [36]:
# categories with sales above 12000

high_selling_cat_12k = total_sales_by_category.filter(lambda x: x[1]>12000)

high_selling_cat_12k.collect()

[('Furniture', 12832), ('Clothing', 14410), ('Electronics', 15593)]

### Shared Variables

When you pass a function to Spark (for example in `map` or `reduce`), that function runs on the executors in the cluster, not in your local driver program.

**By default:**

- Spark ships a separate copy of every variable the function uses to each task.
- If a task changes such a variable on an executor, the change is not propagated back to the driver program.

This keeps tasks independent and fast, but it also means you cannot update ordinary variables across tasks.

**Challenge:**

Suppose you want to count how many rows have an amount greater than 500 using this code:

```py
count = 0

def increment_count(x):
    count += 1 

records.filter(lambda x: int(x[3]) > 500).foreach(increment_count)
print(count)

```

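This does not work. As written, it fails immediately with `UnboundLocalError`: `count += 1` makes `count` a local variable inside `increment_count`, because there is no `global count` declaration. Declaring `global count` removes the error but still does not give the right answer, because each executor updates its own copy of `count`, not the original one in the driver, so the driver's value is left unchanged.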

To solve such issues, Spark provides two types of shared variables:

1. **Broadcast variables:** read-only variables cached on each machine (executor). They share large data (such as lookup tables) with all tasks efficiently, because the data is sent to each executor once instead of with every task.
2. **Accumulators:** variables used to safely implement counters or sums across multiple worker nodes. Tasks can only add to them; they cannot read the value. The final value is accessible only on the driver, and it is populated once an action has run.
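The copy semantics and the accumulator idea can be imitated in plain Python. The sketch below is an analogy only: `pickle` stands in for the way Spark serializes state to workers, and `run_task_on_worker`, `counter`, and the two hand-made partitions are hypothetical names and data, not Spark APIs.

```python
import pickle

counter = {"count": 0}  # lives in the "driver"


def run_task_on_worker(serialized_state, amounts):
    """Simulate a task: it works on its own deserialized copy of the state."""
    state = pickle.loads(serialized_state)
    for amount in amounts:
        if amount > 500:
            state["count"] += 1
    # Returning the partial result is what lets the driver see it at all.
    return state["count"]


payload = pickle.dumps(counter)
partitions = [[838, 471, 803], [174, 590, 937]]

partial_counts = [run_task_on_worker(payload, part) for part in partitions]

print(counter["count"])     # 0  (the driver's variable never changed)
print(sum(partial_counts))  # 4  (partial results merged on the driver)
```

An accumulator plays the role of the last line: each task contributes its additions, and the merged total becomes visible on the driver.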




In [37]:
count = 0

def increment_count(x):
    count += 1 

records.filter(lambda x: int(x[3]) > 500).foreach(increment_count)
print(count)

25/07/29 19:56:19 ERROR Executor: Exception in task 1.0 in stage 42.0 (TID 62)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 2044, in main
    process()
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 2034, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/core/rdd.py", line 5306, in pipeline_func
    return func(split, prev_func(split, iterator))
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/core/rdd.py", line 5306, in pipeline_func
    return func(split, prev_func(split, iterator))
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/python/3.12.1/lib/python3.12/

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 42.0 failed 1 times, most recent failure: Lost task 1.0 in stage 42.0 (TID 62) (ac24ef9e-b3a5-4839-adc4-181fe6faed47.internal.cloudapp.net executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 2044, in main
    process()
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 2034, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/core/rdd.py", line 5306, in pipeline_func
    return func(split, prev_func(split, iterator))
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/core/rdd.py", line 5306, in pipeline_func
    return func(split, prev_func(split, iterator))
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/core/rdd.py", line 5306, in pipeline_func
    return func(split, prev_func(split, iterator))
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^
  [Previous line repeated 1 more time]
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/core/rdd.py", line 705, in func
    return f(iterator)
           ^^^^^^^^^^^
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/core/rdd.py", line 1630, in processPartition
    f(x)
  File "/usr/local/python/3.12.1/lib/python3.12/site-packages/pyspark/util.py", line 131, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_38434/1037894068.py", line 4, in increment_count
UnboundLocalError: cannot access local variable 'count' where it is not associated with a value

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:581)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:940)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:925)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:532)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.mutable.Growable.addAll(Growable.scala:61)
	at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
	at scala.collection.mutable.ArrayBuilder.addAll(ArrayBuilder.scala:75)
	at scala.collection.IterableOnceOps.toArray(IterableOnce.scala:1505)
	at scala.collection.IterableOnceOps.toArray$(IterableOnce.scala:1498)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1057)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2524)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$3(DAGScheduler.scala:2935)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2935)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2927)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2927)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1295)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1295)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3207)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2484)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2505)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2524)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2549)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1057)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:417)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1056)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:203)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at jdk.internal.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:1583)


In [38]:
# broadcast variable

regions = {"North": "N", "South": "S", "East": "E", "West": "W"}

region_broadcast = sc.broadcast(regions)

In [46]:
# accumulator

high_value_count = sc.accumulator(0)

In [47]:
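def enrich_region_and_count(record):
    pid, region, category, amount = record
    # count high-value rows on the executors via the accumulator
    if amount > 800:
        high_value_count.add(1)
    # look up the region code in the broadcast dictionary
    return (pid, region_broadcast.value[region], category, amount)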

enriched_records = records.map(enrich_region_and_count)

enriched_records.collect()


[(2001, 'S', 'Electronics', 838),
 (2002, 'W', 'Furniture', 471),
 (2003, 'N', 'Electronics', 803),
 (2004, 'W', 'Furniture', 174),
 (2005, 'S', 'Clothing', 590),
 (2006, 'N', 'Furniture', 937),
 (2007, 'N', 'Electronics', 391),
 (2008, 'W', 'Electronics', 961),
 (2009, 'N', 'Electronics', 305),
 (2010, 'E', 'Clothing', 213),
 (2011, 'E', 'Electronics', 615),
 (2012, 'S', 'Clothing', 573),
 (2013, 'E', 'Clothing', 352),
 (2014, 'W', 'Clothing', 768),
 (2015, 'N', 'Electronics', 231),
 (2016, 'E', 'Furniture', 217),
 (2017, 'W', 'Clothing', 346),
 (2018, 'N', 'Books', 375),
 (2019, 'S', 'Electronics', 313),
 (2020, 'W', 'Furniture', 903),
 (2021, 'W', 'Clothing', 904),
 (2022, 'N', 'Furniture', 812),
 (2023, 'W', 'Furniture', 590),
 (2024, 'N', 'Books', 452),
 (2025, 'N', 'Books', 697),
 (2026, 'E', 'Clothing', 959),
 (2027, 'N', 'Books', 661),
 (2028, 'S', 'Books', 700),
 (2029, 'E', 'Clothing', 832),
 (2030, 'N', 'Furniture', 163),
 (2031, 'S', 'Books', 413),
 (2032, 'N', 'Furniture',

In [50]:
enriched_records_2 = records.map(enrich_region_and_count)

enriched_records_2.collect()


[(2001, 'S', 'Electronics', 838),
 (2002, 'W', 'Furniture', 471),
 (2003, 'N', 'Electronics', 803),
 (2004, 'W', 'Furniture', 174),
 (2005, 'S', 'Clothing', 590),
 (2006, 'N', 'Furniture', 937),
 (2007, 'N', 'Electronics', 391),
 (2008, 'W', 'Electronics', 961),
 (2009, 'N', 'Electronics', 305),
 (2010, 'E', 'Clothing', 213),
 (2011, 'E', 'Electronics', 615),
 (2012, 'S', 'Clothing', 573),
 (2013, 'E', 'Clothing', 352),
 (2014, 'W', 'Clothing', 768),
 (2015, 'N', 'Electronics', 231),
 (2016, 'E', 'Furniture', 217),
 (2017, 'W', 'Clothing', 346),
 (2018, 'N', 'Books', 375),
 (2019, 'S', 'Electronics', 313),
 (2020, 'W', 'Furniture', 903),
 (2021, 'W', 'Clothing', 904),
 (2022, 'N', 'Furniture', 812),
 (2023, 'W', 'Furniture', 590),
 (2024, 'N', 'Books', 452),
 (2025, 'N', 'Books', 697),
 (2026, 'E', 'Clothing', 959),
 (2027, 'N', 'Books', 661),
 (2028, 'S', 'Books', 700),
 (2029, 'E', 'Clothing', 832),
 (2030, 'N', 'Furniture', 163),
 (2031, 'S', 'Books', 413),
 (2032, 'N', 'Furniture',

In [44]:
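# read the total on the driver; each collect() of a map that calls
# enrich_region_and_count re-runs the function and adds to the accumulator again
high_value_count.value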

24

In [None]:
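# write the high-value rows to a new directory as two part files
# (repartition(2) performs a full shuffle to rebalance the data)

filtered = enriched_records.filter(lambda x: x[3] > 800)

filtered.map(lambda x: ",".join(map(str, x))).repartition(2).saveAsTextFile("filtered_high_value_sales_3")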

In [None]:
# write the same rows as a single part file
# (coalesce(1) merges the partitions without a full shuffle)

filtered = enriched_records.filter(lambda x: x[3] > 800)

filtered.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("filtered_high_value_sales_2")

**Activity:**

1. Find all product IDs where the amount is greater than 900.
2. Find all transactions that belong to the “Furniture” category.
3. Count how many transactions belong to the “Electronics” category.
4. Find average amount for each category.
5. Find the highest amount and the corresponding product ID.
6. Find the total number of unique categories.
7. For each category, find the product ID with the highest sale.
8. Count how many products were sold for less than 300.
9. Sort the transactions by amount in descending order.
10. Broadcast Variable: Category Discounts

```py
{"Electronics": 0.10, "Furniture": 0.15, "Clothing": 0.05, "Books": 0.20}
```
11. Accumulator: Count Transactions Below 300
12. Filter and Save Results
