In [7]:
from google.cloud.dataproc_spark_connect import DataprocSparkSession
import requests
from datetime import datetime, timedelta
import os
from google.cloud import storage
from google.cloud import bigquery
from pyspark.sql.functions import col

# Configuração para o Google Cloud Storage
project_id = "coastal-stream-464319-q3"
gcs_bucket = "teste_ifood"
gcs_path = f"gs://{gcs_bucket}/nyc_data/"

# Inicializar cliente Storage e BigQuery
storage_client = storage.Client(project=project_id)
bucket = storage_client.bucket(gcs_bucket)
bq_client = bigquery.Client(project=project_id)

# Diretório temporário local
temp_dir = "/tmp/nyc_data/"

# Criar o diretório temporário
if not os.path.exists(temp_dir):
    os.makedirs(temp_dir)
    print(f"Diretório temporário criado: {temp_dir}")

# Função para baixar arquivos
def download_file(url, local_path):
    try:
        response = requests.get(url)
        response.raise_for_status()
        with open(local_path, 'wb') as file:
            file.write(response.content)
        print(f"Arquivo baixado para {local_path}")
        return True
    except Exception as e:
        print(f"Erro ao baixar arquivo de {url}: {str(e)}")
        return False

# Função para fazer upload para o GCS
def upload_to_gcs(local_path, gcs_blob_name):
    try:
        blob = bucket.blob(gcs_blob_name)
        blob.upload_from_filename(local_path)
        print(f"Arquivo enviado para gs://{gcs_bucket}/{gcs_blob_name}")
        return f"gs://{gcs_bucket}/{gcs_blob_name}"
    except Exception as e:
        print(f"Erro ao fazer upload para GCS: {str(e)}")
        return None

# Inicializar Spark com suporte a Delta
print("Iniciando sessão Spark...")
spark = DataprocSparkSession.builder \
    .appName("NYC Taxi Data Delta") \
    .config("spark.jars", "gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.28.0.jar") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.2.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
print("Sessão Spark iniciada")

# Parâmetros
base_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/"
file_names = ["yellow_tripdata_", "green_tripdata_"]
start_date = "2023-01"
end_date = "2023-05"

# Gerar lista de meses
start = datetime.strptime(start_date, "%Y-%m")
end = datetime.strptime(end_date, "%Y-%m")
months = []
current = start
while current <= end:
    months.append(current.strftime("%Y-%m"))
    current += timedelta(days=32)
    current = current.replace(day=1)

print(f"Processando meses: {', '.join(months)}")

# Dicionário para rastrear todos os DataFrames por tipo de táxi
taxi_dataframes = {}

# Loop para cada combinação de arquivo e mês
for file_name in file_names:
    # Extrair o nome da pasta removendo o underscore final
    folder_name = file_name.rstrip("_")
    print(f"\nProcessando arquivos {folder_name}...")

    # Lista para armazenar DataFrames deste tipo de táxi
    month_dataframes = []

    for month in months:
        file_url = f"{base_url}{file_name}{month}.parquet"
        local_file_path = f"{temp_dir}{file_name}{month}.parquet"
        gcs_temp_path = f"nyc_data/temp/{file_name}{month}.parquet"
        delta_path = f"{gcs_path}{folder_name}_delta/{month}"

        print(f"Baixando {file_name}{month}...")
        if download_file(file_url, local_file_path):
            # Fazer upload para o GCS temporariamente
            print(f"Enviando {file_name}{month} para o GCS...")
            gcs_uri = upload_to_gcs(local_file_path, gcs_temp_path)

            if gcs_uri:
                print(f"Lendo {file_name}{month} com Spark do GCS...")
                try:
                    # Ler o arquivo Parquet do GCS com Spark
                    df = spark.read.parquet(gcs_uri)

                    # Mostrar quantidade de registros e schema
                    count = df.count()
                    print(f"Arquivo {file_name}{month} contém {count} registros")

                    # Salvar como Delta Lake
                    print(f"Salvando {file_name}{month} como Delta Lake em {delta_path}...")
                    df.write \
                        .format("delta") \
                        .mode("overwrite") \
                        .save(delta_path)
                    print(f"✓ Arquivo {file_name}{month} salvo como Delta Lake em {delta_path}")

                    # Adicionar DataFrame à lista para consolidação
                    month_dataframes.append(df)

                except Exception as e:
                    print(f"✗ Erro ao processar {file_name}{month} com Spark: {str(e)}")
            else:
                print(f"✗ Falha no upload para GCS, pulando processamento")
        else:
            print(f"✗ Falha ao baixar {file_name}{month}, pulando processamento")

        # Remover arquivo local para economizar espaço
        if os.path.exists(local_file_path):
            os.remove(local_file_path)
            print(f"Arquivo local {local_file_path} removido")

    # Armazenar lista de DataFrames para este tipo de táxi
    taxi_dataframes[folder_name] = month_dataframes

