In [None]:
import os
import random
from datetime import datetime
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType, TimestampType, IntegerType
import seaborn as sns
import d5

In [1]:
# Show the working directory: every relative data path below is resolved against it.
os.getcwd()

'/Users/dmatekenya/Google-Drive/gigs/aims-dakar-2019/src/day3'

In [None]:
def add_weekdays(row, day_dict=None):
    """
    Map the numeric weekday stored in ``row['wkday']`` to a text label.

    Intended for row-wise use, e.g.
    ``frame.apply(add_weekdays, args=(day_dict,), axis=1)``.

    :param row: mapping-like row (e.g. a pandas Series) with a ``'wkday'`` entry
        using ``datetime.weekday()`` numbering (Monday == 0 ... Sunday == 6).
    :param day_dict: mapping from weekday number to label. When None, the
        labels ``Mon, Tue, Wed, Thurs, Frid, Sat, Sun`` keyed by the ints 0..6
        are used (previously None always raised TypeError).
    :return: the label for ``row['wkday']``.
    :raises KeyError: if ``row['wkday']`` is not a key of ``day_dict`` (note the
        keys must have the same type as the column values, e.g. ints, not '1').
    """
    if day_dict is None:
        day_dict = {0: 'Mon', 1: 'Tue', 2: 'Wed', 3: 'Thurs', 4: 'Frid', 5: 'Sat', 6: 'Sun'}
    return day_dict[row['wkday']]
    

In [None]:
def preprocess_cdrs_using_spark(file_or_folder=None, number_of_users_to_sample=None,
                                output_csv=None, date_format='%Y%m%d%H%M%S',
                                debug_mode=True, loc_file=None, save_to_csv=False,
                                seed=None, debug_fraction=0.001):
    """
    Load raw CDRs with Spark, clean them, attach cell locations and keep the
    events of a random sample of users.

    Steps:
    1. read the CSV file(s) and rename columns to space-free names
    2. drop the ``cdr type`` column and events without a phone number
    3. parse ``cdrDatetime`` into ``datetime`` (timestamp) and ``date`` columns
    4. inner-join with the cell-location table on ``cellId``
    5. keep only the events of randomly chosen users

    :param file_or_folder: CSV file or folder of CSV files (with a header row).
    :param number_of_users_to_sample: number of distinct users to keep; 100 when
        None. If fewer users exist, all of them are kept.
    :param output_csv: output path for Spark's CSV writer; required when
        ``save_to_csv`` is True.
    :param date_format: ``strptime`` format of the ``cdr datetime`` column.
    :param debug_mode: if True, only a ``debug_fraction`` row sample is processed.
    :param loc_file: CSV of cell locations containing a ``cell_id`` column.
    :param save_to_csv: if True, also write the sampled events to ``output_csv``.
    :param seed: optional seed for the debug row sample and the user sample.
        When None, the global ``random`` module state is used for the users.
    :param debug_fraction: fraction of rows kept when ``debug_mode`` is True.
    :return: Spark DataFrame with the events of the sampled users (returned in
        both the save and no-save cases).
    :raises ValueError: if ``save_to_csv`` is True but ``output_csv`` is missing.
    """
    if save_to_csv and not output_csv:
        raise ValueError("output_csv must be provided when save_to_csv=True")

    # Keep the historical default of 100 users when no count is given.
    n_users = 100 if number_of_users_to_sample is None else number_of_users_to_sample

    spark = SparkSession.builder.master("local[8]").appName("data_processor").getOrCreate()

    # Read, then repartition to spread the work over the local cores.
    df_raw = spark.read.csv(path=file_or_folder, header=True).repartition(10)

    # When just testing/debugging, work on a small row sample.
    if debug_mode:
        df_raw = df_raw.sample(fraction=debug_fraction, seed=seed)

    # Rename columns to remove spaces, and drop the unused 'cdr type' column.
    # NOTE(review): 'call duration' -> 'cellDuration' looks like a typo for
    # 'callDuration'; kept as-is so code relying on this column name still works.
    df_renamed = (df_raw.withColumnRenamed("cdr datetime", "cdrDatetime")
                        .withColumnRenamed("calling phonenumber2", "phoneNumber")
                        .withColumnRenamed("last calling cellid", "cellId")
                        .withColumnRenamed("call duration", "cellDuration")
                        .drop('cdr type'))

    # Discard events without a phone number before doing any parsing work.
    df_users = df_renamed.filter(col('phoneNumber').isNotNull())

    def _parse_timestamp(value):
        # Null timestamps become null instead of failing the Spark task.
        return None if value is None else datetime.strptime(value, date_format)

    def _parse_date(value):
        # DateType expects a date, so strip the time-of-day part.
        parsed = _parse_timestamp(value)
        return None if parsed is None else parsed.date()

    add_datetime = udf(_parse_timestamp, TimestampType())
    add_date = udf(_parse_date, DateType())

    df_dated = (df_users.withColumn('datetime', add_datetime(col('cdrDatetime')))
                        .withColumn('date', add_date(col('cdrDatetime'))))

    # Attach tower locations: 'cell_id' in the location file <-> 'cellId' in CDRs.
    df_loc = pd.read_csv(loc_file).rename(columns={'cell_id': 'cellId'})
    sdf_loc = spark.createDataFrame(df_loc)
    df_joined = df_dated.join(sdf_loc, on='cellId', how='inner')

    # Sample users WITHOUT replacement: random.choices could pick the same user
    # several times and return fewer distinct users than requested.
    all_users = [row['phoneNumber']
                 for row in df_joined.select('phoneNumber').distinct().collect()]
    sampler = random if seed is None else random.Random(seed)
    sampled_users = sampler.sample(all_users, min(n_users, len(all_users)))

    df_sample = df_joined.filter(col('phoneNumber').isin(sampled_users))

    if save_to_csv:
        df_sample.coalesce(1).write.csv(path=output_csv, header=True)
    return df_sample

