In [0]:

# Location of the raw farms CSV inside the catalog's volume
volume_path = f"/Volumes/{catalog_name}/raw/raw_data/farms.csv"

# Read with a header row and schema inference disabled; typing is applied
# later when the Silver table is built.
farms_df = spark.read.options(header=True, inferSchema=False).csv(volume_path)

# Replace the Bronze farms table with the freshly loaded rows
farms_df.write.saveAsTable(f'{catalog_name}.bronze.farms', mode="overwrite")

In [0]:

from pyspark.sql.functions import col, when, lit
from datetime import date

# Load the Bronze farms table
farms_bronze_df = spark.read.format("delta").table(f'{catalog_name}.bronze.farms')

# Cast each column to its Silver-layer type
farms_bronze_df = farms_bronze_df.select(
    *[
        col(column_name).cast(spark_type)
        for column_name, spark_type in [
            ("farm_id", "string"),
            ("region", "string"),
            ("crop_type", "string"),
            ("area_hectares", "long"),
            ("owner_name", "string"),
            ("start_date", "date"),
        ]
    ]
)

# Validation rules
today = date.today()

valid_crop_types = ["Corn", "Soy", "Wheat", "Barley", "Peas", "Rye", "Oats", "Potatoes", "Carrots"]

# start_date must be present and fall between 2000-01-01 and the day the
# notebook runs (both bounds inclusive)
valid_date_condition = (
    col("start_date").isNotNull()
    & col("start_date").between("2000-01-01", today.isoformat())
)

# A farm row is valid only when every individual rule holds
has_farm_id = col("farm_id").isNotNull()
has_region = col("region").isNotNull()
has_known_crop = col("crop_type").isin(valid_crop_types)
has_positive_area = col("area_hectares") > 0

validated_df = farms_bronze_df.withColumn(
    "validation_status",
    when(
        has_farm_id & has_region & has_known_crop & has_positive_area & valid_date_condition,
        "valid",
    ).otherwise("invalid"),
)

# Drop rows that are identical across every column
validated_df = validated_df.distinct()

# Persist to the Silver layer, replacing the existing table contents
(
    validated_df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(f'{catalog_name}.silver.farms')
)


In [0]:
# Location of the raw sensor readings CSV inside the catalog's volume
volume_path = f"/Volumes/{catalog_name}/raw/raw_data/sensor_readings.csv"

# Read with a header row and schema inference disabled; typing is applied
# later when the Silver table is built.
sensor_readings_df = spark.read.options(header=True, inferSchema=False).csv(volume_path)

# Replace the Bronze sensor readings table with the freshly loaded rows
sensor_readings_df.write.saveAsTable(f"{catalog_name}.bronze.sensor_readings", mode="overwrite")

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, lit, trim
from datetime import date


# Function for timestamp standardization
def standardise_timestamp(df, col_name: str, formats):
    """Parse a string column into a timestamp by trying several formats in order.

    Each format is attempted with ``try_to_timestamp``; the first attempt that
    yields a non-null value wins (via ``coalesce``). Values that match none of
    the formats become null. The parsed result replaces the original column.

    Args:
        df: DataFrame containing ``col_name``.
        col_name: Name of the column to parse; it is overwritten in the result.
        formats: Iterable of datetime pattern strings, tried in the given order.

    Returns:
        ``df`` with ``col_name`` replaced by the parsed timestamp column.

    Raises:
        ValueError: If ``formats`` is empty, since there is nothing to parse with.
    """
    formats = list(formats)
    if not formats:
        raise ValueError("standardise_timestamp requires at least one timestamp format")

    # Use Column objects instead of splicing names into a SQL string, so column
    # names with spaces/reserved words and formats containing quotes stay safe.
    source_col = F.col(col_name)
    parsed_ts = F.coalesce(*[
        # The format must be a literal Column; a bare string would be read as a column name.
        F.try_to_timestamp(source_col, F.lit(fmt)) for fmt in formats
    ])
    return df.withColumn(col_name, parsed_ts)


# Load the Bronze sensor readings table
sensor_readings_df = (
    spark.read
         .format("delta")
         .option("mergeSchema", "true")
         .table(f"{catalog_name}.bronze.sensor_readings")
)

# Cast each column to its Silver-layer type; reading_ts stays a string here
# because it is parsed against several formats in the next step
sensor_readings_df = sensor_readings_df.select(
    *[
        col(column_name).cast(spark_type)
        for column_name, spark_type in [
            ("reading_id", "string"),
            ("sensor_type", "string"),
            ("reading_value", "double"),
            ("reading_ts", "string"),
            ("farm_id", "string"),
        ]
    ]
)

# Timestamp layouts tried in order when parsing reading_ts
timestamp_formats = [
    "dd/MM/yyyy HH:mm:ss",
    "dd/MM/yyyy HH:mm",
    "yyyy-MM-dd HH:mm:ss",
    "yyyy/MM/dd HH:mm:ss"
]

sensor_readings_df = standardise_timestamp(sensor_readings_df, "reading_ts", timestamp_formats)

# De-duplicate on the assumed unique key: reading_id + reading_ts
sensor_readings_df = sensor_readings_df.drop_duplicates(subset=["reading_id", "reading_ts"])

# Validation rules
valid_sensor_types = ["Temperature", "SoilMoisture"]

