In [19]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql import Row
from pyspark import SparkConf
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.ml.clustering import KMeans
import pyspark.sql.types as T
import tarfile
import datetime 

Extract the file

In [114]:
# assuming the location-data-sample.tar.gz file has been downloaded and put into the same directory
tar = tarfile.open('location-data-sample.tar.gz')
tar.extractall()
tar.close()

Initiate `spark` session

In [2]:
# spark = SparkSession.builder.config(conf).getOrCreate()
spark = SparkSession \
    .builder \
    .appName('freckle') \
    .config('spark.master', 'local[*]') \
    .getOrCreate()
#     .config('spark.driver.memory', '6G') \
#     .config('spark.driver.maxResultSize', '6G') \

Read the data into a dataframe

In [3]:
df = spark.read.json('location-data-sample')

Group by `idfa`, so some aliasing and take a look at the summary of the resulting dataframe.
This will give us what `Instruction 3` asks for, i.e., the max, min, avg, std deviation of the number of location events per IDFA.

In [4]:
df.groupBy('idfa').count() \
  .select(F.col('count').cast('integer').alias('event_count')).describe().show()

+-------+------------------+
|summary|       event_count|
+-------+------------------+
|  count|            238211|
|   mean|  36.7517578953113|
| stddev|118.61139276213814|
|    min|                 1|
|    max|             15979|
+-------+------------------+



Get an idea about the schema structure and columns.

In [46]:
df.printSchema()

