## **ETL**

Notebook con scripts para poblar un datawarehouse en Azure SQL Database de los recorridos de ecobici.

Los datos de los recorridos están almacenados en una tabla de una base de datos local PostgreSQL.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

# jdbc to interact with postgresql and azure sql database
jars = ['/home/jovyan/ecobici_rides/sqljdbc_8.2/enu/mssql-jdbc-8.2.0.jre8.jar',
 '/home/jovyan/ecobici_rides/postgresql-42.2.12.jar']

In [2]:
spark = SparkSession.builder \
            .master('local[*]') \
            .appName('etl') \
            .config('spark.driver.extraClassPath', ','.join(jars)) \
            .config("spark.jars", ','.join(jars)) \
            .getOrCreate()

In [33]:
PG_USERNAME = '<username>'
PG_PASSWORD = '<password>'
PG_JDBC_URL = f'jdbc:postgresql://localhost/ecobici?user={PG_USERNAME}&password={PG_PASSWORD}'

AZ_USERNAME = '<username>'
AZ_PSSWD = "<password>"
AZ_JDBC_URL = f"jdbc:sqlserver://datadev-stuff.database.windows.net:1433;database=dw_ecobci;user={AZ_USERNAME}@datadev-stuff;password={AZ_PSSWD};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

### **CARGA DE DATOS A LA TABLA DIM_DATE**

Obtenemos las diferentes fechas que existen en las columna *fecha_origen_recorrido* y *fecha_destino_recorrido*.

Estos valores de fecha, se cargarán en la tabla DIM_DATE.

Se obtienen las diferentes fechas para dichas columnas, estas se almacenan primero en su correspondiente dataframe, luego se unifican en uno solo. A partir de ahí, se calculan valores como el año, el mes, día del mes, del año, de la semana para esa fecha.

In [4]:
df_list = []
for _ in ['origen', 'destino']:
    query = f'''
    SELECT DISTINCT(DATE(fecha_{_}_recorrido))
    FROM rides
    '''
    
    df = spark.read.format('jdbc') \
            .option('url', PG_JDBC_URL) \
            .option('driver', 'org.postgresql.Driver') \
            .option('query', query) \
            .load()
    df_list.append(df)

In [5]:
all_dates = df_list[0].union(df_list[1]).distinct()

In [6]:
all_dates = all_dates.select( \
                 F.col('date').alias('date_value'), \
                 F.year('date').alias('year'), F.month('date').alias('month'), \
                 F.dayofyear('date').alias('day_of_year'), F.dayofmonth('date').alias('day_of_month'), \
                 F.dayofweek('date').alias('day_of_week'))

In [7]:
all_dates = all_dates.orderBy('date_value')

In [8]:
all_dates.show(10)

+----------+----+-----+-----------+------------+-----------+
|date_value|year|month|day_of_year|day_of_month|day_of_week|
+----------+----+-----+-----------+------------+-----------+
|2010-12-01|2010|   12|        335|           1|          4|
|2010-12-02|2010|   12|        336|           2|          5|
|2010-12-03|2010|   12|        337|           3|          6|
|2010-12-04|2010|   12|        338|           4|          7|
|2010-12-06|2010|   12|        340|           6|          2|
|2010-12-07|2010|   12|        341|           7|          3|
|2010-12-09|2010|   12|        343|           9|          5|
|2010-12-10|2010|   12|        344|          10|          6|
|2010-12-11|2010|   12|        345|          11|          7|
|2010-12-13|2010|   12|        347|          13|          2|
+----------+----+-----+-----------+------------+-----------+
only showing top 10 rows



Finalmente, se cargan los datos de este dataframe en su correspondiente tabla en la base de datos en Azure.

In [None]:
all_dates.write.format('jdbc') \
    .mode('append') \
    .option('url', PG_JDBC_URL) \
    .option('dbtable', 'dim_date') \
    .option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver') \
    .save()

### **CARGA DE DATOS DE LA TABLA DIM_HOUR**

Considero que es de interés la hora en la que se realizó el recorrido. Las horas varían entre 0 y 23, entonces es fácil generar un dataframe con estos números y, en base a éste, cargar la correspondiente tabla.

In [50]:
spark.range(0, 24, numPartitions=1) \
    .select(F.col('id').alias('hour_value')) \
    .write.option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver') \
    .jdbc(AZ_JDBC_URL, 'dim_hour', mode='append')

