# User Attributes
In this notebook, we compute the following indicators:
- **Trips**: the total number of trips between two regions.
- **Outflows**: the number of outgoing trips from a region.
- **Inflows**: the number of trips into a region.

The regions for the ODs are set to *administrative level 4*, while the temporal unit is *day*.

## Methodology Description
The basis of the OD creation methodology is from this paper:

`
Calabrese, Francesco, et al. "Estimating Origin-Destination flows using opportunistically collected mobile phone location data from one million users in Boston Metropolitan Area." IEEE Pervasive Computing 10.4 (2011): 36-44.
`

It is explained below:

1. Instead of using absolute locations, I cluster locations within a 1 km radius. The rationale for this is to counter the location errors that come with localisation. The 1 km radius is a parameter which can be changed.
2. Assign a stay time to each cluster so that we can classify the clusters as either *stops* or *OD*. The threshold used for classifying a cluster as a stop is 30 minutes. Thus, if a user stayed only 30 minutes or less at a location, we consider it a stop. Again, this is a parameter which can be changed.
3. Discard the stop points and keep only the stay clusters. 
4. Detect trips for each user.
5. Aggregate trips with the same OD across all users.

In [2]:
import os
from functools import wraps
from datetime import datetime
import time
import pandas as pd
import numpy as np
from pyspark.sql.types import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import collect_set, from_unixtime, unix_timestamp, col, udf, datediff, pandas_udf, PandasUDFType,lit
from cdr_data_utils import data_utils as ut
from cdr_data_utils import data_processor as dp

# Mount Folders

In [5]:
# NOTE(review): mount_folder_from_azure_blob is defined in a later cell of this
# notebook, and STORAGE_ACCOUNT_NAME / SECRET_KEY / ACCESS_KEY are not defined in
# any visible cell; they must exist in the session before this cell is run.

# Container holding the hashed data -> /mnt/afr_sub
mount_folder_from_azure_blob(
    storage_acc_name=STORAGE_ACCOUNT_NAME,
    container_name="africell-four-months-subset",
    dirname="",
    mnt_name="afr_sub",
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
)

# Container "sample-data" -> /mnt/sample (mounted only for the loc file it holds)
mount_folder_from_azure_blob(
    storage_acc_name=STORAGE_ACCOUNT_NAME,
    container_name="sample-data",
    dirname="",
    mnt_name="sample",
    access_key=ACCESS_KEY,
    secret_key=SECRET_KEY,
)

# Quick check: list the contents of both mount points
display(dbutils.fs.ls("/mnt/afr_sub/"))
display(dbutils.fs.ls("/mnt/sample/"))

In [6]:
def mount_folder_from_azure_blob(storage_acc_name=None, container_name=None, 
                                 dirname=None, mnt_name=None, access_key=None, secret_key=None):
  """
  Mount a folder from Azure Blob storage at /mnt/<mnt_name> unless it is already mounted.

  :param storage_acc_name: Azure storage account name (used in the wasbs:// URL)
  :param container_name: Blob container name (used in the wasbs:// URL)
  :param dirname: Directory inside the container; appended to the wasbs:// URL
  :param mnt_name: Name of the mount; the mount point is "/mnt/<mnt_name>"
  :param access_key: Used as the KEY of the `extra_configs` dict passed to dbutils
                     (presumably the Spark config name for the account key - confirm with caller)
  :param secret_key: Used as the VALUE of the `extra_configs` dict
  :return: None. Prints a message when the folder is already mounted or was mounted.
  """
  mount_point = "/mnt/{}".format(mnt_name)
  container_url = "wasbs://{}@{}.blob.core.windows.net".format(container_name, storage_acc_name)

  # Check if folder is already mounted.
  # The requested mount point must exist AND be backed by this container/account.
  # A plain substring test on the container name would treat "sample" as mounted
  # whenever "sample-data" is, and would skip mounting when the container is only
  # available under a different mount point.
  for m in dbutils.fs.mounts():
    if m.mountPoint == mount_point and m.source.startswith(container_url):
      print("Arleady Mounted")
      return

  # Mount folder if not mounted
  configs = {access_key: secret_key}
  result = dbutils.fs.mount(
              source = "{}/{}".format(container_url, dirname),
              mount_point = mount_point,
              extra_configs = configs)

  if result:
    print("Successfully mounted folder")

