# Ingesting and transforming IoT sensor data from wind turbines using Delta Lake and the Spark API

<img style="float: right" width="300px" src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/retail/resources/images/lakehouse-retail/lakehouse-retail-churn-2.png" />

In this notebook, we'll show you an alternative to Delta Live Tables: building an ingestion pipeline with the Spark API.

As you'll see, this implementation is lower level than the Delta Live Tables pipeline, and you control all the implementation details (handling checkpoints, data quality, etc.).

Lower level also means more flexibility: with the Spark API, you can ingest data in batch or streaming mode and tune each step yourself.

If you're unsure which one to use, start with Delta Live Tables!

*Remember that Databricks Workflows can orchestrate a mix of Delta Live Tables pipelines and standard Spark pipelines.*

### Dataset:

As a reminder, we have multiple data sources coming from different systems:

* <strong>Turbine metadata</strong>: Turbine ID, location (1 row per turbine)
* <strong>Turbine sensor stream</strong>: Real-time stream of wind turbine sensor readings (vibration, energy produced, speed, etc.)
* <strong>Turbine status</strong>: Historical turbine status, used to analyse which part is faulty (used as the label in our ML model)


Leveraging Spark and Delta Lake makes such an implementation easy.


<!-- Collect usage data (view). Remove it to disable collection. View README for more details.  -->
<img width="1px" src="https://ppxrzfxige.execute-api.us-west-2.amazonaws.com/v1/analytics?category=lakehouse&org_id=4003492105941350&notebook=%2F01-Data-ingestion%2Fplain-spark-delta-pipeline%2F01.5-Delta-pipeline-spark-iot-turbine&demo_name=lakehouse-iot-platform&event=VIEW&path=%2F_dbdemos%2Flakehouse%2Flakehouse-iot-platform%2F01-Data-ingestion%2Fplain-spark-delta-pipeline%2F01.5-Delta-pipeline-spark-iot-turbine&version=1">

In [0]:
%pip install mlflow==2.22.0

In [0]:
%run ../../_resources/00-setup

## Building a Spark Data pipeline with Delta Lake

In this example, we'll implement an end-to-end pipeline consuming our IoT sources. We'll use the medallion architecture, but we could also build a star schema, a data vault, or any other data model.



This can be challenging with traditional systems due to the following:
 * Data quality issues
 * Running concurrent operations
 * Running DELETE/UPDATE/MERGE over files
 * Governance & schema evolution
 * Poor performance when ingesting millions of small files from cloud buckets
 * Processing & analysing unstructured data (images, video...)
 * Switching between batch and streaming depending on your requirements...

## Solving these challenges with Delta Lake

<div style="float:left">

**What's Delta Lake? It's an open source (OSS) standard that brings SQL transactional database capabilities on top of Parquet files!**

It is used as a Spark format, built on top of the Spark API / SQL:

* **ACID transactions** (multiple writers can simultaneously modify a dataset)
* **Full DML support** (UPDATE/DELETE/MERGE)
* **BATCH and STREAMING** support
* **Data quality** (expectations, schema enforcement, inference and evolution)
* **TIME TRAVEL** (look back at how the data looked in the past)
* **Performance boost** with ZOrder, data skipping and caching; solves the small files issue
</div>


<img src="https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-logo.png" style="height: 200px"/>

<br style="clear: both">

We'll incrementally load new data with Auto Loader, enrich this information and then load a model from MLflow to perform our predictive maintenance forecast.

This information will then be used to build our DBSQL dashboard to analyse the current turbine farm and the impact on stock.

Let's implement the following flow:
 
<div><img width="1100px" src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/manufacturing/lakehouse-iot-turbine/lakehouse-manuf-iot-turbine-spark-full.png"/></div>

*Note that we're including the ML model our [Data Scientist built](TODO) using Databricks AutoML to predict turbine maintenance needs.*

## ![](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) 1/ Explore the dataset

Let's review the files being received

In [0]:
%sql LIST '/Volumes/main/dbdemos_iot_turbine/turbine_raw_landing/incoming_data'

In [0]:
%sql
SELECT * FROM PARQUET.`/Volumes/main/dbdemos_iot_turbine/turbine_raw_landing/incoming_data`

### 1/ Loading our data using Databricks Autoloader (cloud_files)
<div style="float:right">
  <img width="700px" src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/manufacturing/lakehouse-iot-turbine/lakehouse-manuf-iot-turbine-spark-1.png"/>
</div>
  
Auto Loader lets us efficiently ingest millions of files from cloud storage, and supports efficient schema inference and evolution at scale.

For more details on Auto Loader, run `dbdemos.install('auto-loader')`.

Let's use it to create our pipeline and ingest the raw JSON & Parquet data being delivered to our volume (`/Volumes/main/dbdemos_iot_turbine/turbine_raw_landing/...`).
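
To build some intuition for what "incremental" ingestion means, here is a minimal pure-Python sketch of the idea: remember which files were already processed and only pick up the new ones on each run. This is a toy illustration only; `ingest_new_files` and the file names are hypothetical, and Auto Loader's actual file discovery and checkpointing are far more sophisticated.

```python
def ingest_new_files(available_files, checkpoint):
    """Return the files not yet recorded in `checkpoint`, then record them.

    `checkpoint` is a plain set standing in for the durable state a real
    streaming engine would persist (hypothetical simplification).
    """
    new_files = sorted(f for f in available_files if f not in checkpoint)
    checkpoint.update(new_files)
    return new_files


checkpoint = set()

# First run: everything in the landing folder is new.
first_run = ingest_new_files(["a.json", "b.json"], checkpoint)

# Second run: one more file has landed, and only that file is processed.
second_run = ingest_new_files(["a.json", "b.json", "c.json"], checkpoint)

print(first_run)
print(second_run)
```

Because the checkpoint survives between runs, restarting the pipeline does not reprocess files that were already ingested.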

In [0]:
%sql
-- Note: tables are automatically created by the .writeStream.table("spark_sensor_bronze") operation, but we can also create them with plain SQL:
CREATE TABLE IF NOT EXISTS spark_sensor_bronze (
  energy   DOUBLE,
  sensor_A DOUBLE,
  sensor_B DOUBLE,
  sensor_C DOUBLE,
  sensor_D DOUBLE,
  sensor_E DOUBLE,
  sensor_F DOUBLE,
  timestamp LONG,
  turbine_id STRING     
  ) using delta 
    CLUSTER BY (turbine_id) -- Queries filtering on turbine ID will be faster; Databricks manages the file layout for you out of the box.
    TBLPROPERTIES (
     delta.autooptimize.optimizewrite = TRUE,
     delta.autooptimize.autocompact   = TRUE ); 
-- With these last 2 options, the Databricks engine compacts small files & optimizes writes out of the box!

In [0]:
volume_folder = f'/Volumes/{catalog}/{db}/{volume_name}'

def ingest_folder(folder, data_format, table):
  # Incrementally read the new files landing in `folder` with Auto Loader
  bronze_stream = (spark.readStream
                        .format("cloudFiles")
                        .option("cloudFiles.format", data_format)
                        .option("cloudFiles.inferColumnTypes", "true")
                        .option("cloudFiles.schemaLocation", f"{volume_folder}/schema/{table}") # Auto Loader infers the schema and tracks its evolution here
                        .load(folder))

  return (bronze_stream.writeStream
                    .option("checkpointLocation", f"{volume_folder}/checkpoint/{table}") # exactly-once delivery on Delta tables across restarts/kills
                    .option("mergeSchema", "true") # merge any new column dynamically
                    .trigger(availableNow=True) # remove for real-time streaming
                    .table("spark_"+table)) # the table is created if it doesn't exist yet

# Pass the table name without the "spark_" prefix: ingest_folder adds it
queries = [
  ingest_folder(f'{volume_folder}/historical_turbine_status', 'json', 'historical_turbine_status'),
  ingest_folder(f'{volume_folder}/turbine', 'json', 'turbine'),
  ingest_folder(f'{volume_folder}/incoming_data', 'parquet', 'sensor_bronze'),
]
# Wait for every stream to finish so the tables are complete before the next cells read them
for query in queries:
  query.awaitTermination()

In [0]:
%sql 
-- Note the "_rescued_data" column. If we receive wrong data not matching existing schema, it'll be stored here
select * from spark_sensor_bronze;

In [0]:
%sql 
-- Note the "_rescued_data" column. If we receive wrong data not matching existing schema, it'll be stored here
select * from spark_turbine;

In [0]:
# Let's explore our dataset a bit with the pandas API on Spark.
first_turbine = spark.table('spark_sensor_bronze').limit(1).collect()[0]['turbine_id']
df = spark.table('spark_sensor_bronze').where(f"turbine_id == '{first_turbine}' ").orderBy('timestamp').pandas_api()
df.plot(x="timestamp", y=["sensor_F", "sensor_E"], kind="line")


## ![](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) 2/ Silver data: data cleaned

<img width="700px" style="float:right" src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/manufacturing/lakehouse-iot-turbine/lakehouse-manuf-iot-turbine-spark-2.png"/>

We can chain these incremental transformations between tables, consuming only new data.

This can be triggered in near real time, or in batch fashion, for example as a nightly job that consumes the day's data.
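
The cell below buckets the sensor readings per hour and per turbine before aggregating them. As a rough illustration of that logic outside Spark, here is a minimal pure-Python sketch that truncates Unix timestamps to the hour and computes the mean and population standard deviation per bucket. The `hourly_stats` helper and the sample readings are hypothetical, and the sketch assumes timestamps are epoch seconds interpreted in UTC.

```python
from collections import defaultdict
from datetime import datetime, timezone
from statistics import fmean, pstdev


def hourly_stats(readings):
    """Group (turbine_id, epoch_seconds, value) readings by turbine and hour.

    Returns {(hour_start_utc, turbine_id): (mean, population_std)}.
    """
    buckets = defaultdict(list)
    for turbine_id, epoch_seconds, value in readings:
        # Truncate the Unix timestamp to the start of its hour (UTC assumed).
        hour_epoch = epoch_seconds - epoch_seconds % 3600
        hour_start = datetime.fromtimestamp(hour_epoch, tz=timezone.utc)
        buckets[(hour_start, turbine_id)].append(value)
    return {key: (fmean(vals), pstdev(vals)) for key, vals in buckets.items()}


# Hypothetical readings: two fall in the same hour, the third in the next one.
sample = [
    ("t1", 1_700_000_000, 1.0),
    ("t1", 1_700_000_600, 3.0),
    ("t1", 1_700_003_700, 5.0),
]
stats = hourly_stats(sample)
for (hour_start, turbine_id), (mean, std) in sorted(stats.items()):
    print(hour_start.isoformat(), turbine_id, mean, std)
```

The Spark version applies the same grouping with `date_trunc` and `stddev_pop`, but distributes the work across the cluster.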

In [0]:
import pyspark.sql.functions as F
# Compute the std and percentiles of our time series per hour
sensors = [c for c in spark.read.table("spark_sensor_bronze").columns if "sensor" in c]
aggregations = [F.avg("energy").alias("avg_energy")]
for sensor in sensors:
  aggregations.append(F.stddev_pop(sensor).alias("std_"+sensor))
  aggregations.append(F.percentile_approx(sensor, [0.1, 0.3, 0.6, 0.8, 0.95]).alias("percentiles_"+sensor))
  
df = (spark.table("spark_sensor_bronze")
          .withColumn("hourly_timestamp", F.date_trunc("hour", F.from_unixtime("timestamp")))
          .groupBy('hourly_timestamp', 'turbine_id').agg(*aggregations))

df.write.mode('overwrite').saveAsTable("spark_sensor_hourly")
display(spark.table("spark_sensor_hourly"))
# Note: a more scalable solution would be to switch to the streaming API, compute the aggregation with a ~3-hour watermark and MERGE (upsert) the final output. To keep this demo clear, we'll go with a full table update instead.


## ![](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) 3/ Build our training dataset

<img width="700px" style="float:right" src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/manufacturing/lakehouse-iot-turbine/lakehouse-manuf-iot-turbine-spark-3.png"/>

We now join the hourly sensor aggregates with the turbine metadata and the historical turbine status (our label) to build the dataset used to train the model.

This can be triggered in near real time, or in batch fashion, for example as a nightly job that consumes the day's data.

In [0]:
turbine = spark.table("spark_turbine")
health = spark.table("spark_historical_turbine_status")
(spark.table("spark_sensor_hourly")
  .join(turbine, ['turbine_id']).drop("row", "_rescued_data")
  .join(health, ['turbine_id'])
  .drop("_rescued_data")
  .write.mode('overwrite').saveAsTable("spark_turbine_training_dataset"))

display(spark.table("spark_turbine_training_dataset"))


## ![](https://pages.databricks.com/rs/094-YMS-629/images/delta-lake-tiny-logo.png) 4/ Call the ML model and get realtime turbine metrics

<img width="700px" style="float:right" src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/manufacturing/lakehouse-iot-turbine/lakehouse-manuf-iot-turbine-spark-4.png"/>

Finally, we load the model from the MLflow registry, wrap it as a Spark UDF and apply it to the most recent hourly metrics of each turbine.

This can be triggered in near real time, or in batch fashion, for example as a nightly job that consumes the day's data.
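
The inference cell further down keeps only the most recent hourly row of each turbine, using `row_number()` over a window ordered by descending `hourly_timestamp`. As a small illustration of that selection logic, here is a pure-Python sketch; the `latest_per_turbine` helper and the sample rows are hypothetical and only mirror the column names used in this notebook.

```python
def latest_per_turbine(rows):
    """Keep, for each turbine_id, the row with the greatest hourly_timestamp."""
    latest = {}
    for row in rows:
        current = latest.get(row["turbine_id"])
        if current is None or row["hourly_timestamp"] > current["hourly_timestamp"]:
            latest[row["turbine_id"]] = row
    return list(latest.values())


# Hypothetical hourly metrics (timestamps simplified to small integers).
rows = [
    {"turbine_id": "t1", "hourly_timestamp": 1, "avg_energy": 0.2},
    {"turbine_id": "t1", "hourly_timestamp": 3, "avg_energy": 0.5},
    {"turbine_id": "t2", "hourly_timestamp": 2, "avg_energy": 0.9},
]
current_metrics = latest_per_turbine(rows)
for row in sorted(current_metrics, key=lambda r: r["turbine_id"]):
    print(row)
```

In Spark, the window function expresses the same "latest row per key" pattern without collecting the data on the driver.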

In [0]:
import mlflow
mlflow.set_registry_uri('databricks-uc')
#                                                                                                    Stage/version  
#                                                                                       Model name         |        
#                                                                                           |              |        
predict_maintenance = mlflow.pyfunc.spark_udf(spark, f"models:/{catalog}.{db}.dbdemos_turbine_maintenance@prod", "string", env_manager='virtualenv')
#We can use the function in SQL
spark.udf.register("predict_maintenance", predict_maintenance)
columns = predict_maintenance.metadata.get_input_schema().input_names()

In [0]:
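from pyspark.sql.window import Window
from pyspark.sql.functions import col

# Rank each turbine's rows from newest to oldest so we can keep only the latest one
w = Window.partitionBy("turbine_id").orderBy(col("hourly_timestamp").desc())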
(spark.table("spark_sensor_hourly")
  .withColumn("row", F.row_number().over(w))
  .filter(col("row") == 1)
  .join(spark.table('spark_turbine'), ['turbine_id']).drop("row", "_rescued_data")
  .withColumn("prediction", predict_maintenance(*columns))
  .write.mode('overwrite').saveAsTable("spark_current_turbine_metrics"))

In [0]:
%sql select * from spark_current_turbine_metrics

## Simplify your operations with transactional DELETE/UPDATE/MERGE operations

Traditional data lakes struggle to run these simple DML operations. Using Databricks and Delta Lake, your data is stored on your blob storage with transactional capabilities. You can issue DML operations on petabytes of data without having to worry about concurrent operations.
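
To see why a transaction log makes operations such as DELETE reversible, here is a deliberately simplified toy model in pure Python. Each commit records which data files were added and removed, and any past version can be rebuilt by replaying the log up to that commit. The `ToyDeltaLog` class and the file names are hypothetical; the real Delta Lake log also tracks metadata, checkpoints and much more.

```python
class ToyDeltaLog:
    """Toy append-only log of file-level add/remove actions (illustration only)."""

    def __init__(self):
        self.commits = []  # commit i describes the changes that created version i

    def commit(self, add=(), remove=()):
        """Append one commit and return the new version number."""
        self.commits.append({"add": set(add), "remove": set(remove)})
        return len(self.commits) - 1

    def files_as_of(self, version):
        """Replay commits 0..version to find the files live at that version."""
        live = set()
        for entry in self.commits[: version + 1]:
            live -= entry["remove"]
            live |= entry["add"]
        return live

    def restore(self, version):
        """Commit the changes needed to bring the table back to `version`."""
        target = self.files_as_of(version)
        current = self.files_as_of(len(self.commits) - 1)
        return self.commit(add=target - current, remove=current - target)


log = ToyDeltaLog()
v0 = log.commit(add={"part-0.parquet", "part-1.parquet"})
# A DELETE rewrites the affected file instead of editing it in place.
v1 = log.commit(add={"part-1-rewritten.parquet"}, remove={"part-1.parquet"})
# Restoring is just another commit, so the history stays intact.
v2 = log.restore(v0)

print(sorted(log.files_as_of(v1)))
print(sorted(log.files_as_of(v2)))
```

Since old files are only marked as removed in the log, earlier versions stay readable, which is the idea behind the time travel and restore commands shown in the following cells.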

In [0]:
spark.sql(f"DELETE FROM spark_sensor_bronze where turbine_id='{first_turbine}'")

In [0]:
%sql describe history spark_sensor_bronze;

In [0]:
%sql 
 -- also works with TIMESTAMP AS OF 'yyyy-MM-dd HH:mm:ss'
select * from spark_sensor_bronze version as of 1 ;

-- Did you run the DELETE by mistake? You can easily restore the table to a given version / date:
-- RESTORE TABLE spark_sensor_bronze TO VERSION AS OF 1

-- Or clone it (SHALLOW provides a zero-copy clone):
-- CREATE TABLE spark_sensor_bronze_clone SHALLOW|DEEP CLONE spark_sensor_bronze VERSION AS OF 1

-- Turn on CDC to capture insert/update/delete operations:
-- ALTER TABLE spark_sensor_bronze SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

In [0]:
%sql
-- Note: these options can also be turned on by default, or for the whole database
ALTER TABLE spark_turbine                  SET TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE );
ALTER TABLE spark_sensor_bronze            SET TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE );
ALTER TABLE spark_current_turbine_metrics  SET TBLPROPERTIES (delta.autooptimize.optimizewrite = TRUE, delta.autooptimize.autocompact = TRUE );

## Our final tables are now ready to be used to build SQL Dashboards and ML models for predictive maintenance!
<img style="float: right" width="400" src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/manufacturing/lakehouse-iot-turbine/lakehouse-manuf-iot-dashboard-1.png"/>

Switch to Databricks SQL to see how easily this data can be queried with the [Turbine DBSQL Dashboard](/sql/dashboards/a6bb11d9-1024-47df-918d-f47edc92d5f4) to start reviewing our wind turbine stats, or the [DBSQL Predictive maintenance Dashboard](/sql/dashboards/d966eb63-6d37-4762-b90f-d3a2b51b9ba8).

Creating a single flow was simple. However, handling many data pipelines at scale can become a real challenge:
* Hard to build and maintain table dependencies
* Difficult to monitor & enforce advanced data quality
* Hard to trace data lineage
* Difficult pipeline operations (observability, error recovery)


#### To solve these challenges, Databricks introduced **Delta Live Tables**
A simple way to build and manage data pipelines for fresh, high quality data!

# Next: secure and share data with Unity Catalog

Now that these tables are available in our Lakehouse, let's review how we can share them with the Data Scientists and Data Analysts teams.

Jump to the [Governance with Unity Catalog notebook]($../../02-Data-governance/02-UC-data-governance-security-iot-turbine) or [Go back to the introduction]($../../00-IOT-wind-turbine-introduction-lakehouse)