# ETL Bus Position

In [4]:
import pytz
import os
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import explode, col

In [5]:
# Start (or reuse) the Spark session for the raw -> trusted bus-position ETL.
spark = SparkSession.builder.appName("ProcessRawToTrusted").getOrCreate()

# Configure S3A access to MinIO on the session's Hadoop configuration.
# Credentials/endpoint come from the environment so secrets are not committed
# with the notebook; the previous literal values are kept as fallbacks so an
# existing local setup keeps working unchanged.
# NOTE(review): the environment variable names are a suggestion — align them
# with whatever the deployment actually exports.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", os.environ.get("MINIO_ACCESS_KEY", "datalake"))
hadoop_conf.set("fs.s3a.secret.key", os.environ.get("MINIO_SECRET_KEY", "datalake"))
hadoop_conf.set("fs.s3a.endpoint", os.environ.get("MINIO_ENDPOINT", "http://minio:9000"))
hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

spark

In [6]:
# Every path below is keyed by today's date in Brasília local time
# (America/Sao_Paulo), formatted as YYYY-MM-DD.
fuso_horario_brasilia = pytz.timezone('America/Sao_Paulo')
brasilia_time = datetime.now(tz=fuso_horario_brasilia)
today = format(brasilia_time, '%Y-%m-%d')

# Raw input folder and trusted output folder for the bus-position files.
raw_path, trusted_path = (
    f"s3a://raw/busdata/{today}",
    f"s3a://trusted/busposition/{today}",
)

# Per-entity trusted folders; they end with "/" because file prefixes are
# concatenated directly onto them.
routes_trusted_path, positions_trusted_path = (
    f"s3a://trusted/routes/{today}/",
    f"s3a://trusted/positions/{today}/",
)

In [8]:
# List the raw files for today's folder. The binaryFile source also exposes
# each file's bytes in a `content` column, so only `path` is ever selected
# before rows are brought to the driver.
files_df = spark.read.format("binaryFile").load(raw_path)

# Build df_bus_lines from a single raw file. Ordering by path makes the choice
# of "first" file deterministic across runs instead of depending on listing order.
first_file_row = files_df.select("path").orderBy("path").first()
if first_file_row is None:
    raise FileNotFoundError(f"No raw files found under {raw_path}")
first_raw_file_path = first_file_row["path"]
file_name = os.path.basename(first_raw_file_path).replace(".json", "")

df_raw = spark.read.json(first_raw_file_path)

# Explode the "l" array: one row per bus line.
df_lines = df_raw.select(explode(col("l")).alias("linha"))

# Bus-line attributes (fields "cl", "sl", "c", "lt0", "lt1", "qv"), renamed.
df_bus_lines = df_lines.select(
    col("linha.cl").alias("codigo_trajeto"),
    col("linha.sl").alias("sentido"),
    col("linha.c").alias("letreiro"),
    col("linha.lt0").alias("terminal_primario"),
    col("linha.lt1").alias("terminal_secundario"),
    col("linha.qv").alias("qnt_veiculos")
)

# Written only once (from the chosen file), under a folder named after it.
# NOTE(review): routes_trusted_path is defined but not used; the lines are
# written under trusted_path instead — confirm which location is intended.
destination_path = os.path.join(trusted_path, file_name)
df_bus_lines.write.mode("overwrite").parquet(destination_path)
df_bus_lines.show()

+--------------+-------+--------+--------------------+-------------------+------------+
|codigo_trajeto|sentido|letreiro|   terminal_primario|terminal_secundario|qnt_veiculos|
+--------------+-------+--------+--------------------+-------------------+------------+
|          1215|      1| 6037-10|     TERM. CAPELINHA|      JD. MITSUTANI|           5|
|         34390|      2| 695Y-21|      EST. AUTÓDROMO|  TERM. PARELHEIROS|           2|
|         33935|      2| 6L11-10|        TERM. GRAJAÚ|     ILHA DO BORORÉ|           5|
|         33104|      2| 4718-10|     METRÔ STA. CRUZ|        JD. CELESTE|           1|
|         34856|      2| 9050-10|          ITAIM BIBI|         TERM. LAPA|           6|
|          2135|      1| 199D-10|     TERM. PINHEIROS|  CONEXÃO VL. IÓRIO|           5|
|         34076|      2| 5185-10|TERM. PQ. D. PEDR...| TERM. GUARAPIRANGA|          11|
|         33020|      2| 263C-10|            COHAB II|         JD. HELENA|           3|
|          1074|      1| 3796-10

In [5]:
# Process every raw file of the day: explode lines -> vehicles and write each
# file's vehicle positions as JSON under positions_trusted_path.
# Only `path` is collected: binaryFile rows also carry the full file bytes.
for row in files_df.select("path").collect():
    raw_file_path = row["path"]
    # Part of the file name after its last "_" and before its first ".".
    # The basename is split (not the full URL), so underscores or dots in
    # directory names can never leak into the output path.
    raw_file_date = os.path.basename(raw_file_path).split('_')[-1].split('.')[0]
    df_raw = spark.read.json(raw_file_path)

    # Explode the "l" array: one row per bus line.
    df_lines = df_raw.select(explode(col("l")).alias("linha"))

    # Explode each line's "vs" array: one row per vehicle, keeping the line keys.
    df_vehicles = df_lines.select(
        col("linha.cl").alias("codigo_trajeto"),
        col("linha.sl").alias("sentido"),
        explode(col("linha.vs")).alias("vehicle")
    )

    # Vehicle prefix and coordinates (fields "p", "py", "px").
    df_vehicles_position = df_vehicles.select(
        col("codigo_trajeto"),
        col("sentido"),
        col("vehicle.p").alias("prefixo_veiculo"),
        col("vehicle.py").alias("latitude"),
        col("vehicle.px").alias("longitude")
    )

    # One output folder per raw file.
    df_vehicles_position.write.mode("overwrite").json(positions_trusted_path + 'positions_' + raw_file_date)