In [7]:
# Hour-of-day buckets (0-23) used when computing user attributes
home_hrs = list(range(20, 24)) + list(range(7))
transit_hrs = [7, 8, 19]
work_hrs = list(range(9, 19))
check_hrs = sorted(home_hrs + transit_hrs + work_hrs)
# Sanity check: the three buckets together cover every hour exactly once
assert check_hrs == list(range(24))

# Column holding the user identifier in the raw data
USER_ID = 'phonenumber'

# Settings attached to each user object as `misc_params` inside the pandas UDF
misc_processing_params = {
    'distance_threshold': 2,
    'datetime_col': 'datetime2',
    'home_hrs': home_hrs,
    'work_hrs': work_hrs,
    'transit_hrs': transit_hrs,
    'excluded_days_for_home_work': ['Sunday', 'Saturday'],
    'y': 'lat',
    'x': 'lon',
}

# Output schema of the pandas UDF. It is hardcoded, and the field order below
# is the order expected in the DataFrame returned by the UDF.
user_att_schema = StructType([
    StructField(field_name, field_type())
    for field_name, field_type in [
        ('avg_locs_day', DoubleType),
        ('avg_num_trps', DoubleType),
        ('home_x', DoubleType),
        ('home_y', DoubleType),
        ('avg_trp_dist', DoubleType),
        ('work_x', DoubleType),
        ('work_y', DoubleType),
        ('avg_gyration', DoubleType),
        ('median_gyration', DoubleType),
        ('total_locs', IntegerType),
        ('trip_days', IntegerType),
        ('usage_days', IntegerType),
        ('userid', StringType),
        ('operator', IntegerType),
    ]
])

In [8]:
@pandas_udf(user_att_schema, PandasUDFType.GROUPED_MAP)
def process_user_attributes(pdf):
    """
    Grouped-map pandas UDF: turn the raw rows of one user into a single row of attributes.

    :param pdf: pandas DataFrame with the rows of one group (the notebook groups by the
                user id column before applying this function). The user id is read from
                the first row of the ``USER_ID`` column.
    :return: one-row pandas DataFrame whose columns are ordered as in ``user_att_schema``.
    """
    # Build the user object and hand it the notebook-level processing parameters
    user = ut.User(userid=pdf[USER_ID].iloc[0], raw_user_data=pdf)
    user.misc_params = misc_processing_params
    network_operator = 1  # hard-coded operator code written to the 'operator' column

    # =============================
    # Generate user attributes
    # =============================
    # NOTE(review): these calls presumably populate the attributes read below;
    # their order is kept exactly as it was.
    user.generate_trips_by_day()
    user.compute_avg_trip_distance()
    user.compute_avg_trips_per_day()
    user.generate_home_and_work_clusters()
    user.set_home_work_from_clusters()

    # =============================
    # Get user attributes
    # =============================

    # Home and work coordinates; 99999 is the placeholder when none was found
    home_x, home_y, work_x, work_y = [float(99999) for _ in range(4)]
    if user.home:
        home_x = float(user.home[0])
        home_y = float(user.home[1])
    if user.work:
        work_x = float(user.work[0])
        work_y = float(user.work[1])

    # =============================
    # Create Dataframe
    # =============================
    data_pt = {
        'userid': str(user.userid),
        # Trips attributes
        'avg_trp_dist': user.avg_trp_dist,
        'avg_num_trps': user.avg_trps_day,
        # Distances
        'avg_gyration': user.average_farthest_distance,
        'median_gyration': user.median_farthest_distance,
        # Days of usage / days with trips
        'usage_days': user.usage_days,
        'trip_days': user.num_of_days_with_trips,
        # Locations
        'avg_locs_day': user.average_unique_locs_perday,
        'total_locs': user.total_unique_locations,
        'home_x': home_x, 'home_y': home_y,
        'work_x': work_x, 'work_y': work_y,
        'operator': network_operator,
    }

    # Order the columns exactly as declared in the schema, so the result is correct
    # whether Spark matches the returned columns by name or by position.
    column_order = [field.name for field in user_att_schema.fields]
    return pd.DataFrame([data_pt], columns=column_order)

