# Homework 2: Map-reduce, Hadoop and Spark

In [93]:
#First we copied the Microsoft.com data to the stud33@bdl1.eng.tau.ac.il machine using scp -r:
#scp -r microsoft-com.data-20200612 stud33@bdl1.eng.tau.ac.il:/home/stud33/bigdata_lab/

In [94]:
#Delete the homework folder in case it (and its files) still exists from a previous run

In [95]:
%%bash
# -f keeps the cell from failing when the folder does not exist yet (e.g. on the first run)
hdfs dfs -rm -r -f /user/stud33/hw_2

20/06/21 17:32:16 INFO fs.TrashPolicyDefault: Moved: 'hdfs://bdl0.eng.tau.ac.il:8020/user/stud33/hw_2' to trash at: hdfs://bdl0.eng.tau.ac.il:8020/user/stud33/.Trash/Current/user/stud33/hw_21592749936875


# 1. Data Loading

In [96]:
#create a folder for homework 2

In [97]:
%%bash
# -p makes the cell re-runnable: no error if the folder is already there
hdfs dfs -mkdir -p /user/stud33/hw_2

In [98]:
#load the microsoft-com data into the hw_2 folder

In [99]:
%%bash
# -f overwrites files left over from an earlier upload instead of failing with "File exists"
hdfs dfs -put -f /home/stud33/bigdata_lab/microsoft-com.data-20200612 /user/stud33/hw_2

In [100]:
%%bash
# List the uploaded dataset folder to verify that the copy succeeded
hdfs dfs -ls /user/stud33/hw_2/microsoft-com.data-20200612/microsoft-com.data

Found 3 items
-rw-r--r--   3 stud33 stud33        387 2020-06-21 17:32 /user/stud33/hw_2/microsoft-com.data-20200612/microsoft-com.data/countries.txt
-rw-r--r--   3 stud33 stud33    1491520 2020-06-21 17:32 /user/stud33/hw_2/microsoft-com.data-20200612/microsoft-com.data/microsoft-com.data
-rw-r--r--   3 stud33 stud33       3629 2020-06-21 17:32 /user/stud33/hw_2/microsoft-com.data-20200612/microsoft-com.data/microsoft-com.info


In [101]:
import os
import sys
import pyspark
from pyspark import SparkContext, SparkConf
import pandas as pd

In [102]:
# Display the SparkContext. It is not constructed anywhere in this notebook,
# so it is presumably pre-created by the PySpark kernel -- TODO confirm.
sc

<pyspark.context.SparkContext at 0x7f1cd7dc4d90>

In [103]:
# Raw visit log read from HDFS; each RDD element is one line of text.
microsoft_data = sc.textFile(
    "hdfs:/user/stud33/hw_2/microsoft-com.data-20200612/microsoft-com.data/microsoft-com.data"
)

# 2. Data Parsing

In [104]:
# remove noise from the data:
#1. remove empty lines
#2. remove '"' character

In [105]:
# Strip every double-quote character first, then drop the lines that are empty.
microsoft_data_filtered = (
    microsoft_data
    .map(lambda raw: raw.replace('"', ''))
    .filter(lambda cleaned: len(cleaned) > 0)
)

In [106]:
# Preview the first five cleaned lines
microsoft_data_filtered.take(5)

[u'A,1287,1,International AutoRoute,/autoroute',
 u'A,1288,1,library,/library',
 u'A,1289,1,Master Chef Product Information,/masterchef',
 u'A,1297,1,Central America,/centroam',
 u'A,1215,1,For Developers Only Info,/developer']

In [107]:
# Number of non-empty lines that remain after cleaning
microsoft_data_filtered.count()

98948

In [108]:
# List of the chosen countries read from HDFS, one RDD element per text line.
countries_data = sc.textFile(
    "hdfs:/user/stud33/hw_2/microsoft-com.data-20200612/microsoft-com.data/countries.txt"
)

In [109]:
# Preview the first five country names
countries_data.take(5)

