# We will learn: 

- `JOIN()`
- `Broadcast` Variable
- `repartition`
- `coalesce`
- `cache`

# SparkSession

In [1]:
# `spark` is never created in this notebook; presumably the kernel pre-defines
# the SparkSession (e.g. a PySpark/EMR kernel) — TODO confirm.
spark

## Create `SparkContext`

In [2]:
# Take the SparkContext from the session; the RDD calls used in this notebook
# (textFile, parallelize, broadcast) are all made through `sc`.
sc = spark.sparkContext
sc

# `join()`

- Return an RDD containing all pairs of elements with matching keys in
`self` and `other`.

- Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
(k, v1) is in `self` and (k, v2) is in `other`.

In [1]:
# Load the raw orders CSV from S3 as an RDD of text lines.
data_set = 's3://fcc-spark-example/dataset/2023/orders.csv'
rdd1 = sc.textFile(data_set)

# Key every order by its customer id: columns 2 and 3 become
# (order_customer_id, order_status). Each line is split only once.
# NOTE: the CSV header row is not removed, so it flows through as a record.
orders_rdd = (
    rdd1
    .map(lambda line: line.split(','))
    .map(lambda cols: (cols[2], cols[3]))
)


In [2]:
# Peek at the first five (customer_id, status) pairs; the header row shows up as the first record.
orders_rdd.take(5)

                                                                                

[('order_customer_id', 'order_status'),
 ('11599', 'CLOSED'),
 ('256', 'PENDING_PAYMENT'),
 ('12111', 'COMPLETE'),
 ('8827', 'CLOSED')]

In [3]:
# Loading the customers data
# NOTE: `data_set` and `rdd1` are rebound here; from this cell on they refer
# to the customers file, not the orders file loaded earlier.
data_set = 's3://fcc-spark-example/dataset/2023/customers.csv'
rdd1 = sc.textFile(data_set)

In [4]:
# Inspect the first five raw lines (header included) to see the column layout before parsing.
rdd1.take(5)

                                                                                

['customer_id,customer_fname,customer_lname,customer_email,customer_password,customer_street,customer_city,customer_state,customer_zipcode',
 '1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521',
 '2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126',
 '3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,00725',
 '4,Mary,Jones,XXXXXXXXX,XXXXXXXXX,8324 Little Common,San Marcos,CA,92069']

In [5]:
# Build (customer_id, customer_zipcode) pairs from the first and last CSV
# columns, splitting each line only once.
customer_rdd = (
    rdd1
    .map(lambda x: x.split(','))
    .map(lambda fields: (fields[0], fields[-1]))
)

In [6]:
# Peek at the parsed (customer_id, zipcode) pairs; the header row is still the first record.
customer_rdd.take(5)

                                                                                

[('customer_id', 'customer_zipcode'),
 ('1', '78521'),
 ('2', '80126'),
 ('3', '00725'),
 ('4', '92069')]

In [7]:
# Inner join on the customer id key (a string on both sides).
# Each result has the shape (customer_id, (zipcode, order_status)).
joined_rdd = customer_rdd.join(orders_rdd)

In [8]:
# Sample five joined records: (customer_id, (zipcode, order_status)).
joined_rdd.take(5)

                                                                                

[('6241', ('60004', 'COMPLETE')),
 ('6241', ('60004', 'CLOSED')),
 ('6241', ('60004', 'ON_HOLD')),
 ('6243', ('60618', 'ON_HOLD')),
 ('6243', ('60618', 'PENDING'))]

In [22]:
# Write the joined pairs to S3 as text output.
# NOTE(review): re-running this cell likely fails if the output path already
# exists — confirm, and clear the target path before a re-run.
joined_rdd.saveAsTextFile('s3://fcc-spark-example/output/joined_rdd')

                                                                                

![Alt Text](../img/JOIN.png)


### Processed data 

![Alt Text](../img/output_joined_rdd.png)

## Types of `JOIN`

