In [1]:
# Add user specific python libraries to path
import sys
sys.path.insert(0, "/home/smehra/local-packages")

In [2]:
import numpy as np
import pandas as pd

import time
from datetime import timedelta  
from datetime import date
from datetime import datetime

import graphlab as gl
# GraphLab runtime settings, applied before any SFrame work is done.
# NOTE(review): going by the key names, the first redirects GraphLab's cache
# files to the project scratch directory and the second sets the default
# number of lambda worker processes to 48 -- confirm against the GraphLab docs.
gl.set_runtime_config('GRAPHLAB_CACHE_FILE_LOCATIONS', '/data/tmp/smehra/tmp')
gl.set_runtime_config('GRAPHLAB_DEFAULT_NUM_PYLAMBDA_WORKERS', 48)
import migration_detector as md

This non-commercial license of GraphLab Create for academic use is assigned to xtai@berkeley.edu and will expire on January 29, 2021.


[INFO] graphlab.cython.cy_server: GraphLab Create v2.1 started. Logging: /tmp/graphlab_server_1583778080.log


In [3]:
import logging

# Append every record (DEBUG and above) from this notebook to one log file.
log_file = '/data/tmp/smehra/logs/displacement_location_segments_computation.log'
logging_options = {
    'filename': log_file,
    'filemode': 'a+',
    'format': '%(asctime)s %(levelname)s %(message)s',
    'datefmt': '%Y-%m-%d %H:%M:%S',
    'level': logging.DEBUG,
}
logging.basicConfig(**logging_options)

# Module-level logger used by the processing cells further down.
logger = logging.getLogger(__name__)

In [4]:
import os

# Must be set before the Spark session is created so that Spark picks up
# its configuration files from this directory.
os.environ.update({"SPARK_CONF_DIR": "/data/tmp/spark/conf"})

In [5]:
import pyspark
import random

from pyspark.sql import SparkSession
from pyspark.sql import HiveContext

# All Spark settings for this notebook, kept in one list so they are easy to tune.
spark_settings = [
    ('spark.ui.port', 4050),
    ('spark.ui.enabled', True),

    # In local mode the driver is the only executor, so it gets as much
    # memory as the machine can spare.
    ('spark.driver.memory', '50g'),

    # Executor settings, only relevant when running in cluster or client mode:
    #('spark.executor.instances', '5'),
    #('spark.executor.cores', '5'),
    #('spark.executor.memory', '5g'),
    #('spark.executor.memoryOverhead', '500m'),

    # More shuffle partitions -> smaller partitions per task -> lower memory load.
    ('spark.sql.shuffle.partitions', '1000'),

    # Raise this when "collecting" large datasets; the driver needs the
    # extra room to hold the collected result.
    ('spark.driver.maxResultSize', '2g'),

    # Scratch directory for Spark's temporary data.
    ('spark.local.dir', '/data/tmp/smehra/tmp'),
    # Location of the Hive warehouse.
    ('spark.sql.warehouse.dir', '/data/tmp/hive_warehouse'),
    # MySQL connector jar, needed because MySQL backs the metastore service.
    ('spark.jars', '/data/tmp/spark/jars/mysql-connector-java-5.1.30-bin.jar'),

    # KryoSerializer is faster and more compact than the default Java serializer.
    ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),

    # G1GC overcomes the latency and throughput limitations of the older collectors.
    # NOTE(review): with master local[30] there are no separate executor JVMs --
    # confirm this option has any effect in local mode.
    ('spark.executor.extraJavaOptions', '-XX:+UseG1GC'),
]
config = pyspark.SparkConf().setAll(spark_settings)

# Hive-enabled local session running 30 worker threads.
session_builder = SparkSession.builder.enableHiveSupport().config(conf=config)
session_builder = session_builder.master("local[30]").appName("afgh_project_smehra_hive_setup")
spark = session_builder.getOrCreate()

# HiveContext bound to the session's SparkContext; later cells query through `hive`.
hive = HiveContext(spark.sparkContext)

# Show the effective configuration as the cell output.
spark.sparkContext._conf.getAll()


