In [None]:
import pyspark
from pyspark import SparkContext
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql import types as T

import json
import numpy as np

In [None]:
def rename_columns(df, list_of_tuples):
    for (old_col, new_col) in list_of_tuples:
        df = df.withColumnRenamed(old_col, new_col)
    return df

def read_file(filepath, sqlContext):
    data_frame = sqlContext.read.csv(filepath, header=False,
                                     inferSchema=True,nullValue="-")
    while len(data_frame.columns) < 16:
        col_name = "_c" + str(len(data_frame.columns))
        data_frame = data_frame.withColumn(col_name, F.lit(None))

    data_frame = rename_columns(
        data_frame,
        [
            ("_c0", "route"),
            ("_c1", "tripNum"),
            ("_c2", "shapeId"),
            ("_c3", "shapeSequence"),
            ("_c4", "shapeLat"),
            ("_c5", "shapeLon"),
            ("_c6", "distanceTraveledShape"),
            ("_c7", "busCode"),
            ("_c8", "gpsPointId"),
            ("_c9", "gpsLat"),
            ("_c10", "gpsLon"),
            ("_c11", "distanceToShapePoint"),
            ("_c12", "timestamp"),
            ("_c13", "busStopId"),
            ("_c14", "problem"),
            ("_c15", "numPassengers")
        ]
    )

    date = "-".join(filepath.split("/")[-2].split("_")[:3])

    data_frame = data_frame.withColumn("date", F.lit(date))

    return data_frame

def print_df(df,l=10):
    return df.limit(l).toPandas()

### Read GPS Data

In [None]:
sc = SparkContext.getOrCreate()
sqlContext = pyspark.SQLContext(sc)

In [None]:
exp_data_folder_path = '/local/tarciso/masters/experiments/preliminary-exp/preliminary-exp-sample-data/'
trips_data = read_file(exp_data_folder_path + '/bulma-output/2017_05_11_veiculos.csv/part-00000', sqlContext)

In [None]:
trips_data.head(3)

In [None]:
trips_data.printSchema()

### Read Bus Card Data

In [None]:
ticketing_data = sqlContext.read.json(exp_data_folder_path + '/ticketing-sample/doc1-2017051115.txt')

In [None]:
#Renaming columns to english
ticketing_data = ticketing_data.select(F.col("CODLINHA").alias("route"),
                                       F.col("CODVEICULO").alias("busCode"),
                                       F.col("DATANASCIMENTO").alias("userBirthdate"),
                                       F.col("DATAUTILIZACAO").alias("cardTimestamp"),
                                       F.col("NOMELINHA").alias("lineName"),
                                       F.col("NUMEROCARTAO").alias("cardNum"),
                                       F.col("SEXO").alias("gender"))

In [None]:
ticketing_data.printSchema()

In [None]:
ticketing_data.head(5)

### Pre-processing GPS data

In [None]:
#Warning: Both data sources dates refer to previous day, not to the day in the file name. Fixing this issue here to match bus card data.
trips_data = trips_data.withColumn("date", F.date_sub(F.col("date"), 1))
trips_data = trips_data.withColumn("gps_timestamp", F.concat(F.col("date"), F.lit(" "), F.col("timestamp")))
trips_data = trips_data.withColumn("gps_timestamp_in_secs", F.unix_timestamp(F.col("gps_timestamp"), "yyyy-MM-dd HH:mm:ss"))
trips_data = trips_data.withColumn("gps_date_in_secs", F.unix_timestamp(F.col("date"), "yyyy-MM-dd"))

In [None]:
#trips_data.select(["date","gps_date_in_secs","timestamp","gps_timestamp_in_secs"]).show()

In [None]:
def get_N_sec_group(timestamp_in_secs,date_in_secs,N):
    secs_since_midnight = timestamp_in_secs-date_in_secs
    return(F.floor(secs_since_midnight/N))

In [None]:
trips_data = trips_data.withColumn("sec_group",get_N_sec_group(F.col("gps_timestamp_in_secs"),F.col("gps_date_in_secs"),60))
#trips_data.select(["gps_timestamp","gps_timestamp_in_secs","sec_group"]).show()

## Pre-processing Bus Card data

In [None]:
ticketing_data = ticketing_data.withColumn("card_timestamp_in_secs", F.unix_timestamp(F.col("cardTimestamp"), "dd/MM/yy HH:mm:ss"))
ticketing_data = ticketing_data.withColumn("date",F.from_unixtime(F.col("card_timestamp_in_secs"), "yyyy-MM-dd"))
ticketing_data = ticketing_data.withColumn("card_date_in_secs",F.unix_timestamp(F.col("date"),"yyyy-MM-dd"))
#ticketing_data.select(["cardTimestamp","card_timestamp_in_secs","date","card_date_in_secs"]).show()

In [None]:
ticketing_data = ticketing_data.withColumn("sec_group",get_N_sec_group(F.col("card_timestamp_in_secs"),F.col("card_date_in_secs"),60))
#ticketing_data.select(["cardTimestamp","card_timestamp_in_secs","sec_group"]).orderBy("card_timestamp_in_secs").show()

In [None]:
trips_data.count()

In [None]:
ticketing_data.count()

### Removing duplicate GPS records (occurred in the same time period)

