# 103590450 四資三 馬茂源
## Programming Exercise: MapReduce
* Goal: A MapReduce program for analyzing the check-in records

        Input: check-in records from the social networking site Foursquare
        Check-in records: <user_id, venue_id, checkin_time>
        Venues: <venue_id, category, latitude, longitude>

* Output: analysis results (to be detailed later)
* Requirement: the geopy package must be installed first (`sudo pip3 install geopy`)

In [1]:
import findspark
findspark.init()
import pyspark, time
from operator import add
from datetime import datetime
from geopy.geocoders import Nominatim
# getOrCreate() reuses an already-running context, so this cell can be
# re-executed in the same kernel without a "multiple SparkContexts" error.
sc = pyspark.SparkContext.getOrCreate()
# Nominatim's terms of use require an identifying user agent; geopy 2.x raises
# ConfigurationError when Nominatim is constructed without one.
geolocator = Nominatim(user_agent='4sq-mapreduce-exercise')

In [2]:
def row_split(string):
    """Parse a ``local_place`` line into a ``(venue_id, name)`` pair.

    The first comma-separated field is discarded and the second becomes the
    key. Everything after the second comma is kept verbatim as the name.
    Splitting at most twice keeps commas inside the name intact instead of
    turning them into spaces. A line with exactly two fields yields an empty
    name; a line with fewer fields raises IndexError.
    """
    fields = string.split(',', 2)
    venue_id = fields[1]
    name = fields[2] if len(fields) > 2 else ''
    return (venue_id, name)

## Load file

In [3]:
# One RDD per input file, created in the order check-ins, places, venues.
checking_local_dedup, local_place, venue_info = (
    sc.textFile(path)
    for path in ('4sq_data/checking_local_dedup.txt',
                 '4sq_data/local_place.txt',
                 '4sq_data/venue_info.txt')
)

## Lists the top checked-in venues (most popular)

In [4]:
ts = time.time()
# Count check-ins per venue: field 1 of a check-in line is the venue_id.
checking_place_id = checking_local_dedup.map(lambda line: (line.split(',')[1], 1)) \
                                        .reduceByKey(add)

# venue_id -> name; names from duplicate rows of one venue are concatenated.
place_name = local_place.map(row_split).reduceByKey(lambda x, y: ' '.join([x, y]))

# Left outer join keeps counted venues that have no name (name is None).
d = checking_place_id.leftOuterJoin(place_name)
# Cached because result1 is reused later (take() below and the category join);
# without caching, Spark recomputes the join and sort for each use.
result1 = d.sortBy(lambda x: x[1][0], ascending=False).cache()
# NOTE: RDD transformations are lazy, so this figure likely excludes much of
# the work, which runs at the first action (e.g. result1.take).
print('time:%.2f'%(time.time()-ts))


time:8.63


In [5]:
result1.take(10)

[('4b0bd124f964a520e03323e3', (23790, '<Changi International Airport (SIN)>')),
 ('4c775cda93faa093a2c5f0fb', (10870, '<Cineleisure Orchard>')),
 ('4b058818f964a520a7b122e3', (9649, '<VivoCity>')),
 ('4b08e700f964a520421323e3', (8578, '<ION Orchard>')),
 ('4b5da988f964a520ae6529e3', (8454, '<nex>')),
 ('50874d10e4b0fa42fc6cc568', (6902, '<ITE College Central>')),
 ('4b058817f964a5203eb122e3', (6796, '<Jurong Point>')),
 ('4bb9c1a6935e9521d3512790', (6469, '<Bugis Junction>')),
 ('4b058815f964a520afb022e3', (6277, '<Plaza Singapura>')),
 ('4b189466f964a52049d423e3', (6259, '<313@Somerset>'))]

## Lists the most popular categories

In [6]:
def row_split2(line):
    """Parse a ``venue_info`` line into a ``(venue_id, category)`` pair.

    Rows are ``venue_id,category,latitude,longitude``, and the trailing
    latitude/longitude are dropped. The line is split from the right at most
    twice, then once from the left. This keeps commas inside the category
    text rather than replacing them with spaces.
    """
    head = line.rsplit(',', 2)[0]
    venue_id, _, category = head.partition(',')
    return (venue_id, category)

In [7]:
ts = time.time()
# Join venue_id -> category with result1 (venue_id -> (count, name)).
# After the join, each value is (category, (count, name)); keep
# (category, count), sum the counts per category and sort largest first.
categories = (
    venue_info
    .map(row_split2)
    .distinct()
    .join(result1)
    .values()
    .map(lambda cat_stats: (cat_stats[0], cat_stats[1][0]))
    .reduceByKey(add)
    .sortBy(lambda cat_total: cat_total[1], ascending=False)
)
print('time:%.2f'%(time.time()-ts))


time:8.41


In [8]:
categories.take(10)

[('Mall', 200240),
 ('Train Station', 57643),
 ('Coffee Shop', 51816),
 ('Café', 48711),
 ('Food Court', 47332),
 ('Home (private)', 45151),
 ('Building', 44447),
 ('Asian Restaurant', 41370),
 ('General College & University', 35717),
 ('General Entertainment', 35349)]

