# Week 6 Lab: Geotags

Data Analytics at Scale 2018

In [2]:
import pandas as pd
import numpy as np
import math

import gzip
import json
import glob

import pickle

import geopy.distance

import multiprocessing


# Background

The purpose of this lab is to count the number of geotagged tweets in each city in a sample from Twitter.

There are multiple ways a user can include location information in a Twitter profile or tweet. These are described in the documentation (https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/geo-objects), and there is some analysis in this (slighly dated) paper:
> Graham, M., Hale, S. A., and Gaffney, D. (2014). Where in the world are you? Geolocation and language identification in Twitter. Professional Geographer. doi:10.1080/00330124.2014.907699. Available at https://arxiv.org/pdf/1308.0683

For this lab, we will be focusing on the "coordinates" object. If present, this object represents an exact longitude, latitutde pair set by the device from which the user was tweeting (i.e., GPS on a smartphone). An example of the "coordinates" object is:
```
    "coordinates":
    {
        "coordinates":
        [
            -75.14310264,
            40.05701649
        ],
        "type":"Point"
    }
```

**Note** that users who geotag there tweets are special and not representative of the broader Twitter using population (which is turn is, of course, not representative of any national population). Brent Hecht (http://www.brenthecht.com/) has great research on this point.


# A note on runnig this notebook

I have prepared this lab as a Jupyter notebook. I may come to greatly regret that choice.

Most exercises are probably better run as standalone Python files on the server. If you run files as a standalone `.py` file on the server, be sure to include in the `gcdist` function as well as the code for `dfCities`.

Another option is to run this notebook directly on the server and load it in a local browser. To do this, please follow these instructions:
* ssh into the server and forward a remote port. For example, to forward port 1051, I can run: 

```
ssh -L 1051:localhost:1051 abcd1234@oii-lab-003.oii.ox.ac.uk
```

* On the server open `screen` and, if necessary, set the PATH variable:

```
screen
export PATH=/opt/anaconda3/bin:$PATH
```

* Start JupyterLab on the server with the specific port you forwarded. For example,

```
jupyter-lab --port 1051
```

* A URL will appear in the terminal. Copy and paste that URL into the browser on your local machine.



# Data

## Twitter

The data we are using comes from the `public/statues` streaming API on Twitter. This returns, "a small, random sample" of tweets. Earlier this was described as "approximately 1%" but that text was removed from the documentation a few years ago.

Only a small fraction of tweets are geotagged, and I hvae filtered the sample to only include tweets with a "coordinates" object for the purposes of this course.

The data is stored on `oii-lab-003.oii.ox.ac.uk` in the `/data/twitter-geo/` directory. 

If you download the data (or a subset of it) to your local machine, please abide by Twitter's Terms of Service and general ethical principles do not attribute quotations to named individuals, etc. If you have any questions, please ask.

## Cities

The latitude and longitude of cities comes from GeoNames, a geographical database that "covers all countries and contains over eleven million placenames." We are using the cities1000.zip data from http://download.geonames.org/export/dump/ . This file includes "all cities with a population > 1000 or seats of adm div to PPLA3 (ca 130.000)"

In [3]:
#This is the full dataset from Geonames
#dfCities=pd.read_csv("/data/cities1000.txt",sep="\t",header=None,names=["geonameid","name","asciiname","alternatenames","latitude","longitude","feature class","feature code","country code","cc2","admin1 code","admin2 code","admin3 code","admin4 code","population","elevation","dem","timezone","modification date"])

#We only need a few select columns:
dfCities=pd.read_csv("/data/cities1000.txt",sep="\t",header=None,usecols=[0,1,4,5,8],names=["geonameid","name","latitude","longitude","country"])

dfCities["rlat"]=np.radians(dfCities.latitude)
dfCities["rlng"]=np.radians(dfCities.longitude)
display(dfCities.head())
print(dfCities.shape)

TEST_LAT=51.756594
TEST_LNG=-1.260865

Unnamed: 0,geonameid,name,latitude,longitude,country,rlat,rlng
0,3039154,El Tarter,42.57952,1.65362,AD,0.743153,0.028861
1,3039163,Sant Julià de Lòria,42.46372,1.49129,AD,0.741132,0.026028
2,3039604,Pas de la Casa,42.54277,1.73361,AD,0.742511,0.030257
3,3039678,Ordino,42.55623,1.53319,AD,0.742746,0.026759
4,3040051,les Escaldes,42.50729,1.53414,AD,0.741892,0.026776


(132445, 7)


# Task

The purpose of this assignment is to count the number of geotagged tweets occuring within 25km of any of any city in our list.

The lab exercises will ask you to do this in serial (single-threaded), with multiprocessing, and in a distributed environment with PySpark.

Optionally, you can write Hadoop, Dask, mpi4py or other distributed approaches.

In [4]:
#In order to do this somewhat efficiently, we will define our own numpy version to calculate one to many distances using great circle distance

import numba as nb

#Great circle distance
# https://www.movable-type.co.uk/scripts/latlong.html
# https://stackoverflow.com/a/19412565

@nb.jit(nopython=True,parallel=True)
def gcdist(dfCities,lat,lng):
    R = 6371.0087714 # approximate radius of earth in km
    rlat=math.radians(lat)
    rlng=math.radians(lng)
    
    dlng=dfCities.rlng-rlng
    dlat=dfCities.rlat-rlat

    a = np.sin(dlat / 2)**2 + np.cos(dfCities.rlat) * np.cos(rlat) * np.sin(dlng / 2)**2
    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))
    distances = R * c
    return distances