In [None]:
filtered_trips_data = trips_data.na.drop(subset=["route","busCode","busStopId","sec_group"]).dropDuplicates(["route","busCode", "tripNum", "date","sec_group"])
print filtered_trips_data.count()
#filtered_trips_data.select(["route","busCode", "tripNum","busStopId", "sec_group"]).orderBy("route","busCode","tripNum","busStopId").limit(20).toPandas()

### Removing Duplicate entries in ticketing data

In [None]:
ticketing_data = ticketing_data.dropDuplicates(["route","busCode","cardNum","date","sec_group"])

In [None]:
ticketing_data.count()

### Merging GPS and ticketing data 

In [None]:
#ticketing_data.select(['route','busCode','date','sec_group']).orderBy(['route','busCode','sec_group']).limit(20).toPandas()

In [None]:
#filtered_trips_data.select(['route','busCode','date','sec_group']).orderBy(['route','busCode','sec_group']).limit(20).toPandas()

In [None]:
user_boardings = ticketing_data.join(filtered_trips_data, ['route','busCode','date','sec_group'], 'inner')

In [None]:
user_boardings.printSchema()

In [None]:
#user_boardings.select(['route','busCode','sec_group','timestamp','cardTimestamp']).limit(20).toPandas()

In [None]:
user_boardings.count()

In [None]:
user_boardings.select('cardNum').distinct().count()

In [None]:
#user_boardings.select(['route','busCode','tripNum','busStopId','sec_group','cardNum','cardTimestamp','gps_timestamp']).orderBy('cardNum','cardTimestamp').limit(20).toPandas()

### Removing duplicated boarding data

In [None]:
filtered_boardings = user_boardings.dropDuplicates(['cardNum','date','sec_group'])

In [None]:
print filtered_boardings.count()
print_df(filtered_boardings)

### Removing single-trip users records

In [None]:
boarding_count = filtered_boardings.groupby('cardNum').count()

In [None]:
#Filtering only users with more than one ride per day
multiple_boardings = boarding_count.filter(F.col('count') > 1)

In [None]:
total_passengers = user_boardings.select('cardNum').distinct().count()
print "Total #Passengers:", total_passengers

In [None]:
passengers_mult_boardings = multiple_boardings.count()
prop_mult_boardings = 100*(passengers_mult_boardings)/total_passengers
print "Passengers with Multiple Boardings:", passengers_mult_boardings, "(", prop_mult_boardings, "%)" 
multiple_boardings.show()

In [None]:
multiple_boardings = multiple_boardings.select(F.col("cardNum").alias("cardNum1"),
                                                 F.col("count").alias("count1"))

In [None]:
clean_boardings = filtered_boardings.join(multiple_boardings, filtered_boardings.cardNum == multiple_boardings.cardNum1, 'leftsemi')

In [None]:
clean_boardings.count()

In [None]:
print_df(clean_boardings)

In [None]:
clean_boardings.write.csv(path=exp_data_folder_path+'/clean_boardings',header=True)

### Analyzing Boarding data

In [None]:
user_boarding_w = Window.partitionBy(clean_boardings.cardNum).orderBy(clean_boardings.card_timestamp_in_secs)

In [None]:
od_matrix = clean_boardings.withColumn('d_lat',F.lead(F.col('gpsLat')).over(user_boarding_w)). \
                            withColumn('d_lat',F.lead(F.col('gpsLat')).over(user_boarding_w)). \

In [None]:
print_df(od_matrix)

In [None]:
od_matrix = clean_boardings.withColumn('d_lat',F.when(clean_boardings.d_lat == None,
                                                      F.first(F.col('gpsLat').over(user_boarding_w))). \
                                                       otherwise(clean_boardings.d_lat))

In [None]:
#ticketing_data.filter(F.col('cardNum') == '0001080534').toPandas()

In [None]:
#clean_boardings.count()

In [None]:
#clean_boardings.select('cardNum').distinct().count()

In [None]:
#Taking a look at a sample:
#user_boardings.filter(F.col('cardNum') == '0002986469').limit(20).toPandas()

In [None]:
#Taking a look at a sample:
#user_boardings.filter(F.col('cardNum') == '0003372920').limit(20).toPandas()

In [None]:
#Taking a look at a sample:
#user_boardings.filter(F.col('cardNum') == '0002986469').limit(20).toPandas()

In [None]:
#Taking a look at a sample:
#user_boardings.filter(F.col('cardNum') == '0003372920').limit(20).toPandas()

In [None]:
#Checking if there are any duplicate boarding entries
#duplicate_board_entries = user_boardings.groupby(['cardNum','date','sec_group']).count().filter('count > 1')
#print duplicate_board_entries.count()
#print_df(duplicate_board_entries)

In [None]:
#Taking a look at a sample:
#print_df(user_boardings.filter(F.col('cardNum') == '0001884144'))

In [None]:
#print_df(user_boardings.filter(F.col('cardNum') == '0001884144').select(['route','busCode','tripNum','date','sec_group','gps_timestamp','problem','timestamp','cardTimestamp']))

In [None]:
#print_df(user_boardings.filter(F.col('cardNum') == '0003826824').select(['route','busCode','tripNum','date','sec_group','gps_timestamp','problem','timestamp','cardTimestamp']))

In [None]:
#print_df(user_boardings.filter(F.col('cardNum') == '0002195541').select(['route','busCode','tripNum','date','sec_group','gps_timestamp','problem','timestamp','cardTimestamp']))

As we can see above, the duplicated boarding records are due to a problem with the BULMA output, as GPS records which occurred in the same trip are being associated to different trips. We will exclude such entries from our analysis.