## Import Area

In [1]:
import json
from collections import defaultdict
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext,SQLContext
from pyspark.sql import SparkSession
import pandas as pd
import os
import itertools
import boto3
from boto3.dynamodb.conditions import Key, Attr
from pandas.io.json import json_normalize


## Dynamo DB Configuration

In [2]:
client = boto3.client('dynamodb',endpoint_url='http://localhost:8000')

In [3]:

dynamodb = boto3.resource('dynamodb', region_name='us-west-2', endpoint_url="http://localhost:8000")

for i in client.list_tables()["TableNames"]:
    client.delete_table(TableName=i)


In [4]:
table_locations = dynamodb.create_table(
    TableName='Locations',
    KeySchema=[
        {
            'AttributeName': 'id',
            'KeyType': 'HASH'  #Partition key
        },
        {
            'AttributeName': 'description',
            'KeyType': 'RANGE'  #Sort key
        }
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'id',
            'AttributeType': 'S'
        },
        {
            'AttributeName': 'description',
            'AttributeType': 'S'
        },

    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 10,
        'WriteCapacityUnits': 10
    }
)


In [5]:
def insert_locations_in_dynamo(locations_dict, table):
    for k in locations_dict:
        response = table.put_item(
            Item={
                'id': k,
                 'description':locations_dict[k]
            }
        )

Variable declaration

In [11]:
#Var Declaration
#basepath is where the input files will be stored
basepath="./test"
user_data={}
locations_all={}
locations_detail={}
locations_name={}
locations_times_pds={}
locations_detail_dict={}

In [7]:
def onlyPlaces(every_stop):
    if (every_stop.get("placeVisit","null") !="null"):
        return every_stop.get("placeVisit","null")

## Data Ingestion


In [8]:
def get_location_times_pd(locations):
    locations_times=defaultdict()
    for location,time in locations:
        time["id_place"]=location
        locations_times[location+"#"+time["startTimestampMs"]]=time
    location_times_pd = pd.DataFrame.from_dict(locations_times,orient='index')
    location_times_pd["startTimestampMs"]=pd.to_numeric(location_times_pd["startTimestampMs"])
    location_times_pd["endTimestampMs"]=pd.to_numeric(location_times_pd["endTimestampMs"])
    location_times_pd.index.name="id_starttime"
    return location_times_pd

def get_location_times_dict(locations):
    locations_times=defaultdict()
    for location,time in locations:
        time["id_place"]=location
        locations_times[location+"#"+time["startTimestampMs"]]=time
    return location_times


In [9]:
#Var Declaration
basepath="./test"
user_data={}
locations_all={}
locations_detail={}
locations_name={}
locations_times_pds={}

In [14]:

%%time

i=0

#Iterate through input jsons
for entry in os.listdir(basepath):
    if os.path.isfile(os.path.join(basepath, entry)):
        with open(os.path.join(basepath, entry), 'r') as file:
            #Filter non json files
            if(os.path.splitext(entry)[-1].lower()==".json"):
                user_data[entry] = json.load(file)
                try:
                    #Filter to get only main locations,discarding trips and infered locations
                    locations_all[entry]= list(filter(onlyPlaces,user_data[entry]["timelineObjects"]))
                    #For each location, we get the periods been there
                    locations_detail_dict=[(i.get("placeVisit","null").get('location').get("placeId"),i.get("placeVisit","null").get('duration')) for i in locations_all[entry]]
                    locations_times_pds[entry]=get_location_times_pd(locations_detail_dict)
                    
                    #locations_detail[entry]= spark.createDataFrame(pd.DataFrame.from_dict(locations_detail_dict,orient='index'))
                    #spark.createDataFrame(df1_pd.reset_index(drop=False))
                    #We get the ids and store them in a local Dynamo DB (useful for GDPR)
                    locations_name={i.get("placeVisit","null").get('location').get("placeId"):i.get("placeVisit","null").get('location').get('name') for i in locations_all[entry]}
                    insert_locations_in_dynamo(locations_name,table_locations)
                except Exception as exc:
                    print("Error processing " +entry)
                    print(exc)
                    pass
            print(entry + " processed")
            i+=1


2017_APRIL.json processed
Guybrush_Threpwood_April copy 3.json processed
2017_MARCH copy 2.json processed
2017_DECEMBER copy 4.json processed
Error processing 2017_JULY copy 7.json
'timelineObjects'
2017_JULY copy 7.json processed
2017_MAY copy 3.json processed
2017_AUGUST copy 7.json processed
2017_MAY.json processed
2017_JUNE copy 3.json processed
Error processing 2017_JANUARY copy 2.json
'timelineObjects'
2017_JANUARY copy 2.json processed
Sonya_Blade copy 7.json processed
John_Doe_April copy 3.json processed
2017_OCTOBER copy 3.json processed
Error processing 2017_JULY copy.json
'timelineObjects'
2017_JULY copy.json processed
2017_NOVEMBER copy 2.json processed
David_Garcia_April copy 2.json processed
David_Garcia_April copy 10.json processed
John_Doe_April.json processed
2017_NOVEMBER copy.json processed
2017_FEBRUARY.json processed
2017_OCTOBER.json processed
David_Garcia_April copy 11.json processed
.DS_Store processed
David_Garcia_April copy 3.json processed
2017_NOVEMBER copy 

In [15]:
#Getting combinations of all month file inputs
combinations=(list(itertools.combinations(locations_all.keys(), 2)))


In [None]:
#Receives 2 dataframes (one per file entry) with locations_times format and returns a df with rows where both entries have been in the same time
# at the same space
def get_df_common_moments(df1_pd,entry_pd1,df2_pd,entry_pd2):
    df_sol=pd.DataFrame(columns=["startTimestampMs_x","endTimestampMs_x","id_place","startTimestampMs_y","endTimestampMs_y", "people_involved"])
    df2=df1_pd.merge(df2_pd,on='id_place',how='left')
    #df1 starts bef
    df_cond_1 = df2[((df2["startTimestampMs_x"] <= df2["startTimestampMs_y"])&(df2["endTimestampMs_x"] >= df2["startTimestampMs_y"]))]
    df_cond_2 = df2[(df2["startTimestampMs_x"] <= df2["endTimestampMs_y"])&(df2["startTimestampMs_x"] >= df2["startTimestampMs_y"])]
    df_cond_1.append(df_cond_2)
    df_cond_1= df_cond_1.assign(people_involved=entry_pd1+" , "+entry_pd2)
    df_sol=df_cond_1
    return df_sol


In [None]:
df_sol=pd.DataFrame(columns=["startTimestampMs_x","endTimestampMs_x","id_place","startTimestampMs_y","endTimestampMs_y", "people_involved"])
#Receives 2 dataframes (one per file entry) with locations_times format and returns a df with rows where both entries have been in the same time
# at the same space
get_df_common_moments(locations_times_pds["David_Garcia_April.json"],"David_Garcia_April.json",locations_times_pds["Guybrush_Threpwood_April.json"],'Guybrush_Threpwood_April.json')

In [None]:
df_sol

In [None]:
%%time
df_sol=pd.DataFrame(columns=["startTimestampMs_x","endTimestampMs_x","id_place","startTimestampMs_y","endTimestampMs_y", "people_involved"])
for combination in combinations:
    print(combination)
    df_sol_aux=get_df_common_moments(locations_times_pds[combination[0]],combination[0],locations_times_pds[combination[1]],combination[1])
    df_sol=pd.concat([df_sol,df_sol_aux])

In [None]:
df_sol.head(5)

In [None]:

df_sol