# Homework 2: Map-reduce, Hadoop and Spark
### May 2022
### Authors: 
- Efrat Levy 301035184
- Edan Shahmoon 315374330
- Ilan Vasilevsky 322545682

## 1) Load the Microsoft.com data into HDFS

Create the data folder in HDFS (it is removed again at the end of the notebook).

In [155]:
%%bash
hdfs dfs -mkdir /user/stud10/ex2

Upload the data files, then list the contents of the folder.

In [156]:
%%bash
hdfs dfs -put /home/stud10/bigdata_lab/HW2/microsoft-com.data /user/stud10/ex2/microsoft-com.data
hdfs dfs -put /home/stud10/bigdata_lab/HW2/countries.txt /user/stud10/ex2/countries.txt
hdfs dfs -ls /user/stud10/ex2

Found 2 items
-rw-r--r--   3 stud10 stud10        387 2022-05-29 14:21 /user/stud10/ex2/countries.txt
-rw-r--r--   3 stud10 stud10    1491520 2022-05-29 14:21 /user/stud10/ex2/microsoft-com.data


Show the last lines of the data file.

In [157]:
%%bash
hdfs dfs -cat /user/stud10/ex2/microsoft-com.data | tail -n 5

V,42709,1003,1
V,42710,1035,1
V,42710,1001,1
V,42710,1018,1
V,42711,1008,1


## 2) Parse the data according to the format specified in the .info file.

In [158]:
from pyspark import SparkContext, SparkConf

In [159]:
raw_rdd = sc.textFile("hdfs:/user/stud10/ex2/microsoft-com.data") # read the raw data file
lst_rdd = raw_rdd.map(lambda line: line.replace('"','').split(',')) # strip double quotes, then split each row on commas
lst_rdd.top(5)

[[u'V', u'42711', u'1008', u'1'],
 [u'V', u'42710', u'1035', u'1'],
 [u'V', u'42710', u'1018', u'1'],
 [u'V', u'42710', u'1001', u'1'],
 [u'V', u'42709', u'1003', u'1']]
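
The `replace`/`split` parser above works for the rows shown, but it would mis-split a quoted title that itself contains a comma. Below is a minimal local sketch of an alternative, assuming the raw attribute lines quote their title and URL fields in ordinary CSV style (as the `replace('"','')` call suggests). `parse_line` is a hypothetical helper, and the third sample line is invented purely to show the difference.

```python
import csv

def parse_line(line):
    """Parse one record of the data file into a list of fields (hypothetical helper)."""
    # csv.reader honors double quotes, so commas inside quoted fields survive
    return next(csv.reader([line]))

sample_lines = [
    'V,42711,1008,1',                          # vote line, fields as in the output above
    'A,1297,1,"Central America","/centroam"',  # attribute line, quoting is an assumption
    'A,9999,1,"Made-up, Title","/made_up"',    # invented line with a comma inside the title
]

parsed = [parse_line(line) for line in sample_lines]
naive = [line.replace('"', '').split(',') for line in sample_lines]

for row in parsed:
    print(row)
```

On the cluster, such a helper could be passed to `raw_rdd.map(...)` in place of the lambda. For the data shown in this notebook both approaches give the same fields.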

## 3) Keep only users that visited at least one page related to one of the given countries
Given the list of countries in the file countries.txt (e.g., South Africa, Spain, Sweden, Switzerland), filter the data to include only users that visited at least one page related to one of the given countries.

**ASSUMPTION: the page title in an attribute line is exactly the country name**

In [160]:
def filter_votes_by_countries(attr_rdd,votes_rdd,countries_rdd):
    # return the vote lines whose page (attribute line) title is a country name
    countries_lst = sc.broadcast(set(countries_rdd.collect())) # broadcast the country names to all nodes (set for fast lookup)
    relevant_lines = attr_rdd.filter(lambda x: x[3] in countries_lst.value) # attribute lines whose title is a country name
    relevant_ids = relevant_lines.map(lambda x: x[1]) # ids of those attribute lines
    country_ids = sc.broadcast(set(relevant_ids.collect())) # broadcast the country page ids
    country_votes = votes_rdd.filter(lambda x: x[2] in country_ids.value) # votes cast on country pages
    return country_votes

In [161]:
attr_rdd = lst_rdd.filter(lambda x: x[0]=='A') # filter attribute lines
votes_rdd = lst_rdd.filter(lambda x: x[0]=='V') # filter votes lines
countries_rdd = sc.textFile("hdfs:/user/stud10/ex2/countries.txt")

In [162]:
attr_rdd.top(3)

[[u'A', u'1297', u'1', u'Central America', u'/centroam'],
 [u'A', u'1295', u'1', u'Training', u'/train_cert'],
 [u'A', u'1294', u'1', u'Bookshelf', u'/bookshelf']]

In [163]:
votes_rdd.top(3)

[[u'V', u'42711', u'1008', u'1'],
 [u'V', u'42710', u'1035', u'1'],
 [u'V', u'42710', u'1018', u'1']]

In [164]:
countries_rdd.top(3)

[u'Venezuela', u'Uruguay', u'UK']

In [165]:
country_votes = filter_votes_by_countries(attr_rdd,votes_rdd,countries_rdd)

In [166]:
country_votes.top(10)

[[u'V', u'42708', u'1123', u'1'],
 [u'V', u'42706', u'1059', u'1'],
 [u'V', u'42698', u'1005', u'1'],
 [u'V', u'42688', u'1053', u'1'],
 [u'V', u'42665', u'1023', u'1'],
 [u'V', u'42649', u'1223', u'1'],
 [u'V', u'42647', u'1023', u'1'],
 [u'V', u'42641', u'1053', u'1'],
 [u'V', u'42637', u'1053', u'1'],
 [u'V', u'42636', u'1053', u'1']]

In [167]:
relevant_users = country_votes.map(lambda x: x[1]).distinct() # drop duplicates

In [168]:
relevant_users.top(10)

[u'42708',
 u'42706',
 u'42698',
 u'42688',
 u'42665',
 u'42649',
 u'42647',
 u'42641',
 u'42637',
 u'42636']

In [169]:
print("number of relevant users: %s, number of relevant votes: %s"%(relevant_users.count(),country_votes.count()))

number of relevant users: 3199, number of relevant votes: 3334


In [170]:
# keep every vote (country page or not) cast by the users we have found
user_ids = sc.broadcast(set(relevant_users.collect())) # set gives fast membership tests on the workers
relevant_data = votes_rdd.filter(lambda x: x[1] in user_ids.value)

In [171]:
relevant_data.top(5)

[[u'V', u'42708', u'1123', u'1'],
 [u'V', u'42708', u'1041', u'1'],
 [u'V', u'42708', u'1038', u'1'],
 [u'V', u'42708', u'1027', u'1'],
 [u'V', u'42708', u'1026', u'1']]

In [172]:
relevant_data.count()

13111
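
The three filtering steps of this section can be traced on a tiny in-memory example. This is a local pure-Python sketch, not the code run on the cluster; all records below are invented and only mimic the `[type, id, ...]` layout of `lst_rdd`.

```python
# Toy records in the same layout as lst_rdd; every value here is invented
attributes = [
    ['A', '1023', '1', 'Spain', '/spain'],
    ['A', '1059', '1', 'Sweden', '/sweden'],
    ['A', '1008', '1', 'Free Downloads', '/msdownload'],
]
votes = [
    ['V', '1', '1023', '1'],   # user 1 visits Spain
    ['V', '1', '1008', '1'],   # user 1 also visits a non-country page
    ['V', '2', '1008', '1'],   # user 2 visits only a non-country page
    ['V', '3', '1059', '1'],   # user 3 visits Sweden
]
countries = {'Spain', 'Sweden'}

# Step 1: ids of attribute lines whose title is a country name
country_page_ids = {a[1] for a in attributes if a[3] in countries}
# Step 2: users with at least one vote on a country page
relevant_users = {v[1] for v in votes if v[2] in country_page_ids}
# Step 3: every vote of those users, whether on a country page or not
relevant_data = [v for v in votes if v[1] in relevant_users]

print(sorted(relevant_users))
print(len(relevant_data))
```

User 2 is dropped entirely, while user 1 keeps both votes. This is why the notebook reports more rows in `relevant_data` than in `country_votes`.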

## 4.a) For each user, the number of (unique) pages visited
For each user, count the number of unique pages visited, and show the top-10 users.

In [173]:
# we will use Map-Reduce in order to get the unique number of pages for each user:
# The first map function's output is a tuple of (user, page_id), taking only unique page ids per each user
# The second map function's output is a tuple of (user, 1)
# The reduceByKey function sums the values of unique pages

pages_per_user = votes_rdd.map(lambda x: (x[1],x[2])).distinct()\
                          .map(lambda x: (x[0], 1))\
                          .reduceByKey(lambda v1,v2: v1+v2)\
                          .sortBy(keyfunc=lambda x: -x[1])

pages_per_user.take(10)

[(u'40310', 35),
 (u'25185', 31),
 (u'12147', 30),
 (u'41066', 28),
 (u'12815', 28),
 (u'25922', 28),
 (u'10348', 28),
 (u'31809', 26),
 (u'19860', 26),
 (u'40122', 24)]
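
The `distinct` then `reduceByKey` pattern above has a simple local analogue, sketched here in plain Python on invented `(user_id, page_id)` pairs. The tie-breaking rule on user id is an addition for this sketch; the notebook's sort key only uses the count, so the relative order of users with equal counts (such as the several users with 28 pages) is not fixed by it.

```python
from collections import defaultdict

# Invented (user_id, page_id) pairs; the repeated pair shows why distinct() matters
visits = [('u1', '1008'), ('u1', '1008'), ('u1', '1034'), ('u2', '1008')]

pages_by_user = defaultdict(set)
for user, page in visits:
    pages_by_user[user].add(page)   # a set keeps each page once per user

# Sort by count descending, then by user id so that ties are ordered deterministically
ranking = sorted(((u, len(p)) for u, p in pages_by_user.items()),
                 key=lambda x: (-x[1], x[0]))
print(ranking[:10])
```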

## 4.b) For each country, the number of users that visited a page of that country
For each country, the number of users that visited a page of that country. Exclude from your
report countries with a name longer than one word.

**ASSUMPTION: the words of a country name are separated only by spaces**

**ASSUMPTION: titles are unique (validated in the title-uniqueness check after section 5)**

In [174]:
one_word_countries = countries_rdd.filter(lambda x: len(x.split(" ")) < 2) # get only one-word countries
one_word_lst = one_word_countries.collect() # small, constant size list - so it's fine to collect
country_name_mapping = dict(attr_rdd.filter(lambda x: x[3] in one_word_lst) \
.map(lambda x: (x[1],x[3])).collect()) # dict of attribute line id -> country name

In [175]:
print(country_name_mapping)

{u'1227': u'Argentina', u'1165': u'Poland', u'1166': u'Mexico', u'1084': u'UK', u'1223': u'Finland', u'1208': u'Israel', u'1080': u'Brazil', u'1262': u'Chile', u'1123': u'Germany', u'1267': u'Caribbean', u'1203': u'Denmark', u'1073': u'Taiwan', u'1209': u'Turkey', u'1217': u'Ireland', u'1053': u'Jakarta', u'1079': u'Australia', u'1059': u'Sweden', u'1195': u'Portugal', u'1194': u'China', u'1107': u'Slovakia', u'1112': u'Canada', u'1115': u'Hungary', u'1116': u'Switzerland', u'1258': u'Peru', u'1172': u'Belgium', u'1179': u'Colombia', u'1152': u'Russia', u'1229': u'Uruguay', u'1153': u'Venezuela', u'1188': u'Korea', u'1023': u'Spain', u'1183': u'Italy', u'1180': u'Slovenija', u'1241': u'India', u'1240': u'Thailand', u'1105': u'France', u'1005': u'Norway'}


In [176]:
country_votes_one = filter_votes_by_countries(attr_rdd,votes_rdd,one_word_countries) # relevant vote lines for one word countries

In [177]:
uid_vid = country_votes_one.map(lambda x: x[1:3]) # user id and attribute line id
uid_vid.top(5)

[[u'42708', u'1123'],
 [u'42706', u'1059'],
 [u'42698', u'1005'],
 [u'42688', u'1053'],
 [u'42665', u'1023']]

In [178]:
uid_country = uid_vid.map(lambda x: (x[0], country_name_mapping[x[1]])) # use the dict we created to get country name
uid_country.top(5)

[(u'42708', u'Germany'),
 (u'42706', u'Sweden'),
 (u'42698', u'Norway'),
 (u'42688', u'Jakarta'),
 (u'42665', u'Spain')]

In [179]:
uid_country_unique = uid_country.distinct() # count only once each user

In [180]:
countries = uid_country_unique.map(lambda x: x[1])
sum_by_country = countries.countByValue()
sum_by_country.items()

[(u'Brazil', 121),
 (u'Canada', 128),
 (u'Italy', 167),
 (u'Peru', 3),
 (u'France', 183),
 (u'Slovakia', 11),
 (u'Ireland', 13),
 (u'Caribbean', 5),
 (u'Argentina', 32),
 (u'Venezuela', 8),
 (u'Israel', 34),
 (u'Korea', 94),
 (u'Norway', 42),
 (u'Germany', 372),
 (u'Chile', 4),
 (u'Denmark', 55),
 (u'Belgium', 45),
 (u'Thailand', 11),
 (u'Poland', 38),
 (u'Spain', 191),
 (u'UK', 186),
 (u'Jakarta', 670),
 (u'Turkey', 9),
 (u'Finland', 29),
 (u'Sweden', 258),
 (u'Australia', 136),
 (u'Switzerland', 31),
 (u'Russia', 52),
 (u'Portugal', 15),
 (u'Mexico', 33),
 (u'Uruguay', 4),
 (u'India', 9),
 (u'China', 26),
 (u'Colombia', 11),
 (u'Hungary', 15),
 (u'Taiwan', 204),
 (u'Slovenija', 9)]
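
As a local illustration of the counting logic, the sketch below filters out multi-word names, removes duplicate `(user, country)` pairs, and counts users per country with `collections.Counter`, which plays the role of `countByValue`. The country names come from the task statement; the user ids and visits are invented. Note that `split()` with no argument splits on any whitespace, a slightly looser rule than the `split(" ")` used above.

```python
from collections import Counter

countries = ['Spain', 'Sweden', 'South Africa']   # names taken from the task statement
one_word = [c for c in countries if len(c.split()) == 1]

# Invented (user_id, country) pairs; user 'u1' hits Spain twice
user_country = [('u1', 'Spain'), ('u1', 'Spain'), ('u2', 'Spain'),
                ('u3', 'Sweden'), ('u4', 'South Africa')]

# Keep one-word countries only and count each user once per country
unique_pairs = set(p for p in user_country if p[1] in one_word)
users_per_country = Counter(country for _, country in unique_pairs)
print(users_per_country.most_common())
```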

## 4.c) The top 5 visited countries.

In [181]:
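country_votes = filter_votes_by_countries(attr_rdd,votes_rdd,countries_rdd)
country_votes_ids = country_votes.map(lambda x: x[2])
votes_count = country_votes_ids.countByValue() # returns a python dict - it's fine, because we deal with small dict
top_5_votes_count = sorted(votes_count.items(), key=lambda x:x[1], reverse=True)[:5]
# map page id -> country name for ALL countries (country_name_mapping holds only the one-word ones)
all_countries_lst = countries_rdd.collect()
all_country_names = dict(attr_rdd.filter(lambda x: x[3] in all_countries_lst).map(lambda x: (x[1],x[3])).collect())

In [182]:
print("The top 5 visited countries:")
for i, (country_id, cnt) in enumerate(top_5_votes_count):
    print("%s. %s: %s"%(i+1,all_country_names[country_id],cnt))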

The top 5 visited countries:
1. Jakarta: 670
2. Germany: 372
3. Sweden: 258
4. Taiwan: 204
5. Spain: 191


## 5) Write the report (4.a) to HDFS for future use

In [183]:
pages_per_user.saveAsTextFile("hdfs:/user/stud10/ex2/output")

In [184]:
%%bash
hdfs dfs -ls /user/stud10/ex2/output

Found 3 items
-rw-r--r--   3 stud10 stud10          0 2022-05-29 14:21 /user/stud10/ex2/output/_SUCCESS
-rw-r--r--   3 stud10 stud10     318899 2022-05-29 14:21 /user/stud10/ex2/output/part-00000
-rw-r--r--   3 stud10 stud10     139930 2022-05-29 14:21 /user/stud10/ex2/output/part-00001


## Verify that country titles are unique

In [185]:
countries_rdd = sc.textFile("hdfs:/user/stud10/ex2/countries.txt")
countries_lst = sc.broadcast(countries_rdd.collect())
tmp  = attr_rdd.filter(lambda x: x[3] in countries_lst.value).map(lambda x: x[3])
print("number of country titles: %s"%(tmp.count()))
print("number of distinct country titles: %s"%(tmp.distinct().count()))
assert(tmp.count() == tmp.distinct().count())
print("they are equal - so the assumption that the titles are unique holds...")

number of country titles: 41
number of distinct country titles: 41
they are equal - so the assumption that the titles are unique holds...


## 6) The average number of visits per country

In [186]:
# creating a pair rdd with (page_id, user_id)
pages_users = votes_rdd.map(lambda x: (x[2], x[1]))
# creating a pair rdd with (page_id, country)
pages_countries = attr_rdd.filter(lambda x: x[3] in countries_lst.value).map(lambda x: (x[1], x[3]))
country_visits = pages_users.join(pages_countries) # join of the 2 rdds by page_id

# The map function's output is a tuple of (country, 1)
# The reduce function sums the number of visits for each country
visits_per_country = country_visits.map(lambda x: (x[1][1], 1)).reduceByKey(lambda v1,v2: v1+v2)

# Calculating the average number of visits per country
avg_visits_per_country = float(visits_per_country.map(lambda x: x[1]).mean())
print("The average number of visits per country is: {}".format(avg_visits_per_country))

The average number of visits per country is: 81.3170731707
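
As a sanity check, the mean over countries should equal the total number of votes on country pages divided by the number of countries that appear in the join. Both figures were printed earlier in this notebook: 3334 relevant votes (section 3) and 41 country titles (the uniqueness check). The sketch below only redoes that arithmetic.

```python
total_country_votes = 3334   # "number of relevant votes" printed in section 3
country_titles = 41          # title count printed by the uniqueness check

average = total_country_votes / float(country_titles)
print(round(average, 10))
```

The value agrees with the result above. Since `join` is an inner join and would drop any country without votes, the agreement suggests that all 41 country pages received at least one visit.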


## 7) The page id with minimum number of visits

#### Checking if there are pages with 0 visits:

In [187]:
visited_pages = votes_rdd.map(lambda x: x[2]).distinct().sortBy(keyfunc=lambda x: x)
all_pages = attr_rdd.map(lambda x: x[1]).sortBy(keyfunc=lambda x: x)
pages_with_0_visits = all_pages.subtract(visited_pages)

n_zero = pages_with_0_visits.count() # count once and reuse the result
if n_zero > 0:
    zero_visits = True
    print("There are {} pages with 0 visits. \n"
          "For example: page id {}".format(n_zero, pages_with_0_visits.takeSample(False, 1, 42)[0]))
else:
    zero_visits = False
    print("There are no pages with 0 visits")

There are 9 pages with 0 visits. 
For example: page id 1291
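
Since pages with 0 visits exist, the minimum number of visits is 0 and it is shared by those 9 pages. A local sketch of one way to cover both cases in a single pass, assuming every known page id starts at a count of 0 and each vote adds one, is shown below; the page ids and votes are invented.

```python
from collections import Counter

all_pages = ['1000', '1001', '1002', '1003']                 # invented page ids
visited = ['1000', '1000', '1001', '1000', '1001', '1003']   # invented page id per vote

counts = Counter(dict.fromkeys(all_pages, 0))   # start every page at 0 visits
counts.update(visited)                          # add one per vote

min_visits = min(counts.values())
least_visited = sorted(p for p, c in counts.items() if c == min_visits)
print(least_visited, min_visits)
```

On the cluster, a comparable result could be obtained with `leftOuterJoin` from all page ids to the per-page counts, treating a missing count as 0.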


#### If there are no pages with 0 visits, find the least-visited page:

In [188]:
# The map function's output is a tuple of (page_id, 1)
# The reduce function sums the number of visits for each page
# Sorting by visit count in ascending order puts the least-visited page first
if not zero_visits:
    visits_per_page = votes_rdd.map(lambda tpl: (tpl[2], 1))\
                                   .reduceByKey(lambda v1,v2: v1+v2)\
                                   .sortBy(keyfunc=lambda x: x[1]).take(1)

    print("The page id with the lowest number of visits is page {}\n"
          "Which had {} visits in total".format(visits_per_page[0][0], visits_per_page[0][1]))

## 8) The average number of pages per user

In [189]:
# The first map function's output is a tuple of (user, page_id), taking only unique page ids per each user
# The second map function's output is a tuple of (user, 1)
# The reduceByKey function sums the values of unique pages

pages_per_user = votes_rdd.map(lambda x: (x[1],x[2])).distinct()\
                          .map(lambda x: (x[0], 1))\
                          .reduceByKey(lambda v1,v2: v1+v2)

# Calculating the average number of pages per user
avg_pages_per_user = float(pages_per_user.map(lambda tpl: tpl[1]).mean())

print("The average number of pages per user is: {}".format(avg_pages_per_user))

The average number of pages per user is: 3.01592736388


### Removing our directories from HDFS:

In [190]:
%%bash
hdfs dfs -rm -r /user/stud10/ex2

22/05/29 14:22:08 INFO fs.TrashPolicyDefault: Moved: 'hdfs://bdl0.eng.tau.ac.il:8020/user/stud10/ex2' to trash at: hdfs://bdl0.eng.tau.ac.il:8020/user/stud10/.Trash/Current/user/stud10/ex21653823328449