root
 |-- action: string (nullable = true)
 |-- api_key: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- beacon_major: long (nullable = true)
 |-- beacon_minor: long (nullable = true)
 |-- beacon_uuid: string (nullable = true)
 |-- city: string (nullable = true)
 |-- code: string (nullable = true)
 |-- community: string (nullable = true)
 |-- community_code: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- county: string (nullable = true)
 |-- county_code: string (nullable = true)
 |-- event_time: long (nullable = true)
 |-- geohash: string (nullable = true)
 |-- horizontal_accuracy: double (nullable = true)
 |-- idfa: string (nullable = true)
 |-- idfa_hash_alg: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- place: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- user_ip: string (nullable =

Looks like `GeoHash` is already given in the dataset, so we can just use it instead of recomputing it from the `lat` and `lng` columns.
As per `Instruction 4`, we need to produce geohashes in a separate dataframe. This is done below, however, for looking for possible clusters later on as per `Instruction 5`, we are also taking the `idfa` and `event_time` columns along with `geohash` column.

In [9]:
gdf = df.select(['geohash', 'idfa', 'event_time'])

# Approaches

For finding possible clusters of people, we follow two approaches. 
- First, we will try a simple way based on only location (geohash) across the entire time period that the dataset covers.
- Secondly, we will try locating clusters in smaller time intervals.

## Approach 1
For the first approach we define the following function called `find_clusters_by_location` that takes one parameter aside from the dataframe, the accuracy level (number of `geohash` digits) we want to find the cluster for.

In [6]:
def find_clusters_by_location(gdf, geohash_precision):
    area_code = 'area_code_{}'.format(geohash_precision)
    gdf = gdf.withColumn(area_code, gdf['geohash'][0:geohash_precision])
    count_stat = gdf.groupBy(area_code).agg(F.countDistinct(gdf['idfa']).alias('count_idfa')) \
                    .orderBy(['count_idfa'], ascending=[0])
    return count_stat

Let's try to find top 20 clusters like this using different geohash accuracy levels.

In [7]:
def save_and_show_clusters(gdf, geohash_precision, rows):
    clusters_df = find_clusters_by_location(gdf, geohash_precision)
    clusters_df.repartition(1).write.parquet('location_clusters_{}.parquet'.format(geohash_precision))
    clusters_df.show(rows)

In [149]:
# accuracy level 12 (which is like within a few centemeters from each other)
save_and_show_clusters(gdf, 12, 20)

+------------+----------+
|area_code_12|count_idfa|
+------------+----------+
|s00000000000|      4427|
|djfq0rzn7m70|        75|
|dq21mmek4q6q|        73|
|9vkh7wddguw5|        63|
|dpz8336uu2eq|        61|
|f244mdxpncbp|        58|
|dpm5wpyg42f9|        52|
|djfmbs7xs1j8|        48|
|f241b833vv6j|        47|
|djgzq3q23u2p|        46|
|djkvw9r4j8vp|        45|
|djt54wb39fhy|        44|
|dn6m9tgey6mq|        44|
|djdxvzvm9wvu|        43|
|dnkkg7cw8k1b|        39|
|dpscv16bk3zf|        38|
|dpherfur8ezf|        38|
|dnq1zws4u9te|        38|
|f2418x4h86s2|        37|
|c28rvbv6s3tc|        37|
+------------+----------+
only showing top 20 rows



In [49]:
# accuracy level 6 (which is like within a kilometer from each other)
save_and_show_clusters(gdf, 6, 20)

+-----------+----------+
|area_code_6|count_idfa|
+-----------+----------+
|     s00000|      4428|
|     dpz833|       394|
|     dpz83d|       356|
|     dpz839|       331|
|     dpz83e|       319|
|     dpz832|       283|
|     dpz838|       260|
|     c2b2q7|       238|
|     dpz83s|       237|
|     dpz83m|       232|
|     f25dvg|       229|
|     dpz836|       225|
|     dpz83f|       217|
|     dpz80z|       217|
|     dpz83t|       213|
|     dpz83c|       208|
|     dpz831|       206|
|     f244mt|       201|
|     dpz83w|       184|
|     f25dvf|       178|
+-----------+----------+
only showing top 20 rows



In [135]:
# accuracy level 3 (which is like within 100 kilometers from each other)
save_and_show_clusters(gdf, 3, 20)

+-----------+----------+
|area_code_3|count_idfa|
+-----------+----------+
|        dpz|     10363|
|        dqc|      5948|
|        dr4|      5239|
|        9vg|      5126|
|        9vk|      5098|
|        dpx|      4979|
|        dps|      4871|
|        s00|      4433|
|        dn5|      4166|
|        dph|      3759|
|        dp3|      3726|
|        dr1|      3677|
|        f25|      3520|
|        dnh|      3424|
|        9vf|      3390|
|        djg|      3247|
|        dr5|      3179|
|        dju|      3056|
|        dn6|      3024|
|        f24|      2977|
+-----------+----------+
only showing top 20 rows



A few discrete analyses to get cities/area around the geohash prefix.

In [138]:
# a few discrete analysis to get cities/area around the geohash prefix
df.where(df['geohash'][0:3] == 's00').select('city', 'country_code').distinct().show(20)

+---------+------------+
|     city|country_code|
+---------+------------+
|Barrigada|          GU|
+---------+------------+



In [140]:
df.where(df['geohash'][0:3] == 'dpz').select('city', 'country_code').distinct().show()

+--------------------+------------+
|                city|country_code|
+--------------------+------------+
|           Shelburne|          CA|
|             Orillia|          CA|
|         Mississauga|          CA|
|          New Lowell|          CA|
|            Georgina|          CA|
|             Caledon|          CA|
|              Minden|          CA|
|               Acton|          CA|
|          Sunderland|          CA|
|          Port Perry|          CA|
|              Oshawa|          CA|
|           Pickering|          CA|
|Whitchurch-Stouff...|          CA|
|          Blackstock|          CA|
|            Courtice|          CA|
|        Wasaga Beach|          CA|
|             Vaughan|          CA|
|         Collingwood|          CA|
|                Ajax|          CA|
|           Newmarket|          CA|
+--------------------+------------+
only showing top 20 rows



## Observation

It looks like there is definitely a cluster going on at `geohash` `s00000000000`, which is a cluster of 4427 distinct people (`idfa`). However, since this is in full 12-digit precision of the geohash, which is a matter of a few centimeters, it probably indicates those people were all on some specific point of interest, like a landmark or something like that. From the analysis above, it looks like it is a point in the village of `Barrigada` in `Guam`.

Things don't change too much if we bring down the precision to 6, which is still a pretty small distance of like 1 kilometer. Still `s00000` is the highest cluster zone. However, we do see some other smaller clusters appearing.

At a precision of 3, which is like 100 kilometers, however, things seem to change quite a bit. The geohash `dpz` seems to be the dominant cluster with 10363 different people (idfa). From the anaysis above, `dpz` seems to be a bunch of towns and cities in `Ontario`, `Canada`. The `s00` area seems to move to position 8 in order in terms of number of different `idfa`s.

As a decision, it really depends on how big or small of an area range we consider as a cluster-area. That in turn, will depend on the specific kind of business/problem we are dealing with.

## Approach 2

Now let's try clustering considering a smaller time range. For a quick way of doing this, we could try dividing the entire time span into some time buckets and consider clusters of people for events that happened within that time bucket only.

For example, we could try dividing the entire time span into buckets of 2 hours (7200 seconds) and consider clusters of people for events that happened within that time bucket only.

Before going ahead, let's do a quick analysis on how much time period the entire dataset covers.

In [11]:
min_time = gdf.groupBy().min('event_time').collect()[0][0]
max_time = gdf.groupBy().max('event_time').collect()[0][0]
print(min_time)
min_time_str = datetime.datetime.fromtimestamp(min_time).strftime('%Y-%m-%d %H:%M:%S')
print(min_time_str)
print(max_time)
max_time_str = datetime.datetime.fromtimestamp(max_time).strftime('%Y-%m-%d %H:%M:%S')
print(max_time_str)

1491004658
2017-03-31 19:57:38
1491091296
2017-04-01 20:01:36


It looks like the entire period is just a few minutes over a day. So the location-based approach (Approach 1) might not have been too bad. However, let's carry on some finer time-grained analysis.

Now define a function that will calculate the time buckets with a supplied bucket interval.

In [12]:
def get_df_with_time_bucket(df, bucket_interval):
    time_buckets_start = [i for i in range(min_time, max_time + 1, bucket_interval)]
    time_buckets_end = [i for i in range(min_time + bucket_interval - 1, max_time + bucket_interval, bucket_interval)]
    time_buckets = [time_buckets_start, time_buckets_end]
    R = Row('start_time', 'end_time')
    time_df = spark.sparkContext.parallelize([R(*r) for r in zip(*time_buckets)]).toDF()
    df = gdf.join(time_df, (gdf.event_time >= time_df.start_time) & (gdf.event_time <= time_df.end_time), 'inner')
    return df

Now we can write modified versions of our functions from Approach 1 that uses both location and time buckets and tries to find the top cluster within each time bucket. (These functions can be easily modified to find the top-k clusters)

In [43]:
def find_clusters_by_location_and_time_bucket(gdf, geohash_precision, time_bucket_interval):
    area_code = 'area_code_{}'.format(geohash_precision)
    gdf = gdf.withColumn(area_code, gdf['geohash'][0:geohash_precision])
    gdf_time = get_df_with_time_bucket(gdf, time_bucket_interval)
    gdf_time = gdf_time.withColumn(area_code, gdf_time['geohash'][0:geohash_precision])
    count_stat = \
        gdf_time.groupBy(area_code, 'start_time').agg(F.countDistinct(gdf_time['idfa']).alias('count_idfa')) \
        .withColumn('row_number', F.row_number().over(Window.partitionBy('start_time').orderBy(F.col('count_idfa').desc())))              
    
    count_stat = count_stat.where(count_stat['row_number'] == 1).orderBy(F.col('start_time')) \
                           .select(area_code, 'start_time', 'count_idfa')
    return count_stat

In [44]:
def save_and_show_clusters_with_time_bucket(gdf, geohash_precision, time_bucket_interval, rows=30):
    clusters_df = find_clusters_by_location_and_time_bucket(gdf, geohash_precision, time_bucket_interval)
    clusters_df.repartition(1).write.parquet('location_time_clusters_{}.parquet'.format(geohash_precision))
    clusters_df.show(rows)

Now we can examine clusters for different location and time buckets by tuning the `geohash_precision` and `time_bucket_interval` parameters. Similar to approach 1, let's try finding the clusters for `geohash_precision` 12, 6, 3 and with a `time_bucket_interval` of half 2 hours (7200 seconds).

In [45]:
save_and_show_clusters_with_time_bucket(gdf, 12, 7200)

+------------+----------+----------+
|area_code_12|start_time|count_idfa|
+------------+----------+----------+
|s00000000000|1491004658|       549|
|s00000000000|1491011858|       448|
|s00000000000|1491019058|       268|
|s00000000000|1491026258|       108|
|s00000000000|1491033458|        75|
|s00000000000|1491040658|       151|
|s00000000000|1491047858|       435|
|s00000000000|1491055058|       595|
|s00000000000|1491062258|       676|
|s00000000000|1491069458|       597|
|s00000000000|1491076658|       619|
|s00000000000|1491083858|       588|
|s00000000000|1491091058|        13|
+------------+----------+----------+



In [46]:
save_and_show_clusters_with_time_bucket(gdf, 6, 7200)

+-----------+----------+----------+
|area_code_6|start_time|count_idfa|
+-----------+----------+----------+
|     s00000|1491004658|       550|
|     s00000|1491011858|       448|
|     s00000|1491019058|       268|
|     s00000|1491026258|       108|
|     s00000|1491033458|        75|
|     s00000|1491040658|       151|
|     s00000|1491047858|       435|
|     s00000|1491055058|       595|
|     s00000|1491062258|       676|
|     s00000|1491069458|       597|
|     s00000|1491076658|       619|
|     s00000|1491083858|       588|
|     s00000|1491091058|        13|
+-----------+----------+----------+



In [47]:
save_and_show_clusters_with_time_bucket(gdf, 3, 7200)

+-----------+----------+----------+
|area_code_3|start_time|count_idfa|
+-----------+----------+----------+
|        dpz|1491004658|      4317|
|        dpz|1491011858|      3713|
|        dpz|1491019058|      2606|
|        dqc|1491026258|      1782|
|        dqc|1491033458|      1773|
|        dqc|1491040658|      2258|
|        dpz|1491047858|      3744|
|        dpz|1491055058|      4502|
|        dpz|1491062258|      4815|
|        dpz|1491069458|      4806|
|        dpz|1491076658|      4764|
|        dpz|1491083858|      4551|
|        dpz|1491091058|       363|
+-----------+----------+----------+



## Observation

It turns out there is not too much difference in the time-bucketed results from the results that we got in Approach 1.
At higher location precisions (area within 1 kilometer or less), still the `s000000` prefixed `geohash` seems to have the dominating clusters throughout the entire period.

Also, in lower location precisions (area around 100 kilometers), most of the time throughout the period the `dpz` geohash seems to have the dominating cluster. However, there is a period of 6 hours or so (around April 1, 2017 2 -8 PM EST), where geohash `dqc` (cities mainly in Maryland, US) seems to dominate.

In this way, we can tune the `geohash` and `time_bucket` parameters to find out finer-grained clusters. Also, the way we define the time buckets might be a factor here. We have followed a naive approach of defining the buckets, but it can be defined other ways as well.

# Additional analysis

We can perform a numberof additional analyses on the dataset for finding user behaviour. For shortage of time, we perform one simple analysis. We try to find the distinct geohashes per `idfa` (user) and get an idea about how far users are possibly roaming. Interestingly, this analysis also points out **an error inour original analysis** as we will see below. For simplicity, we will do the analysis with only the first 3 digits of the `geohash`.

In [108]:
gdf_with_geo_3 = gdf.withColumn('area_code_3', gdf['geohash'][0:3])

In [109]:
count_stat = gdf_with_geo_3.groupBy('idfa').agg(F.countDistinct('area_code_3').alias('count_geo_3')) \
                        .orderBy('count_geo_3', ascending=False)

In [106]:
count_stat.count()

238211

In [105]:
count_stat.where('count_geo_3 == 1').count()

195852

So from the above counts, it looks like the majority (over 80%) of users are not roaming too much, which is expected. Let's have a look at the data where an `idfa` covers more than 1 distinct 3-digit prefixed geohashes.

In [107]:
count_stat.where('count_geo_3 > 1').show(100, False)

+------------------------------------+-----------+
|idfa                                |count_geo_3|
+------------------------------------+-----------+
|00000000-0000-0000-0000-000000000000|424        |
|653b5c7a-64d2-4b73-9ddf-47a00c95a6e6|20         |
|97cca691-c2d9-4e0c-8039-ac336bf9f7fb|20         |
|13df7837-02a8-4d8c-9e0f-5491af634a1a|20         |
|a19d0dca-af40-4c9f-b7a8-ae04481822d5|19         |
|3439e5f3-77aa-4b15-831e-0bce48218421|19         |
|45606497-5fab-484c-8d18-03a31a1e30e9|19         |
|d7ece31d-8022-4e8e-a0a1-a735cb1f5041|18         |
|be7c52ae-6ba7-4c88-a34c-cf59ca226873|18         |
|3a1934f4-6810-4463-be4f-7cb0ea7fbfb2|18         |
|2960d8de-1383-4384-adf9-ebf9b06187da|18         |
|7685fd11-2594-4a71-9fa0-3f12356fe92f|17         |
|84a210e6-9411-4345-ac37-b3d83e48c723|17         |
|1df55f52-136c-449a-855f-8587929ba615|16         |
|65d3cf09-acb3-44ad-bc0f-c97d61a62a63|16         |
|7e2a913e-f0af-4b97-8e7e-d3fb0cb161ef|16         |
|362eb796-25b8-4835-964f-4447e4

Now here's the interesting finding. Looks like the `idfa` with all 0's covers the most number of areas. However, this all 0 `idfa` is not really a distinct user, it is the default `idfa` that is used for devices that has limited ad-tracking. So this row doesn't really give us a roaming information, rather points out the fact that there are a lot of users around the world who have limited ad-tracking set on their devices, and it's well possible that individually these users don't cover a lot of geographic area (do not roam too much).

Also, here is the **error in our original analysis**. The original analysis considered each `idfa` as a distinct user. However, as we see now, the all 0 `idfa` is not actually one single user. This may not change the general conclusions of our original analysis, but this is something to be considered.

Now, there seems to be quite a few other `idfa`s that is roaming over different 3-digit geohashes.
For simplicity, let's try to find this by using even bigger geographic areas, namely with only the first digit of `geohash`.

In [119]:
gdf_with_geo_1 = gdf.withColumn('area_code_1', gdf['geohash'][0:1])

DataFrame[geohash: string, idfa: string, event_time: bigint, area_code_1: string]

In [120]:
count_stat_1 = gdf_with_geo_1.groupBy('idfa').agg(F.countDistinct('area_code_1').alias('count_geo_1')) \
                        .orderBy('count_geo_1', ascending=False)

In [121]:
count_stat_1.where('count_geo_1 > 1').show(100, False)

+------------------------------------+-----------+
|idfa                                |count_geo_1|
+------------------------------------+-----------+
|00000000-0000-0000-0000-000000000000|16         |
|c1869823-9618-4cda-bf95-6df51074a78a|4          |
|d092bbf5-16b6-406f-8f26-6e3f0c2781fe|3          |
|ff45bbfd-46f3-414d-bece-6e07edebe650|3          |
|ba77aaee-5042-4ab2-b916-a27141d96067|3          |
|cc9b1c70-38da-4555-931d-52266b9958bf|3          |
|808eb093-ea01-49f0-a905-0f0ccd9b8784|3          |
|08164428-3fa4-42df-9738-1678488af3ff|3          |
|b986562c-4751-431d-89e8-19d4fb940d4e|3          |
|21714129-ca02-49bb-8ad5-d931bcbb91fe|3          |
|8ee3cd08-5942-49d8-95e2-3a81af2b614d|3          |
|954a669a-f1e9-4907-92e7-3ea3d5ac193a|3          |
|def36e7c-b116-478d-afef-5ee7f50871a9|3          |
|905c1e70-b4ee-41df-b8bc-23011a1adf56|3          |
|ed502ede-3341-481c-bb52-cddc565d6d22|3          |
|3d63e2ea-d78a-48bb-8623-c3b57eb83c46|3          |
|b338f208-89f7-4af8-b788-cf3b94

Now let's use one of the above `idfa`s (aside from the all 0 one) to see what geographic are he is roaming in.

In [123]:
gdf_with_geo_1.where(gdf_with_geo_1['idfa'] == 'c1869823-9618-4cda-bf95-6df51074a78a').select('area_code_1') \
    .distinct().show()

+-----------+
|area_code_1|
+-----------+
|          f|
|          d|
|          c|
|          9|
+-----------+



So it turns out we have users who roamed around and covered quite a bit around the world. However, overall the number of such users is low. 

The analysis, of course, is not complete. We can dig into this further and try figuring out user behaviour using and tuning different parameters. For shortage of time, we are ending our analysis here.

In [124]:
# count_stat_reverse = gdf.groupBy('idfa', gdf['geohash'][0:3]).count()

In [125]:
# df.where(df['idfa'] == '00000000-0000-0000-0000-000000000000').select(df['geohash'][0:3]).distinct().count()

In [126]:
# count_stat_with_time.orderBy(['count_idfa', 'event_span'], ascending=[0, 1]).show(100)

In [127]:
# df.where(df['geohash'][0:3] == 'dqc').select('city', 'country_code', 'geohash').distinct().show(20)