# Big Data Fundamentals with PySpark

## Introduction to Big Data analysis with Spark

In [1]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity='all'

from pyspark import SparkConf
from pyspark import SparkContext

In [2]:
sc = SparkContext()
sc

In [3]:
sc.version
sc.pythonVer
sc.master

'2.4.5'

'3.7'

'local[*]'

### Load data in PySpark

In [4]:
numb = range(1, 101)
spark_data = sc.parallelize(numb)
spark_data
type(spark_data)

PythonRDD[1] at RDD at PythonRDD.scala:53

pyspark.rdd.PipelinedRDD

In [6]:
lines = sc.textFile("../README.md")

In [7]:
lines
type(lines)

../README.md MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

pyspark.rdd.RDD

## Programming in PySpark RDD’s
### RDDs from Parallelized collections

In [14]:
# Create an RDD from a list of words
RDD = sc.parallelize(["Spark", "is", "a", "framework", "for", "Big Data processing"])

# Print out the type of the created object
print("The type of RDD is", type(RDD))
RDD.getNumPartitions()

The type of RDD is <class 'pyspark.rdd.RDD'>


4

### RDDs from External Datasets

In [37]:
# Print the file_path
file_path = './Big Data Fundamentals with PySpark/data/ham.txt'
print("The file_path is", file_path)

# Create a fileRDD from file_path
fileRDD = sc.textFile(file_path)

# Check the type of fileRDD
print("The file type of fileRDD is", type(fileRDD))

The file_path is ./Big Data Fundamentals with PySpark/data/ham.txt
The file type of fileRDD is <class 'pyspark.rdd.RDD'>


### Partitions in your data

In [40]:
# Check the number of partitions in fileRDD
print("Number of partitions in fileRDD is", fileRDD.getNumPartitions())

# Create a fileRDD_part from file_path with 5 partitions
fileRDD_part = sc.textFile(file_path, minPartitions = 5)

# Check the number of partitions in fileRDD_part
print("Number of partitions in fileRDD_part is", fileRDD_part.getNumPartitions())

Number of partitions in fileRDD is 2
Number of partitions in fileRDD_part is 5


### Map and Collect

In [31]:
# Create map() transformation to cube numbers
cubedRDD = spark_data.map(lambda x: x ** 3)

# Collect the results
numbers_all = cubedRDD.collect()

# Print the numbers from numbers_all
for numb in numbers_all:
    print(numb)

1
8
27
64
125
216
343
512
729
1000
1331
1728
2197
2744
3375
4096
4913
5832
6859
8000
9261
10648
12167
13824
15625
17576
19683
21952
24389
27000
29791
32768
35937
39304
42875
46656
50653
54872
59319
64000
68921
74088
79507
85184
91125
97336
103823
110592
117649
125000
132651
140608
148877
157464
166375
175616
185193
195112
205379
216000
226981
238328
250047
262144
274625
287496
300763
314432
328509
343000
357911
373248
389017
405224
421875
438976
456533
474552
493039
512000
531441
551368
571787
592704
614125
636056
658503
681472
704969
729000
753571
778688
804357
830584
857375
884736
912673
941192
970299
1000000


### Filter and Count

In [51]:
# Filter the fileRDD to select lines with Spark keyword
fileRDD_filter = fileRDD.filter(lambda line: 'the' in line.split(' '))

# How many lines are there in fileRDD?
print("The total number of lines with the keyword Spark is", fileRDD_filter.count())

# Print the first four lines of fileRDD
for line in fileRDD_filter.take(4): 
    print(line)

The total number of lines with the keyword Spark is 815
if you aren't here in the next  &lt;#&gt;  hours imma flip my shit
No. I meant the calculation is the same. That  &lt;#&gt; units at  &lt;#&gt; . This school is really expensive. Have you started practicing your accent. Because its important. And have you decided if you are doing 4years of dental school or if you'll just do the nmde exam.
I'm taking derek &amp; taylor to walmart, if I'm not back by the time you're done just leave the mouse on my desk and I'll text you when priscilla's ready
I can't believe how attached I am to seeing you every day. I know you will do the best you can to get to me babe. I will go to teach my class at your midnight


