In [None]:
from dask.distributed import Client, LocalCluster

# Local Dask cluster configuration.
# dask_n_workers is also read later by computeHomeLocationPerUser to choose the partition count.
dask_n_workers = 32
# memory limit per worker; the 'GB' suffix is appended below when building the LocalCluster argument
dask_worker_memory_limit = 16
dask_threads_per_worker = 16
# Start worker processes (processes = True) on this machine with the dashboard bound to localhost:7920.
# With the values above this requests 32 * 16 = 512 threads in total.
cluster = LocalCluster(dashboard_address = 'localhost:7920', 
                       n_workers = dask_n_workers, 
                       processes = True, 
                       threads_per_worker = dask_threads_per_worker,
                       memory_limit = str(dask_worker_memory_limit) + 'GB', 
                       local_directory = "/path/to/dask-worker-space")
# Connect a client to the cluster created above.
client = Client(cluster)

import dask.dataframe as ddf
import numpy as np
import pandas as pd

import shutil
import gc
gc.enable()

import multiprocessing as mp

import time
from datetime import timedelta  
from datetime import date
from datetime import datetime
from collections import Counter

import os

import matplotlib.pyplot as plt
%matplotlib inline


## Compute Home Segments

In [None]:
# Convert daily modal location data to row major (wide) format
# Input schema: 
# phoneHash1, day, daily_modal_location
# Output schema (if we have 10 days of data; columns are 0-based day offsets from first_day):
# phoneHash1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
def convertToRowMajorFormat(data, first_day, last_day):
    """Pivot long-form daily modal locations into one row per user.

    Parameters
    ----------
    data : pandas.DataFrame
        Long-form frame with columns ``phoneHash1``, ``day`` (datetime-like)
        and ``daily_modal_location``. The frame is not modified.
    first_day, last_day : datetime
        Inclusive bounds of the observation window.

    Returns
    -------
    pandas.DataFrame
        Indexed by ``phoneHash1`` with one integer column per day offset
        ``0 .. (last_day - first_day).days``. Days without an observation are
        NaN; observations outside the window are dropped.
    """
    total_days = (last_day - first_day).days + 1
    day_series_range = np.arange(0, total_days, 1)

    # assign() returns a new frame, so the caller's data does not gain a
    # 'day_series' column as a side effect
    data = data.assign(day_series=(data.day - first_day).dt.days)

    # make dataframe wide form - one row per user
    # NOTE: pivot_table aggregates with the mean by default, so duplicate
    # (phoneHash1, day) rows would be averaged
    data = data.pivot_table(index='phoneHash1', columns='day_series', values='daily_modal_location')

    # normalise labels to plain integers, then add missing days as NaN columns,
    # drop out-of-window days and order the columns in one step
    data.columns = data.columns.astype('int64')
    data = data.reindex(columns=day_series_range)

    # plain, unnamed integer column labels (reindex keeps the 'day_series' name)
    data.columns = day_series_range

    return data

# Convert location data to column major (long) format
# Input schema (if we have 10 days of data; columns are 0-based day offsets from first_day):
# phoneHash1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9
# Output schema: 
# phoneHash1, day, home_location
def convertToColumnMajorFormat(data, first_day, last_day):
    """Unpivot a wide per-user frame back into one row per (user, day).

    The frame's index is reset, the integer day-offset columns
    ``0 .. (last_day - first_day).days`` are melted with dask's ``melt``,
    offsets are turned back into dates relative to ``first_day`` and rows
    containing NaN are dropped.

    Returns a frame with columns ``phoneHash1``, ``day`` and ``home_location``.
    """
    # one value column per day offset in the observation window
    n_days = (last_day - first_day).days + 1
    day_offsets = np.arange(0, n_days, 1)

    # expose the user id (index) as a regular column so it can act as id_var
    wide = data.reset_index()

    long_form = ddf.reshape.melt(
        wide,
        id_vars=['phoneHash1'],
        value_vars=day_offsets,
        var_name='day_series',
        value_name='home_location',
    )

    # day offset -> calendar day
    long_form['day'] = first_day + ddf.to_timedelta(long_form.day_series, unit="d")

    # keep only rows that are complete, i.e. days with a home location
    complete_rows = long_form.dropna()

    return complete_rows[['phoneHash1', 'day', 'home_location']]

