### Location-related module

    This module contains a few applications for location-related use cases.
    Since location data sources are often huge, this module is designed to work on Spark DataFrames
    (note: the raw CSV is currently read with pandas before conversion, so it must fit in driver memory).
    
    Use cases:
    
    1. Bin latitude/longitude points using geohash.
    2. Find co-locations: distinct tracks that fall into the same geohash cell during the same hour.

In [4]:
import os
import random
import pandas as pd
import numpy as np
import findspark
#spark path using default value
findspark.init()
import pyspark
import pyarrow
from pyspark.sql import SQLContext
    
from importlib import reload
from pyspark.sql.types import *
from pyspark.sql import SparkSession,SQLContext
from pyspark.sql.functions import collect_list,collect_set,struct,col,substring,lit,udf,concat
from pyspark.sql.functions import unix_timestamp,from_unixtime,to_timestamp,to_date
from pyspark.sql.functions import year,month,dayofmonth,hour,minute

import geohash as gh
### Note: track_id is the person's id.

def initialize_spark(app_name='location', master='local[*]'):
    '''
    Create (or reuse) a SparkContext and wrap it in a SparkSession and an SQLContext.

    inputs:
    * app_name: Spark application name
    * master: Spark master URL. Defaults to 'local[*]' so a local run uses every
      available core; plain 'local' runs with a single worker thread, which makes
      the parallelism settings below ineffective.
    output:
    * (sc, spark, sqlContext) tuple

    Note: SparkContext.getOrCreate returns an already-running context unchanged,
    so this conf only applies when no context exists yet.
    '''
    # NOTE(review): spark.executor.instances / spark.executor.cores presumably only
    # take effect under a cluster manager, not in local mode -- confirm for your deployment.
    conf = pyspark.SparkConf()\
        .setAppName(app_name)\
        .setMaster(master)\
        .set('spark.driver.memory', '8g')\
        .set('spark.executor.memory', '8g')\
        .set('spark.executor.instances', 4)\
        .set('spark.executor.cores', 4)\
        .set('spark.driver.maxResultSize', '8g')\
        .set('spark.sql.shuffle.partitions', 100)\
        .set('spark.default.parallelism', 200)\
        .set('spark.sql.broadcastTimeout', 36000)\
        .set('spark.kryoserializer.buffer.max', '1024m')\
        .set('spark.sql.execution.arrow.enabled', 'false')\
        .set('spark.dynamicAllocation.enabled', "False")\
        .set('spark.port.maxRetries',30)

    sc = pyspark.SparkContext.getOrCreate(conf)
    spark = pyspark.sql.SparkSession(sc)
    sqlContext = SQLContext.getOrCreate(sc)
    return sc,spark,sqlContext

In [2]:
# Start the Spark context/session used by every cell below (stopped in the last cell).
sc,spark,sqlContext = initialize_spark()

In [9]:
# pandas to spark df, loading data
def equivalent_type(f):
    '''
    Map a pandas/numpy dtype to the matching Spark SQL type.

    input:
    * f: a pandas column dtype (or its string name, e.g. 'int64')
    output:
    * a Spark SQL DataType instance; dtypes not listed fall back to StringType()

    datetime64[ns] maps to TimestampType (DateType would silently drop the time
    of day) and float64 maps to DoubleType (FloatType is 32-bit and loses precision).
    '''
    # str() gives the canonical dtype name for numpy dtype objects and is a no-op for strings
    dtype_name = str(f)
    spark_type_by_dtype = {
        'datetime64[ns]': TimestampType,
        'int64': LongType,
        'int32': IntegerType,
        'float64': DoubleType,
        'float32': FloatType,
        # bool values would fail Spark's schema check against StringType
        'bool': BooleanType,
    }
    return spark_type_by_dtype.get(dtype_name, StringType)()

def define_structure(string, format_type):
    '''
    Build a StructField named `string` whose Spark type is derived from the
    pandas dtype `format_type` via equivalent_type.

    Falls back to StringType if the dtype mapping raises.
    '''
    try:
        typo = equivalent_type(format_type)
    except Exception:
        # narrowed from a bare `except:` so KeyboardInterrupt / SystemExit still propagate
        typo = StringType()
    return StructField(string, typo)