In [9]:
def generate_user_attributes(sample=False, sample_size=10000, user_count_file=None, by_month=False, month=None,
                             userid='phonenumber', raw_cdrs=None, min_events=None, max_events=None):
    """
    Select the "good" users and compute one row of attributes per user.

    :param sample: if True, only a random sample of the good users is processed
    :param sample_size: number of users to sample when ``sample`` is True
    :param user_count_file: CSV with per-user event counts (passed to select_good_user_data)
    :param by_month: if True, keep only the rows whose ``month`` column equals ``month``
    :param month: month to keep; required when ``by_month`` is True
    :param userid: column to group by before applying the pandas UDF
    :param raw_cdrs: file or folder with the raw data (passed to select_good_user_data)
    :param min_events: minimum number of events per user; when None the default of
                       select_good_user_data is used
    :param max_events: maximum number of events per user; when None the default of
                       select_good_user_data is used
    :return: Spark DataFrame produced by applying ``process_user_attributes`` per user
    :raises ValueError: if ``by_month`` is True but ``month`` is None
    """
    # Fail early: comparing the month column with None would silently drop every row
    if by_month and month is None:
        raise ValueError("month must be provided when by_month=True")

    # Forward the event thresholds only when given, so that None does not
    # override the defaults of select_good_user_data
    selection_kwargs = dict(user_counts=user_count_file, raw_folder_data=raw_cdrs,
                            sample_users=sample, num_users=sample_size)
    if min_events is not None:
        selection_kwargs["min_num_events"] = min_events
    if max_events is not None:
        selection_kwargs["max_num_events"] = max_events

    # Get good users
    df = select_good_user_data(**selection_kwargs)

    # Month
    if by_month:
        df = df.filter(df.month == month)

    # One group per user; each group is reduced to a single row by the pandas UDF
    return df.groupBy(userid).apply(process_user_attributes)

In [10]:
def select_good_user_data(user_counts=None, raw_folder_data=None, min_num_events=150, max_num_events=3000, 
                          sample_users=False, num_users=1000000, seed=None):
  """
  Return the raw data restricted to "good" users, i.e. users whose event count
  satisfies min_num_events <= count < max_num_events.

  :param user_counts: CSV (path or file-like) with columns "phonenumber" and "count"
  :param raw_folder_data: file or folder with the raw user data, read with read_user_data_as_df
  :param min_num_events: inclusive lower bound on the event count
  :param max_num_events: exclusive upper bound on the event count
  :param sample_users: if True, keep only a random sample of the good users
  :param num_users: sample size; capped at the number of good users
  :param seed: optional random seed for the sampling (None keeps it non-deterministic)
  :return: Spark DataFrame = raw data inner-joined with the good users on "phonenumber"
  :raises ValueError: if raw_folder_data is None
  """
  if raw_folder_data is None:
    raise ValueError("raw_folder_data must be provided")

  # Raw user data: read from the location given by the caller, not from a notebook global
  raw_df = read_user_data_as_df(file_or_folder=raw_folder_data).repartition(20)

  # user event counts
  pdf = pd.read_csv(user_counts)
  pdf["phonenumber"] = pdf["phonenumber"].astype(str)
  in_range = (pdf["count"] >= min_num_events) & (pdf["count"] < max_num_events)
  pdf_good = pdf[in_range]
  print("Number of all users: {:,}, but only {:,} good users".format(pdf.shape[0], pdf_good.shape[0]))

  if sample_users:
    # Asking for more rows than exist makes DataFrame.sample raise, so cap the size
    sample_size = min(num_users, pdf_good.shape[0])
    pdf_good = pdf_good.sample(n=sample_size, random_state=seed)

  df_good_users = spark.createDataFrame(pdf_good)
  return raw_df.join(df_good_users, on="phonenumber", how="inner")