myresults=gcdist(dfCities,TEST_LAT,TEST_LNG)

#Geodesic distance (this is more accurate, but probably unneeded for our purposes)
def geopydist(dfCities,lat,lng):
    distances=[]
    for index, row in dfCities.iterrows():
        distances.append(geopy.distance.great_circle([row.latitude,row.longitude],[lat,lng]).km)
    return distances

geopy_results=pd.Series(geopydist(dfCities,TEST_LAT,TEST_LNG))

#print(myresults.head())
#print(geopy_results.head())

#myindex=myresults.idxmin()
#gpyindex=geopy_results.idxmin()

#print(myindex,gpyindex)
#print(myresults[myindex]-geopy_results[gpyindex])

In [4]:
#print(abs(myresults-geopy_results).describe())
assert np.all(abs(myresults-geopy_results)<1), "myresults geopy differ"
print("Great. Our function matches the output of geopy")

Great. Our function matches the output of geopy


In [6]:
def degree_dist(dfCities,lat,lng):
    return ((dfCities.latitude-lat)**2+(dfCities.longitude-lng)**2)**0.5

In [15]:
#Optional
%timeit geopydist(dfCities,52.406374,16.9251681)
%timeit gcdist(dfCities,52.406374,16.9251681)
%timeit degree_dist(dfCities,52.406374,16.9251681)

11.6 s ± 367 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
14.8 ms ± 967 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
5.1 ms ± 95 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


# Exercise: Paris

First, let's just make sure we can get to the data, and things are working as expected.

We will count the number of tweets that are closer to Paris than any other city and are within 25km of Paris (48.8567, 2.3508). More formally, we only want to count a tweet when the index of the smallest distance (`idxmin`) is equal to the index of Paris in dfCities and that distance is strictly less than 25. 

Your function, `tweets_in_paris` should accept one argument a gzip file of tweets and return an integer corresponding to the number of tweets.

In [5]:
#This is the index of Paris in our dfCities dataframe. We only want to count tweets are closest to this city (and strictly less than 25km).
PARIS=dfCities[(dfCities.name=="Paris") & (dfCities.country=="FR")].index[0]
dfCities[(dfCities.name=="Paris") & (dfCities.country=="FR")]

Unnamed: 0,geonameid,name,latitude,longitude,country,rlat,rlng
37679,2988507,Paris,48.85341,2.3488,FR,0.852653,0.040994


In [10]:

def tweets_in_paris(file):
    count=0
    with gzip.open(file,"rt") as fh:
        for line in fh:
            tweet=json.loads(line)
            if "coordinates" in tweet and tweet["coordinates"]!=None:
                loc=tweet["coordinates"]["coordinates"]
                dists=gcdist(dfCities,loc[1],loc[0])
                closest=dists.idxmin()
                if closest==PARIS and dists[closest]<25:
                    count+=1
    return count        

In [11]:
#A faster way with some prep

#Just fine the cities wtihin 25km of Paris
dfCities_subset=dfCities[gcdist(dfCities,48.85341,2.3488)<25]
#display(dfCities_subset)

def tweets_in_paris2(file):
    count=0
    with gzip.open(file,"rt") as fh:
        for line in fh:
            tweet=json.loads(line)
            if "coordinates" in tweet and tweet["coordinates"]!=None:
                loc=tweet["coordinates"]["coordinates"]
                dists=gcdist(dfCities_subset,loc[1],loc[0])
                closest=dists.idxmin()
                if closest==PARIS  and dists[closest]<25:
                    count+=1
    return count 

In [12]:
#Let's run and check our code.

paris_tweets=0
for file in glob.glob("/data/twitter-geo/2016-01-01.csv.gz"):
    paris_tweets+=tweets_in_paris2(file)
print(paris_tweets)

