#So far, we have worked with data that fits in the memory (RAM) of a personal computer. When the data is too large for that, we can either move it into a SQL database, which keeps it on the hard drive instead of in RAM, or use a distributed system that spreads the data across multiple machines. A local machine is a single computer with its own RAM and hard drive, while a distributed system has one machine coordinating many others. A distributed system therefore has more cores, more processing power and more storage than a local machine.

#A local process uses the computational resources of a single machine. A distributed process has access to the computational resources of multiple machines connected through a network. It is usually easier to scale out to many machines with modest CPUs than to scale up to a single machine with a very powerful CPU, because capacity grows simply by adding more machines. If one machine fails, the rest of the network can keep going. This is called fault tolerance. Hadoop is one framework for building this kind of distributed architecture.

# Hadoop:

#Hadoop is a way to distribute very large files across many machines. It uses the Hadoop Distributed File System (HDFS), which splits a large dataset into blocks, duplicates each block and spreads the copies across machines for fault tolerance. Keeping multiple copies of a block prevents data loss when a node fails. Hadoop also provides MapReduce, which allows computations on that data.
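
#The block replication idea can be sketched in plain Python. This is only a toy model, not HDFS itself: the node names, the block size of 8 bytes and the round-robin placement are assumptions made for illustration (real HDFS uses much larger blocks and its own placement policy). A replication factor of 3 is used because that is the usual HDFS default.

```python
def split_into_blocks(data, block_size):
    """Cut the data into fixed-size blocks (the last one may be shorter)."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def place_blocks(num_blocks, nodes, replication):
    """Assign each block to `replication` distinct nodes, round-robin (toy policy)."""
    placement = {}
    for block_id in range(num_blocks):
        placement[block_id] = [
            nodes[(block_id + r) % len(nodes)] for r in range(replication)
        ]
    return placement


def readable_blocks(placement, failed_node):
    """Return the ids of blocks that still have at least one live replica."""
    return [
        block_id
        for block_id, holders in placement.items()
        if any(node != failed_node for node in holders)
    ]


data = b"a large file stored across several machines"
blocks = split_into_blocks(data, block_size=8)
nodes = ["node1", "node2", "node3", "node4"]  # hypothetical machine names
placement = place_blocks(len(blocks), nodes, replication=3)

# For every possible single-node failure, count the blocks that survive.
survivors = {failed: len(readable_blocks(placement, failed)) for failed in nodes}
print(len(blocks), survivors)
```

#Whichever single node is removed, every block still has a copy on another node, which is the fault tolerance described above.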

# MapReduce

#MapReduce splits a computation task across a distributed set of files (such as HDFS). It consists of a Job Tracker and multiple Task Trackers. The Job Tracker sends code to run on the Task Trackers, and each Task Tracker uses the CPU and RAM of the machine that holds its part of the data.
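
#A minimal single-machine sketch of the map, shuffle and reduce phases, using a word count. It assumes each string in chunks stands in for the data held by one machine. The function names are made up for this illustration and are not part of Hadoop.

```python
from collections import defaultdict


def map_phase(chunk):
    """Map: emit a (word, 1) pair for every word in one chunk of text."""
    return [(word, 1) for word in chunk.split()]


def shuffle(pairs):
    """Shuffle: group all emitted values by their key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_phase(grouped):
    """Reduce: combine the values of each key into one result."""
    return {key: sum(values) for key, values in grouped.items()}


# Each chunk plays the role of the data stored on one machine.
chunks = ["first line", "second line", "third line", "fourth line"]
mapped = [pair for chunk in chunks for pair in map_phase(chunk)]
word_counts = reduce_phase(shuffle(mapped))
print(word_counts)
```

#In a real cluster the map calls would run in parallel on the machines that hold each chunk, and only the grouped pairs would travel over the network.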

# Spark

#Spark improves on the idea of distributing big data. It can use the Hadoop Distributed File System but has several advantages over MapReduce. Spark can read data stored in AWS S3, HDFS, Cassandra and other sources. While MapReduce requires files to be stored in HDFS, Spark does not, and it can run up to 100 times faster than MapReduce for some workloads because it keeps intermediate data in memory instead of writing it to disk after each step.

#A Spark Resilient Distributed Dataset (RDD) has 4 main features: it is a distributed collection of data, it is fault-tolerant, it supports parallel operation on partitions, and it can be created from many data sources. RDDs are immutable, lazily evaluated and cacheable. There are 2 types of RDD operations to code in Python: Transformations and Actions. Basic Actions: first, collect, count, take. Basic Transformations: filter, map, flatMap.

#Pair RDD: often an RDD holds its values as tuples (key, value), which allows better partitioning of the data and enables functionality based on reductions. reduce() is an action that aggregates the elements of an RDD using a function and returns a single element. reduceByKey() is a transformation that aggregates the values of each key in a Pair RDD using a function and returns a new Pair RDD. This is similar to a group-by followed by an aggregation.
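
#The difference between the two reductions can be sketched without Spark. Here functools.reduce stands in for reduce(), and reduce_by_key is a hypothetical helper written for this note that mimics what reduceByKey() does on a single machine. The sample amounts are made up.

```python
from functools import reduce


def reduce_by_key(pairs, func):
    """Combine the values of each key with func; return (key, value) pairs."""
    result = {}
    for key, value in pairs:
        # First value for a key is stored as is; later values are combined.
        result[key] = func(result[key], value) if key in result else value
    return list(result.items())


# reduce: the whole collection collapses to a single element.
amounts = [100.0, 450.0, 200.0]
total = reduce(lambda a, b: a + b, amounts)

# reduce by key: one combined value per key, still a collection of pairs.
pairs = [("NY", 100.0), ("TX", 450.0), ("NY", 750.0)]
per_state = reduce_by_key(pairs, lambda a, b: a + b)

print(total)
print(per_state)
```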

#To use Spark on a virtual machine, set up an EC2 instance on AWS, let the instance run and connect to it from your local computer over the network with a secure shell (SSH). If you connect with PuTTY, use PuTTYgen to convert the downloaded .pem key to a .ppk key so that PuTTY can use it to reach the instance.

#Install Anaconda on the instance by first downloading the installer: wget https://repo.continuum.io/archive/Anaconda3-4.1.1-Linux-x86_64.sh (or whichever Linux version of Anaconda you want). After the download finishes, run the installer with: bash Anaconda3-4.1.1-Linux-x86_64.sh

In [1]:
import findspark
findspark.init()

In [2]:
# to test if pyspark was successfully installed

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.sql('''select 'spark' as hello ''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [3]:
#Testing the PySpark installation: findspark.find() returns the Spark home directory

import findspark
findspark.init()
import pyspark
findspark.find()

'C:\\Users\\emman\\Documents\\Spark Hadoop\\spark-3.4.1-bin-hadoop3'

In [4]:
#stop the running Spark session

spark.stop()   

# Lambda Expression Review

#A lambda expression creates an anonymous function, i.e. an ad hoc function defined without using def.

In [5]:
def cube(num):
    return num**3

In [6]:
cube(5)

125

In [7]:
def div(x,y): return x//y

In [8]:
div(5,2)

2

In [9]:
divmod(5,2)

(2, 1)

In [10]:
#the same function as the first def above, written as a lambda expression

lambda num : num**3

<function __main__.<lambda>(num)>

In [11]:
number = list(range(0,10))

In [12]:
number

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [13]:
list(map(lambda x : x**2, number))

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [14]:
#check if a number is even

even = lambda num : num%2 == 0

In [15]:
even(3), even(4)

(False, True)

In [16]:
#check the square of a number

sq = lambda num : num**2

In [17]:
sq(10)

100

In [18]:
#get the first character of a string

first = lambda x : x[0]

In [19]:
first('catholic')

'c'

In [20]:
#reverse a string

rev = lambda s : s[::-1]

In [21]:
rev('catholic')

'cilohtac'

In [22]:
def add(a,b):
    return a+b

In [23]:
add(10,3)

13

In [24]:
add = lambda a,b : a+b

In [25]:
add(2,4)

6
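
#Lambda expressions are most useful as short arguments to other functions, which is how they are used with Spark later on. A small sketch with the built-in filter and sorted follows. The word list is made up for illustration.

```python
words = ["third", "a", "fourth", "line"]

# filter keeps the items for which the lambda returns True.
long_words = list(filter(lambda w: len(w) > 4, words))

# sorted uses the lambda as a key function to decide the ordering.
by_length = sorted(words, key=lambda w: len(w))

print(long_words)
print(by_length)
```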

# Introduction to Spark and Python

#A SparkContext represents the connection to a Spark cluster and is used to create RDDs and broadcast variables on that cluster. Keep in mind that only one SparkContext can be active at a time.

In [26]:
from pyspark import SparkContext

In [27]:
sc = SparkContext()

#To create and write a text file

In [29]:
%%writefile example.txt                     
first line
second line
third line
fourth line

Writing example.txt


#Create an RDD from the text file with the textFile method of sc (the SparkContext), then apply actions and transformations to that RDD. Actions return values, while transformations return a new RDD.

In [30]:
textFile = sc.textFile('example.txt')

In [31]:
#count the number of lines in the textfile example.txt

textFile.count()

4

In [32]:
#get the first line in the textfile example.txt

textFile.first()

'first line'

In [33]:
#get the first 3 lines of the textfile example.txt

textFile.take(3)

['first line', 'second line', 'third line']

In [34]:
textFile.collect()

['first line', 'second line', 'third line', 'fourth line']

In [35]:
#filter transformation on the textFile RDD: create a new RDD, secfind, holding the lines that contain 'second'

secfind = textFile.filter(lambda line : 'second' in line)

In [36]:
secfind

PythonRDD[5] at RDD at PythonRDD.scala:53

In [37]:
#performing an action on the transformed RDD secfind from RDD textFile

secfind.collect(), secfind.count(), secfind.take(1)

(['second line'], 1, ['second line'])

In [38]:
firstfind = textFile.filter(lambda line : 'first' in line)

In [39]:
firstfind

PythonRDD[8] at RDD at PythonRDD.scala:53

In [40]:
#performing an action on the transformed RDD firstfind from RDD textFile

firstfind.collect(), firstfind.count()

(['first line'], 1)

# RDD Transformations and Actions

#RDD: Resilient Distributed Dataset.

#Transformation: a Spark operation that produces a new RDD. It is only a recipe for a computation: because RDDs are lazily evaluated, nothing runs until an action is called on the RDD.

#Action : Spark operation that produces a local object.

#Spark Job: sequence of transformations on data with a final action. 
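
#Lazy evaluation can be imitated in plain Python, where map also returns a lazy object. This is only an analogy and not how Spark works internally: an RDD can be evaluated again and runs across a cluster, while a Python iterator is consumed once on one machine. The calls list is a helper added here to record when work really happens.

```python
calls = []


def double(x):
    calls.append(x)  # record that the function actually ran
    return x * 2


numbers = [1, 2, 3, 4]

# "Transformation": builds a recipe; nothing is computed yet.
doubled = map(double, numbers)
calls_before_action = list(calls)

# "Action": consuming the iterator triggers the computation.
result = list(doubled)

print(calls_before_action)
print(result)
```

#The list of recorded calls is still empty after the map line and is only filled once the result is requested, just as a Spark job only runs when its final action is called.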

#Creating an RDD: (1) sc.parallelize(array): creates an RDD from the elements of an array or list.

#(2) sc.textFile(path/to/file): creates an RDD of the lines of a file.

#To display an image in the notebook: from IPython.display import Image, then Image(filename='path of image')

In [41]:
%%writefile example2.txt
first
second line
the third line
then a fourth line

Writing example2.txt


In [42]:
#create an RDD from the text file above, then perform transformations and actions on it

sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[11] at textFile at NativeMethodAccessorImpl.java:0

In [43]:
text_rdd = sc.textFile('example2.txt')

In [44]:
text_rdd.collect()

['first', 'second line', 'the third line', 'then a fourth line']

In [45]:
#mapping the rdd with lambda function

words = text_rdd.map(lambda line: line.split())

In [46]:
words

PythonRDD[14] at RDD at PythonRDD.scala:53

In [47]:
#calling an action on words

words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [81]:
#use a flatMap transformation on the RDD, then an action, to get a single flat list

text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']
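
#The difference between map and flatMap can be reproduced on an ordinary list. This sketch uses a list comprehension for map and itertools.chain.from_iterable for flatMap. It is a plain-Python analogy, not the Spark API.

```python
from itertools import chain

lines = ["first", "second line", "the third line"]

# map: one output element per input element, so the result is a list of lists.
mapped = [line.split() for line in lines]

# flatMap: the per-line lists are concatenated into one flat list.
flat = list(chain.from_iterable(line.split() for line in lines))

print(mapped)
print(flat)
```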

# RDD and Key-Value Pairs

In [49]:
%%writefile services.txt
#EventId   Timestamp   Customer  State  ServiceID  Amount
201        10/13/2017  100       NY     131        100.00
204        10/18/2017  700       TX     129        450.00
202        10/15/2017  203       CA     121        200.00
206        10/19/2017  202       CA     131        500.00
203        10/17/2017  101       NY     173        750.00
205        10/19/2017  202       TX     121        200.00

Writing services.txt


In [50]:
services = sc.textFile('services.txt')

In [51]:
#grab the first two lines of services.txt; each line is a single string

services.take(2)

['#EventId   Timestamp   Customer  State  ServiceID  Amount',
 '201        10/13/2017  100       NY     131        100.00']

In [52]:
#split each line into a list of items and take the first three lines

services.map(lambda line: line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [55]:
get_rid = services.map(lambda line: line.split()).take(1)

In [57]:
get_rid

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount']]

In [62]:
get_rid[0][0].split('#')[1]

'EventId'

In [75]:
#remove the leading # from any line that starts with it; return other lines unchanged

services.map(lambda line: line[1:] if line.startswith('#') else line).collect()

['EventId   Timestamp   Customer  State  ServiceID  Amount',
 '201        10/13/2017  100       NY     131        100.00',
 '204        10/18/2017  700       TX     129        450.00',
 '202        10/15/2017  203       CA     121        200.00',
 '206        10/19/2017  202       CA     131        500.00',
 '203        10/17/2017  101       NY     173        750.00',
 '205        10/19/2017  202       TX     121        200.00']

In [65]:
#same as above, but keep the cleaned RDD in a variable instead of collecting it

clean_data = services.map(lambda line: line[1:] if line.startswith('#') else line)

In [66]:
clean = clean_data.map(lambda line: line.split())

In [67]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [69]:
#to grab the state and amount columns as a tuple

clean.map(lambda lst: (lst[3],lst[5])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [70]:
# same as above but in two steps, using lst[-1] for the last column

pair = clean.map(lambda lst: (lst[3],lst[-1]))

In [71]:
pair.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [72]:
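# reduce each key (State) to a single amount. reduceByKey assumes the first item in the tuple is the key, in this case State.
# reduceByKey applies the lambda expression to the second items of tuples that share a key, in this case the amounts.
# a key with only one value, like the header row ('State', 'Amount'), is never passed to the lambda and keeps its string value.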

rekey = pair.reduceByKey(lambda amt1,amt2: float(amt1) + float(amt2))

In [73]:
rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]
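
#The header row above stays as the string 'Amount' because the reducing function is only called when a key has at least two values. The same behaviour can be seen with functools.reduce in plain Python. The sample strings below are illustrative, and add_as_floats is just a name given here to the lambda used above.

```python
from functools import reduce

add_as_floats = lambda amt1, amt2: float(amt1) + float(amt2)

# Two values for a key: the function runs and returns a float.
two_sales = reduce(add_as_floats, ["100.00", "750.00"])

# One value for a key: the function never runs, so the string comes back unchanged.
one_sale = reduce(add_as_floats, ["450.00"])

# Converting before reducing gives a float in both cases.
one_sale_converted = reduce(lambda a, b: a + b, map(float, ["450.00"]))

print(two_sales, repr(one_sale), one_sale_converted)
```

#In this dataset every state has two rows, so the problem is hidden. A state with a single sale would keep its amount as a string. One way to avoid that in PySpark would be to drop the header row first and convert the values with mapValues(float) before calling reduceByKey.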

In [76]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

In [77]:
# Grab (State, Amount) in the form of a tuple.
step1 = clean.map(lambda lst: (lst[3],lst[-1]))
# Reduce by Key
step2 = step1.reduceByKey(lambda amt1,amt2 : float(amt1) + float(amt2))
# Get rid of State, Amount titles
step3 = step2.filter(lambda x : x[0] != 'State')
# Sort results by Amount
step4 = step3.sortBy(lambda stAmount : stAmount[1], ascending=False)
# Action
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

In [78]:
# Replacing the indexing method above with tuple unpacking
x = ['ID', 'State', 'Amount']

In [91]:
# instead of a lambda expression, you can also use def to get the amount (the last item)
def func1(lst):
    return lst[-1]

In [95]:
def func2(id_st_amt):
    #unpack values
    (Id,st,amt) = id_st_amt
    return amt

In [96]:
func1(x)

'Amount'

In [97]:
func2(x)

'Amount'
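
#A def is needed for unpacking because a Python 3 lambda cannot unpack a tuple in its parameter list. The sketch below applies the same idea to (state, amount) pairs and uses the function as a sort key on an ordinary list. The name amount_of is made up for this example. A function like it could be passed to sortBy in place of the indexing lambda.

```python
def amount_of(state_amount):
    # Unpack the pair into named parts instead of indexing with [1].
    state, amount = state_amount
    return amount


totals = [("NY", 850.0), ("TX", 650.0), ("CA", 700.0)]

# Sort by amount, largest first, using the unpacking function as the key.
ranked = sorted(totals, key=amount_of, reverse=True)
print(ranked)
```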

In [111]:
# pop(0) can also be used to get rid of the ('State', 'Amount') titles:
# call rekey.collect(), then pop(0) on the resulting list to remove the first tuple.
# check back on sc.parallelize()
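
#A sketch of the pop(0) idea on an ordinary list. The list literal is copied from the rekey.collect() output shown earlier, so no Spark is needed to run it. The order of the collected pairs is probably not something to rely on after a reduceByKey, so filtering on the key is shown as a safer alternative.

```python
collected = [('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

# Option 1: pop(0) removes whatever happens to be first in the list.
by_position = list(collected)  # copy, so the original list is left untouched
by_position.pop(0)

# Option 2: filter on the key, which works wherever the header row ends up.
by_key = [pair for pair in collected if pair[0] != 'State']

print(by_position)
print(by_key)
```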