In [44]:
# Two small key/value RDDs for comparing join types.
# Keys 'a' and 'b' appear in both; 'c' and 'd' only in rdd1; 'e' and 'f' only in rdd2.

# Create the first RDD
rdd1 = sc.parallelize([('a', 10), ('b', 20), ('c', 30), ('d', 40)])

# Create the second RDD
rdd2 = sc.parallelize([('a', 50), ('b', 60), ('e', 70), ('f', 80)])


In [45]:

# Inner join: keeps only the keys present in BOTH RDDs.
inner_join = rdd1.join(rdd2)
print("Inner join:")
print("*****************")
# Collect into a named list instead of `_`: assigning to `_` overrides
# IPython's "last output" variable for the rest of the session.
inner_rows = inner_join.collect()

for row in inner_rows:
    print(row)
print("*****************")

Inner join:
*****************
('a', (10, 50))
('b', (20, 60))
*****************


In [46]:
# Left outer join: every key of rdd1 is kept; keys missing from rdd2 are
# paired with None on the right-hand side.
left_outer_join = rdd1.leftOuterJoin(rdd2)
print("Left outer join:")
print("*****************")
# Collect into a named list instead of `_`: assigning to `_` overrides
# IPython's "last output" variable for the rest of the session.
left_outer_rows = left_outer_join.collect()

for row in left_outer_rows:
    print(row)
print("*****************")


Left outer join:
*****************
('a', (10, 50))
('b', (20, 60))
('c', (30, None))
('d', (40, None))
*****************


In [47]:

# Right outer join: every key of rdd2 is kept; keys missing from rdd1 are
# paired with None on the left-hand side.
right_outer_join = rdd1.rightOuterJoin(rdd2)
print("Right outer join:")
print("*****************")
# Collect into a named list instead of `_`: assigning to `_` overrides
# IPython's "last output" variable for the rest of the session.
right_outer_rows = right_outer_join.collect()

for row in right_outer_rows:
    print(row)
print("*****************")

Right outer join:
*****************
('a', (10, 50))
('e', (None, 70))
('b', (20, 60))
('f', (None, 80))
*****************


In [48]:
# Full outer join: keys from either RDD are kept; the side that lacks the
# key is filled with None.
full_outer_join = rdd1.fullOuterJoin(rdd2)
print("Full outer join:")
print("*****************")
# Collect into a named list instead of `_`: assigning to `_` overrides
# IPython's "last output" variable for the rest of the session.
full_outer_rows = full_outer_join.collect()

for row in full_outer_rows:
    print(row)
print("*****************")

Full outer join:
*****************
('a', (10, 50))
('e', (None, 70))
('b', (20, 60))
('c', (30, None))
('d', (40, None))
('f', (None, 80))
*****************


# Broadcast Variable

Broadcast a read-only variable to the cluster, returning a `Broadcast`
object for reading it in distributed functions. The variable is
sent to each node of the cluster only once, rather than being shipped with every task.

In [9]:
# Load the orders subset from S3 (the glob picks up every file under the prefix).
data_set = 's3://fcc-spark-example/dataset/2023/orders_cust_id_999/*'
rdd1 = sc.textFile(data_set)

# (customer_id as int, last column) pairs; each line is split only once.
orders_rdd = (
    rdd1
    .map(lambda line: line.split(','))
    .map(lambda cols: (int(cols[2]), cols[-1]))
)

In [10]:
# Load the customers subset from S3 (the glob picks up every file under the prefix).
data_set = 's3://fcc-spark-example/dataset/2023/customers_cust_id_999/*'
rdd1 = sc.textFile(data_set)

# (customer_id as int, last column) pairs; each line is split only once.
customer_rdd = (
    rdd1
    .map(lambda x: x.split(','))
    .map(lambda fields: (int(fields[0]), fields[-1]))
)

In [16]:
# Sample of (customer_id, order_status) pairs; the ids are ints in this section.
orders_rdd.take(10)

                                                                                