### **CARGA DE DATOS DE LA TABLA DIM_STATION**

Los datos de las estaciones de ecobici están contenidos en un archivo csv con varias columnas.

Se define un esquema para determinar las columnas a utilizar y el tipo de datos asociado.

Finalmente, con el dataframe creado, se cargan los datos en la tabla DIM_STATION.

In [6]:
my_schema = T.StructType([
                T.StructField('station_id', T.ShortType()),
                T.StructField('station_name', T.StringType()),
                T.StructField('lat', T.FloatType()),
                T.StructField('long', T.FloatType())
            ])

In [7]:
stations = spark.read.csv('all_stations.csv', my_schema, header=True).coalesce(1)

In [10]:
stations.orderBy('station_id') \
.write.option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver') \
.jdbc(AZ_JDBC_URL, 'dim_station', mode='append')

### **CARGA DE DATOS A LA TABLA DE HECHOS FACT_RIDE**

Se trabaja sobre la base de datos local en PostgreSQL con datos de los recorridos, la cual datos los identificadores de las estaciones de inicio y origen del recorrido, la fecha y hora de inicio y fin, y la duración del recorrido.

Se hacen agregaciones para tener la cantidad de recorridos y el total de minutos entre dos estaciones para una fecha y hora específica.

Luego es necesario obtener los diferentes ids para asociar a las tablas de dimensiones

In [32]:
query = """
SELECT id_estacion_origen, id_estacion_destino,
date(fecha_origen_recorrido),
cast(extract(hour from fecha_origen_recorrido) as smallint) as start_time,
cast(extract(hour from fecha_destino_recorrido) as smallint) as end_time,
count(*) as quantity, sum(duracion_recorrido) as hours_total
FROM rides
GROUP BY
id_estacion_origen, id_estacion_destino,
date(fecha_origen_recorrido),
cast(extract(hour from fecha_origen_recorrido) as smallint),
cast(extract(hour from fecha_destino_recorrido) as smallint)
"""
t = spark.read.format('jdbc') \
.option('url', PG_JDBC_URL) \
.option('query', query) \
.option('fetchsize', 1000000) \
.option('driver', 'org.postgresql.Driver') \
.load()

In [12]:
t.show(10)

+------------------+-------------------+----------+----------+--------+--------+-----------+
|id_estacion_origen|id_estacion_destino|      date|start_time|end_time|quantity|hours_total|
+------------------+-------------------+----------+----------+--------+--------+-----------+
|                 1|                  1|2010-12-01|         9|      10|       3|        151|
|                 1|                  1|2010-12-01|        10|      10|       1|          2|
|                 1|                  1|2010-12-01|        11|      12|       1|         26|
|                 1|                  1|2010-12-01|        11|      13|       1|        126|
|                 1|                  1|2010-12-01|        13|      14|       1|         63|
|                 1|                  1|2010-12-01|        13|      15|       1|        121|
|                 1|                  1|2010-12-01|        16|      18|       1|        121|
|                 1|                  1|2010-12-01|        17|      18

#### **OBTIENENDO LOS ID DE LA TABLA DIM_HOUR**

In [16]:
dim_hour_table = spark.read.format('jdbc') \
                    .option('url', AZ_JDBC_URL) \
                    .option('dbtable', 'dim_hour') \
                    .option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver') \
                    .load()
dim_hour_table.printSchema()

root
 |-- hour_id: integer (nullable = true)
 |-- hour_value: integer (nullable = true)



In [33]:
for time in ['start_time', 'end_time']:
    t = t.join(F.broadcast(dim_hour_table), t[time] == dim_hour_table.hour_value)
    t = t.drop(time, 'hour_value')
    t = t.withColumnRenamed('hour_id', f'{time}_id')
t.show(10)

+------------------+-------------------+----------+--------+-----------+-------------+-----------+
|id_estacion_origen|id_estacion_destino|      date|quantity|hours_total|start_time_id|end_time_id|
+------------------+-------------------+----------+--------+-----------+-------------+-----------+
|                 1|                  1|2010-12-01|       3|        151|           10|         11|
|                 1|                  1|2010-12-01|       1|          2|           11|         11|
|                 1|                  1|2010-12-01|       1|         26|           12|         13|
|                 1|                  1|2010-12-01|       1|        126|           12|         14|
|                 1|                  1|2010-12-01|       1|         63|           14|         15|
|                 1|                  1|2010-12-01|       1|        121|           14|         16|
|                 1|                  1|2010-12-01|       1|        121|           17|         19|
|         

