In [88]:
import os
import sys
import pyspark
from pyspark import SparkContext, SparkConf
import pandas as pd


In [90]:
%%bash
hdfs dfs -ls /user/stud12/ #Presenting the folders that exsist in the HDFS

Found 2 items
drwx------   - stud12 stud12          0 2021-06-12 11:08 /user/stud12/.Trash
drwxr-xr-x   - stud12 stud12          0 2021-06-10 21:57 /user/stud12/.sparkStaging


In [91]:
%%bash
hdfs dfs -mkdir /user/stud12/test ##Creating a folder for our data for this assignment 

In [92]:
%%bash

hdfs dfs -put /home/stud12/Desktop/microsoft-com.data/ /user/stud12/test/ ##Uploaing the files to HDFS

In [94]:
%%bash
hdfs dfs -ls /user/stud12/test/microsoft-com.data #showing the current files in the HDFS 

Found 3 items
-rw-r--r--   3 stud12 stud12        387 2021-06-12 11:09 /user/stud12/test/microsoft-com.data/countries.txt
-rw-r--r--   3 stud12 stud12    1491520 2021-06-12 11:09 /user/stud12/test/microsoft-com.data/microsoft-com.data
-rw-r--r--   3 stud12 stud12       3629 2021-06-12 11:09 /user/stud12/test/microsoft-com.data/microsoft-com.info


In [95]:
##Converting data to RDDs 
myrdd = sc.textFile("hdfs:/user/stud12/test/microsoft-com.data/microsoft-com.data")

countries_rdd=sc.textFile("hdfs:/user/stud12/test/microsoft-com.data/countries.txt")

In [97]:
# Get list of unique countris from the data  
countries_list = list(set(countries_rdd.collect())) 
print countries_list



[u'Brazil', u'Canada', u'Caribbean', u'Czech Republic', u'France', u'Slovakia', u'Ireland', u'Italy', u'Argentina', u'Norway', u'Israel', u'Australia', u'Turkey', u'Venezuela', u'China', u'Chile', u'Jakarta', u'Belgium', u'Germany', u'Hong Kong', u'Spain', u'Netherlands', u'UK', u'Denmark', u'Poland', u'Finland', u'Sweden', u'Korea', u'Thailand', u'Switzerland', u'Uruguay', u'New Zealand', u'Russia', u'Portugal', u'Mexico', u'South Africa', u'India', u'Peru', u'Colombia', u'Hungary', u'Taiwan', u'Slovenija']


In [103]:
## Attributes data - filtering the data and keeping only the rows with the relevant countries 
## input: original RDD
##output: RDD with relevant countries ([attribute ID, country,url])

Attributes=myrdd.filter(lambda line: len(line.split()) >0)\
.map(lambda line: line.split(','))\
.filter(lambda line: 'A' in line[0])\
.map(lambda line: [str(line[1]),str(line[3]).replace('"',''),str(line[4]).replace('"','')])\
.filter(lambda line: line[1] in countries_list)




In [104]:
## Vote data -  filtering the relevant rows 
##input: original data
##output: [attribue ID,userID]. we changed the order in order to join attributes with votes 
votes=myrdd.filter(lambda line: len(line.split()) >0)\
.map(lambda line: line.split(','))\
.filter(lambda line: 'V' in line[0])\
.map(lambda line: [str(line[2]),str(line[1])])




In [105]:
# Join attributes with Votes, the key is attribue ID (=page number)
#output: [attribute ID, (country,userID)]
countries_users = Attributes.join(votes)
countries_users.take(2)

[('1142', ('South Africa', '10372')), ('1142', ('South Africa', '13352'))]

In [106]:
##question4
##filtering the unique lines and creating map reduce of [country,number of users per country] 
number_of_users_per_country= countries_users.map(lambda line: (line[0],line[1])).distinct()\
.map(lambda line: (line[1][0],1))\
 .reduceByKey(lambda v1, v2: v1+v2)
    