[(256, 'PENDING_PAYMENT'),
 (918, 'PAYMENT_REVIEW'),
 (333, 'COMPLETE'),
 (656, 'COMPLETE'),
 (196, 'PROCESSING'),
 (662, 'PENDING_PAYMENT'),
 (674, 'PROCESSING'),
 (824, 'ON_HOLD'),
 (395, 'PROCESSING'),
 (104, 'PROCESSING')]

In [15]:
# Sample of (customer_id, zipcode) pairs; zipcodes stay strings, so leading zeros are kept.
customer_rdd.take(10)

[(1, '78521'),
 (2, '80126'),
 (3, '00725'),
 (4, '92069'),
 (5, '00725'),
 (6, '07055'),
 (7, '00725'),
 (8, '01841'),
 (9, '00725'),
 (10, '22554')]

In [28]:
# An RDD cannot be broadcast directly, so the (customer_id, zipcode) pairs are
# first collected to the driver as a plain dict, and that dict is broadcast.
# NOTE: this pulls the whole customer table onto the driver, so it only suits
# lookup tables that are small.
broadcast_var = sc.broadcast(customer_rdd.collectAsMap())

In [29]:
# `.value` exposes the broadcast dict; look up the zipcode for customer id 4.
broadcast_var.value.get(4)

'92069'

In [33]:
# Variant 1: look the zipcode up through a named helper function.
def get_zip_code(customer_id):
    """Return the zipcode stored for `customer_id` in the broadcast dict.

    Returns None when the id is not a key of the dict (plain `dict.get`).
    """
    zip_by_customer = broadcast_var.value
    return zip_by_customer.get(customer_id)

# Map each (customer_id, status) order to (customer_id, zipcode, status).
joined_rdd = orders_rdd.map(
    lambda order: (order[0], get_zip_code(order[0]), order[1])
)

In [34]:
# Each record is now a flat (customer_id, zipcode, order_status) tuple produced by a plain map.
joined_rdd.take(5)

[(256, '60625', 'PENDING_PAYMENT'),
 (918, '00725', 'PAYMENT_REVIEW'),
 (333, '00725', 'COMPLETE'),
 (656, '91767', 'COMPLETE'),
 (196, '48126', 'PROCESSING')]

In [206]:
# Variant 2: the same lookup, inlined in the lambda instead of a named helper.
joined_rdd = orders_rdd.map(
    lambda order: (order[0], broadcast_var.value.get(order[0]), order[1])
)


In [207]:
# Sample ten (customer_id, zipcode, order_status) records from the inlined variant.
joined_rdd.take(10)

                                                                                

[(999, '55124', 'PENDING'),
 (999, '55124', 'COMPLETE'),
 (999, '55124', 'CLOSED'),
 (999, '55124', 'CLOSED'),
 (999, '55124', 'PENDING_PAYMENT'),
 (999, '55124', 'PENDING_PAYMENT'),
 (998, '92805', 'PENDING_PAYMENT'),
 (998, '92805', 'PENDING_PAYMENT'),
 (998, '92805', 'PENDING_PAYMENT'),
 (998, '92805', 'PENDING_PAYMENT')]

# Repartition

- We can increase or decrease the number of partitions in an RDD.
- Increase it to get more parallelism.
- Decrease it when a filter has left the data sparsely spread across many machines.
    - For example, assume we have 1000 machines
    - and we are processing 1000 GB of data with a block size of 128MB.
    - So there will be 8000 partitions, 8 per machine.
    - Now, let's say we apply a filter that reduces each 128MB partition to about 1MB of data.
    - In that case we will still have 8000 partitions, but each partition will hold only 1MB of data.
    - In such cases it is better to repartition to a smaller number (maybe 80 partitions), so that each partition is filled with data.
- `repartition` does a complete reshuffle of the data.
- Use `coalesce` when you want to decrease the number of partitions.
- Use `repartition` when you want to increase the number of partitions.