assert paris_tweets==16, "Your count is different from my count"
print("Great work!")

16
Great work!


# Exercise: Single threaded

Let's get it right before we get it fast.

We can benchmark on a subset of the data; so, for this exercise, please only use the January 2016 data.

In [16]:
def tweets_per_city(file):
    counts=np.zeros(dfCities.shape[0])
    with gzip.open(file,"rt") as fh:
        for line in fh:
            tweet=json.loads(line)
            if "coordinates" in tweet and tweet["coordinates"]!=None:
                loc=tweet["coordinates"]["coordinates"]
                dists=gcdist(dfCities,loc[1],loc[0])
                closest=dists.idxmin()
                if dists[closest]<25:
                    counts[closest]+=1
    return counts

In [17]:
def tweets_per_city_single_thread(files):
    counts=np.zeros(dfCities.shape[0])
    for file in files:
        counts+=tweets_per_city(file)
    return counts


In [18]:
results_singlethreaded=tweets_per_city_single_thread(glob.glob("/data/twitter-geo/2016-01-01.csv.gz"))

print(results_singlethreaded[0:10])
print("A total of {} tweets were located to cities.".format(results_singlethreaded.sum()))
max_index=np.argmax(results_singlethreaded)
print("The city with the most tweets was {} with {} tweets.".format(dfCities.iloc[int(max_index)]["name"],results_singlethreaded[max_index]))

"""
Scott's output was:

[1. 0. 0. 0. 0. 1. 0. 0. 0. 0.]
A total of 13214.0 tweets were located to cities.
The city with the most tweets was Jakarta with 220.0 tweets.

"""

[1. 0. 0. 0. 0. 1. 0. 0. 0. 0.]
A total of 13214.0 tweets were located to cities.
The city with the most tweets was Jakarta with 220.0 tweets.


"\nScott's output was:\n\n[1. 0. 0. 0. 0. 1. 0. 0. 0. 0.]\nA total of 13214.0 tweets were located to cities.\nThe city with the most tweets was Jakarta with 220.0 tweets.\n\n"

In [19]:
#Let's check our results -- pickle file was created with pickle.dump(results_singlethreaded,open("check_2016-01-01.p","wb"))
check_results=pickle.load(open("check_2016-01-01.p","rb"))
assert np.all(abs(results_singlethreaded-check_results)<1), "Your single-threaded results do not match the expected results."
print("Great work!")

Great work!


# Exercise: Multiprocessing

Please write a function that uses multiple processes (one per file) to get the number of tweets per city


In [20]:
import multiprocessing

def tweets_per_city_multiprocess(files):
    #Start up a pool and run tweets_per_city for each file in the list files
    with multiprocessing.Pool() as pool:
        results=pool.map(tweets_per_city,glob.glob("/data/twitter-geo/2016-01-0*.csv.gz"),chunksize=1)

    counts=np.zeros(dfCities.shape[0])
    for r in results:
        counts+=r
    
    #Combine the results from the pool and return the combined results
    return counts


In [None]:
multiprocess_results=tweets_per_city_multiprocess(glob.glob("/data/twitter-geo/2016-01-0*.csv.gz"))
print(multiprocess_results[0:10])

In [None]:
#Let's check our results
check_results=pickle.load(open("check_2016-01-1x.p","rb"))
assert np.all(abs(multiprocess_results-check_results)<1), "Your multiprocessing results do not match the expected results."
print("Great work!")

# Exercise MapReduce

In [35]:
def map_func(rawtweet):
    #The input is a string representation of a tweet object.
    #Load it with json
    #Find the closest city
    #Return a (key,value) tuple. For example, (xxx,1) where xxx is a row number from dfCities
    tweet=json.loads(rawtweet)
    if "coordinates" in tweet and tweet["coordinates"]!=None:
        loc=tweet["coordinates"]["coordinates"]
        dists=gcdist(dfCities,loc[1],loc[0])
        closest=dists.idxmin()
        if dists[closest]<25:
            return (closest,1)

In [36]:
def reduce_func(key,values):
    #key is a row number from dfCities
    #values is a list of integers (a list of the values for key from map_func)
    #return a (key,value) tuple. The key should be the same as the input key, while the value should be the sum of all numbers in the input values
    return (key,sum(values))

In [37]:
#Check -- this is running locally in a single thread and not at all indicative of actual performance on Hadoop
# DO NOT CHANGE THIS CELL - the sort and intermediate values are being held in memory. Larger amounts of data may cause system instability.

from collections import defaultdict


map_out=defaultdict(list)

for file in glob.glob("/data/twitter-geo/2016-01-01.csv.gz"): # DO NOT TRY MORE THAN ONE MONTH
    with gzip.open(file,"rt") as fh:
        for line in fh:
            tmp=map_func(line)
            if tmp!=None:
                map_out[tmp[0]].append(tmp[1])