# drop location segments which have low proportion of actual raw locations
# input: 
# raw_locations: 2, np.NaN, np.NaN, 3, 3, np.NaN, np.NaN
# inferred_locations: 2, 2, np.NaN, 3, 3, 3, np.NaN
# segment_proportion_days_threshold: 0.6
#
# output: np.NaN, np.NaN, np.NaN, 3, 3, 3, np.NaN
def dropLowPropSegments(raw_locations, inferred_locations, segment_proportion_days_threshold):
    """Blank out inferred segments that are backed by too few raw observations.

    A segment is a maximal run of consecutive non-null days in
    ``inferred_locations``. For every segment the share of its days that are
    non-null in ``raw_locations`` is computed; if that share is below
    ``segment_proportion_days_threshold`` the whole segment is set to NaN.

    Index labels are used as positions, so both series are expected to be
    indexed by integer day offsets ``0 .. n-1``.

    Parameters
    ----------
    raw_locations : pandas.Series
        Observed locations per day (NaN where nothing was observed).
    inferred_locations : pandas.Series
        Locations after gap filling. Modified in place and returned.
    segment_proportion_days_threshold : float
        Minimum proportion of observed days for a segment to be kept.

    Returns
    -------
    pandas.Series
        ``inferred_locations`` with the invalid segments removed.
    """

    # blank out [segment_start_day, segment_end_day) if too few raw observations back it
    def removeSegmentIfInvalid(segment_start_day, segment_end_day):

        segment = raw_locations.iloc[segment_start_day:segment_end_day]

        days_with_cdr_activity = int(segment.notna().sum())
        prop_days_with_cdr_activity = days_with_cdr_activity / len(segment)

        if prop_days_with_cdr_activity < segment_proportion_days_threshold:
            inferred_locations.iloc[segment_start_day:segment_end_day] = np.nan

    # days that currently carry an inferred location
    location_indices = inferred_locations.dropna().index.tolist()

    # return if all locations are null
    if not location_indices:
        return inferred_locations

    segment_start_day = location_indices[0]
    previous_active_day = segment_start_day

    for current_day in location_indices:

        # a jump of more than one day closes the running segment
        if (current_day - previous_active_day) > 1:
            removeSegmentIfInvalid(segment_start_day, previous_active_day + 1)
            segment_start_day = current_day

        previous_active_day = current_day

    # the last segment is still open when the loop ends
    removeSegmentIfInvalid(segment_start_day, previous_active_day + 1)

    return inferred_locations

# drop location segments which are smaller than min_segment_length
# input: 
# location_series: 2, 2, np.NaN, 3, 3, 3, np.NaN
# min_segment_length: 3
#
# output: np.NaN, np.NaN, np.NaN, 3, 3, 3, np.NaN
def dropSmallSegments(location_series, min_segment_length):
    """Blank out segments shorter than ``min_segment_length`` days.

    A segment is a maximal run of consecutive non-null days. Index labels are
    used as positions, so the series is expected to be indexed by integer day
    offsets ``0 .. n-1``.

    Parameters
    ----------
    location_series : pandas.Series
        Location per day (NaN where unknown). Modified in place and returned.
    min_segment_length : int
        Minimum number of days a segment must span to be kept.

    Returns
    -------
    pandas.Series
        ``location_series`` with the too-short segments set to NaN.
    """

    # blank out [segment_start_day, segment_end_day) if it is too short
    def removeSegmentIfInvalid(segment_start_day, segment_end_day):

        if (segment_end_day - segment_start_day) < min_segment_length:
            location_series.iloc[segment_start_day:segment_end_day] = np.nan

    # days that currently carry a location
    location_indices = location_series.dropna().index.tolist()

    # return if all locations are null
    if not location_indices:
        return location_series

    segment_start_day = location_indices[0]
    previous_active_day = segment_start_day

    for current_day in location_indices:

        # a jump of more than one day closes the running segment
        if (current_day - previous_active_day) > 1:
            removeSegmentIfInvalid(segment_start_day, previous_active_day + 1)
            segment_start_day = current_day

        previous_active_day = current_day

    # the last segment is still open when the loop ends
    removeSegmentIfInvalid(segment_start_day, previous_active_day + 1)

    return location_series

