In [1]:
#####################################################################################################
#
# Geração de estatísticas gerais obtidas a partir dos arquivos da camada Prata, que contêm informações da
# API Posicao da SPTRANS. Essas estatísticas serão armazenadas na camada ouro
#
# Abaixo as estatisticas geradas:
# 1. Estatística com a quantidade total por linha de ônibus circulando a cada medição
# 2. Estatística com a quantidade total geral de ônibus circulando a cada medição
# 3. Estatística com a média de ônibus por linha circulando por hora
# 4. Estatística com a média geral de ônibus circulando por hora
#
# *Obs: a medição se refere a cada chamada da API Posicao
#

In [2]:
#pip install psycopg2-binary

In [3]:
from pyspark.sql import SparkSession

In [4]:
from pyspark.sql.functions import explode

In [5]:
from pyspark.sql.functions import from_utc_timestamp,split, substring, sum, avg, max,filter, ceil

In [6]:
from datetime import datetime, timedelta
import zoneinfo as zi

In [7]:
spark = SparkSession.builder.appName("FIA-Proj-SPTRANS-Ouro").enableHiveSupport().getOrCreate()

In [8]:
#####################################################################################################
#
# Leitura dos arquivos da camada Prata relativos a uma determinada hora com as infos da API Posicao 
#

In [9]:
#Calcula a hora a ser processada
GMT = zi.ZoneInfo('GMT')
LOCAL_TZ_STR='America/Sao_Paulo'
LOCAL_TZ = zi.ZoneInfo(LOCAL_TZ_STR)

dt_localtime=datetime.now(tz=LOCAL_TZ)
dt_lasthour= dt_localtime - timedelta(hours=0)

str_lasthour= dt_lasthour.strftime('%Y/%m/%d/%H')


In [10]:
#Seta o path da camada prata onde foram persistidos as informacões de posição dos ônibus
prata= 's3a://prata/POSICAO_PARQUET/' +  str_lasthour + "/"
#prata= 's3a://prata/POSICAO_PARQUET/*'

In [11]:
#Seta o path da camada ouro onde serão persistidos a média de ônibus circulando por linha por hora
ouro_avg_linha= 's3a://ouro/MEDIA_ONIBUS_POR_LINHA/' +  str_lasthour + "/"

In [12]:
#Seta o path da camada ouro onde serão persistidos o total médio de ônibus circulando por hora
ouro_avg_geral= 's3a://ouro/MEDIA_ONIBUS_GERAL/' +  str_lasthour + "/"

In [13]:
#Seta o path da camada ouro onde serão persistidos o total por linha de onibus circulando em cada medição
ouro_total_linha= 's3a://ouro/TOTAL_ONIBUS_POR_LINHA/' +  str_lasthour + "/"

In [14]:
#Seta o path da camada ouro onde serão persistidos o total de ônibus circulando em cada medição
ouro_total_geral= 's3a://ouro/TOTAL_ONIBUS_GERAL/' +  str_lasthour + "/"

In [15]:
print(prata)
print(ouro_avg_linha)
print(ouro_avg_geral)
print(ouro_total_linha)
print(ouro_total_geral)

s3a://prata/POSICAO_PARQUET/2024/09/18/21/
s3a://ouro/MEDIA_ONIBUS_POR_LINHA/2024/09/18/21/
s3a://ouro/MEDIA_ONIBUS_GERAL/2024/09/18/21/
s3a://ouro/TOTAL_ONIBUS_POR_LINHA/2024/09/18/21/
s3a://ouro/TOTAL_ONIBUS_GERAL/2024/09/18/21/


In [16]:
#Lê os arquivos da camada prata com as infos retornados pela API Posicao
df_prata= spark.read.parquet(prata)

In [17]:
#Exibe o schema 
df_prata.printSchema()

root
 |-- data_ref: string (nullable = true)
 |-- hora_ref: string (nullable = true)
 |-- cod_onibus: long (nullable = true)
 |-- sentido_linha: long (nullable = true)
 |-- let_cod_linha: string (nullable = true)
 |-- let_destino: string (nullable = true)
 |-- let_origem: string (nullable = true)
 |-- timestamp_pos: timestamp (nullable = true)
 |-- latitude_pos: double (nullable = true)
 |-- longitude_pos: double (nullable = true)
 |-- id_linha: long (nullable = true)
 |-- qtde_onibus: long (nullable = true)



In [18]:
df_prata.count()

