In [4]:
from pyspark.sql.functions import (
    col,
    dayofmonth,
    hour,
    lit,
    monotonically_increasing_id,
    month,
    row_number,
    to_date,
    year,
)
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window

StatementMeta(, 738ffdf9-e75a-4936-a899-c0f2ec5186b0, 6, Finished, Available, Finished)

In [3]:
# Load the silver-layer table; every gold dimension and the fact table below derive from it
df_silver = spark.table("silver_iot_tilapia_table")

StatementMeta(, 738ffdf9-e75a-4936-a899-c0f2ec5186b0, 5, Finished, Available, Finished)

In [8]:
df_silver.show(n=5)  # quick look at the first five silver rows before any transformation

StatementMeta(, 738ffdf9-e75a-4936-a899-c0f2ec5186b0, 10, Finished, Available, Finished)

+-------------------+-------+---------------------+-----------------+------------------------+-------------+---------------------+-----------+-------------+-----------+-----------+-------------------------+------------------------+-----------------+------------------+-----------------+----------------+-----------+-----------+-----------+--------------+-----------+---------------+---------------------+-------------------+------------------+----------------+-------------+--------------------+--------------------+----------+
|           datetime|  month|average_fish_weight_g|survival_rate_pct|disease_occurrence_cases|temperature_c|dissolved_oxygen_mg_l|         ph|turbidity_ntu|  month_num|    month_x|oxygenation_interventions|corrective_interventions|avg_temperature_c|high_temperature_c|low_temperature_c|precipitation_in|    month_y|        day|       hour|oxigeno_scaled|  ph_scaled|turbidez_scaled|oxygenation_automatic|corrective_measures|thermal_risk_index|low_oxygen_alert|health_statu

In [9]:
# Normalise the time columns used as dimension keys:
#   datetime -> timestamp
#   date_key -> calendar date derived from the (now timestamp) datetime
#   hour     -> integer hour of day (replaces the silver table's existing `hour` column)
df_silver = (
    df_silver
    .withColumn("datetime", col("datetime").cast("timestamp"))
    .withColumn("date_key", to_date(col("datetime")))
    .withColumn("hour", hour(col("datetime")).cast(IntegerType()))
)

StatementMeta(, 738ffdf9-e75a-4936-a899-c0f2ec5186b0, 11, Finished, Available, Finished)

In [11]:
# dim date creation
# Surrogate keys come from row_number() over the sorted natural key, so they are
# consecutive (1..N) and reproducible on every evaluation. monotonically_increasing_id()
# is neither: its value depends on partitioning (partition id in the upper bits), and this
# lazy DataFrame is re-evaluated by the write, the count and any later join.
# The cast keeps date_id as bigint, the same type monotonically_increasing_id() produced.
# Window has no partitionBy, so Spark sorts on one partition; fine for a small dimension.
dim_date = (
    df_silver
    .select("date_key")
    .dropDuplicates()
    .withColumn("date_id", row_number().over(Window.orderBy("date_key")).cast("bigint"))
    .withColumn("year", year(col("date_key")))
    .withColumn("month", month(col("date_key")))
    .withColumn("day", dayofmonth(col("date_key")))
    .select("date_id", "date_key", "year", "month", "day")
)

# write and register, then re-bind to the persisted table so the row count and any
# later join see exactly the ids that were saved (and the plan is not recomputed)
dim_date.write.format("delta").mode("overwrite").saveAsTable("gold_dim_date")
dim_date = spark.read.table("gold_dim_date")
print("gold_dim_date created:", dim_date.count(), "rows")

StatementMeta(, 738ffdf9-e75a-4936-a899-c0f2ec5186b0, 13, Finished, Available, Finished)

gold_dim_date created: 183 rows


In [12]:
# dim time creation
# row_number() over the hour ordering yields consecutive, reproducible ids (1..N).
# orderBy + monotonically_increasing_id() only guarantees increasing ids, not consecutive
# ones (the partition id occupies the upper bits), and they can change on re-evaluation.
# The cast keeps time_id as bigint, the same type monotonically_increasing_id() produced.
dim_time = (
    df_silver
    .select("hour")
    .dropDuplicates()
    .withColumn("time_id", row_number().over(Window.orderBy("hour")).cast("bigint"))
    .select("time_id", "hour")
)

# persist, then re-bind to the saved table so later uses see the stored ids
dim_time.write.format("delta").mode("overwrite").saveAsTable("gold_dim_time")
dim_time = spark.read.table("gold_dim_time")
print("gold_dim_time created:", dim_time.count(), "rows")

StatementMeta(, 738ffdf9-e75a-4936-a899-c0f2ec5186b0, 14, Finished, Available, Finished)

