# ETL Bronze to Silver

---

# 1. Bibliotecas

In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (IntegerType, DecimalType, StringType)

import psycopg2
from psycopg2.extras import execute_batch

# 2. Configurações

In [2]:
# Location of the raw (bronze) CSV, relative to this notebook.
PATH = '../Data Layer/raw/dados_brutos.csv'

# PostgreSQL connection settings.
# Every value can be overridden through the standard libpq environment
# variables (PGHOST, PGPORT, PGDATABASE, PGUSER, PGPASSWORD), so real
# credentials never have to be committed with the notebook. The fallbacks are
# the previous hard-coded values, intended for a local development database only.
DB_CONFIG = {
    'host': os.environ.get('PGHOST', 'localhost'),
    'port': int(os.environ.get('PGPORT', '5432')),  # env values are strings; keep the port an int
    'database': os.environ.get('PGDATABASE', 'cars_trips'),
    'user': os.environ.get('PGUSER', 'postgres'),
    'password': os.environ.get('PGPASSWORD', 'postgres'),
}

# 3. Visualização dos Dados e Retirada de colunas

Novamente será feita a visualização dos dados, já com a retirada das duas colunas desnecessárias: `_c20` e `Vehicle Images`.

O id único `Booking_ID` também será descartado, uma vez que ele não traz nenhuma informação a ser analisada, servindo apenas como identificador, e será substituído por um id de fato no banco.

In [3]:
# Start (or reuse) the Spark session for this notebook.
spark = SparkSession.builder.appName("silver").getOrCreate()

# Columns that are not carried over to the silver layer.
discarded_columns = ('_c20', 'Vehicle Images', 'Booking_ID')

# Read the raw CSV using its first line as the header; nullValue / nanValue
# set which literal strings the reader parses as null / NaN.
df_silver = (
    spark.read
    .csv(PATH, header=True, nullValue="null", emptyValue=None, nanValue="nan")
    .drop(*discarded_columns)
)

# Preview the first 10 rows.
df_silver.show(n=10)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/19 08:19:55 WARN Utils: Your hostname, CyberCore, resolves to a loopback address: 127.0.1.1; using 192.168.1.3 instead (on interface wlp0s20f3)
26/01/19 08:19:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/19 08:19:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/01/19 08:19:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


+-------------------+--------+--------------------+-----------+------------+-----------------+-------------+-----+-----+--------------------------+------------------------+----------------+-----------------------+-------------+--------------+-------------+--------------+---------------+
|               Date|    Time|      Booking_Status|Customer_ID|Vehicle_Type|  Pickup_Location|Drop_Location|V_TAT|C_TAT|Canceled_Rides_by_Customer|Canceled_Rides_by_Driver|Incomplete_Rides|Incomplete_Rides_Reason|Booking_Value|Payment_Method|Ride_Distance|Driver_Ratings|Customer_Rating|
+-------------------+--------+--------------------+-----------+------------+-----------------+-------------+-----+-----+--------------------------+------------------------+----------------+-----------------------+-------------+--------------+-------------+--------------+---------------+
|2024-07-26 14:00:00|14:00:00|  Canceled by Driver|  CID713523| Prime Sedan|      Tumkur Road|     RT Nagar| NULL| NULL|                

# 4. Retirando Not Found

No próximo passo, serão retiradas todas as linhas cujo `Booking_Status` é `Driver Not Found`, deixando assim apenas as linhas de corridas para as quais um motorista foi encontrado.

In [4]:
# Keep only the rows whose booking status differs from "Driver Not Found".
# NOTE(review): a row with a null Booking_Status is dropped as well, because
# the comparison evaluates to null rather than true -- confirm this is intended.
driver_was_found = F.col("Booking_Status") != "Driver Not Found"
df_silver = df_silver.where(driver_was_found)
df_silver.show()

+-------------------+--------+--------------------+-----------+------------+-----------------+-------------+-----+-----+--------------------------+------------------------+----------------+-----------------------+-------------+--------------+-------------+--------------+---------------+
|               Date|    Time|      Booking_Status|Customer_ID|Vehicle_Type|  Pickup_Location|Drop_Location|V_TAT|C_TAT|Canceled_Rides_by_Customer|Canceled_Rides_by_Driver|Incomplete_Rides|Incomplete_Rides_Reason|Booking_Value|Payment_Method|Ride_Distance|Driver_Ratings|Customer_Rating|
+-------------------+--------+--------------------+-----------+------------+-----------------+-------------+-----+-----+--------------------------+------------------------+----------------+-----------------------+-------------+--------------+-------------+--------------+---------------+
|2024-07-26 14:00:00|14:00:00|  Canceled by Driver|  CID713523| Prime Sedan|      Tumkur Road|     RT Nagar| NULL| NULL|                

# 5. Padronizando Nomes

Para padronizar os nomes, todos eles serão convertidos para minúsculas e algumas colunas serão renomeadas para se adequarem ao banco onde serão carregadas.

As colunas a serem renomeadas para fazerem melhor sentido são:

* `v_tat` => `driver_time_accept`
* `c_tat` => `passenger_time_boarding`
* `canceled_rides_by_customer` => `canceled_by_customer`
* `canceled_rides_by_driver` => `canceled_by_driver`

In [5]:
# Lower-case every column name.
# toDF renames columns by position, so this does not depend on Spark's
# case-insensitive column resolution (selecting "date" to reach "Date" fails
# when spark.sql.caseSensitive is enabled) and is safe for names that contain
# characters a string column reference would interpret, such as dots.
df_silver = df_silver.toDF(*[column.lower() for column in df_silver.columns])

