# Exploring the dataset

An initial look at the raw unzipped data shows that the files are in JSON format. Since every JSON record carries its own field names, Spark can infer a schema from the files, and DataFrames are optimized to work efficiently with data that has a schema. I have therefore used DataFrames to analyze this data.

In [1]:
# initialize spark shell
import os
execfile(os.path.join(os.environ["SPARK_HOME"], 'python/pyspark/shell.py'))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 2.7.9 (default, Dec 15 2014 10:37:34)
SparkSession available as 'spark'.


In [2]:
# read in the data and print the schema

df = spark.read.json('/Users/akaur/Desktop/DataEngChallenge/location-data-sample')
df.printSchema()

root
 |-- action: string (nullable = true)
 |-- api_key: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- beacon_major: long (nullable = true)
 |-- beacon_minor: long (nullable = true)
 |-- beacon_uuid: string (nullable = true)
 |-- city: string (nullable = true)
 |-- code: string (nullable = true)
 |-- community: string (nullable = true)
 |-- community_code: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- county: string (nullable = true)
 |-- county_code: string (nullable = true)
 |-- event_time: long (nullable = true)
 |-- geohash: string (nullable = true)
 |-- horizontal_accuracy: double (nullable = true)
 |-- idfa: string (nullable = true)
 |-- idfa_hash_alg: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- place: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- user_ip: string (nullable =

# Metrics for location events per IDFA

To get the max, min, average and standard deviation of the number of location events per IDFA, we first count the records per IDFA and store the result in a new DataFrame. We then get the required metrics with the very handy 'describe' method that Spark DataFrames provide.

In [15]:
# number of location events (records) per IDFA
idfa_count = df.groupBy(df.idfa).count()
idfa_count.show(10)

+--------------------+-----+
|                idfa|count|
+--------------------+-----+
|b5b237fe-4ab2-4f0...|   28|
|0894896b-1b58-4b8...|   58|
|0446d012-6d80-4b2...|   36|
|564fa141-580a-445...|   72|
|4bf5568f-4369-421...|   31|
|b2a03e10-3b45-479...|   94|
|f4503b93-f2ec-418...|   48|
|fe64cf85-bd56-4d1...|  245|
|71f57c4d-78fa-448...|   33|
|ef2d34f9-07fd-4cf...|   89|
+--------------------+-----+
only showing top 10 rows



In [17]:
# get the aggregation metrics for the count column
idfa_count.describe('count').show()

+-------+------------------+
|summary|             count|
+-------+------------------+
|  count|            238211|
|   mean| 36.79234376246269|
| stddev|118.69280626757613|
|    min|                 1|
|    max|             15999|
+-------+------------------+
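
To make the two steps above concrete, here is a minimal pure-Python sketch of the same idea on a toy list of keys: count records per key, then summarise the counts. The function name `describe_counts` and the toy data are hypothetical and not part of the original analysis. The sketch uses the sample standard deviation (dividing by n - 1), which is what Spark reports as 'stddev' in 'describe'.

```python
import math
from collections import Counter


def describe_counts(keys):
    """Count records per key, then summarise the per-key counts."""
    per_key = Counter(keys)                 # analogous to groupBy(key).count()
    values = list(per_key.values())
    n = len(values)
    mean = sum(values) / float(n)
    if n > 1:
        # sample variance: divide by n - 1
        variance = sum((v - mean) ** 2 for v in values) / (n - 1)
        stddev = math.sqrt(variance)
    else:
        stddev = float('nan')               # undefined for a single value
    return {'count': n, 'mean': mean, 'stddev': stddev,
            'min': min(values), 'max': max(values)}


# toy data: key 'a' appears twice, 'b' once, 'c' three times
toy_events = ['a', 'a', 'b', 'c', 'c', 'c']
summary = describe_counts(toy_events)
print(summary)
```

Note that the 'count' row in the summary is the number of distinct keys (here, distinct IDFAs), not the number of records.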



# Geohashes of all coordinates
To get the geohashes for all coordinates, we select the lat, lng and geohash columns and then take the distinct rows of the resulting DataFrame.

In [19]:
geohash_df = df.select(df.lat, df.lng, df.geohash).distinct()
geohash_df.show()

+----------+------------+------------+
|       lat|         lng|     geohash|
+----------+------------+------------+
|43.8799046| -79.7387178|dpz39s04cpnd|
|43.0179897| -81.2123496|dpwhxznvzne0|
| 45.268008|  -75.306878|f243w2g1q83c|
|39.8381067|  -85.996203|dp4feh73z1r7|
| 45.429277|  -73.312884|f25f7wphtzp9|
|33.6508982| -86.2153236|djfxqjgytq56|
|33.4084427|-111.9747922|9tbq5cy6q1p7|
|32.9452214| -87.1452896|djf52vqxzd6k|
|38.0779899| -78.4741732|dqb0w7bu75z3|
|42.3185033| -89.0907915|dp88sxy54hfc|
|32.6601029| -96.5708318|9vg3f3x6bs2w|
| 33.577859|  -79.029531|djzz18smnqwd|
|31.2965654| -85.7604213|dje403r5tqeh|
|41.8792052| -94.0947443|9zmn4zfk199m|
|32.4636283|  -99.708028|9vc0ejff6dct|
|37.7026782|-122.4443142|9q8ymxwf08tg|
|45.1598639| -93.8706887|cbj0zvj7q2v3|
|45.3641305| -75.7888115|f24456k30wy5|
| 27.067164|  -82.419769|dhv3urrv8jjf|
| 30.391329| -88.6527619|dj2uyst42fme|
+----------+------------+------------+
only showing top 20 rows



In [10]:
# save the dataframe locally for later lookup if required
geohash_df.write.parquet('/Users/akaur/PycharmProjects/DataEngChallenge-Amanjot/geohash_coordinates.parquet')
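
The geohash column is already provided in the data, so nothing needs to be computed here. For reference, the following is a minimal sketch of the standard geohash encoding, which alternately bisects the longitude and latitude ranges and packs every five bits into one base-32 character. The function name `geohash_encode` is hypothetical, and treating a coordinate that lies exactly on a midpoint as belonging to the upper half (`>=`) is an assumption.

```python
BASE32 = '0123456789bcdefghjkmnpqrstuvwxyz'


def geohash_encode(lat, lng, precision=12):
    """Encode a (lat, lng) pair as a geohash string of the given length."""
    lat_lo, lat_hi = -90.0, 90.0
    lng_lo, lng_hi = -180.0, 180.0
    chars = []
    bits, value = 0, 0
    use_lng = True                      # geohash starts with a longitude bit
    while len(chars) < precision:
        if use_lng:
            mid = (lng_lo + lng_hi) / 2
            if lng >= mid:
                value = value * 2 + 1
                lng_lo = mid
            else:
                value = value * 2
                lng_hi = mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                value = value * 2 + 1
                lat_lo = mid
            else:
                value = value * 2
                lat_hi = mid
        use_lng = not use_lng
        bits += 1
        if bits == 5:                   # five bits make one base-32 character
            chars.append(BASE32[value])
            bits, value = 0, 0
    return ''.join(chars)


# first row of the table above, truncated to a 4-character prefix
print(geohash_encode(43.8799046, -79.7387178, precision=4))
print(geohash_encode(0.0, 0.0))
```

Under this scheme the coordinates (0.0, 0.0) encode to 's00000000000'. Whether any records in this dataset actually carry zero coordinates is something to verify against the lat and lng columns.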

# Geohash-based clusters

Since locations that share a geohash prefix are close to each other, I look at geohash prefixes of different lengths to see how many distinct IDFAs fall within each area and whether these areas can be classified as clusters.

In [21]:
import pyspark.sql.functions as func
df.groupBy(df.geohash)\
    .agg(func.countDistinct(df.idfa).alias('vol'))\
    .sort('vol', ascending=False).show()

+------------+----+
|     geohash| vol|
+------------+----+
|s00000000000|4427|
|djfq0rzn7m70|  75|
|dq21mmek4q6q|  73|
|9vkh7wddguw5|  63|
|dpz8336uu2eq|  61|
|f244mdxpncbp|  58|
|dpm5wpyg42f9|  52|
|djfmbs7xs1j8|  48|
|f241b833vv6j|  47|
|djgzq3q23u2p|  46|
|djkvw9r4j8vp|  45|
|djt54wb39fhy|  44|
|dn6m9tgey6mq|  44|
|djdxvzvm9wvu|  43|
|dnkkg7cw8k1b|  39|
|dnq1zws4u9te|  38|
|dpscv16bk3zf|  38|
|dpherfur8ezf|  38|
|f2418x4h86s2|  37|
|c2b2mbftz52c|  37|
+------------+----+
only showing top 20 rows



Here we see that the geohash 's00000000000' has a far higher volume of IDFAs than any other geohash. This looks like an outlier, possibly caused by artificial traffic or some other inaccuracy in the data. Let's check the cities for this geohash.

In [22]:
df.select(df.city)\
    .filter(df.geohash == 's00000000000')\
    .distinct()\
    .show()

+---------+
|     city|
+---------+
|Barrigada|
+---------+



Barrigada, the city responsible for this unusually high amount of traffic, is a small village with a population of fewer than 9K, so it is reasonable to assume that this is some kind of artificial/fraud/bot traffic. Going forward, we filter out this geohash in our analysis to avoid skewing the results.

In [68]:
df.filter(df.geohash != 's00000000000')\
    .groupBy(func.substring(df.geohash, 1, 4))\
    .agg(func.countDistinct('idfa').alias('count'))\
    .sort('count', ascending=False)\
    .show(100)

+------------------------+-----+
|substring(geohash, 1, 4)|count|
+------------------------+-----+
|                    dpz8| 4315|
|                    dpz2| 3676|
|                    dpz9| 2535|
|                    f25d| 1938|
|                    c3nf| 1825|
|                    dpxr| 1792|
|                    f244| 1749|
|                    9vk1| 1743|
|                    9vg5| 1689|
|                    9vk0| 1636|
|                    dr4e| 1612|
|                    dn5b| 1595|
|                    c3x2| 1581|
|                    dpsc| 1574|
|                    dqcx| 1569|
|                    9vg4| 1569|
|                    dphg| 1558|
|                    dqcr| 1535|
|                    djgz| 1514|
|                    dnh0| 1510|
|                    9vk4| 1415|
|                    f25e| 1414|
|                    9vff| 1399|
|                    dpsb| 1384|
|                    c2b2| 1384|
|                    dpj5| 1356|
|                    djup| 1347|
|         
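
To give a sense of how large the area behind a 4-character prefix is, the sketch below decodes a geohash prefix into its bounding box by replaying the bisection bit by bit. The function name `geohash_bbox` is hypothetical. It shows that every 4-character cell spans about 0.18 degrees of latitude and 0.35 degrees of longitude.

```python
BASE32 = '0123456789bcdefghjkmnpqrstuvwxyz'


def geohash_bbox(prefix):
    """Return (lat_lo, lat_hi, lng_lo, lng_hi) for a geohash prefix."""
    lat_lo, lat_hi = -90.0, 90.0
    lng_lo, lng_hi = -180.0, 180.0
    use_lng = True                      # bits alternate, longitude first
    for ch in prefix:
        idx = BASE32.index(ch)
        for shift in (4, 3, 2, 1, 0):   # most significant bit first
            bit = (idx >> shift) & 1
            if use_lng:
                mid = (lng_lo + lng_hi) / 2
                if bit:
                    lng_lo = mid
                else:
                    lng_hi = mid
            else:
                mid = (lat_lo + lat_hi) / 2
                if bit:
                    lat_lo = mid
                else:
                    lat_hi = mid
            use_lng = not use_lng
    return lat_lo, lat_hi, lng_lo, lng_hi


# the prefix with the highest IDFA volume in the table above
lat_lo, lat_hi, lng_lo, lng_hi = geohash_bbox('dpz8')
cell_height_deg = lat_hi - lat_lo
cell_width_deg = lng_hi - lng_lo
print(lat_lo, lat_hi, lng_lo, lng_hi)
print(cell_height_deg, cell_width_deg)
```

The box for 'dpz8' runs from roughly 43.59 to 43.77 in latitude and from -79.45 to -79.10 in longitude, which contains the approximate coordinates of central Toronto (about 43.65, -79.38). Longer prefixes shrink the cell, so the choice of prefix length controls how coarse a "cluster" is.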

# Analysis of behaviour of IDFAs

To get more insights into the behaviour of IDFAs, I would like to look at the following variables:
- Top countries by volume
- Top cities by volume
- Distribution over platforms
- Time of day activity

In [63]:
import pyspark.sql.functions as func
from iso3166 import countries
def toAlpha3(code):
    return countries.get(code).alpha3
 
udfToAlpha3=func.udf(toAlpha3)

top_countries_df = df.groupBy(df.country_code)\
    .agg(func.countDistinct('idfa').alias('count'))\
    .orderBy(['count'], ascending=False)
    
top_countries_df.show(10)

+------------+------+
|country_code| count|
+------------+------+
|          US|189577|
|          CA| 42627|
|          GU|  4486|
|          JP|   706|
|          MX|   626|
|          GB|   399|
|          BR|   324|
|          TR|   259|
|          DE|   222|
|          AU|   213|
+------------+------+
only showing top 10 rows



In [64]:
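# for plotting, we need to convert the 2-letter country code to a 3-letter code;
# reference the column on top_countries_df, the DataFrame being extended
pandas_df = top_countries_df.withColumn('alpha3', udfToAlpha3(top_countries_df.country_code)).toPandas()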
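
The country_code column is nullable, so a lookup inside a UDF may fail if a row has a missing or unrecognised code. The following is a minimal sketch of a null-safe conversion. It uses a small hard-coded table instead of the iso3166 package so that it is self-contained; the table covers only the ten codes shown above, and the names `ALPHA2_TO_ALPHA3` and `to_alpha3_safe` are hypothetical.

```python
# Partial, illustrative lookup table: only the ten codes from the output above.
ALPHA2_TO_ALPHA3 = {
    'US': 'USA', 'CA': 'CAN', 'GU': 'GUM', 'JP': 'JPN', 'MX': 'MEX',
    'GB': 'GBR', 'BR': 'BRA', 'TR': 'TUR', 'DE': 'DEU', 'AU': 'AUS',
}


def to_alpha3_safe(code):
    """Return the alpha-3 code, or None for a missing or unknown code."""
    if code is None:
        return None
    return ALPHA2_TO_ALPHA3.get(code.upper())


print(to_alpha3_safe('GU'))    # known code
print(to_alpha3_safe(None))    # missing value
print(to_alpha3_safe('ZZ'))    # code not in the table
```

Returning None instead of raising keeps the row in the result, so unmapped countries simply do not appear on the map.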

In [41]:
import plotly.plotly as py
import pandas as pd

data = [ dict(
        type = 'choropleth',
        locations = pandas_df['alpha3'],
        z = pandas_df['count'],
        text = pandas_df['country_code'],
        colorscale = [[0,"rgb(5, 10, 172)"],[0.35,"rgb(40, 60, 190)"],[0.5,"rgb(70, 100, 245)"],\
            [0.6,"rgb(90, 120, 245)"],[0.7,"rgb(106, 137, 247)"],[1,"rgb(220, 220, 220)"]],
        autocolorscale = False,
        reversescale = True,
        marker = dict(
            line = dict (
                color = 'rgb(180,180,180)',
                width = 0.5
            ) ),
        colorbar = dict(
            autotick = True,
            tickprefix = '',
            title = 'Number of IDFAs'),
      ) ]

layout = dict(
    title = 'Distribution of IDFAs over the world',
    geo = dict(
        showframe = False,
        showcoastlines = False,
        projection = dict(
            type = 'Mercator'
        )
    )
)

fig = dict( data=data, layout=layout )
py.iplot( fig, validate=False, filename='world_map' )

### Distribution by top cities
Top cities by volume of IDFAs:

In [67]:
top_cities_df = df.groupBy(df.city)\
    .agg(func.countDistinct('idfa').alias('count'))\
    .orderBy(['count'], ascending=False)
    
top_cities_df.show(50)

+-------------+-----+
|         city|count|
+-------------+-----+
|      Toronto| 6307|
|    Barrigada| 4446|
|      Houston| 3885|
|     Columbus| 3018|
|       Ottawa| 2635|
|      Atlanta| 2604|
|  Mississauga| 2439|
|       Dallas| 2407|
|      Calgary| 2352|
|     Edmonton| 2021|
|     Montréal| 1958|
|     Richmond| 1908|
|    Cleveland| 1858|
|    Baltimore| 1847|
|    Vancouver| 1662|
| Indianapolis| 1564|
|   Cincinnati| 1547|
| Philadelphia| 1540|
|     Hamilton| 1492|
|    Knoxville| 1466|
|  Saint Louis| 1456|
|      Detroit| 1406|
|    Charlotte| 1376|
|     Columbia| 1362|
|   Fort Worth| 1336|
|   Birmingham| 1308|
|      Chicago| 1255|
|     Brampton| 1205|
|    Nashville| 1201|
|    Arlington| 1194|
|     Winnipeg| 1185|
| Jacksonville| 1168|
|Oklahoma City| 1153|
|      Raleigh| 1144|
|   Burlington| 1110|
|  Kansas City| 1106|
|        Omaha| 1091|
|  Minneapolis| 1056|
|      Orlando| 1049|
|      Vaughan| 1041|
|  Springfield| 1030|
|       London| 1009|
|    Las V

If we look at the top 50 cities, most of them are big cities in the US and Canada, which is expected. Barrigada is an exception, as discussed earlier. Toronto has a larger share of traffic than any other city, which suggests that the data was collected for clients that are local. It is interesting that Houston has the next largest share, ahead of cities with bigger populations, which again points to one or more big clients located in Houston.

### Distribution by platform
Next, let's take a look at the top platforms:

In [66]:
plat_df = df.groupBy(df.platform)\
    .agg(func.countDistinct('idfa').alias('count'))\
    .orderBy(['count'], ascending=False)
    
plat_df.show()

+--------+------+
|platform| count|
+--------+------+
| android|204602|
|     ios| 33609|
+--------+------+



This is interesting. I would have expected the majority of the population to be on iOS, since iPhones seem to be more popular in North America, but the volume for Android is about 6x that of iOS (204,602 vs. 33,609 IDFAs). One possible explanation is that more iOS users have opted out of interest-based ads and turned off tracking. Another is that the apps this data comes from are less popular with iPhone users than with Android users.
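
The ratio between the two platforms can be checked directly from the counts in the table above. This is plain arithmetic on the two reported numbers; the variable names are only illustrative.

```python
# distinct IDFA counts per platform, copied from the output above
android_idfas = 204602
ios_idfas = 33609

total_idfas = android_idfas + ios_idfas
android_to_ios = android_idfas / float(ios_idfas)
android_share_pct = 100.0 * android_idfas / total_idfas

print(total_idfas)
print(round(android_to_ios, 2))
print(round(android_share_pct, 1))
```

The total of the two counts equals the number of distinct IDFAs reported by 'describe' earlier (238211), which suggests that each IDFA appears on exactly one platform.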

### Time of day analysis

We check whether the overall volume of IDFAs follows a pattern over the time of day.

In [60]:
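# note: from_unixtime formats the timestamp in the system/session time zone,
# so 'utc_hour' is only UTC if Spark is running with a UTC time zone
overall_tod = df.groupBy(func.hour(func.from_unixtime(df.event_time)).alias('utc_hour'))\
    .agg(func.countDistinct('idfa').alias('idfa_vol'))\
    .orderBy('utc_hour')\
    .toPandas()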

In [61]:
import plotly.graph_objs as go
data_overall = [go.Scatter(x=overall_tod['utc_hour'], y=overall_tod['idfa_vol'])]
py.iplot(data_overall, filename='time_of_day_overall')

In [57]:
# use a region ID rather than a three-letter abbreviation such as 'PST',
# which is ambiguous and may not follow daylight saving time
van_tod = df.filter(df.city == 'Vancouver')\
    .select(func.from_unixtime(df.event_time).alias('utc_time'), df.idfa)\
    .withColumn('pst_time', func.from_utc_timestamp('utc_time', 'America/Vancouver'))\
    .groupBy(func.hour('utc_time').alias('utc_hour'), func.hour('pst_time').alias('pst_hour'))\
    .agg(func.countDistinct('idfa').alias('idfa_vol'))\
    .orderBy('utc_hour')\
    .toPandas()
    

data_van = [go.Scatter(x=van_tod['pst_hour'], y=van_tod['idfa_vol'])]
py.iplot(data_van, filename='time_of_day_van')

In [58]:
# use a region ID rather than a three-letter abbreviation such as 'EST',
# which is ambiguous and may not follow daylight saving time
tor_tod = df.filter(df.city == 'Toronto')\
    .select(func.from_unixtime(df.event_time).alias('utc_time'), df.idfa)\
    .withColumn('est_time', func.from_utc_timestamp('utc_time', 'America/Toronto'))\
    .groupBy(func.hour('utc_time').alias('utc_hour'), func.hour('est_time').alias('est_hour'))\
    .agg(func.countDistinct('idfa').alias('idfa_vol'))\
    .orderBy('utc_hour')\
    .toPandas()


data_tor = [go.Scatter( x=tor_tod['est_hour'], y=tor_tod['idfa_vol'])]
py.iplot(data_tor, filename='time_of_day_tor')
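
The hourly aggregation used in the cells above can be sketched in plain Python as follows. This is an illustration on toy data, not the code that produced the plots. It assumes that event_time holds epoch seconds, uses a fixed UTC offset instead of a real time zone (so it ignores daylight saving time), and is written for Python 3. The name `hourly_distinct` is hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def hourly_distinct(events, utc_offset_hours=0):
    """Count distinct ids per hour of day for (epoch_seconds, id) pairs."""
    tz = timezone(timedelta(hours=utc_offset_hours))
    ids_by_hour = defaultdict(set)
    for epoch_seconds, idfa in events:
        hour = datetime.fromtimestamp(epoch_seconds, tz=tz).hour
        ids_by_hour[hour].add(idfa)     # a set keeps each id once per hour
    return {hour: len(ids) for hour, ids in sorted(ids_by_hour.items())}


# toy events: id 'a' is seen twice in the first hour and once in the second
toy_events = [(0, 'a'), (10, 'a'), (3600, 'b'), (3601, 'a')]

utc_counts = hourly_distinct(toy_events)
eastern_counts = hourly_distinct(toy_events, utc_offset_hours=-5)
print(utc_counts)
print(eastern_counts)
```

Shifting the offset only relabels the hours; the distinct counts per bucket stay the same, which is why the city-level plots use the local hour on the x-axis.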