# Criar dataset no BigQuery se não existir
dataset_id = f"{project_id}.nyc_taxi_data"
try:
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = "southamerica-east1"
    bq_client.create_dataset(dataset, exists_ok=True)
    print(f"Dataset {dataset_id} configurado com sucesso")
except Exception as e:
    print(f"Erro ao configurar dataset: {str(e)}")

# Consolidar e escrever tabelas para BigQuery
for taxi_type, dataframes in taxi_dataframes.items():
    if not dataframes:
        print(f"Nenhum dado disponível para {taxi_type}, pulando")
        continue

    print(f"\nConsolidando todos os meses para {taxi_type}...")

    # União de todos os DataFrames deste tipo
    consolidated_df = dataframes[0]
    for df in dataframes[1:]:
        consolidated_df = consolidated_df.union(df)

    # Mostrar contagem total
    total_count = consolidated_df.count()
    print(f"Total de registros para {taxi_type}: {total_count}")

    # Determinar colunas específicas para cada tipo de táxi
    if "yellow" in taxi_type:
        pickup_col = "tpep_pickup_datetime"
        dropoff_col = "tpep_dropoff_datetime"
    else:  # green
        pickup_col = "lpep_pickup_datetime" if "lpep_pickup_datetime" in consolidated_df.columns else "tpep_pickup_datetime"
        dropoff_col = "lpep_dropoff_datetime" if "lpep_dropoff_datetime" in consolidated_df.columns else "tpep_dropoff_datetime"

    # Selecionar apenas as colunas obrigatórias
    try:
        clean_df = consolidated_df.select(
            col("VendorID").cast("integer"),
            col("passenger_count").cast("float"),
            col("total_amount").cast("float"),
            col(pickup_col).alias("tpep_pickup_datetime").cast("timestamp"),
            col(dropoff_col).alias("tpep_dropoff_datetime").cast("timestamp")
        )

        # Mostrar amostra
        print(f"Amostra de dados consolidados para {taxi_type}:")
        clean_df.show(5)

        # Escrever para BigQuery
        table_name = f"{taxi_type}"
        bq_table = f"{project_id}.nyc_taxi_data.{table_name}"

        print(f"Escrevendo tabela consolidada para BigQuery: {bq_table}")
        clean_df.write \
            .format("bigquery") \
            .option("temporaryGcsBucket", gcs_bucket) \
            .option("table", bq_table) \
            .option("createDisposition", "CREATE_IF_NEEDED") \
            .option("writeDisposition", "WRITE_TRUNCATE") \
            .save()

        print(f"✓ Tabela {bq_table} criada com sucesso no BigQuery")

    except Exception as e:
        print(f"✗ Erro ao criar tabela BigQuery para {taxi_type}: {str(e)}")

# Encerrar a sessão Spark
spark.stop()
print("\nProcessamento completo! Todas as tabelas foram criadas no BigQuery.")

Iniciando sessão Spark...


█████████████████████████████████████████████████████████▌                      







Sessão Spark iniciada
Processando meses: 2023-01, 2023-02, 2023-03, 2023-04, 2023-05

Processando arquivos yellow_tripdata...
Baixando yellow_tripdata_2023-01...
Arquivo baixado para /tmp/nyc_data/yellow_tripdata_2023-01.parquet
Enviando yellow_tripdata_2023-01 para o GCS...
Arquivo enviado para gs://teste_ifood/nyc_data/temp/yellow_tripdata_2023-01.parquet
Lendo yellow_tripdata_2023-01 com Spark do GCS...


Arquivo yellow_tripdata_2023-01 contém 3066766 registros
Salvando yellow_tripdata_2023-01 como Delta Lake em gs://teste_ifood/nyc_data/yellow_tripdata_delta/2023-01...


✓ Arquivo yellow_tripdata_2023-01 salvo como Delta Lake em gs://teste_ifood/nyc_data/yellow_tripdata_delta/2023-01
Arquivo local /tmp/nyc_data/yellow_tripdata_2023-01.parquet removido
Baixando yellow_tripdata_2023-02...
Arquivo baixado para /tmp/nyc_data/yellow_tripdata_2023-02.parquet
Enviando yellow_tripdata_2023-02 para o GCS...
Arquivo enviado para gs://teste_ifood/nyc_data/temp/yellow_tripdata_2023-02.parquet
Lendo yellow_tripdata_2023-02 com Spark do GCS...


Arquivo yellow_tripdata_2023-02 contém 2913955 registros
Salvando yellow_tripdata_2023-02 como Delta Lake em gs://teste_ifood/nyc_data/yellow_tripdata_delta/2023-02...