gold_dim_time created: 24 rows


In [13]:
# dim health status creation
# Deterministic, consecutive surrogate keys via row_number() over the sorted attribute.
# monotonically_increasing_id() depends on partitioning and may differ between the write
# and later re-evaluations of this lazy DataFrame (e.g. the fact join).
# The cast keeps health_status_id as bigint, matching the previous column type.
dim_health_status = (
    df_silver
    .select("health_status")
    .dropDuplicates()
    .withColumn(
        "health_status_id",
        row_number().over(Window.orderBy("health_status")).cast("bigint"),
    )
    .select("health_status_id", "health_status")
)

# persist, then re-bind to the saved table so later uses see the stored ids
dim_health_status.write.format("delta").mode("overwrite").saveAsTable("gold_dim_health_status")
dim_health_status = spark.read.table("gold_dim_health_status")
print("gold_dim_health_status created:", dim_health_status.count(), "rows")


StatementMeta(, 738ffdf9-e75a-4936-a899-c0f2ec5186b0, 15, Finished, Available, Finished)

gold_dim_health_status created: 2 rows


In [14]:
# dim risk index creation
# Deterministic, consecutive surrogate keys via row_number() over the sorted attribute.
# monotonically_increasing_id() depends on partitioning and may differ between the write
# and later re-evaluations of this lazy DataFrame (e.g. the fact join).
# The cast keeps risk_index_id as bigint, matching the previous column type.
dim_risk_index = (
    df_silver
    .select("thermal_risk_index")
    .dropDuplicates()
    .withColumn(
        "risk_index_id",
        row_number().over(Window.orderBy("thermal_risk_index")).cast("bigint"),
    )
    .select("risk_index_id", "thermal_risk_index")
)

# persist, then re-bind to the saved table so later uses see the stored ids
dim_risk_index.write.format("delta").mode("overwrite").saveAsTable("gold_dim_risk_index")
dim_risk_index = spark.read.table("gold_dim_risk_index")
print("gold_dim_risk_index created:", dim_risk_index.count(), "rows")

StatementMeta(, 738ffdf9-e75a-4936-a899-c0f2ec5186b0, 16, Finished, Available, Finished)

gold_dim_risk_index created: 2 rows


In [15]:
# dim oxygen alert creation
# Deterministic, consecutive surrogate keys via row_number() over the sorted attribute.
# monotonically_increasing_id() depends on partitioning and may differ between the write
# and later re-evaluations of this lazy DataFrame (e.g. the fact join).
# The cast keeps oxygen_alert_id as bigint, matching the previous column type.
dim_oxygen_alert = (
    df_silver
    .select("low_oxygen_alert")
    .dropDuplicates()
    .withColumn(
        "oxygen_alert_id",
        row_number().over(Window.orderBy("low_oxygen_alert")).cast("bigint"),
    )
    .select("oxygen_alert_id", "low_oxygen_alert")
)

# persist, then re-bind to the saved table so later uses see the stored ids
dim_oxygen_alert.write.format("delta").mode("overwrite").saveAsTable("gold_dim_oxygen_alert")
dim_oxygen_alert = spark.read.table("gold_dim_oxygen_alert")
print("gold_dim_oxygen_alert created:", dim_oxygen_alert.count(), "rows")

StatementMeta(, 738ffdf9-e75a-4936-a899-c0f2ec5186b0, 17, Finished, Available, Finished)

gold_dim_oxygen_alert created: 1 rows


In [17]:
# dim corrective measures creation
# Deterministic, consecutive surrogate keys via row_number() over the sorted attribute.
# monotonically_increasing_id() depends on partitioning and may differ between the write
# and later re-evaluations of this lazy DataFrame (e.g. the fact join).
# The cast keeps corrective_id as bigint, matching the previous column type.
dim_corrective_measures = (
    df_silver
    .select("corrective_measures")
    .dropDuplicates()
    .withColumn(
        "corrective_id",
        row_number().over(Window.orderBy("corrective_measures")).cast("bigint"),
    )
    .select("corrective_id", "corrective_measures")
)

# persist, then re-bind to the saved table so later uses see the stored ids
dim_corrective_measures.write.format("delta").mode("overwrite").saveAsTable("gold_dim_corrective_measures")
dim_corrective_measures = spark.read.table("gold_dim_corrective_measures")
print("gold_dim_corrective_measures created:", dim_corrective_measures.count(), "rows")

StatementMeta(, 738ffdf9-e75a-4936-a899-c0f2ec5186b0, 19, Finished, Available, Finished)

gold_dim_corrective_measures created: 2 rows


