Check whether the target directory exists in Hadoop. If it does, delete it; in either case, create it fresh

In [1]:
%%bash
target_dir=/user/stud22/hw2
# Remove the directory if it already exists, then (re)create it
if hadoop fs -test -d "${target_dir}"
then
    hdfs dfs -rm -r "${target_dir}"
fi
hdfs dfs -mkdir "${target_dir}"

19/06/12 23:07:20 INFO fs.TrashPolicyDefault: Moved: 'hdfs://bdl0.eng.tau.ac.il:8020/user/stud22/hw2' to trash at: hdfs://bdl0.eng.tau.ac.il:8020/user/stud22/.Trash/Current/user/stud22/hw2
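
The same check, delete, and create sequence could also be driven from Python. The following is only a sketch: `reset_hdfs_dir`, its `run` parameter, and `fake_run` are hypothetical names introduced for illustration, and the fake runner is used so the example can be tried without a cluster.

```python
import subprocess

def reset_hdfs_dir(path, run=subprocess.call):
    """Delete `path` in HDFS if it exists, then create it empty.

    `run` is a hypothetical injection point: any callable that takes an
    argument list and returns an exit code (subprocess.call by default).
    """
    # `hdfs dfs -test -d` exits with 0 when the path is an existing directory
    if run(["hdfs", "dfs", "-test", "-d", path]) == 0:
        run(["hdfs", "dfs", "-rm", "-r", path])
    return run(["hdfs", "dfs", "-mkdir", path])


# Dry run with a fake runner, so the example works without a cluster
issued = []

def fake_run(args):
    issued.append(" ".join(args))
    return 0  # pretend every command succeeds (so the directory "exists")

reset_hdfs_dir("/user/stud22/hw2", run=fake_run)
print("\n".join(issued))
```

With the default `run`, the function would invoke the real `hdfs` command line, which is assumed to be on the PATH.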


Check that the directory was created and is empty (an empty listing is expected)

In [3]:
%%bash
hdfs dfs -ls /user/stud22/hw2

Copy the data files to Hadoop, into the directory we just created

In [4]:
%%bash
hdfs dfs -put /home/stud22/bigdata_lab/microsoft-com.data/ /user/stud22/hw2/

Verify that the files were copied to Hadoop by printing the last three lines of the data file

In [5]:
%%bash
hdfs dfs -cat /user/stud22/hw2/microsoft-com.data/microsoft-com.data | tail -n 3

V,42710,1001,1
V,42710,1018,1
V,42711,1008,1


Now that the files have been copied to HDFS, we can start analyzing them with Spark

In [6]:
import os
import sys
import pyspark
from pyspark import SparkContext, SparkConf
import pandas as pd
sc

<pyspark.context.SparkContext at 0x7fcd00081d90>

In [7]:
#Load the data to an RDD
msData = sc.textFile("hdfs:/user/stud22/hw2/microsoft-com.data/microsoft-com.data")
msData.count()

98948

In [8]:
#Load the countries file to an RDD and store it in a Python list
countriesRdd = sc.textFile("hdfs:/user/stud22/hw2/microsoft-com.data/countries.txt")
countries=countriesRdd.collect()

In [9]:
def dataLineMap(record):
    record=record.replace('"','')
    return record.split(',')

In [10]:
#Get the attribute lines from the msData rdd and remove the " character
attributesRdd=msData.filter(lambda record: record[0]=='A').map(dataLineMap)
attributesRdd.take(3)

[[u'A', u'1287', u'1', u'International AutoRoute', u'/autoroute'],
 [u'A', u'1288', u'1', u'library', u'/library'],
 [u'A', u'1289', u'1', u'Master Chef Product Information', u'/masterchef']]
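
Stripping the quotes and splitting on commas works for the lines shown above, but it would split a quoted title that itself contains a comma. As a hedged alternative, the standard `csv` module can parse a single line while honoring the quotes. This sketch assumes Python 3; `parse_record` is a hypothetical name and the second input line is made up for illustration.

```python
import csv

def parse_record(record):
    """Split one data line into fields, honoring double-quoted fields."""
    # csv.reader expects an iterable of lines, so wrap the single line in a list
    return next(csv.reader([record]))

plain = parse_record('A,1287,1,"International AutoRoute","/autoroute"')
tricky = parse_record('A,9999,1,"Support, Desktop","/support"')  # hypothetical line
print(plain)
print(tricky)
```

Such a function could be passed to `map` in place of `dataLineMap`, at the cost of constructing a reader per line.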

In [11]:
#Get the vote lines from the msData rdd and remove the " character
votesRdd=msData.filter(lambda record: record[0]=='V').map(dataLineMap)
votesRdd.count()

98654

In [12]:
votesRdd.take(3)

[[u'V', u'10001', u'1000', u'1'],
 [u'V', u'10001', u'1001', u'1'],
 [u'V', u'10001', u'1002', u'1']]

In [14]:
#Create a dictionary mapping site id to country name for the sites whose title is a country name
countrySites=attributesRdd.filter(lambda record: record[3] in countries)\
                              .map(lambda record: (record[1],record[3])).collect()
countrySitesDict=dict(countrySites)

In [15]:
#Get the users that visited the country sites
countryUsersVotesRdd=votesRdd.filter(lambda record: record[2] in countrySitesDict)
#Create a list of these users' ids
countryUsersList=countryUsersVotesRdd.map(lambda record: record[1]).collect()
#Remove users that appear twice (remove duplicates)
countryUsersList = list(dict.fromkeys(countryUsersList))
countryUsersVotesRdd.count()

3334

In [16]:
countryUsersVotesRdd.take(3)

[[u'V', u'10004', u'1005', u'1'],
 [u'V', u'10014', u'1023', u'1'],
 [u'V', u'10035', u'1053', u'1']]

In [17]:
#Write a map to create (country-user,1) pairs to reduce in the next steps
countryVisitPairRdd = countryUsersVotesRdd.map(lambda record: (countrySitesDict[record[2]]+'-'+record[1],1))
countryVisitPairRdd.take(3)

[(u'Norway-10004', 1), (u'Spain-10014', 1), (u'Jakarta-10035', 1)]

Check whether a "number_of_users_by_country" directory exists; if it does, delete it

In [18]:
%%bash
target_dir=/user/stud22/hw2/number_of_users_by_country
if hadoop fs -test -d "${target_dir}"
then
    hdfs dfs -rm -r "${target_dir}"
fi

In [19]:
#Take the (country-user,1) pairs and reduce them to one record per distinct pair, then map each
#to (country,1), reduce again to count users per country, sort in descending order, save and display
countryUserVistsRdd=countryVisitPairRdd.reduceByKey(lambda v1,v2: v1+v2).map(lambda record: (record[0].split('-')[0],1))\
                                      .reduceByKey(lambda v1,v2: v1+v2)\
                                      .sortBy(lambda record: record[1]*-1)
countryUserVistsRdd.map(lambda record: record[0]+':'+str(record[1]))\
                   .saveAsTextFile("hdfs:/user/stud22/hw2/number_of_users_by_country")
df = pd.DataFrame(countryUserVistsRdd.collect())
display(df)

           0    1
0    Jakarta  670
1    Germany  372
2     Sweden  258
3     Taiwan  204
4      Spain  191
5         UK  186
6     France  183
7      Italy  167
8  Australia  136
9     Canada  128


Check that a file was created and that its content is in the expected country:count format

In [20]:
%%bash
hdfs dfs -cat /user/stud22/hw2/number_of_users_by_country/part-00000 | head -n 3

Jakarta:670
Germany:372
Sweden:258


In [21]:
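#Map each (country-user,1) pair to (country,1) without deduplicating first, reduce to count
#total visits per country, sort in descending order, and display the top 5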
countryByVisitsRdd=countryVisitPairRdd.map(lambda record: (record[0].split('-')[0],1))\
                                      .reduceByKey(lambda v1,v2: v1+v2).sortBy(lambda record: record[1]*-1)
df = pd.DataFrame(countryByVisitsRdd.take(5))
display(df)

         0    1
0  Jakarta  670
1  Germany  372
2   Sweden  258
3   Taiwan  204
4    Spain  191


The counts are identical to the per-user counts above. That is because no user visited the same country's site twice, so counting distinct users per country and counting total visits per country give the same numbers
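
The two counts only diverge when a (country, user) pair repeats. A plain-Python sketch of the difference, using made-up records rather than the dataset and `collections.Counter` as a stand-in for `reduceByKey`:

```python
from collections import Counter

# Hypothetical (country, user_id) visit records, not taken from the dataset
visits = [("Norway", "10004"), ("Spain", "10014"), ("Spain", "10020"),
          ("Norway", "10004")]  # the last record repeats an earlier one

# Total visits per country: every record counts
total_visits = Counter(country for country, _ in visits)

# Distinct users per country: deduplicate (country, user) pairs first
distinct_users = Counter(country for country, _ in set(visits))

print(dict(total_visits))
print(dict(distinct_users))
```

Here the repeated Norway record is counted twice in `total_visits` but only once in `distinct_users`; without that repeat the two results would match, as they do in the notebook.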

In [22]:
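def reduceAverage(v1,v2):
    #Each value is an (average,count) pair; merge two pairs into the count-weighted
    #average of both, so the result does not depend on the order of reduction
    amount1=float(v1[0])
    count1=float(v1[1])
    amount2=float(v2[0])
    count2=float(v2[1])
    amount=(amount1*count1+amount2*count2)/(count1+count2)
    return (amount,count1+count2)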

In [23]:
#Get all the votes and map them to (user_id,1), then reduce by user_id to get the number of pages
#visited by each user. Then map the resulting RDD to ('Avg',(pages_visited,1)) pairs and reduce by
#the key 'Avg'; the result is a single record holding the average and the number of users
usersViewsRdd=votesRdd.map(lambda record: (record[1],1))\
                                           .reduceByKey(lambda v1,v2: v1+v2)
usersAverageRdd=usersViewsRdd.map(lambda record: ('Avg',(record[1],1))).reduceByKey(reduceAverage)
usersAverageRdd.collect()

[('Avg', (3.0159273638837134, 32711.0))]
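
A reducer has to give the same answer however the values are grouped, which is why the average is carried together with its count. The sketch below mimics this in plain Python with `functools.reduce` and compares it with a simpler (sum, count) reducer that divides once at the end. The function names and the sample numbers are hypothetical.

```python
from functools import reduce

def combine_average(a, b):
    """Merge two (average, count) pairs into the pair for their union."""
    avg_a, n_a = a
    avg_b, n_b = b
    n = n_a + n_b
    return ((avg_a * n_a + avg_b * n_b) / float(n), n)

def combine_sum(a, b):
    """Merge two (sum, count) pairs; the division happens once at the end."""
    return (a[0] + b[0], a[1] + b[1])

pages_per_user = [1, 2, 3, 4, 10]  # hypothetical per-user page counts

running_avg, n_users = reduce(combine_average, [(p, 1) for p in pages_per_user])
total, count = reduce(combine_sum, [(p, 1) for p in pages_per_user])

print(running_avg, n_users)
print(total / float(count), count)
```

Both approaches should agree on the average; the (sum, count) form keeps integer arithmetic until the final division, which may be preferable when rounding matters.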

In [24]:
def reduceMax(v1,v2):
    if int(v1)>=int(v2):
        return int(v1)
    return int(v2)

In [25]:
#Take the usersViewsRdd that we created in the previous step, map it to ('Max',pages_visited) pairs and
#reduce by the key 'Max'; the result is a single record holding the maximum number of pages visited
usersMaxRdd=usersViewsRdd.map(lambda record: ('Max',record[1])).reduceByKey(reduceMax)
usersMaxRdd.collect()

[('Max', 35)]
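
Since the per-user counts are already integers, the built-in `max` has the two-argument shape a pairwise reducer needs, so it could presumably be passed in place of `reduceMax`. A plain-Python sketch with made-up counts:

```python
from functools import reduce

pages_per_user = [3, 35, 1, 12]  # hypothetical per-user page counts

# The built-in max already has the (v1, v2) -> value shape a pairwise reducer needs
largest = reduce(max, pages_per_user)
print(largest)
```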

Delete the directory we created in Hadoop (and all the files and sub directories in it)

In [27]:
%%bash
hdfs dfs -rm -r /user/stud22/hw2/

rm: `/user/stud22/hw2/': No such file or directory


Verify that the directory no longer exists

In [28]:
%%bash
hdfs dfs -ls /user/stud22/hw2

ls: `/user/stud22/hw2': No such file or directory
