In [1]:
from pyspark.sql import SparkSession

import os

# Connection settings for the MinIO (S3-compatible) object store. Each value can
# be overridden through an environment variable so that real credentials do not
# have to be written into the notebook; the fallbacks are the values this
# notebook previously hard-coded, so the default behaviour is unchanged.
minio_access_key = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY", "minioadmin")
minio_endpoint = os.environ.get("MINIO_ENDPOINT", "http://minio:9000")

# Build (or reuse) the Spark session with the S3A connector pointed at MinIO.
spark = (
    SparkSession.builder
    .appName("MinIO Test")
    .config("spark.hadoop.fs.s3a.access.key", minio_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
    .config("spark.hadoop.fs.s3a.endpoint", minio_endpoint)
    # Address buckets as <endpoint>/<bucket>/<key> instead of bucket subdomains.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # TLS is disabled to match the default plain-HTTP endpoint above.
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

# Column layout of the source CSV, declared up front so Spark does not need the
# extra pass over the file that inferSchema=True performs. Names follow the CSV
# header and types mirror what schema inference produced for this file: the
# datetime columns stay as strings here and are parsed in the cleaning step.
fhv_csv_schema = (
    "dispatching_base_num STRING, "
    "pickup_datetime STRING, "
    "dropOff_datetime STRING, "
    "PUlocationID INT, "
    "DOlocationID INT, "
    "SR_Flag STRING, "
    "Affiliated_base_number STRING"
)

# Load the raw 2023 For-Hire Vehicles trip data from the "trabalho" bucket.
df = spark.read.csv(
    "s3a://trabalho/2023_For_Hire_Vehicles_Trip_Data_20251206.csv",
    header=True,
    schema=fhv_csv_schema,
)

# Preview the first rows to confirm the file was read correctly.
df.show(5)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/07 16:48:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/07 16:48:44 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

+--------------------+--------------------+--------------------+------------+------------+-------+----------------------+
|dispatching_base_num|     pickup_datetime|    dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+--------------------+--------------------+------------+------------+-------+----------------------+
|              B00001|03/01/2023 12:24:...|03/01/2023 02:35:...|        NULL|        NULL|   NULL|                B00001|
|              B00008|01/01/2023 12:30:...|01/01/2023 01:00:...|        NULL|        NULL|   NULL|                B00008|
|              B00078|01/01/2023 12:01:...|01/01/2023 03:15:...|        NULL|        NULL|   NULL|                B00078|
|              B00111|01/01/2023 12:30:...|01/01/2023 01:05:...|        NULL|        NULL|   NULL|                B03406|
|              B00112|01/01/2023 12:34:...|01/01/2023 12:52:...|        NULL|          14|   NULL|                B00112|
+--------------------+--

In [3]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, TimestampType

# ================================================
# 1. Standardize column names (snake_case)
# ================================================
# (source column, standardized name) pairs, listed in output column order.
column_renames = (
    ("dispatching_base_num", "dispatching_base_num"),
    ("pickup_datetime", "pickup_datetime"),
    ("dropOff_datetime", "dropoff_datetime"),
    ("PUlocationID", "pu_location_id"),
    ("DOlocationID", "do_location_id"),
    ("SR_Flag", "sr_flag"),
    ("Affiliated_base_number", "affiliated_base_number"),
)

df_clean = df.select(
    *[F.col(source_name).alias(clean_name) for source_name, clean_name in column_renames]
)

# ================================================
# 2. Convert column types
# ================================================

# Both datetime columns use the same 12-hour clock layout with an AM/PM marker.
datetime_pattern = "MM/dd/yyyy hh:mm:ss a"

# Parse each column in place, pickup first and then dropoff.
for datetime_column in ("pickup_datetime", "dropoff_datetime"):
    df_clean = df_clean.withColumn(
        datetime_column,
        F.to_timestamp(datetime_column, datetime_pattern),
    )


# ================================================
# 3. Create useful derived features
# ================================================

pickup_ts = F.col("pickup_datetime")

# Elapsed time between pickup and dropoff, in seconds.
elapsed_seconds = (
    F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime")
)

df_clean = (
    df_clean
    # Trip duration in minutes.
    .withColumn("trip_duration_min", elapsed_seconds / 60)
    # Calendar parts of the pickup moment: date, year, month and hour.
    .withColumn("pickup_date", F.to_date(pickup_ts))
    .withColumn("pickup_year", F.year(pickup_ts))
    .withColumn("pickup_month", F.month(pickup_ts))
    .withColumn("pickup_hour", F.hour(pickup_ts))
    # Flags marking trips whose pickup or dropoff zone is missing.
    .withColumn("missing_pu_location", F.col("pu_location_id").isNull())
    .withColumn("missing_do_location", F.col("do_location_id").isNull())
)

# ================================================
# 4. Remove inconsistent records
# ================================================
max_trip_minutes = 720  # 12-hour limit

# Keep only trips that end after they start and last less than the limit.
df_clean = (
    df_clean
    .where(F.col("dropoff_datetime") > F.col("pickup_datetime"))
    .where(F.col("trip_duration_min") < max_trip_minutes)
)

# ================================================
# 5. Remove duplicates
# ================================================
# Rows are compared across every column; only one copy of each is kept.
df_clean = df_clean.distinct()

# ================================================
# 6. Final result
# ================================================
# Print the resulting schema, then preview the first five cleaned rows.
df_clean.printSchema()
df_clean.show(n=5)


root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- pu_location_id: integer (nullable = true)
 |-- do_location_id: integer (nullable = true)
 |-- sr_flag: string (nullable = true)
 |-- affiliated_base_number: string (nullable = true)
 |-- trip_duration_min: double (nullable = true)
 |-- pickup_date: date (nullable = true)
 |-- pickup_year: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- missing_pu_location: boolean (nullable = false)
 |-- missing_do_location: boolean (nullable = false)



[Stage 6:>                                                          (0 + 1) / 1]

+--------------------+-------------------+-------------------+--------------+--------------+-------+----------------------+------------------+-----------+-----------+------------+-----------+-------------------+-------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|pu_location_id|do_location_id|sr_flag|affiliated_base_number| trip_duration_min|pickup_date|pickup_year|pickup_month|pickup_hour|missing_pu_location|missing_do_location|
+--------------------+-------------------+-------------------+--------------+--------------+-------+----------------------+------------------+-----------+-----------+------------+-----------+-------------------+-------------------+
|              B00856|2023-01-01 00:53:48|2023-01-01 01:02:40|          NULL|            76|   NULL|                B03436| 8.866666666666667| 2023-01-01|       2023|           1|          0|               true|              false|
|              B01843|2023-01-01 00:17:00|2023-01-01 00:27:00|          

                                                                                

In [4]:
# Persist the cleaned dataset as Parquet, replacing any previous output at this path.
clean_output_path = "s3a://trabalho/limpo/for_hire_2023/"
df_clean.write.parquet(clean_output_path, mode="overwrite")


                                                                                