✓ Arquivo yellow_tripdata_2023-02 salvo como Delta Lake em gs://teste_ifood/nyc_data/yellow_tripdata_delta/2023-02
Arquivo local /tmp/nyc_data/yellow_tripdata_2023-02.parquet removido
Baixando yellow_tripdata_2023-03...
Arquivo baixado para /tmp/nyc_data/yellow_tripdata_2023-03.parquet
Enviando yellow_tripdata_2023-03 para o GCS...
Arquivo enviado para gs://teste_ifood/nyc_data/temp/yellow_tripdata_2023-03.parquet
Lendo yellow_tripdata_2023-03 com Spark do GCS...


Arquivo yellow_tripdata_2023-03 contém 3403766 registros
Salvando yellow_tripdata_2023-03 como Delta Lake em gs://teste_ifood/nyc_data/yellow_tripdata_delta/2023-03...


✓ Arquivo yellow_tripdata_2023-03 salvo como Delta Lake em gs://teste_ifood/nyc_data/yellow_tripdata_delta/2023-03
Arquivo local /tmp/nyc_data/yellow_tripdata_2023-03.parquet removido
Baixando yellow_tripdata_2023-04...
Arquivo baixado para /tmp/nyc_data/yellow_tripdata_2023-04.parquet
Enviando yellow_tripdata_2023-04 para o GCS...
Arquivo enviado para gs://teste_ifood/nyc_data/temp/yellow_tripdata_2023-04.parquet
Lendo yellow_tripdata_2023-04 com Spark do GCS...


Arquivo yellow_tripdata_2023-04 contém 3288250 registros
Salvando yellow_tripdata_2023-04 como Delta Lake em gs://teste_ifood/nyc_data/yellow_tripdata_delta/2023-04...


✓ Arquivo yellow_tripdata_2023-04 salvo como Delta Lake em gs://teste_ifood/nyc_data/yellow_tripdata_delta/2023-04
Arquivo local /tmp/nyc_data/yellow_tripdata_2023-04.parquet removido
Baixando yellow_tripdata_2023-05...
Arquivo baixado para /tmp/nyc_data/yellow_tripdata_2023-05.parquet
Enviando yellow_tripdata_2023-05 para o GCS...
Arquivo enviado para gs://teste_ifood/nyc_data/temp/yellow_tripdata_2023-05.parquet
Lendo yellow_tripdata_2023-05 com Spark do GCS...


Arquivo yellow_tripdata_2023-05 contém 3513649 registros
Salvando yellow_tripdata_2023-05 como Delta Lake em gs://teste_ifood/nyc_data/yellow_tripdata_delta/2023-05...


✓ Arquivo yellow_tripdata_2023-05 salvo como Delta Lake em gs://teste_ifood/nyc_data/yellow_tripdata_delta/2023-05
Arquivo local /tmp/nyc_data/yellow_tripdata_2023-05.parquet removido

Processando arquivos green_tripdata...
Baixando green_tripdata_2023-01...
Arquivo baixado para /tmp/nyc_data/green_tripdata_2023-01.parquet
Enviando green_tripdata_2023-01 para o GCS...
Arquivo enviado para gs://teste_ifood/nyc_data/temp/green_tripdata_2023-01.parquet
Lendo green_tripdata_2023-01 com Spark do GCS...


Arquivo green_tripdata_2023-01 contém 68211 registros
Salvando green_tripdata_2023-01 como Delta Lake em gs://teste_ifood/nyc_data/green_tripdata_delta/2023-01...


✓ Arquivo green_tripdata_2023-01 salvo como Delta Lake em gs://teste_ifood/nyc_data/green_tripdata_delta/2023-01
Arquivo local /tmp/nyc_data/green_tripdata_2023-01.parquet removido
Baixando green_tripdata_2023-02...
Arquivo baixado para /tmp/nyc_data/green_tripdata_2023-02.parquet
Enviando green_tripdata_2023-02 para o GCS...
Arquivo enviado para gs://teste_ifood/nyc_data/temp/green_tripdata_2023-02.parquet
Lendo green_tripdata_2023-02 com Spark do GCS...


Arquivo green_tripdata_2023-02 contém 64809 registros
Salvando green_tripdata_2023-02 como Delta Lake em gs://teste_ifood/nyc_data/green_tripdata_delta/2023-02...


✓ Arquivo green_tripdata_2023-02 salvo como Delta Lake em gs://teste_ifood/nyc_data/green_tripdata_delta/2023-02
Arquivo local /tmp/nyc_data/green_tripdata_2023-02.parquet removido
Baixando green_tripdata_2023-03...
Arquivo baixado para /tmp/nyc_data/green_tripdata_2023-03.parquet
Enviando green_tripdata_2023-03 para o GCS...
Arquivo enviado para gs://teste_ifood/nyc_data/temp/green_tripdata_2023-03.parquet
Lendo green_tripdata_2023-03 com Spark do GCS...