In [224]:
# Glob pattern: every object under the tsv/ prefix whose name starts with amazon_reviews_us_Books.
data_set = 's3://amazon-reviews-pds/tsv/amazon_reviews_us_Books*'

In [226]:
# Read the matching file(s) as an RDD of text lines (rebinds `rdd1`).
rdd1 = sc.textFile(data_set)

In [228]:
# Partition count of the freshly loaded RDD — the baseline for the experiments that follow.
rdd1.getNumPartitions()

3

### Increase the partition

In [230]:
# repartition(10) returns a new RDD with 10 partitions (up from 3).
new_rdd = rdd1.repartition(10)
new_rdd.getNumPartitions()

10

### Decrease the partition

In [232]:
# repartition can also lower the partition count (3 -> 2 here).
# NOTE(review): the Repartition notes in this notebook recommend `coalesce`
# for decreasing, since repartition reshuffles all the data.
new_rdd_2 = rdd1.repartition(2)
new_rdd_2.getNumPartitions()

2

# Coalesce

- By default it can ONLY decrease the number of partitions in the RDD.
- It CANNOT increase the number of partitions (unless `shuffle=True` is passed, which makes it behave like `repartition`).
- It won't do a complete reshuffle.
- It tries to merge partitions within each machine.
- Its intent is to avoid shuffling.
- The size of the partitions might differ across machines.

In [233]:
# Same Books review files as used in the Repartition section.
data_set = 's3://amazon-reviews-pds/tsv/amazon_reviews_us_Books*'

In [234]:
# Reload the lines so the coalesce demo starts from a fresh RDD.
rdd1 = sc.textFile(data_set)

In [235]:
# Baseline partition count before calling coalesce.
rdd1.getNumPartitions()

3

In [237]:
new_rdd = rdd1.coalesce(10) # Asking for more partitions than exist raises no error, but the count stays unchanged

In [238]:
# Still the original count: coalesce(10) did not add partitions.
new_rdd.getNumPartitions()

3

In [242]:
new_rdd = rdd1.coalesce(1) # Reducing the partition count is what coalesce is for

In [240]:
# Confirms the RDD was merged down to a single partition.
new_rdd.getNumPartitions()

1

# Cache 

When an RDD is cached, its partitions are stored in the memory of the worker nodes. This allows Spark to reuse the RDD without recalculating it each time it is needed, reducing the overall processing time. The cached RDD can be reused across multiple jobs or stages, which helps to speed up the overall execution time of the Spark application.

In [246]:
# Orders file used for the cache demo (the sampled lines show no header row).
data_set = 's3://fcc-spark-example/dataset/2023/orders.txt'

In [247]:
# Read the orders file as an RDD of text lines (rebinds `rdd1`).
rdd1 = sc.textFile(data_set)

In [248]:
# Raw lines look like: order_id, timestamp, customer_id, status.
rdd1.take(5)

                                                                                

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

#### Task 1

In [249]:
# Task 1: keep only the orders whose status (the last CSV column) is not COMPLETE.
non_complete_orders = rdd1.filter(
    lambda line: line.rsplit(',', 1)[-1] != 'COMPLETE'
)

#### Task 2 

In [None]:
# Task 2: count the non-COMPLETE orders per customer id (column 2).
# NOTE: despite its name, `no_of_customer` holds (customer_id, order_count) pairs.
customer_order_pairs = non_complete_orders.map(lambda line: (line.split(',')[2], 1))
no_of_customer = customer_order_pairs.reduceByKey(lambda total, one: total + one)

#### Task 3 

In [265]:
# Task 3: keep the orders whose customer id is greater than 100, ordered by
# customer id (sortBy is ascending by default).
# NOTE: this rebinds `customer_rdd`, which held (customer_id, zipcode) pairs earlier.
customer_rdd = (
    non_complete_orders
    .filter(lambda line: int(line.split(',')[2]) > 100)
    .sortBy(lambda line: int(line.split(',')[2]))
)

                                                                                

