# Solution-1: Introduction to PySpark RDD


## Import PySpark and initialize a SparkSession and its SparkContext

In [1]:
from pyspark.sql import SparkSession

# Build (or reuse) a local SparkSession with 4 worker threads, then grab the
# SparkContext it wraps; the RDD API used below lives on the SparkContext.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("solution-1")
    .getOrCreate()
)
sc = spark.sparkContext

## Create Parallelized Collections

In [2]:
# Turn a local Python list into an RDD.
data = list(range(1, 6))
distData = sc.parallelize(data)

type(distData)

pyspark.rdd.RDD

## TODO: Read external data file
### Use SparkContext's textFile function to read in a text file

In [3]:
# Path is relative to the notebook's working directory.
textFilePath = './emails.txt'
# Build an RDD whose elements are the lines of the text file.
emails = sc.textFile(textFilePath)
type(emails)

pyspark.rdd.RDD

## Generate a list of data, from 1 to 10

In [5]:
data = [*range(1, 11)]  # the integers 1..10 inclusive
print(data)

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


## Parallelize the data with 2 partitions

In [5]:
numbers = sc.parallelize(data, numSlices=2)  # numSlices = number of partitions

## Print RDD

In [6]:
# Printing the RDD object shows only its description, not its elements...
print(numbers)

# ...collect() returns the elements as a local list, which can be printed.
print(numbers.collect())

ParallelCollectionRDD[3] at parallelize at PythonRDD.scala:475
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


## Keep only the even numbers and collect them

In [7]:
numbers.filter(lambda n: n % 2 == 0).collect()  # keep elements divisible by 2

[2, 4, 6, 8, 10]

## TODO: find emails with hotmail domain

In [8]:
# Substring test: keeps every line that contains '@hotmail'.
emails.filter(lambda address: '@hotmail' in address).collect()

['leslie@hotmail.com', 'andrew@hotmail.com']

## Square all the numbers in the list using the map operation

In [9]:
numbers.map(lambda n: n ** 2).collect()  # one output element per input element

