In [3]:
!pip -q install "pyspark>=3.5.0" pyarrow fastparquet


[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.8 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.2/1.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━[0m [32m1.2/1.8 MB[0m [31m17.6 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m19.0 MB/s[0m eta [36m0:00:00[0m
[?25h

In [18]:

import os, urllib.request, glob

pasta_bruto = "/content/PipelineDadosSpark/dados/bruto"
os.makedirs(pasta_bruto, exist_ok=True)

base = "https://d37ci6vzurychx.cloudfront.net/trip-data"
arquivos = ["yellow_tripdata_2023-01.parquet"]

for a in arquivos:
    url = f"{base}/{a}"
    destino = f"{pasta_bruto}/{a}"
    if not os.path.exists(destino):
        print("Baixando:", url)
        urllib.request.urlretrieve(url, destino)

print("Arquivos brutos baixados:")
print(glob.glob(pasta_bruto + "/*.parquet"))


Baixando: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet
Arquivos brutos baixados:
['/content/PipelineDadosSpark/dados/bruto/yellow_tripdata_2023-01.parquet']


In [19]:
from pyspark.sql import SparkSession

sessao = (SparkSession.builder
          .appName("PipelineTaxiPTBR")
          .config("spark.sql.session.timeZone", "UTC")
          .config("spark.sql.shuffle.partitions", "200")
          .getOrCreate())

sessao.sparkContext.setLogLevel("WARN")
print("Versão Spark:", sessao.version)


Versão Spark: 3.5.1


In [20]:
caminho_bruto = "/content/PipelineDadosSpark/dados/bruto/*.parquet"
dados_brutos = sessao.read.parquet(caminho_bruto)

print("Linhas lidas (bruto):", dados_brutos.count())
dados_brutos.printSchema()
dados_brutos.show(5, truncate=False)


Linhas lidas (bruto): 3066766
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------

In [21]:
from pyspark.sql import functions as F, types as T

# ---- mapeamento de nomes (bruto -> português) ----
mapa_colunas = {
    "vendor_id": "id_fornecedor",
    "tpep_pickup_datetime": "coleta_datahora",
    "tpep_dropoff_datetime": "entrega_datahora",
    "passenger_count": "passageiros",
    "trip_distance": "distancia_km",
    "RatecodeID": "id_tarifa",
    "store_and_fwd_flag": "armazenar_encaminhar",
    "PULocationID": "id_local_coleta",
    "DOLocationID": "id_local_entrega",
    "payment_type": "tipo_pagamento",
    "fare_amount": "valor_corrida",
    "extra": "extra",
    "mta_tax": "taxa_mta",
    "tip_amount": "valor_gorjeta",
    "tolls_amount": "valor_pedagio",
    "improvement_surcharge": "sobretaxa_melhoria",
    "total_amount": "valor_total",
    "congestion_surcharge": "sobretaxa_congestao",
    "airport_fee":"taxa_aeroporto"
}

colunas_existentes = dados_brutos.columns
dados = dados_brutos
for antigo, novo in mapa_colunas.items():
    if antigo in colunas_existentes:
        dados = dados.withColumnRenamed(antigo, novo)

In [22]:
# ---- coerção de tipos importantes ----
if "coleta_datahora" in dados.columns:
    dados = dados.withColumn("coleta_datahora", F.to_timestamp("coleta_datahora"))
if "entrega_datahora" in dados.columns:
    dados = dados.withColumn("entrega_datahora", F.to_timestamp("entrega_datahora"))

In [23]:
# monetárias e numéricas
colunas_monetarias = ["valor_corrida","extra","taxa_mta","valor_gorjeta","valor_pedagio",
                      "sobretaxa_melhoria","sobretaxa_congestao","valor_total"]
for c in colunas_monetarias:
    if c in dados.columns:
        dados = dados.withColumn(c, F.col(c).cast("double"))

if "distancia_km" in dados.columns:
    dados = dados.withColumn("distancia_km", F.col("distancia_km").cast("double"))

if "passageiros" in dados.columns:
    dados = dados.withColumn("passageiros", F.col("passageiros").cast("int"))

In [24]:
# armazenar_encaminhar para 'SIM'/'NAO'
if "armazenar_encaminhar" in dados.columns:
    dados = dados.withColumn("armazenar_encaminhar", F.upper(F.trim(F.col("armazenar_encaminhar"))))
    dados = dados.withColumn(
        "armazenar_encaminhar",
        F.when(F.col("armazenar_encaminhar") == "Y", F.lit("SIM"))
         .when(F.col("armazenar_encaminhar") == "N", F.lit("NAO"))
         .otherwise(F.col("armazenar_encaminhar"))
    )

In [25]:
# ---- enriquecimento (ano/mes, duracao, velocidade) ----
if "coleta_datahora" in dados.columns:
    dados = dados.withColumn("ano", F.year("coleta_datahora"))
    dados = dados.withColumn("mes", F.month("coleta_datahora"))

if set(["coleta_datahora","entrega_datahora"]).issubset(set(dados.columns)):
    dados = dados.withColumn("duracao_minutos",
                             (F.unix_timestamp("entrega_datahora") - F.unix_timestamp("coleta_datahora"))/60.0)
    dados = dados.withColumn("duracao_minutos",
                             F.when(F.col("duracao_minutos") > 0, F.col("duracao_minutos")).otherwise(F.lit(None)))
    if "distancia_km" in dados.columns:
        dados = dados.withColumn(
            "velocidade_media_kmh",
            F.when(F.col("duracao_minutos") > 0,
                   F.col("distancia_km") / (F.col("duracao_minutos")/60.0)
            ).otherwise(F.lit(None))
        )


In [26]:
# tipo_pagamento descrição
if "tipo_pagamento" in dados.columns:
    dados = dados.withColumn("tipo_pagamento", F.col("tipo_pagamento").cast("int"))
    dados = (dados
        .withColumn("tipo_pagamento_descricao",
            F.when(F.col("tipo_pagamento")==1, F.lit("CartaoCredito"))
             .when(F.col("tipo_pagamento")==2, F.lit("Dinheiro"))
             .when(F.col("tipo_pagamento")==3, F.lit("SemCobranca"))
             .when(F.col("tipo_pagamento")==4, F.lit("Disputa"))
             .when(F.col("tipo_pagamento")==5, F.lit("Desconhecido"))
             .when(F.col("tipo_pagamento")==6, F.lit("Anulado"))
             .otherwise(F.lit("Outro"))
        )
    )

In [27]:
 #filtros de qualidade
linhas_iniciais = dados.count()

# distância plausível 0..200 km
if "distancia_km" in dados.columns:
    dados = dados.filter((F.col("distancia_km") >= 0) & (F.col("distancia_km") <= 200))

# valores monetários não-negativos
for c in [c for c in colunas_monetarias if c in dados.columns]:
    dados = dados.filter((F.col(c).isNull()) | (F.col(c) >= 0))

# duração plausível (até 24h)
if "duracao_minutos" in dados.columns:
    dados = dados.filter((F.col("duracao_minutos").isNull()) | (F.col("duracao_minutos") <= 24*60))


In [28]:
# deduplicação por chave composta
chaves = [c for c in ["coleta_datahora","entrega_datahora","id_fornecedor",
                      "id_local_coleta","id_local_entrega","valor_total"] if c in dados.columns]
linhas_pre_dedup = dados.count()
dados = dados.dropDuplicates(subset=chaves) if chaves else dados
linhas_pos_dedup = dados.count()

linhas_finais = dados.count()

print("Linhas iniciais:", linhas_iniciais)
print("Removidas por filtros:", linhas_iniciais - linhas_pre_dedup)
print("Duplicatas removidas:", linhas_pre_dedup - linhas_pos_dedup)
print("Linhas finais:", linhas_finais)

dados.select(*[c for c in ["coleta_datahora","entrega_datahora","passageiros",
                           "distancia_km","valor_total","tipo_pagamento","tipo_pagamento_descricao",
                           "ano","mes","duracao_minutos","velocidade_media_kmh"] if c in dados.columns])\
     .show(10, truncate=False)

Linhas iniciais: 3066766
Removidas por filtros: 25308
Duplicatas removidas: 1
Linhas finais: 3041457
+-------------------+-------------------+-----------+------------+-----------+--------------+------------------------+----+---+-------------------+--------------------+
|coleta_datahora    |entrega_datahora   |passageiros|distancia_km|valor_total|tipo_pagamento|tipo_pagamento_descricao|ano |mes|duracao_minutos    |velocidade_media_kmh|
+-------------------+-------------------+-----------+------------+-----------+--------------+------------------------+----+---+-------------------+--------------------+
|2023-01-03 12:48:35|2023-01-03 13:48:50|1          |0.0         |0.0        |1             |CartaoCredito           |2023|1  |60.25              |0.0                 |
|2023-01-05 14:32:39|2023-01-05 14:33:04|1          |0.0         |0.0        |1             |CartaoCredito           |2023|1  |0.4166666666666667 |0.0                 |
|2023-01-08 12:43:13|2023-01-08 12:43:24|1          |0

In [29]:
from pyspark.sql.functions import col

# 1. remover corridas sem distância e sem valor
dados_limpos = dados.filter(~((col("distancia_km") == 0) & (col("valor_total") == 0)))

# 2. (opcional) remover corridas com velocidade média impossível (> 200 km/h)
dados_limpos = dados_limpos.filter((col("velocidade_media_kmh") >= 0) & (col("velocidade_media_kmh") < 200))

# Ver como ficou
# dados_limpos.show(20, truncate=False)
dados_limpos.select(*[c for c in ["coleta_datahora","entrega_datahora","passageiros",
                           "distancia_km","valor_total","tipo_pagamento","tipo_pagamento_descricao",
                           "ano","mes","duracao_minutos","velocidade_media_kmh"] if c in dados.columns])\
     .show(20, truncate=False)


+-------------------+-------------------+-----------+------------+-----------+--------------+------------------------+----+---+------------------+--------------------+
|coleta_datahora    |entrega_datahora   |passageiros|distancia_km|valor_total|tipo_pagamento|tipo_pagamento_descricao|ano |mes|duracao_minutos   |velocidade_media_kmh|
+-------------------+-------------------+-----------+------------+-----------+--------------+------------------------+----+---+------------------+--------------------+
|2023-01-08 10:09:59|2023-01-08 10:47:12|1          |18.1        |0.0        |3             |SemCobranca             |2023|1  |37.21666666666667 |29.18047469771608   |
|2023-01-15 04:37:40|2023-01-15 04:45:40|1          |1.2         |0.0        |3             |SemCobranca             |2023|1  |8.0               |9.0                 |
|2023-01-18 18:00:23|2023-01-18 18:03:31|2          |0.7         |0.0        |1             |CartaoCredito           |2023|1  |3.1333333333333333|13.40425531914

In [30]:
dados_limpos.show(30, truncate=False)


+--------+-------------------+-------------------+-----------+------------+---------+--------------------+---------------+----------------+--------------+-------------+-----+--------+-------------+-------------+------------------+-----------+-------------------+--------------+----+---+------------------+--------------------+------------------------+
|VendorID|coleta_datahora    |entrega_datahora   |passageiros|distancia_km|id_tarifa|armazenar_encaminhar|id_local_coleta|id_local_entrega|tipo_pagamento|valor_corrida|extra|taxa_mta|valor_gorjeta|valor_pedagio|sobretaxa_melhoria|valor_total|sobretaxa_congestao|taxa_aeroporto|ano |mes|duracao_minutos   |velocidade_media_kmh|tipo_pagamento_descricao|
+--------+-------------------+-------------------+-----------+------------+---------+--------------------+---------------+----------------+--------------+-------------+-----+--------+-------------+-------------+------------------+-----------+-------------------+--------------+----+---+----------

In [31]:
pasta_saida_part = "/content/PipelineDadosSpark/dados/saida_csv_particionado"
(dados
 .write.mode("overwrite")
 .option("header", "true")
 .partitionBy("ano","mes")
 .csv(pasta_saida_part)
)
print("Gerado em:", pasta_saida_part)

Gerado em: /content/PipelineDadosSpark/dados/saida_csv_particionado


In [32]:
pasta_saida_unico = "/content/PipelineDadosSpark/dados/saida_csv_unico"
(dados
 .limit(200_000)
 .coalesce(1)
 .write.mode("overwrite")
 .option("header", "true")
 .csv(pasta_saida_unico)
)
print("Gerado em:", pasta_saida_unico)


Gerado em: /content/PipelineDadosSpark/dados/saida_csv_unico


In [1]:
from google.colab import auth
auth.authenticate_user()


In [33]:
!git config --global user.email "andre28ap@gmail.com"
!git config --global user.name "Andre2217"


In [36]:
%cd /content
!git clone https://github.com/Andre2217/PipelineDadosSpark.git

/content
fatal: destination path 'PipelineDadosSpark' already exists and is not an empty directory.


In [37]:
%cd /content/PipelineDadosSpark/


/content/PipelineDadosSpark


In [38]:
!git add .
!git commit -m "Primeira versão do Pipeline de Dados com Spark"
!git push origin main


On branch main
Your branch is ahead of 'origin/main' by 1 commit.
  (use "git push" to publish your local commits)

nothing to commit, working tree clean
fatal: could not read Username for 'https://github.com': No such device or address


In [39]:
!cd /content && zip -r pipeline_spark.zip PipelineDadosSpark/


  adding: PipelineDadosSpark/ (stored 0%)
  adding: PipelineDadosSpark/.git/ (stored 0%)
  adding: PipelineDadosSpark/.git/objects/ (stored 0%)
  adding: PipelineDadosSpark/.git/objects/16/ (stored 0%)
  adding: PipelineDadosSpark/.git/objects/16/6dc11abc7d46da146334c0ff62c711a04a0a92 (stored 0%)
  adding: PipelineDadosSpark/.git/objects/a9/ (stored 0%)
  adding: PipelineDadosSpark/.git/objects/a9/6a7e949e3822d4a587c3650715bf7796d13240 (stored 0%)
  adding: PipelineDadosSpark/.git/objects/a8/ (stored 0%)
  adding: PipelineDadosSpark/.git/objects/a8/905242681cd44295c8a0bd5854cb0ebf7d4e62 (stored 0%)
  adding: PipelineDadosSpark/.git/objects/b5/ (stored 0%)
  adding: PipelineDadosSpark/.git/objects/b5/426c22af763b49a179b3e4acaf17285d8c3aed (stored 0%)
  adding: PipelineDadosSpark/.git/objects/e3/ (stored 0%)
  adding: PipelineDadosSpark/.git/objects/e3/c78e2b274c0c8f94f5b8284491a235a57bcee9 (stored 0%)
  adding: PipelineDadosSpark/.git/objects/2c/ (stored 0%)
  adding: PipelineDadosSpark