In [0]:
from datetime import datetime, timedelta

# Notebook parameters: the time window to process, exposed as text widgets.
# Defaults cover the last two days up to today, formatted as YYYY-MM-DD.
DATE_FORMAT = "%Y-%m-%d"
default_timestamp_min = (datetime.now() - timedelta(days=2)).strftime(DATE_FORMAT)
default_timestamp_max = datetime.now().strftime(DATE_FORMAT)

dbutils.widgets.text("timestamp_min", default_timestamp_min, "Timestamp Min")
dbutils.widgets.text("timestamp_max", default_timestamp_max, "Timestamp Max")

# Read back whatever is currently set on the widgets (the default or a user override).
timestamp_min = dbutils.widgets.get("timestamp_min")
timestamp_max = dbutils.widgets.get("timestamp_max")

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc, row_number, col, avg, stddev, min, max

# Load the transformed readings that fall inside the widget-selected window.
# The bounds are passed as literal values through the Column API instead of being
# interpolated into a SQL string, so unexpected widget text (for example a stray
# quote) cannot break or alter the filter expression. `between` is inclusive on
# both ends, matching a `>= min AND <= max` predicate.
# NOTE(review): if `timestamp` is a TIMESTAMP column, a date-only upper bound
# presumably compares as midnight of that day — confirm whether readings later
# on the max day are meant to be included.
transformed_data_all_df = spark.read.table("wind_farm.default.transformed_data").where(
    col("timestamp").between(timestamp_min, timestamp_max)
)

# Order each turbine's readings from newest to oldest.
window_spec = Window.partitionBy("turbine_id").orderBy(desc("timestamp"))

# Keep only the newest reading per turbine (row_number == 1), then drop the helper column.
transformed_data_latest_df = (
    transformed_data_all_df
    .withColumn("row_number", row_number().over(window_spec))
    .filter(col("row_number") == 1)
    .drop("row_number")
)

display(transformed_data_latest_df)



timestamp,turbine_id,wind_speed,wind_direction,power_output
2022-03-31T23:00:00Z,1,13.7,51,3.7
2022-03-31T23:00:00Z,2,10.3,244,2.7
2022-03-31T23:00:00Z,3,10.1,112,2.0
2022-03-31T23:00:00Z,4,10.1,43,2.7
2022-03-31T23:00:00Z,5,13.5,55,1.9
2022-03-31T23:00:00Z,6,9.9,37,3.0
2022-03-31T23:00:00Z,7,11.1,225,2.5
2022-03-31T23:00:00Z,8,14.3,21,3.9
2022-03-31T23:00:00Z,9,15.0,102,3.1
2022-03-31T23:00:00Z,10,10.1,19,2.7


In [0]:
# Per-turbine summary of power_output over the loaded window: lowest, highest, and mean.
# `min` / `max` here are the pyspark.sql.functions versions imported earlier in the
# notebook, not the Python builtins.
power_output_summary = (
    min("power_output").alias("min_power_output"),
    max("power_output").alias("max_power_output"),
    avg("power_output").alias("avg_power_output"),
)
aggregated_df = transformed_data_all_df.groupBy("turbine_id").agg(*power_output_summary)


In [0]:

# Per-turbine baseline used for anomaly detection: the mean and standard deviation
# of power_output across every reading in the loaded window.
readings_by_turbine = transformed_data_all_df.groupBy("turbine_id")
mean_power = avg("power_output").alias("mean_power_output")
stddev_power = stddev("power_output").alias("stddev_power_output")

stats_df = readings_by_turbine.agg(mean_power, stddev_power)


In [0]:

# Attach each turbine's baseline statistics to its latest reading.
joined_df = transformed_data_latest_df.join(stats_df, on="turbine_id")

# A reading is flagged when it lies more than two standard deviations
# above or below that turbine's mean power output.
power = col("power_output")
mean_power = col("mean_power_output")
two_sigma = 2 * col("stddev_power_output")

is_above_band = power > mean_power + two_sigma
is_below_band = power < mean_power - two_sigma

anomalies_df = joined_df.withColumn("is_anomaly", is_above_band | is_below_band)

display(anomalies_df)

