# Explanation

This notebook is split into 3 sections:
## EDA
I loaded all the data into a table in my labs area in Unity Catalog in our setup and did some EDA on the data to understand its features.
## Pipeline
These are the cells that form the pipeline. This section would be extracted into a separate ingestion notebook and run as a notebook task from Databricks Workflows/Jobs. 

I would deploy this using Databricks Asset Bundles (DAB), which would define the pipeline and an autoscaling job cluster. With some refactoring, the pipeline could instead be turned into a SQL task and run on a SQL warehouse.

## Additional Tests
I did some additional tests/sanity checks to ensure that imputation worked for missing values. The mechanism to impute any missing values and label any anomalies is in a function and so can be tested with test data and expected values.

# EDA

In [None]:
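from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, isnan, when  # explicit imports avoid shadowing builtins such as min/max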


In [None]:
# I just loaded the data into a table for easy access for EDA
wt_df = spark.table("dev_labs.lab_alistair_mclean.wt_data")
wt_df.limit(10).display()

timestamp,turbine_id,wind_speed,wind_direction,power_output
2022-03-01T00:00:00Z,6,15.0,112,4.2
2022-03-01T00:00:00Z,7,10.8,308,3.9
2022-03-01T00:00:00Z,8,12.6,201,2.9
2022-03-01T00:00:00Z,9,9.5,8,3.2
2022-03-01T00:00:00Z,10,10.7,44,3.4
2022-03-01T01:00:00Z,6,14.4,213,3.5
2022-03-01T01:00:00Z,7,14.8,246,4.5
2022-03-01T01:00:00Z,8,11.4,289,3.2
2022-03-01T01:00:00Z,9,13.1,0,1.6
2022-03-01T01:00:00Z,10,14.5,337,3.3


In [None]:
wt_pd = wt_df.toPandas()
wt_pd.describe()

Unnamed: 0,turbine_id,wind_speed,wind_direction,power_output
count,11160.0,11160.0,11160.0,11159.0
mean,8.0,12.002724,179.903943,3.001972
std,4.320687,1.735366,103.353513,0.869857
min,1.0,9.0,0.0,1.5
25%,4.0,10.5,91.0,2.25
50%,8.0,12.0,180.0,3.0
75%,12.0,13.5,269.0,3.8
max,15.0,15.0,359.0,4.5


## Get some basic summary stats for the given data

In [None]:
wt_df.describe().display()

summary,turbine_id,wind_speed,wind_direction,power_output
count,11160.0,11160.0,11160.0,11159.0
mean,8.0,12.002724014336922,179.90394265232976,3.0019715028228355
stddev,4.320687382459022,1.7353660714427142,103.35351302055336,0.8698571304534896
min,1.0,9.0,0.0,1.5
max,15.0,15.0,359.0,4.5


## Any nulls?

In [None]:
null_counts = wt_df.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in wt_df.columns if c != 'timestamp'])
null_counts.display()  # display() returns None, so keep it out of the assignment


turbine_id,wind_speed,wind_direction,power_output
0,0,0,0


## Any duplicates?

In [None]:
%sql
select count(*) from dev_labs.lab_alistair_mclean.wt_data
group by timestamp, turbine_id
having count(*) > 1

count(1)


## Any missing hours?

In [None]:
%sql
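-- flag rows where the gap to the previous reading for the same turbine is more than one hour (3600 s)
select * from (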
select *, CASE WHEN CAST(DATE_DIFF AS INT) > 3600 THEN 1 ELSE 0 END as comparison 
from (
  select *,
    timestamp - LAG(timestamp, 1) OVER (partition by turbine_id ORDER by timestamp) AS DATE_DIFF, 
    MAX(timestamp) OVER (partition by turbine_id) AS max_date
  from dev_labs.lab_alistair_mclean.wt_data
  )
)
where comparison = 1
order by turbine_id, timestamp

timestamp,turbine_id,wind_speed,wind_direction,power_output,DATE_DIFF,max_date,comparison
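
The same gap check can be sketched in plain Python, which makes the logic easy to unit-test without a cluster. This is only a minimal sketch: `find_gaps` and the sample readings are hypothetical and not part of the notebook.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def find_gaps(readings, expected=timedelta(hours=1)):
    """Return (turbine_id, previous_ts, current_ts) for every gap longer than `expected`.

    `readings` is an iterable of (timestamp, turbine_id) pairs.
    """
    by_turbine = defaultdict(list)
    for ts, turbine_id in readings:
        by_turbine[turbine_id].append(ts)

    gaps = []
    for turbine_id in sorted(by_turbine):
        stamps = sorted(by_turbine[turbine_id])
        # Compare each reading with the one before it, like LAG(...) in the SQL.
        for prev, cur in zip(stamps, stamps[1:]):
            if cur - prev > expected:
                gaps.append((turbine_id, prev, cur))
    return gaps


# Made-up sample: turbine 1 is missing its 02:00 reading.
sample = [
    (datetime(2022, 3, 1, 0), 1),
    (datetime(2022, 3, 1, 1), 1),
    (datetime(2022, 3, 1, 3), 1),
    (datetime(2022, 3, 1, 0), 2),
    (datetime(2022, 3, 1, 1), 2),
]
gaps = find_gaps(sample)
print(gaps)
```

Like the SQL version, this only sees gaps between two readings; hours missing at the very start or end of a turbine's series would not be detected.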


# Pipeline for Ingestion

The described scenario is that new data is appended to each file once a day.
A time period of 24 hours is suggested for the summary stats, but this could be a parameter. If the parameter were longer than 24 hours the whole file would be needed; otherwise just the new data could be used.

Although the EDA above shows no nulls, duplicates or missing hours, the description indicates that they can occur. The evidence suggests they are rare, so we can impute any missing wind direction, wind speed or power output using the mean of that value for the particular turbine over the day's worth of data (the v1 code below imputes `power_output` only). A better approximation would use historic data, and could account for seasonality and possibly a bias for the given day based on the day's mean.
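
As an illustration of the per-turbine mean imputation described above, here is a minimal plain-Python sketch. `impute_with_turbine_mean` and the sample values are hypothetical; the pipeline itself does this in SQL.

```python
from collections import defaultdict
from statistics import mean


def impute_with_turbine_mean(rows, field="power_output"):
    """Fill missing (None) values of `field` with that turbine's mean for the day.

    `rows` is a list of dicts that each have a "turbine_id" key.
    Rows are copied, so the input is left untouched.
    """
    observed = defaultdict(list)
    for row in rows:
        if row[field] is not None:
            observed[row["turbine_id"]].append(row[field])

    # A turbine with no observed values at all cannot be imputed and stays None.
    means = {tid: mean(vals) for tid, vals in observed.items()}

    return [
        {**row, field: means.get(row["turbine_id"])} if row[field] is None else dict(row)
        for row in rows
    ]


# Made-up day of readings for two turbines.
day = [
    {"turbine_id": 1, "power_output": 2.0},
    {"turbine_id": 1, "power_output": None},
    {"turbine_id": 1, "power_output": 4.0},
    {"turbine_id": 2, "power_output": 3.5},
]
filled = impute_with_turbine_mean(day)
print(filled[1])
```

One caveat if this were extended to wind direction: a plain mean can mislead for angles, since they wrap around at 360°.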

For a v1, we'll assume 24 hours as a fixed time period, which means we can take the last day's worth of data when this job is run.
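
A minimal sketch of how the fixed 24-hour window could be derived from the run date, and later turned into a parameter. `target_day` and `days_back` are hypothetical names used only for illustration.

```python
import datetime as dt


def target_day(run_date: dt.date, days_back: int = 1) -> str:
    """Return the day to process as 'YYYY-MM-DD', counted back from the run date."""
    if days_back < 1:
        raise ValueError("days_back must be at least 1")
    return (run_date - dt.timedelta(days=days_back)).isoformat()


# A run on 1 April 2022 processes the data for 31 March 2022.
print(target_day(dt.date(2022, 4, 1)))  # 2022-03-31
```

Passing the run date in explicitly, rather than reading the clock inside the function, keeps reruns and backfills reproducible.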

The same input files are overwritten each day, so it doesn't make sense to use Auto Loader, which generally processes each file only once. Instead, we'll reread the entire file each day and extract the previous day's data.

The following would be run as a notebook task, perhaps scheduled daily at a fixed time, or triggered by an orchestration system so that the data arrival is complete before execution.

The steps are:
1. load all the data into a dataframe and filter to keep only the latest data
2. compute the stats and anomalies and do any imputation
3. write the results in append mode as we are only doing 1 day of data each run
   (if several runs per day are possible, or for additional resilience, we could use a merge operation instead)

In [None]:
import datetime as dt
# Let's test with the last day of the provided data
#yesterday = (dt.datetime.now() - dt.timedelta(days=1)).strftime("%Y-%m-%d")
yesterday = (dt.datetime(2022, 4, 1) - dt.timedelta(days=1)).strftime("%Y-%m-%d")

wt_df_y = spark.table("dev_labs.lab_alistair_mclean.wt_data").where(f"cast(timestamp as date) = '{yesterday}'")
# use this when reading from source
# wt_df = spark.read.csv(path="<path to files>/data_group*_.csv", header=True, inferSchema=True).where(f"cast(timestamp as date) = '{yesterday}'")

# remove when ready for prod
wt_df_y.limit(10).display()

timestamp,turbine_id,wind_speed,wind_direction,power_output
2022-03-31T00:00:00Z,11,12.5,71,2.4
2022-03-31T00:00:00Z,12,13.2,265,3.4
2022-03-31T00:00:00Z,13,11.0,168,2.3
2022-03-31T00:00:00Z,14,10.5,96,3.4
2022-03-31T00:00:00Z,15,13.7,180,4.2
2022-03-31T01:00:00Z,11,9.3,38,4.2
2022-03-31T01:00:00Z,12,14.8,74,2.5
2022-03-31T01:00:00Z,13,12.6,68,2.8
2022-03-31T01:00:00Z,14,12.6,171,3.4
2022-03-31T01:00:00Z,15,12.2,325,2.8


### Compute the stats and anomalies for the day

In [None]:
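def compute_stats(df: DataFrame) -> DataFrame:
    """Impute missing power_output and add per-turbine stats and an anomaly flag."""
    # Register the input so the SQL reads from df rather than relying on a pre-existing view
    df.createOrReplaceTempView("wt")
    return spark.sql("""
        select
        * except(power_output),
        -- impute a missing reading with the turbine's mean for the day
        case when isnull(power_output) then mean(power_output) over (partition by turbine_id) else power_output end as power_output,
        -- aggregates ignore nulls, so the stats are based on observed readings only
        min(power_output) over (partition by turbine_id) as min_power,
        max(power_output) over (partition by turbine_id) as max_power,
        mean(power_output) over (partition by turbine_id) as avg_power,
        stddev(power_output) over (partition by turbine_id) as std_dev,
        -- anomaly: more than 2 standard deviations from the turbine's mean
        case
            when
            abs(power_output - mean(power_output) over (partition by turbine_id)) > 2.0
            * stddev(power_output) over (partition by turbine_id)
            then
            true
            else false
        end as is_anomaly
        from
        wt
        """)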

In [None]:
wt_df_y.createOrReplaceTempView("wt")
res = compute_stats(wt_df_y)

# remove when ready for prod
res.display()

timestamp,turbine_id,wind_speed,wind_direction,power_output,min_power,max_power,avg_power,std_dev,is_anomaly
2022-03-31T01:00:00Z,1,10.7,238,2.6,1.6,4.3,2.958333333333334,0.7706105240068714,False
2022-03-31T02:00:00Z,1,15.0,295,4.0,1.6,4.3,2.958333333333334,0.7706105240068714,False
2022-03-31T03:00:00Z,1,11.3,214,3.3,1.6,4.3,2.958333333333334,0.7706105240068714,False
2022-03-31T04:00:00Z,1,9.8,312,2.5,1.6,4.3,2.958333333333334,0.7706105240068714,False
2022-03-31T05:00:00Z,1,13.0,232,2.1,1.6,4.3,2.958333333333334,0.7706105240068714,False
2022-03-31T06:00:00Z,1,10.5,304,3.0,1.6,4.3,2.958333333333334,0.7706105240068714,False
2022-03-31T07:00:00Z,1,9.1,305,2.8,1.6,4.3,2.958333333333334,0.7706105240068714,False
2022-03-31T08:00:00Z,1,14.7,131,3.2,1.6,4.3,2.958333333333334,0.7706105240068714,False
2022-03-31T09:00:00Z,1,9.1,242,3.1,1.6,4.3,2.958333333333334,0.7706105240068714,False
2022-03-31T10:00:00Z,1,10.2,279,2.2,1.6,4.3,2.958333333333334,0.7706105240068714,False
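
For unit tests of `compute_stats`, expected values could be produced with a small plain-Python reference of the same rule: a reading is anomalous when it lies more than 2 sample standard deviations from its turbine's mean. This is a sketch under that reading of the SQL; `flag_anomalies` and the sample readings are hypothetical.

```python
from statistics import mean, stdev


def flag_anomalies(values, threshold=2.0):
    """Return one boolean per value: True when |value - mean| > threshold * stdev.

    Missing values (None) are ignored for the statistics and never flagged.
    """
    observed = [v for v in values if v is not None]
    if len(observed) < 2:
        # Sample standard deviation is undefined for fewer than two values.
        return [False] * len(values)
    mu = mean(observed)
    sigma = stdev(observed)
    return [v is not None and abs(v - mu) > threshold * sigma for v in values]


# Made-up readings for one turbine: nine typical values, one spike and one gap.
readings = [3.0, 3.1, 2.9, 3.0, 3.2, 2.8, 3.0, 3.1, 2.9, 9.0, None]
flags = flag_anomalies(readings)
print(flags)
```

Only the spike at 9.0 is flagged here. Because the mean and standard deviation include the spike itself, a single extreme value in a short window also widens the threshold it is judged against.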


### Use append mode to add the data to the stats table

In [None]:
res.write.mode('append').saveAsTable('dev_labs.lab_alistair_mclean.wt_data_processed')

In [None]:
##############
# If we want to be more 'safe' in case of multiple runs in a day then we could use a merge into

# from delta.tables import DeltaTable

# # Load the Delta table
# target = DeltaTable.forName(spark, 'dev_labs.lab_alistair_mclean.wt_data_processed')

# target.alias("target").merge(
#     res.alias("source"),
#     "target.timestamp = source.timestamp AND target.turbine_id = source.turbine_id"
# ).whenMatchedUpdateAll(
# ).whenNotMatchedInsertAll(
# ).execute()
###############

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]
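
To illustrate why a merge keyed on `(timestamp, turbine_id)` is safe to rerun while an append is not, here is a toy plain-Python model of the two write modes. It is not Delta Lake code; `append`, `upsert` and the two sample rows are hypothetical stand-ins.

```python
def append(target, batch):
    """Model of append mode: every run adds the batch again."""
    return target + batch


def upsert(target, batch, keys=("timestamp", "turbine_id")):
    """Model of a merge that updates on match and inserts otherwise, keyed on `keys`."""
    merged = {tuple(row[k] for k in keys): row for row in target}
    for row in batch:
        merged[tuple(row[k] for k in keys)] = row  # update if present, insert otherwise
    return list(merged.values())


batch = [
    {"timestamp": "2022-03-31T00:00:00Z", "turbine_id": 1, "power_output": 2.5},
    {"timestamp": "2022-03-31T01:00:00Z", "turbine_id": 1, "power_output": 2.6},
]

# Running the same day's batch twice.
appended = append(append([], batch), batch)
upserted = upsert(upsert([], batch), batch)
print(len(appended), len(upserted))  # 4 2
```

The append model ends up with every row duplicated after the second run, while the keyed upsert leaves the table unchanged.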

# End of pipeline

The above cells would be extracted into a separate notebook, which would then be run as a notebook task using Databricks Workflows.

Some additional tests follow.

### Test imputation for null values.
The assumption is that the row will be written but the power output will be null, rather than no row being written at all.

In [None]:
old_power = spark.sql("SELECT power_output FROM dev_labs.lab_alistair_mclean.wt_data WHERE turbine_id = 1 AND timestamp = '2022-03-31 00:00:00'")
old_power.collect()[0][0]  # 2.5
# use this to set back if needed

2.5

In [None]:
%sql
-- set one of the power_output cells to null and test imputation
update dev_labs.lab_alistair_mclean.wt_data set power_output = null where turbine_id = 1 and timestamp = '2022-03-31 00:00:00';

select * from wt where power_output is null;


timestamp,turbine_id,wind_speed,wind_direction,power_output
2022-03-31T00:00:00Z,1,14.7,186,


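### Imputation works
The rows shown do not include the 00:00 reading, but the stats reflect the change: `avg_power` for turbine 1 moves from 2.9583 (71.0 / 24) to 2.9783 (68.5 / 23), which is the mean of the remaining 23 readings once the 2.5 is removed. The imputed `power_output` for the 00:00 row should therefore equal this new `avg_power`.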

In [None]:
compute_stats(wt_df_y).display()

timestamp,turbine_id,wind_speed,wind_direction,power_output,min_power,max_power,avg_power,std_dev,is_anomaly
2022-03-31T01:00:00Z,1,10.7,238,2.6,1.6,4.3,2.978260869565218,0.7815814514286986,False
2022-03-31T02:00:00Z,1,15.0,295,4.0,1.6,4.3,2.978260869565218,0.7815814514286986,False
2022-03-31T03:00:00Z,1,11.3,214,3.3,1.6,4.3,2.978260869565218,0.7815814514286986,False
2022-03-31T04:00:00Z,1,9.8,312,2.5,1.6,4.3,2.978260869565218,0.7815814514286986,False
2022-03-31T05:00:00Z,1,13.0,232,2.1,1.6,4.3,2.978260869565218,0.7815814514286986,False
2022-03-31T06:00:00Z,1,10.5,304,3.0,1.6,4.3,2.978260869565218,0.7815814514286986,False
2022-03-31T07:00:00Z,1,9.1,305,2.8,1.6,4.3,2.978260869565218,0.7815814514286986,False
2022-03-31T08:00:00Z,1,14.7,131,3.2,1.6,4.3,2.978260869565218,0.7815814514286986,False
2022-03-31T09:00:00Z,1,9.1,242,3.1,1.6,4.3,2.978260869565218,0.7815814514286986,False
2022-03-31T10:00:00Z,1,10.2,279,2.2,1.6,4.3,2.978260869565218,0.7815814514286986,False
