In [15]:
from pyspark.sql import SparkSession, functions as F

SOURCE_BUCKET = "s3://bucket-bronze-layer-241963575180"

spark = SparkSession.builder.appName("Exploração de Dados - Camada Bronze").getOrCreate()


# Extração

In [16]:
df_yellow = spark.read.parquet(SOURCE_BUCKET + "/nyc_taxi_data_yellow")
df_yellow.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+
|vendorid|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|ratecodeid|store_and_fwd_flag|pulocationid|dolocationid|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|  data_hora_ingestao|ano_mes_referencia|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------------+------------------+
|       2| 2023-03-01 00:06:43|  2023-03-01 00:16:43|              

In [17]:
df_green = spark.read.parquet(SOURCE_BUCKET + "/nyc_taxi_data_green")
df_green.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+
|vendorid|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|ratecodeid|pulocationid|dolocationid|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|  data_hora_ingestao|ano_mes_referencia|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+--------------------+------------------+
|       2| 2023-03-01 00:25:10|  2023-03-01

In [18]:
df_yellow_tranformed = df_yellow.selectExpr(
    "vendorid as id_fornecedor",
    "passenger_count as quantidade_passageiros",
    "total_amount as valor_corrida",
    "tpep_pickup_datetime as data_hora_embarque",
    "tpep_dropoff_datetime as data_hora_desembarque",
    "payment_type as id_tipo_pagamento", 
    "ano_mes_referencia",
    '"YELLOW" as tipo_servico',
)

df_green_tranformed = df_green.selectExpr(
    "vendorid as id_fornecedor",
    "passenger_count as quantidade_passageiros",
    "total_amount as valor_corrida",
    "lpep_pickup_datetime as data_hora_embarque",
    "lpep_dropoff_datetime as data_hora_desembarque",
    "payment_type as id_tipo_pagamento", 
    "ano_mes_referencia",
    '"GREEN" as tipo_servico',
)

df = df_yellow_tranformed.union(df_green_tranformed)

df = (
    df.withColumn(
        "nome_fornecedor", 
        F.when(F.col("id_fornecedor") == 1, "Creative Mobile Technologies, LLC")
        .when(F.col("id_fornecedor") == 2, "Curb Mobility, LLC")
        .when(F.col("id_fornecedor") == 6, "Myle Technologies Inc")
        .when(F.col("id_fornecedor") == 7, "Helix")
        .otherwise("FORNECEDOR NÃO IDENTIFICADO")
    )
)

df = (
    df.withColumn(
        "descricao_tipo_pagamento", 
        F.when(F.col("id_tipo_pagamento") == 0, "Viagem com tarifa flexível")
        .when(F.col("id_tipo_pagamento") == 1, "Cartão de crédito")
        .when(F.col("id_tipo_pagamento") == 2, "Dinheiro")
        .when(F.col("id_tipo_pagamento") == 3, "Sem cobrança")
        .when(F.col("id_tipo_pagamento") == 4, "Contestação")
        .when(F.col("id_tipo_pagamento") == 5, "Desconhecido")
        .when(F.col("id_tipo_pagamento") == 6, "Viagem cancelada")
        .otherwise("TIPO DE PAGAMENTO NÃO IDENTIFICADO")
    )
)

df = df.withColumn("indicador_cancelamento", F.when(F.col("id_tipo_pagamento") == 6, 'S').otherwise('N'))
df = df.withColumn("id", F.expr("uuid()"))

df_viagem_taxi_ny = df.select(
    "id",
    "id_fornecedor",
    "nome_fornecedor",
    "quantidade_passageiros",
    "valor_corrida",
    "data_hora_embarque",
    "data_hora_desembarque",
    "indicador_cancelamento",
    "id_tipo_pagamento",
    "descricao_tipo_pagamento",
    "tipo_servico",
    "ano_mes_referencia",
)



df_viagem_taxi_ny.show()




+--------------------+-------------+--------------------+----------------------+-------------+-------------------+---------------------+----------------------+-----------------+------------------------+------------+------------------+
|                  id|id_fornecedor|     nome_fornecedor|quantidade_passageiros|valor_corrida| data_hora_embarque|data_hora_desembarque|indicador_cancelamento|id_tipo_pagamento|descricao_tipo_pagamento|tipo_servico|ano_mes_referencia|
+--------------------+-------------+--------------------+----------------------+-------------+-------------------+---------------------+----------------------+-----------------+------------------------+------------+------------------+
|c2bc1f53-bd81-4c8...|            2|  Curb Mobility, LLC|                     1|         11.1|2023-03-01 00:06:43|  2023-03-01 00:16:43|                     N|                2|                Dinheiro|      YELLOW|           2023-03|
|17356336-5cfa-4f4...|            2|  Curb Mobility, LLC|   

In [19]:
"""
    Qual a média de valor total (total_amount) recebido em um mês
    considerando todos os yellow táxis da frota?
"""


_ = (
    df_viagem_taxi_ny
    .filter(F.col("tipo_servico") == "YELLOW")
    .groupBy("ano_mes_referencia")
    .agg(F.round(F.avg("valor_corrida"), 2).alias("media_valor_corrida"))
    .orderBy("ano_mes_referencia")
    .show()
)


+------------------+-------------------+
|ano_mes_referencia|media_valor_corrida|
+------------------+-------------------+
|           2023-01|              27.02|
|           2023-02|               26.9|
|           2023-03|               27.8|
|           2023-04|              28.27|
+------------------+-------------------+



In [20]:
"""
Qual a média de passageiros (passenger_count) por cada hora do dia
que pegaram táxi no mês de maio considerando todos os táxis da
frota?
"""

_ = (
    df_viagem_taxi_ny
    .filter(F.col("ano_mes_referencia") == "2023-04")
    .withColumn("hora_embarque", F.hour(F.col("data_hora_embarque")))
    .groupBy("hora_embarque")
    .agg(F.round(F.avg("quantidade_passageiros"), 2).alias("media_passageiros"))
    .orderBy("hora_embarque")
    .show(24)
)

+-------------+-----------------+
|hora_embarque|media_passageiros|
+-------------+-----------------+
|            0|             1.42|
|            1|             1.43|
|            2|             1.44|
|            3|             1.45|
|            4|             1.39|
|            5|             1.27|
|            6|             1.25|
|            7|             1.26|
|            8|             1.27|
|            9|              1.3|
|           10|             1.34|
|           11|             1.36|
|           12|             1.37|
|           13|             1.38|
|           14|             1.39|
|           15|              1.4|
|           16|              1.4|
|           17|             1.39|
|           18|             1.38|
|           19|             1.39|
|           20|              1.4|
|           21|             1.42|
|           22|             1.43|
|           23|             1.42|
+-------------+-----------------+