turbine_id,timestamp,wind_speed,wind_direction,power_output,mean_power_output,stddev_power_output,is_anomaly
12,2022-03-31T23:00:00Z,9.5,189,3.2,3.051209677419356,0.8610761298124978,False
1,2022-03-31T23:00:00Z,13.7,51,3.7,3.016397849462364,0.8572389449056392,False
13,2022-03-31T23:00:00Z,12.8,302,4.5,3.0313172043010725,0.8773202723151071,False
6,2022-03-31T23:00:00Z,9.9,37,3.0,2.9838709677419377,0.8741128566464673,False
3,2022-03-31T23:00:00Z,10.1,112,2.0,2.97997311827957,0.8626320481064984,False
5,2022-03-31T23:00:00Z,13.5,55,1.9,3.016532258064516,0.8658829219297753,False
15,2022-03-31T23:00:00Z,12.0,56,2.3,3.0364247311827937,0.8537802013874646,False
9,2022-03-31T23:00:00Z,15.0,102,3.1,3.002822580645162,0.8742649845618979,False
4,2022-03-31T23:00:00Z,10.1,43,2.7,2.9463709677419327,0.8870309797871537,False
8,2022-03-31T23:00:00Z,14.3,21,3.9,2.98481182795699,0.8917585675170866,False


In [0]:
# Final dataset to persist: the latest reading per turbine together with its
# baseline statistics and anomaly flag. No window specification is needed here;
# the one used to pick the latest readings is defined where that selection happens.
result_df = anomalies_df

display(result_df)

turbine_id,timestamp,wind_speed,wind_direction,power_output,mean_power_output,stddev_power_output,is_anomaly
12,2022-03-31T23:00:00Z,9.5,189,3.2,3.051209677419356,0.8610761298124978,False
1,2022-03-31T23:00:00Z,13.7,51,3.7,3.016397849462364,0.8572389449056392,False
13,2022-03-31T23:00:00Z,12.8,302,4.5,3.0313172043010725,0.8773202723151071,False
6,2022-03-31T23:00:00Z,9.9,37,3.0,2.9838709677419377,0.8741128566464673,False
3,2022-03-31T23:00:00Z,10.1,112,2.0,2.97997311827957,0.8626320481064984,False
5,2022-03-31T23:00:00Z,13.5,55,1.9,3.016532258064516,0.8658829219297753,False
15,2022-03-31T23:00:00Z,12.0,56,2.3,3.0364247311827937,0.8537802013874646,False
9,2022-03-31T23:00:00Z,15.0,102,3.1,3.002822580645162,0.8742649845618979,False
4,2022-03-31T23:00:00Z,10.1,43,2.7,2.9463709677419327,0.8870309797871537,False
8,2022-03-31T23:00:00Z,14.3,21,3.9,2.98481182795699,0.8917585675170866,False


In [0]:
# Columns persisted to the gold table, matching the CREATE TABLE definition below.
gold_columns = [
    "turbine_id",
    "timestamp",
    "power_output",
    "mean_power_output",
    "stddev_power_output",
    "is_anomaly",
]

# Expose only the gold-table columns to SQL. result_df also carries wind_speed and
# wind_direction, which the gold table does not declare; because the MERGE below uses
# `UPDATE SET *` / `INSERT *`, projecting explicitly keeps the source schema aligned
# with the target regardless of any session-level schema-evolution setting.
result_df.select(*gold_columns).createOrReplaceTempView("temp_anomalies")

# Create the target table on first run; later runs leave the existing table as is.
spark.sql("""
CREATE TABLE IF NOT EXISTS wind_farm.default.gold_data (
  turbine_id STRING,
  timestamp TIMESTAMP,
  power_output DOUBLE,
  mean_power_output DOUBLE,
  stddev_power_output DOUBLE,
  is_anomaly BOOLEAN
)
""")

# Upsert keyed on (turbine_id, timestamp): rows already present are overwritten,
# new rows are inserted.
spark.sql("""
MERGE INTO wind_farm.default.gold_data AS target
USING temp_anomalies AS source
ON target.turbine_id = source.turbine_id AND target.timestamp = source.timestamp
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
""")

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql
-- Show the full contents of the gold table that the MERGE above writes to.
select * from wind_farm.default.gold_data

turbine_id,timestamp,power_output,mean_power_output,stddev_power_output,is_anomaly
12,2022-03-31T23:00:00Z,3.2,3.051209677419356,0.8610761298124978,False
1,2022-03-31T23:00:00Z,3.7,3.016397849462364,0.8572389449056392,False
13,2022-03-31T23:00:00Z,4.5,3.0313172043010725,0.8773202723151071,False
6,2022-03-31T23:00:00Z,3.0,2.9838709677419377,0.8741128566464673,False
3,2022-03-31T23:00:00Z,2.0,2.97997311827957,0.8626320481064984,False
5,2022-03-31T23:00:00Z,1.9,3.016532258064516,0.8658829219297753,False
15,2022-03-31T23:00:00Z,2.3,3.0364247311827937,0.8537802013874646,False
9,2022-03-31T23:00:00Z,3.1,3.002822580645162,0.8742649845618979,False
4,2022-03-31T23:00:00Z,2.7,2.9463709677419327,0.8870309797871537,False
8,2022-03-31T23:00:00Z,3.9,2.98481182795699,0.8917585675170866,False
