![Spark Image](https://upload.wikimedia.org/wikipedia/commons/thumb/f/f3/Apache_Spark_logo.svg/1200px-Apache_Spark_logo.svg.png)

# Data Wrangling using RDDs

## Starting Spark Session

This course uses Python for all implementations, through `pyspark` ([PySpark documentation](https://spark.apache.org/docs/latest/api/python/)).
PySpark is the Python interface to Apache Spark. It lets you write Spark applications with Python APIs and also provides the PySpark shell for analyzing data interactively in a distributed environment.

In [1]:
# import libraries from pyspark
from pyspark import SparkConf, SparkContext

# set values for Spark configuration
conf = SparkConf().setMaster("local").setAppName("Data Analysis")

# get (if already running) or create a Spark Context
sc = SparkContext.getOrCreate(conf=conf)

22/01/24 16:40:49 WARN Utils: Your hostname, Gerhards-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.178.62 instead (on interface en8)
22/01/24 16:40:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/24 16:40:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# check (try) if Spark context variable (sc) exists and print information about the Spark context
try:
    sc
except NameError:
    print("Spark context does not exist. Please create the Spark context first (run the cell above).")
else:
    configurations = sc.getConf().getAll()
    for item in configurations: print(item)

('spark.master', 'local')
('spark.rdd.compress', 'True')
('spark.app.id', 'local-1643038851310')
('spark.driver.host', '192.168.178.62')
('spark.serializer.objectStreamReset', '100')
('spark.app.startTime', '1643038849970')
('spark.submit.pyFiles', '')
('spark.executor.id', 'driver')
('spark.submit.deployMode', 'client')
('spark.driver.port', '60065')
('spark.app.name', 'Data Analysis')
('spark.ui.showConsoleProgress', 'true')


In [3]:
# print link to Spark UI, Version, Master and AppName
sc

> *These tutorials use the MovieLens 1M dataset, which you can download from the [GroupLens](https://grouplens.org/datasets/movielens/) website.*

In [7]:
ls data/ml-1m

README       movies.dat   ratings.dat  users.dat


In [8]:
!cat data/ml-1m/README

SUMMARY

These files contain 1,000,209 anonymous ratings of approximately 3,900 movies 
made by 6,040 MovieLens users who joined MovieLens in 2000.

USAGE LICENSE

Neither the University of Minnesota nor any of the researchers
involved can guarantee the correctness of the data, its suitability
for any particular purpose, or the validity of results based on the
use of the data set.  The data set may be used for any research
purposes under the following conditions:

     * The user may not state or imply any endorsement from the
       University of Minnesota or the GroupLens Research Group.

     * The user must acknowledge the use of the data set in
       publications resulting from the use of the data set
       (see below for citation information).

     * The user may not redistribute the data without separate
       permission.

     * The user may not use this information for any commercial or
       revenue-bearing purposes without first obtaining permission
       from a facult

*Let's read in ratings.dat and create a ratings RDD.*

In [6]:
ratingsRDD = sc.textFile("data/ml-1m/ratings.dat")

In [7]:
ratingsRDD.take(5)

['1::1193::5::978300760',
 '1::661::3::978302109',
 '1::914::3::978301968',
 '1::3408::4::978300275',
 '1::2355::5::978824291']

*That's it: we have read the text file and printed its first 5 rows with the `take` action. Avoid the `collect` action here, because it would return (and print) the entire RDD, which holds more than a million lines.*

*According to the README provided with the dataset, each line has these columns:*

>*UserID::MovieID::Rating::Timestamp*
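
As a minimal sketch of that layout, the plain-Python helper below splits one line into typed fields. The `Rating` tuple and the `parse_rating` name are illustrative assumptions, not part of the dataset or of PySpark, and the timestamp is assumed to be in seconds since the Unix epoch.

```python
from collections import namedtuple
from datetime import datetime, timezone

# Hypothetical record type mirroring UserID::MovieID::Rating::Timestamp
Rating = namedtuple("Rating", ["user_id", "movie_id", "rating", "timestamp"])

def parse_rating(line):
    """Split one '::'-delimited ratings line into integer fields."""
    user_id, movie_id, rating, timestamp = line.strip().split("::")
    return Rating(int(user_id), int(movie_id), int(rating), int(timestamp))

sample = parse_rating("1::1193::5::978300760")
print(sample)
# Assumption: the timestamp is in seconds since the Unix epoch (UTC)
print(datetime.fromtimestamp(sample.timestamp, tz=timezone.utc))
```

A function like this could be passed to `ratingsRDD.map(...)` to work with named fields instead of positional indexes.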

*Let's count how often each rating was given. First we need to split each line and pick out the rating field, which calls for a transformation (`map`).*

In [8]:
ratings = ratingsRDD.map(lambda x: x.split('::')[2])

In [9]:
ratings.take(5)

['5', '3', '3', '4', '5']

In [10]:
result = ratings.countByValue()

In [12]:
type(result)

collections.defaultdict

In [13]:
result

defaultdict(int,
            {'5': 226310, '3': 261197, '4': 348971, '2': 107557, '1': 56174})
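
For intuition, `countByValue` hands back to the driver the same kind of tally that `collections.Counter` builds locally. The sketch below uses only the five sample values returned by `ratings.take(5)`, not the full dataset.

```python
from collections import Counter

# The five rating values returned by ratings.take(5)
sample_ratings = ['5', '3', '3', '4', '5']

# Count how many times each distinct value occurs
local_counts = Counter(sample_ratings)
print(dict(local_counts))
```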

*That is all it takes to count the ratings. Since `countByValue` returns a dictionary, let's sort it and print the results.*

In [30]:
import collections
sortedResults = collections.OrderedDict(sorted(result.items()))
print(f"{'Ratings':10}{'Count'}\n")
for key, value in sortedResults.items():
    print(f"{'★'* int(key):{10}}{value}")

Ratings   Count

★         56174
★★        107557
★★★       261197
★★★★      348971
★★★★★     226310


*As another example, let's find the most-rated movies.*

In [57]:
def loadMovieNames():
    movieNames = {}
    with open("data/ml-1m/movies.dat", encoding= 'ISO-8859-1') as f:
        for line in f:
            fields = line.split('::')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [58]:
nameDict = sc.broadcast(loadMovieNames())

In [59]:
movies = ratingsRDD.map(lambda x: (int(x.split("::")[1]), 1))

In [60]:
movies.take(5)

[(1193, 1), (661, 1), (914, 1), (3408, 1), (2355, 1)]

In [61]:
movieCounts = movies.reduceByKey(lambda x, y: x + y)

In [62]:
movieCounts.take(5)

[(1193, 1725), (661, 525), (914, 636), (3408, 1315), (2355, 1703)]
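
To see what `reduceByKey` computes, here is a plain-Python sketch that merges the values of equal keys with a two-argument function. `reduce_by_key` is a hypothetical helper for illustration only. Spark applies the same kind of merge within each partition and then across partitions, which is why the function should be associative and commutative.

```python
def reduce_by_key(pairs, func):
    """Merge values that share a key using func(accumulated, new_value)."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

# Illustrative (movie_id, 1) pairs; movie 1193 appears twice
pairs = [(1193, 1), (661, 1), (1193, 1), (914, 1)]
counts = reduce_by_key(pairs, lambda x, y: x + y)
print(counts)
```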

In [63]:
flipped = movieCounts.map( lambda x : (x[1], x[0]))
sortedMovies = flipped.sortByKey(ascending=False)

In [64]:
sortedMovies.take(5)

[(3428, 2858), (2991, 260), (2990, 1196), (2883, 1210), (2672, 480)]
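
The flip is needed because `sortByKey` orders pairs by their first element. For comparison, a plain-Python sketch of the same top-N idea can keep the pairs as `(movie_id, count)` and order them with a key function instead. The sample pairs are the five shown by `movieCounts.take(5)`.

```python
import heapq

movie_counts = [(1193, 1725), (661, 525), (914, 636), (3408, 1315), (2355, 1703)]

# Pick the 3 pairs with the largest count without swapping key and value
top3 = heapq.nlargest(3, movie_counts, key=lambda pair: pair[1])
print(top3)
```

PySpark's `RDD.sortBy` and `RDD.takeOrdered` accept a key function in a similar way, so the swap can often be avoided there as well.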

In [65]:
sortedMoviesWithNames = sortedMovies.map(lambda countMovie : (nameDict.value[countMovie[1]], countMovie[0]))

In [67]:
sortedMoviesWithNames.take(10)

[('American Beauty (1999)', 3428),
 ('Star Wars: Episode IV - A New Hope (1977)', 2991),
 ('Star Wars: Episode V - The Empire Strikes Back (1980)', 2990),
 ('Star Wars: Episode VI - Return of the Jedi (1983)', 2883),
 ('Jurassic Park (1993)', 2672),
 ('Saving Private Ryan (1998)', 2653),
 ('Terminator 2: Judgment Day (1991)', 2649),
 ('Matrix, The (1999)', 2590),
 ('Back to the Future (1985)', 2583),
 ('Silence of the Lambs, The (1991)', 2578)]

*These are the 10 most-rated movies.*

*Now let's look at the movies with the most 5-star ratings.*

In [68]:
def filter_five_star(line):
    # keep only lines whose rating field (third column) is '5'
    split_line = line.split("::")
    return split_line[2] == '5'
five_start_rattingsRDD = ratingsRDD.filter(filter_five_star)
five_start_rattingsRDD.take(5)

['1::1193::5::978300760',
 '1::2355::5::978824291',
 '1::1287::5::978302039',
 '1::2804::5::978300719',
 '1::595::5::978824268']
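
A filter predicate only needs to return a truthy or falsy value for each element. The plain-Python sketch below applies such a predicate to the five sample lines shown earlier. `is_five_star` is an illustrative name, not a name used elsewhere in this notebook.

```python
def is_five_star(line):
    """Return True when the rating field (third column) equals '5'."""
    return line.split("::")[2] == "5"

sample_lines = [
    "1::1193::5::978300760",
    "1::661::3::978302109",
    "1::914::3::978301968",
    "1::3408::4::978300275",
    "1::2355::5::978824291",
]

# Keep only the lines for which the predicate is True
five_star_lines = [line for line in sample_lines if is_five_star(line)]
print(five_star_lines)
```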

In [69]:
five_start_movies = five_start_rattingsRDD.map(lambda x: (int(x.split("::")[1]), 1))
five_start_movieCounts = five_start_movies.reduceByKey(lambda x, y: x + y)
flipped = five_start_movieCounts.map( lambda x : (x[1], x[0]))
five_start_sortedMovies = flipped.sortByKey(ascending=False)
five_start_sortedMoviesWithNames = five_start_sortedMovies.map(lambda countMovie : (nameDict.value[countMovie[1]], countMovie[0]))

In [70]:
five_start_sortedMoviesWithNames.take(10)

[('American Beauty (1999)', 1963),
 ('Star Wars: Episode IV - A New Hope (1977)', 1826),
 ('Raiders of the Lost Ark (1981)', 1500),
 ('Star Wars: Episode V - The Empire Strikes Back (1980)', 1483),
 ("Schindler's List (1993)", 1475),
 ('Godfather, The (1972)', 1475),
 ('Shawshank Redemption, The (1994)', 1457),
 ('Matrix, The (1999)', 1430),
 ('Saving Private Ryan (1998)', 1405),
 ('Sixth Sense, The (1999)', 1385)]

*Let's look at the number of movies released in each year.*

In [71]:
moviesRDD = sc.textFile("data/ml-1m/movies.dat")

In [72]:
moviesRDD.take(5)

["1::Toy Story (1995)::Animation|Children's|Comedy",
 "2::Jumanji (1995)::Adventure|Children's|Fantasy",
 '3::Grumpier Old Men (1995)::Comedy|Romance',
 '4::Waiting to Exhale (1995)::Comedy|Drama',
 '5::Father of the Bride Part II (1995)::Comedy']

In [74]:
'Toy Story (1995)'[-6:]

'(1995)'

In [86]:
import re
re.search(r'\([0-9]{4}\)$','Grumpier Old Men (1995)').group(0)[1:-1]

'1995'
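
Note that `re.search` returns `None` when the pattern does not match, so calling `.group(0)` on a title without a trailing year would raise an `AttributeError`. A more defensive variant, sketched below under the assumption that such titles should simply yield `None`, uses a capture group instead of slicing off the parentheses. `extract_year` is a hypothetical helper name.

```python
import re

# Four digits inside parentheses at the very end of the title
YEAR_PATTERN = re.compile(r'\((\d{4})\)$')

def extract_year(title):
    """Return the trailing four-digit year as a string, or None if absent."""
    match = YEAR_PATTERN.search(title.strip())
    return match.group(1) if match else None

print(extract_year('Grumpier Old Men (1995)'))
print(extract_year('A Title Without A Year'))
```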

In [87]:
def get_year(line):
    # the title is the second field; the year sits in parentheses at its end
    split_line = line.split('::')
    year = re.search(r'\([0-9]{4}\)$', split_line[1]).group(0)[1:-1]
    return (year, 1)
year_RDD = moviesRDD.map(get_year)

In [88]:
year_RDD.take(5)

[('1995', 1), ('1995', 1), ('1995', 1), ('1995', 1), ('1995', 1)]

In [89]:
yearCounts = year_RDD.reduceByKey(lambda x, y: x + y)

In [90]:
yearCounts.take(5)

[('1995', 342), ('1994', 257), ('1996', 345), ('1976', 21), ('1993', 165)]

In [93]:
ascending_sorted_yearCounts = yearCounts.sortByKey()

In [94]:
ascending_sorted_yearCounts.take(5)

[('1919', 3), ('1920', 2), ('1921', 1), ('1922', 2), ('1923', 3)]

In [97]:
descending_sorted_yearCounts = yearCounts.sortByKey(ascending= False)

In [98]:
descending_sorted_yearCounts.take(5)

[('2000', 156), ('1999', 283), ('1998', 337), ('1997', 315), ('1996', 345)]

*Years with the most movies:*

In [99]:
flipped = yearCounts.map( lambda x : (x[1], x[0]))
descending_sorted_yearCounts = flipped.sortByKey(ascending= False)

In [100]:
descending_sorted_yearCounts.take(10)

[(345, '1996'),
 (342, '1995'),
 (337, '1998'),
 (315, '1997'),
 (283, '1999'),
 (257, '1994'),
 (165, '1993'),
 (156, '2000'),
 (104, '1986'),
 (102, '1992')]

*Let's find out which age group is most active on the platform.*

In [101]:
ratingsRDD.take(5)

['1::1193::5::978300760',
 '1::661::3::978302109',
 '1::914::3::978301968',
 '1::3408::4::978300275',
 '1::2355::5::978824291']

In [104]:
def load_age_group():
    age_group= {'1':  "Under 18", '18':  "18-24", '25':  "25-34", '35':  "35-44", '45':  "45-49", '50':  "50-55", '56':  "56+"}
    user_ageGroup = {}
    with open("data/ml-1m/users.dat") as f:
        for line in f:
            fields = line.split('::')
            user_ageGroup[int(fields[0])] = age_group[fields[2]]
    return user_ageGroup

In [105]:
ageGroupDict = sc.broadcast(load_age_group())

In [106]:
users_ratings = ratingsRDD.map(lambda x: (int(x.split("::")[0]), 1))
count_user_ratings = users_ratings.reduceByKey(lambda x, y: x + y)

In [107]:
count_user_ratings.take(5)

[(1, 53), (2, 129), (3, 51), (4, 21), (5, 198)]

In [108]:
# replace each user ID with that user's age group, keeping the rating count as the value
age_group_count = count_user_ratings.map(lambda userCount: (ageGroupDict.value[userCount[0]], userCount[1]))

In [111]:
age_group_counts= age_group_count.reduceByKey(lambda x , y: x + y)

In [112]:
age_group_counts.collect()

[('Under 18', 27211),
 ('56+', 38780),
 ('25-34', 395556),
 ('45-49', 83633),
 ('50-55', 72490),
 ('35-44', 199003),
 ('18-24', 183536)]
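
The aggregation above is a lookup followed by a sum per group. A plain-Python sketch of the same two steps follows. The per-user counts are the five shown by `count_user_ratings.take(5)`, while the `user_age_group` mapping is made up for illustration and is not taken from `users.dat`.

```python
from collections import defaultdict

# (user_id, number_of_ratings) pairs from the sample output
user_counts = [(1, 53), (2, 129), (3, 51), (4, 21), (5, 198)]

# Hypothetical user -> age group lookup (NOT taken from users.dat)
user_age_group = {1: "Under 18", 2: "56+", 3: "25-34", 4: "45-49", 5: "25-34"}

# Replace each user by their age group and add up the counts per group
ratings_per_group = defaultdict(int)
for user_id, count in user_counts:
    ratings_per_group[user_age_group[user_id]] += count

print(dict(ratings_per_group))
```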

In [116]:
age_group_counts.map(lambda x: (x[1], x[0])).sortByKey(ascending= False).map(lambda x: (x[1], x[0])).collect()

[('25-34', 395556),
 ('35-44', 199003),
 ('18-24', 183536),
 ('45-49', 83633),
 ('50-55', 72490),
 ('56+', 38780),
 ('Under 18', 27211)]

*Let's load another dataset: a fake social network.*

In [117]:
friends = sc.textFile("data/fakefriends.csv")

In [118]:
friends.take(5)

['0,Will,33,385',
 '1,Jean-Luc,26,2',
 '2,Hugh,55,221',
 '3,Deanna,40,465',
 '4,Quark,68,21']

In [119]:
friends.count()

500

*Let's compute the average number of friends for each age in this dataset.*

In [126]:
def parseLine(line):
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)

In [121]:
friendsRDD = friends.map(parseLine)
friendsRDD.take(5)

[(33, 385), (26, 2), (55, 221), (40, 465), (68, 21)]

In [123]:
totalsByAge = friendsRDD.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

In [124]:
totalsByAge.take(5)

[(33, (3904, 12)),
 (26, (4115, 17)),
 (55, (3842, 13)),
 (40, (4264, 17)),
 (68, (2696, 10))]

In [128]:
averagesByAge = totalsByAge.mapValues(lambda x: round(x[0] / x[1], 2))

In [130]:
averagesByAge.take(5)

[(33, 325.33), (26, 242.06), (55, 295.54), (40, 250.82), (68, 269.6)]
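
The `(x, 1)` trick carries a running sum and a running count together, so the average can be computed with a single division at the end. A plain-Python sketch of the same bookkeeping follows, using a few made-up `(age, numFriends)` pairs rather than rows from `fakefriends.csv`.

```python
# Illustrative (age, numFriends) pairs, not taken from fakefriends.csv
pairs = [(33, 385), (26, 2), (33, 100), (26, 10), (55, 221)]

# Accumulate (sum_of_friends, number_of_people) per age
totals = {}
for age, num_friends in pairs:
    running_sum, running_count = totals.get(age, (0, 0))
    totals[age] = (running_sum + num_friends, running_count + 1)

# Divide once at the end to get the average per age
averages = {age: round(s / n, 2) for age, (s, n) in totals.items()}
print(averages)
```

Keeping the sum and the count separate until the end matters because averaging partial averages would weight the groups incorrectly.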

*Let's load another dataset.*

In [131]:
temp = sc.textFile("data/1800.csv")

In [132]:
temp.take(5)

['ITE00100554,18000101,TMAX,-75,,,E,',
 'ITE00100554,18000101,TMIN,-148,,,E,',
 'GM000010962,18000101,PRCP,0,,,E,',
 'EZE00100082,18000101,TMAX,-86,,,E,',
 'EZE00100082,18000101,TMIN,-135,,,E,']

*Let's find the minimum temperature recorded at each weather station in 1800.*

In [133]:
def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    # the raw value is in tenths of a degree Celsius; convert it to Fahrenheit
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

In [134]:
tempRDD = temp.map(parseLine)

In [135]:
tempRDD.take(5)

[('ITE00100554', 'TMAX', 18.5),
 ('ITE00100554', 'TMIN', 5.359999999999999),
 ('GM000010962', 'PRCP', 32.0),
 ('EZE00100082', 'TMAX', 16.52),
 ('EZE00100082', 'TMIN', 7.699999999999999)]
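
The arithmetic in `parseLine` treats the raw reading as tenths of a degree Celsius and converts it to Fahrenheit. Spelled out as a small standalone function (the name `tenths_celsius_to_fahrenheit` is an assumption for illustration):

```python
def tenths_celsius_to_fahrenheit(raw):
    """Convert a reading in tenths of a degree Celsius to Fahrenheit."""
    celsius = float(raw) * 0.1
    return celsius * 9.0 / 5.0 + 32.0

# -75 tenths of a degree is -7.5 degrees Celsius
print(round(tenths_celsius_to_fahrenheit("-75"), 2))
# 0 tenths is the freezing point of water
print(round(tenths_celsius_to_fahrenheit("0"), 2))
```

Rounding only when displaying a value is one way to avoid long floating-point tails such as `5.359999999999999`.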

In [136]:
minTemps = tempRDD.filter(lambda x: x[1] == "TMIN")

In [137]:
minTemps.take(5)

[('ITE00100554', 'TMIN', 5.359999999999999),
 ('EZE00100082', 'TMIN', 7.699999999999999),
 ('ITE00100554', 'TMIN', 9.5),
 ('EZE00100082', 'TMIN', 8.599999999999998),
 ('ITE00100554', 'TMIN', 23.72)]

In [138]:
stationTemps = minTemps.map(lambda x: (x[0], x[2]))

In [141]:
minTemps = stationTemps.reduceByKey(lambda x, y: round(min(x,y), 2))

In [142]:
minTemps.collect()

[('ITE00100554', 5.36), ('EZE00100082', 7.7)]

*Let's do a word count on a text file.*

In [143]:
book = sc.textFile("data/Book.txt")

In [144]:
book.take(2)

['Self-Employment: Building an Internet Business of One',
 'Achieving Financial and Personal Freedom through a Lifestyle Technology Business']

In [146]:
def normalizeWords(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

In [147]:
words = book.flatMap(normalizeWords)

In [148]:
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

In [149]:
wordCounts.take(5)

[('self', 111),
 ('employment', 75),
 ('building', 33),
 ('an', 178),
 ('internet', 26)]

In [152]:
wordCountsSorted = wordCounts.map(lambda x: (x[1], x[0])).sortByKey(ascending= False)

In [153]:
wordCountsSorted.take(10)

[(1878, 'you'),
 (1828, 'to'),
 (1420, 'your'),
 (1292, 'the'),
 (1191, 'a'),
 (970, 'of'),
 (934, 'and'),
 (772, ''),
 (747, 'that'),
 (649, 'it')]
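
The empty string in the list above appears because `re.split` emits `''` whenever a line is empty or starts or ends with a non-word character. A minimal sketch of a variant that drops those empty tokens follows (the name `normalize_words_nonempty` is illustrative):

```python
import re

WORD_SPLIT = re.compile(r'\W+', re.UNICODE)

def normalize_words_nonempty(text):
    """Lowercase, split on non-word characters, and drop empty tokens."""
    return [word for word in WORD_SPLIT.split(text.lower()) if word]

print(normalize_words_nonempty("Hello, world!"))
print(normalize_words_nonempty(""))
```

Passing a function like this to `book.flatMap(...)` in place of `normalizeWords` would keep `''` out of the counts.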

----