In [18]:
# dim source file creation
# Deterministic, consecutive surrogate keys via row_number() over the sorted attribute.
# monotonically_increasing_id() depends on partitioning and may differ between the write
# and later re-evaluations of this lazy DataFrame (e.g. the fact join).
# The cast keeps source_id as bigint, matching the previous column type.
dim_source_file = (
    df_silver
    .select("source_file")
    .dropDuplicates()
    .withColumn(
        "source_id",
        row_number().over(Window.orderBy("source_file")).cast("bigint"),
    )
    .select("source_id", "source_file")
)

# persist, then re-bind to the saved table so later uses see the stored ids
dim_source_file.write.format("delta").mode("overwrite").saveAsTable("gold_dim_source_file")
dim_source_file = spark.read.table("gold_dim_source_file")
print("gold_dim_source_file created:", dim_source_file.count(), "rows")


StatementMeta(, 738ffdf9-e75a-4936-a899-c0f2ec5186b0, 20, Finished, Available, Finished)

gold_dim_source_file created: 1 rows


In [19]:
# fact table creation
# Attach every dimension's surrogate key to the silver rows with left joins.
# - Join against the *persisted* gold tables rather than in-memory DataFrames, so the
#   foreign keys always match the ids actually saved (ids from monotonically_increasing_id()
#   are not guaranteed to be stable when a lazy DataFrame is re-evaluated).
# - eqNullSafe (<=>) lets a NULL attribute match the NULL member that dropDuplicates()
#   keeps in a dimension; plain == never matches NULL and would leave the FK empty.
# Each entry: (gold table name, alias used by the select cell, natural-key column).
dimension_joins = [
    ("gold_dim_date", "d", "date_key"),
    ("gold_dim_time", "t", "hour"),
    ("gold_dim_health_status", "h", "health_status"),
    ("gold_dim_risk_index", "r", "thermal_risk_index"),
    ("gold_dim_oxygen_alert", "o", "low_oxygen_alert"),
    ("gold_dim_corrective_measures", "c", "corrective_measures"),
    ("gold_dim_source_file", "sf", "source_file"),
]

# Start from df_silver each time so re-running this cell is idempotent.
fact = df_silver.alias("s")
for table_name, dim_alias, key_column in dimension_joins:
    fact = fact.join(
        spark.read.table(table_name).alias(dim_alias),
        on=col(f"s.{key_column}").eqNullSafe(col(f"{dim_alias}.{key_column}")),
        how="left",
    )


StatementMeta(, 738ffdf9-e75a-4936-a899-c0f2ec5186b0, 21, Finished, Available, Finished)

In [20]:
# Select fact columns and FK ids (column order defines the gold table schema)
fact_environment = fact.select(
    # foreign keys to the gold dimensions
    col("d.date_id").alias("date_id"),
    col("t.time_id").alias("time_id"),
    col("h.health_status_id").alias("health_status_id"),
    col("r.risk_index_id").alias("risk_index_id"),
    col("o.oxygen_alert_id").alias("oxygen_alert_id"),
    col("c.corrective_id").alias("corrective_id"),
    col("sf.source_id").alias("source_id"),
    # measures and attributes carried over from silver
    col("s.datetime").alias("datetime"),
    col("s.average_fish_weight_g"),
    col("s.survival_rate_pct"),
    col("s.disease_occurrence_cases"),
    col("s.temperature_c"),
    col("s.dissolved_oxygen_mg_l"),
    col("s.ph"),
    col("s.turbidity_ntu"),
    col("s.oxigeno_scaled"),
    col("s.ph_scaled"),
    col("s.turbidez_scaled"),
    col("s.oxygenation_interventions"),
    col("s.corrective_interventions"),
    col("s.oxygenation_automatic"),
    col("s.thermal_risk_index"),
    col("s.low_oxygen_alert"),
    col("s.health_status"),
    col("s.ingestion_timestamp")
)

# Write fact table
fact_environment.write.format("delta").mode("overwrite").saveAsTable("gold_fact_environment")

# Count the persisted table instead of calling fact_environment.count(), which would
# re-run the whole seven-way join.
fact_rows = spark.read.table("gold_fact_environment").count()
print("gold_fact_environment created:", fact_rows, "rows")

# Left joins can only add rows, so a mismatch means some dimension has duplicate keys.
silver_rows = df_silver.count()
assert fact_rows == silver_rows, (
    f"fact has {fact_rows} rows but silver has {silver_rows}: "
    "a dimension join fanned out (duplicate keys in a gold_dim_* table)"
)


StatementMeta(, 738ffdf9-e75a-4936-a899-c0f2ec5186b0, 22, Finished, Available, Finished)

gold_fact_environment created: 4383 rows