[u'Argentina', u'Australia', u'Belgium', u'Brazil', u'Canada']

In [110]:
# Number of lines in countries.txt
countries_data.count()

42

In [111]:
#split each row by the ',' separator

In [112]:
# Turn every cleaned line into the list of its comma-separated fields.
microsoft_data_splited = microsoft_data_filtered.map(
    lambda row: row.split(',')
)

In [113]:
# Preview the first five tokenised rows
microsoft_data_splited.take(5)

[[u'A', u'1287', u'1', u'International AutoRoute', u'/autoroute'],
 [u'A', u'1288', u'1', u'library', u'/library'],
 [u'A', u'1289', u'1', u'Master Chef Product Information', u'/masterchef'],
 [u'A', u'1297', u'1', u'Central America', u'/centroam'],
 [u'A', u'1215', u'1', u'For Developers Only Info', u'/developer']]

In [114]:
#split the rdd into attribute and vote rdds

In [115]:
#Attribute data structure:
# A, attribute ID number, 1, title of the Vroot, URL relative

In [116]:
# Keep only the 'A' (attribute) rows and reduce each one to (attribute id, title).
microsoft_data_attribute = (
    microsoft_data_splited
    .filter(lambda fields: fields[0] == 'A')
    .map(lambda fields: (fields[1], fields[3]))
)

In [117]:
# Number of attribute ('A') rows
microsoft_data_attribute.count()

294

In [118]:
# Preview the first five (attribute id, title) pairs
microsoft_data_attribute.take(5)

[(u'1287', u'International AutoRoute'),
 (u'1288', u'library'),
 (u'1289', u'Master Chef Product Information'),
 (u'1297', u'Central America'),
 (u'1215', u'For Developers Only Info')]

In [119]:
#vote data structure:
# V, case ID number of a user, attribute ID number, 1

In [120]:
# Keep only the 'V' (vote) rows, keyed by the third field so that they can be
# joined with the attribute rdd: (fields[2], fields[1]).
microsoft_data_vote = (
    microsoft_data_splited
    .filter(lambda fields: fields[0] == 'V')
    .map(lambda fields: (fields[2], fields[1]))
)

In [121]:
# Number of vote ('V') rows
microsoft_data_vote.count()

98654

In [122]:
# Preview the first five vote pairs
microsoft_data_vote.take(5)

[(u'1000', u'10001'),
 (u'1001', u'10001'),
 (u'1002', u'10001'),
 (u'1001', u'10002'),
 (u'1003', u'10002')]

In [123]:
#join attribute and votes data, key - attribute ID number, join - simple inner join
#joined data structure:
# attribute ID number, title of the Vroot, case ID number of a user

In [124]:
#in addition, we exclude irrelevant fields

In [125]:
# Inner join on the shared key, then flatten (key, (title, user)) into a 3-tuple.
microsoft_data_joined = (
    microsoft_data_attribute
    .join(microsoft_data_vote)
    .map(lambda kv: (kv[0], kv[1][0], kv[1][1]))
)

In [126]:
# Number of joined rows
microsoft_data_joined.count()

98654

In [127]:
# Preview the first five joined rows
microsoft_data_joined.take(5)

[(u'1142', u'South Africa', u'10372'),
 (u'1142', u'South Africa', u'13352'),
 (u'1142', u'South Africa', u'19019'),
 (u'1142', u'South Africa', u'24124'),
 (u'1142', u'South Africa', u'25638')]

# 3. filter the data by the chosen countries

In [128]:
# change the fields order for the join, country field first

In [129]:
# Re-key the joined rows by title so they can be matched against the country list.
microsoft_title_user = microsoft_data_joined.map(
    lambda triple: (triple[1], triple[2])
)

In [130]:
# Preview the first five (title, user) pairs
microsoft_title_user.take(5)

[(u'South Africa', u'10372'),
 (u'South Africa', u'13352'),
 (u'South Africa', u'19019'),
 (u'South Africa', u'24124'),
 (u'South Africa', u'25638')]