final_out={}
for key,vals in map_out.items():
    k,v=reduce_func(key,vals)
    final_out[k]=v
map_out=None




In [38]:
PARIS=dfCities[(dfCities.name=="Paris") & (dfCities.country=="FR")].index[0]
print(final_out[PARIS]) #Expected output for 2016-01-01.csv.gz is 16

LONDON=dfCities[(dfCities.name=="London") & (dfCities.country=="GB")].index[0]
print(final_out[LONDON]) #Expected output for 2016-01-01.csv.gz is 26

OXFORD=dfCities[(dfCities.name=="Oxford") & (dfCities.country=="GB")].index[0]
print(final_out[OXFORD]) #Expected output for 2016-01-01.csv.gz is 2

16
26
2


# Exercise PySpark

Please count the number of tweets within 25km of each city using PySpark.

In [22]:
import pyspark
from pyspark.sql import SQLContext

#Create a spark context
sc = pyspark.SparkContext('local[*]')

#Create a SQLContext to be able to read JSON easily
sqlContext = SQLContext(sc)
df = sqlContext.read.json("/data/twitter-geo/2016-01-01.csv.gz").select("coordinates")




In [27]:
def cord2city(coord):
    loc=coord["coordinates"]["coordinates"]
    dists=gcdist(dfCities,loc[1],loc[0])
    closest=dists.idxmin()
    if dists[closest]<25:
        return closest


In [31]:
#df is a spark dataframe
#you can get the rdd with df.rdd

#apply map and reduce operations to get counts of tweets by city. 

#you can use .take(5) to get the first five results
#use .collect() to get all results


#One option using what we saw in class:
spark_results=df.rdd.map(cord2city).groupBy(lambda x: x).map(lambda x: (x[0],len(x[1]))).collect()
spark_results[0:10]

[(111284, 4),
 (52726, 15),
 (53239, 1),
 (81792, 56),
 (91269, 6),
 (91207, 2),
 (52617, 220),
 (113774, 7),
 (45043, 26),
 (8192, 21)]

In [30]:
#Another option
spark_results=df.rdd.map(cord2city).countByValue()
spark_results

defaultdict(int,
            {111284: 4,
             52726: 15,
             53239: 1,
             81792: 56,
             91269: 6,
             91207: 2,
             52617: 220,
             113774: 7,
             45043: 26,
             8192: 21,
             112752: 7,
             109151: 3,
             124335: 29,
             52799: 2,
             48517: 1,
             52894: 1,
             126370: 1,
             70242: 1,
             110047: 4,
             52614: 8,
             110089: 1,
             122870: 8,
             111833: 1,
             52440: 3,
             51927: 18,
             69222: 5,
             9463: 3,
             8574: 2,
             111608: 3,
             110837: 19,
             46390: 1,
             51827: 45,
             131988: 1,
             89326: 16,
             28814: 2,
             51933: 14,
             70233: 11,
             53548: 1,
             51727: 31,
             52834: 48,
             123683: 1,
             1

In [34]:
#The classic map-reduce option
def cord2city_v2(coord):
    loc=coord["coordinates"]["coordinates"]
    dists=gcdist(dfCities,loc[1],loc[0])
    closest=dists.idxmin()
    if dists[closest]<25:
        return (closest,1)
    else:
        return (None,1)


spark_results=df.rdd.map(cord2city_v2).reduceByKey(lambda a, b: a + b).collect()
spark_results[0:10]

[(111284, 4),
 (52726, 15),
 (53239, 1),
 (81792, 56),
 (91269, 6),
 (91207, 2),
 (52617, 220),
 (113774, 7),
 (45043, 26),
 (8192, 21)]

In [None]:
sc.stop()

# Bonus/Optional

Improve the `gcdist` function:
* Profile it to find bottlenecks
* Try using numba, TenserFlow, or another library
* Try running it on a GPU rather than CPU

Improve multiprocessing:
* Try with different chunk sizes. Is it better to group on something other than one file/day?
* Run the exercises with the full 2016 dataset in `/data/twitter-geo/` on the server.
* Implement a version with mpi4py

MapReduce:
* Implement your mapper and reducer as standalone files
* Add the code needed to read input from standard input
* Create a bash script that contains all the necessary code to run the MapReduce functions on Hadoop
* Run them with Hadoop on the server.


# Conclusion

## Exercise: Trade-offs

* What are the trade-offs of these different approaches?
* Which were fast to write?
* Which were fast to run?
* Which are most easily read / understood?
* What other factors are important?


Discussion the above points briefly here.
...
...
...
...