28030

In [19]:
df_prata.show(5)

+--------+--------+----------+-------------+-------------+-----------+----------+-------------------+-------------------+-------------------+--------+-----------+
|data_ref|hora_ref|cod_onibus|sentido_linha|let_cod_linha|let_destino|let_origem|      timestamp_pos|       latitude_pos|      longitude_pos|id_linha|qtde_onibus|
+--------+--------+----------+-------------+-------------+-----------+----------+-------------------+-------------------+-------------------+--------+-----------+
|20240918|   21:07|     68116|            2|      5010-10| STO. AMARO| JABAQUARA|2024-09-18 21:07:16|       -46.67079125|        -23.6381705|   33882|          7|
|20240918|   21:07|     68630|            2|      5010-10| STO. AMARO| JABAQUARA|2024-09-18 21:06:53|-46.631516500000004|         -23.662217|   33882|          7|
|20240918|   21:07|     68409|            2|      5010-10| STO. AMARO| JABAQUARA|2024-09-18 21:06:44|-46.660439499999995|         -23.647402|   33882|          7|
|20240918|   21:07|   

In [20]:
#####################################################################################################
#
# Cria dataframe com a quantidade de ônibus circulando por linha em cada medição
#

In [21]:
df_group_time=df_prata.groupby('data_ref','hora_ref',substring('hora_ref',0,2).alias('hora_id_ref'),'id_linha','sentido_linha','let_cod_linha','let_destino','let_origem').agg( max('qtde_onibus').alias('qtde_onibus'))

In [22]:
df_group_time.show(5)

+--------+--------+-----------+--------+-------------+-------------+------------------+------------------+-----------+
|data_ref|hora_ref|hora_id_ref|id_linha|sentido_linha|let_cod_linha|       let_destino|        let_origem|qtde_onibus|
+--------+--------+-----------+--------+-------------+-------------+------------------+------------------+-----------+
|20240918|   21:07|         21|     111|            1|      675X-10|     AACD-SERVIDOR|      TERM. GRAJAÚ|          8|
|20240918|   21:07|         21|   33158|            2|      273N-10| METRÔ VL. MATILDE|     CID. KEMEL II|         10|
|20240918|   21:07|         21|    1280|            1|      5106-10|LGO. SÃO FRANCISCO|         JD. SELMA|          5|
|20240918|   21:07|         21|   34788|            2|      709G-10|        ITAIM BIBI|TERM. GUARAPIRANGA|          2|
|20240918|   21:07|         21|    2462|            1|      709M-10|   TERM. PINHEIROS|  TERM. STO. AMARO|          8|
+--------+--------+-----------+--------+--------

In [23]:
#####################################################################################################
#
# Calcula a quantidade de ônibus por linha em circulação a cada medição
#

In [24]:
df_total_linha= df_group_time.select('data_ref','hora_ref','id_linha','sentido_linha','let_cod_linha','let_destino','let_origem','qtde_onibus')

In [25]:
df_total_linha.show(5)

+--------+--------+--------+-------------+-------------+------------------+------------------+-----------+
|data_ref|hora_ref|id_linha|sentido_linha|let_cod_linha|       let_destino|        let_origem|qtde_onibus|
+--------+--------+--------+-------------+-------------+------------------+------------------+-----------+
|20240918|   21:07|     111|            1|      675X-10|     AACD-SERVIDOR|      TERM. GRAJAÚ|          8|
|20240918|   21:07|   33158|            2|      273N-10| METRÔ VL. MATILDE|     CID. KEMEL II|         10|
|20240918|   21:07|    1280|            1|      5106-10|LGO. SÃO FRANCISCO|         JD. SELMA|          5|
|20240918|   21:07|   34788|            2|      709G-10|        ITAIM BIBI|TERM. GUARAPIRANGA|          2|
|20240918|   21:07|    2462|            1|      709M-10|   TERM. PINHEIROS|  TERM. STO. AMARO|          8|
+--------+--------+--------+-------------+-------------+------------------+------------------+-----------+
only showing top 5 rows



In [26]:
#####################################################################################################
#
# Calcula a quantidade média por hora do número de ônibus por linha circulando
#

In [27]:
#df_group_time.groupby('hr_id','c','cl','sl','lt0','lt1').agg( ceil(avg('qv')).alias('avg_qv')).show()