In [131]:
#filter the data by the countries rdd
#we also add a constant 1 field for the 4.a and 4.b reports

In [132]:
# The inner join keeps only titles that appear in the country list; every
# surviving row is then replaced by (title, 1) so it can be summed later.
microsoft_title_user_filtered = (
    microsoft_title_user
    .join(countries_data.map(lambda country: (country, 1)))
    .map(lambda pair: (pair[0], 1))
)

In [133]:
# Number of rows whose title matched a country
microsoft_title_user_filtered.count()

3334

In [134]:
# Preview the first five (country, 1) pairs
microsoft_title_user_filtered.take(5)

[(u'Turkey', 1),
 (u'Turkey', 1),
 (u'Turkey', 1),
 (u'Turkey', 1),
 (u'Turkey', 1)]

# 4.a report - For each country, the number of users that visited a page of that country

In [135]:
# Sum the per-row 1s for each country.
microsoft_num_of_users_by_country = microsoft_title_user_filtered.reduceByKey(
    lambda left, right: int(left) + int(right)
)

In [136]:
# Preview five (country, count) pairs
microsoft_num_of_users_by_country.take(5)

[(u'Turkey', 9),
 (u'Caribbean', 5),
 (u'Norway', 42),
 (u'Poland', 38),
 (u'Germany', 372)]

In [137]:
# in order to present reports 4.a and 4.b nicely we use pandas data frame 

In [138]:
# Bring the (country, count) pairs to the driver and tabulate them with pandas.
country_list = [list(pair) for pair in microsoft_num_of_users_by_country.collect()]

country_df = pd.DataFrame(country_list, columns=['Country', 'number of users'])

In [139]:
# Report 4.a. The parenthesised print is valid under both Python 2 and Python 3.
print('Number of users by country:')
display(country_df)

Number of users by country:


Unnamed: 0,Country,number of users
0,Turkey,9
1,Caribbean,5
2,Norway,42
3,Poland,38
4,Germany,372
5,Peru,3
6,Hong Kong,35
7,Spain,191
8,Thailand,11
9,South Africa,19


# 4.b report - The top 5 visited countries

In [140]:
# we are changing the columns order and using top function in order to get the top 5 visited countries

In [141]:
# Swap each pair to (count, country) so that top() orders by the count.
(
    microsoft_num_of_users_by_country
    .map(lambda entry: (entry[1], entry[0]))
    .top(5)
)

[(670, u'Jakarta'),
 (372, u'Germany'),
 (258, u'Sweden'),
 (204, u'Taiwan'),
 (191, u'Spain')]

In [142]:
# Report 4.b. The parenthesised print is valid under both Python 2 and Python 3.
print('Top 5 visited countries:')
# Sort the 4.a table by count, largest first, and show the first five rows.
display(country_df.sort_values(by='number of users', ascending=False).head(5))

Top 5 visited countries:


Unnamed: 0,Country,number of users
22,Jakarta,670
4,Germany,372
15,Sweden,258
21,Taiwan,204
7,Spain,191


# 5. Write the report (4.a) to HDFS for future use

In [143]:
# Persist report 4.a as text under the homework folder on HDFS.
microsoft_num_of_users_by_country.saveAsTextFile(
    "hdfs:/user/stud33/hw_2/number_of_users_by_country"
)

In [144]:
%%bash
# List the files that were written for the saved report
hdfs dfs -ls /user/stud33/hw_2/number_of_users_by_country

