# Intro To Spark Assignment 5
submitted by Himani Anil Deshpande

## Describe what Accumulators and Broadcast Variables are in Spark, and when to use these shared variables

### Shared Variables:

When a function passed to a Spark operation (such as map or reduce) runs on a remote cluster node, it does not work on the original variables but on separate copies of all the variables used in the function.


These copies are shipped to each remote node. While the operation runs, updates are applied to the copies rather than to the originals, and those updates are not propagated back to the driver program.
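The copy semantics described above can be imitated without Spark. The following is a minimal plain-Python sketch, not Spark's actual mechanism: pickle stands in for the serialization that happens when a task is shipped, and the names counter and run_task_on_copy are hypothetical.

```python
import pickle

# Driver-side variable that the simulated task will try to update.
counter = {"total": 0}

def run_task_on_copy(serialized_state, values):
    """Simulate a remote task: deserialize its own copy and update only that copy."""
    local_state = pickle.loads(serialized_state)
    for v in values:
        local_state["total"] += v
    return local_state["total"]

# "Ship" a serialized copy of the variable to the simulated task.
task_total = run_task_on_copy(pickle.dumps(counter), [1, 2, 3, 4, 5])

print("total inside the simulated task:", task_total)
print("total on the driver:", counter["total"])
```

The simulated task computes 15 while the driver's copy stays at 0, which is the gap that accumulators are meant to close.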



Supporting general read-write shared variables across tasks would be inefficient. However, Spark provides two restricted types of shared variables for two common usage patterns: broadcast variables and accumulators.


### Broadcast Variables:

Broadcast variables keep a read-only copy of a value cached on each node rather than shipping a copy with every task. Spark distributes them using efficient broadcast algorithms to keep communication cost low. Because they are read-only, no updates need to be propagated.
They are useful when every node needs a copy of a large dataset, such as a lookup table.
The easiest way to satisfy the read-only requirement is to broadcast a primitive value or an immutable object. The underlying value should not be modified after it has been broadcast, so that every node sees the same data; any change has to be made in the driver code.

Spark actions are executed through a series of stages separated by distributed "shuffle" operations. Spark automatically broadcasts the common data needed by the tasks within each stage. Data broadcast this way is cached in serialized form and deserialized before each task runs. Consequently, explicitly creating a broadcast variable is only useful when tasks across multiple stages need the same data, or when caching the data in deserialized form is important.


Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and in PySpark its value is read through the value attribute.

### Accumulators:



Accumulators provide a simple syntax for aggregating values from worker nodes back to the driver program.

Accumulators are variables that are only "added" to through an associative and commutative operation, so they can be supported efficiently in parallel.
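Why the operation must be associative and commutative can be seen in a plain-Python sketch (no Spark involved; the partition layout and variable names are assumptions made for illustration). Partial results from different partitions may be merged in any order, so the final value must not depend on that order.

```python
import random
from functools import reduce
from operator import add, sub

data = list(range(1, 101))
# Split the data into 10 hypothetical partitions of 10 elements each.
partitions = [data[i:i + 10] for i in range(0, len(data), 10)]

# Each simulated task computes a partial sum for its partition.
partial_sums = [sum(p) for p in partitions]

# Merge the partial sums in several random arrival orders.
totals = set()
for _ in range(5):
    arrival_order = random.sample(partial_sums, k=len(partial_sums))
    totals.add(reduce(add, arrival_order, 0))
print("totals seen with addition:", totals)

# Subtraction is neither associative nor commutative, so the order matters.
forward = reduce(sub, [1, 2, 3])
backward = reduce(sub, [3, 2, 1])
print("subtraction in two orders:", forward, backward)
```

Addition yields the same total for every arrival order, while subtraction gives different answers for different orders, which is why it would not be a safe accumulator operation.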

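They can be used to implement counters (as in MapReduce) or sums. Spark natively supports numeric accumulators, and support for new types can be added. Tasks can only add to an accumulator; only the driver program can read its value.
Accumulators can be named or unnamed. A named accumulator appears in the web UI for the stage that modifies it (the Spark programming guide notes that this display is not yet supported in Python).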
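In PySpark, support for a new accumulator type is added by subclassing AccumulatorParam and implementing its zero and addInPlace methods. The sketch below mirrors that two-method shape in plain Python so that it runs without a cluster. VectorSumParam, run_task, merge_on_driver, and the sample rows are hypothetical, and the two helper functions only imitate what Spark does with task results.

```python
class VectorSumParam:
    """Plain-Python stand-in with the same two methods as pyspark's AccumulatorParam."""

    def zero(self, value):
        # Identity element: a vector of zeros with the same length as `value`.
        return [0] * len(value)

    def addInPlace(self, v1, v2):
        # Element-wise addition, which is associative and commutative.
        return [a + b for a, b in zip(v1, v2)]


def run_task(param, initial, rows):
    """Imitate one task: start from the zero value and add each row locally."""
    local = param.zero(initial)
    for row in rows:
        local = param.addInPlace(local, row)
    return local


def merge_on_driver(param, initial, task_results):
    """Imitate the driver merging the per-task partial results."""
    total = initial
    for partial in task_results:
        total = param.addInPlace(total, partial)
    return total


# Hypothetical rows of the form [rows_seen, rows_rejected], grouped by partition.
partitions = [[[1, 0], [1, 1]], [[1, 0]], [[1, 1], [1, 0], [1, 1]]]

param = VectorSumParam()
initial = [0, 0]
task_results = [run_task(param, initial, rows) for rows in partitions]
result = merge_on_driver(param, initial, task_results)
print("per-task results:", task_results)
print("merged vector:", result)
```

With real PySpark the same two methods would live in a subclass of AccumulatorParam, and an instance would be passed as the second argument to sc.accumulator.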

In [1]:
pip install py4j

Collecting py4j
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Installing collected packages: py4j
Successfully installed py4j-0.10.9.5


In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=c0ee23ee2784eecc9f5a4f13b8cc897530d960aac762e699f4c019f9eb34513d
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.5
    Uninstalling py4j-0.10.9.5:
      Successfully uninstalled py4j-0.10.9.5
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Reuse an existing SparkContext if one is already running in this session.
sc = SparkContext.getOrCreate()

## Examples of Accumulator

In [8]:

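num = sc.accumulator(0)  # integer accumulator with initial value 0

def func_add(x):
    # Runs on the executors; each call adds one RDD element to the accumulator.
    global num
    num += x

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(func_add)  # foreach is an action, so the updates are applied here
final = num.value      # the accumulated value is read on the driver
print("Accumulated value is ->", final)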

Accumulated value is -> 15


In the above example, the elements of the RDD are summed into the accumulator, and the driver reads the total (15) through its value attribute.

In [20]:
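from functools import partial

def count_even_keep_zero(item, accumulator):
    # Side effect: count every even number that passes through the filter.
    if item % 2 == 0:
        accumulator += 1
    # Filter condition: keep only numbers whose digits include a 0.
    return '0' in str(item)


accumulator = sc.accumulator(0)
# Bind the accumulator so the filter function takes a single argument.
count_filter = partial(count_even_keep_zero, accumulator=accumulator)

# sum() is the action that triggers the filter and the accumulator updates.
print(sc.range(0, 100).filter(count_filter).sum())

print('accum', accumulator)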

450
accum 50


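In the above example, the accumulator is incremented once for every even number in the range 0 to 99, which gives 50. The filter itself keeps only the numbers whose decimal representation contains the digit 0 (0, 10, 20, ..., 90), so the printed sum is 450.
We use functools.partial to create the counting filter, which remembers our accumulator variable, and then print the sum and the final value of the accumulator.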
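The filter above updates the accumulator inside a transformation. Transformations are lazy, and the Spark programming guide notes that Spark only guarantees exactly-once accumulator updates inside actions; updates made inside transformations may be applied more than once if a task or stage is re-executed. The following plain-Python sketch (no Spark; the built-in lazy filter stands in for an RDD transformation, and all names are hypothetical) shows why both points matter.

```python
calls = {"even": 0}  # stands in for the accumulator

def count_even_and_keep_zero(item):
    # Side effect: count even numbers; filter condition: keep numbers containing a 0.
    if item % 2 == 0:
        calls["even"] += 1
    return "0" in str(item)

# The built-in filter is lazy, like an RDD transformation: nothing has run yet.
lazy = filter(count_even_and_keep_zero, range(0, 100))
count_before = calls["even"]

# Consuming the iterator plays the role of an action.
first_sum = sum(lazy)
count_after_one_run = calls["even"]

# Re-running the same pipeline, as a recomputed stage would, counts everything again.
second_sum = sum(filter(count_even_and_keep_zero, range(0, 100)))
count_after_two_runs = calls["even"]

print(count_before, count_after_one_run, count_after_two_runs)
print(first_sum, second_sum)
```

The counter is untouched until the pipeline is consumed and doubles when the pipeline runs a second time, so counts gathered inside transformations are best treated as approximate.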

## Broadcast Variable Example

In [24]:


from pyspark.sql import SparkSession

# Reuses the SparkContext created earlier (sc).
spark = SparkSession.builder.getOrCreate()

# Lookup table from country code to full country name, broadcast to every node.
country = {"IN": "India", "KR": "South Korea", "DK": "Denmark", "AR": "Argentina"}
broadcast_country = sc.broadcast(country)

data = [("Shaun", "Verghese", "IN"),
        ("Chandra", "Shah", "IN"),
        ("Paula", "Williams", "DK"),
        ("Maria", "D'costa", "AR"),
        ("Kim", "Unn Park", "KR")]

columns = ["firstname", "lastname", "country"]
df = spark.createDataFrame(data=data, schema=columns)
df.printSchema()
df.show(truncate=False)

def country_name(code):
    # Read the broadcast dictionary on the executor through .value.
    return broadcast_country.value[code]

# Replace each country code with its full name.
result = df.rdd.map(lambda x: (x[0], x[1], country_name(x[2]))).toDF(columns)
result.show(truncate=False)



root
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- country: string (nullable = true)

+---------+--------+-------+
|firstname|lastname|country|
+---------+--------+-------+
|Shaun    |Verghese|IN     |
|Chandra  |Shah    |IN     |
|Paula    |Williams|DK     |
|Maria    |D'costa |AR     |
|Kim      |Unn Park|KR     |
+---------+--------+-------+

+---------+--------+-----------+
|firstname|lastname|country    |
+---------+--------+-----------+
|Shaun    |Verghese|India      |
|Chandra  |Shah    |India      |
|Paula    |Williams|Denmark    |
|Maria    |D'costa |Argentina  |
|Kim      |Unn Park|South Korea|
+---------+--------+-----------+



In the above example, I create a DataFrame containing person data and a broadcast variable containing a lookup table for a few country codes.
I then use the broadcast variable to convert these alphabetical country codes to full country names.

References:

https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#shared-variables


Canvas Modules