<a href="https://colab.research.google.com/github/GauraoM/ML-Basics-Definitions/blob/main/RDD_Transformations_and_actions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
from pyspark import SparkContext

In [8]:
# getOrCreate() returns the already-running SparkContext if there is one, so
# re-running this cell does not fail with "Cannot run multiple SparkContexts
# at once" (which a bare SparkContext() call raises on a second run).
sc = SparkContext.getOrCreate()

In [9]:
%%writefile example.txt
first line
second line
third line
fourth line

Writing example.txt


In [10]:
# Load example.txt as an RDD of lines (one element per line of the file)
textFile = sc.textFile(name="example.txt")

In [11]:
# Perform an action on the RDD: count() returns the number of lines (4 here).
textFile.count()

4

In [13]:
# filter() is a transformation: evaluating `secfind` shows the RDD's
# description (see output below), not the matching lines themselves.
secfind = textFile.filter(lambda text_line: 'second' in text_line)
secfind

PythonRDD[3] at RDD at PythonRDD.scala:53

In [14]:
# Perform an action on the filtered RDD: collect() returns the matching
# lines to the driver as a Python list.
secfind.collect()

['second line']

#### Creating an RDD from a text file:

In [15]:
%%writefile example2.txt
first 
second line
the third line
then a fourth line

Writing example2.txt


In [16]:
from pyspark import SparkContext

In [18]:
# Show the RDD: evaluating it displays its description (see output below),
# not the file's contents.
sc.textFile(name="example2.txt")

example2.txt MapPartitionsRDD[5] at textFile at NativeMethodAccessorImpl.java:0

In [19]:
# Keep a named handle on the example2.txt RDD so later cells can reuse it
text_rdd = sc.textFile(name='example2.txt')

In [20]:
# map() applies the function to every line and yields exactly one output per
# line -- here the line's list of words -- so `words` is an RDD of lists.
# The results are collected in the next cell.
words = text_rdd.map(lambda text: text.split())

In [21]:
# Each input line comes back as its own list of words
words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

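#### Map vs flatMap

`map` produces exactly one output element per input line (above: one list of words per line), whereas `flatMap` flattens those lists so the result has one element per word.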

In [23]:
# flatMap() flattens each line's word list, giving one element per word
(text_rdd
    .flatMap(lambda text: text.split())
    .collect())

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

#### RDDs and Key-Value Pairs

In [24]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Writing services.txt


In [25]:
# Load the services table as an RDD of raw text lines (header line included)
services = sc.textFile(name='services.txt')

In [26]:
# Display top 2 lines: take(2) returns the first two lines as strings,
# starting with the '#'-prefixed header row.
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [27]:
# Split every line on whitespace and show the first 3 rows as field lists
(services
    .map(lambda text: text.split())
    .take(3))

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [30]:
# Remove the leading '#' from the header line ("#EventId" -> "EventId").
# startswith() is used instead of line[0] so an empty line in the file does
# not raise IndexError.
clean = services.map(lambda line: line[1:] if line.startswith('#') else line)

In [31]:
# Split each line on whitespace into a list of fields.
# Derive from `services` rather than reassigning `clean` from itself: with
# `clean = clean.map(...)`, re-running this cell would try to .split() lists
# that were already split. The '#' stripping mirrors the previous cell.
clean = (services
         .map(lambda line: line[1:] if line.startswith('#') else line)
         .map(lambda line: line.split()))

In [32]:
# Every row is now a list of fields; the header row no longer starts with '#'
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

#### Using Key-Value Pairs for Operations

In [33]:
# Grab the State (index 3) and Amount (last) fields as a key-value pair;
# the header row becomes ('State', 'Amount').
pairs = clean.map(lambda fields: (fields[3], fields[-1]))

In [34]:
# Amounts are still strings at this point, and the header pair is present
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [37]:
# reduceByKey: sum the amounts for each state.
# Amounts are converted to float *before* reducing: the reducer is only called
# for keys with two or more values, so converting inside it would leave any
# state with a single record holding a string such as '100.00'.
# The header pair ('State', 'Amount') is passed through unconverted because
# float('Amount') would raise ValueError; it is removed in the pipeline below.
rekey = (pairs
         .mapValues(lambda amt: amt if amt == 'Amount' else float(amt))
         .reduceByKey(lambda amt1, amt2: amt1 + amt2))

In [38]:
# Per-state totals; the header key 'State' still appears with its label as value
rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

In [39]:
# Grab (State, Amount) pairs
step1 = clean.map(lambda lst: (lst[3], lst[-1]))
# Get rid of the ('State', 'Amount') header pair *before* converting amounts,
# since float('Amount') would raise ValueError
step2 = step1.filter(lambda st_amount: st_amount[0] != "State")
# Convert amounts to float before reducing: reduceByKey only calls the reducer
# for keys with two or more values, so converting inside the reducer would
# leave a single-record state as a string (and the sort below would then
# compare str with float)
step3 = step2.mapValues(float).reduceByKey(lambda amt1, amt2: amt1 + amt2)
# Sort result by total amount, largest first
step4 = step3.sortBy(lambda st_amount: st_amount[1], ascending=False)
# Action
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]

In [40]:
# Sample 3-field record (id, state, amount) used to demonstrate tuple unpacking
x = [
    'ID',
    'State',
    'Amount',
]

In [42]:
def func(id_st_amt):
    """Return the amount from an ``(id, state, amount)`` record.

    Parameters
    ----------
    id_st_amt : sequence of length 3
        A record laid out as ``(id, state, amount)``.

    Returns
    -------
    The third element (``amount``) of the record.

    Raises
    ------
    ValueError
        If the record does not have exactly three elements.
    """
    # Unpack by position for readability. The leading underscores mark the
    # unused fields and avoid shadowing the builtin ``id``.
    _id, _state, amount = id_st_amt
    return amount

In [43]:
# Unpack the sample record and return its last field
func(x)

'Amount'