def pandas_to_spark(sqlcontext,pandas_df):
    '''
    Convert a pandas DataFrame into a Spark DataFrame with an explicit schema.

    The schema is derived column by column from the pandas dtypes via
    define_structure, then handed to sqlcontext.createDataFrame.
    '''
    schema = StructType([
        define_structure(name, dtype)
        for name, dtype in zip(pandas_df.columns, pandas_df.dtypes)
    ])
    return sqlcontext.createDataFrame(pandas_df, schema)

def load_basic_location_data(path,sqlContext):
    '''
    Load raw location points from a CSV file into a Spark DataFrame.

    The CSV is read with pandas first and then converted, so the whole file must
    fit in driver memory. The file must provide track_id, latitude, longitude and
    time columns, with time formatted as 'yyyy-MM-dd HH:mm:ss'.

    inputs:
    * path: path to csv location data
    * sqlContext: SQLContext used to create the Spark DataFrame
    output:
    * spark df with columns track_id, points (struct of latitude/longitude cast
      to Float) and datetime (parsed timestamp)
    '''
    raw_df = pandas_to_spark(sqlContext, pd.read_csv(path))

    lat_long = struct(col("latitude").cast("Float"), col("longitude").cast("Float"))
    # convert the string time column into Spark's timestamp type
    parsed_time = to_timestamp(col("time"), 'yyyy-MM-dd HH:mm:ss')

    enriched = raw_df.withColumn("points", lat_long)
    enriched = enriched.withColumn("datetime", parsed_time)
    return enriched.select(['track_id', 'points', 'datetime'])

@udf("string")
def geohash_w_time(struct_input):
    '''
    Spark UDF: encode a (latitude, longitude) struct as a precision-6 geohash string.

    Despite the name, no time information is used here. Returns null when the
    struct or either coordinate is null (e.g. a value whose Float cast failed),
    instead of passing None to gh.encode, which would raise and fail the job.
    '''
    if struct_input is None:
        return None
    latitude, longitude = struct_input[0], struct_input[1]
    if latitude is None or longitude is None:
        return None
    # using geohash to put points into bins
    return gh.encode(latitude, longitude, precision=6)

def get_hashed_points_w_time(path,sqlContext):
    '''
    Load location points, bin them with geohash and tag each row with an hourly bucket.

    inputs:
    * path: path to csv location data
    * sqlContext
    output columns:
    * track_id
    * easytime: 'year-month-day:hour' string without zero padding (e.g. '2014-9-13:7')
    * points_w_time: struct(easytime, geohashed_point)
    * geohashed_point: geohash string produced by geohash_w_time
    '''
    located = load_basic_location_data(path, sqlContext)
    located = located.withColumn('geohashed_point', geohash_w_time('points'))

    ts = col('datetime')
    # concat() casts the integer date parts to strings
    hour_bucket = concat(year(ts), lit("-"), month(ts), lit("-"),
                         dayofmonth(ts), lit(":"), hour(ts))
    located = located.withColumn("easytime", hour_bucket)
    located = located.withColumn("points_w_time",
                                 struct(col("easytime"), col("geohashed_point")))

    return located.select('track_id', 'easytime', 'points_w_time', 'geohashed_point')

def find_co_location(sdf):
    '''
    Group points by hourly bucket and geohash cell.

    Returns one row per (easytime, geohashed_point) with `co_occurrence`: the
    set of distinct track_ids seen in that cell during that hour.
    '''
    cell_keys = ["easytime", "geohashed_point"]
    distinct_tracks = collect_set(col("track_id")).alias("co_occurrence")
    return sdf.groupBy(*cell_keys).agg(distinct_tracks)

@udf("Boolean")
def has_co_occurrence(set_input):
    '''
    Spark UDF: True when the collected set holds more than one distinct track_id.

    A null input yields False rather than raising TypeError from len(None).
    '''
    return set_input is not None and len(set_input) > 1

In [6]:
# CSV of trajectory points; load_basic_location_data reads its track_id, latitude, longitude and time columns
path = 'datasets/go_track_trackspoints.csv'

In [27]:
# one row per point: track_id, hourly bucket, geohash cell and their combined struct
sdf = get_hashed_points_w_time(path,sqlContext)

In [28]:
# preview the hashed points
sdf.show()