### ReduceBykey and Collect

In [52]:
# Create PairRDD Rdd with key value pairs
Rdd = sc.parallelize([(1, 2), (3, 4), (3, 6), (4, 5)])

# Apply reduceByKey() operation on Rdd
Rdd_Reduced = Rdd.reduceByKey(lambda x, y: x + y)

# Iterate over the result and print the output
for num in Rdd_Reduced.collect(): 
    print("Key {} has {} Counts".format(num[0], num[1]))

Key 4 has 5 Counts
Key 1 has 2 Counts
Key 3 has 10 Counts


### SortByKey and Collect

In [53]:
# Sort the reduced RDD with the key by descending order
Rdd_Reduced_Sort = Rdd_Reduced.sortByKey(ascending=False)

# Iterate over the result and print the output
for num in Rdd_Reduced_Sort.collect():
    print("Key {} has {} Counts".format(num[0], num[1]))

Key 4 has 5 Counts
Key 3 has 10 Counts
Key 1 has 2 Counts


### CountingBykeys

In [61]:
Rdd.collect()

[(1, 2), (3, 4), (3, 6), (4, 5)]

In [58]:
# Transform the rdd with countByKey()
total = Rdd.countByKey()

# What is the type of total?
print("The type of total is", type(total))

# Iterate over the total and print the output
for k, v in total.items(): 
    print("key", k, "has", v, "counts")

The type of total is <class 'collections.defaultdict'>
key 1 has 1 counts
key 3 has 2 counts
key 4 has 1 counts


### Create a base RDD and transform it

In [63]:
# Create a baseRDD from the file path
baseRDD = sc.textFile('./Big Data Fundamentals with PySpark/data/Complete_Shakespeare.txt')

# Split the lines of baseRDD into words
splitRDD = baseRDD.flatMap(lambda x: x.split(' '))

# Count the total number of words
print("Total number of words in splitRDD:", splitRDD.count())

Total number of words in splitRDD: 194074


In [69]:
stop_words = ['i','me','my','myself','we','our','ours','ourselves','you','your','yours','yourself','yourselves','he',
'him','his','himself','she','her','hers','herself','it','its','itself','they','them','their','theirs','themselves',
'what','which','who','whom','this','that','these','those','am','is','are','was','were','be','been','being','have',
'has','had','having','do','does','did','doing','a','an','the','and','but','if','or','because','as','until','while',
'of','at','by','for','with','about','against','between','into','through','during','before','after','above','below',
'to','from','up','down','in','out','on','off','over','under','again','further','then','once','here','there','when',
'where','why','how','all','any','both','each','few','more','most','other','some','such','no','nor','not','only',
'own','same','so','than','too','very','can','will','just','don','should','now']

In [70]:
# Convert the words in lower case and remove stop words from stop_words
splitRDD_no_stop = splitRDD.filter(lambda x: x.lower() not in stop_words)

# Create a tuple of the word and 1 
splitRDD_no_stop_words = splitRDD_no_stop.map(lambda w: (w, 1))

# Count of the number of occurences of each word
resultRDD = splitRDD_no_stop_words.reduceByKey(lambda x, y: x + y)

In [72]:
# Display the first 10 words and their frequencies
for word in resultRDD.take(10):
    print(word)

# Swap the keys and values 
resultRDD_swap = resultRDD.map(lambda x: (x[1], x[0]))

# Sort the keys in descending order
resultRDD_swap_sort = resultRDD_swap.sortByKey(ascending=False)

# Show the top 10 most frequent words and their frequencies
for word in resultRDD_swap_sort.take(10):
    print("{} has {} counts". format(word[1], word[0]))

('Project', 9)
('EBook', 1)
('Shakespeare', 12)
('', 65498)
('use', 38)
('anyone', 1)
('anywhere', 1)
('restrictions', 1)
('whatsoever.', 1)
('may', 162)
 has 65498 counts
thou has 650 counts
thy has 574 counts
shall has 393 counts
would has 311 counts
good has 295 counts
thee has 286 counts
love has 273 counts
Enter has 269 counts
th' has 254 counts
