## Start SparkSession 

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('DEChallenge').getOrCreate()

## Load data and explore 

In [3]:
#read source data
loc_event_df = spark.read.json('location-data-sample/*')

In [4]:
loc_event_df.printSchema()

root
 |-- action: string (nullable = true)
 |-- api_key: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- beacon_major: long (nullable = true)
 |-- beacon_minor: long (nullable = true)
 |-- beacon_uuid: string (nullable = true)
 |-- city: string (nullable = true)
 |-- code: string (nullable = true)
 |-- community: string (nullable = true)
 |-- community_code: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- county: string (nullable = true)
 |-- county_code: string (nullable = true)
 |-- event_time: long (nullable = true)
 |-- geohash: string (nullable = true)
 |-- horizontal_accuracy: double (nullable = true)
 |-- idfa: string (nullable = true)
 |-- idfa_hash_alg: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- place: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- user_ip: string (nullable =

In [5]:
loc_event_df.select('idfa').distinct().count()

238211

In [6]:
loc_event_df.count()

8754673

## 3. What is the max, min, avg, std deviation of the number of location events per IDFA? We define a location event to be one record in the sample file. 

In [7]:
#count of location events per idfa
loc_event_df_idfa = loc_event_df.groupBy('idfa').count()
loc_event_df_idfa = loc_event_df_idfa.withColumnRenamed('count', 'num_loc_events')

In [9]:
loc_event_df_idfa.show()

+--------------------+--------------+
|                idfa|num_loc_events|
+--------------------+--------------+
|6bc2a6e1-c710-4dd...|           158|
|3a8ddabb-f98e-46b...|             6|
|e118f5a2-7949-429...|            30|
|5b9785d5-529d-49f...|           362|
|0376307f-d608-452...|            73|
|ad2aad6b-6af3-4eb...|            29|
|7360e2a4-9d0b-4fa...|            34|
|aad8e6c9-ae62-448...|            88|
|0c837a76-54f9-4d4...|            95|
|16c2c602-3f0c-4ce...|            15|
|789864f3-01d1-410...|           248|
|e2af50ab-61eb-423...|           107|
|c940218f-906e-401...|             9|
|6d866497-b0ab-498...|            42|
|ace6dd48-1ac3-48e...|            32|
|f92c8e26-a827-485...|           158|
|4d88225c-97ea-413...|            45|
|fcf44327-9fd7-44e...|           165|
|32444aa4-67b8-42c...|            35|
|6ad974b0-4de7-4b3...|            30|
+--------------------+--------------+
only showing top 20 rows



In [10]:
from pyspark.sql.functions import (max, min, avg, stddev)

In [11]:
#max, min, avg, stddev
loc_event_df_idfa.select([max('num_loc_events').alias('max_loc_events'),
                          min('num_loc_events').alias('min_loc_events'),
                          avg('num_loc_events').alias('avg_loc_events'),
                          stddev('num_loc_events').alias('std_loc_events')
                         ]).show()

+--------------+--------------+----------------+------------------+
|max_loc_events|min_loc_events|  avg_loc_events|    std_loc_events|
+--------------+--------------+----------------+------------------+
|         15979|             1|36.7517578953113|118.61139276213812|
+--------------+--------------+----------------+------------------+



## 4. Produce geohashes for all coordinates in a new RDD or DataFrame

In [12]:
#count distinct geohash values
loc_event_df.select('geohash').distinct().count()

2763239

In [13]:
#grab unique lat, lng pair
coord_df = loc_event_df.select(['lat', 'lng']).distinct()
#count unique lat,lng pair
coord_df.count()

2767342

### Since the unique geohash count (2,763,239) and coordinate count (2,767,342) does not match, we need to generate separate geohash values for the coordinates. I am going to use monotonically_increasing_id function to generate unique geohash values

In [14]:
from pyspark.sql.functions import monotonically_increasing_id
#create geohash for each coordinate using monotonically_increasing_id
coord_df = coord_df.withColumn('geohash_coord', monotonically_increasing_id())

In [15]:
coord_df.show()

+-----------+------------+-------------+
|        lat|         lng|geohash_coord|
+-----------+------------+-------------+
| 40.4127551|-104.7690948|            0|
|  43.450253|  -80.490263|            1|
| 39.7185289| -74.9114232|            2|
| 40.2688506| -79.8246405|            3|
| 30.6381235| -88.2478296|            4|
| 45.4375043|-122.8177072|            5|
| 37.7038763|-121.8985499|            6|
| 36.2916264|-115.1806714|            7|
| 39.9424803| -76.7897343|            8|
|  38.961574| -76.9149852|            9|
|  46.799501| -93.2504123|           10|
|47.89985909|-117.7854043|           11|
| 40.1515891|   -74.20003|           12|
| 37.0519105|-100.9378785|           13|
| 34.5617546| -83.2575422|           14|
| 41.9748247| -87.9034822|           15|
| 33.6445113|-112.2756285|           16|
| 40.4125194|-111.7549916|           17|
| 41.8793913| -88.0734717|           18|
| 37.7619249|-121.4385202|           19|
+-----------+------------+-------------+
only showing top

## 5. Using the geohashes, determine if there clusters of people at any point in this dataset. If so, how many people and how close are they?

In [39]:
#geohash location info + user_ip (to calculate population)
geohash_info_df = loc_event_df.select(['geohash', 'user_ip', 'city', 'community_code', 'country_code', 'county_code',
                                       'state_code', 'place'])
#drop duplicates to get unique user event.
geohash_info_df = geohash_info_df.dropDuplicates()

total_population = geohash_info_df.count()

geohash_ppl_df = geohash_info_df.groupby(['city', 'community_code', 'country_code', 'county_code', 'state_code',
                                          'place']).count().orderBy('count', ascending=False)
#rename column
geohash_ppl_df = geohash_ppl_df.withColumnRenamed('count', 'num_people')

In [40]:
geohash_ppl_df.show()

+-------------+--------------+------------+-----------+----------+-------------+----------+
|         city|community_code|country_code|county_code|state_code|        place|num_people|
+-------------+--------------+------------+-----------+----------+-------------+----------+
|      Houston|              |          US|        201|        TX|      Houston|     34905|
|     Columbus|              |          US|        049|        OH|     Columbus|     19133|
|       Dallas|              |          US|        113|        TX|       Dallas|     18062|
|      Atlanta|              |          US|        121|        GA|      Atlanta|     15919|
|    Cleveland|              |          US|        035|        OH|    Cleveland|     15123|
|    Baltimore|              |          US|        510|        MD|    Baltimore|     14875|
| Indianapolis|              |          US|        097|        IN| Indianapolis|     13407|
|   Cincinnati|              |          US|        061|        OH|   Cincinnati|

### Not much insights for community and county, so I will see if there are any clusters in the country, state, city, and place level. 

In [19]:
from pyspark.sql.functions import col
#country-level: compute population precentage and show in descending order.
geohash_ppl_df.groupby('country_code').agg({'num_people': 'sum'})\
              .withColumn('population_percentage', col('sum(num_people)') / total_population) \
              .orderBy('population_percentage', ascending=False).show()

+------------+---------------+---------------------+
|country_code|sum(num_people)|population_percentage|
+------------+---------------+---------------------+
|          US|        2352971|   0.7661168985721007|
|          CA|         677967|  0.22074304161599587|
|          JP|           5788| 0.001884547072163...|
|          GU|           5756| 0.001874128014404...|
|          MX|           4547| 0.001480482988446...|
|          GB|           2436|  7.93150771905662E-4|
|          BR|           2213| 7.205429631474671E-4|
|          DE|           1640| 5.339767101499531E-4|
|          TR|           1627| 5.297439679353498E-4|
|          AU|           1572| 5.118362124120282E-4|
|          PR|           1245|  4.05366465937007E-4|
|          PH|           1219| 3.969009815078004...|
|          PK|            980| 3.190836438700938...|
|          CO|            959| 3.122461372157347E-4|
|          ES|            897| 2.920592128076268E-4|
|          RU|            863| 2.8098896393866

In [20]:
#state-level: compute population precentage and show in descending order.
geohash_ppl_df.groupby('state_code').agg({'num_people': 'sum'})\
              .withColumn('population_percentage', col('sum(num_people)') / total_population) \
              .orderBy('population_percentage', ascending=False).show()

+----------+---------------+---------------------+
|state_code|sum(num_people)|population_percentage|
+----------+---------------+---------------------+
|        ON|         347008|  0.11298426233885055|
|        TX|         249000|   0.0810732931874014|
|        FL|         165248|  0.05380401426759722|
|        GA|         149717|  0.04874718970336617|
|        CA|         147038| 0.047874919211602925|
|        OH|         139163|  0.04531085421621824|
|        TN|         105931| 0.034490662733472364|
|        AL|         105504|  0.03435163343150039|
|        NC|          99211|  0.03230266060407743|
|        AB|          95831|  0.03120214762828058|
|        MI|          83035| 0.027035826906891068|
|        BC|          82829| 0.026968754222567354|
|        PA|          81647| 0.026583900276593422|
|        SC|          71145| 0.023164495758303906|
|        QC|          69383| 0.022590796390447677|
|        VA|          68852| 0.022417905150758883|
|        IL|          63016| 0.

In [21]:
#city-level: compute population precentage and show in descending order.
geohash_ppl_df.groupby('city').agg({'num_people': 'sum'})\
              .withColumn('population_percentage', col('sum(num_people)') / total_population) \
              .orderBy('population_percentage', ascending=False).show()

+------------+---------------+---------------------+
|        city|sum(num_people)|population_percentage|
+------------+---------------+---------------------+
|     Toronto|          95486| 0.031089817161816108|
|      Ottawa|          37746| 0.012289929817878126|
|     Houston|          35102| 0.011429055170538811|
|     Calgary|          33923|  0.01104517801122979|
|    Edmonton|          28377| 0.009239425063369035|
|    Columbus|          24220| 0.007885924341360892|
| Mississauga|          23853| 0.007766430772687091|
|    Montréal|          22907|  0.00745841737768596|
|   Vancouver|          20641|  0.00672061785012511|
|     Atlanta|          19404| 0.006317856148627859|
|      Dallas|          19268| 0.006273575153152009|
|   Cleveland|          17077| 0.005560195292213871|
|    Winnipeg|          16994| 0.005533170861151403|
|   Baltimore|          15288| 0.004977704844373465|
|    Hamilton|          14509| 0.004724065907052237|
|    Richmond|          14305| 0.0046576444138

In [41]:
#place-level: compute population precentage and show in descending order.
geohash_ppl_df.groupby('place').agg({'num_people': 'sum'})\
              .withColumn('population_percentage', col('sum(num_people)') / total_population) \
              .orderBy('population_percentage', ascending=False).show()

+-------------+---------------+---------------------+
|        place|sum(num_people)|population_percentage|
+-------------+---------------+---------------------+
|      Houston|          35102| 0.011429055170538811|
|     Columbus|          24220| 0.007885924341360892|
|      Atlanta|          19404| 0.006317856148627859|
|       Dallas|          19268| 0.006273575153152009|
|    Cleveland|          17077| 0.005560195292213871|
|    Baltimore|          15288| 0.004977704844373465|
|   Cincinnati|          13846| 0.004508196054107469|
| Indianapolis|          13590| 0.004424843592035281|
| Philadelphia|          13237| 0.004309908361131054|
|    Las Vegas|          13091| 0.004262371410105...|
|  Saint Louis|          12916|  0.00420539218798585|
| Jacksonville|          12442| 0.004051059894930315|
|   Birmingham|          12001| 0.003907472255188772|
|      Chicago|          11856| 0.003860260899718197|
|    Knoxville|          11770| 0.003832259681990...|
|    Charlotte|          116

### Conclusion
1. Most of the users are in North America - predominantly in US (76%)
2. Ontario had the highest number of users by state
3. Toronto had the highest number of users by city
4. In US, the users are more scattered aroud the regions, where in Canada the users are concentrated in ON, QC, BC, and AB

## 6. Write any findings into a local parquet format file for later use. 

###  Saving loc_event_df_idfa, coord_df, geohash_ppl_df to a parquet file.

In [77]:
loc_event_df_idfa.write.parquet('loc_event_df_idfa')

In [79]:
geohash_ppl_df.write.parquet('geohash_ppl_df')