In [266]:
# First five orders after filtering to customer id > 100 and sorting by customer id.
customer_rdd.take(5)

['33831,2014-02-18 00:00:00.0,101,PROCESSING',
 '36362,2014-03-05 00:00:00.0,101,CANCELED',
 '39068,2014-03-23 00:00:00.0,101,PENDING',
 '13397,2013-10-15 00:00:00.0,102,PENDING_PAYMENT',
 '24662,2013-12-25 00:00:00.0,102,PENDING']

In [250]:
# Sample of the orders whose status is not COMPLETE.
non_complete_orders.take(5)

                                                                                

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '8,2013-07-25 00:00:00.0,2911,PROCESSING',
 '9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT']

In [256]:
# Sample of (customer_id, order_count) pairs.
no_of_customer.take(5)

[('256', 9), ('2911', 5), ('5657', 9), ('9149', 3), ('9842', 7)]

In [271]:
# customer_rdd.collect()

In [272]:
# non_complete_orders.collect()

In [273]:
# no_of_customer.collect()

In [274]:
# Tasks 1-3 again in a single cell, this time caching the final RDD.

# Task 1: drop the orders whose status (last CSV column) is COMPLETE.
non_complete_orders = rdd1.filter(
    lambda line: line.rsplit(',', 1)[-1] != 'COMPLETE'
)

# Task 2: count the non-COMPLETE orders per customer id (column 2).
customer_order_pairs = non_complete_orders.map(lambda line: (line.split(',')[2], 1))
no_of_customer = customer_order_pairs.reduceByKey(lambda total, one: total + one)

# Task 3: keep customer ids above 100, ordered by customer id
# (sortBy is ascending by default).
customer_rdd = (
    non_complete_orders
    .filter(lambda line: int(line.split(',')[2]) > 100)
    .sortBy(lambda line: int(line.split(',')[2]))
)

# Mark the RDD for caching so later actions can reuse its partitions.
customer_rdd.cache()

                                                                                

PythonRDD[608] at RDD at PythonRDD.scala:53

In [275]:
# collect() returns every record to the driver; acceptable for this demo,
# but prefer take(n) on large data.
# This is the first action after cache(), so this run is expected to populate the cache.
customer_rdd.collect()