Arquivo green_tripdata_2023-03 contém 72044 registros
Salvando green_tripdata_2023-03 como Delta Lake em gs://teste_ifood/nyc_data/green_tripdata_delta/2023-03...


✓ Arquivo green_tripdata_2023-03 salvo como Delta Lake em gs://teste_ifood/nyc_data/green_tripdata_delta/2023-03
Arquivo local /tmp/nyc_data/green_tripdata_2023-03.parquet removido
Baixando green_tripdata_2023-04...
Arquivo baixado para /tmp/nyc_data/green_tripdata_2023-04.parquet
Enviando green_tripdata_2023-04 para o GCS...
Arquivo enviado para gs://teste_ifood/nyc_data/temp/green_tripdata_2023-04.parquet
Lendo green_tripdata_2023-04 com Spark do GCS...


Arquivo green_tripdata_2023-04 contém 65392 registros
Salvando green_tripdata_2023-04 como Delta Lake em gs://teste_ifood/nyc_data/green_tripdata_delta/2023-04...


✓ Arquivo green_tripdata_2023-04 salvo como Delta Lake em gs://teste_ifood/nyc_data/green_tripdata_delta/2023-04
Arquivo local /tmp/nyc_data/green_tripdata_2023-04.parquet removido
Baixando green_tripdata_2023-05...
Arquivo baixado para /tmp/nyc_data/green_tripdata_2023-05.parquet
Enviando green_tripdata_2023-05 para o GCS...
Arquivo enviado para gs://teste_ifood/nyc_data/temp/green_tripdata_2023-05.parquet
Lendo green_tripdata_2023-05 com Spark do GCS...


Arquivo green_tripdata_2023-05 contém 69174 registros
Salvando green_tripdata_2023-05 como Delta Lake em gs://teste_ifood/nyc_data/green_tripdata_delta/2023-05...


✓ Arquivo green_tripdata_2023-05 salvo como Delta Lake em gs://teste_ifood/nyc_data/green_tripdata_delta/2023-05
Arquivo local /tmp/nyc_data/green_tripdata_2023-05.parquet removido
Dataset coastal-stream-464319-q3.nyc_taxi_data configurado com sucesso

Consolidando todos os meses para yellow_tripdata...


Total de registros para yellow_tripdata: 16186386
Amostra de dados consolidados para yellow_tripdata:


+--------+---------------+------------+--------------------+---------------------+
|VendorID|passenger_count|total_amount|tpep_pickup_datetime|tpep_dropoff_datetime|
+--------+---------------+------------+--------------------+---------------------+
|       2|            1.0|        14.3| 2023-01-01 00:32:10|  2023-01-01 00:40:36|
|       2|            1.0|        16.9| 2023-01-01 00:55:08|  2023-01-01 01:01:27|
|       2|            1.0|        34.9| 2023-01-01 00:25:04|  2023-01-01 00:37:49|
|       1|            0.0|       20.85| 2023-01-01 00:03:48|  2023-01-01 00:13:25|
|       2|            1.0|       19.68| 2023-01-01 00:10:29|  2023-01-01 00:21:19|
+--------+---------------+------------+--------------------+---------------------+
only showing top 5 rows

Escrevendo tabela consolidada para BigQuery: coastal-stream-464319-q3.nyc_taxi_data.yellow_tripdata


✓ Tabela coastal-stream-464319-q3.nyc_taxi_data.yellow_tripdata criada com sucesso no BigQuery

Consolidando todos os meses para green_tripdata...


Total de registros para green_tripdata: 339630
Amostra de dados consolidados para green_tripdata:


+--------+---------------+------------+--------------------+---------------------+
|VendorID|passenger_count|total_amount|tpep_pickup_datetime|tpep_dropoff_datetime|
+--------+---------------+------------+--------------------+---------------------+
|       2|            1.0|       24.18| 2023-01-01 00:26:10|  2023-01-01 00:37:11|
|       2|            1.0|       15.84| 2023-01-01 00:51:03|  2023-01-01 00:57:49|
|       2|            1.0|       11.64| 2023-01-01 00:35:12|  2023-01-01 00:41:32|
|       1|            1.0|        10.2| 2023-01-01 00:13:14|  2023-01-01 00:19:03|
|       1|            1.0|         8.0| 2023-01-01 00:33:04|  2023-01-01 00:39:02|
+--------+---------------+------------+--------------------+---------------------+
only showing top 5 rows

Escrevendo tabela consolidada para BigQuery: coastal-stream-464319-q3.nyc_taxi_data.green_tripdata


✓ Tabela coastal-stream-464319-q3.nyc_taxi_data.green_tripdata criada com sucesso no BigQuery

Processamento completo! Todas as tabelas foram criadas no BigQuery.