## Lists the top checked-in users

In [9]:
ts = time.time()
# Field 0 of each check-in line is the user_id; count lines per user and
# sort users by check-in count, largest first.
user = (
    checking_local_dedup
    .map(lambda record: (record.partition(',')[0], 1))
    .reduceByKey(add)
    .sortBy(lambda uid_count: uid_count[1], ascending=False)
)
print('time:%.2f'%(time.time()-ts))


time:5.36


In [10]:
user.take(10)

[('23116182', 10013),
 ('942443', 6737),
 ('5295751', 4369),
 ('15852099', 4284),
 ('5406597', 4176),
 ('951387', 4138),
 ('5488934', 4125),
 ('10057162', 4034),
 ('4706660', 4021),
 ('8355536', 3707)]

## Lists the most popular times for check-ins
### (in one-hour time slots, e.g. 7:00-8:00 or 18:00-19:00; hours are in UTC)


In [11]:
def convert_stamp(line, utc_offset_hours=0):
    """Map a check-in line to a ``(hour_slot_label, 1)`` pair.

    The last comma-separated field is read as a Unix timestamp in seconds.
    By default the slot uses the UTC hour. Pass ``utc_offset_hours`` (e.g. 8
    for UTC+8) to bucket check-ins by local wall-clock hour instead;
    fractional offsets are accepted.

    The label space-pads the hour to two characters, e.g. ``' 5:00~ 6:00'``.
    Hour 23 is labelled ``'23:00~24:00'``.
    """
    seconds_per_hour = 3600
    stamp = float(line.split(',')[-1])
    # Unix time has no leap seconds, so the hour of day is plain modular
    # arithmetic; floor division and modulo also handle pre-1970 (negative)
    # stamps. This avoids datetime.utcfromtimestamp (deprecated in 3.12) and
    # no longer shadows the module-level `time` import with a local name.
    hour = int((stamp + utc_offset_hours * seconds_per_hour) // seconds_per_hour % 24)
    return ('%2d:00~%2d:00'%(hour, hour+1), 1)

In [12]:
ts = time.time()
# Bucket every check-in into its hour slot, count per slot and order the
# slots from busiest to quietest.
checkin_time = (
    checking_local_dedup
    .map(convert_stamp)
    .reduceByKey(add)
    .sortBy(lambda slot_count: slot_count[1], ascending=False)
)
print('time:%.2f'%(time.time()-ts))

time:8.51


In [13]:
checkin_time.take(20)

[('11:00~12:00', 123743),
 ('10:00~11:00', 109244),
 ('12:00~13:00', 101916),
 (' 5:00~ 6:00', 93842),
 (' 9:00~10:00', 89397),
 (' 6:00~ 7:00', 89210),
 (' 4:00~ 5:00', 87843),
 (' 7:00~ 8:00', 85688),
 (' 8:00~ 9:00', 84058),
 ('13:00~14:00', 76449),
 (' 3:00~ 4:00', 67036),
 (' 2:00~ 3:00', 61943),
 ('14:00~15:00', 61167),
 (' 1:00~ 2:00', 59931),
 (' 0:00~ 1:00', 57657),
 ('15:00~16:00', 53826),
 ('23:00~24:00', 43781),
 ('16:00~17:00', 39566),
 ('17:00~18:00', 23073),
 ('22:00~23:00', 20765)]

## Optional functions 
* Using other attributes: latitude, longitude, …

In [14]:
def get_name(line):
    """Key a venue line by its trailing ``"latitude, longitude"`` pair.

    The last two comma-separated fields are joined as ``'<lat>, <lon>'`` and
    paired with a count of 1. A later ``reduceByKey(add)`` then counts the
    venue rows that share identical coordinates. A line with fewer than two
    fields raises ValueError.
    """
    *_, lat, lon = line.split(',')
    coordinate_key = '%s, %s'%(lat, lon)
    return (coordinate_key, 1)

In [15]:
ts = time.time()
# Despite its name, `countries` holds ("lat, lon", venue_count) pairs,
# ordered from the most to the least shared coordinates.
countries = (
    venue_info
    .map(get_name)
    .reduceByKey(add)
    .sortBy(lambda coord_count: coord_count[1], ascending=False)
)
print('time:%.2f'%(time.time()-ts))

time:3.79


In [16]:
# Reverse-geocode the five most shared coordinates and print each address
# with its venue count. Nominatim's usage policy allows at most one request
# per second, so pause between consecutive lookups.
for rank, item in enumerate(countries.take(5)):
    if rank:
        time.sleep(1)
    print('%s [%d]'%(geolocator.reverse(item[0]), item[1]))

Blk 106A, Punggol Field, Punggol, Northeast, Singapore [82]
619, Walking Trail, Jurong West, Southwest, Singapore [74]
Fengshan Primary School, 307, Bedok North Road, Bedok, Southeast, 469680, Singapore [72]
288, Tampines Street 22, Northeast, 520288, Singapore [69]
206B, Compassvale Drive, Sengkang, Northeast, Singapore [67]