#### **OBTENIENDO LOS ID DE LA TABLA DIM_DATE**

In [40]:
dim_date_table = spark.read.format('jdbc') \
                    .option('url', AZ_JDBC_URL) \
                    .option('query', 'select date_id, date_value from dim_date') \
                    .option('fetchsize', '3000') \
                    .option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver') \
                    .load()
dim_date_table.printSchema()

root
 |-- date_id: short (nullable = true)
 |-- date_value: date (nullable = true)



In [45]:
t = t.join(F.broadcast(dim_date_table), t.date == dim_date_table.date_value) \
    .drop('date', 'date_value') \
    .withColumnRenamed('date_id', 'ride_date_id')
t.show(10)

+------------------+-------------------+--------+-----------+-------------+-----------+------------+
|id_estacion_origen|id_estacion_destino|quantity|hours_total|start_time_id|end_time_id|ride_date_id|
+------------------+-------------------+--------+-----------+-------------+-----------+------------+
|                 1|                  1|       3|        151|           10|         11|           1|
|                 1|                  1|       1|          2|           11|         11|           1|
|                 1|                  1|       1|         26|           12|         13|           1|
|                 1|                  1|       1|        126|           12|         14|           1|
|                 1|                  1|       1|         63|           14|         15|           1|
|                 1|                  1|       1|        121|           14|         16|           1|
|                 1|                  1|       1|        121|           17|         19|    

#### **OBTENIENDO LOS ID DE LA TABLA DIM_STATION**

In [47]:
dim_station_table = spark.read.format('jdbc') \
                    .option('url', AZ_JDBC_URL) \
                    .option('query', 'select unique_id, station_id from dim_station') \
                    .option('fetchsize', '430') \
                    .option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver') \
                    .load()
dim_station_table.printSchema()

root
 |-- unique_id: short (nullable = true)
 |-- station_id: short (nullable = true)



In [53]:
for old_name, new_name in {'id_estacion_origen': 'origin_station_id', 'id_estacion_destino':'destination_station_id'}.items():
    t = t.join(dim_station_table, t[old_name] == dim_station_table.station_id) \
        .drop(old_name, 'station_id') \
        .withColumnRenamed('unique_id', new_name)
t.show(10)

+--------+-----------+-------------+-----------+------------+-----------------+----------------------+
|quantity|hours_total|start_time_id|end_time_id|ride_date_id|origin_station_id|destination_station_id|
+--------+-----------+-------------+-----------+------------+-----------------+----------------------+
|       1|         35|           14|         15|        1871|              148|                   148|
|       1|         66|           22|         23|        1878|              148|                   148|
|       1|         66|           17|         18|        1879|              148|                   148|
|       1|         46|           13|         14|        1882|              148|                   148|
|       1|         62|           11|         12|        1883|              148|                   148|
|       1|         65|           14|         14|        1884|              148|                   148|
|       1|         60|           15|         15|        1885|            

El dataframe ya está listo para ser cargado a la tabla de hechos, solo queda reordenar las columnas para que coincida con el esquema de la tabla en la base de datos.

In [55]:
cols = ['hours_total', 'quantity', 'origin_station_id', 'destination_station_id', 'ride_date_id', 'start_time_id', 'end_time_id']
t = t.select(*cols) \
    .orderBy(*cols)
t.show(10)

+-----------+--------+-----------------+----------------------+------------+-------------+-----------+
|hours_total|quantity|origin_station_id|destination_station_id|ride_date_id|start_time_id|end_time_id|
+-----------+--------+-----------------+----------------------+------------+-------------+-----------+
|          0|       1|                1|                     1|          74|           15|         15|
|          0|       1|                1|                     1|          89|           15|         15|
|          0|       1|                1|                     1|          95|           19|         19|
|          0|       1|                1|                     1|         146|            9|          9|
|          0|       1|                1|                     1|         163|           14|         14|
|          0|       1|                1|                     1|         163|           18|         18|
|          0|       1|                1|                     1|         1

In [32]:
t.write.option('driver', 'com.microsoft.sqlserver.jdbc.SQLServerDriver') \
.option('batchsize', 100000) \
.jdbc(AZ_JDBC_URL, 'fact_ride', mode='append')

Y así, el datawarehouse ha sido cargado por completo :)