# Processamento dos dados de viagens de táxis de Nova York - dados da NYC

* Limpeza e padronização dos dados
* Enriquecimento: inclusão de colunas de metadados – veículo, ano, mês e nome do arquivo
* Manutenção de todas as colunas: permitir análises futuras
* Manutenção de todos os veículos: permitir análises futuras
* Particionamento por ano e mês: otimização de salvamento e consulta

## Camada silver

## Bibliotecas

In [0]:
from pyspark.sql.functions import lit, input_file_name, to_timestamp, col, regexp_extract, when
from pyspark.sql.types import *
from datetime import datetime
from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrule, MONTHLY
from pyspark.sql.types import IntegerType, DoubleType

## Parâmetros

In [0]:
# Input parameters (Databricks widgets): vehicle type plus the first and last
# month to process, both formatted as YYYY-MM.
dbutils.widgets.dropdown(
    "vehicle_type",
    "yellow",
    ["yellow", "green", "fhv", "fhvhv"],
    "Tipo de Ve√≠culo",
)
dbutils.widgets.text(
    "start_date",
    "2023-01",
    "Data In√≠cio (YYYY-MM)",
)
dbutils.widgets.text(
    "end_date",
    "2023-01",
    "Data Fim (YYYY-MM)",
)

In [0]:
# Read the values chosen in the widgets for this run (same order as before:
# vehicle type, first month, last month).
vehicle_type, start_date, end_date = (
    dbutils.widgets.get(widget_name)
    for widget_name in ("vehicle_type", "start_date", "end_date")
)

print(f"Processando {vehicle_type} t√°xis de {start_date} a {end_date}")

Processando fhvhv t√°xis de 2023-01 a 2023-05


## Caminhos

In [0]:
# Bronze (source) and Silver (destination) locations for the selected vehicle type.
# The Silver location holds the Delta output.
bronze_path, silver_path = (
    f"/case_ifood_nyc/bronze/nyc_taxi/{vehicle_type}",
    f"/case_ifood_nyc/silver/nyc_taxi_delta/{vehicle_type}",
)

# Create the Silver directory in case it does not exist yet.
dbutils.fs.mkdirs(silver_path)

Out[212]: True

## Funções

In [0]:
# Fun√ß√£o para verificar parti√ß√µes
def partition_exists(path):
    try:
        return len(dbutils.fs.ls(path)) > 0
    except Exception:
        return False

