In [0]:
from pyspark.sql import SparkSession

# Leitura do arquivo CSV com dados de transporte
df = spark.read \
    .option("inferSchema", "false") \
    .option("header", "false") \
    .option("delimiter", ";") \
    .csv("/FileStore/tables/info_transportes.csv")

# Exibe as primeiras linhas para conferência
df.show(5)


+----------------+----------------+---------+------------+-----------+---------+-----------+
|             _c0|             _c1|      _c2|         _c3|        _c4|      _c5|        _c6|
+----------------+----------------+---------+------------+-----------+---------+-----------+
|     DATA_INICIO|        DATA_FIM|CATEGORIA|LOCAL_INICIO|  LOCAL_FIM|DISTANCIA|  PROPOSITO|
|01-01-2016 21:11|01-01-2016 21:17|  Negocio| Fort Pierce|Fort Pierce|       51|Alimentação|
|01-02-2016 01:25|01-02-2016 01:37|  Negocio| Fort Pierce|Fort Pierce|        5|       NULL|
|01-02-2016 20:25|01-02-2016 20:38|  Negocio| Fort Pierce|Fort Pierce|       48|   Entregas|
|01-05-2016 17:31|01-05-2016 17:45|  Negocio| Fort Pierce|Fort Pierce|       47|    Reunião|
+----------------+----------------+---------+------------+-----------+---------+-----------+
only showing top 5 rows



In [0]:
df = df.selectExpr(
    "_c0 as DATA_INICIO",
    "_c1 as DATA_FIM",
    "_c2 as CATEGORIA",
    "_c3 as LOCAL_INICIO",
    "_c4 as LOCAL_FIM",
    "_c5 as DISTANCIA",
    "_c6 as PROPOSITO"
)

In [0]:
from pyspark.sql.functions import to_date, col

# Ajustando tipo da coluna DISTANCIA para double (para poder fazer cálculos)
df = df.withColumn("DISTANCIA", col("DISTANCIA").cast("double"))

# Criando nova coluna DT_REFE com a data formatada
df = df.withColumn("DT_REFE", to_date(col("DATA_INICIO"), "dd-MM-yyyy HH:mm"))

# Verificando o resultado
df.select("DATA_INICIO", "DISTANCIA", "DT_REFE").show(5)


+----------------+---------+----------+
|     DATA_INICIO|DISTANCIA|   DT_REFE|
+----------------+---------+----------+
|     DATA_INICIO|     NULL|      NULL|
|01-01-2016 21:11|     51.0|2016-01-01|
|01-02-2016 01:25|      5.0|2016-02-01|
|01-02-2016 20:25|     48.0|2016-02-01|
|01-05-2016 17:31|     47.0|2016-05-01|
+----------------+---------+----------+
only showing top 5 rows



In [0]:
# Remove linha onde DATA_INICIO está igual a 'DATA_INICIO'
df = df.filter(df["DATA_INICIO"] != "DATA_INICIO")

# Mostra o resultado limpo
df.select("DATA_INICIO", "DISTANCIA", "DT_REFE").show(5)


+----------------+---------+----------+
|     DATA_INICIO|DISTANCIA|   DT_REFE|
+----------------+---------+----------+
|01-01-2016 21:11|     51.0|2016-01-01|
|01-02-2016 01:25|      5.0|2016-02-01|
|01-02-2016 20:25|     48.0|2016-02-01|
|01-05-2016 17:31|     47.0|2016-05-01|
|01-06-2016 14:42|    637.0|2016-06-01|
+----------------+---------+----------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import sum, max, min, avg

# atualizando por categoria
df_agg_categoria = df.groupBy("CATEGORIA").agg(
    sum("DISTANCIA").alias("DISTANCIA_TOTAL"),
    max("DISTANCIA").alias("DISTANCIA_MAXIMA"),
    min("DISTANCIA").alias("DISTANCIA_MINIMA"),
    avg("DISTANCIA").alias("DISTANCIA_MEDIA")
)


df_agg_categoria.show()


+---------+---------------+----------------+----------------+-----------------+
|CATEGORIA|DISTANCIA_TOTAL|DISTANCIA_MAXIMA|DISTANCIA_MINIMA|  DISTANCIA_MEDIA|
+---------+---------------+----------------+----------------+-----------------+
|  Negocio|        98980.0|          3103.0|             1.0|91.98884758364312|
|  Pessoal|         6898.0|          1802.0|             1.0|89.58441558441558|
+---------+---------------+----------------+----------------+-----------------+



In [0]:
from pyspark.sql.functions import col, to_date, count, max, min, avg, when

# Cria a coluna com a data formatada (yyyy-MM-dd)
df = df.withColumn("DT_REFE", to_date(col("DATA_INICIO"), "MM-dd-yyyy HH:mm"))

# Gera por data de início
df_agg = df.groupBy("DT_REFE").agg(
    count("*").alias("QT_CORR"),
    # Quantidade de corridas com categoria Negócio
    count(when(col("CATEGORIA") == "Negocio", True)).alias("QT_CORR_NEG"),
    # Quantidade de corridas com categoria Pessoal
    count(when(col("CATEGORIA") == "Pessoal", True)).alias("QT_CORR_PESS"),
    # Maior distância
    max("DISTANCIA").alias("VL_MAX_DIST"),
    # Menor distância
    min("DISTANCIA").alias("VL_MIN_DIST"),
    # Média da distância
    avg("DISTANCIA").alias("VL_AVG_DIST"),
    # Corridas com propósito "Reunião"
    count(when(col("PROPOSITO") == "Reunião", True)).alias("QT_CORR_REUNI"),
    # Corridas com propósito diferente de reunião e não nulo
    count(when((col("PROPOSITO").isNotNull()) & (col("PROPOSITO") != "Reunião"), True)).alias("QT_CORR_NAO_REUNI")
)

display(df_agg)


DT_REFE,QT_CORR,QT_CORR_NEG,QT_CORR_PESS,VL_MAX_DIST,VL_MIN_DIST,VL_AVG_DIST,QT_CORR_REUNI,QT_CORR_NAO_REUNI
2016-03-01,2,2,0,8.0,8.0,8.0,1,1
2016-05-03,1,1,0,25.0,25.0,25.0,0,1
2016-07-26,4,4,0,25.0,21.0,23.25,0,2
2016-08-15,3,3,0,259.0,141.0,185.66666666666663,0,1
2016-10-03,4,4,0,127.0,16.0,69.0,0,0
2016-01-28,3,3,0,157.0,19.0,107.66666666666669,2,1
2016-07-17,2,0,2,1802.0,151.0,976.5,0,2
2016-11-08,4,4,0,122.0,3.0,68.5,1,1
2016-12-19,10,10,0,102.0,7.0,41.8,1,4
2016-07-03,3,3,0,99.0,31.0,53.66666666666666,1,2


In [0]:
df_agg.write.format("parquet").mode("overwrite").save("/FileStore/tables/info_corridas_do_dia_parquet")