##Convering to dataframe for visualization     
report1=number_of_users_per_country.toDF(['Country','CountUser'])
report1.show()

##Exctracting the top 5 results 
top_5 = number_of_users_per_country.top(5, key=lambda x: x[1])
print top_5




+--------------+---------+
|       Country|CountUser|
+--------------+---------+
|        Brazil|      121|
|        Canada|      128|
|         Italy|      167|
|     Hong Kong|       35|
|        Turkey|        9|
|         India|        9|
|     Venezuela|        8|
|   Switzerland|       31|
|         China|       26|
|      Slovakia|       11|
|       Belgium|       45|
|     Caribbean|        5|
|       Germany|      372|
|        Norway|       42|
|Czech Republic|       16|
|       Denmark|       55|
|       Jakarta|      670|
|       Hungary|       15|
|        Russia|       52|
|      Thailand|       11|
+--------------+---------+
only showing top 20 rows

[('Jakarta', 670), ('Germany', 372), ('Sweden', 258), ('Taiwan', 204), ('Spain', 191)]


In [107]:
##Saving the file in HDFS
number_of_users_per_country.saveAsTextFile("hdfs:/user/stud12/test/number_of_users_per_country")


In [108]:
%%bash
hdfs dfs -cat /user/stud12/test/number_of_users_per_country/part-* | head ##showing the file was saved in HDFS


('Brazil', 121)
('Canada', 128)
('Italy', 167)
('Hong Kong', 35)
('Turkey', 9)
('India', 9)
('Venezuela', 8)
('Switzerland', 31)
('China', 26)
('Slovakia', 11)


cat: Unable to write to output stream.
cat: Unable to write to output stream.
cat: Unable to write to output stream.


In [110]:
#question #6
#1. orgnizinf the data
## Attributes data - filtering and orgnizing the data 
## input: original RDD
##output: filtered RDD ([attribute ID, country,url])

pages=myrdd.filter(lambda line: len(line.split()) >0)\
.map(lambda line: line.split(','))\
.filter(lambda line: 'A' in line[0])\
.map(lambda line: [str(line[1]),str(line[3]).replace('"',''),str(line[4]).replace('"','')])

#2. join with votes data

pages_users=pages.join(votes)
pages_users.take(3)




[('1142', ('South Africa', '10372')),
 ('1142', ('South Africa', '13352')),
 ('1142', ('South Africa', '19019'))]

In [114]:

##finding the average number of pages per user 

##creating list of unique users
users_rdd=votes.map(lambda line: line[1])
unique_users=list(set(users_rdd.collect()))


##creating tuple of[user,number of pages] using map reduce
count_pages_per_user=pages_users.map(lambda line: (line[0],line[1][1])).distinct()\
                .map(lambda line:(line[1],1))\
 .reduceByKey(lambda v1,v2:(v1+v2))

##Total number of pages 
total_pages=count_pages_per_user.map(lambda line: (1,line[1]))\
.reduceByKey(lambda v1,v2:(v1+v2))

##calculating the average per user 
average=1.0*total_pages.take(1)[0][1]/len(unique_users)
print "the average number is " + str(float(average))


                         



the average number is 3.01592736388


In [117]:
#Question #7
##creating tuple of[page,count_visits] and finding the page ID with maximum number of visit 
results=pages_users.map(lambda line: (line[0],line[1][1])).distinct()\
                .map(lambda line:(line[0],1))\
 .reduceByKey(lambda v1,v2:(v1+v2)).top(1, key=lambda x: x[1])

print "The page id with the maximum number of visits is: {} ,\
the number of visits: {}".format(results[0][0], results[0][1])


The page id with the maximum number of visits is: 1008 ,the number of visits: 10836


In [86]:
%%bash 

hdfs dfs -rm -r /user/stud12/test ### Deleting our files from HDFS



rm: `/user/stud12/test': No such file or directory
