## Transformation for rental journey data 
This notebook is responsible for transforming journey data by performing the following tasks:

    1. Renaming columns (removing spaces and lowercasing)

    2. Convert data types from string to timestamps
    
    3. Attach weather dates
    
    4. Drop unnecessary columns
    
    5. Update extra files for dimension tables

In [1]:
import pyspark
import os

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
        .master('local[*]') \
        .appName('journey-and-stations-data-transformer') \
        .config("spark.hadoop.fs.s3a.access.key", os.environ.get('AWS_ACCESS_KEY'))\
        .config("spark.hadoop.fs.s3a.secret.key", os.environ.get('AWS_SECRET_ACCESS_KEY'))\
        .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/01 19:15:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# get journey data
df_journey = spark.read.csv("s3a://hrc-de-data/raw/cycling-journey/*/*", inferSchema=True, header=True)

22/03/01 19:15:14 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [5]:
df_journey.take(2)

[Row(Rental Id=109096951, Duration=540, Bike Id=13318, End Date='15/06/2021 20:19', EndStation Id=661, EndStation Name='All Saints Church, Portobello', Start Date='15/06/2021 20:10', StartStation Id=105, StartStation Name='Westbourne Grove, Bayswater'),
 Row(Rental Id=108982015, Duration=780, Bike Id=18991, End Date='13/06/2021 13:03', EndStation Id=312, EndStation Name="Grove End Road, St. John's Wood", Start Date='13/06/2021 12:50', StartStation Id=106, StartStation Name='Woodstock Street, Mayfair')]

In [6]:
df_journey.printSchema()

root
 |-- Rental Id: integer (nullable = true)
 |-- Duration: integer (nullable = true)
 |-- Bike Id: integer (nullable = true)
 |-- End Date: string (nullable = true)
 |-- EndStation Id: integer (nullable = true)
 |-- EndStation Name: string (nullable = true)
 |-- Start Date: string (nullable = true)
 |-- StartStation Id: integer (nullable = true)
 |-- StartStation Name: string (nullable = true)



In [7]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [8]:
# rename columns
df_journey= df_journey.withColumnRenamed('Rental Id', 'rental_id')\
.withColumnRenamed('Bike Id', 'bike_id')\
.withColumnRenamed('Start Date', 'start_date')\
.withColumnRenamed('End Date', 'end_date')\
.withColumnRenamed('StartStation Id', 'start_station')\
.withColumnRenamed('EndStation Id', 'end_station')

In [10]:
# convert data types
df_journey= df_journey.withColumn('start_date', to_timestamp(col('start_date'), 'dd/MM/yyy HH:mm'))

df_journey= df_journey.withColumn('end_date',  to_timestamp(col('end_date'), 'dd/MM/yyy HH:mm'))

In [11]:
# add weather_date column
df_journey= df_journey.withColumn('weather_date', to_date(col("start_date"), 'dd/MM/yyy HH:mm'))

In [12]:
df_journey.show(5)
df_journey.printSchema()

+---------+-------+-------------------+-----------+--------------------+-------------------+-------------+--------------------+------------+
|rental_id|bike_id|           end_date|end_station|     EndStation Name|         start_date|start_station|   StartStation Name|weather_date|
+---------+-------+-------------------+-----------+--------------------+-------------------+-------------+--------------------+------------+
|109096951|  13318|2021-06-15 20:19:00|        661|All Saints Church...|2021-06-15 20:10:00|          105|Westbourne Grove,...|  2021-06-15|
|108982015|  18991|2021-06-13 13:03:00|        312|Grove End Road, S...|2021-06-13 12:50:00|          106|Woodstock Street,...|  2021-06-13|
|108839141|  16736|2021-06-10 15:28:00|        333|Palace Gardens Te...|2021-06-10 15:14:00|          106|Woodstock Street,...|  2021-06-10|
|108816591|    913|2021-06-09 22:37:00|         51|Finsbury Library ...|2021-06-09 22:14:00|          123|St. John Street, ...|  2021-06-09|
|108919084|  

### Stations data
We are going to update the stations data (previously saved by another process) with some additional stations that are not present in the original stations data but are seen in some journey.

In [13]:
# read previously saved stations data from parquet
df_processed_stations= spark.read.parquet('s3a://hrc-de-data/processed/cycling-dimension/stations/')

                                                                                

In [14]:
df_processed_stations.tail(2)

                                                                                

[Row(station_id=838, station_name='Fore Street Avenue, Guildhall', longitude=-0.0914017, latitude=51.518093, easting=532524.0, northing=181634.0),
 Row(station_id=839, station_name='Sea Containers, South Bank', longitude=-0.1068403, latitude=51.507974, easting=531482.0, northing=180481.0)]

In [15]:
# create temporary table for both stations and journey
df_journey.createOrReplaceTempView('journey')
df_processed_stations.createOrReplaceTempView('station')

In [16]:
# we keep all the stations which are not found in the temp view station table
additional_stations= spark.sql('''
select distinct(start_station) as station_id, `StartStation Name` as station_name 
from journey 
where start_station not in (select station_id from station)
union
select distinct(end_station) as station_id, `EndStation Name` as station_name 
from journey 
where end_station not in (select station_id from station)
''')
additional_stations.show()



+----------+--------------------+
|station_id|        station_name|
+----------+--------------------+
|       840|George Row, Bermo...|
|       391|Clifford Street, ...|
|       842|Temple Gardens, T...|
|       844|Canada Water Stat...|
|       845|Bermondsey Statio...|
|       841|Tower Wharf, Berm...|
+----------+--------------------+



                                                                                