Found 7 items
-rw-r--r--   3 stud33 stud33          0 2020-06-21 17:32 /user/stud33/hw_2/number_of_users_by_country/_SUCCESS
-rw-r--r--   3 stud33 stud33        131 2020-06-21 17:32 /user/stud33/hw_2/number_of_users_by_country/part-00000
-rw-r--r--   3 stud33 stud33         80 2020-06-21 17:32 /user/stud33/hw_2/number_of_users_by_country/part-00001
-rw-r--r--   3 stud33 stud33        169 2020-06-21 17:32 /user/stud33/hw_2/number_of_users_by_country/part-00002
-rw-r--r--   3 stud33 stud33         55 2020-06-21 17:32 /user/stud33/hw_2/number_of_users_by_country/part-00003
-rw-r--r--   3 stud33 stud33        172 2020-06-21 17:32 /user/stud33/hw_2/number_of_users_by_country/part-00004
-rw-r--r--   3 stud33 stud33         98 2020-06-21 17:32 /user/stud33/hw_2/number_of_users_by_country/part-00005


In [148]:
# the report is written as a distributed file (several part-* files)

In [149]:
%%bash
# Merge the report's files from HDFS into one file on the local file system
hdfs dfs -getmerge /user/stud33/hw_2/number_of_users_by_country /home/stud33/bigdata_lab/number_of_users_by_country

# 6. The average number of pages per user

In [150]:
#reminder of the data structure:
#attribute ID number, title of the Vroot, case ID number of a user

In [151]:
# we are going to calculate the AVG using 2 values we are going to extract:
#1. count distinct of users
#2. total number of pages
#and then we divide #2 by #1

In [152]:
# we are using the distinct function in order to count the number of users

In [153]:
# Project out the user id (third field of the joined rows) and de-duplicate it.
microsoft_distinct_users = (
    microsoft_data_joined
    .map(lambda record: record[2])
    .distinct()
)
microsoft_distinct_users.count()

32711

In [154]:
# One (user id, 1) pair per joined row, ready to be summed per user.
microsoft_users = microsoft_data_joined.map(
    lambda record: (record[2], 1)
)
microsoft_users.take(5)

[(u'10372', 1), (u'13352', 1), (u'19019', 1), (u'24124', 1), (u'25638', 1)]

In [155]:
# Sum the 1s per user id.
microsoft_num_of_votes_by_user = microsoft_users.reduceByKey(
    lambda left, right: int(left) + int(right)
)

In [156]:
# Preview five (user id, count) pairs
microsoft_num_of_votes_by_user.take(5)

[(u'35540', 3), (u'35236', 1), (u'35544', 2), (u'35548', 1), (u'24029', 1)]

In [160]:
# we are using reduce and not reduceByKey in order to sum all the pages together

In [161]:
# Drop the user ids and fold all per-user counts into one grand total.
microsoft_total_pages = (
    microsoft_num_of_votes_by_user
    .map(lambda user_count: user_count[1])
    .reduce(lambda acc, nxt: int(acc) + int(nxt))
)

In [162]:
# The parenthesised print is valid under both Python 2 and Python 3.
print(microsoft_total_pages)

98654


In [163]:
# Average = total pages / number of distinct users. float() forces true division
# under Python 2, where int / int would truncate.
avg_num_of_pages_per_user = float(microsoft_total_pages) / microsoft_distinct_users.count()
# The parenthesised print is valid under both Python 2 and Python 3.
print('Average number of pages per user: {}'.format(avg_num_of_pages_per_user))

Average number of pages per user: 3.01592736388


# 7. The page id with maximum number of visits

In [165]:
# One (page id, 1) pair per joined row, ready to be summed per page.
microsoft_pages = microsoft_data_joined.map(
    lambda record: (record[0], 1)
)

In [166]:
# Sum the 1s per page id.
microsoft_num_of_visits_per_page = microsoft_pages.reduceByKey(
    lambda left, right: int(left) + int(right)
)

In [167]:
# Preview five (page id, visit count) pairs
microsoft_num_of_visits_per_page.take(5)

[(u'1142', 19), (u'1128', 1), (u'1164', 49), (u'1146', 79), (u'1263', 2)]

In [168]:
# we are changing the columns order and get the top 1 page

In [169]:
# Swap to (visit count, page id) so that top(1) returns the largest count.
# The result is a one-element list.
microsoft_page_with_max_visits = (
    microsoft_num_of_visits_per_page
    .map(lambda entry: (entry[1], entry[0]))
    .top(1)
)