# Columns renamed to match the names of the target database table.
renames = {
    "v_tat": "driver_time_accept",
    "c_tat": "passenger_time_boarding",
    "canceled_rides_by_customer": "canceled_by_customer",
    "canceled_rides_by_driver": "canceled_by_driver"
}

for old, new in renames.items():
    df_silver = df_silver.withColumnRenamed(old, new)

df_silver.show()

+-------------------+--------+--------------------+-----------+------------+-----------------+-------------+------------------+-----------------------+--------------------+--------------------+----------------+-----------------------+-------------+--------------+-------------+--------------+---------------+
|               date|    time|      booking_status|customer_id|vehicle_type|  pickup_location|drop_location|driver_time_accept|passenger_time_boarding|canceled_by_customer|  canceled_by_driver|incomplete_rides|incomplete_rides_reason|booking_value|payment_method|ride_distance|driver_ratings|customer_rating|
+-------------------+--------+--------------------+-----------+------------+-----------------+-------------+------------------+-----------------------+--------------------+--------------------+----------------+-----------------------+-------------+--------------+-------------+--------------+---------------+
|2024-07-26 14:00:00|14:00:00|  Canceled by Driver|  CID713523| Prime Sed

# 6. Conversão de tipos

Para se adequar ao banco, o próximo passo é converter cada coluna para o tipo definido para ela no banco.

In [6]:
# Columns kept as text. "time" stays a string on purpose: per the original
# note, it is cast automatically when loaded into the database.
text_columns = [
    "time",
    "booking_status",
    "customer_id",
    "vehicle_type",
    "pickup_location",
    "drop_location",
]

# Columns in which the literal string "NULL" is replaced by a real null.
null_literal_columns = [
    "canceled_by_customer",
    "canceled_by_driver",
    "incomplete_rides_reason",
]

# Columns cast to integers.
integer_columns = [
    "driver_time_accept",
    "passenger_time_boarding",
    "ride_distance",
]

# Columns cast to fixed-precision decimals, with their target types.
decimal_columns = {
    "booking_value": DecimalType(10, 2),
    "driver_ratings": DecimalType(3, 2),
    "customer_rating": DecimalType(3, 2),
}

# Only the first 10 characters (the yyyy-MM-dd part) of "date" are parsed.
df_silver = df_silver.withColumn(
    "date", F.to_date(F.col("date").substr(1, 10), "yyyy-MM-dd")
)

for name in text_columns:
    df_silver = df_silver.withColumn(name, F.col(name).cast(StringType()))

for name in null_literal_columns:
    current = F.col(name)
    df_silver = df_silver.withColumn(
        name, F.when(current == "NULL", None).otherwise(current)
    )

for name in integer_columns:
    df_silver = df_silver.withColumn(name, F.col(name).cast(IntegerType()))

# "Yes" / "No" become booleans; any other value becomes null.
incomplete_flag = F.col("incomplete_rides")
df_silver = df_silver.withColumn(
    "incomplete_rides",
    F.when(incomplete_flag == "Yes", True)
     .when(incomplete_flag == "No", False)
     .otherwise(None),
)

for name, decimal_type in decimal_columns.items():
    df_silver = df_silver.withColumn(name, F.col(name).cast(decimal_type))

df_silver.show()

+----------+--------+--------------------+-----------+------------+-----------------+-------------+------------------+-----------------------+--------------------+--------------------+----------------+-----------------------+-------------+--------------+-------------+--------------+---------------+
|      date|    time|      booking_status|customer_id|vehicle_type|  pickup_location|drop_location|driver_time_accept|passenger_time_boarding|canceled_by_customer|  canceled_by_driver|incomplete_rides|incomplete_rides_reason|booking_value|payment_method|ride_distance|driver_ratings|customer_rating|
+----------+--------+--------------------+-----------+------------+-----------------+-------------+------------------+-----------------------+--------------------+--------------------+----------------+-----------------------+-------------+--------------+-------------+--------------+---------------+
|2024-07-26|14:00:00|  Canceled by Driver|  CID713523| Prime Sedan|      Tumkur Road|     RT Nagar| 

# 7. Carregando os dados no banco

In [8]:
# Columns to load. This single list drives the DataFrame projection, the
# INSERT column list and the number of placeholders, so the three can never
# drift out of sync.
load_columns = [
    "date",
    "time",
    "booking_status",
    "customer_id",
    "vehicle_type",
    "pickup_location",
    "drop_location",
    "driver_time_accept",
    "passenger_time_boarding",
    "canceled_by_customer",
    "canceled_by_driver",
    "incomplete_rides",
    "incomplete_rides_reason",
    "booking_value",
    "payment_method",
    "ride_distance",
    "driver_ratings",
    "customer_rating",
]

df_data = df_silver.select(*load_columns)

# Lazy generator: rows are pulled from Spark through toLocalIterator() as
# execute_batch consumes them, instead of being collected into a list first.
rows = (
    tuple(record)
    for record in df_data.toLocalIterator()
)

# The identifiers come from the constant list above (not from external input);
# the row values are always passed as bound parameters.
placeholders = ", ".join(["%s"] * len(load_columns))
insert_sql = (
    f"INSERT INTO silver.road_trips ({', '.join(load_columns)}) "
    f"VALUES ({placeholders})"
)

# Connect to the database.
# NOTE: this cell appends rows; re-running it inserts the same data again.
conn = psycopg2.connect(**DB_CONFIG)
try:
    # `with conn` commits when the block succeeds and rolls back if it raises;
    # `with conn.cursor()` closes the cursor in both cases.
    with conn:
        with conn.cursor() as cur:
            execute_batch(cur, insert_sql, rows, page_size=1000)
finally:
    # psycopg2's connection context manager does not close the connection,
    # so close it explicitly even when the load fails.
    conn.close()


[Stage 5:>                                                          (0 + 1) / 1]