+--------+-----------+--------------------+---------------+
|track_id|   easytime|       points_w_time|geohashed_point|
+--------+-----------+--------------------+---------------+
|       1|2014-9-13:7|[2014-9-13:7, 7nj...|         7nj9u8|
|       1|2014-9-13:7|[2014-9-13:7, 7nj...|         7nj9u8|
|       1|2014-9-13:7|[2014-9-13:7, 7nj...|         7nj9u8|
|       1|2014-9-13:7|[2014-9-13:7, 7nj...|         7nj9u8|
|       1|2014-9-13:7|[2014-9-13:7, 7nj...|         7nj9u8|
|       1|2014-9-13:7|[2014-9-13:7, 7nj...|         7nj9u8|
|       1|2014-9-13:7|[2014-9-13:7, 7nj...|         7nj9u8|
|       1|2014-9-13:7|[2014-9-13:7, 7nj...|         7nj9u8|
|       1|2014-9-13:7|[2014-9-13:7, 7nj...|         7nj9u8|
|       1|2014-9-13:7|[2014-9-13:7, 7nj...|         7nj9u8|
|       1|2014-9-13:7|[2014-9-13:7, 7nj...|         7nj9u8|
|       1|2014-9-13:7|[2014-9-13:7, 7nj...|         7nj9u8|
|       1|2014-9-13:7|[2014-9-13:7, 7nj...|         7nj9u8|
|       1|2014-9-13:7|[2014-9-13:7, 7nj.

In [29]:
# group by (hourly bucket, geohash cell) and collect the distinct track_ids in each group
sdf2 = find_co_location(sdf)

In [30]:
# preview the co-location groups
sdf2.show()

+-------------+---------------+-------------+
|     easytime|geohashed_point|co_occurrence|
+-------------+---------------+-------------+
|2014-10-23:18|         7nj9ut|         [28]|
| 2014-11-4:19|         7njv59|     [30, 31]|
|2014-11-28:12|         7nj9u1|         [39]|
|  2015-2-12:3|         7nj9uz|        [131]|
|   2015-3-2:3|         7nj9v5|        [153]|
|  2015-5-19:6|         7nj9uk|      [37960]|
|  2015-5-19:4|         7nj9gy|      [37962]|
| 2015-5-29:10|         7nj9sb|      [38001]|
| 2015-5-29:12|         7nj9km|      [38002]|
|2015-11-23:12|         7nj9u0|      [38080]|
| 2014-9-30:10|         7nj9uh|         [13]|
| 2014-11-4:19|         7njutj|     [30, 31]|
| 2014-11-4:19|         7njucq|     [30, 31]|
| 2014-12-6:10|         7nj9ux|     [48, 47]|
|  2015-2-19:7|         7nj9sq|   [140, 141]|
|  2015-2-23:9|         7nj9t4|        [149]|
| 2015-5-29:11|         7nj9uf|      [38002]|
| 2015-5-29:12|         7nj9kw|      [38002]|
|   2015-6-3:5|         7nj9ge|   

In [31]:
# flag groups where more than one distinct track shares the same cell and hour
sdf3 = sdf2.withColumn('result',has_co_occurrence('co_occurrence'))

In [32]:
# preview the groups together with their co-occurrence flag
sdf3.show()

+-------------+---------------+-------------+------+
|     easytime|geohashed_point|co_occurrence|result|
+-------------+---------------+-------------+------+
|2014-10-23:18|         7nj9ut|         [28]| false|
| 2014-11-4:19|         7njv59|     [30, 31]|  true|
|2014-11-28:12|         7nj9u1|         [39]| false|
|  2015-2-12:3|         7nj9uz|        [131]| false|
|   2015-3-2:3|         7nj9v5|        [153]| false|
|  2015-5-19:6|         7nj9uk|      [37960]| false|
|  2015-5-19:4|         7nj9gy|      [37962]| false|
| 2015-5-29:10|         7nj9sb|      [38001]| false|
| 2015-5-29:12|         7nj9km|      [38002]| false|
|2015-11-23:12|         7nj9u0|      [38080]| false|
| 2014-9-30:10|         7nj9uh|         [13]| false|
| 2014-11-4:19|         7njutj|     [30, 31]|  true|
| 2014-11-4:19|         7njucq|     [30, 31]|  true|
| 2014-12-6:10|         7nj9ux|     [48, 47]|  true|
|  2015-2-19:7|         7nj9sq|   [140, 141]|  true|
|  2015-2-23:9|         7nj9t4|        [149]| 

In [33]:
# number of (hour, cell) groups shared by at least two distinct tracks
sdf3.filter(col('result')).count()

266

In [34]:
# shut down the Spark session once the analysis is finished
spark.stop()