In [170]:
# The parenthesised print is valid under both Python 2 and Python 3.
print(microsoft_page_with_max_visits)

[(10836, u'1008')]


In [171]:
# The single result tuple is (visit count, page id); the sentence names the id first.
# The parenthesised print is valid under both Python 2 and Python 3.
print('The page id with maximum number of visits is {} with {} visits'
      .format(microsoft_page_with_max_visits[0][1], microsoft_page_with_max_visits[0][0]))

The page id with maximum number of visits is 1008 with 10836 visits


# 8. write a command(s) that remove your directories from HDFS

In [172]:
%%bash
# -f makes the cleanup idempotent: re-running the cell after the folder is gone is not an error.
# The folder is moved to the user's .Trash rather than erased immediately.
hdfs dfs -rm -r -f /user/stud33/hw_2

20/06/21 17:34:19 INFO fs.TrashPolicyDefault: Moved: 'hdfs://bdl0.eng.tau.ac.il:8020/user/stud33/hw_2' to trash at: hdfs://bdl0.eng.tau.ac.il:8020/user/stud33/.Trash/Current/user/stud33/hw_21592750059575


In [None]:
#Targil 1

In [None]:
# Load the two Seinfeld CSV files as RDDs of raw text lines.
# NOTE(review): the homework cells above address HDFS as "hdfs:/dir/...". In
# "hdfs://seinfeld2018/..." the first component is presumably parsed as the
# namenode host rather than a directory -- confirm the intended location.
info = sc.textFile("hdfs://seinfeld2018/episode_info.csv")
scripts = sc.textFile("hdfs://seinfeld2018/scripts.csv")

In [None]:
#1
# Average number of words that each main character speaks per episode, taken
# over the episodes in which that character has at least one line.
# NOTE(review): the column layout (0 = row id, 1 = episode id, 2 = character,
# 3+ = dialogue, which may itself contain commas) is inferred from the indexing
# used in this cell -- confirm against scripts.csv.
main_characters = ['JERRY','GEORGE','KRAMER','ELAINE']
avg_words_per_mainuser_episode = (
    scripts.map(lambda line: line.split(','))
    # skip the header and malformed short rows; keep only the main characters
    .filter(lambda line: len(line) > 2 and line[0] != 'Id' and line[2] in main_characters)
    # ((episode, character), words in this line); the dialogue is re-joined
    # because the naive split also cut it at every comma
    .map(lambda line: ((line[1], line[2]), len(' '.join(line[3:]).split())))
    # total words per (episode, character)
    .reduceByKey(lambda v1, v2: v1 + v2)
    # re-key by character and carry an episode counter: (character, (words, 1))
    .map(lambda line: (line[0][1], (line[1], 1)))
    # (sum of words, number of episodes) per character
    .reduceByKey(lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1]))
    .mapValues(lambda line: float(line[0]) / float(line[1]))
)

avg_words_per_mainuser_episode.collect()

In [None]:
#2
# Number of distinct non-main characters that appear in an episode from 1990.
# NOTE(review): assumes scripts column 1 and info column 0 hold the same episode
# key, and that the last info column contains the air date -- confirm against
# the CSV headers.
main_characters = ['JERRY','GEORGE','KRAMER','ELAINE']
# (episode key, character) for every line spoken by a non-main character
not_main_characters = (
    scripts.map(lambda line: line.split(','))
    .filter(lambda line: len(line) > 2 and line[0] != 'Id' and line[2] not in main_characters)
    .map(lambda line: (line[1], line[2]))
)
# (episode key, 1) for every 1990 episode; join() needs key/value pairs on both sides
characters_1990 = (
    info.map(lambda line: line.split(','))
    .filter(lambda line: line[0] != 'IdInfo' and '1990' in line[-1])
    .map(lambda line: (line[0], 1))
)
# joined rows are (episode key, (character, 1)); keep the character only
count_of_not_main_characters_in_1990 = (
    not_main_characters.join(characters_1990)
    .map(lambda line: line[1][0])
    .distinct()
    .count()
)
print(count_of_not_main_characters_in_1990)