In [28]:
df_avg_hr_linha=df_group_time.groupby('data_ref','hora_id_ref','id_linha','sentido_linha','let_cod_linha','let_destino','let_origem').agg( ceil(avg('qtde_onibus')).alias('qtde_onibus'))

In [29]:
df_avg_hr_linha.show(5)

+--------+-----------+--------+-------------+-------------+--------------------+--------------------+-----------+
|data_ref|hora_id_ref|id_linha|sentido_linha|let_cod_linha|         let_destino|          let_origem|qtde_onibus|
+--------+-----------+--------+-------------+-------------+--------------------+--------------------+-----------+
|20240918|         21|    2190|            1|      4017-10|         VL. YOLANDA|        METALÚRGICOS|          4|
|20240918|         21|     917|            1|      2705-10|      METRÔ ITAQUERA|      JD. FANGANIELO|          3|
|20240918|         21|   35037|            2|      2026-10|      PQ. NOVO MUNDO|              JAÇANÃ|          1|
|20240918|         21|   33280|            2|      7903-10|PÇA. RAMOS DE AZE...|JD. JOÃO XXIII/EDUC.|          5|
|20240918|         21|     353|            1|      5154-10| TERM. PRINC. ISABEL|    TERM. STO. AMARO|          4|
+--------+-----------+--------+-------------+-------------+--------------------+--------

In [30]:
#####################################################################################################
#
# Calcula a quantidade total geral a cada medição, dos ônibus em circulação
#

In [31]:
#df_group_time.groupby('hr').agg( sum('qv')).alias('total_geral_qv').show()

In [32]:
df_total_geral_aux=df_group_time.groupby('data_ref','hora_id_ref','hora_ref').agg( sum('qtde_onibus').alias('qtde_onibus'))

In [33]:
df_total_geral_aux.show(5)

+--------+-----------+--------+-----------+
|data_ref|hora_id_ref|hora_ref|qtde_onibus|
+--------+-----------+--------+-----------+
|20240918|         21|   21:07|       9335|
|20240918|         21|   21:12|       9247|
|20240918|         21|   21:02|       9448|
+--------+-----------+--------+-----------+



In [34]:
df_total_geral= df_total_geral_aux.select('data_ref','hora_ref','qtde_onibus')

In [35]:
df_total_geral.show(5)

+--------+--------+-----------+
|data_ref|hora_ref|qtde_onibus|
+--------+--------+-----------+
|20240918|   21:07|       9335|
|20240918|   21:12|       9247|
|20240918|   21:02|       9448|
+--------+--------+-----------+



In [36]:
#####################################################################################################
#
# Calcula a média geral por hora do número de ônibus em circulação
#

In [37]:
df_avg_geral= df_total_geral_aux.groupby('data_ref','hora_id_ref').agg( ceil(avg('qtde_onibus')).alias('qtde_onibus'))

In [38]:
df_avg_geral.show()

+--------+-----------+-----------+
|data_ref|hora_id_ref|qtde_onibus|
+--------+-----------+-----------+
|20240918|         21|       9344|
+--------+-----------+-----------+



In [39]:
#####################################################################################################
#
# Gravação dos dataframe na camada Ouro em formato parquet:
# 1. A quantidade de ônibus por linha em circulação a cada medição
# 2. A quantidade total geral de ônibus em circulação a cada medição
# 3. A média de ônibus por linha circulando por hora
# 4. A média geral de ônibus circulando por hora
#

In [40]:
df_total_linha.write.parquet(ouro_total_linha, mode='overwrite')

In [41]:
df_total_geral.write.parquet(ouro_total_geral, mode='overwrite')

In [42]:
df_avg_hr_linha.write.parquet(ouro_avg_linha, mode='overwrite')

In [43]:
df_avg_geral.write.parquet(ouro_avg_geral, mode='overwrite')

In [44]:
#####################################################################################################
#
# Gravação dos dataframe na base de dados Postgres:
# 1. A quantidade de ônibus por linha em circulação a cada medição
# 2. A quantidade total geral de ônibus em circulação a cada medição
# 3. A média de ônibus por linha circulando por hora
# 4. A média geral de ônibus circulando por hora
#

In [45]:
#configuração das informações de conexão com a base de dados Postgres

In [46]:
url = "jdbc:postgresql://db:5432/dvdrental"

properties = {
"user": "admin",
"password": "admin",
"driver": "org.postgresql.Driver"
}

In [47]:
#gravação dos dataframes nas tabelas stages 

