# Analysis of Loudacre data with Spark

First, store the paths to the input files in variables and create an RDD from each one (`sc` is the SparkContext provided by the PySpark shell or notebook).

```python
accountsPath = "/loudacre/accounts/part-m-*"
weblogsPath = "/loudacre/weblogs/FlumeData.*"
baseStationsPath = "/loudacre/base_stations.tsv"

# sc.textFile accepts glob patterns, so each wildcard path reads every matching file
accountsRDD = sc.textFile(accountsPath)
weblogsRDD = sc.textFile(weblogsPath)
baseStationsRDD = sc.textFile(baseStationsPath)
```
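Because `sc.textFile` accepts glob patterns, a single path such as `part-m-*` loads every part file. As a rough local illustration only (plain Python, no Spark), the hypothetical helper below gathers the lines of every file matching a pattern using the standard `glob` module. Spark itself reads the files lazily and in parallel, and the sorted file order here is a choice of this sketch, not something Spark guarantees.

```python
import glob
import os
import tempfile


def local_text_file(pattern):
    """Hypothetical stand-in for sc.textFile: return every line of every file
    matching the glob pattern (newlines stripped), taking files in sorted order."""
    lines = []
    for path in sorted(glob.glob(pattern)):
        with open(path) as handle:
            lines.extend(line.rstrip('\n') for line in handle)
    return lines


if __name__ == '__main__':
    # Build two made-up part files in a temporary directory
    with tempfile.TemporaryDirectory() as tmp:
        for name, content in [('part-m-00000', 'a\nb\n'), ('part-m-00001', 'c\n')]:
            with open(os.path.join(tmp, name), 'w') as handle:
                handle.write(content)
        print(local_text_file(os.path.join(tmp, 'part-m-*')))  # ['a', 'b', 'c']
```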

**How many users are there in total?**

Assuming each user has a distinct user ID, map each account record to its user ID (field 0) and count the number of distinct IDs.

```python
# Key each account by its user ID (field 0), then count the distinct IDs
totalUsers = accountsRDD.map(lambda line: line.split(',')[0]) \
    .distinct() \
    .count()
print('there are ' + str(totalUsers) + ' unique users in total')
```

```
there are 129764 unique users in total
```
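The map, distinct, count chain can be mimicked in plain Python on a few made-up lines, which is a quick way to check the logic before running it on the cluster. In this sketch the sample lines and the helper name `count_distinct_field` are hypothetical; only the field index (0 for the user ID) comes from the code above, and the other fields do not follow the real accounts layout.

```python
def count_distinct_field(lines, index, sep=','):
    """Plain-Python analogue of
    rdd.map(lambda line: line.split(sep)[index]).distinct().count()."""
    return len({line.split(sep)[index] for line in lines})


# Hypothetical account lines (only field 0, the user ID, matters here);
# user 2 appears twice, so it should be counted once
sample_accounts = [
    '1,Alice,Sacramento',
    '2,Bob,Portland',
    '2,Bob,Portland',
    '3,Cara,Salem',
]

print(count_distinct_field(sample_accounts, 0))  # 3
```

If user IDs really are unique, the distinct count equals `accountsRDD.count()`; the `distinct` step only matters if a record is duplicated.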


**How many cities have users?**

In other words, how many unique cities appear in the user account records?
As above, map each record to its city and count the distinct values. The city is stored at index 6 (the seventh comma-separated field); the commented-out line can be used to sense-check this.

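```python
# Key each account by its city (field 6) and keep the distinct values
userCitiesRDD = accountsRDD.map(lambda line: line.split(',')[6]) \
    .distinct()

# sense-check that field 6 really holds the city
# print(userCitiesRDD.take(100))

totalUserCities = userCitiesRDD.count()

print('there are ' + str(totalUserCities) + ' unique cities with users')
```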

```
there are 56 unique cities with users
```
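`distinct` compares strings exactly, so 'Sacramento', 'sacramento' and 'Sacramento ' would count as three different cities. The count above does not normalise city names, and whether that matters for this data is not checked here. The plain-Python sketch below uses hypothetical sample values to show how trimming and lowercasing before the distinct step changes the result; the later users-and-stations query applies the same idea by lowercasing its city keys.

```python
def distinct_cities(cities, normalise=False):
    """Return the set of distinct city names, optionally trimmed and lowercased
    first (in Spark this would be an extra map before .distinct())."""
    if normalise:
        cities = (city.strip().lower() for city in cities)
    return set(cities)


# Hypothetical raw city values with inconsistent case and spacing
raw = ['Sacramento', 'sacramento', 'Sacramento ', 'Portland']

print(len(distinct_cities(raw)))                  # 4
print(len(distinct_cities(raw, normalise=True)))  # 2
```

In the notebook, the normalised count would be `accountsRDD.map(lambda line: line.split(',')[6].strip().lower()).distinct().count()`.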


**How many cities have base stations?**

In other words, how many unique cities are listed in the base station records?
This file is tab-separated, so split on tabs rather than commas, then map each record to its city (index 2) and count the distinct values.

```python
# base_stations.tsv is tab-separated; the city is field 2
baseStationCitiesRDD = baseStationsRDD.map(lambda line: line.split('\t')[2]) \
    .distinct()

# sense-check that we have the correct column
print(baseStationCitiesRDD.take(5))

totalBaseStationCities = baseStationCitiesRDD.count()

print('there are ' + str(totalBaseStationCities) + ' unique cities with base stations')
```


```
['Blue', 'Loomis', 'Baker City', 'Packwood', 'Bullhead City']
there are 238 unique cities with base stations
```
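The split character matters because splitting a tab-separated line on commas leaves it as a single field, so index 2 would raise an `IndexError`. Below is a plain-Python check on a made-up base station line, assuming the layout implied by the code in this notebook (key in field 0, city in field 2, state in field 3); field 1 is only a placeholder here because its real content is not used.

```python
import csv
import io

# Hypothetical base_stations.tsv line: key, placeholder, city, state
line = '7\tplaceholder\tBaker City\tOR'

print(line.split(',')[0] == line)  # True: no commas, so the whole line is one field
print(line.split('\t')[2])         # Baker City

# The csv module parses the same line when told the delimiter is a tab
row = next(csv.reader(io.StringIO(line), delimiter='\t'))
print(row[2], row[3])              # Baker City OR
```

Plain `split('\t')` is enough as long as no field contains an embedded tab or quoting; the `csv` reader is the safer choice if the file ever uses quoted fields.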


**How many base stations are there?**

Rather than assume that the station keys (column 0) were assigned sequentially, even though they appear to be, count the number of unique keys.

```python
# Count distinct station keys (field 0) instead of relying on the keys being sequential
totalBaseStations = baseStationsRDD.map(lambda line: line.split('\t')[0]) \
    .distinct() \
    .count()
print('there are ' + str(totalBaseStations) + ' base stations in total')
```

```
there are 377 base stations in total
```
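To test the "looks sequential" impression directly, one can compare the number of lines, the number of distinct keys and the key range. The plain-Python sketch below runs on hypothetical tab-separated lines and assumes, without confirmation from the data, that keys are integers starting at 1; `key_summary` is an illustrative name.

```python
def key_summary(lines, sep='\t'):
    """Summarise field-0 keys: (line count, distinct key count, whether the
    keys are exactly 1..n with no gaps or duplicates). Assumes integer keys."""
    keys = [int(line.split(sep)[0]) for line in lines]
    distinct = set(keys)
    sequential = sorted(keys) == list(range(1, len(keys) + 1))
    return len(keys), len(distinct), sequential


# Hypothetical lines: key 3 is duplicated and key 4 is missing
sample = ['1\tA', '2\tB', '3\tC', '3\tC', '5\tD']
print(key_summary(sample))  # (5, 4, False)
```

On the real data, the same comparison is `baseStationsRDD.count()` against the distinct count above; if they differ, some keys are repeated.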


**Number of users in each city in Oregon**


To do this, first filter the accounts data to Oregon and create an RDD with city as the key and 1 as the value.



<font color='red'> We could apply reduceByKey to this RDD directly, but the result would not include cities with zero users (I am conservatively assuming that cities with base stations but no users should be included too). </font>


To include these, create an RDD from the Oregon base station data with city as the key and 0 as the value.
Union the two RDDs and apply reduceByKey to get the number of users in each city; cities with stations but no users end up with a total of 0.

All Oregon cities with users or base stations are shown, ordered by number of users (descending); cities tied on the same count appear in no particular order.

```python
# Filter to Oregon (state is field 7) before mapping so later stages handle less data,
# then key each account by city (field 6) with a value of 1
ORuserCityRDD = accountsRDD.filter(lambda line: line.split(',')[7] == "OR") \
    .map(lambda line: (line.split(',')[6], 1))

# key each Oregon base station (state is field 3) by city (field 2) with a value of 0
ORcityBaseStationRDD = baseStationsRDD.filter(lambda line: line.split('\t')[3] == "OR") \
    .map(lambda line: (line.split('\t')[2], 0))

# union to get every city with users or stations, then sum the values to count users
ORusersPerCityRDD = ORuserCityRDD.union(ORcityBaseStationRDD) \
    .reduceByKey(lambda v1, v2: v1 + v2) \
    .sortBy(lambda x: -x[1])

for (city, users) in ORusersPerCityRDD.collect():
    print(city, users)
```


```
Portland 4602
Bend 1528
Eugene 1520
Medford 1511
Salem 1496
Klamath Falls 1463
Pendleton 1455
Umatilla 0
Butte Falls 0
Saint Benedict 0
Beaver 0
Bridgeport 0
Gaston 0
Oregon City 0
Riverside 0
Government Camp 0
Wilbur 0
Bates 0
Trail 0
Long Creek 0
Bridal Veil 0
North Powder 0
Molalla 0
Corvallis 0
Halsey 0
Baker City 0
Dillard 0
Junction City 0
```
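The zero-padding trick can be checked in plain Python: (city, 1) pairs for users and (city, 0) pairs for stations are summed per key, so a city that appears only in the station data survives with a count of 0. The city names below are hypothetical sample values, and a `collections.defaultdict` stands in for `reduceByKey`.

```python
from collections import defaultdict


def users_per_city(user_cities, station_cities):
    """Mimic union + reduceByKey(lambda v1, v2: v1 + v2) + sortBy(lambda x: -x[1])."""
    pairs = [(city, 1) for city in user_cities] + [(city, 0) for city in station_cities]
    totals = defaultdict(int)
    for city, value in pairs:
        totals[city] += value
    # sort by count, descending; Python's sort is stable, so ties keep insertion
    # order here (Spark makes no such promise)
    return sorted(totals.items(), key=lambda kv: -kv[1])


# Hypothetical data: two Bend users, one Salem user, stations in Bend and Gaston
print(users_per_city(['Bend', 'Salem', 'Bend'], ['Bend', 'Gaston']))
# [('Bend', 2), ('Salem', 1), ('Gaston', 0)]
```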


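**Count of users and base stations for each city**
To achieve this, map both the accounts and base station RDDs with the lowercased city as the key, and use reduceByKey to count the users and stations in each city. Then join the two results: a full outer join keeps cities with no base station (their station count comes back as None), and the filter afterwards drops cities with no users, so the net effect is the same as a left outer join from the users side.
The city is read from user input. For example, 'Sacramento' answers the original question, while 'Salem' is an example of a city with no base stations, for which the station count is None.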

```python
# users in each city (key is the lowercased city, field 6)
usersByCity = accountsRDD.map(lambda line: (line.split(',')[6].lower(), 1)) \
    .reduceByKey(lambda v1, v2: v1 + v2)

# base stations in each city (key is the lowercased city, field 2)
baseStationsByCity = baseStationsRDD.map(lambda line: (line.split('\t')[2].lower(), 1)) \
    .reduceByKey(lambda v1, v2: v1 + v2)

# full outer join gives (city, (users, stations)) with None for a missing side;
# keep only cities that have users
countsByCity = usersByCity.fullOuterJoin(baseStationsByCity) \
    .filter(lambda kv: kv[1][0] is not None)

# take user input and return the counts for that city
requestedCity = input('please enter city: ')
cityRow = countsByCity.lookup(requestedCity.strip().lower())

if len(cityRow) > 0:
    for (users, stations) in cityRow:
        print('city:', requestedCity, '|\ttotal users:', users, '\ttotal base stations:', stations)
else:
    print('city name not recognised or city has no users')
```

```
please enter city: sacramento
city: sacramento |	total users: 6820 	total base stations: 4
```
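The join semantics are easy to check with plain dictionaries: a full outer join pairs each city's user count with its station count, using None for a missing side, and the filter then drops cities with no users. The counts below are made-up sample values, and `full_outer_join` is a hypothetical helper, not a Spark API.

```python
def full_outer_join(left, right):
    """Mimic RDD.fullOuterJoin for one value per key:
    {key: (left value or None, right value or None)}."""
    keys = set(left) | set(right)
    return {key: (left.get(key), right.get(key)) for key in keys}


# Hypothetical per-city counts (keys already lowercased)
users_by_city = {'sacramento': 10, 'salem': 3}
stations_by_city = {'sacramento': 2, 'gaston': 1}

joined = full_outer_join(users_by_city, stations_by_city)
# keep only cities with users, as the filter in the notebook does
counts_by_city = {city: v for city, v in joined.items() if v[0] is not None}

print(counts_by_city['salem'])     # (3, None)
print('gaston' in counts_by_city)  # False
```

Since every city kept has users, the same dictionary could be built by iterating over `users_by_city` alone, which is what a left outer join does.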


**Number of requests from the most active user, by state**
Map the weblogs RDD to (user ID, 1) pairs and use reduceByKey to get the total number of requests per user.
The result is then joined with the accounts data to attach each user's state, and reduceByKey is used again to keep the user with the most requests in each state (which avoids the need for a groupByKey operation).



```python
# sum total requests by user (the user ID is the third space-separated field)
requestsPerUser = weblogsRDD.map(lambda line: (line.split(' ')[2], 1)) \
    .reduceByKey(lambda v1, v2: v1 + v2)

# key each account by user ID (field 0) with the state (field 7) as the value
userStates = accountsRDD.map(lambda line: (line.split(',')[0], line.split(',')[7]))

def toStateKey(record):
    # (user, (state, count)) -> (state, (user, count))
    user, (state, count) = record
    return (state, (user, count))

# right outer join keeps every user who appears in the weblogs (a user with no account
# record would get state None); re-key by state, then keep the (user, count) pair
# with the highest count for each state
requestsByState = userStates.rightOuterJoin(requestsPerUser) \
    .map(toStateKey) \
    .reduceByKey(lambda v1, v2: max(v1, v2, key=lambda uc: uc[1]))

for (state, (user, count)) in requestsByState.collect():
    print(state, '|', '\tBiggest User:', user, '\tRequests: ', count)
```


```
CA | 	Biggest User: 192 	Requests:  1538
NV | 	Biggest User: 20382 	Requests:  125
OR | 	Biggest User: 60223 	Requests:  122
AZ | 	Biggest User: 50367 	Requests:  120
```
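The final reduceByKey keeps, for each state, whichever (user, count) pair has the larger count, merging values one at a time instead of grouping them. The plain-Python sketch below reproduces that running merge over made-up records; `biggest_user_per_state` is a hypothetical name. One caveat: when two users tie, `max` returns its first argument, and Spark does not guarantee the order in which values reach the reducer, so the user reported for a tied state may vary between runs.

```python
def biggest_user_per_state(records):
    """records: iterable of (state, (user, count)).
    Returns {state: (user, count)}, merging values per key like
    reduceByKey(lambda v1, v2: max(v1, v2, key=lambda uc: uc[1]))."""
    best = {}
    for state, user_count in records:
        if state in best:
            best[state] = max(best[state], user_count, key=lambda uc: uc[1])
        else:
            best[state] = user_count
    return best


# Hypothetical records: u2 and u4 tie in CA, so the first one seen (u2) is kept
sample = [('CA', ('u1', 5)), ('CA', ('u2', 9)), ('OR', ('u3', 4)), ('CA', ('u4', 9))]
print(biggest_user_per_state(sample))  # {'CA': ('u2', 9), 'OR': ('u3', 4)}
```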