[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

## Use flatMap to apply a function that returns a list and flatten the result

In [10]:
def square_and_cube(x):
    """Return ``[x**2, x**3]`` for a single element."""
    return [x**2, x**3]

# map keeps one (list) result per element; flatMap splices the lists together.
m = numbers.map(square_and_cube).collect()
fm = numbers.flatMap(square_and_cube).collect()

print(m)
print(fm)

[[1, 1], [4, 8], [9, 27], [16, 64], [25, 125], [36, 216], [49, 343], [64, 512], [81, 729], [100, 1000]]
[1, 1, 4, 8, 9, 27, 16, 64, 25, 125, 36, 216, 49, 343, 64, 512, 81, 729, 100, 1000]


## TODO: separate username and domain from all emails

### e.g. `marshuang80@gmail.com` -> `['marshuang80', 'gmail.com']`

In [11]:
# Hint: use the Python str.split() function.
# Each record becomes a [username, domain] list:
# - strip() removes surrounding whitespace so it does not end up in the domain;
# - lines without an '@' (e.g. blank lines) are dropped, because they would
#   yield a one-element list and break values() (element [1]) later on;
# - rsplit('@', 1) splits only at the LAST '@', so the result always has
#   exactly two parts even if the username part itself contains an '@'.
username_domain = (
    emails
    .map(lambda line: line.strip())
    .filter(lambda line: '@' in line)
    .map(lambda line: line.rsplit('@', 1))
)
username_domain.collect()

[['zhu', 'icloud.com'],
 ['zhu', 'live.com'],
 ['anna', 'sbcglobal.net'],
 ['andrew', 'yahoo.ca'],
 ['andrew', 'gmail.com'],
 ['teddy', 'comcast.net'],
 ['teddy', 'live.com'],
 ['james', 'yahoo.ca'],
 ['james', 'msn.com'],
 ['leslie', 'optonline.net'],
 ['jaarnial', 'outlook.com'],
 ['leslie', 'hotmail.com'],
 ['teddy', 'yahoo.ca'],
 ['jeffcovey', 'outlook.com'],
 ['gator', 'live.com'],
 ['conteb', 'verizon.net'],
 ['aegreene', 'me.com'],
 ['nichoj', 'icloud.com'],
 ['andrew', 'hotmail.com'],
 ['kyle', 'att.net'],
 ['teddy', 'comcast.net'],
 ['kyle', 'yahoo.ca'],
 ['gator', 'msn.com'],
 ['albert', 'yahoo.com'],
 ['nichoj', 'verizon.net'],
 ['albert', 'yahoo.com'],
 ['albert', 'mac.com'],
 ['andrew', 'sbcglobal.net'],
 ['leslie', 'optonline.net'],
 ['kyle', 'msn.com'],
 ['anna', 'aol.com'],
 ['anna', 'msn.com'],
 ['adamk', 'yahoo.ca']]

## RDD Key-Value Pairs

In [12]:
# keys() picks element [0] of every [username, domain] record: the usernames.
usernames = username_domain.keys()
usernames.collect()

['zhu',
 'zhu',
 'anna',
 'andrew',
 'andrew',
 'teddy',
 'teddy',
 'james',
 'james',
 'leslie',
 'jaarnial',
 'leslie',
 'teddy',
 'jeffcovey',
 'gator',
 'conteb',
 'aegreene',
 'nichoj',
 'andrew',
 'kyle',
 'teddy',
 'kyle',
 'gator',
 'albert',
 'nichoj',
 'albert',
 'albert',
 'andrew',
 'leslie',
 'kyle',
 'anna',
 'anna',
 'adamk']

In [13]:
# values() picks element [1] of every [username, domain] record: the domains.
domains = username_domain.values()
domains.collect()

['icloud.com',
 'live.com',
 'sbcglobal.net',
 'yahoo.ca',
 'gmail.com',
 'comcast.net',
 'live.com',
 'yahoo.ca',
 'msn.com',
 'optonline.net',
 'outlook.com',
 'hotmail.com',
 'yahoo.ca',
 'outlook.com',
 'live.com',
 'verizon.net',
 'me.com',
 'icloud.com',
 'hotmail.com',
 'att.net',
 'comcast.net',
 'yahoo.ca',
 'msn.com',
 'yahoo.com',
 'verizon.net',
 'yahoo.com',
 'mac.com',
 'sbcglobal.net',
 'optonline.net',
 'msn.com',
 'aol.com',
 'msn.com',
 'yahoo.ca']

## Reduce by key

In [14]:
def add_counts(left, right):
    """Combine two partial counts for the same key."""
    return left + right

data = ["a", "b", "a", "a", "b", "b", "b", "b"]
rdd = sc.parallelize(data)

# Pair every letter with a count of 1, then sum the counts per letter.
pairRDD = rdd.map(lambda letter: (letter, 1))

pairRDD.reduceByKey(add_counts).collect()

[('b', 5), ('a', 3)]

## TODO: count how many domains (email addresses) each username has

In [15]:
# Map every [username, domain] record to a (username, 1) pair so that
# reduceByKey can sum the 1s per username. A new name is used rather than
# rebinding `username_domain`, so the [username, domain] RDD used by the
# keys()/values() cells above stays intact if those cells are re-run.
username_ones = username_domain.map(lambda record: (record[0], 1))
print("** Results from mapping values to list")
# top(3) returns the 3 largest pairs in descending order: a quick sample.
print(username_ones.top(3))

print("\n** Results from reduceByKey ** ")
username_ones.reduceByKey(lambda val1, val2: val1 + val2).collect()

** Results from mapping values to list
[('zhu', 1), ('zhu', 1), ('teddy', 1)]

** Results from reduceByKey ** 


[('anna', 3),
 ('andrew', 4),
 ('james', 2),
 ('leslie', 3),
 ('jeffcovey', 1),
 ('gator', 2),
 ('aegreene', 1),
 ('nichoj', 2),
 ('adamk', 1),
 ('zhu', 2),
 ('teddy', 4),
 ('jaarnial', 1),
 ('conteb', 1),
 ('kyle', 3),
 ('albert', 3)]

## Always stop Spark when you are finished, to release its resources

In [16]:
# `sc` was obtained from the SparkSession `spark`, so stop the session itself:
# SparkSession.stop() stops the underlying SparkContext and also tears down
# the session, instead of leaving a session object around whose context has
# been stopped.
spark.stop()