In [48]:
df_total_linha.write.jdbc( url=url, table='sptrans.total_onibus_por_linha_stage',mode="overwrite", properties= properties)

In [49]:
df_total_geral.write.jdbc( url=url, table='sptrans.total_onibus_geral_stage',mode="overwrite", properties= properties)

In [50]:
df_avg_hr_linha.write.jdbc( url=url, table='sptrans.media_onibus_por_linha_stage',mode="overwrite", properties= properties)

In [51]:
df_avg_geral.write.jdbc( url=url, table='sptrans.media_onibus_geral_stage',mode="overwrite", properties= properties)

In [52]:
##################################################################
#
# upsert dos dados das stages nas tabelas fatos
#

In [53]:
import psycopg2
from psycopg2 import sql

In [54]:
#conexão na base de dados Postgres
conn = psycopg2.connect(
    dbname="dvdrental",
    user="admin",
    password="admin",
    host="db",
    port="5432"
)

# Criar um cursor
cur = conn.cursor()

In [55]:
# Upsert na tabela total_onibus_geral_por_linha com as linhas da respectiva tabela stage

In [56]:
query = """
INSERT INTO sptrans.total_onibus_por_linha(data_ref, hora_ref, id_linha, sentido_linha, let_cod_linha, let_destino, let_origem, qtde_onibus)
   SELECT data_ref, hora_ref, id_linha, sentido_linha, let_cod_linha, let_destino, let_origem, qtde_onibus
   FROM sptrans.total_onibus_por_linha_stage
ON CONFLICT (data_ref, hora_ref, id_linha) 
DO UPDATE SET 
    sentido_linha = EXCLUDED.sentido_linha,
    let_cod_linha = EXCLUDED.let_cod_linha,
    let_destino = EXCLUDED.let_destino,
    let_origem = EXCLUDED.let_origem,
    qtde_onibus = EXCLUDED.qtde_onibus;
"""

In [57]:
# Executar a consulta
cur.execute(query)

In [58]:
# Commit das mudanças
#conn.commit()

In [59]:
# Upsert na tabela total_onibus_geral com as linhas da respectiva tabela stage

In [60]:
query = """
INSERT INTO sptrans.total_onibus_geral(data_ref, hora_ref, qtde_onibus)
   SELECT data_ref, hora_ref, qtde_onibus
   FROM sptrans.total_onibus_geral_stage
ON CONFLICT (data_ref, hora_ref) 
DO UPDATE SET 
    qtde_onibus = EXCLUDED.qtde_onibus;
"""

In [61]:
# Executar a consulta
cur.execute(query)

In [62]:
# Commit das mudanças
#conn.commit()

In [63]:
# Upsert na tabela media_onibus_por_linha com as linhas da respectiva tabela stage

In [64]:
query = """
INSERT INTO sptrans.media_onibus_por_linha(data_ref, hora_id_ref, id_linha, sentido_linha, let_cod_linha, let_destino, let_origem, qtde_onibus)
   SELECT data_ref, hora_id_ref, id_linha, sentido_linha, let_cod_linha, let_destino, let_origem, qtde_onibus
   FROM sptrans.media_onibus_por_linha_stage
ON CONFLICT (data_ref, hora_id_ref, id_linha) 
DO UPDATE SET 
    sentido_linha = EXCLUDED.sentido_linha,
    let_cod_linha = EXCLUDED.let_cod_linha,
    let_destino = EXCLUDED.let_destino,
    let_origem = EXCLUDED.let_origem,
    qtde_onibus = EXCLUDED.qtde_onibus;
"""

In [65]:
# Executar a consulta
cur.execute(query)

In [66]:
# Commit das mudanças
#conn.commit()

In [67]:
# Upsert na tabela media_onibus_geral com as linhas da respectiva tabela stage

In [68]:
query = """
INSERT INTO sptrans.media_onibus_geral(data_ref, hora_id_ref, qtde_onibus)
   SELECT data_ref, hora_id_ref, qtde_onibus
   FROM sptrans.media_onibus_geral_stage
ON CONFLICT (data_ref, hora_id_ref) 
DO UPDATE SET 
    qtde_onibus = EXCLUDED.qtde_onibus;
"""

In [69]:
# Executar a consulta
cur.execute(query)

In [70]:
# Commit das mudanças
conn.commit()

In [71]:
# Fechar o cursor e a conexão
cur.close()
conn.close()

In [72]:
#####################################################################################################
#
# Fim do processamento