In [1]:
# Importando as bibliotecas necessárias
from pyspark.sql import SparkSession

# Criando a Spark session
spark = SparkSession.builder.appName("Resolvendo_Questions_ifood").getOrCreate()

# Caminho do arquivo Parquet de origem
caminho_origem = ["/home/jovyan/work/silver/taxi_roadtrip.parquet"]

# Lendo o arquivo Parquet
df = spark.read.parquet(*caminho_origem)

# Mostrando as primeiras linhas para verificar o conteúdo
#df.show()

In [2]:
# Registrar o DataFrame como uma tabela temporária para executar consultas SQL, para fácil manipulação/entendimento do usuário. 
df.createOrReplaceTempView("dados")

In [3]:
# Executar uma consulta SQL para filtrar ou manipular os dados
qst1_resultado_sql = spark.sql("""

        SELECT 
            CAST(AVG(vl_total) AS DECIMAL(18,4)) as VALOR_media_mes 
        FROM (
            SELECT 
                sum(total_amount) as vl_total, 
                month(tpep_pickup_datetime) as mes
            FROM dados
                Where right(left(arquivo_origem,15),1) = 'y' -- filtro para pegar apenas os taxis amarelos.
            group by month(tpep_pickup_datetime) 
        ) A
                      
""")

qst1_resultado_sql.show()

+---------------+
|VALOR_media_mes|
+---------------+
|  75100918.8366|
+---------------+



In [4]:
# Executar uma consulta SQL para filtrar ou manipular os dados
qst21_resultado_sql = spark.sql("""

SELECT 
    CAST(AVG(passageiros) AS DECIMAL(18,4)) as PASSAGEIROS_media_dia 
from (
    SELECT 
        sum(passenger_count) AS passageiros, 
        DATE_FORMAT(tpep_pickup_datetime, "dd/MM/yyyy") AS dia
    FROM dados
        WHERE SUBSTRING(arquivo_origem, 15, 1) = 'y' -- filtro para pegar apenas os táxis amarelos
    GROUP BY DATE_FORMAT(tpep_pickup_datetime, "dd/MM/yyyy")
)
                      
""")

qst21_resultado_sql.show()

+---------------------+
|PASSAGEIROS_media_dia|
+---------------------+
|          250973.1345|
+---------------------+



In [5]:
# Executar uma consulta SQL para filtrar ou manipular os dados
qst22_resultado_sql = spark.sql("""

SELECT 
    CAST(AVG(passageiros) AS DECIMAL(18,4)) as PASSAGEIROS_media_hora 
from (
    SELECT 
        SUM(passenger_count) AS passageiros,
        DATE_FORMAT(tpep_pickup_datetime, "dd/MM/yyyy") AS dia,
        DATE_FORMAT(tpep_pickup_datetime, "hh") AS hora
    FROM dados
        WHERE SUBSTRING(arquivo_origem, 15, 1) = 'y' -- filtro para pegar apenas os táxis amarelos
    GROUP BY 
        DATE_FORMAT(tpep_pickup_datetime, "dd/MM/yyyy"), DATE_FORMAT(tpep_pickup_datetime, "hh")
)
""")

qst22_resultado_sql.show()

+----------------------+
|PASSAGEIROS_media_hora|
+----------------------+
|            23073.3366|
+----------------------+



In [6]:
# Caminho para gravar os arquivos
caminho_qst1 = "/home/jovyan/work/gold/Question_1.parquet"
caminho_qst21 = "/home/jovyan/work/gold/Question_2.parquet"
caminho_qst22 = "/home/jovyan/work/gold/Question_22.parquet"

# Gravando os arquivos "gold's"(respostas dos questionamentos) em formato Parquet, em outro local(gold) para conferência.
qst1_resultado_sql.write.mode("overwrite").parquet(caminho_qst1)
qst21_resultado_sql.write.mode("overwrite").parquet(caminho_qst21)
qst22_resultado_sql.write.mode("overwrite").parquet(caminho_qst22)

# Parando a Spark session
spark.stop()

In [7]:
#código de auxilio para busca do caminho lógico do jupyter
import os
print(os.listdir())

['.git', '.ipynb_checkpoints', 'cria_origem.py', 'Dados', 'ETL.py', 'External_Table_Create.sql', 'External_Table_Create_Resumo.sql', 'External_Table_Create_Resumo_1.sql', 'gold', 'ifood_poc', 'output_files', 'parquets_origem', 'poetry.lock', 'pyproject.toml', 'Questions(parte 2).ipynb', 'README.md', 'silver']