['33831,2014-02-18 00:00:00.0,101,PROCESSING',
 '36362,2014-03-05 00:00:00.0,101,CANCELED',
 '39068,2014-03-23 00:00:00.0,101,PENDING',
 '13397,2013-10-15 00:00:00.0,102,PENDING_PAYMENT',
 '24662,2013-12-25 00:00:00.0,102,PENDING',
 '52414,2014-06-19 00:00:00.0,102,PROCESSING',
 '57964,2013-08-02 00:00:00.0,102,PENDING_PAYMENT',
 '790,2013-07-29 00:00:00.0,103,PENDING_PAYMENT',
 '29473,2014-01-23 00:00:00.0,103,PENDING_PAYMENT',
 '32346,2014-02-10 00:00:00.0,103,ON_HOLD',
 '54091,2014-07-01 00:00:00.0,103,PENDING_PAYMENT',
 '115,2013-07-26 00:00:00.0,104,PROCESSING',
 '1968,2013-08-04 00:00:00.0,105,CLOSED',
 '31522,2014-02-05 00:00:00.0,105,CANCELED',
 '36634,2014-03-06 00:00:00.0,105,PENDING_PAYMENT',
 '1574,2013-08-02 00:00:00.0,106,PENDING_PAYMENT',
 '44556,2014-04-27 00:00:00.0,106,PENDING_PAYMENT',
 '8242,2013-09-14 00:00:00.0,107,PROCESSING',
 '30358,2014-01-30 00:00:00.0,107,PENDING_PAYMENT',
 '40289,2014-03-31 00:00:00.0,107,PROCESSING',
 '52141,2014-06-16 00:00:00.0,107,PENDI

![Alt Text](../img/cache.png)

In [276]:
# Same action again; expected to be served from the cached partitions rather
# than recomputed — compare the job timings in the Spark UI.
customer_rdd.collect()

['33831,2014-02-18 00:00:00.0,101,PROCESSING',
 '36362,2014-03-05 00:00:00.0,101,CANCELED',
 '39068,2014-03-23 00:00:00.0,101,PENDING',
 '13397,2013-10-15 00:00:00.0,102,PENDING_PAYMENT',
 '24662,2013-12-25 00:00:00.0,102,PENDING',
 '52414,2014-06-19 00:00:00.0,102,PROCESSING',
 '57964,2013-08-02 00:00:00.0,102,PENDING_PAYMENT',
 '790,2013-07-29 00:00:00.0,103,PENDING_PAYMENT',
 '29473,2014-01-23 00:00:00.0,103,PENDING_PAYMENT',
 '32346,2014-02-10 00:00:00.0,103,ON_HOLD',
 '54091,2014-07-01 00:00:00.0,103,PENDING_PAYMENT',
 '115,2013-07-26 00:00:00.0,104,PROCESSING',
 '1968,2013-08-04 00:00:00.0,105,CLOSED',
 '31522,2014-02-05 00:00:00.0,105,CANCELED',
 '36634,2014-03-06 00:00:00.0,105,PENDING_PAYMENT',
 '1574,2013-08-02 00:00:00.0,106,PENDING_PAYMENT',
 '44556,2014-04-27 00:00:00.0,106,PENDING_PAYMENT',
 '8242,2013-09-14 00:00:00.0,107,PROCESSING',
 '30358,2014-01-30 00:00:00.0,107,PENDING_PAYMENT',
 '40289,2014-03-31 00:00:00.0,107,PROCESSING',
 '52141,2014-06-16 00:00:00.0,107,PENDI

In [278]:
# Third run of the same action, to compare against the cached run in the Spark UI.
customer_rdd.collect()

['33831,2014-02-18 00:00:00.0,101,PROCESSING',
 '36362,2014-03-05 00:00:00.0,101,CANCELED',
 '39068,2014-03-23 00:00:00.0,101,PENDING',
 '13397,2013-10-15 00:00:00.0,102,PENDING_PAYMENT',
 '24662,2013-12-25 00:00:00.0,102,PENDING',
 '52414,2014-06-19 00:00:00.0,102,PROCESSING',
 '57964,2013-08-02 00:00:00.0,102,PENDING_PAYMENT',
 '790,2013-07-29 00:00:00.0,103,PENDING_PAYMENT',
 '29473,2014-01-23 00:00:00.0,103,PENDING_PAYMENT',
 '32346,2014-02-10 00:00:00.0,103,ON_HOLD',
 '54091,2014-07-01 00:00:00.0,103,PENDING_PAYMENT',
 '115,2013-07-26 00:00:00.0,104,PROCESSING',
 '1968,2013-08-04 00:00:00.0,105,CLOSED',
 '31522,2014-02-05 00:00:00.0,105,CANCELED',
 '36634,2014-03-06 00:00:00.0,105,PENDING_PAYMENT',
 '1574,2013-08-02 00:00:00.0,106,PENDING_PAYMENT',
 '44556,2014-04-27 00:00:00.0,106,PENDING_PAYMENT',
 '8242,2013-09-14 00:00:00.0,107,PROCESSING',
 '30358,2014-01-30 00:00:00.0,107,PENDING_PAYMENT',
 '40289,2014-03-31 00:00:00.0,107,PROCESSING',
 '52141,2014-06-16 00:00:00.0,107,PENDI

![Alt Text](../img/cache2.png)

# Summary 

### What we learned: 

In this section we learned:

- `JOIN()`
- `Broadcast` Variable
- `repartition`
- `coalesce`
- `cache`