In [1]:
import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

## Criando a sessão Spark

In [2]:
# Build (or reuse) a Spark session named "SragGold" that runs in local mode
# with the "local[*]" master.
spark = (
    SparkSession.builder
    .appName("SragGold")
    .master("local[*]")
    .getOrCreate()
)
# Switch this session's SQL time parser policy to LEGACY.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

/home/airflow/.local/lib/python3.8/site-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/04/28 22:03:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Raise the SparkContext log level to ERROR so that lower-severity messages
# (such as the WARN lines printed at session start-up) stop cluttering cell output.
spark.sparkContext.setLogLevel("ERROR")

## Lendo os arquivos Parquet

In [4]:
# Resolve the project base directory as the parent of the current working
# directory, then load the silver-layer SRAG Parquet dataset from beneath it.
parent_of_cwd = os.path.join(os.getcwd(), "../")
base_dir = os.path.abspath(parent_of_cwd)
data_path = os.path.join(base_dir, "lakehouse_srag/data/silver/srag.parquet")
df = spark.read.parquet(data_path)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

## Criando a fCasos

In [5]:
# fCasos: a plain projection of the silver DataFrame onto the columns listed
# below (no filtering or aggregation, so one output row per input row).
colunas_fato_casos = [
    'DT_NOTIFIC', 'SEM_NOT', 'DT_SIN_PRI', 'SEM_PRI', 'EVOLUCAO',
    'CLASSI_FIN', 'CRITERIO', 'HOSPITAL', 'UTI', 'SUPORT_VEN',
    'PCR_RESUL', 'AN_SARS2', 'VACINA_COV', 'DOSE_1_COV', 'DOSE_2_COV',
    'DOSE_REF', 'TRAT_COV', 'TIPO_TRAT',
]
df_fato_casos = df.select(*colunas_fato_casos)

## Criando a fSintomas

In [6]:
# fSintomas: one row per DT_NOTIFIC value, holding the sum of each symptom
# column; every aggregate keeps the name of the column it was computed from.
# NOTE(review): these sums are only symptom counts if the silver layer stores
# the symptom columns as 0/1 flags -- confirm against the silver notebook.
colunas_sintomas = [
    'FEBRE', 'TOSSE', 'GARGANTA', 'DISPNEIA', 'SATURACAO', 'DIARREIA',
    'VOMITO', 'DOR_ABD', 'FADIGA', 'PERD_OLFT', 'PERD_PALA', 'OUTRO_SIN',
]
df_fato_sintomas = (
    df.groupby('DT_NOTIFIC')
    .agg(*[F.sum(sintoma).alias(sintoma) for sintoma in colunas_sintomas])
)

## Criando a fInternações

In [7]:
# fInternações: projection of the silver DataFrame onto the columns listed
# below, one output row per input row.
colunas_fato_internacoes = [
    'DT_NOTIFIC', 'HOSPITAL', 'DT_INTERNA', 'UTI', 'DT_ENTUTI',
    'DT_SAIDUTI', 'SUPORT_VEN', 'RAIOX_RES', 'TOMO_RES',
]
df_fato_internacoes = df.select(*colunas_fato_internacoes)

## Criando a fTestes

In [8]:
# fTestes: projection of the silver DataFrame onto the columns listed below,
# one output row per input row.
colunas_fato_testes = [
    'DT_NOTIFIC', 'PCR_RESUL', 'DT_PCR', 'POS_PCRFLU', 'PCR_SARS2',
    'POS_PCROUT', 'AN_SARS2', 'RES_IGG', 'RES_IGM', 'RES_IGA',
]
df_fato_testes = df.select(*colunas_fato_testes)

## Criando a dTempo

In [9]:
# dTempo: one row per distinct DT_NOTIFIC value, enriched with the year,
# month and week-of-year derived from that date.
# NOTE(review): F.weekofyear uses ISO-8601 week numbering, so around New Year
# 'Semana' may belong to a different year than 'Ano', and it may not match the
# SEM_NOT column used in the fact table -- confirm this is intended.
df_dim_tempo = (
    df.select('DT_NOTIFIC')
    .distinct()
    .withColumn('Ano', F.year('DT_NOTIFIC'))
    .withColumn('Mes', F.month('DT_NOTIFIC'))
    .withColumn('Semana', F.weekofyear('DT_NOTIFIC'))
)

## Criando a dPacientes

In [10]:
# dPacientes: projection of the silver DataFrame onto the columns listed
# below, one output row per input row.
# NOTE(review): unlike dTempo and dLocalização this dimension is not
# de-duplicated and carries no key column -- confirm that is intended.
colunas_dim_pacientes = [
    'CS_SEXO', 'CS_RACA', 'NU_IDADE_N', 'CS_ESCOL_N', 'CS_ZONA',
    'PUERPERA', 'FATOR_RISC', 'CARDIOPATI', 'DIABETES', 'OBESIDADE',
]
df_dim_pacientes = df.select(*colunas_dim_pacientes)

## Criando a dLocalização

In [11]:
# dLocalização: the distinct combinations of the four columns listed below.
colunas_dim_localizacao = ['SG_UF_NOT', 'CO_MUN_NOT', 'ID_REGIONA', 'ID_MN_RESI']
df_dim_localizacao = df.select(*colunas_dim_localizacao).distinct()

In [12]:
# Registry of the gold-layer tables: output folder name -> DataFrame to write.
# Insertion order is the order in which the tables are written later on.
tabelas = dict([
    ("fato_casos", df_fato_casos),
    ("fato_sintomas", df_fato_sintomas),
    ("fato_internacoes", df_fato_internacoes),
    ("fato_testes", df_fato_testes),
    ("dim_tempo", df_dim_tempo),
    ("dim_pacientes", df_dim_pacientes),
    ("dim_localizacao", df_dim_localizacao),
])

In [13]:
# Persist every gold table as Parquet under lakehouse_srag/data/gold/<table name>,
# overwriting whatever a previous run left in that folder.
# The base directory is the same for every table, so it is resolved once
# before the loop instead of being recomputed on each iteration.
base_dir = os.path.abspath(os.path.join(os.getcwd(), "../"))
for nome_tabela, df_tabela in tabelas.items():
    caminho = os.path.join(base_dir, f"lakehouse_srag/data/gold/{nome_tabela}")
    df_tabela.write.mode("overwrite").format("parquet").save(caminho)

[Stage 1:>                                                         (0 + 8) / 11]





[Stage 2:>                                                         (0 + 8) / 11]



                                                                                

[Stage 5:>                                                         (0 + 8) / 11]







                                                                                





                                                                                



                                                                                