# drop segment / stays that are smaller than threshold
# min_segment_length: 3
# input df: 
# 3, 3, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN
# np.NaN, np.NaN, 2, 2, 2, np.NaN, np.NaN, np.NaN
#
# output df: 
# np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN
# np.NaN, np.NaN, 2, 2, 2, np.NaN, np.NaN, np.NaN
def dropSmallStays(segments_df, min_segment_length):
    """Apply dropSmallSegments to every row of ``segments_df``.

    Parameters
    ----------
    segments_df : pandas.DataFrame
        One row per location, one column per day offset.
    min_segment_length : int
        Minimum number of consecutive days a stay must span to be kept.

    Returns
    -------
    pandas.DataFrame
        A new frame in which stays shorter than ``min_segment_length`` are NaN.
    """

    # dropSmallSegments edits the series it receives; hand it a copy because
    # mutating the row object passed in by DataFrame.apply is not supported by pandas
    segments_df = segments_df.apply(lambda location_series: dropSmallSegments(location_series.copy(), min_segment_length), axis = 1)

    return segments_df
        
# remove overlaps b/w concurrent segments
# input df: 
# 3, 3, 3, np.NaN, 3, 3, 3, np.NaN
# np.NaN, 2, 2, 2, np.NaN, np.NaN, np.NaN, np.NaN
#
# output df: 
# 3, np.NaN, np.NaN, np.NaN, 3, 3, 3, np.NaN
# np.NaN, np.NaN, np.NaN, 2, np.NaN, np.NaN, np.NaN, np.NaN

def removeOverlaps(segments_df):
    """Blank out every day on which more than one location has an active segment.

    Parameters
    ----------
    segments_df : pandas.DataFrame
        One row per location, one column per day. Modified in place.

    Returns
    -------
    pandas.DataFrame
        ``segments_df`` with all contested days set to NaN in every row.
    """

    for day in segments_df.columns:

        # count() ignores NaN, i.e. it is the number of locations active on the day
        number_of_active_segments_on_the_day = segments_df[day].count()

        if number_of_active_segments_on_the_day > 1:
            segments_df[day] = np.nan

    return segments_df

# collapse dataframe into single series
# input df: 
# 3, np.NaN, np.NaN, np.NaN, 3, 3, 3, np.NaN
# np.NaN, np.NaN, np.NaN, 2, np.NaN, np.NaN, np.NaN, np.NaN
#
# output: 3, np.NaN, np.NaN, 2, 3, 3, 3, np.NaN
def collapseAllSegmentsIntoSingleSeries(segments_df):
    """Collapse the per-location rows into a single location-per-day series.

    For every day (column) the first non-null value in row order is taken;
    days without any active location stay NaN.

    Parameters
    ----------
    segments_df : pandas.DataFrame
        One row per location, one column per day offset.

    Returns
    -------
    pandas.Series
        One entry per column of ``segments_df``. The series is created with a
        default ``0 .. n-1`` index and filled by column label.
    """

    # create an empty location series
    number_of_days = len(segments_df.columns)
    location_series = pd.Series([np.nan] * number_of_days)

    # iterate over each day and update active location in a single series
    for day in segments_df.columns:

        # dropna() rather than np.isnan so non-float values do not raise
        active_locations = segments_df[day].dropna()

        if active_locations.empty:
            location_series[day] = np.nan
        else:
            location_series[day] = active_locations.iloc[0]

    return location_series


# Join any two consecutive segments of the same location if there are no segments for any other location in between
# input df: 
# 3, 3, 3, np.NaN, np.NaN, 3, 3, 3, 3, 3, 3, 3, 3
# np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, 2, 2, 2, np.NaN, np.NaN, 2, 2, 2
#
# output df: 
# 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3
# np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, 2, 2, 2, np.NaN, np.NaN, 2, 2, 2

