In [0]:
import pandas as pd

# Populate the date dimension: one row per calendar day from 2001-01-01 to
# 2025-01-01 (inclusive), with a sequential integer id starting at 1.
# The load only runs while the table is empty, so re-running the cell does
# not append duplicate rows.
dim_data_is_empty = spark.table("dim_data").count() == 0

if dim_data_is_empty:
    datas_pandas = pd.date_range(start='2001-01-01', end='2025-01-01', freq='D')
    ids = range(1, len(datas_pandas) + 1)
    # `.date` turns the timestamps into plain datetime.date objects.
    pdf = pd.DataFrame({'id': ids, 'data': datas_pandas.date})
    df_dim_data = spark.createDataFrame(pdf)
    # insertInto resolves columns by position, so the (id, data) order above
    # has to match the column order of the target table.
    writer = df_dim_data.write.mode("append")
    writer.insertInto("dim_data")

# Show the current contents of the dimension, whether or not it was just loaded.
display(spark.table("dim_data"))

In [0]:
import pandas as pd
import numpy as np

# Populate the uncertainty dimension with the steps 0.0, 0.1, ..., 1.0
# (ids 1..11). The load only runs while the table is empty.

if spark.table("dim_incerteza").count() == 0:
    # Build the steps from integers instead of np.arange(0.0, 1.1, 0.1):
    # a float step accumulates representation error (0.30000000000000004,
    # 0.6000000000000001, 0.7000000000000001). If `valor` is stored as a
    # double, those values would not equal round(uncertainty, 1), which the
    # fact-table load in this notebook uses as its join key.
    valores = np.round(np.arange(0, 11) / 10, 1)
    pdf = pd.DataFrame({
        'id': range(1, len(valores) + 1),
        'valor': valores
    })
    df_dim_incerteza = spark.createDataFrame(pdf)
    # insertInto resolves columns by position: (id, valor) must match the
    # column order of the target table.
    df_dim_incerteza.write.mode("append").insertInto("dim_incerteza")

# Show the current contents of the dimension, whether or not it was just loaded.
display(spark.table("dim_incerteza"))

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window

# Populate the observer dimension: one row per distinct observer code found
# in silver_stars, with a sequential id assigned in alphabetical order.
# The load only runs while the table is empty.

if spark.table("dim_observador").count() == 0:
    df_silver = spark.table("silver_stars")
    # alias() has to be called on the Column returned by col(); calling it on
    # the string literal raises AttributeError ('str' has no 'alias').
    df_observador_new = df_silver.select(col("observer_code").alias("sigla")).distinct()
    # No partitionBy: the numbering is global across all rows.
    windowSpec = Window.orderBy("sigla")
    df_final = df_observador_new.withColumn("id", F.row_number().over(windowSpec))
    # insertInto resolves columns by position: (id, sigla) must match the
    # column order of the target table.
    df_final = df_final.select("id", "sigla")
    df_final.write.mode("append").insertInto("dim_observador")

# Show the current contents of the dimension, whether or not it was just loaded.
display(spark.table("dim_observador"))

In [0]:

# `F` is imported here as well so the cell does not depend on an earlier
# cell having been executed first.
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window

# Populate the star dimension: one row per distinct star name found in
# silver_stars, with a sequential id assigned in alphabetical order.
# The load only runs while the table is empty.

if spark.table("dim_estrela").count() == 0:
    df_silver = spark.table("silver_stars")
    df_stars_new = df_silver.select(col("star_name").alias("nome")).distinct()
    # No partitionBy: the numbering is global across all rows.
    windowSpec = Window.orderBy("nome")
    df_final = df_stars_new.withColumn("id", F.row_number().over(windowSpec))
    # insertInto resolves columns by position: (id, nome) must match the
    # column order of the target table.
    df_final = df_final.select("id", "nome")
    df_final.write.mode("append").insertInto("dim_estrela")

# Show the current contents of the dimension, whether or not it was just loaded.
display(spark.table("dim_estrela"))

In [0]:
# Load the fact table: resolve every silver_stars measurement to its
# dimension keys and store it in fato_medicao. The load only runs while the
# fact table is empty.
from pyspark.sql import functions as F

if spark.table("fato_medicao").count() == 0:
    df_silver = spark.table("silver_stars")
    dim_estrela = spark.table("dim_estrela")
    dim_observador = spark.table("dim_observador")
    dim_data = spark.table("dim_data")
    dim_incerteza = spark.table("dim_incerteza")

    # Inner joins: a measurement whose star, observer or date has no row in
    # the corresponding dimension is dropped from the fact table.
    # Left join on uncertainty: measurements without a matching step are
    # kept and receive the placeholder key -1.
    # NOTE(review): dim_incerteza as populated in this notebook has ids
    # 1..11 only, so -1 has no dimension row - confirm this is intended.
    df_fato = (
        df_silver.alias("s")
        .join(dim_estrela.alias("e"), F.col("s.star_name") == F.col("e.nome"), "inner")
        .join(dim_observador.alias("o"), F.col("s.observer_code") == F.col("o.sigla"), "inner")
        .join(dim_data.alias("d"), F.to_date(F.col("s.data_gregoriana")) == F.col("d.data"), "inner")
        .join(dim_incerteza.alias("i"), F.round(F.col("s.uncertainty"), 1) == F.col("i.valor"), "left")
        .select(
            # Unique and increasing, but not consecutive.
            F.monotonically_increasing_id().alias("id"),
            F.col("e.id").alias("id_estrela"),
            F.col("o.id").alias("id_observador"),
            F.col("d.id").alias("id_data"),
            F.coalesce(F.col("i.id"), F.lit(-1)).alias("id_incerteza"),
            F.round(F.col("s.magnitude"), 3).alias("magnitude")
        )
    )

    # The write lives inside the guard: outside it, `df_fato` is undefined
    # whenever the table is already populated (NameError on a fresh kernel).
    # The table is known to be empty here, so "append" has the same effect as
    # "overwrite" and matches the dimension loads.
    # insertInto resolves columns by position, so the select order above has
    # to match the column order of the target table.
    df_fato.write.mode("append").insertInto("fato_medicao")

# Preview what is actually stored, whether or not the load just ran.
display(spark.table("fato_medicao").limit(10))