In [0]:
from src.utils.locate_filepaths import storage_filepaths
import glob
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType,DoubleType
from pyspark.sql.functions import round, col, lit, split, current_timestamp, to_date
# Storage-layer root locations (the code below reads the 'bronze' and 'silver'
# entries) as resolved by the project helper.
paths = storage_filepaths()

# Optional job parameter "source" (default: empty string), exposed as a
# Databricks text widget so a workflow run can pass it in.
dbutils.widgets.text("source", "")
source = dbutils.widgets.get("source")

# Everything in the raw GEUS groundwater folder; later cells pick the station
# list (.txt) and the per-station readings (.csv) out of this listing.
bronze_dir = "{}/geus_groundwater/".format(paths['bronze'])
files = dbutils.fs.ls(bronze_dir)


In [0]:
# Build a lookup {station_id: station_name} from the station list (.txt) that
# sits next to the groundwater CSVs. Each non-blank line is read as
# "<station_id>,<name part>,<name part>,..."; the name parts are re-joined with
# ", " so station names that themselves contain commas are kept whole.
station_files = [i.path for i in files if i.path.endswith('.txt')]
if not station_files:
    raise FileNotFoundError(
        f"No station list (.txt) found in {paths['bronze']}/geus_groundwater/"
    )
# NOTE(review): if several .txt files are present only the first one listed is used.
station_file = station_files[0]

# dbutils.fs returns "dbfs:/..." URIs; plain Python file I/O needs the /dbfs mount.
# Only the scheme prefix is rewritten (str.replace would touch every occurrence).
if station_file.startswith('dbfs:'):
    local_station_file = '/dbfs' + station_file[len('dbfs:'):]
else:
    local_station_file = station_file

# Assumes the file is UTF-8 (Danish station names) — confirm with the data provider.
# "utf-8-sig" also drops a leading BOM, which would otherwise corrupt the first key.
with open(local_station_file, "r", encoding="utf-8-sig") as f:
    lines = f.readlines()

station_dict = {}
for line in lines:
    if not line.strip():
        continue  # skip blank lines, e.g. a trailing newline at the end of the file
    key, *values = line.split(',')
    # Strip the id so it matches the id parsed from the CSV file names.
    station_dict[key.strip()] = ", ".join(s.strip() for s in values)

In [0]:
# Column layout of the silver groundwater table; every column is nullable.
# The same schema is passed to the CSV reader when the groundwater files are loaded.
_silver_columns = [
    ("observed_timestamp", TimestampType()),
    # NOTE(review): single precision — DoubleType is imported but unused; confirm
    # float32 is intended for a value that is later rounded to 2 decimals.
    ("waterlevel", FloatType()),
    ("station_id", StringType()),
    ("station_name", StringType()),
]
schema = StructType(
    [StructField(column_name, data_type, True) for column_name, data_type in _silver_columns]
)

# Row-less DataFrame carrying that schema.
empty_data = []
df = spark.createDataFrame(empty_data, schema)

In [0]:
# Read every per-station groundwater CSV, tag its rows with the station id and
# name, and stack all stations into a single DataFrame `df`.
# File names are expected to look like ".../grundvandsstand_<station_id>.csv".
GROUNDWATER_PREFIX = "grundvandsstand_"
GROUNDWATER_SUFFIX = ".csv"

groundwater_files = [i.path for i in files if i.path.endswith(GROUNDWATER_SUFFIX)]
if not groundwater_files:
    # Guard: the save step writes `df` in "overwrite" mode, so an empty frame
    # would replace the whole silver table with nothing.
    raise ValueError(
        f"No groundwater CSV files found in {paths['bronze']}/geus_groundwater/"
    )

station_frames = []
for file_path in groundwater_files:
    # Parse the id from the file name only, not from the full directory path.
    file_name = file_path.rsplit('/', 1)[-1]
    if not file_name.startswith(GROUNDWATER_PREFIX):
        raise ValueError(f"Unexpected groundwater file name: {file_path}")
    station_id = file_name[len(GROUNDWATER_PREFIX):-len(GROUNDWATER_SUFFIX)]
    if station_id not in station_dict:
        raise KeyError(
            f"Station {station_id!r} ({file_path}) is missing from the station list"
        )
    station_frames.append(
        spark.read.option("header", "true").schema(schema).csv(file_path)
        .withColumn("waterlevel", round(col("waterlevel"), 2))  # pyspark.sql.functions.round
        .withColumn("station_id", lit(station_id))
        .withColumn("station_name", lit(station_dict[station_id]))
    )

# Start from the first station frame rather than extending a `df` left over from
# an earlier cell, so re-running this cell gives the same result every time.
df = station_frames[0]
for station_df in station_frames[1:]:
    df = df.unionByName(station_df)

df = (
    df.withColumn('ingestion_timestamp', current_timestamp())
      .withColumn('observed_date', to_date(col("observed_timestamp")))
)

In [0]:
# Persist the combined frame to the silver layer as a Delta table, replacing any
# previous contents and laying the files out in one partition per station_id.
silver_path = f"{paths['silver']}/geus_groundwater/"
(
    df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy('station_id')
    .save(silver_path)
)