def joinConsecutiveSegments(segments_df):
    """Bridge gaps between segments of the same location.

    Two consecutive segments of one location are joined (the gap between them
    is filled with that location) when no location at all is active on any day
    of the gap. Gaps during which some other location is active are left open.

    Index/column labels are used as positions, so the columns are expected to
    be integer day offsets ``0 .. n-1``.

    Parameters
    ----------
    segments_df : pandas.DataFrame
        One row per location, one column per day offset. Not modified.

    Returns
    -------
    pandas.DataFrame
        A new frame with the eligible gaps filled.
    """

    # sentinel written into empty cells of days on which some location is active;
    # a real location id equal to this value would be erased when un-marking
    ineligible_marker = -99

    # Mark empty cells on days where at least one location is active
    # input df:
    # 3, 3, 3, np.nan, np.nan, 3, 3, 3, 3, 3
    # np.nan, np.nan, np.nan, np.nan, np.nan, 2, 2, 2, np.nan, np.nan
    #
    # output df:
    # 3, 3, 3, np.nan, np.nan, 3, 3, 3, 3, 3
    # -99, -99, -99, np.nan, np.nan, 2, 2, 2, -99, -99
    def highlightIneligibleLocationDays(segments_df):

        # work on a copy so the caller's frame never sees the sentinel
        marked_df = segments_df.copy()

        for day in marked_df.columns:

            if marked_df[day].notna().any():
                marked_df[day] = marked_df[day].fillna(ineligible_marker)

        return marked_df

    # Turn every sentinel cell back into NaN
    def unhighlightLocationDays(segments_df):

        return segments_df.mask(segments_df == ineligible_marker)

    # Fill the gap between two consecutive non-null days of one row when both
    # carry the same value; sentinel cells are non-null, so a gap that contains
    # one is split into pieces that never connect two real segments
    def joinSegmentsWithoutIneligibleDaysInBetween(location_series):

        # do not mutate the row object handed in by DataFrame.apply
        location_series = location_series.copy()

        # get non-null location series
        locations = location_series.dropna()

        # return if all locations are null
        if locations.empty:
            return location_series

        previous_active_day = locations.index[0]
        previous_active_location = locations.iloc[0]

        for current_day, current_location in locations.items():

            has_gap = (current_day - previous_active_day) > 1

            if has_gap and (current_location == previous_active_location):
                location_series.iloc[previous_active_day:current_day] = current_location

            previous_active_day = current_day
            previous_active_location = current_location

        return location_series

    marked_df = highlightIneligibleLocationDays(segments_df)

    joined_df = marked_df.apply(joinSegmentsWithoutIneligibleDaysInBetween, axis = 1)

    return unhighlightLocationDays(joined_df)


# create segments by carrying location values forward across short gaps
# input: 
# location_series: 2, np.NaN, np.NaN, np.NaN, 3, np.NaN, np.NaN, 4, np.NaN
# location_carry_forward_limit: 2
#
# output: 2, np.NaN, np.NaN, np.NaN, 3, 3, 3, 4, np.NaN
def createSegments(location_series, location_carry_forward_limit):
    """Carry each observed location forward across short gaps.

    When the run of missing days between two consecutive observations is at
    most ``location_carry_forward_limit`` days long, the missing days are
    filled with the location observed *before* the gap. Longer gaps are left
    untouched, and nothing is carried forward after the last observation.

    Index labels are used as positions, so the series is expected to be
    indexed by integer day offsets ``0 .. n-1``.

    Parameters
    ----------
    location_series : pandas.Series
        Location per day (NaN where unknown). Modified in place and returned.
    location_carry_forward_limit : int
        Maximum gap length (in days) that is filled.

    Returns
    -------
    pandas.Series
        ``location_series`` with the short gaps filled.
    """

    # get non-null location series
    locations = location_series.dropna()

    # return if all locations are null
    if locations.empty:
        return location_series

    # runners tracking the previous observed day and its location
    previous_active_day = locations.index[0]
    previous_active_location = locations.iloc[0]

    for current_day, current_location in locations.items():

        missing_gap_length = current_day - previous_active_day - 1

        if 1 <= missing_gap_length <= location_carry_forward_limit:

            # fill only the missing days, with the location seen before the gap;
            # the observed day itself must keep its own value
            location_series.iloc[previous_active_day + 1:current_day] = previous_active_location

        previous_active_day = current_day
        previous_active_location = current_location

    return location_series
    
    