In [None]:
def explore_data(df=None, output_plot_file=None, output_heatmap=None):
    """
    Quick look at user activity in a preprocessed CDR Spark DataFrame.

    Produces two figures and returns per-user daily event-count statistics:

    1. a heatmap of event counts by day of week (rows, Mon..Sun) and hour of
       day (columns), saved to ``output_heatmap``;
    2. a distribution plot of the total number of events per user, saved to
       ``output_plot_file``.

    :param df: Spark DataFrame with at least ``phoneNumber``, ``datetime``
        (timestamp) and ``date`` columns, e.g. the output of
        ``preprocess_cdrs_using_spark``.
    :param output_plot_file: path for the per-user distribution plot; not
        saved when None.
    :param output_heatmap: path for the weekday/hour heatmap; not saved when
        None. The heatmap figure is cleared after saving so the second plot
        starts on an empty figure.
    :return: tuple ``(mean, median)`` of the number of events per
        (phoneNumber, date) pair.
    """
    # datetime.weekday() numbering: Monday == 0 ... Sunday == 6. Keys must be
    # ints to match the IntegerType 'wkday' column (string keys raise KeyError).
    day_dict = {0: 'Mon', 1: 'Tue', 2: 'Wed', 3: 'Thurs', 4: 'Frid', 5: 'Sat', 6: 'Sun'}

    # ---- event count by weekday and hour -> heatmap ----
    add_hr = udf(lambda x: x.hour, IntegerType())
    add_wkday = udf(lambda x: x.weekday(), IntegerType())

    # Work on the `df` argument (not a notebook global) and skip events whose
    # timestamp is null, since the UDFs above need a datetime.
    df_time = (df.filter(col('datetime').isNotNull())
                 .withColumn('hr', add_hr(col('datetime')))
                 .withColumn('wkday', add_wkday(col('datetime'))))
    wk_hr_counts = df_time.groupBy('wkday', 'hr').count().toPandas()

    # Pivot on the integer weekday so rows are ordered Mon..Sun, then label them.
    # The numeric pivot itself is plotted: reset_index() would add a string
    # column that sns.heatmap cannot colour.
    heat = (wk_hr_counts.pivot(index='wkday', columns='hr', values='count')
                        .rename(index=day_dict))
    heat.index.name = 'weekDay'

    ax = sns.heatmap(heat)
    fig = ax.get_figure()
    if output_heatmap:
        fig.savefig(output_heatmap)
    # Clear so the distribution plot is not drawn on top of the heatmap.
    fig.clf()

    # ---- distribution of total events per user ----
    user_counts = df.groupBy('phoneNumber').count().toPandas()
    ax = sns.distplot(user_counts['count'])
    if output_plot_file:
        ax.get_figure().savefig(output_plot_file)

    # ---- events per user per day: summary statistics ----
    user_day_counts = df.groupBy('phoneNumber', 'date').count().toPandas()
    return user_day_counts['count'].mean(), user_day_counts['count'].median()