In [None]:
#Targil 2

In [None]:
# Load the movie metadata and the ratings as RDDs of raw text lines.
# NOTE(review): the homework cells above address HDFS as "hdfs:/dir/...". In
# "hdfs://moviesdb2018/..." the first component is presumably parsed as the
# namenode host rather than a directory -- confirm the intended location.
movies = sc.textFile("hdfs://moviesdb2018/movies_metadata.csv")
ratings = sc.textFile("hdfs://moviesdb2018/ratings_small.csv")

In [None]:
#1
# Ids of the movies whose average rating is below 3 or that have no rating at all.
# NOTE(review): assumes movies column 0 and ratings column 1 hold the same movie
# id and ratings column 2 holds the numeric rating -- confirm against the CSV headers.
join_data = (
    # leftOuterJoin needs key/value pairs, so give every movie id a dummy value
    movies.map(lambda line: (line.split(',')[0], None))
    .filter(lambda line: line[0] != 'MovieId')
    .leftOuterJoin(
        # average rating per movie, computed before the join
        ratings.map(lambda line: line.split(','))
        .filter(lambda line: line[0] != 'UserId')
        .map(lambda line: (line[1], (float(line[2]), 1)))
        .reduceByKey(lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1]))
        .mapValues(lambda line: line[0] / line[1])
    )
    # rows are (movie id, (None, average or None)); test for None first because
    # None < 3 raises TypeError on Python 3
    .filter(lambda line: line[1][1] is None or line[1][1] < 3)
    .map(lambda line: line[0])
    .distinct()
)
join_data.collect()

In [None]:
#additional option

In [None]:
#1
# Same question solved by set difference: all movies minus those whose average
# rating is at least 3.
# NOTE(review): assumes movies column 0 and ratings column 1 hold the same movie
# id and ratings column 2 holds the numeric rating -- confirm against the CSV headers.
all_movies = (
    movies.map(lambda line: line.split(',')[0])
    .filter(lambda line: line != 'MovieId')
)

best_movies = (
    ratings.map(lambda line: line.split(','))
    .filter(lambda line: line[0] != 'UserId')
    # (movie id, (rating, 1)) so that the sum and the count travel together
    .map(lambda line: (line[1], (float(line[2]), 1)))
    .reduceByKey(lambda v1, v2: (v1[0] + v2[0], v1[1] + v2[1]))
    .mapValues(lambda line: line[0] / line[1])
    .filter(lambda line: line[1] >= 3)
    .map(lambda line: line[0])
)
# distinct() mirrors the first variant, which also de-duplicates the ids
bad_movies = all_movies.subtract(best_movies).distinct()

bad_movies.collect()

In [None]:
#2
# Number of ratings for every movie whose budget is 0.
# NOTE(review): assumes movies column 2 is the budget and that movies column 0
# and ratings column 1 hold the same movie id -- confirm against the CSV headers.
missing_budget_movies = (
    movies.map(lambda line: line.split(','))
    # split() yields strings, so the budget is compared with '0', not with the int 0
    .filter(lambda line: len(line) > 2 and line[0] != 'MovieId' and line[2].strip() == '0')
    # leftOuterJoin needs key/value pairs, so give every movie id a dummy value
    .map(lambda line: (line[0], None))
)

number_of_rates_per_movie = (
    ratings.map(lambda line: line.split(','))
    .filter(lambda line: line[0] != 'UserId')
    .map(lambda line: (line[1], 1))
    .reduceByKey(lambda v1, v2: v1 + v2)
)

# joined rows are (movie id, (None, count or None)); report 0 for unrated movies
join_data = (
    missing_budget_movies.leftOuterJoin(number_of_rates_per_movie)
    .mapValues(lambda line: line[1] or 0)
)

join_data.collect()