# Carga dos dados na camada Bronze

In [0]:
#Parametro de carga 
dbutils.widgets.text("Ano", "","")
dbutils.widgets.get("Ano")
ParamAno = getArgument("Ano")
ParamAno = str(ParamAno).split(',')

#Parametro de carga 
dbutils.widgets.text("Mês", "","")
dbutils.widgets.get("Mês")
ParamMes= getArgument("Mês")
ParamMes = str(ParamMes).split(',')

In [0]:
import requests
import os
import pandas as pd
from pyspark.sql.functions import current_date, current_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType

schema = StructType([
    StructField("VendorID", IntegerType(), True),
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", DoubleType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("RatecodeID", DoubleType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("PULocationID", IntegerType(), True),
    StructField("DOLocationID", IntegerType(), True),
    StructField("payment_type", IntegerType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("congestion_surcharge", DoubleType(), True),
])


anos = ParamAno
meses = ParamMes

local_path = "/mnt/stage/bronze"
for ano in anos:
    for mes in meses:
        url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{ano}-{mes}.parquet"
        file_name = f"yellow_tripdata_{ano}-{mes}.parquet"        

        sink_path = f"/mnt/stage/bronze/{ano}/{mes}/"       
        df_pd = pd.read_parquet(url)
        df = spark.createDataFrame(df_pd, schema=schema)

        # Adiciona colunas de carga
        df_bronze = df.withColumn("dt_carga", current_date()) \
                      .withColumn("ts_carga", current_timestamp())
        
        # Salva o DataFrame 
        df_bronze.write.mode('append').format("delta").saveAsTable("table_bronze")        

In [0]:
%sql
select * from table_bronze

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,dt_carga,ts_carga
2,2023-03-01T00:06:43Z,2023-03-01T00:16:43Z,1.0,0.0,1.0,N,238,42,2,8.6,1.0,0.5,0.0,0.0,1.0,11.1,0.0,2025-10-01,2025-10-01T15:24:49.8Z
2,2023-03-01T00:08:25Z,2023-03-01T00:39:30Z,2.0,12.4,1.0,N,138,231,1,52.7,6.0,0.5,12.54,0.0,1.0,76.49,2.5,2025-10-01,2025-10-01T15:24:49.8Z
1,2023-03-01T00:15:04Z,2023-03-01T00:29:26Z,0.0,3.3,1.0,N,140,186,1,18.4,3.5,0.5,4.65,0.0,1.0,28.05,2.5,2025-10-01,2025-10-01T15:24:49.8Z
1,2023-03-01T00:49:37Z,2023-03-01T01:01:05Z,1.0,2.9,1.0,N,140,43,1,15.6,3.5,0.5,4.1,0.0,1.0,24.7,2.5,2025-10-01,2025-10-01T15:24:49.8Z
2,2023-03-01T00:08:04Z,2023-03-01T00:11:06Z,1.0,1.23,1.0,N,79,137,1,7.2,1.0,0.5,2.44,0.0,1.0,14.64,2.5,2025-10-01,2025-10-01T15:24:49.8Z
1,2023-03-01T00:09:09Z,2023-03-01T00:17:34Z,1.0,1.2,1.0,N,162,137,1,10.0,3.5,0.5,3.0,0.0,1.0,18.0,2.5,2025-10-01,2025-10-01T15:24:49.8Z
1,2023-03-01T00:32:21Z,2023-03-01T00:42:08Z,1.0,1.8,1.0,N,170,48,1,12.1,3.5,0.5,3.4,0.0,1.0,20.5,2.5,2025-10-01,2025-10-01T15:24:49.8Z
1,2023-03-01T00:45:12Z,2023-03-01T00:52:37Z,1.0,2.0,1.0,N,48,164,2,10.7,3.5,0.5,0.0,0.0,1.0,15.7,2.5,2025-10-01,2025-10-01T15:24:49.8Z
1,2023-03-01T00:19:43Z,2023-03-01T00:39:37Z,1.0,5.3,1.0,N,113,61,1,26.1,3.5,0.5,9.3,0.0,1.0,40.4,2.5,2025-10-01,2025-10-01T15:24:49.8Z
2,2023-03-01T00:08:42Z,2023-03-01T00:18:45Z,1.0,2.27,1.0,N,239,263,1,13.5,1.0,0.5,3.7,0.0,1.0,22.2,2.5,2025-10-01,2025-10-01T15:24:49.8Z