In [6]:
df_vehicles_position.show(n=20, truncate=True)  # preview positions from the last processed file

+--------------+-------+---------------+-------------------+-------------------+
|codigo_trajeto|sentido|prefixo_veiculo|           latitude|          longitude|
+--------------+-------+---------------+-------------------+-------------------+
|          1215|      1|          78141|-23.654442500000002|        -46.7623235|
|          1215|      1|          78196|         -23.649639|        -46.7841365|
|          1215|      1|          78995|        -23.6493575|        -46.7848365|
|          1215|      1|          78874|-23.658293999999998|        -46.7752675|
|         34209|      2|          61793|-23.767454999999998|-46.716910999999996|
|         34209|      2|          61588|      -23.710868125|-46.699293624999996|
|         34209|      2|          61552|-23.647087499999998|          -46.63931|
|         34209|      2|          61795|       -23.67837375|-46.684760749999995|
|         34209|      2|          61541|       -23.64654725|       -46.64174575|
|         34209|      2|    

In [10]:
# Stop the Spark session used by the bus-position ETL.
# NOTE(review): the next section builds a new session in this same kernel with
# spark.jars.packages; that setting is likely only honoured when the JVM is
# first launched, which may explain the ClassNotFoundException recorded below —
# restart the kernel between sections (TODO confirm).
spark.stop()

In [6]:
# ----------------------------------------------------------------------------------------------------------------- #

# ETL Volumetria de Passageiros 

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import col
import os

In [13]:
# Start (or reuse) the Spark session for the passenger-volume ETL, requesting
# the spark-excel package that provides the "com.crealytics.spark.excel" source.
# NOTE(review): the traceback recorded for this notebook shows that source was
# not found when this cell ran after an earlier session had been stopped in the
# same kernel. spark.jars.packages is most likely resolved only when the JVM is
# launched, so run this on a fresh kernel (or request the package on the very
# first session) — TODO confirm. Also confirm the artifact's "3.2.1_" prefix
# matches the cluster's Spark version.
spark = (
    SparkSession.builder
    .appName("ProcessPassangers")
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:3.2.1_0.17.6")
    .getOrCreate()
)

# Configure S3A access to MinIO. Credentials/endpoint come from the environment
# (with the previous literal values as fallbacks) so secrets are not committed.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", os.environ.get("MINIO_ACCESS_KEY", "datalake"))
hadoop_conf.set("fs.s3a.secret.key", os.environ.get("MINIO_SECRET_KEY", "datalake"))
hadoop_conf.set("fs.s3a.endpoint", os.environ.get("MINIO_ENDPOINT", "http://minio:9000"))
hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

spark

In [14]:
# Input folder of the passenger Excel files (raw layer) and output folder for
# the processed Parquet (trusted layer), both for the same reference year.
# The output must not point into the raw bucket, otherwise processed data is
# mixed with (and re-listed as) raw input.
ANO_REFERENCIA = "2023"
raw_path = f"s3a://raw/passageiros/{ANO_REFERENCIA}/"
trusted_path = f"s3a://trusted/passageiros/{ANO_REFERENCIA}/"

In [15]:
# Divisor used to turn each file's total into a daily average.
# NOTE(review): a fixed 30 ignores 28/29/31-day months; derive it from the
# file's reference month if the file name encodes it.
DIAS_POR_MES = 30

# List the Excel files in the raw folder. pathGlobFilter keeps only *.xlsx
# files, and only `path` is collected because binaryFile rows also carry the
# full file bytes in `content`.
files_df = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.xlsx")
    .load(raw_path)
)

for row in files_df.select("path").collect():
    excel_file_path = row["path"]
    file_name = os.path.basename(excel_file_path).replace(".xlsx", "")

    # Read sheet 'Sheet1' starting at cell A3 (rows above are skipped), using
    # that row as the header and inferring column types. ("header" is the
    # option name in the configured spark-excel 0.17.x; the older "useHeader"
    # alias was redundant.)
    df_excel = (
        spark.read.format("com.crealytics.spark.excel")
        .option("header", "true")
        .option("dataAddress", "'Sheet1'!A3")
        .option("maxRowsInMemory", 1000)
        .option("inferSchema", "true")
        .load(excel_file_path)
    )

    # Keep only the line identifier and its passenger total.
    df_selected = df_excel.select("Linha", "Tot Passageiros Transportados")

    # Daily average = total / DIAS_POR_MES (the total is presumably monthly —
    # TODO confirm against the source spreadsheets).
    # NOTE(review): column names with spaces may be rejected by the Parquet
    # writer on older Spark versions; consider snake_case aliases.
    df_media_mensal = df_selected.withColumn(
        "Média Diária", col("Tot Passageiros Transportados") / DIAS_POR_MES
    )

    # One Parquet folder per input file, named after it.
    destination_path = os.path.join(trusted_path, file_name)
    df_media_mensal.write.mode("overwrite").parquet(destination_path)

Py4JJavaError: An error occurred while calling o102.load.
: java.lang.ClassNotFoundException: 
Failed to find data source: com.crealytics.spark.excel. Please find packages at
https://spark.apache.org/third-party-projects.html
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:587)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:725)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:207)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:185)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: com.crealytics.spark.excel.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:587)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:661)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:661)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:661)
	... 15 more
