<a href="https://colab.research.google.com/github/dougl-costa/code_assignment/blob/main/AE_Diario_de_Bordo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, count, when, sum as _sum, max as _max, min as _min, avg as _avg

In [8]:
# Inicializar a sessão Spark
spark = SparkSession.builder.appName("AnaliseCorridas").getOrCreate()

In [12]:
# Carregar o arquivo CSV
df = spark.read.csv("info_transportes.csv", header=True, inferSchema=True, sep=";")


DataFrame[DATA_INICIO: string, DATA_FIM: string, CATEGORIA: string, LOCAL_INICIO: string, LOCAL_FIM: string, DISTANCIA: int, PROPOSITO: string]

In [27]:
# Converter a coluna DATA_INICIO para o formato de data yyyy-MM-dd
df = df.withColumn("DT_REFE", to_date(col("DATA_INICIO"), "yyyy-MM-dd"))

In [21]:
# Agrupar por DT_REFE e calcular as métricas
info_corridas_do_dia = df.groupBy("DT_REFE").agg(
    count("*").alias("QT_CORR"),  # Quantidade total de corridas
    _sum(when(col("CATEGORIA") == "Negócio", 1).otherwise(0)).alias("QT_CORR_NEG"),  # Corridas de Negócio
    _sum(when(col("CATEGORIA") == "Pessoal", 1).otherwise(0)).alias("QT_CORR_PESS"),  # Corridas Pessoais
    _max("DISTANCIA").alias("VL_MAX_DIST"),  # Distância máxima
    _min("DISTANCIA").alias("VL_MIN_DIST"),  # Distância mínima
    _avg("DISTANCIA").alias("VL_AVG_DIST"),  # Média de distância
    _sum(when(col("PROPOSITO") == "Reunião", 1).otherwise(0)).alias("QT_CORR_REUNI"),  # Corridas para Reunião
    _sum(when((col("PROPOSITO").isNotNull()) & (col("PROPOSITO") != "Reunião"), 1).otherwise(0)).alias("QT_CORR_NAO_REUNI")  # Corridas com propósito diferente de Reunião
)


In [23]:
display(info_corridas_do_dia)

DataFrame[DT_REFE: date, QT_CORR: bigint, QT_CORR_NEG: bigint, QT_CORR_PESS: bigint, VL_MAX_DIST: int, VL_MIN_DIST: int, VL_AVG_DIST: double, QT_CORR_REUNI: bigint, QT_CORR_NAO_REUNI: bigint]

In [26]:
# Ordenar por DT_REFE (opcional)
info_corridas_do_dia = info_corridas_do_dia.orderBy("DT_REFE")

# Exibir o resultado
# info_corridas_do_dia.show()

info_corridas_do_dia.write.format("parquet")

<pyspark.sql.readwriter.DataFrameWriter at 0x7f20f04bb790>