[(u'spark.driver.memory', u'50g'),
 (u'spark.repl.local.jars',
  u'file:///data/tmp/spark/jars/mysql-connector-java-5.1.30-bin.jar'),
 (u'spark.sql.shuffle.partitions', u'1000'),
 (u'spark.jars', u'/data/tmp/spark/jars/mysql-connector-java-5.1.30-bin.jar'),
 (u'spark.app.name', u'afgh_project_smehra_hive_setup'),
 (u'spark.master', u'local[30]'),
 (u'spark.executor.extraJavaOptions', u'-XX:+UseG1GC'),
 (u'spark.executor.id', u'driver'),
 (u'spark.driver.port', u'35327'),
 (u'spark.app.id', u'local-1583778092907'),
 (u'spark.local.dir', u'/data/tmp/smehra/tmp'),
 (u'spark.serializer', u'org.apache.spark.serializer.KryoSerializer'),
 (u'spark.ui.port', u'4050'),
 (u'spark.sql.warehouse.dir', u'/data/tmp/hive_warehouse'),
 (u'spark.sql.catalogImplementation', u'hive'),
 (u'spark.rdd.compress', u'True'),
 (u'spark.serializer.objectStreamReset', u'100'),
 (u'spark.driver.maxResultSize', u'2g'),
 (u'spark.submit.deployMode', u'client'),
 (u'spark.driver.host', u'umtiti.ischool.berkeley.edu')

# Step 1: Divide daily modal location data into CSV buckets

In [6]:
# Load the complete per-user daily modal location table from Hive.
modal_locations_query = 'SELECT * FROM afghanistan.user_daily_modal_locations'
daily_modal_locations = hive.sql(modal_locations_query)
# For a quick dry run, uncomment the next line to work on a 1% sample:
#daily_modal_locations = daily_modal_locations.sample(fraction =  0.01, seed = 42)
daily_modal_locations.show(5)

+----------------+--------------+----+-----+---------+-----------+
|          userId|tower_group_id|Year|Month|DaySeries|district_id|
+----------------+--------------+----+-----+---------+-----------+
|0YrK2DB31ne4lAmW|           469|2015|    7|      838|       1612|
|0YrK2DB33nW4lAmW|           489|2015|    7|      838|       1602|
|0YrK2DB3415olAmW|           735|2015|    7|      838|       1401|
|0YrK2DB36nyGlAmW|           288|2015|    7|      838|       2312|
|0YrK2DB36wE4lAmW|           550|2015|    7|      838|       1601|
+----------------+--------------+----+-----+---------+-----------+
only showing top 5 rows



In [7]:
# Build the DaySeries -> calendar date lookup table.
# DaySeries counts days since 2013-03-31, so DaySeries 1 is 2013-04-01;
# Date is stored as an integer in YYYYMMDD form.

from datetime import timedelta
from datetime import date

march_31_2013 = date(2013, 3, 31)

# Build both columns up front and create the frame once: appending one row at
# a time copies the whole frame on every iteration.
day_series_values = list(range(1, 1462))
calendar_dates = [int((march_31_2013 + timedelta(days = offset)).strftime("%Y%m%d"))
                  for offset in day_series_values]

dayseries_to_date = pd.DataFrame({'Date': calendar_dates, 'DaySeries': day_series_values},
                                 columns = ['Date', 'DaySeries'])

# Keep both columns as plain integers for the Spark schema.
dayseries_to_date.Date = dayseries_to_date.Date.astype(int)
dayseries_to_date.DaySeries = dayseries_to_date.DaySeries.astype(int)

dayseries_to_date_sparkDF = spark.createDataFrame(dayseries_to_date)
dayseries_to_date_sparkDF.show()


+--------+---------+
|    Date|DaySeries|
+--------+---------+
|20130401|        1|
|20130402|        2|
|20130403|        3|
|20130404|        4|
|20130405|        5|
|20130406|        6|
|20130407|        7|
|20130408|        8|
|20130409|        9|
|20130410|       10|
|20130411|       11|
|20130412|       12|
|20130413|       13|
|20130414|       14|
|20130415|       15|
|20130416|       16|
|20130417|       17|
|20130418|       18|
|20130419|       19|
|20130420|       20|
+--------+---------+
only showing top 20 rows



In [8]:
import pyspark.sql.functions as F

# Keep only the columns needed downstream and rename userId -> user_id.
modal_locations_by_dayseries = daily_modal_locations.select([F.col("userId").alias("user_id"),
                                                             F.col("tower_group_id"),
                                                             F.col("district_id"),
                                                             F.col("DaySeries")])

# Attach the calendar date for each DaySeries value (left join, so every
# location row is kept), then drop the DaySeries columns.
dayseries_matches = modal_locations_by_dayseries.DaySeries == dayseries_to_date_sparkDF.DaySeries
locations_with_dates = modal_locations_by_dayseries.join(dayseries_to_date_sparkDF,
                                                         dayseries_matches,
                                                         how = 'left')
daily_modal_locations_formatted = locations_with_dates.select(["user_id", "date", "tower_group_id", "district_id"])

daily_modal_locations_formatted.show()


+----------------+--------+--------------+-----------+
|         user_id|    date|tower_group_id|district_id|
+----------------+--------+--------------+-----------+
|0YrK2DB30543lAmW|20140718|          1331|        821|
|0YrK2DB315R6lAmW|20140718|          1273|       3201|
|0YrK2DB33eD6lAmW|20140718|            81|       2001|
|0YrK2DB349J6lAmW|20140718|            95|       2001|
|0YrK2DB34x16lAmW|20140718|           794|        101|
|0YrK2DB35k1RlAmW|20140718|          1149|        308|
|0YrK2DB36M36lAmW|20140718|          1110|        308|
|0YrK2DB38km4lAmW|20140718|           571|       1615|
|0YrK2DB394nWlAmW|20140718|          1208|       1212|
|0YrK2DB3B5v4lAmW|20140718|           992|        301|
|0YrK2DB3BVa3lAmW|20140718|           887|        101|
|0YrK2DB3BWmrlAmW|20140718|          1168|        101|
|0YrK2DB3BrW4lAmW|20140718|           796|        107|
|0YrK2DB3G95RlAmW|20140718|          1114|        101|
|0YrK2DB3JMWwlAmW|20140718|           109|       2001|
|0YrK2DB3J

In [9]:
# Collect the distinct first characters of user_id; they serve as bucket keys
# for the CSV export below.
first_char_column = daily_modal_locations_formatted.user_id.substr(0, 1)
user_id_first_char_list = (
    daily_modal_locations_formatted
    .withColumn("user_id_first_char", first_char_column)
    .select("user_id_first_char")
    .distinct()
    .toPandas()
)
print(user_id_first_char_list.user_id_first_char.tolist())

[u'7', u'D', u'd', u'R', u'9', u'j', u'L', u'z', u'y', u'G', u'b', u'g', u'3', u'0', u'k', u'v', u'J', u'P', u'r', u'm', u'n', u'W', u'1', u'E', u'e', u'o']


In [46]:
# District-level input for the migration detector: (user_id, date, location),
# where location is the district id.
daily_modal_districts_formatted = daily_modal_locations_formatted.select(
    F.col("user_id"),
    F.col("date"),
    F.col("district_id").alias("location"))

district_output_root = '/data/tmp/smehra/aggregated_data/migration_detector_input_data/district_level/'
district_first_char = daily_modal_districts_formatted.user_id.substr(0, 1)

# One CSV directory per bucket, rows ordered by user and then by date.
for first_char in user_id_first_char_list.user_id_first_char.tolist():
    print('Filtering on .. ' + first_char)

    subset_of_users = (daily_modal_districts_formatted
                       .filter(district_first_char == first_char)
                       .sort(["user_id", "date"]))
    subset_of_users.write.option("header","true").csv(district_output_root + first_char)
    
    

Filtering on .. 7
Filtering on .. D
Filtering on .. d
Filtering on .. R
Filtering on .. 9
Filtering on .. j
Filtering on .. L
Filtering on .. z
Filtering on .. y
Filtering on .. G
Filtering on .. b
Filtering on .. g
Filtering on .. 3
Filtering on .. 0
Filtering on .. k
Filtering on .. v
Filtering on .. J
Filtering on .. P
Filtering on .. r
Filtering on .. m
Filtering on .. n
Filtering on .. W
Filtering on .. 1
Filtering on .. E
Filtering on .. e
Filtering on .. o


In [47]:
# Tower-level input for the migration detector: (user_id, date, location),
# where location is the tower group id.
daily_modal_towers_formatted = daily_modal_locations_formatted.select(
    F.col("user_id"),
    F.col("date"),
    F.col("tower_group_id").alias("location"))

tower_output_root = '/data/tmp/smehra/aggregated_data/migration_detector_input_data/tower_level/'
tower_first_char = daily_modal_towers_formatted.user_id.substr(0, 1)

# One CSV directory per bucket, rows ordered by user and then by date.
for first_char in user_id_first_char_list.user_id_first_char.tolist():
    print('Filtering on .. ' + first_char)

    subset_of_users = (daily_modal_towers_formatted
                       .filter(tower_first_char == first_char)
                       .sort(["user_id", "date"]))
    subset_of_users.write.option("header","true").csv(tower_output_root + first_char)
    
    

Filtering on .. 7
Filtering on .. D
Filtering on .. d
Filtering on .. R
Filtering on .. 9
Filtering on .. j
Filtering on .. L
Filtering on .. z
Filtering on .. y
Filtering on .. G
Filtering on .. b
Filtering on .. g
Filtering on .. 3
Filtering on .. 0
Filtering on .. k
Filtering on .. v
Filtering on .. J
Filtering on .. P
Filtering on .. r
Filtering on .. m
Filtering on .. n
Filtering on .. W
Filtering on .. 1
Filtering on .. E
Filtering on .. e
Filtering on .. o


# Step 2: Use Migration Detector to detect segments

In [5]:
def get_results_dataset_column_list():
    """Return the column labels of the wide per-user results table.

    Returns:
        list: 'userId' followed by the integer day-series labels 1..1461,
        i.e. one column per day of the study period.
    """
    column_list = ['userId']
    # One integer-labelled column per day-series value.
    column_list.extend(range(1, 1462))
    return column_list

# Daily modal locations are the input data for migration_detector.
# The intermediate input dataset is split into buckets keyed by the first
# character of the user id; each bucket is processed and saved independently.

# intermediate data is available at district_level or tower_level
level = 'district_level'

# Calendar day that corresponds to day-series value 0.
day_zero = pd.Timestamp(date(2013, 3, 31))

for first_char_of_user_id in ['0', '1', '3', '7', '9', 'b', 'd', 'D', 'e', 'E', 'g', 'G', 'j', 'J', 'k', 'L', 'm', 'n', 'o', 'P', 'r', 'R', 'v', 'W', 'y', 'z']:

    logger.info('processing users with first character of userId: ' + str(first_char_of_user_id))
    input_path = '/data/tmp/smehra/aggregated_data/migration_detector_input_data/' + level + '/' + str(first_char_of_user_id)

    user_daily_modal_loc = gl.SFrame.read_csv(input_path, verbose = False)
    logger.info('shape of input set: ' + str(user_daily_modal_loc.shape))

    unique_users_list = user_daily_modal_loc['user_id'].unique()
    unique_users_count = str(len(unique_users_list))
    logger.info('total unique users in this set: ' + unique_users_count)

    logger.info('Migration Detector: creating trajectory record..')
    traj = md.createTrajRecordFromSFrame(user_daily_modal_loc)

    logger.info('Migration Detector: finding migrants..')
    migrants = traj.find_migrants(num_stayed_days_migrant=5,
                                  num_days_missing_gap=2,
                                  small_seg_len=7,
                                  seg_prop=0.5,
                                  min_overlap_part_len=0,
                                  max_gap_home_des=1470)

    logger.info('Migration Detector: fetching segments..')
    segments = traj.get_segments_as_pandas_DF(which_step=3)

    logger.info('cleaning segments dataset..')
    segment_start = pd.to_datetime(segments['segment_start_date'], format='%Y%m%d')
    segment_end = pd.to_datetime(segments['segment_end_date'], format='%Y%m%d')
    segments['segment_start_date'] = segment_start.dt.date
    segments['segment_end_date'] = segment_end.dt.date

    # Start and end of each segment as day-series values. The subtraction is
    # done on the datetime64 columns so the result is a timedelta column and
    # `.dt.days` is always available.
    segments['start'] = (segment_start - day_zero).dt.days
    segments['end'] = (segment_end - day_zero).dt.days

    segments['segment_length'] = segments.segment_length.astype(int)

    logger.info('creating an empty dataframe to store results..')
    column_list = get_results_dataset_column_list()
    user_ids = list(unique_users_list)

    # One row per user, one column per entry of column_list. Column 0 holds
    # the user id, so column position k (k >= 1) is day-series value k.
    # Cells stay NaN for days not covered by any segment.
    location_grid = np.full((len(user_ids), len(column_list)), np.nan, dtype = object)
    location_grid[:, 0] = np.array(user_ids, dtype = object)

    logger.info('processing segments..')
    start_time = int(round(time.time()))

    # Write every segment straight into its user's row. Filling the grid
    # directly (rather than the row objects yielded by DataFrame.iterrows(),
    # which may be copies) guarantees the values reach the saved dataset, and
    # the dict lookup avoids re-scanning all segments for every user.
    row_of_user = dict((user_id, row) for row, user_id in enumerate(user_ids))
    for user_id, first_day, last_day, location in zip(segments['user_id'],
                                                      segments['start'],
                                                      segments['end'],
                                                      segments['location']):
        row = row_of_user.get(user_id)
        if row is None:
            # segment belongs to a user that is not in this bucket's user list
            continue
        # inclusive range of days covered by the segment
        location_grid[row, first_day:(last_day + 1)] = location

    detected_locations = pd.DataFrame(location_grid, columns = column_list)

    end_time = int(round(time.time()))
    # divide by a float so the minutes are not truncated on Python 2
    logger.info('finished processing all users in ' + str((end_time - start_time) / 60.0) + ' minutes')

    detected_locations.to_csv('/data/tmp/smehra/aggregated_data/migration_detector_output_data/' + level + '/' + str(first_char_of_user_id) + '.csv', index = False)
    logger.info('finished saving dataset for users with first character of userId: ' + str(first_char_of_user_id))


Start: Detecting migration
Done
Start: Detecting migration
Done
Start: Detecting migration
Done
Start: Detecting migration
Done
Start: Detecting migration
Done


# Step 3: Ingest output into Hive

In [6]:
# Drop any previous copy of the results table so it can be recreated below.
# IF EXISTS keeps this cell from failing when the table has not been created yet.
hive.sql('DROP TABLE IF EXISTS afghanistan.migration_based_user_districts')

DataFrame[]

In [7]:
# Make afghanistan the session's current database, then list its tables
# as a quick sanity check.
hive.sql('use afghanistan')
tables_in_database = hive.sql('show tables')
tables_in_database.head(10)

[Row(database=u'afghanistan', tableName=u'raw_data_phone_calls', isTemporary=False),
 Row(database=u'afghanistan', tableName=u'user_daily_modal_districts_wide', isTemporary=False),
 Row(database=u'afghanistan', tableName=u'user_daily_modal_locations_long', isTemporary=False),
 Row(database=u'afghanistan', tableName=u'user_daily_modal_towers_wide', isTemporary=False),
 Row(database=u'afghanistan', tableName=u'user_daily_unique_districts_long', isTemporary=False),
 Row(database=u'afghanistan', tableName=u'user_daily_unique_districts_wide', isTemporary=False),
 Row(database=u'afghanistan', tableName=u'user_daily_unique_towers_long', isTemporary=False),
 Row(database=u'afghanistan', tableName=u'user_daily_unique_towers_wide', isTemporary=False)]

In [None]:
# Read every per-bucket district-level output CSV into a single Spark
# DataFrame; the header row supplies column names and types are inferred.
district_output_glob = '/data/tmp/smehra/aggregated_data/migration_detector_output_data/district_level/*.csv'
csv_reader = spark.read
migration_based_user_districts = csv_reader.csv(district_output_glob, header = True, inferSchema=True)
migration_based_user_districts.show(5)


In [9]:
# Write the DataFrame to Hive as table afghanistan.migration_based_user_districts.
# saveAsTable is called without an explicit save mode here.
migration_based_user_districts.write.saveAsTable('afghanistan.migration_based_user_districts')

In [None]:
# Read the ingested table back from Hive. The table name is fully qualified
# so the query does not depend on which database is currently selected.
user_segments = hive.sql('SELECT * FROM afghanistan.migration_based_user_districts')
user_segments.show()

In [11]:
# Number of rows in the table read back from Hive.
user_segments.count()

10477778

In [7]:
# Stop the Spark session created at the top of the notebook.
spark.stop()