has_reading_id = col("reading_id").isNotNull()
has_farm_id = col("farm_id").isNotNull()
has_timestamp = col("reading_ts").isNotNull()
has_value = col("reading_value").isNotNull()
has_known_sensor = col("sensor_type").isin(valid_sensor_types)

validated_df = sensor_readings_df.withColumn(
    "validation_status",
    when(
        has_reading_id & has_farm_id & has_timestamp & has_value & has_known_sensor,
        "valid",
    ).otherwise("invalid"),
)

# Show the rows that failed validation for manual review
validated_df.filter(col("validation_status") == "invalid").display()

In [0]:
# Manual corrections applied to the validated sensor readings.

# 1) Expand abbreviated sensor type labels to their canonical names.
corrected_df = validated_df.withColumn(
    "sensor_type",
    when(col("sensor_type") == "SoilM", "SoilMoisture")
    .when(col("sensor_type") == "Temp", "Temperature")
    .otherwise(col("sensor_type"))
)

# 2) Override the timestamp of one specific reading with a hard-coded value.
#    NOTE(review): presumably the raw value did not match any parsed format — confirm.
corrected_df = corrected_df.withColumn(
    "reading_ts",
    when(
        (col("reading_id") == "READ_00324") & (col("farm_id") == "FARM_003"),
        F.to_timestamp(lit("08-03-2023 17:31:00"), "dd-MM-yyyy HH:mm:ss")
    ).otherwise(
        col("reading_ts")
    )
)

# 3) Re-evaluate validation_status. It was computed before the corrections
#    above, so without this step the repaired rows would still be stored as
#    "invalid" in the Silver table.
validated_df = corrected_df.withColumn(
    "validation_status",
    when(
        (col("reading_id").isNotNull()) &
        (col("farm_id").isNotNull()) &
        (col("reading_ts").isNotNull()) &
        (col("reading_value").isNotNull()) &
        (col("sensor_type").isin(valid_sensor_types)),
        lit("valid")
    ).otherwise(lit("invalid"))
)

#validated_df.display()

In [0]:
# Persist the validated sensor readings as the Silver Delta table,
# replacing any existing contents
(
    validated_df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(f"{catalog_name}.silver.sensor_readings")
)

In [0]:
from pyspark.sql.functions import avg, date_trunc, to_date

# Silver inputs: farms and sensor readings
farms_df = spark.read.format("delta").table(f"{catalog_name}.silver.farms")
df = spark.read.format("delta").table(f"{catalog_name}.silver.sensor_readings")

# The join exists only so that table lineage links farms to the Gold output;
# none of the farm columns are used in the aggregation below.
df = df.join(farms_df, "farm_id", "left")

# Average reading per farm, calendar day and sensor type
daily_keys = ["farm_id", "date", "sensor_type"]
avg_daily_df = (
    df.withColumn("date", to_date("reading_ts"))
      .groupBy(*daily_keys)
      .agg(avg("reading_value").alias("avg_reading_value"))
      .orderBy(*daily_keys)
)

display(avg_daily_df)

# Persist the daily averages as a Gold Delta table, replacing existing contents
(
    avg_daily_df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(f"{catalog_name}.gold.avg_daily_moist_temp")
)

In [0]:
%sql
-- Daily average soil-moisture readings from the Gold table.
-- NOTE(review): the catalog name is hard-coded here, whereas the Python cells
-- build table names from catalog_name — confirm both refer to the same catalog.
SELECT *
FROM ARE_PROJECT.gold.avg_daily_moist_temp
WHERE sensor_type = 'SoilMoisture'

Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, when, sum as spark_sum, date_diff

# Load the daily per-farm, per-sensor averages from the Gold layer
avg_daily_df = spark.read.format("delta").table(f"{catalog_name}.gold.avg_daily_moist_temp")

# Per-farm window that walks the retained days in date order
window_spec = Window.partitionBy("farm_id").orderBy("date")

low_moisture_df = (
    avg_daily_df
    # Keep only soil-moisture days whose average is below 30
    .filter((col("sensor_type") == "SoilMoisture") & (col("avg_reading_value") < 30))
    # Flag each low-moisture day; the flags are summed later to count days per streak
    .withColumn("low_flag", when(col("avg_reading_value") < 30, 1).otherwise(0))
    # Date of the previous low-moisture day for the same farm (null for the first)
    .withColumn("prev_date", lag("date").over(window_spec))
    # 1 when this day directly follows the previous low-moisture day, else 0
    .withColumn(
        "is_consecutive",
        when(
            (col("prev_date").isNotNull())
            & (date_diff(col("date"), col("prev_date")) == 1),
            1,
        ).otherwise(0),
    )
    # Running count of streak starts: every day that breaks the chain opens a
    # new group, so all days of one streak share the same group_id
    .withColumn("group_id", spark_sum(1 - col("is_consecutive")).over(window_spec))
)

# Summarise each streak and keep those lasting at least 3 days
consecutive_days_df = (
    low_moisture_df
    .groupBy("farm_id", "group_id")
    .agg(
        spark_sum("low_flag").alias("consecutive_days"),
        F.min("date").alias("start_date"),
        F.max("date").alias("end_date"),
    )
    .where(col("consecutive_days") >= 3)
)

result = consecutive_days_df.select("farm_id", "start_date", "end_date", "consecutive_days")
display(result)

# Persist the streaks as a Gold Delta table, replacing existing contents
(
    result.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(f"{catalog_name}.gold.consecutive_days_low_moist")
)