# Distributed Computing
A distributed system is a system whose components are located on different networked computers, which communicate and coordinate their actions by passing messages to one another. Distributed computing is the field of computer science that studies distributed systems ([Wikipedia](https://en.wikipedia.org/wiki/Distributed_computing)).

In [1]:
%%timeit
def square(x):
    return x * x

# list() forces evaluation so the output is readable;
# in Python 3, map and filter return lazy iterators
list(map(square, range(10**7)))[:10]

1.25 s ± 63.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [3]:
%%timeit
from functools import reduce

def square(x):
    return x * x

def add(x, y):
    return x + y

reduce(add, map(square, range(10**7)))

1.66 s ± 55.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [5]:
%%timeit
from functools import reduce

reduce(lambda x, y: x + y, map(lambda x: x * x, range(10**7)))

1.65 s ± 45.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
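
The reductions above run on a single core. Because addition is associative, the same sum of squares can be split into independent chunks whose partial results are combined at the end, which is the idea that parallel frameworks build on. The following is a minimal sketch of that structure in plain Python; the helper names `chunked` and `partial_sum_of_squares` and the chunk count of 4 are assumptions made for illustration, and the chunks are processed one after another here rather than on separate workers.

```python
from functools import reduce


def square(x):
    return x * x


def add(x, y):
    return x + y


def chunked(n, num_chunks):
    """Split range(n) into contiguous sub-ranges (hypothetical helper)."""
    step = -(-n // num_chunks)  # ceiling division
    return [range(start, min(start + step, n)) for start in range(0, n, step)]


def partial_sum_of_squares(chunk):
    # The work one worker would do on its own slice of the data.
    return reduce(add, map(square, chunk), 0)


n = 10**5
# Each partial result depends only on its own chunk, so the chunks
# could be handed to different workers.
partials = [partial_sum_of_squares(c) for c in chunked(n, 4)]
# Combining the partial results gives the same total as a single reduce.
total = reduce(add, partials, 0)
print(total)
```

Only the small list of partial sums has to be brought together in the final step, not the full range of inputs.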


## Parallel Computing with Spark

In [None]:
from pyspark import SparkContext

sc = SparkContext("local[*]", "temp")

In [7]:
lines = sc.textFile('./data/docs/')
lines.flatMap(lambda line: line.split(" ")) \
     .map(lambda word: (word.lower(), 1)) \
     .reduceByKey(lambda x, y: x + y) \
     .sortByKey() \
     .saveAsTextFile("counts")

                                                                                

In [8]:
! cat counts/part-00000 | grep "^('[a-z]" | head

('a', 13342)
('a!_', 1)
('a)', 9)
('a).', 1)
('a,', 10)
('a--e', 2)
('a--p.', 1)
('a--well,', 1)
('a-t-il', 1)
('a.', 93)
grep: write error: Broken pipe
cat: write error: Broken pipe


In [9]:
type(lines)

pyspark.rdd.RDD

In [12]:
print(lines.flatMap(lambda line: line.split(" ")),'\n',lines.flatMap(lambda line: line.split(" ")).count())

PythonRDD[17] at RDD at PythonRDD.scala:53 
 663351


### Spark DataFrame

In [20]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = lines.flatMap(lambda line: line.split(" ")) \
          .map(lambda word: (word.lower(), 1)) \
          .reduceByKey(lambda x, y: x + y) \
          .toDF(['word', 'count'])

                                                                                

In [22]:
df.printSchema()

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)



In [23]:
df.describe('count').show()

+-------+-----------------+
|summary|            count|
+-------+-----------------+
|  count|            74405|
|   mean|8.915408910691486|
| stddev|266.3647125527789|
|    min|                1|
|    max|            46691|
+-------+-----------------+



### Spark SQL

In [25]:
df.createOrReplaceTempView('counts')
sqlContext.sql('select * from counts where count > 5000').show()

+----+-----+
|word|count|
+----+-----+
| the|46691|
|  of|24986|
|    |34179|
|  it| 5819|
|  is| 7530|
| and|18239|
|that| 6452|
|  to|12389|
|   a|13342|
|  in|12717|
+----+-----+



### Machine Learning

In [27]:
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.ml.linalg import VectorUDT, Vectors

In [28]:
word_length = udf(lambda x: Vectors.dense(len(x)), VectorUDT())
feat_df = df.withColumn("features", word_length("word")) \
            .withColumnRenamed("count", "label")

In [29]:
linreg = LinearRegression()
model = linreg.fit(feat_df)
model.coefficients

23/02/23 16:55:09 WARN Instrumentation: [54e32a0a] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

DenseVector([-3.5131])

## Spark setup
Calculate the number of partitions

In [4]:
lines = sc.textFile('./data/docs/', 10)

In [9]:
s1=lines.getNumPartitions()

In [10]:
lines2 = lines.repartition(1)
s2 = lines2.getNumPartitions()

In [11]:

assert s2 == lines2.getNumPartitions()

# The values are plain integers, so the reducer adds them directly
lines3 = (lines2.map(lambda x: (x, 1))
                .reduceByKey(lambda x, y: x + y,
                             numPartitions=10)
         )
s3 = lines3.getNumPartitions()

print(s1, s2, s3)

11 1 10


### Spark Variables
Spark provides shared variables for parallel computing. There are two types of shared variables:
- Broadcast
- Accumulator

The variables are sent to each node of the cluster for parallel processing.

- Broadcast variables: 
These variables are read-only, available in all executors, and cached ahead of time. They are sent to each executor only once and are then available to every task on that executor. The advantage is that the data is distributed to the nodes once, which reduces the amount of data transferred and the time spent shuffling. The drawback is that every executor holds a full copy, so broadcast variables should be used for datasets in the MB range, not for large datasets in the GB range.

- Accumulator:
Accumulators are variables that tasks running on the workers can only add to; only the driver can read their value. The important functions and properties of an `Accumulator` are:
    - **sparkContext.accumulator()**, which defines an accumulator variable
    - **add()**, which adds to and updates the accumulator value
    - **value**, the property that holds the accumulated value
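
The size guidance for broadcast variables can be checked roughly before broadcasting. The sketch below uses the pickled size of an object as an approximate proxy for what would be shipped to each executor. The helper names and the 10 MB default budget are assumptions chosen for illustration; they are not Spark settings.

```python
import pickle


def pickled_size_mb(obj):
    """Return the size of obj in MB once serialized with pickle."""
    return len(pickle.dumps(obj)) / (1024 * 1024)


def small_enough_to_broadcast(obj, max_mb=10.0):
    # max_mb is a hypothetical budget for illustration, not a Spark parameter.
    return pickled_size_mb(obj) <= max_mb


companies = {"FB": "Facebook", "TSLA": "TESLA", "GOOGL": "Google", "MSFT": "Microsoft"}
print(small_enough_to_broadcast(companies))
```

A small lookup table like `companies` fits easily within such a budget; a table that does not is usually better handled as a regular distributed dataset and joined.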
    
#### Broadcast Variable Example:

In [21]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('employees').getOrCreate()

companies = {"FB":"Facebook", "TSLA":"TESLA", "GOOGL":"Google", "MSFT":"Microsoft"}
broadcastStates = spark.sparkContext.broadcast(companies)

data = [("John","Jones","USA","FB"),
    ("Jack","Jackson","USA","TSLA"),
    ("Jeniffer","Fred","USA","GOOGL"),
    ("Jasmin","Jake","USA","MSFT")
  ]

rdd = spark.sparkContext.parallelize(data)

def employee_convert(code):
    return broadcastStates.value[code]

result = rdd.map(lambda x: (x[0],x[1],x[2], employee_convert(x[3]))).collect()
print(result)


[('John', 'Jones', 'USA', 'Facebook'), ('Jack', 'Jackson', 'USA', 'TESLA'), ('Jeniffer', 'Fred', 'USA', 'Google'), ('Jasmin', 'Jake', 'USA', 'Microsoft')]


In [23]:
spark = SparkSession.builder.appName('employee').getOrCreate()

companies = {"FB":"Facebook", "TSLA":"TESLA", "GOOGL":"Google", "MSFT":"Microsoft"}
broadcastStates = spark.sparkContext.broadcast(companies)

data = [("John","Jones","USA","FB"),
    ("Jack","Jackson","USA","TSLA"),
    ("Jeniffer","Fred","USA","GOOGL"),
    ("Jasmin","Jake","USA","MSFT")
  ]

columns = ["First_Name","Last_Name","Country","Company_Name"]
df = spark.createDataFrame(data = data, schema = columns)
df.printSchema()
df.show(truncate=False)

def employee_convert(code):
    return broadcastStates.value[code]

result = df.rdd.map(lambda x: (x[0],x[1],x[2], employee_convert(x[3]))).toDF(columns)
result.show(truncate=False)

root
 |-- First_Name: string (nullable = true)
 |-- Last_Name: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Company_Name: string (nullable = true)

+----------+---------+-------+------------+
|First_Name|Last_Name|Country|Company_Name|
+----------+---------+-------+------------+
|John      |Jones    |USA    |FB          |
|Jack      |Jackson  |USA    |TSLA        |
|Jeniffer  |Fred     |USA    |GOOGL       |
|Jasmin    |Jake     |USA    |MSFT        |
+----------+---------+-------+------------+

+----------+---------+-------+------------+
|First_Name|Last_Name|Country|Company_Name|
+----------+---------+-------+------------+
|John      |Jones    |USA    |Facebook    |
|Jack      |Jackson  |USA    |TESLA       |
|Jeniffer  |Fred     |USA    |Google      |
|Jasmin    |Jake     |USA    |Microsoft   |
+----------+---------+-------+------------+



#### Accumulator Variable Example:

In [38]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("salary_accumulator").getOrCreate()

accum=spark.sparkContext.accumulator(0)
rdd=spark.sparkContext.parallelize([20, 30, 50, 100])

print(accum.value)  # Accessed by the driver

variables_sum = spark.sparkContext.accumulator(0)

def summation(x):
    # Runs on the workers; tasks can only add to the accumulator
    global variables_sum
    variables_sum += x

rdd.foreach(summation)
print('\nsum of all variables defined in rdd:\n', variables_sum.value)


variable_count=spark.sparkContext.accumulator(0)
rdd2=spark.sparkContext.parallelize([1,2,3,4,5])
rdd2.foreach(lambda x:variable_count.add(1))
print("\nThe number of variables defined in rdd2:\n", variable_count.value)

0

sum of all variables defined in rdd:
 200

The number of variables defined in rdd2:
 5
