# Spark
## Table of contents
[Step-0](#step-0-)
[Step-1](#step-1-)

### Step-0 :
Now create the SparkContext. A SparkContext represents the connection to a Spark cluster and can be used to create RDDs and broadcast variables on that cluster.

In [3]:
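import findspark
findspark.init()  # locate the Spark installation before importing pyspark

import pyspark
from pyspark import SparkContext

print(pyspark.__version__)

# Initialize Spark in local mode with the application name "SparkLab"
sc = SparkContext("local", "SparkLab")
sc.setCheckpointDir('checkpoint')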

4.0.0


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/24 16:15:40 WARN Utils: Your hostname, dell-Precision-3260, resolves to a loopback address: 127.0.1.1; using 10.194.175.3 instead (on interface wlp0s20f3)
25/07/24 16:15:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/24 16:15:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Step-1 : 
Let's create Resilient Distributed Datasets (RDDs). PySpark has multiple methods for converting different data types into an RDD. We first create our text file data.

Spark’s primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs. 

1. **Creating an RDD from data files**
We will use the textFile method of the SparkContext. It reads a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and returns it as an RDD of strings.

2. **Actions on RDD**
RDDs have actions, which return values. We can easily perform operations on an RDD object, such as counting its rows.

3. **Transformations on RDD**
Transformations return pointers to new RDDs. A transformation therefore produces no output until an action is called. For example, the filter transformation returns a new RDD containing a subset of the items in the file.
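
The lazy behaviour of transformations can be imitated in plain Python, without a Spark cluster. The sketch below is only an analogy: Python's built-in `filter` returns a lazy iterator (playing the role of a transformation), and `list()` plays the role of an action such as `collect()`. The names `lines`, `is_match`, and `calls` are made up for illustration.

```python
# Plain-Python analogy for lazy transformations (no Spark required).
lines = ["spark is fast", "python programming", "lazy evaluation"]
calls = []  # records every line the predicate actually inspects


def is_match(line):
    calls.append(line)
    return "programming" in line


# "Transformation": builds a lazy iterator, nothing is evaluated yet.
pending = filter(is_match, lines)
calls_before_action = len(calls)

# "Action": forces evaluation, similar in spirit to rdd.collect().
result = list(pending)
calls_after_action = len(calls)

print(calls_before_action)  # 0
print(result)               # ['python programming']
print(calls_after_action)   # 3
```

The predicate is not called at all until the result is requested, which mirrors why printing a transformed RDD shows only a description of the RDD rather than its data.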



Term                   |Definition
----                   |-------
RDD                    |Resilient Distributed Dataset
Transformation         |Spark operation that produces an RDD
Action                 |Spark operation that produces a local object
Spark Job              |Sequence of transformations on data with a final action

How do we create an RDD?
There are two common ways to create an RDD:

Method                      |Result
----------                               |-------
`sc.parallelize(array)`                  |Create RDD of elements of array (or list)
`sc.textFile("path/to/file")`            |Create RDD of lines from file

We can use transformations to create a set of instructions we want to perform on the RDD.

Transformation Example                          |Result
----------                               |-------
`filter(lambda x: x % 2 == 0)`           |Discard non-even elements
`map(lambda x: x * 2)`                   |Multiply each RDD element by `2`
`map(lambda x: x.split())`               |Split each string into words
`flatMap(lambda x: x.split())`           |Split each string into words and flatten sequence
`sample(withReplacement=True, fraction=0.25)` |Create sample of roughly 25% of elements with replacement
`union(rdd)`                             |Append `rdd` to existing RDD
`distinct()`                             |Remove duplicates in RDD
`sortBy(lambda x: x, ascending=False)`   |Sort elements in descending order
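
For readers without a cluster at hand, the effect of several transformations in the table can be previewed on an ordinary Python list. This is a rough local analogy, not Spark code: the variable names are illustrative, and real RDDs are distributed and evaluated lazily.

```python
# Local, plain-Python previews of what some RDD transformations compute.
numbers = [3, 1, 4, 1, 5, 9, 2, 6]

evens = [x for x in numbers if x % 2 == 0]   # like filter(lambda x: x % 2 == 0)
doubled = [x * 2 for x in numbers]           # like map(lambda x: x * 2)
combined = numbers + [7, 8]                  # like union(rdd): duplicates are kept
unique = sorted(set(numbers))                # like distinct(); sorted only for a stable printout
descending = sorted(numbers, reverse=True)   # like sortBy(lambda x: x, ascending=False)

print(evens)       # [4, 2, 6]
print(doubled)     # [6, 2, 8, 2, 10, 18, 4, 12]
print(combined)    # [3, 1, 4, 1, 5, 9, 2, 6, 7, 8]
print(unique)      # [1, 2, 3, 4, 5, 6, 9]
print(descending)  # [9, 6, 5, 4, 3, 2, 1, 1]
```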

A transformation applied to an existing RDD returns a pointer to a new RDD. Calling an action on that new RDD then executes the transformations and returns the output.

Action                             |Result
----------                             |-------
`collect()`                            |Convert RDD to in-memory list 
`take(3)`                              |First 3 elements of RDD 
`top(3)`                               |Largest 3 elements of RDD, in descending order
`takeSample(withReplacement=True, num=3)` |Create sample of 3 elements with replacement
`sum()`                                |Find element sum (assumes numeric elements)
`mean()`                               |Find element mean (assumes numeric elements)
`stdev()`                              |Find element standard deviation (assumes numeric elements)
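
The numeric actions in the table have close counterparts in the Python standard library, which can help when checking a small result by hand. The sketch below is a local analogy with made-up sample values. It uses `statistics.pstdev` on the assumption that `stdev()` on an RDD is the population standard deviation.

```python
import heapq
import statistics

values = [4.0, 8.0, 15.0, 16.0, 23.0, 42.0]

first_three = values[:3]                # like take(3)
top_three = heapq.nlargest(3, values)   # like top(3): largest elements first
total = sum(values)                     # like sum()
mean = statistics.mean(values)          # like mean()
stdev = statistics.pstdev(values)       # like stdev(), assumed to be the population form

print(first_three)  # [4.0, 8.0, 15.0]
print(top_three)    # [42.0, 23.0, 16.0]
print(total)        # 108.0
print(mean)         # 18.0
print(stdev)
```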

We create a transformation using the filter() method. A filter returns only the elements that satisfy the given condition.

Let's find the lines that contain the word *programming*.


In [19]:
%%writefile dataset/example.txt
Apache Spark is an open-source, distributed computing system designed 
for processing large-scale data quickly and efficiently.
It provides an intuitive programming model for working with structured and 
unstructured data, enabling users to perform transformations and actions on datasets using high-level APIs.


Overwriting dataset/example.txt


In [21]:
# load the text file and create an RDD
textFile = sc.textFile('dataset/example.txt')
print(textFile)

# actions such as count() and first() return values from the RDD
print(textFile.count())
print(textFile.first())

# apply a filter transformation to the RDD
secfind = textFile.filter(lambda line: 'programming' in line)
print(secfind)

secfind2 = secfind.map(lambda line: line.split()).collect()
# a transformation only returns a pointer to a new RDD, so call an action to see the result
print(secfind.collect())
print(secfind.count())
print(secfind2)

dataset/example.txt MapPartitionsRDD[18] at textFile at NativeMethodAccessorImpl.java:0
4
Apache Spark is an open-source, distributed computing system designed 
PythonRDD[21] at RDD at PythonRDD.scala:56
['It provides an intuitive programming model for working with structured and ']
1
[['It', 'provides', 'an', 'intuitive', 'programming', 'model', 'for', 'working', 'with', 'structured', 'and']]


A lambda expression creates an anonymous function, which means we can quickly create a function without formally defining it with def.


Function objects returned by running lambda expressions work exactly the same as those created and assigned by defs. There is one key difference that makes lambda useful in specialized roles:

The lambda's body is similar to what we would put in a def body's return statement. We simply type the result as an expression instead of explicitly returning it. Because it is limited to an expression, a lambda is less general than a def. We can only squeeze so much logic into a lambda body; this is by design, to limit program nesting. lambda is designed for coding simple functions, and def handles the larger tasks.

Note: a lambda's body is a single expression, not a block of statements.


In [None]:
# 1. A regular function definition
def square(num):
    return num**2

# 2. The same function written on one line
def square(num): return num**2

# 3. The same function as a lambda expression, optionally bound to a name
lambda num: num**2
square = lambda num: num**2

# check whether a number is even
even = lambda x: x%2==0

# grab the first character of a string
first = lambda x: x[0]

# reverse a string
rev = lambda x: x[::-1]

# a lambda expression can accept more than one argument
addition = lambda x, y : x + y

# lambda functions are best used in conjunction with map(),
# filter() and reduce()
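
To make the last comment concrete, here is a small sketch of lambdas passed to Python's built-in `map()` and `filter()` and to `functools.reduce()`. The list `numbers` is illustrative. The same lambdas could be handed to the RDD methods of the same names.

```python
from functools import reduce

numbers = [1, 2, 3, 4, 5, 6]

squares = list(map(lambda num: num ** 2, numbers))   # apply a function to every element
evens = list(filter(lambda x: x % 2 == 0, numbers))  # keep elements that satisfy a condition
total = reduce(lambda x, y: x + y, numbers)          # fold the elements into one value


# A lambda and an equivalent def produce the same results.
def square(num):
    return num ** 2


same_as_def = squares == [square(n) for n in numbers]

print(squares)      # [1, 4, 9, 16, 25, 36]
print(evens)        # [2, 4, 6]
print(total)        # 21
print(same_as_def)  # True
```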

What is the difference between map and flatMap?

Now that we know RDDs, how do we aggregate values with them? That is only possible once we understand how to work with key-value pairs.

In [28]:
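# flatMap splits every line into words and flattens them into a single list
text2 = sc.textFile('dataset/example.txt')
print(text2.count())
# the collected word list is not displayed: it is neither printed nor the last expression in the cell
text2.flatMap(lambda line: line.split()).collect()
print(text2)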

4
dataset/example.txt MapPartitionsRDD[35] at textFile at NativeMethodAccessorImpl.java:0
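
The difference between map and flatMap can be illustrated with a plain-Python analogy that needs no Spark: a list comprehension stands in for `map`, and `itertools.chain.from_iterable` stands in for the flattening step of `flatMap`. The two sample `lines` are made up for illustration.

```python
from itertools import chain

lines = ["Apache Spark is fast", "It provides an intuitive programming model"]

# map analogue: one output element per input line, so the result is a list of lists.
mapped = [line.split() for line in lines]

# flatMap analogue: the per-line lists are flattened into one sequence of words.
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(mapped[0])         # ['Apache', 'Spark', 'is', 'fast']
print(len(mapped))       # 2
print(flat_mapped[:5])   # ['Apache', 'Spark', 'is', 'fast', 'It']
print(len(flat_mapped))  # 10
```

With map, the number of output elements always equals the number of input elements. With flatMap, each input can contribute zero or more output elements.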


In [29]:
%%writefile dataset/service-example.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
301         07/05/2023    105       FL       143          250.00
305         07/10/2023    210       IL       119          199.99
302         07/08/2023    307       TX       163          350.50
307         07/12/2023    111       CA       155          425.00
303         07/09/2023    208       NY       147          675.25
309         07/13/2023    120       FL       128          120.00
304         07/09/2023    219       WA       126          305.75
308         07/13/2023    305       OR       149          880.90
306         07/11/2023    410       NV       161          405.60


Writing dataset/service-example.txt


In [None]:
serv = sc.textFile('dataset/service-example.txt')
print(serv.take(2))

map1 = serv.map(lambda x: x.split()).take(3)
print(map1)

# remove the leading '#' from the header line
serv_new = serv.map(lambda x: x[1:] if x.startswith('#') else x).collect()
print(serv_new)

# strip the '#' and then split every line into fields
serv2 = serv.map(lambda x: x[1:] if x.startswith('#') else x).map(lambda x: x.split()).collect()
print(serv2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount', '301         07/05/2023    105       FL       143          250.00']
[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'], ['301', '07/05/2023', '105', 'FL', '143', '250.00'], ['305', '07/10/2023', '210', 'IL', '119', '199.99']]
['EventId    Timestamp    Customer   State    ServiceID    Amount', '301         07/05/2023    105       FL       143          250.00', '305         07/10/2023    210       IL       119          199.99', '302         07/08/2023    307       TX       163          350.50', '307         07/12/2023    111       CA       155          425.00', '303         07/09/2023    208       NY       147          675.25', '309         07/13/2023    120       FL       128          120.00', '304         07/09/2023    219       WA       126          305.75', '308         07/13/2023    305       OR       149          880.90', '306         07/11/2023    410       NV       161          405.60']


To aggregate, we use methods whose names end in ByKey (such as reduceByKey) and pass them a lambda expression. These ByKey methods assume that the data is in (key, value) form.

How do we find the total sales per state?
We use key-value pairs for the operation on the RDD object.

In [None]:
# clean the data: strip the leading '#' from the header, then split each line into fields
serv_clean = serv.map(lambda x: x[1:] if x.startswith('#') else x).map(lambda x: x.split())
serv_clean1 = serv_clean.collect()
serv_clean1

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['301', '07/05/2023', '105', 'FL', '143', '250.00'],
 ['305', '07/10/2023', '210', 'IL', '119', '199.99'],
 ['302', '07/08/2023', '307', 'TX', '163', '350.50'],
 ['307', '07/12/2023', '111', 'CA', '155', '425.00'],
 ['303', '07/09/2023', '208', 'NY', '147', '675.25'],
 ['309', '07/13/2023', '120', 'FL', '128', '120.00'],
 ['304', '07/09/2023', '219', 'WA', '126', '305.75'],
 ['308', '07/13/2023', '305', 'OR', '149', '880.90'],
 ['306', '07/11/2023', '410', 'NV', '161', '405.60']]

In [44]:
serv_data = serv_clean.map(lambda lst: (lst[3], lst[-1])).collect()
serv_data

[('State', 'Amount'),
 ('FL', '250.00'),
 ('IL', '199.99'),
 ('TX', '350.50'),
 ('CA', '425.00'),
 ('NY', '675.25'),
 ('FL', '120.00'),
 ('WA', '305.75'),
 ('OR', '880.90'),
 ('NV', '405.60')]

In [45]:
serv_clean.map(lambda lst: (lst[3],lst[-1]))\
         .reduceByKey(lambda amt1,amt2 : amt1+amt2)\
         .collect()

[('State', 'Amount'),
 ('FL', '250.00120.00'),
 ('IL', '199.99'),
 ('TX', '350.50'),
 ('CA', '425.00'),
 ('NY', '675.25'),
 ('WA', '305.75'),
 ('OR', '880.90'),
 ('NV', '405.60')]

In [46]:
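# The previous cell forgot that the amounts are still strings, so '+' concatenated them.
# Convert to float before adding. States with a single row never reach the lambda
# and therefore remain strings in the output.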
serv_clean.map(lambda lst: (lst[3],lst[-1]))\
         .reduceByKey(lambda amt1,amt2 : float(amt1)+float(amt2))\
         .collect()

[('State', 'Amount'),
 ('FL', 370.0),
 ('IL', '199.99'),
 ('TX', '350.50'),
 ('CA', '425.00'),
 ('NY', '675.25'),
 ('WA', '305.75'),
 ('OR', '880.90'),
 ('NV', '405.60')]
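
The mixed strings and floats in the output above follow from how a reduce-by-key works: the combining function is only called when a key has at least two values, so a key with a single value passes through untouched. The hypothetical helper `reduce_by_key` below imitates that behaviour in plain Python (it is not Spark's implementation) and shows one way around it, namely converting the values before reducing.

```python
def reduce_by_key(pairs, func):
    """Plain-Python stand-in for a reduce-by-key (illustrative only)."""
    result = {}
    for key, value in pairs:
        # func runs only when the key has been seen before.
        result[key] = func(result[key], value) if key in result else value
    return list(result.items())


pairs = [("FL", "250.00"), ("IL", "199.99"), ("FL", "120.00")]

# Converting inside the reducer leaves single-value keys as strings.
mixed = reduce_by_key(pairs, lambda a, b: float(a) + float(b))

# Converting before reducing gives floats for every key.
as_floats = [(state, float(amount)) for state, amount in pairs]
clean = reduce_by_key(as_floats, lambda a, b: a + b)

print(mixed)  # [('FL', 370.0), ('IL', '199.99')]
print(clean)  # [('FL', 370.0), ('IL', 199.99)]
```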

In [None]:
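# 1. Get rid of the header row
# 2. Grab state and amount, converting the amount to float
# 3. Add the amounts per state
# 4. Sort by the amount value
#
# The earlier version most likely failed in sortBy: reduceByKey only calls the lambda
# for keys with more than one value, so single-row states kept string amounts, and
# Python 3 cannot compare str with float. Converting in map() avoids this.
serv_clean3 = serv.map(lambda x: x[1:] if x.startswith('#') else x).map(lambda x: x.split())
serv_clean3.filter(lambda lst: lst[0] != 'EventId')\
.map(lambda lst: (lst[3], float(lst[-1])))\
.reduceByKey(lambda amt1, amt2: amt1 + amt2)\
.sortBy(lambda stateAmount: stateAmount[1], ascending=False)\
.collect()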
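
As a cross-check that does not depend on Spark, the four steps listed in the cell above can be run on the same data in plain Python. The sketch below embeds the contents of dataset/service-example.txt as a string named `raw` so that it is self-contained. The names `totals` and `ranked` are illustrative.

```python
from collections import defaultdict

raw = """#EventId    Timestamp    Customer   State    ServiceID    Amount
301         07/05/2023    105       FL       143          250.00
305         07/10/2023    210       IL       119          199.99
302         07/08/2023    307       TX       163          350.50
307         07/12/2023    111       CA       155          425.00
303         07/09/2023    208       NY       147          675.25
309         07/13/2023    120       FL       128          120.00
304         07/09/2023    219       WA       126          305.75
308         07/13/2023    305       OR       149          880.90
306         07/11/2023    410       NV       161          405.60
"""

totals = defaultdict(float)
for line in raw.splitlines():
    if line.startswith("#"):   # skip the header row
        continue
    fields = line.split()
    totals[fields[3]] += float(fields[-1])   # key: state, value: amount as float

# Sort the (state, total) pairs by total, largest first.
ranked = sorted(totals.items(), key=lambda state_amount: state_amount[1], reverse=True)

for state, amount in ranked:
    print(state, amount)
```

The order of states should be OR, NY, CA, NV, FL, TX, WA, IL, with FL totalling 370.0 from its two rows.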