In [11]:
def add_admin_boundaries_to_user_home_work(df=None, aggreg_col_id='admin4Pcod', aggreg_col_name='admin4Name',
                                           admin_shp_file=None):
    """
    Attach administrative-area attributes to the home and work locations of each user.

    The admin attributes are looked up twice with
    ``ut.add_admin_attributes_to_random_latlon``: once for (home_x, home_y) and once
    for (work_x, work_y). The two results are inner-joined on 'userid', with suffix
    '_h' for the home lookup and '_w' for the work lookup.

    :param df: pandas DataFrame with one row per user; must contain 'userid',
               'home_x', 'home_y', 'work_x', 'work_y' and the attribute columns kept below
    :param aggreg_col_id: name of the admin id column added by the lookup
    :param aggreg_col_name: name of the admin name column added by the lookup
    :param admin_shp_file: admin boundaries shapefile passed to the lookup
    :return: pandas DataFrame with 'userid', the user attribute columns (without
             suffix) and the admin name/id columns for home ('_h') and work ('_w')
    """
    df_home = ut.add_admin_attributes_to_random_latlon(admin_shp=admin_shp_file, df_with_latlon=df,
                                                       lon='home_x', lat='home_y')
    df_work = ut.add_admin_attributes_to_random_latlon(admin_shp=admin_shp_file, df_with_latlon=df,
                                                       lon='work_x', lat='work_y')

    suffix = ['_h', '_w']
    merged = pd.merge(left=df_home, right=df_work, on='userid', suffixes=suffix, how='inner')

    admin_col_names = [aggreg_col_name + suffix[0], aggreg_col_name + suffix[1],
                       aggreg_col_id + suffix[0], aggreg_col_id + suffix[1]]
    # User attributes are identical in both lookups, so only the '_h' copy is kept.
    # NOTE(review): these names (e.g. 'days_with_trips', 'radius_gyration') differ from
    # the field names of user_att_schema in this notebook ('trip_days', 'avg_gyration');
    # confirm which input frame this function is meant to receive.
    cols_to_keep = ['userid', 'avg_locs_day_h', 'avg_num_trps_h', 'avg_trp_dist_h', 'days_with_trips_h',
                    'radius_gyration_h', 'radius_gyration_median_h', 'total_locs_h',
                    'total_number_of_days_h'] + admin_col_names

    # Strip the 2-character suffix from the attribute columns, but keep it on the
    # admin columns, where it tells home from work.
    admin_bases = {aggreg_col_name, aggreg_col_id}
    renamed_cols = {c: c[:-2] for c in cols_to_keep[1:] if c[:-2] not in admin_bases}

    # rename() returns a new frame; no in-place change on a frame sliced from `merged`
    return merged[cols_to_keep].rename(columns=renamed_cols)

In [12]:
def read_user_data_as_df(file_or_folder=None, date_fmt="%Y-%m-%d %H:%M:%S"):
    """
    Read raw user data from CSV into a Spark DataFrame and add typed columns.

    Added / converted columns:
      - 'datetime2': timestamp parsed from the 'datetime' string column
      - 'date': date parsed from the 'datetime' string column
      - 'month': month number taken from 'datetime2'
      - 'lon', 'lat': converted from string to double (moved to the end of the
        column list, as before)

    Rows with a missing 'datetime' get null in the derived columns instead of
    failing the whole job; lon/lat values that are not numeric become null.

    :param file_or_folder: path of the CSV file or folder to read (with header)
    :param date_fmt: ``datetime.strptime`` format of the 'datetime' column, applied
                     after dropping its last 4 characters
    :return: Spark DataFrame with the columns described above
    """
    # read all user data (every column is read as string)
    raw = spark.read.csv(path=file_or_folder, header=True)

    def _parse(value):
        # The last 4 characters are dropped before parsing.
        # NOTE(review): presumably a fractional-seconds/zone suffix - confirm with the data.
        if not value:
            return None
        return datetime.strptime(value[:-4], date_fmt)

    def _parse_date(value):
        parsed = _parse(value)
        return None if parsed is None else parsed.date()

    # Spark UDFs to add date and datetime (null-safe)
    add_datetime = udf(_parse, TimestampType())
    add_date = udf(_parse_date, DateType())
    add_month = udf(lambda x: None if x is None else x.month, IntegerType())

    with_dates = (raw
                  .withColumn('datetime2', add_datetime(col('datetime')))
                  .withColumn('date', add_date(col('datetime')))
                  .withColumn('month', add_month(col('datetime2'))))

    # change lon and lat columns to numeric with a native cast rather than a Python UDF;
    # the add/drop/rename sequence keeps the original resulting column order
    return (with_dates
            .withColumn('lon2', col('lon').cast(DoubleType()))
            .withColumn('lat2', col('lat').cast(DoubleType()))
            .drop('lon', 'lat')
            .withColumnRenamed('lon2', 'lon')
            .withColumnRenamed('lat2', 'lat'))

