In [129]:
import os
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import json
import re
import logging
from pyspark.sql.window import Window
import operator

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [130]:
# Create (or reuse) the Spark session; the hadoop-aws package is requested so
# that the s3a:// paths used below can be read.
spark = (
    SparkSession.builder
    .appName("Deloitte Challenge Test")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
    .getOrCreate()
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [131]:
# S3 (s3a://) locations of the two raw input text files.
stations_data = "s3a://deloitte-ciprian-test/data/source/stations.txt"
trips_data = "s3a://deloitte-ciprian-test/data/source/trips.txt"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Read Data From Files

In [132]:
# Read the stations file as plain text: one row per line, in a single "value" column.
df_stations = spark.read.text(stations_data)
df_stations.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|               value|
+--------------------+
|                   {|
|"internal_bus_sta...|
|"public_bus_stati...|
|                   }|
+--------------------+

In [133]:
# Read the trips file as plain text: one row per line, in a single "value" column.
df_trips = spark.read.text(trips_data)
df_trips.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|               value|
+--------------------+
|                   {|
|"ORIGIN": ["B","B...|
|"DESTINATION": ["...|
|"INTERNAL_BUS_STA...|
|" TRIPTIMES": [[2...|
|                   }|
+--------------------+

### Because the files cannot be parsed directly as JSON, I clean the data a little before parsing it

In [134]:
# Drop the opening-brace line. Compare on the trimmed text so that a brace
# surrounded by whitespace is removed too (it would otherwise reach the parser,
# which expects a "key": [...] line).
df_stations = df_stations.filter(F.trim(df_stations.value) != '{')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [135]:
# Drop the closing-brace line. Compare on the trimmed text so that a brace
# surrounded by whitespace is removed too.
df_stations = df_stations.filter(F.trim(df_stations.value) != '}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [136]:
# Drop the opening-brace line. Compare on the trimmed text so that a brace
# surrounded by whitespace is removed too (it would otherwise reach the parser,
# which expects a "key": [...] line).
df_trips = df_trips.filter(F.trim(df_trips.value) != '{')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [137]:
# Drop the closing-brace line. Compare on the trimmed text so that a brace
# surrounded by whitespace is removed too.
df_trips = df_trips.filter(F.trim(df_trips.value) != '}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### I visualize the rows of data I'm going to parse for both stations and trips tables

In [138]:
# Inspect the stations lines (up to 4) that remain after the brace lines were filtered out.
df_stations.head(4)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(value='"internal_bus_station_id": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5],'), Row(value='"public_bus_station": ["BAutogara", "BVAutogara", "SBAutogara","CJAutogara", "MMAutogara","ISAutogara", "CTAutogara","TMAutogara", "BCAutogara", "MSAutogara", "ISAutogara"]')]

In [139]:
# Inspect the trips lines (up to 4) that remain after the brace lines were filtered out.
df_trips.head(4)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(value='"ORIGIN": ["B","BV","TM","CJ"],'), Row(value='"DESTINATION": ["MM","IS","CT","BC"],'), Row(value='"INTERNAL_BUS_STATION_IDS": [[0,2,4],[1,8,5],[7,2,6],[3,9,8]],'), Row(value='" TRIPTIMES": [[2020-03-01 10:10:00 , 2020-03-01 12:20:10, 2020-03-01 15:10:00],[ 2020-03-01 10:10:00, 2020-03-01 12:20:10, 2020-03-01 15:10:00],[2020-04-01 10:10:00, 2020-04-01 12:20:10, 2020-04-01 15:10:00],[2020-05-01 10:10:00, 2020-05-01 12:20:10, 2020-05-01 15:10:00 ]]')]

### Parse the Data in unidimensional Array Format and organize it in a tabular format

In [140]:
def last_index_right_brackets(string):
    """Return a slice end index that cuts the text just before its last "]".

    When the last "]" is the final character (or there is no "]" at all in an
    empty string) the result is -1, so that ``string[:result]`` drops exactly
    the final character; otherwise the position of the last "]" is returned.
    """
    position = string.rfind("]")
    return -1 if position == len(string) - 1 else position

def convert_to_dict(column):
    """Parse one '"key": [v1, v2, ...]' text line into ``{key: [v1, v2, ...]}``.

    Args:
        column: A single line of text, e.g. '"ORIGIN": ["B","BV"],'.

    Returns:
        A one-entry dict mapping the lower-cased, unquoted, trimmed key to the
        list of values (as strings, with surrounding whitespace and double
        quotes removed). An empty dict is returned for ``None`` or for a line
        that contains no ':' separator (for example a stray brace line).
    """
    if column is None or ':' not in column:
        return {}

    # Split on the FIRST colon only, so values that contain colons themselves
    # (such as HH:mm:ss timestamps) are not truncated.
    raw_key, raw_val = column.split(':', 1)

    # Strip again after removing the quotes: the source can contain keys such
    # as '" TRIPTIMES"' whose padding sits inside the quotes.
    prop = raw_key.strip().lower().replace('"', '').strip()
    val = raw_val.strip()

    # Drop the opening bracket and everything from the last "]" onwards
    # (i.e. the closing bracket plus an optional trailing comma).
    index_close_array = val.rfind("]")
    if index_close_array == len(val) - 1:
        index_close_array = -1
    val = val[1:index_close_array]

    items = [detail_item.strip().replace('"', '') for detail_item in val.split(',')]
    return {prop: items}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### UDF Function to read single dimensional Arrays

In [141]:
# Wrap convert_to_dict as a Spark UDF whose declared result type is map<string, array<string>>.
string_to_dict_udf = F.udf(convert_to_dict, T.MapType(T.StringType(), T.ArrayType(T.StringType())))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Load unidimensional arrays first

### Load Stations data

In [142]:
# Parse every stations text line into a one-entry map (property name -> list of values).
df_stations = df_stations.withColumn("dict_from_string", string_to_dict_udf(F.col("value")))

df_stations.show(truncate=False, vertical=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 value            | "internal_bus_station_id": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5],                                                                                                                
 dict_from_string | [internal_bus_station_id -> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 5]]                                                                                                               
-RECORD 1----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 value            | "public_bus_station": ["BAutogara", "BVAutogara", "SBAutogara","CJAutogara", "MMAutogara","ISAutogara", "CTAutogara","TMAutogara", "BCAutogara", "MSAutogara", "ISAutogara"] 
 dict_from_string | [public_bu

### GET the data from the MAP Types and insert as their own rows 

In [143]:
# Turn each array stored in the map into one row per element:
# one DataFrame for the station names and one for the station ids.
data_public_bus = df_stations.select(F.explode(F.col("dict_from_string")['public_bus_station']).alias("public_bus_station"))
df_ibsi = df_stations.select(F.explode(F.col("dict_from_string")['internal_bus_station_id']).alias("internal_bus_station_id"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Stations Table Initial

In [144]:
# Pair every station name with its id by ARRAY POSITION rather than by
# monotonically_increasing_id(): that function only guarantees unique,
# increasing values (not consecutive ones), so ids generated independently on
# two DataFrames are not guaranteed to line up. posexplode() emits the element
# index itself, which is a stable join key. Both frames are re-derived from
# df_stations here because the position is only available at explode time.
data_public_bus = df_stations.select(
    F.posexplode(F.col("dict_from_string")['public_bus_station']).alias("join_id_bus", "public_bus_station")
)
df_ibsi = df_stations.select(
    F.posexplode(F.col("dict_from_string")['internal_bus_station_id']).alias("join_id_ibsi", "internal_bus_station_id")
)

inner_join = data_public_bus.join(df_ibsi, data_public_bus.join_id_bus == df_ibsi.join_id_ibsi, how='inner')

station_staging_table = inner_join.select(["internal_bus_station_id", "public_bus_station"])

# Keep one row per station id (the source data lists id 5 twice).
station_staging_table = station_staging_table.drop_duplicates(subset=['internal_bus_station_id'])

# The constant partition and ordering columns exist only so that row_number()
# can be applied; the numbering is a surrogate key with no further meaning.
station_staging_table = station_staging_table.withColumn("default_value",F.lit("ABC")) #default value only to generate row_number
w = Window().partitionBy('default_value').orderBy(F.lit('A'))
station_staging_table = station_staging_table.withColumn("row", F.row_number().over(w)).drop("default_value")

station_staging_table = station_staging_table.select(["row", "internal_bus_station_id", "public_bus_station"])
station_staging_table.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+-----------------------+------------------+
|row|internal_bus_station_id|public_bus_station|
+---+-----------------------+------------------+
|  1|                      7|        TMAutogara|
|  2|                      3|        CJAutogara|
|  3|                      8|        BCAutogara|
|  4|                      0|         BAutogara|
|  5|                      5|        ISAutogara|
|  6|                      6|        CTAutogara|
|  7|                      9|        MSAutogara|
|  8|                      1|        BVAutogara|
|  9|                      4|        MMAutogara|
| 10|                      2|        SBAutogara|
+---+-----------------------+------------------+

### Load Trips Table columns origin and destination

In [145]:
# Parse every trips text line with the same UDF that was used for the stations lines.
df_trips = df_trips.withColumn("dict_from_string", string_to_dict_udf(F.col("value")))
df_trips.show(truncate=False, vertical=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 value            | "ORIGIN": ["B","BV","TM","CJ"],                                                                                                                                                                                                                                                    
 dict_from_string | [origin -> [B, BV, TM, CJ]]                                                                                                                                                                                                                                                        
-RECORD 1-------------------------------------------------------------------------------------------------------

In [146]:
# One row per origin, with "id" set to the element's position in the source
# array. The position (0..n-1) is used instead of monotonically_increasing_id()
# because "id" is later joined against other per-trip keys, and generated ids
# are only guaranteed to be unique and increasing, not consecutive.
data_origin = df_trips.select(
    F.posexplode(F.col("dict_from_string")['origin']).alias("id", "origin")
)
data_origin = data_origin.select(["id", "origin"])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [147]:
# Display the origin rows without truncating values.
data_origin.show(truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+------+
|id |origin|
+---+------+
|0  |B     |
|1  |BV    |
|2  |TM    |
|3  |CJ    |
+---+------+

In [148]:
# One row per destination, with "id_dest" set to the element's position in the
# source array. The position (0..n-1) is used instead of
# monotonically_increasing_id(), whose values are only guaranteed to be unique
# and increasing, so it is not a reliable key for pairing with the origins.
data_dest = df_trips.select(
    F.posexplode(F.col("dict_from_string")['destination']).alias("id_dest", "destination")
)
# Keep the original column order (destination first).
data_dest = data_dest.select("destination", "id_dest")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [149]:
# Display the destination rows together with their generated ids.
data_dest.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+-------+
|destination|id_dest|
+-----------+-------+
|         MM|      0|
|         IS|      1|
|         CT|      2|
|         BC|      3|
+-----------+-------+

### Intermediary Join origin with destination

In [150]:
# Pair each origin with the destination carrying the same id, then keep
# only the id, origin and destination columns.
inner_join = data_origin.join(data_dest,data_origin.id == data_dest.id_dest, how='inner')
trips_table = inner_join.select("id",  "origin", "destination")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [151]:
# Display the intermediate origin/destination table.
trips_table.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+------+-----------+
| id|origin|destination|
+---+------+-----------+
|  0|     B|         MM|
|  1|    BV|         IS|
|  2|    TM|         CT|
|  3|    CJ|         BC|
+---+------+-----------+

### Parse the Data in Multidimensional Array Format and organize it in a tabular format

In [152]:
def convert_to_map(column):
    """Parse a '"key": [[a, b, ...], [c, d, ...], ...]' line into a map of trips.

    Each inner bracketed group becomes one entry, keyed by its 0-based
    position. Groups are delimited by the brackets found in the text, so inner
    arrays may have any length (and different lengths from each other).

    Args:
        column: A single line of text containing a two-level nested array.

    Returns:
        dict mapping trip index (int) to the list of that trip's values as
        strings, with surrounding whitespace and double quotes removed.
        An empty dict is returned for ``None`` or when the line contains no
        '[[' ... ']]' nested array.
    """
    if column is None:
        return {}

    start_column_index = column.find("[[")
    if start_column_index == -1:
        return {}
    end_column_index = column.find("]]", start_column_index)
    if end_column_index == -1:
        return {}

    # Drop only the outermost pair of brackets; the inner "[...]" groups stay
    # intact so they can be matched one by one.
    nested = column[start_column_index + 1:end_column_index + 1]

    trips_dict = {}
    for trip_index, group in enumerate(re.findall(r"\[([^\[\]]*)\]", nested)):
        if group.strip():
            # cleaning values by spaces and quotes
            trips_dict[trip_index] = [
                item.strip().replace('"', '') for item in group.split(',')
            ]
        else:
            # an empty inner array "[]" is an empty trip, not [""]
            trips_dict[trip_index] = []
    return trips_dict
            
    
    
# Wrap convert_to_map as a Spark UDF whose declared result type is map<int, array<string>>.
string_to_map_udf = F.udf(convert_to_map, T.MapType(T.IntegerType(), T.ArrayType(T.StringType())))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [153]:
# Start from the raw text lines of the trips file.
df_triptimes = df_trips.select('value')
# Keep only the lines whose lower-cased text contains 'triptimes'.
df_triptimes = df_triptimes.filter(F.lower(df_triptimes.value).contains('triptimes'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [154]:
# Parse the nested trip-times array into a map: trip index -> list of timestamp strings.
df_triptimes = df_triptimes.withColumn("map_column", string_to_map_udf(F.col("value")))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [155]:
# First explode: one row per map entry (columns "key" and "value").
exploded_triptimes = df_triptimes.select(F.explode(F.col("map_column")))
# Second explode: one row per element of each entry's array (column "col").
exploded_triptimes = exploded_triptimes.select(F.col("key"), F.explode(F.col("value")))
# Generated id; it is used further down to restore the element order within each trip.
exploded_triptimes = exploded_triptimes.withColumn("id", F.monotonically_increasing_id())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## TripTimes Dimension table

In [None]:
# Rename the exploded columns: "col" -> "triptimes", "key" -> "tripkey".
triptimes_table = exploded_triptimes.select("id",  F.col("col").alias("triptimes"), F.col("key").alias("tripkey"))
triptimes_table.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Same approach as for the trip times: start from the raw trips lines ...
df_internal_bus_station_ids = df_trips.select('value')

# ... and keep only those whose lower-cased text contains 'internal_bus_station_ids'.
df_internal_bus_station_ids = df_internal_bus_station_ids.filter(F.lower(df_internal_bus_station_ids.value).contains('internal_bus_station_ids'))

In [None]:
# Parse the nested station-id array into a map: trip index -> list of station ids (as strings).
df_internal_bus_station_ids = df_internal_bus_station_ids.withColumn("map_column", string_to_map_udf(F.col("value")))

In [None]:
# Display the raw line next to its parsed map column, without truncation.
df_internal_bus_station_ids.show(truncate=False)

In [None]:
# First explode: one row per map entry (columns "key" and "value").
exploded_internal_bus_station_ids = df_internal_bus_station_ids.select(F.explode(F.col("map_column")))
# Second explode: one row per station id of each trip (column "col").
exploded_internal_bus_station_ids = exploded_internal_bus_station_ids.select(F.col("key"), F.explode(F.col("value")))
# Generated id; it is used further down to restore the station order within each trip.
exploded_internal_bus_station_ids = exploded_internal_bus_station_ids.withColumn("id", F.monotonically_increasing_id())
exploded_internal_bus_station_ids.show(truncate=False)

## Bus Stations Dimension table

In [None]:
# Rename the exploded columns: "col" -> "bus_stations", "key" -> "tripkey".
internal_bus_station_ids = exploded_internal_bus_station_ids.select("id",  F.col("col").alias("bus_stations"), F.col("key").alias("tripkey"))
# Attach the public station name by matching each station id against the stations staging table
# (inner join: station ids without a match in the staging table are dropped).
internal_bus_station_ids = internal_bus_station_ids.join(station_staging_table, internal_bus_station_ids.bus_stations==station_staging_table.internal_bus_station_id, how="inner")
# Keep the reporting columns, ordered by trip.
internal_bus_station_ids = internal_bus_station_ids.select("id", "tripkey", "bus_stations", "public_bus_station").orderBy("tripkey")
internal_bus_station_ids.show()

### Convert the triptimes column of the staging triptimes_table from String to timestamp format (the new column min is added in the aggregation step below)

In [None]:
# Parse the timestamp strings (pattern 'yyyy-MM-dd HH:mm:ss') into a timestamp column "dt"
# and sort by it; the original string column "triptimes" is not carried over.
triptimes_table = triptimes_table.select('id', 'tripkey', F.to_timestamp(triptimes_table.triptimes, 'yyyy-MM-dd HH:mm:ss').alias('dt')).orderBy("dt")
triptimes_table.show()

### Calculate journey time duration

In [None]:
import datetime
def calculate_journey_duration(column):
    """Return the whole minutes elapsed between the first and last timestamp.

    Args:
        column: Ordered sequence of timestamp strings formatted as
            'YYYY-MM-DD HH:MM:SS'.

    Returns:
        int number of minutes (truncated toward zero) between ``column[0]``
        and ``column[-1]``, or ``None`` when the sequence is empty/null or
        either endpoint is null.

    Raises:
        ValueError: if an endpoint does not match the expected format.
    """
    if not column or column[0] is None or column[-1] is None:
        return None

    time_format = '%Y-%m-%d %H:%M:%S'
    first_date = datetime.datetime.strptime(column[0], time_format)
    last_date = datetime.datetime.strptime(column[-1], time_format)

    # total_seconds() covers the whole span; timedelta.seconds alone ignores
    # the days component and would under-report journeys of 24 hours or more.
    journey_time_minutes = int((last_date - first_date).total_seconds() / 60)
    return journey_time_minutes
    
# Wrap calculate_journey_duration as a Spark UDF whose declared result type is integer.
calculate_journey_udf = F.udf(calculate_journey_duration, T.IntegerType())

In [None]:
# Display the staging table again (it has not been modified since it was last shown).
triptimes_table.show()

In [None]:
def sorter(l):
    """Order the entries by their first field and return their second fields.

    Args:
        l: Iterable of indexable entries such as (id, value) pairs.

    Returns:
        list of each entry's element at index 1, in ascending order of the
        element at index 0.
    """
    ordered = list(l)
    ordered.sort(key=operator.itemgetter(0))
    return [entry[1] for entry in ordered]

# Wrap sorter as a Spark UDF whose declared result type is array<string>.
sort_udf = F.udf(sorter, T.ArrayType(T.StringType()))

In [None]:
# Format the timestamps back to strings so they can be collected and parsed by the duration UDF.
triptimes_table = triptimes_table.withColumn("triptimes_str", F.date_format(F.col("dt"), "yyyy-MM-dd HH:mm:ss"))
# Collect (id, timestamp) pairs per trip; the id is kept so the order can be restored afterwards.
triptimes_table_agg = triptimes_table.groupBy('tripkey').agg(F.collect_list(F.struct("id", "triptimes_str")).alias("triptimes_unordered")).orderBy("tripkey")
# Sort each trip's pairs by id, keep only the timestamps, and drop the unordered helper column.
triptimes_table_agg = triptimes_table_agg.withColumn("triptimes", sort_udf(F.col("triptimes_unordered"))).drop("triptimes_unordered")
# "min": journey duration in minutes between the first and last timestamp of each trip.
triptimes_table_agg = triptimes_table_agg.withColumn("min", calculate_journey_udf(F.col('triptimes')).cast("integer"))
triptimes_table_agg.show(truncate=False)

### Intermediary join origin with destination and duration

In [None]:
# Attach the per-trip aggregates (ordered timestamps, duration) by matching the trip id to tripkey.
trips_table = trips_table.join(triptimes_table_agg, trips_table.id==triptimes_table_agg.tripkey, how="inner")

In [None]:
# Keep only the trip columns plus the duration column "min".
trips_table = trips_table.select("id",  "origin", "destination", "min")
trips_table.show()

## UDF to accommodate collect_list aggregated columns in the right order

In [None]:
def sorter(l):
    """Return the values of (id, value) entries, arranged by ascending id.

    Args:
        l: Iterable of indexable entries; index 0 is the sort key and
            index 1 is the value to keep.

    Returns:
        list of the kept values in sorted-key order.
    """
    values = []
    for entry in sorted(l, key=operator.itemgetter(0)):
        values.append(entry[1])
    return values

# Declare the result type explicitly. Without it F.udf falls back to its
# default string return type, so the sorted list would come back as a single
# string instead of an array<string> column.
sort_udf = F.udf(sorter, T.ArrayType(T.StringType()))

# Trips Table Initial

In [None]:
# Collect (id, station id) pairs per trip, then restore the station order by id.
internal_bus_station_ids_agg = internal_bus_station_ids.groupBy('tripkey').agg(F.collect_list(F.struct("id", "bus_stations")).alias("bus_unordered")).orderBy("tripkey")
internal_bus_station_ids_agg = internal_bus_station_ids_agg.withColumn("bus_stations", sort_udf(F.col("bus_unordered"))).drop("bus_unordered")
# Attach the ordered station ids to each trip (trip id == tripkey).
trips_table_initial = trips_table.join(internal_bus_station_ids_agg.select("bus_stations", "tripkey"), internal_bus_station_ids_agg.tripkey==trips_table.id, how="inner").drop("tripkey") 

# Attach the ordered trip times as well.
trips_table_initial= trips_table_initial.join(triptimes_table_agg, triptimes_table_agg.tripkey==trips_table_initial.id, how="inner")
# Constant partition/order columns exist only so that row_number() can be applied.
trips_table_initial = trips_table_initial.withColumn("default_value",F.lit("ABC")) #default value only to generate row_number
w = Window().partitionBy('default_value').orderBy(F.lit('A'))
trips_table_initial = trips_table_initial.withColumn("row", F.row_number().over(w)).drop("default_value")
# Final column layout of the reconstructed initial table.
trips_table_initial = trips_table_initial.select("row", "origin", "destination", "bus_stations", "triptimes")
trips_table_initial.show(truncate=False)

# Trips Fact Table

In [None]:
# Collect (id, station name) pairs per trip, then restore the station order by id.
internal_bus_station_ids_fact_agg = internal_bus_station_ids.groupBy('tripkey').agg(F.collect_list(F.struct("id", "public_bus_station")).alias("public_bus_unordered")).orderBy("tripkey")
internal_bus_station_ids_fact_agg = internal_bus_station_ids_fact_agg.withColumn("public_bus_station", sort_udf(F.col("public_bus_unordered"))).drop("public_bus_unordered")
# Attach the ordered station names to each trip (trip id == tripkey).
trips_table_fact = trips_table.join(internal_bus_station_ids_fact_agg.select("public_bus_station", "tripkey"), internal_bus_station_ids_fact_agg.tripkey==trips_table.id, how="inner") 
# Constant partition/order columns exist only so that row_number() can be applied.
trips_table_fact = trips_table_fact.withColumn("default_value",F.lit("ABC")) #default value only to generate row_number
w = Window().partitionBy('default_value').orderBy(F.lit('A'))
trips_table_fact = trips_table_fact.withColumn("row", F.row_number().over(w)).drop("default_value")
# Final column layout; the "min" column is exposed as "duration".
trips_table_fact = trips_table_fact.select("row", "origin", "destination", "public_bus_station", F.col("min").alias("duration"))

In [None]:
# Display the fact table without truncating the station-name lists.
trips_table_fact.show(truncate=False)

# Conclusions

In conclusion, the final table is an OLAP cube formed by aggregating the duration fact table over the dimension columns (origin, destination, bus station names).

1. Station Dimension Table in tabular format 

In [None]:
# One row per (trip, station): id, tripkey, station id and public station name.
internal_bus_station_ids.show(truncate=False)

2. Trips Dimension Tables Origin and Destination

In [None]:
# Origin/destination view of the trips table (the duration column "min" is left out here).
trips_table.select("id",  "origin", "destination").show(truncate=False)

3. Trips Dimension Tables Triptimes

In [None]:
# One row per trip timestamp: id, tripkey, timestamp "dt" and its string form "triptimes_str".
triptimes_table.show()

4. Fact Duration Table

In [None]:
# One row per trip: tripkey, ordered list of timestamps and the duration column "min".
triptimes_table_agg.show(truncate=False)

5. Initial Table

In [None]:
# Reconstructed initial table: row, origin, destination, station ids and trip times.
trips_table_initial.show(truncate=False)

6. Final Fact Table aggregated(OLAP Cube) on dimensions

In [None]:
# Final fact table: row, origin, destination, public station names and duration.
trips_table_fact.show(truncate=False)