# use daily modal locations to infer home location segments
# input:
# raw_location_series: pandas series representing daily locations of a user
# location_carry_forward_limit: number of days to carry forward location
# segment_proportion_days_threshold: proportion of days in a segment with modal location
# minimum_segment_length: minimum segment length (in days)
# minimum_stay_length: minimum number of days stayed at a location (in days)
# drop_single_location_users: if True, return an all-NaN series for users with valid segments at only one location
def computeHomeSegments(raw_location_series, location_carry_forward_limit, segment_proportion_days_threshold, minimum_segment_length, minimum_stay_length, drop_single_location_users):
    """Infer home-location segments for one user from daily modal locations.

    For every distinct location the user was seen at, segments are built by
    carrying the location forward over short gaps, then too-short segments and
    segments with too few raw observations are dropped. The per-location rows
    are then joined across uncontested gaps, days claimed by more than one
    location are blanked, too-short stays are dropped and the rows are
    collapsed into a single series.

    Parameters
    ----------
    raw_location_series : pandas.Series
        Daily locations of a user (NaN where unknown), indexed by day offset.
    location_carry_forward_limit : int
        Number of days to carry a location forward.
    segment_proportion_days_threshold : float
        Minimum proportion of days in a segment that must have a raw location.
    minimum_segment_length : int
        Minimum segment length (in days).
    minimum_stay_length : int
        Minimum number of days stayed at a location (in days).
    drop_single_location_users : bool
        If true, users whose valid segments all belong to one location get an
        all-NaN result.

    Returns
    -------
    pandas.Series
        Home location per day. ``raw_location_series`` itself is returned when
        it has no observations at all.
    """

    # get all unique locations visited
    unique_locations = raw_location_series.dropna().unique()

    if len(unique_locations) == 0:
        return raw_location_series

    number_of_days = len(raw_location_series)

    # one row (array of daily values) per location that has a valid segment;
    # collected in a list because DataFrame.append no longer exists in pandas 2
    # and growing a frame row by row is quadratic
    valid_location_segments = []

    # iterate over all unique locations
    for current_loc in unique_locations:

        # series containing only the current location, NaN everywhere else
        current_loc_raw = raw_location_series.where(raw_location_series == current_loc)

        current_loc_segments = current_loc_raw.copy()

        # forward location to empty days upto carry_forward_limit
        current_loc_segments = createSegments(current_loc_segments, location_carry_forward_limit)

        # drop small segments
        current_loc_segments = dropSmallSegments(current_loc_segments, minimum_segment_length)

        # drop segments where proportion of raw_location/(raw_locations + forward_locations) is lower than threshold
        current_loc_segments = dropLowPropSegments(current_loc_raw, current_loc_segments, segment_proportion_days_threshold)

        # keep the row only if it contains any valid segment
        if current_loc_segments.notnull().any():
            valid_location_segments.append(current_loc_segments.to_numpy())

    # return location series with no segments
    if len(valid_location_segments) == 0:
        return pd.Series([np.nan] * number_of_days)

    # drop single location users
    if drop_single_location_users and len(valid_location_segments) == 1:
        return pd.Series([np.nan] * number_of_days)

    # dataframe holding one row per location, one column per day
    all_segments_df = pd.DataFrame(valid_location_segments, columns = raw_location_series.index)

    # join same location segments without any other location segments in between
    all_segments_df = joinConsecutiveSegments(all_segments_df)

    # remove overlaps
    all_segments_df = removeOverlaps(all_segments_df)

    # drop small segments meant to represent "too-small" stays
    all_segments_df = dropSmallStays(all_segments_df, minimum_stay_length)

    # merge all segments into one series
    all_segments = collapseAllSegmentsIntoSingleSeries(all_segments_df)

    return all_segments
    