In [18]:
# add columns to the additional stations to avoid errors when merging it to the previous one (df_processed_stations)
additional_stations= additional_stations.withColumn('longitude', lit(0).cast(DoubleType()))\
.withColumn('latitude', lit(0).cast(DoubleType()))\
.withColumn('easting', lit(0).cast(DoubleType()))\
.withColumn('northing', lit(0).cast(DoubleType()))

In [19]:
additional_stations.show(5)
additional_stations.printSchema()



+----------+--------------------+---------+--------+-------+--------+
|station_id|        station_name|longitude|latitude|easting|northing|
+----------+--------------------+---------+--------+-------+--------+
|       840|George Row, Bermo...|      0.0|     0.0|    0.0|     0.0|
|       391|Clifford Street, ...|      0.0|     0.0|    0.0|     0.0|
|       842|Temple Gardens, T...|      0.0|     0.0|    0.0|     0.0|
|       844|Canada Water Stat...|      0.0|     0.0|    0.0|     0.0|
|       845|Bermondsey Statio...|      0.0|     0.0|    0.0|     0.0|
+----------+--------------------+---------+--------+-------+--------+
only showing top 5 rows

root
 |-- station_id: integer (nullable = true)
 |-- station_name: string (nullable = true)
 |-- longitude: double (nullable = false)
 |-- latitude: double (nullable = false)
 |-- easting: double (nullable = false)
 |-- northing: double (nullable = false)



                                                                                

In [20]:
# remove duplicate values
additional_stations.dropDuplicates()

DataFrame[station_id: int, station_name: string, longitude: double, latitude: double, easting: double, northing: double]

In [21]:
# save additional stations data into parquet files in s3
additional_stations.write.parquet('s3a://hrc-de-data/processed/cycling-dimension/stations/', mode='append')

                                                                                

In [22]:
# drop other unnecessary journey columns
df_journey= df_journey.drop('StartStation Name', 'EndStation Name', 'Duration')

### Datetime
We are going to create/update datetime data from the start and end date of each journey.

In [23]:
# extract datetime values from the start and the end date
df_datetime_from_start= (
    df_journey.select(
        col('start_date').alias('datetime_id'), 
        year(col('start_date')).alias('year'), 
        month(col('start_date')).alias('month'), 
        dayofmonth(col('start_date')).alias('day'),
        hour(col('start_date')).alias('hour'),
        minute(col('start_date')).alias('minute'),
        second(col('start_date')).alias('second'),
    )
)
df_datetime_from_end= (
    df_journey.select(
        col('end_date').alias('datetime_id'), 
        year(col('end_date')).alias('year'), 
        month(col('end_date')).alias('month'), 
        dayofmonth(col('end_date')).alias('day'),
        hour(col('end_date')).alias('hour'),
        minute(col('end_date')).alias('minute'),
        second(col('end_date')).alias('second'),
    )
)

df_datetime_from_start.show(3)
df_datetime_from_end.show(3)

+-------------------+----+-----+---+----+------+------+
|        datetime_id|year|month|day|hour|minute|second|
+-------------------+----+-----+---+----+------+------+
|2021-06-15 20:10:00|2021|    6| 15|  20|    10|     0|
|2021-06-13 12:50:00|2021|    6| 13|  12|    50|     0|
|2021-06-10 15:14:00|2021|    6| 10|  15|    14|     0|
+-------------------+----+-----+---+----+------+------+
only showing top 3 rows

+-------------------+----+-----+---+----+------+------+
|        datetime_id|year|month|day|hour|minute|second|
+-------------------+----+-----+---+----+------+------+
|2021-06-15 20:19:00|2021|    6| 15|  20|    19|     0|
|2021-06-13 13:03:00|2021|    6| 13|  13|     3|     0|
|2021-06-10 15:28:00|2021|    6| 10|  15|    28|     0|
+-------------------+----+-----+---+----+------+------+
only showing top 3 rows



In [24]:
# combine the dataframes
df_datetime= df_datetime_from_start.union(df_datetime_from_end)

# remove duplicate entries
df_datetime.dropDuplicates()

df_datetime.show(10)

+-------------------+----+-----+---+----+------+------+
|        datetime_id|year|month|day|hour|minute|second|
+-------------------+----+-----+---+----+------+------+
|2021-06-15 20:10:00|2021|    6| 15|  20|    10|     0|
|2021-06-13 12:50:00|2021|    6| 13|  12|    50|     0|
|2021-06-10 15:14:00|2021|    6| 10|  15|    14|     0|
|2021-06-09 22:14:00|2021|    6|  9|  22|    14|     0|
|2021-06-12 11:09:00|2021|    6| 12|  11|     9|     0|
|2021-06-10 22:33:00|2021|    6| 10|  22|    33|     0|
|2021-06-13 14:48:00|2021|    6| 13|  14|    48|     0|
|2021-06-14 18:06:00|2021|    6| 14|  18|     6|     0|
|2021-06-14 18:06:00|2021|    6| 14|  18|     6|     0|
|2021-06-09 16:06:00|2021|    6|  9|  16|     6|     0|
+-------------------+----+-----+---+----+------+------+
only showing top 10 rows



In [25]:
# save datetime data into parquet files in s3
df_datetime.write.parquet('s3a://hrc-de-data/processed/cycling-dimension/datetime/', mode='append')

                                                                                

In [26]:
# finally, save journey data into parquet files in s3
df_journey.write.parquet('s3a://hrc-de-data/processed/cycling-fact/journey/', mode='append')

                                                                                