In [0]:
# Processing function: Bronze -> Silver for a single month.
def process_partition(vehicle, year_month):
    """Read one month from Bronze, clean and enrich it, and write it to Silver as Delta.

    Args:
        vehicle: vehicle type ("yellow", "green", "fhv", "fhvhv"). It is stored in
            ``meta_vehicle_type`` and enables the yellow-specific cleaning step.
        year_month: month to process, formatted "YYYY-MM".

    Returns:
        True when the month was written to Silver. False on any error; the
        error is printed rather than raised, so one bad month does not stop
        the caller's loop over the remaining months.

    Note:
        Input and output locations come from the module-level ``bronze_path``
        and ``silver_path``. Those are built from the ``vehicle_type`` widget,
        not from the ``vehicle`` argument.
    """
    # Spark numeric types unified to double so all months share one schema.
    numeric_types = (IntegerType, LongType, DoubleType)

    try:
        year, month = year_month.split("-")
        print(f"\nProcessando {year_month}...")

        # 1. Read the month from Bronze.
        df = spark.read.parquet(f"{bronze_path}/year={year}/month={month}")

        # 2. Normalize numeric columns to double.
        # isinstance is used instead of comparing str(dataType): the string form
        # differs across PySpark versions ("IntegerType" vs "IntegerType()"),
        # which made a string comparison silently match no column at all.
        numeric_cols = [
            field.name
            for field in df.schema.fields
            if isinstance(field.dataType, numeric_types)
        ]
        for col_name in numeric_cols:
            # A direct cast gives the same double as a round-trip through string.
            df = df.withColumn(col_name, col(col_name).cast("double"))

        # 3. Lineage metadata columns.
        df = (df.withColumn("meta_vehicle_type", lit(vehicle))
              .withColumn("meta_source_file", input_file_name())
              .withColumn("meta_file_year", lit(int(year)))
              .withColumn("meta_file_month", lit(int(month))))

        # 4. Yellow-taxi specific cleaning: parse the pickup/dropoff timestamps and
        # replace passenger_count values below 1 with 1 (nulls stay null, because a
        # null comparison falls through to `otherwise`).
        if vehicle == "yellow":
            df = (df.withColumn("tpep_pickup_datetime", to_timestamp(col("tpep_pickup_datetime")))
                  .withColumn("tpep_dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))
                  .withColumn("passenger_count",
                              when(col("passenger_count") < 1, 1)
                              .otherwise(col("passenger_count"))))

        # 5. Write the month as its own Delta table.
        # overwriteSchema lets a re-run replace a table whose column types changed
        # (e.g. int -> double after step 2) instead of failing on a schema mismatch.
        partition_path = f"{silver_path}/year={year}/month={month}"
        (df.write
           .format("delta")
           .mode("overwrite")
           .option("overwriteSchema", "true")
           .save(partition_path))

        print(f"‚úÖ Dados salvos em: {partition_path}")
        return True

    except Exception as e:
        # Best-effort per month: report the failure and let the caller continue.
        print(f"‚ùå Erro ao processar {year_month}: {str(e)}")
        return False

## Processamento

In [0]:
# Allow Delta automatic schema merging for writes in this Spark session.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

start = datetime.strptime(start_date, "%Y-%m")
end = datetime.strptime(end_date, "%Y-%m")
success_count = 0

# Every month from start to end, both inclusive, formatted as "YYYY-MM".
months_to_process = [dt.strftime("%Y-%m") for dt in rrule(MONTHLY, dtstart=start, until=end)]

# Process each month and remember which ones were actually written, so the
# unified view (built in the next cell) only references partitions that this
# run produced successfully.
processed_partitions = []
for year_month in months_to_process:
    year, month = year_month.split("-")
    partition_path = f"{silver_path}/year={year}/month={month}"

    # Clear only the month being rebuilt. Removing the whole silver_path would
    # also delete months outside [start_date, end_date] that this run does not
    # recreate.
    dbutils.fs.rm(partition_path, recurse=True)

    if process_partition(vehicle_type, year_month):
        success_count += 1
        # Only months whose write succeeded go into the view; a failed month may
        # leave a partial directory that is not a readable Delta table.
        if partition_exists(partition_path):
            processed_partitions.append(f"SELECT * FROM delta.`{partition_path}`")


Processando 2023-01...
‚úÖ Dados salvos em: /case_ifood_nyc/silver/nyc_taxi_delta/fhvhv/year=2023/month=01

Processando 2023-02...
‚úÖ Dados salvos em: /case_ifood_nyc/silver/nyc_taxi_delta/fhvhv/year=2023/month=02

Processando 2023-03...
‚úÖ Dados salvos em: /case_ifood_nyc/silver/nyc_taxi_delta/fhvhv/year=2023/month=03

Processando 2023-04...
‚úÖ Dados salvos em: /case_ifood_nyc/silver/nyc_taxi_delta/fhvhv/year=2023/month=04

Processando 2023-05...
‚úÖ Dados salvos em: /case_ifood_nyc/silver/nyc_taxi_delta/fhvhv/year=2023/month=05


In [0]:
# Build one view over every Silver partition written in this run, then show a
# per-month record count as a sanity check.
if processed_partitions:
    # NOTE(review): UNION ALL matches columns by position, not by name. This relies
    # on every month having the same column order; schema drift between months
    # would misalign columns silently or make CREATE VIEW fail.
    union_all_query = " UNION ALL ".join(processed_partitions)

    try:
        spark.sql(f"""
        CREATE OR REPLACE VIEW case_ifood_nyc_taxi.silver_{vehicle_type}_taxi_view AS
        {union_all_query}
        """)
    except Exception as e:
        # View creation failed: show how to read each partition directly instead.
        print(f"\n‚ö†Ô∏è Erro ao criar view: {str(e)}")
        print("\nüí° Acesso alternativo por parti√ß√£o:")
        for part in processed_partitions:
            print(part.replace("SELECT * FROM delta.`", "spark.read.format('delta').load('").replace("`", "'"))
    else:
        print("\n‚úÖ View unificada criada com sucesso!")
        print(f"üîç Verifique os dados com: spark.table('case_ifood_nyc_taxi.silver_{vehicle_type}_taxi_view')")

        # Final summary: record count per (year, month), read back through the view.
        # It has its own try so that a failure here is not reported as a
        # view-creation error and does not print the per-partition fallback hints.
        print("\nüìä Resumo do processamento:")
        try:
            display(spark.sql(f"""
            SELECT 
                meta_file_year,
                meta_file_month, 
                COUNT(*) as total_records
            FROM case_ifood_nyc_taxi.silver_{vehicle_type}_taxi_view
            GROUP BY meta_file_year, meta_file_month
            ORDER BY meta_file_year, meta_file_month
            """))
        except Exception as e:
            print(f"\nErro ao gerar resumo: {e}")
else:
    print("\n‚ùå Nenhuma parti√ß√£o foi processada com sucesso.")

print(f"\nProcessamento conclu√≠do. {success_count}/{len(months_to_process)} meses processados.")


‚úÖ View unificada criada com sucesso!
üîç Verifique os dados com: spark.table('case_ifood_nyc_taxi.silver_fhvhv_taxi_view')

üìä Resumo do processamento:


meta_file_year,meta_file_month,total_records
2023,1,18479031
2023,2,17960971
2023,3,20413539
2023,4,19144903
2023,5,19847676



Processamento conclu√≠do. 5/5 meses processados.