def computeHomeLocationPerUser(daily_modal_locations, first_day, last_day, location_carry_forward_limit, segment_proportion_days_threshold, minimum_segment_length, minimum_stay_length, drop_single_location_users):
    """Compute home locations for every user in one daily-modal-location CSV.

    The CSV is read with pandas, pivoted to one row per user, split into
    ``dask_n_workers * 8`` dask partitions, processed row by row with
    computeHomeSegments and finally converted back to long form.

    Parameters
    ----------
    daily_modal_locations : str
        Path of a CSV with columns ``phoneHash1``, ``day`` (``YYYY-MM-DD``)
        and ``daily_modal_location``.
    first_day, last_day : datetime
        Inclusive bounds of the observation window.
    location_carry_forward_limit, segment_proportion_days_threshold,
    minimum_segment_length, minimum_stay_length, drop_single_location_users
        Passed through unchanged to computeHomeSegments.

    Returns
    -------
    pandas.DataFrame
        Columns ``phoneHash1``, ``day``, ``home_location``; empty when the
        input file has no records.
    """

    print(str(datetime.now()) + " Reading raw dataset.")

    data = pd.read_csv(daily_modal_locations,
                        dtype = {'phoneHash1': str,
                                 'daily_modal_location': 'int16'})

    if(len(data) == 0):

        print(str(datetime.now()) + " No records - Returning empty dataset.")
        return pd.DataFrame(columns = ['phoneHash1', 'day', 'home_location'])

    # parse the day column explicitly: read_csv's date_parser argument is
    # deprecated in pandas 2, while to_datetime with a fixed format works on
    # old and new versions alike
    data['day'] = pd.to_datetime(data['day'], format = '%Y-%m-%d')

    print(str(datetime.now()) + " Converting dataset to row major format.")

    # convert data to row major format for more efficient computing
    data = convertToRowMajorFormat(data, first_day, last_day)

    print(str(datetime.now()) + " Computing home segments.")

    data = ddf.from_pandas(data, npartitions = dask_n_workers*8)

    # create meta for dask apply function: one column per day offset.
    # The computed series hold NaN for days without a home location, so the
    # columns are floating point, not int16.
    total_days = (last_day - first_day).days + 1
    meta = {t: 'float64' for t in range(0, total_days, 1)}

    data = data.apply(lambda user_daily_location: computeHomeSegments(user_daily_location, location_carry_forward_limit, segment_proportion_days_threshold, minimum_segment_length, minimum_stay_length, drop_single_location_users),
                      axis = 1,
                      meta=meta)

    # convert data back to column major form
    data = convertToColumnMajorFormat(data, first_day, last_day)

    # materialise the dask graph into a pandas DataFrame
    data = data.compute()

    return data


In [None]:
%%time

# Parameters for the home-location run; all of them are passed unchanged to
# computeHomeLocationPerUser for every input file.

# first day of cdr activity
first_day = datetime(2013,4,1)

# last day of cdr activity
last_day = datetime(2020,10,31)

# number of days to carry forward location
location_carry_forward_limit = 2

# proportion of days in a segment with modal location
segment_proportion_days_threshold = 0.5

# minimum segment length (in days)
minimum_segment_length = 7

# minimum length of stay (in days)
minimum_stay_length = 5

# drop single location users
drop_single_location_users = True

# Both directory strings end with '/', because file names are appended by plain string concatenation below.
input_data_dir = '/path/to/data/daily_modal_voice_version_bucketed/district_level/'
output_data_dir = '/path/to/data/daily_modal_voice_only_2013-2020_version/'

# Process every entry returned by os.listdir (no filtering by name or type)
# and write the result under the same file name in output_data_dir.
for file in os.listdir(input_data_dir):
    
    print(str(datetime.now()) + " Using dataset .. " + file)
    
    daily_modal_locations = input_data_dir + file

    home_locations = computeHomeLocationPerUser(daily_modal_locations, first_day, last_day, location_carry_forward_limit, segment_proportion_days_threshold, minimum_segment_length, minimum_stay_length, drop_single_location_users)
    
    print(str(datetime.now()) + " Saving result.")
    
    # index = False: the row index is not written to the CSV
    home_locations.to_csv(output_data_dir + file, index = False)
