<a href="https://colab.research.google.com/github/StefanBelgica/homeassignment1/blob/main/Colibri_Home_Exercise.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Prerequisites for running PySpark and SQLite in Google Colab

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark
!curl -O https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.34.0/sqlite-jdbc-3.34.0.jar

## Set up environment variables

In [None]:
import os

# Tell findspark / PySpark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

## Import PySpark and create Spark Session

In [None]:
import os

import findspark
findspark.init()  # locate Spark via SPARK_HOME so that `pyspark` becomes importable

from pyspark.sql import SparkSession

# Absolute path to the SQLite JDBC driver downloaded in the setup cell.
# An absolute path avoids depending on how Spark/the JVM resolve relative paths.
SQLITE_JDBC_JAR = os.path.join(os.getcwd(), "sqlite-jdbc-3.34.0.jar")

spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.jars", SQLITE_JDBC_JAR)                   # distribute the driver jar with the session
    .config("spark.driver.extraClassPath", SQLITE_JDBC_JAR)  # prepend it to the driver's classpath
    .getOrCreate()
)
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)  # Property used to format output tables better
spark

## PySpark function imports

In [None]:
from pyspark.sql.functions import col, mean, stddev, max, min, window

## Check that our CSVs are in place (they were uploaded to a folder in Google Drive rather than to the Colab session, because the session does not keep files after it is deactivated)

In [None]:
!ls drive/MyDrive/'Colibri Dataset'

data_group_1.csv  data_group_2.csv  data_group_3.csv


## Read the data and inspect the schema

In [None]:
# Load every CSV in the dataset folder into one DataFrame: the header row gives
# the column names and inferSchema lets Spark pick the column types.
data = (
    spark.read
    .options(header=True, sep=',', inferSchema=True)
    .csv("drive/MyDrive/Colibri Dataset/")
)

# Quick inspection: first rows, inferred schema, and total row count.
data.show()
print(data.schema)
print(f'Number of entries: {data.count()}')

+-------------------+----------+----------+--------------+------------+
|          timestamp|turbine_id|wind_speed|wind_direction|power_output|
+-------------------+----------+----------+--------------+------------+
|2022-03-01 00:00:00|        11|       9.1|           269|         2.9|
|2022-03-01 00:00:00|        12|      11.3|           316|         2.5|
|2022-03-01 00:00:00|        13|      11.2|           148|         3.7|
|2022-03-01 00:00:00|        14|      10.7|            97|         1.6|
|2022-03-01 00:00:00|        15|      11.0|            81|         4.4|
|2022-03-01 01:00:00|        11|      12.3|           245|         1.8|
|2022-03-01 01:00:00|        12|      11.0|           293|         2.2|
|2022-03-01 01:00:00|        13|      11.4|           270|         1.9|
|2022-03-01 01:00:00|        14|      10.4|           140|         2.3|
|2022-03-01 01:00:00|        15|      14.6|           283|         4.3|
|2022-03-01 02:00:00|        11|      14.3|           135|      

## Drop rows with missing values: imputing them (e.g. with the mean) would distort the summary statistics and could cause readings to be wrongly flagged as anomalies

In [None]:
# Remove any row that has a null in at least one column.
cleaned_data = data.na.drop()

## Calculate summary stats

In [None]:
# Per-turbine power-output statistics over 24-hour tumbling windows.
# Each aggregate is aliased "<stat>_power_output" (e.g. "stddev_power_output");
# Spark's `stddev` is the sample standard deviation.
summary_stats = cleaned_data.groupBy(
    "turbine_id", window("timestamp", "24 hours")
).agg(
    *[
        stat_fn(col("power_output")).alias(f"{stat_name}_power_output")
        for stat_name, stat_fn in (
            ("min", min),
            ("max", max),
            ("mean", mean),
            ("stddev", stddev),
        )
    ]
)
summary_stats