In [None]:
# Folder with the raw CDR CSV files. The default is relative to the notebook's
# working directory; set CDR_DATA_FOLDER to point at the data elsewhere.
dataFolder = os.environ.get('CDR_DATA_FOLDER', '../../day5-case-studies/cdrs-test/')

In [None]:
# Full (non-debug) preprocessing run, keeping the result in memory instead of
# writing it to CSV.
dfs = preprocess_cdrs_using_spark(
    file_or_folder=dataFolder,
    loc_file='../../day5-case-studies/cellTowers/staggered-cell-locs.csv',
    number_of_users_to_sample=1000,
    date_format='%Y%m%d%H%M%S',
    debug_mode=False,
    save_to_csv=False,
)

In [None]:
# Number of events per phone number, pulled into pandas for inspection.
dfGroup = (dfs
           .groupBy('phoneNumber')
           .count()
           .toPandas())

In [None]:
# `explore_user_call_count` is not defined in this notebook; `explore_data` is
# the defined helper and it returns (mean, median), not three values.
mean, med = explore_data(df=dfs,
                         output_plot_file='../../day5-case-studies/test.png',
                         output_heatmap='../../day5-case-studies/test-heatmap.png')

In [None]:
# Distinct dates in the data and the span (in days) from first to last.
dates_rows = dfs.select('date').distinct().collect()
sorted_dates = sorted(row['date'] for row in dates_rows)
diff = sorted_dates[-1] - sorted_dates[0]
diff.days

In [None]:
# Display the distinct dates found in the data, in ascending order.
sorted_dates

In [None]:
# Exploratory weekday x hour heatmap of event counts.
add_hr = udf(lambda x: x.hour, IntegerType())
add_wkday = udf(lambda x: x.weekday(), IntegerType())
# Keys must be ints: add_wkday yields datetime.weekday() (Mon == 0 ... Sun == 6)
# as IntegerType, so string keys like '1'/'2' raised KeyError for Tue/Wed.
day_dict = {0: 'Mon', 1: 'Tue', 2: 'Wed', 3: 'Thurs', 4: 'Frid', 5: 'Sat', 6: 'Sun'}

dfHr = dfs.withColumn('hr', add_hr(col('datetime')))
dfHr2 = dfHr.withColumn('wkday', add_wkday(col('datetime')))
dfWkDay = dfHr2.groupBy('wkday', 'hr').count().toPandas()
dfWkDay['weekDay'] = dfWkDay.apply(add_weekdays, args=(day_dict,), axis=1)
# Rebind instead of dropping in place so the cell can be re-run safely.
dfWkDay = dfWkDay.drop(columns=['wkday'])
dfWkDayPivot = dfWkDay.pivot(index='weekDay', columns='hr', values='count')
d = dfWkDayPivot.reset_index()
# Plot the numeric pivot: `d` carries a string 'weekDay' column after
# reset_index(), which sns.heatmap cannot colour.
ax = sns.heatmap(dfWkDayPivot)

In [None]:
# Inspect the hour columns of the weekday x hour pivot (`d2` was never defined).
dfWkDayPivot.columns

In [None]:
# 'wkday' may already have been dropped when dfWkDay was built; errors='ignore'
# keeps this cell from raising KeyError on Run All and makes it re-runnable.
dfWkDay = dfWkDay.drop(columns=['wkday'], errors='ignore')

In [None]:
# Rebuild the weekday x hour table and show its (hour) columns.
d = pd.pivot(dfWkDay, index='weekDay', columns='hr', values='count')
d.columns

In [None]:
# Display the pivot with 'weekDay' moved back from the index into a column.
d.reset_index()

In [None]:
# Hourly event counts for Sundays only.
dfWkDay.loc[dfWkDay['weekDay'].eq('Sun')]