![Spark Image](https://upload.wikimedia.org/wikipedia/commons/thumb/f/f3/Apache_Spark_logo.svg/1200px-Apache_Spark_logo.svg.png)

# Data Wrangling using RDDs

## Starting/initialising Spark Session

This course uses Python for the implementation, via `pyspark` (PySpark documentation: https://spark.apache.org/docs/latest/api/python/).
PySpark is the Python interface for Apache Spark. It lets you write Spark applications using Python APIs and also provides the PySpark shell for interactively analyzing your data in a distributed environment.

In [None]:
# import libraries from pyspark
from pyspark import SparkConf, SparkContext

# set values for Spark configuration
conf = SparkConf().setMaster("local").setAppName("Data Analysis")

# get (if already running) or create a Spark Context
sc = SparkContext.getOrCreate(conf=conf)

In [4]:
# check (try) if Spark context variable (sc) exists and print information about the Spark context
try:
    sc
except NameError:
    print("Spark context does not exist. Please create the Spark context first (run the cell above).")
else:
    configurations = sc.getConf().getAll()
    for item in configurations: print(item)

('spark.master', 'local')
('spark.app.startTime', '1643722169725')
('spark.app.id', 'local-1643722170645')
('spark.driver.port', '55393')
('spark.rdd.compress', 'True')
('spark.driver.host', '192.168.178.62')
('spark.serializer.objectStreamReset', '100')
('spark.submit.pyFiles', '')
('spark.executor.id', 'driver')
('spark.submit.deployMode', 'client')
('spark.app.name', 'Data Analysis')
('spark.ui.showConsoleProgress', 'true')


In [5]:
# print link to Spark UI, Version, Master and AppName
sc

> *For the tutorials I will be using the MovieLens 1M dataset, which you can get from the [GroupLens](https://grouplens.org/datasets/movielens/) website.*

*Let's read in ratings.dat and create a ratings RDD.*

In [6]:
# The dataset is located at the given path.
# The path can be either a single text file or a directory - in this case a single file
ratingsRDD = sc.textFile("data/ml-1m/ratings.dat")

In [7]:
# display the first five rows of the RDD
ratingsRDD.take(5)

                                                                                

['1::1193::5::978300760',
 '1::661::3::978302109',
 '1::914::3::978301968',
 '1::3408::4::978300275',
 '1::2355::5::978824291']

*That's it: we have read the text file and printed the first five rows using the `take` action. Avoid the `collect` action here, because it would return the entire RDD to the driver and print all of it.*

*The README file provided with the dataset lists the following columns:*

>*UserID::MovieID::Rating::Timestamp*

*Let's count how often each rating was given. First we need to split each row, which calls for a transformation.*

In [8]:
# split each row of the text file at '::' string and select the third element of each row
ratings = ratingsRDD.map(lambda x: x.split('::')[2])

In [9]:
# display the class type of the 'ratings' object
# type() returns the class of the object passed as its argument.
# Knowing the class type is often important in order to use the object correctly in other functions.
type(ratings)

pyspark.rdd.PipelinedRDD

In [10]:
# display the first five rows
ratings.take(5)

['5', '3', '3', '4', '5']
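To make the column layout concrete, here is a minimal plain-Python sketch (no Spark required) that parses one ratings row into named fields. The names `Rating` and `parse_rating` are hypothetical helpers introduced only for illustration; the field order follows the README layout quoted above.

```python
from typing import NamedTuple


class Rating(NamedTuple):
    """One row of ratings.dat: UserID::MovieID::Rating::Timestamp."""
    user_id: int
    movie_id: int
    rating: int
    timestamp: int


def parse_rating(line: str) -> Rating:
    # split at the '::' separator and convert every field to an integer
    user_id, movie_id, rating, timestamp = line.strip().split("::")
    return Rating(int(user_id), int(movie_id), int(rating), int(timestamp))


sample = parse_rating("1::1193::5::978300760")
print(sample.rating)
```

A function like this could be passed to `map` on the RDD in the same way as the lambda above, which keeps the field positions in one place.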

In [11]:
# Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.
result = ratings.countByValue()

                                                                                

In [12]:
# display the class type of the 'result' object
type(result)

collections.defaultdict

In [13]:
# display the 'result' object. The dictionary maps each rating to its count (rating: count).
result

defaultdict(int,
            {'5': 226310, '3': 261197, '4': 348971, '2': 107557, '1': 56174})
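For intuition, the same kind of tally can be sketched in plain Python with `collections.Counter` on the five sample values shown earlier. This is only a local analogy for what the count-by-value step computes, not how Spark implements it; `sample_ratings` and `local_counts` are illustrative names.

```python
from collections import Counter

# the first five rating values taken from the RDD above
sample_ratings = ['5', '3', '3', '4', '5']

# Counter maps each distinct value to the number of times it occurs
local_counts = Counter(sample_ratings)
print(dict(local_counts))
```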

*As you can see, getting the rating counts was easy. Since the result is a dictionary, let's sort and print it.*

In [14]:
# 'result' is a collections.defaultdict
# import the collections module to build an ordered dictionary from it
import collections

# sort the items by rating and store them in 'sortedResults'
sortedResults = collections.OrderedDict(sorted(result.items()))
# Let's create a nice output - print heading
print(f"{'Ratings':10}{'Count'}\n")
# loop through 'sortedResults' and print one '★' per rating point
for key, value in sortedResults.items():
    print(f"{'★' * int(key):10}{value}")

Ratings   Count

★         56174
★★        107557
★★★       261197
★★★★      348971
★★★★★     226310


*Let's look at another example and check which movies were rated most often.*

In [15]:
# Movie information is stored in the file "movies.dat" in the following format: 'MovieID::Title::Genres'
# define the function 'loadMovieNames' to load the file and extract the movie title from each row
def loadMovieNames():
    movieNames = {}
    with open("data/ml-1m/movies.dat", encoding= 'ISO-8859-1') as f:
        for line in f:
            fields = line.split('::')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [16]:
# Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather 
# than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a 
# large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using 
# efficient broadcast algorithms to reduce communication cost. For more information please refer to this link
# https://spark.apache.org/docs/3.2.0/rdd-programming-guide.html#broadcast-variables.

nameDict = sc.broadcast(loadMovieNames())

In [17]:
# let's extract the 'MovieID' from the previously created 'ratingsRDD' dataset
movies = ratingsRDD.map(lambda x: (int(x.split("::")[1]), 1))

In [18]:
# display the first five rows of the 'movies' dataset
movies.take(5)

[(1193, 1), (661, 1), (914, 1), (3408, 1), (2355, 1)]

In [19]:
# Sum the number of entries for each MovieID
movieCounts = movies.reduceByKey(lambda x, y: x + y)

In [20]:
# display the first five rows (output tuple: MovieID, Sum of entries for each MovieID)
movieCounts.take(5)

                                                                                

[(1193, 1725), (661, 525), (914, 636), (3408, 1315), (2355, 1703)]
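The per-key summing step can be hard to picture at first. Below is a minimal single-machine sketch of the idea in plain Python: values that share a key are combined pairwise with the supplied function. `reduce_by_key` is a hypothetical helper written for this illustration; Spark additionally combines values within each partition before merging results across partitions.

```python
from typing import Callable, Dict, Hashable, Iterable, Tuple, TypeVar

V = TypeVar("V")


def reduce_by_key(pairs: Iterable[Tuple[Hashable, V]],
                  func: Callable[[V, V], V]) -> Dict[Hashable, V]:
    """Combine all values that share a key using func (local sketch)."""
    result: Dict[Hashable, V] = {}
    for key, value in pairs:
        if key in result:
            # key seen before: merge the running value with the new one
            result[key] = func(result[key], value)
        else:
            # first occurrence of the key: store the value as is
            result[key] = value
    return result


# illustrative (MovieID, 1) pairs, with MovieID 1193 appearing three times
pairs = [(1193, 1), (661, 1), (1193, 1), (914, 1), (1193, 1)]
counts = reduce_by_key(pairs, lambda x, y: x + y)
print(counts)
```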

In [21]:
# flip the tuple. From (MovieID, Sum of entries for each MovieID) to (Sum of entries for each MovieID, MovieID)
flipped = movieCounts.map( lambda x : (x[1], x[0]))
# sort the entries by sum of entries
sortedMovies = flipped.sortByKey(ascending=False)

In [22]:
# display the first five rows of the 'flipped' dataset
flipped.take(5)

[(1725, 1193), (525, 661), (636, 914), (1315, 3408), (1703, 2355)]

In [23]:
# display the first five rows of the 'sortedMovies' dataset
sortedMovies.take(5)

[(3428, 2858), (2991, 260), (2990, 1196), (2883, 1210), (2672, 480)]

In [24]:
# replace the MovieID with movie names loaded via the 'nameDict' broadcast variable
sortedMoviesWithNames = sortedMovies.map(lambda countMovie : (nameDict.value[countMovie[1]], countMovie[0]))

In [25]:
# display the first ten rows
sortedMoviesWithNames.take(10)

[('American Beauty (1999)', 3428),
 ('Star Wars: Episode IV - A New Hope (1977)', 2991),
 ('Star Wars: Episode V - The Empire Strikes Back (1980)', 2990),
 ('Star Wars: Episode VI - Return of the Jedi (1983)', 2883),
 ('Jurassic Park (1993)', 2672),
 ('Saving Private Ryan (1998)', 2653),
 ('Terminator 2: Judgment Day (1991)', 2649),
 ('Matrix, The (1999)', 2590),
 ('Back to the Future (1985)', 2583),
 ('Silence of the Lambs, The (1991)', 2578)]
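Flipping each tuple just to sort by the count is one option. As a local analogy, the sketch below ranks (MovieID, count) pairs by their second element with a key function, using `heapq.nlargest` on the five sample pairs shown earlier. PySpark RDDs offer `sortBy` and `takeOrdered`, which also accept a key function, so a similar approach should work there without the flip; treat this as a sketch rather than the notebook's method.

```python
import heapq

# (MovieID, number of ratings) pairs from the sample output above
movie_counts = [(1193, 1725), (661, 525), (914, 636), (3408, 1315), (2355, 1703)]

# pick the three pairs with the largest count, ordered from highest to lowest
top3 = heapq.nlargest(3, movie_counts, key=lambda pair: pair[1])
print(top3)
```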

*These are the 10 most-rated movies.*

*Now let's look at the movies with the most five-star ratings.*

In [26]:
# define the function 'filter_five_star' to keep only the rows with a five-star rating
def filter_five_star(line):
    split_line = line.split("::")
    # return a boolean so the function can be passed to filter() directly
    return split_line[2] == '5'

# create a new dataset using the 'filter_five_star' function
five_start_rattingsRDD = ratingsRDD.filter(filter_five_star)
# display the first five rows
five_start_rattingsRDD.take(5)

['1::1193::5::978300760',
 '1::2355::5::978824291',
 '1::1287::5::978302039',
 '1::2804::5::978300719',
 '1::595::5::978824268']
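A filter keeps exactly those elements for which the predicate returns a truthy value. The plain-Python sketch below applies a boolean predicate to the first five rating rows with the built-in `filter`; `is_five_star` is an illustrative name, and the behaviour on an RDD is assumed to be analogous.

```python
def is_five_star(line: str) -> bool:
    # the rating is the third field of 'UserID::MovieID::Rating::Timestamp'
    return line.split("::")[2] == "5"


lines = [
    "1::1193::5::978300760",
    "1::661::3::978302109",
    "1::914::3::978301968",
    "1::3408::4::978300275",
    "1::2355::5::978824291",
]

# keep only the rows whose rating field equals '5'
five_star_lines = list(filter(is_five_star, lines))
print(five_star_lines)
```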

In [27]:
# let's repeat the steps of the 'top 10 most rated movies' example from above on the five star ratings RDD
# extract the 'MovieID' from the previously created 'five_start_rattingsRDD' dataset
five_start_movies = five_start_rattingsRDD.map(lambda x: (int(x.split("::")[1]), 1))
# Sum the number of entries for each MovieID
five_start_movieCounts = five_start_movies.reduceByKey(lambda x, y: x + y)
# flip the tuple. From (MovieID, Sum of entries for each MovieID) to (Sum of entries for each MovieID, MovieID)
flipped = five_start_movieCounts.map( lambda x : (x[1], x[0]))
# sort the entries by sum of entries
five_start_sortedMovies = flipped.sortByKey(ascending=False)
# replace the MovieID with movie names loaded via the 'nameDict' broadcast variable
five_start_sortedMoviesWithNames = five_start_sortedMovies.map(lambda countMovie : (nameDict.value[countMovie[1]], countMovie[0]))

In [28]:
# display the first ten rows
five_start_sortedMoviesWithNames.take(10)

                                                                                

[('American Beauty (1999)', 1963),
 ('Star Wars: Episode IV - A New Hope (1977)', 1826),
 ('Raiders of the Lost Ark (1981)', 1500),
 ('Star Wars: Episode V - The Empire Strikes Back (1980)', 1483),
 ("Schindler's List (1993)", 1475),
 ('Godfather, The (1972)', 1475),
 ('Shawshank Redemption, The (1994)', 1457),
 ('Matrix, The (1999)', 1430),
 ('Saving Private Ryan (1998)', 1405),
 ('Sixth Sense, The (1999)', 1385)]

*Let's look at the number of movies released in each year.*

In [29]:
# Movie information in the file "movies.dat" is stored in the following format: 'MovieID::Title::Genres'
# read the text file and create a new RDD
moviesRDD = sc.textFile("data/ml-1m/movies.dat")

In [30]:
# display the first five rows
moviesRDD.take(5)

["1::Toy Story (1995)::Animation|Children's|Comedy",
 "2::Jumanji (1995)::Adventure|Children's|Fantasy",
 '3::Grumpier Old Men (1995)::Comedy|Romance',
 '4::Waiting to Exhale (1995)::Comedy|Drama',
 '5::Father of the Bride Part II (1995)::Comedy']

In [31]:
# the year of the movie has to be extracted from the title string
# this could be achieved by string slicing (part of the Python language)
'Toy Story (1995)'[-6:]

'(1995)'

In [32]:
# to be more flexible in the extraction of the 'year' value we are using regular expressions
# therefore we have to import the python library 're'
import re
# now apply a regex (regular expression) to a string and strip the surrounding parentheses
re.search(r'\([0-9]{4}\)$','Grumpier Old Men (1995)').group(0)[1:-1]

'1995'
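Calling `.group(0)` directly raises an `AttributeError` when a title has no trailing year, because `re.search` returns `None` in that case. The sketch below is a slightly more defensive variant that uses a capture group and returns `None` on a miss. `YEAR_PATTERN` and `extract_year` are hypothetical names, and tolerating trailing whitespace is an assumption about how titles might look.

```python
import re
from typing import Optional

# four digits in parentheses at the end of the title; the group captures the digits only
YEAR_PATTERN = re.compile(r"\((\d{4})\)\s*$")


def extract_year(title: str) -> Optional[str]:
    """Return the release year as a string, or None if the title has no year."""
    match = YEAR_PATTERN.search(title)
    return match.group(1) if match else None


print(extract_year("Grumpier Old Men (1995)"))
print(extract_year("Title without a year"))
```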

In [33]:
# define a function to extract the year from every row using regex 
def get_year(line):
    split_line= line.split('::')
    year= re.search(r'\([0-9]{4}\)$',split_line[1]).group(0)[1:-1]
    return (year, 1)

# create a dataset with only the year value from each row    
year_RDD= moviesRDD.map(lambda x: get_year(x))

In [34]:
# display the first five rows
year_RDD.take(5)

[('1995', 1), ('1995', 1), ('1995', 1), ('1995', 1), ('1995', 1)]

In [35]:
# Sum the number of entries for each year
yearCounts = year_RDD.reduceByKey(lambda x, y: x + y)

In [36]:
# display the first five rows
yearCounts.take(5)



[('1995', 342), ('1994', 257), ('1996', 345), ('1976', 21), ('1993', 165)]

In [37]:
# sort the entries ascending by the 'year'
ascending_sorted_yearCounts = yearCounts.sortByKey()

In [38]:
# display the first five rows
ascending_sorted_yearCounts.take(5)

[('1919', 3), ('1920', 2), ('1921', 1), ('1922', 2), ('1923', 3)]

In [39]:
# sort the entries descending by the 'year'
descending_sorted_yearCounts = yearCounts.sortByKey(ascending= False)

In [40]:
# display the first five rows
descending_sorted_yearCounts.take(5)

[('2000', 156), ('1999', 283), ('1998', 337), ('1997', 315), ('1996', 345)]

*Years with the most movies*

In [41]:
# flip the tuple. From (year, Sum of entries for each year) to (Sum of entries for each year, year)
flipped = yearCounts.map( lambda x : (x[1], x[0]))
# sort the entries by sum of entries for each year
descending_sorted_yearCounts = flipped.sortByKey(ascending= False)

In [42]:
# display the first ten rows
descending_sorted_yearCounts.take(10)

[(345, '1996'),
 (342, '1995'),
 (337, '1998'),
 (315, '1997'),
 (283, '1999'),
 (257, '1994'),
 (165, '1993'),
 (156, '2000'),
 (104, '1986'),
 (102, '1992')]

*Let's find out which age group is most active on the platform.*

In [43]:
# display the first five rows of the original ratingsRDD
# data format is UserID::MovieID::Rating::Timestamp
ratingsRDD.take(5)

['1::1193::5::978300760',
 '1::661::3::978302109',
 '1::914::3::978301968',
 '1::3408::4::978300275',
 '1::2355::5::978824291']

In [44]:

# User information is stored in the file "users.dat" in the following
# format: 'UserID::Gender::Age::Occupation::Zip-code'
# The Age field only takes values from the set 1, 18, 25, 35, 45, 50, 56.
# Each value stands for an age group (see readme file).

# Define the function 'load_age_group' to load the file and extract the age group from each row
def load_age_group():
    # define dictionary 'age_group' with 7 bins
    age_group= {'1':  "Under 18", '18':  "18-24", '25':  "25-34", '35':  "35-44", '45':  "45-49", '50':  "50-55", '56':  "56+"}
    # define empty dictionary 'user_ageGroup'
    user_ageGroup = {}
    # open the file and loop through all lines
    with open("data/ml-1m/users.dat") as f:
        for line in f:
            # split the row into its fields
            fields = line.split('::')
            # add the user to the dict 'user_ageGroup' and replace the age value by the
            # corresponding age_group range value: 18 -> "18-24"
            user_ageGroup[int(fields[0])] = age_group[fields[2]]
    return user_ageGroup

In [45]:
# define broadcast variable ageGroupDict and load age_groups from file (using function load_age_group)
ageGroupDict = sc.broadcast(load_age_group())

In [46]:
# display the first 5 items of the 'ageGroupDict' broadcast variable
list(ageGroupDict.value.items())[:5]

[(1, 'Under 18'), (2, '56+'), (3, '25-34'), (4, '45-49'), (5, '25-34')]

In [47]:
# select the UserID field from ratings dataset
users_ratings = ratingsRDD.map(lambda x: (int(x.split("::")[0]), 1))
# sum the number of ratings by UserID
count_user_ratings = users_ratings.reduceByKey(lambda x, y: x + y)

In [48]:
# display the first five rows
count_user_ratings.take(5)

                                                                                

[(1, 53), (2, 129), (3, 51), (4, 21), (5, 198)]

In [49]:
# flip the tuple. From (UserID, Sum of entries for each UserID) to (Sum of entries for each UserID, UserID)
flipped = count_user_ratings.map( lambda x : (x[1], x[0]))
# replace the UserID with the age group of the user
age_group_count = flipped.map(lambda countuser : (ageGroupDict.value[countuser[1]], countuser[0]))

In [50]:
# sum the number of ratings by age group
age_group_counts= age_group_count.reduceByKey(lambda x , y: x + y)

In [51]:
# sort the age_group_counts dataset descending by the number of ratings
age_group_counts.map(lambda x: (x[1], x[0])).sortByKey(ascending= False).map(lambda x: (x[1], x[0])).collect()

[('25-34', 395556),
 ('35-44', 199003),
 ('18-24', 183536),
 ('45-49', 83633),
 ('50-55', 72490),
 ('56+', 38780),
 ('Under 18', 27211)]
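The age-group pipeline boils down to a lookup followed by a grouped sum. Here is a minimal plain-Python sketch of that logic using the five users shown in the sample outputs above. `ratings_per_user`, `user_age_group` and `ranked` are illustrative names, and an ordinary dict stands in for the broadcast variable.

```python
from collections import defaultdict

# (UserID -> number of ratings) and (UserID -> age group) for the five sample users
ratings_per_user = {1: 53, 2: 129, 3: 51, 4: 21, 5: 198}
user_age_group = {1: "Under 18", 2: "56+", 3: "25-34", 4: "45-49", 5: "25-34"}

# replace each UserID by its age group and sum the rating counts per group
totals = defaultdict(int)
for user_id, n_ratings in ratings_per_user.items():
    totals[user_age_group[user_id]] += n_ratings

# rank the age groups by their total number of ratings, highest first
ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
print(ranked)
```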

*Let's load another dataset: a fake social network.*

In [52]:
# create a new RDD 'friends' from a text file
# the data format is: ID, Name, Age, Number of fake friends
friends = sc.textFile("data/fakefriends.csv")

In [53]:
# display the first five rows
friends.take(5)

['0,Will,33,385',
 '1,Jean-Luc,26,2',
 '2,Hugh,55,221',
 '3,Deanna,40,465',
 '4,Quark,68,21']

In [54]:
# display the number of rows in the dataset
friends.count()

500

*Let's look at the average number of friends broken down by age in this dataset.*

In [55]:
# define the function parseLine to extract the age and numFriends fields from one row
def parseLine(line):
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)

In [56]:
# create friendsRDD by extracting the age and numFriends fields from all rows (using the parseLine function)
friendsRDD = friends.map(parseLine)
# display the first five rows
friendsRDD.take(5)

[(33, 385), (26, 2), (55, 221), (40, 465), (68, 21)]

In [57]:
# For each age we want to sum the number of fake friends AND count how many rows have that age.
# Therefore we replace the 'number of fake friends' value with a tuple whose second element is '1'.
# The output of the 'mapValues(lambda x: (x, 1))' function would look like:
# [(33, (385, 1)), (26, (2, 1)), (55, (221, 1)), (40, (465, 1)), (68, (21, 1))]

# In the second part of the function ('reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))')
# the sum of the 'number of fake friends' and the '1' values for each 'age' is formed.
totalsByAge = friendsRDD.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

In [58]:
# display the first five rows
totalsByAge.take(5)



[(33, (3904, 12)),
 (26, (4115, 17)),
 (55, (3842, 13)),
 (40, (4264, 17)),
 (68, (2696, 10))]

In [59]:
# to calculate the average of fake friends for a person of a specific age we need to divide 
# the 'sum of fake friends' by 'number of people with specific age'
averagesByAge = totalsByAge.mapValues(lambda x: round(x[0] / x[1], 2))

In [60]:
# display the first five rows
averagesByAge.take(5)

[(33, 325.33), (26, 242.06), (55, 295.54), (40, 250.82), (68, 269.6)]
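An average cannot be merged pairwise directly, which is why the cells above carry a (sum, count) pair per age and divide only at the end. The plain-Python sketch below walks through that idea on a small made-up sample; the rows in `rows` are hypothetical values, not taken from fakefriends.csv.

```python
# hypothetical (age, numFriends) rows, two per age
rows = [(33, 385), (26, 2), (33, 15), (26, 4)]

# step 1: accumulate a (sum of friends, number of rows) pair for every age
totals = {}
for age, num_friends in rows:
    running_sum, running_count = totals.get(age, (0, 0))
    totals[age] = (running_sum + num_friends, running_count + 1)

# step 2: divide sum by count once all rows of an age have been merged
averages = {age: round(s / c, 2) for age, (s, c) in totals.items()}
print(totals)
print(averages)
```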

*Let's load another dataset.*

In [61]:
# create a new RDD 'temp' from a text file
# the data format is: stationID, date, entryType, temperature ... followed by fields that are not relevant here
temp = sc.textFile("data/1800.csv")

In [62]:
# display the first five rows
temp.take(5)

['ITE00100554,18000101,TMAX,-75,,,E,',
 'ITE00100554,18000101,TMIN,-148,,,E,',
 'GM000010962,18000101,PRCP,0,,,E,',
 'EZE00100082,18000101,TMAX,-86,,,E,',
 'EZE00100082,18000101,TMIN,-135,,,E,']

*Let's find the minimum temperature recorded by each weather station in 1800.*

In [63]:
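# define the function parseLine to extract the stationID, entryType and temperature fields from one row
def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    # scale the raw value by 0.1, then apply the Celsius-to-Fahrenheit formula
    # (this assumes the raw value is given in tenths of a degree Celsius)
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)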

In [64]:
# create new RDD with fields stationID, entryType and temperature
tempRDD = temp.map(parseLine)

In [65]:
# display the first five rows
tempRDD.take(5)

[('ITE00100554', 'TMAX', 18.5),
 ('ITE00100554', 'TMIN', 5.359999999999999),
 ('GM000010962', 'PRCP', 32.0),
 ('EZE00100082', 'TMAX', 16.52),
 ('EZE00100082', 'TMIN', 7.699999999999999)]
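Values such as `5.359999999999999` are ordinary floating-point artifacts of the conversion. The sketch below repeats the same arithmetic as a small standalone function and rounds the result. It assumes the raw field holds tenths of a degree Celsius, which is what the factor 0.1 in the cell above implies; `tenths_celsius_to_fahrenheit` is a hypothetical name.

```python
def tenths_celsius_to_fahrenheit(raw: str) -> float:
    """Convert a raw reading (assumed tenths of a degree Celsius) to Fahrenheit."""
    celsius = float(raw) / 10.0
    # standard Celsius-to-Fahrenheit formula, rounded to two decimals for display
    return round(celsius * 9.0 / 5.0 + 32.0, 2)


# raw values from the first two sample rows above
print(tenths_celsius_to_fahrenheit("-75"))
print(tenths_celsius_to_fahrenheit("-148"))
```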

In [66]:
# keep only the rows whose entryType is exactly 'TMIN'
minTemps = tempRDD.filter(lambda x: x[1] == "TMIN")

In [67]:
# display the first five rows
minTemps.take(5)

[('ITE00100554', 'TMIN', 5.359999999999999),
 ('EZE00100082', 'TMIN', 7.699999999999999),
 ('ITE00100554', 'TMIN', 9.5),
 ('EZE00100082', 'TMIN', 8.599999999999998),
 ('ITE00100554', 'TMIN', 23.72)]

In [68]:
# select field "0" (stationID) and field "2" (temperature) and create dataset stationTemps
stationTemps = minTemps.map(lambda x: (x[0], x[2]))

In [69]:
# display the first five rows
stationTemps.take(5)

[('ITE00100554', 5.359999999999999),
 ('EZE00100082', 7.699999999999999),
 ('ITE00100554', 9.5),
 ('EZE00100082', 8.599999999999998),
 ('ITE00100554', 23.72)]

In [70]:
# select the minimum (rounded) of each stationID
minTemps = stationTemps.reduceByKey(lambda x, y: round(min(x,y), 2))

In [71]:
# display the dataset - only two stations reported minimum temperatures
minTemps.collect()



[('ITE00100554', 5.36), ('EZE00100082', 7.7)]
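Taking a minimum per key follows the same pattern as summing per key, with `min` as the combining function. The plain-Python sketch below uses the five sample (stationID, temperature) pairs shown above and rounds once at the end instead of inside the combining step; this is a design choice of the sketch, not a description of the cells above.

```python
# (stationID, temperature in Fahrenheit) pairs from the sample output above
station_temps = [
    ("ITE00100554", 5.359999999999999),
    ("EZE00100082", 7.699999999999999),
    ("ITE00100554", 9.5),
    ("EZE00100082", 8.599999999999998),
    ("ITE00100554", 23.72),
]

# keep the smallest temperature seen so far for every station
min_by_station = {}
for station, temperature in station_temps:
    min_by_station[station] = min(temperature, min_by_station.get(station, temperature))

# round only the final values so every station is treated the same way
rounded = {station: round(t, 2) for station, t in min_by_station.items()}
print(rounded)
```

Rounding after the reduction also covers a station that has only a single reading, because a combining function is never called for a key with one value.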

*Let's do a word count on a text file.*

In [72]:
# create the dataset book and read the text file
book = sc.textFile("data/Book.txt")

In [73]:
# display the first two rows
book.take(2)

['Self-Employment: Building an Internet Business of One',
 'Achieving Financial and Personal Freedom through a Lifestyle Technology Business']

In [74]:
# Define the function normalizeWords.

# The function uses re.compile(), which compiles a regular expression pattern given as a string
# into a regex pattern object (re.Pattern). The function is explained in the Python docs
# (https://docs.python.org/3/library/re.html#re.compile)

# The pattern \W+ matches runs of one or more non-word characters, i.e. anything other than
# letters, digits and the underscore. Splitting on these runs breaks a lowercased line into words.
# Note that a line that is empty, or that starts or ends with a non-word character, yields empty strings ('').

def normalizeWords(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

In [75]:
# create the dataset 'words' using the function normalizeWords and the Spark flatMap transformation

# flatMap() is a transformation that applies the function to every element of the RDD and
# flattens the results, so every word becomes its own element of the new RDD.

words = book.flatMap(normalizeWords)

In [76]:
# count the number of occurrences of each word in the text
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

In [77]:
# display the first five rows
wordCounts.take(5)



[('self', 111),
 ('employment', 75),
 ('building', 33),
 ('an', 178),
 ('internet', 26)]

In [78]:
# sort the wordCounts dataset descending by the number of occurrences
wordCountsSorted = wordCounts.map(lambda x: (x[1], x[0])).sortByKey(ascending= False)

In [79]:
# display the first ten rows
wordCountsSorted.take(10)

[(1878, 'you'),
 (1828, 'to'),
 (1420, 'your'),
 (1292, 'the'),
 (1191, 'a'),
 (970, 'of'),
 (934, 'and'),
 (772, ''),
 (747, 'that'),
 (649, 'it')]
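The empty string in the list above comes from splitting: whenever a line starts or ends with a non-word character, or is empty, splitting on `\W+` produces an empty string at that position. The sketch below shows the effect on one line and contrasts it with matching the words directly, keeping apostrophes. `WORD_PATTERN` and `tokenize` are hypothetical names, and the trailing full stop on the sample line is added for illustration.

```python
import re

# match runs of word characters or apostrophes instead of splitting on separators
WORD_PATTERN = re.compile(r"[\w']+")


def tokenize(text: str) -> list:
    """Return the lowercase words of a line without empty strings."""
    return WORD_PATTERN.findall(text.lower())


line = "Self-Employment: Building an Internet Business of One."

# splitting leaves an empty string after the trailing full stop
split_tokens = re.split(r"\W+", line.lower())
# matching words directly returns the words only
found_tokens = tokenize(line)

print(split_tokens)
print(found_tokens)
```

Alternatively, the empty strings could be removed after splitting with a filter step before counting.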

In [80]:
# stop the underlying SparkContext.
try:
    sc
except NameError:
    print("Spark context does not exist - nothing to stop.")
else:
    sc.stop()

---
*This notebook was an introduction to working with data using RDDs (Resilient Distributed Datasets). Next, we introduce a more modern way of working with Apache Spark: the Structured APIs.*

**Next up: [Structured APIs](./04_Structured_APIs.ipynb)**