In [13]:
def save_user_event_counts(raw_data_folder=None, output=None):
  """
  Count the events of every user and write the counts to a CSV file.

  :param raw_data_folder: file or folder with the raw user data
  :param output: path of the CSV to write (columns "phonenumber" and "count", no index)
  """
  events = read_user_data_as_df(file_or_folder=raw_data_folder)

  # Count rows per phone number in Spark, then bring the result to pandas
  counts_per_user = (events
                     .repartition(20)
                     .groupBy("phonenumber")
                     .count()
                     .toPandas())

  counts_per_user.to_csv(output, index=False)

In [14]:
def save_good_user_data(user_counts=None, raw_folder_data=None, out_good_user_data=None, 
                        min_num_events=150, max_num_events=3000):
  """
  Write the raw data of the "good" users (min_num_events <= count < max_num_events) as CSV.

  :param user_counts: CSV (path or file-like) with columns "phonenumber" and "count"
  :param raw_folder_data: file or folder with the raw user data, read with read_user_data_as_df
  :param out_good_user_data: output location for the Spark CSV writer (written with header)
  :param min_num_events: inclusive lower bound on the event count
  :param max_num_events: exclusive upper bound on the event count
  :raises ValueError: if raw_folder_data is None
  """
  if raw_folder_data is None:
    raise ValueError("raw_folder_data must be provided")

  # Raw user data: read from the location given by the caller, not from a notebook global
  raw_df = read_user_data_as_df(file_or_folder=raw_folder_data).repartition(20)

  # user event counts
  pdf = pd.read_csv(user_counts)
  pdf["phonenumber"] = pdf["phonenumber"].astype(str)
  in_range = (pdf["count"] >= min_num_events) & (pdf["count"] < max_num_events)
  pdf_good = pdf[in_range]
  print("Number of all users: {:,}, but only {:,} good users".format(pdf.shape[0], pdf_good.shape[0]))
  df_good_users = spark.createDataFrame(pdf_good)

  # Now join: keep only the raw rows that belong to good users, and save them
  good_rows = raw_df.join(df_good_users, on="phonenumber", how="inner")
  good_rows.write.csv(out_good_user_data, header=True)

In [15]:
# good_users = "/mnt/afr_sub/consistent-users"
# save_good_user_data(user_counts=user_counts, raw_folder_data=hashed_data, out_good_user_data=good_users, 
#                         min_num_events=150, max_num_events=2400)

## Generate User Attributes
In this case for the whole dataset

In [17]:
# Input/output locations used by the cells below.
# hashed_data is passed as `raw_cdrs` to generate_user_attributes and read with Spark;
# user_counts is read with pandas (pd.read_csv), hence the "/dbfs" prefix.
# NOTE(review): afr_loc and out_att_file are not referenced by any other visible cell.
hashed_data =  "/mnt/afr_sub/hashed_dataset/"
afr_loc = "/mnt/sample/africell-loc-with-attributes.csv"
user_counts = "/dbfs/mnt/afr_sub/user_event_count.csv"
out_att_file = "/dbfs/mnt/sample/user_attributes_africell_orange.csv"

In [18]:
# Attributes for all users (no sampling, no month filter) whose event count
# is at least 6 and below 5000
dfu = generate_user_attributes(
    sample=False,
    by_month=False,
    user_count_file=user_counts,
    raw_cdrs=hashed_data,
    userid='phonenumber',
    min_events=6,
    max_events=5000,
)

In [19]:
# Run the Spark job and collect the per-user attribute rows into one pandas DataFrame
pdfu = dfu.repartition(20).toPandas()

In [20]:
# Save to file
# Written with pandas (no index column).
# NOTE(review): this path differs from `out_att_file` defined in an earlier cell.
user_att_afr = "/dbfs/mnt/sample/user-attributes-afr.csv"
pdfu.to_csv(user_att_afr, index=False)