turbine_id,window,min_power_output,max_power_output,mean_power_output,stddev_power_output
6,{2022-03-09 00:00...,1.5,4.5,2.816666666666667,0.978759937110604
2,{2022-03-01 00:00...,1.6,4.4,2.9833333333333325,0.980535198431101
1,{2022-03-06 00:00...,1.5,4.4,2.958333333333333,0.9245053514866944
13,{2022-03-01 00:00...,1.8,4.3,3.0333333333333337,0.796004515317468
11,{2022-03-11 00:00...,1.6,4.1,2.891666666666667,0.7552233569220299
7,{2022-03-28 00:00...,1.8,4.5,3.0250000000000004,0.846090754716887
1,{2022-03-25 00:00...,1.5,4.4,3.0666666666666664,0.8646017764751921
1,{2022-03-13 00:00...,1.5,4.5,2.904166666666667,0.9424663143174872
14,{2022-03-09 00:00...,1.9,4.3,3.1625,0.7716794220625445
13,{2022-03-23 00:00...,1.6,4.4,3.266666666666666,0.9253279872877352


## Detect anomalies in the cleaned dataset (readings whose power output lies more than 2 standard deviations from the turbine's mean)

In [None]:
# A reading is anomalous when its power output lies more than ANOMALY_STDDEVS
# standard deviations from its turbine's mean for the SAME 24-hour window.
# Joining on turbine_id alone would pair every reading with every day's stats
# for that turbine, duplicating rows and judging readings against other days.
ANOMALY_STDDEVS = 2

# Tag each reading with the same 24-hour window used to build summary_stats so
# it can be matched to its own window's statistics.
readings_by_window = cleaned_data.withColumn("window", window("timestamp", "24 hours"))

anomalies = (
    summary_stats
    .withColumn("lower_bound", col("mean_power_output") - ANOMALY_STDDEVS * col("stddev_power_output"))
    .withColumn("upper_bound", col("mean_power_output") + ANOMALY_STDDEVS * col("stddev_power_output"))
    .join(readings_by_window, on=["turbine_id", "window"])
    # A window with a single reading has a null stddev, hence null bounds,
    # so the comparisons below evaluate to null and the row is not flagged.
    .filter((col("power_output") < col("lower_bound")) | (col("power_output") > col("upper_bound")))
    .select(
        col("timestamp"), col("turbine_id"), col("wind_speed"), col("wind_direction"),
        col("power_output"), col("mean_power_output"), col("lower_bound"), col("upper_bound"),
    )
)
anomalies

timestamp,turbine_id,wind_speed,wind_direction,power_output,mean_power_output,lower_bound,upper_bound
2022-03-01 00:00:00,14,10.7,97,1.6,2.966666666666667,1.7470207213677624,4.186312611965571
2022-03-01 00:00:00,14,10.7,97,1.6,3.304166666666666,1.6055952293887892,5.002738103944543
2022-03-01 00:00:00,14,10.7,97,1.6,3.1625,1.6191411558749107,4.705858844125089
2022-03-01 00:00:00,15,11.0,81,4.4,2.6625,1.0583613309426094,4.266638669057391
2022-03-01 00:00:00,15,11.0,81,4.4,2.995833333333333,1.6146187869328088,4.3770478797338574
2022-03-01 00:00:00,15,11.0,81,4.4,2.7125,1.2771542607077735,4.147845739292226
2022-03-01 01:00:00,11,12.3,245,1.8,3.225,1.855455486698938,4.594544513301062
2022-03-01 01:00:00,15,14.6,283,4.3,2.6625,1.0583613309426094,4.266638669057391
2022-03-01 01:00:00,15,14.6,283,4.3,2.7125,1.2771542607077735,4.147845739292226
2022-03-01 02:00:00,15,9.7,303,4.5,2.6625,1.0583613309426094,4.266638669057391


In [None]:
import sqlite3

## Attempt to write to SQLite databases (this failed in Colab with a Py4JJavaError)

In [None]:
# Persist results to SQLite through Spark's JDBC writer. Spark opens its own
# JDBC connection, so no Python sqlite3 connection is needed for the write.
# Naming the driver class explicitly lets Spark load the SQLite JDBC driver.
SQLITE_JDBC_PROPERTIES = {"driver": "org.sqlite.JDBC"}

cleaned_data.write.jdbc(
    "jdbc:sqlite:cleaned_data.db", "cleaned_data",
    mode="overwrite", properties=SQLITE_JDBC_PROPERTIES,
)

# JDBC has no column type for Spark structs, so the `window` struct produced by
# the groupBy is flattened into separate start/end columns before writing.
summary_stats_flat = (
    summary_stats
    .withColumn("window_start", col("window.start"))
    .withColumn("window_end", col("window.end"))
    .drop("window")
)
summary_stats_flat.write.jdbc(
    "jdbc:sqlite:summary_stats.db", "summary_stats",
    mode="overwrite", properties=SQLITE_JDBC_PROPERTIES,
)

# Sanity check: read back row counts with Python's sqlite3 module.
# Table names are fixed literals here, not user input.
for db_path, table in (("cleaned_data.db", "cleaned_data"), ("summary_stats.db", "summary_stats")):
    conn = sqlite3.connect(db_path)
    try:
        (row_count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
    finally:
        conn.close()  # sqlite3's context manager only commits/rolls back; close explicitly
    print(f"{db_path}: {row_count} rows in table '{table}'")


Py4JJavaError: ignored