In [4]:
%%HTML
<link rel="stylesheet" type="text/css" href="custom.css">

# Workshop - Energy Time Series Storage and Analytics 

###  Michael Vollmer and Holger Trittenbach

# Production Monitoring with Smart Meters

![ipe1](https://www.ipe.kit.edu/img/Reinraum_V2.jpg)
<img src="https://www.ipe.kit.edu/img/Hybridlabor.jpg" alt="ipe2" width="50%"/>

# Introduction
<img src="img/intro_graphic_1.png" alt="Intro1" width="100%"/>

# Introduction
<img src="img/intro_graphic_2.png" alt="Intro2" width="100%"/>

# Introduction
<img src="img/intro_graphic_3.png" alt="Intro3" width="100%"/>

# Introduction
<img src="img/intro_graphic_4.png" alt="Intro4" width="100%"/>

# Introduction
<img src="img/intro_graphic_5.png" alt="Intro5" width="100%"/>

# Introduction
<img src="img/intro_graphic_6.png" alt="Intro6" width="100%"/>

# Introduction
### This workshop gives insights into modern stream processing

* **Challenges**
  * Functional requirements
  * Optimization goals
* **Concepts**
  * Software architecture
  * Conflicts and design choices
* **A competitive and readily available solution**
  * Concrete frameworks
  * Brief demonstrations

### This workshop does not
* Discuss network protocols
* Provide a full tutorial on software configuration and usage
* Claim to offer the definitive solution for everyone

# Architecture - Soft Goals

* Consistency
* Fault Tolerance
* Throughput / Performance
* Scalability
* Extensibility
* Security
* Maintainability
* Price

In [2]:
%%html
<style>
table {float:left}
table {width: 60%}
</style>

# Architecture - Number of Servers


<span style="font-size:30pt">Single Server</span> | <span style="font-size:30pt">Cluster</span> 
------------- | -------
<span style="font-size:25pt;color:#009682">Consistency</span> | <span style="font-size:25pt;color:#A22223">Consistency</span>
<span style="font-size:25pt;color:#A22223">Fault Tolerance</span> | <span style="font-size:25pt;color:#009682">Fault Tolerance</span>
<span style="font-size:25pt">Throughput / Performance</span> | <span style="font-size:25pt">Throughput / Performance</span>
<span style="font-size:25pt;color:#A22223">Scalability</span> | <span style="font-size:25pt;color:#009682">Scalability</span>
<span style="font-size:25pt;color:#A22223">Extensibility</span> | <span style="font-size:25pt;color:#009682">Extensibility</span>
<span style="font-size:25pt">Security</span> | <span style="font-size:25pt">Security</span>
<span style="font-size:25pt;color:#009682">Maintainability</span> | <span style="font-size:25pt;color:#A22223">Maintainability</span>
<span style="font-size:25pt">Price</span> | <span style="font-size:25pt">Price</span>

# Architecture - Data Ingress
<img src="img/pipeline_overview_1.png" alt="Pipeline1" width="100%"/>

* Reliable acceptance and forwarding of incoming data
* Single point of contact for all sources
* Uniform access / messages for all "data sinks"

# Architecture - Data Storage
<img src="img/pipeline_overview_2.png" alt="Pipeline2" width="100%"/>

* Persist data with fault tolerance
* High throughput for real-time applications
* Proper arrangement of the data on disk

# Architecture - Processing / Analysis
<img src="img/pipeline_overview_3.png" alt="Pipeline3" width="100%"/>

* Scheduling and load balancing
* Efficient usage of resources
* Synergy with storage framework (data locality)


# Architecture - Interaction
<img src="img/pipeline_overview_4.png" alt="Pipeline4" width="100%"/>

* Provide "easy" access to relevant information
* Allow collaborative work

# Architecture - Software Frameworks
<img src="img/pipeline_overview_5.png" alt="Pipeline5" width="100%"/>

# Architecture - Service Locations
<img src="img/location_1.png" alt="Location1" width="100%"/>
* Locations: Sensor, Storage Servers and Viewing Device

# Architecture - Service Locations
<img src="img/location_2.png" alt="Location2" width="100%"/>
* Locations: Sensor, Storage Servers and Viewing Device
* Cassandra is the storage service and Spark runs on the same hardware

# Architecture - Service Locations
<img src="img/location_3.png" alt="Location3" width="100%"/>
* Locations: Sensor, Storage Servers and Viewing Device
* Cassandra is the storage service and Spark runs on the same hardware
* Location/hardware of Kafka and Jupyter depend on the situation

# Architecture - Demonstration Setup

* Simulate separate machines in a connected network using Docker
  * 1 node for Kafka
  * 2 nodes for Cassandra + Spark
  * 1 node as Spark master
  * 1 node for Jupyter notebooks
<br><br><br>
* Images and documentation [available via GitHub](https://github.com/holtri/energydata-docker)
  * Intended as a functional sandbox
  * Additional simulation of an input stream is also available


# Architecture - Demonstration Setup

* Data is taken from [https://data.open-power-system-data.org](https://data.open-power-system-data.org)
  * Solar and wind generation in Germany in 15-minute time steps
  * Also used in the second part of the workshop
  * More details are available in the [GitHub documentation](https://github.com/holtri/energydata-docker)

```
ts,type,region,value
2012-01-01 06:00:00,solar,DE,0.0
2012-01-01 06:15:00,solar,DE,0.0
2012-01-01 06:30:00,solar,DE,0.0
2012-01-01 06:45:00,solar,DE,0.0
2012-01-01 07:00:00,solar,DE,8.0
2012-01-01 07:15:00,solar,DE,30.0
2012-01-01 07:30:00,solar,DE,74.0
2012-01-01 07:45:00,solar,DE,137.0
...
```
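A minimal sketch of how records in this format might be parsed with Python's standard library, assuming exactly the header and timestamp layout shown above. `SAMPLE` just holds the excerpt's rows, and `parse_rows` is a hypothetical helper name:

```python
import csv
import io
from datetime import datetime, timedelta

# Rows of the excerpt above (the real file is much longer)
SAMPLE = """ts,type,region,value
2012-01-01 06:00:00,solar,DE,0.0
2012-01-01 06:15:00,solar,DE,0.0
2012-01-01 06:30:00,solar,DE,0.0
2012-01-01 06:45:00,solar,DE,0.0
2012-01-01 07:00:00,solar,DE,8.0
2012-01-01 07:15:00,solar,DE,30.0
2012-01-01 07:30:00,solar,DE,74.0
2012-01-01 07:45:00,solar,DE,137.0
"""

def parse_rows(text):
    """Yield one dict per CSV line with typed fields."""
    for row in csv.DictReader(io.StringIO(text)):
        yield {
            "ts": datetime.strptime(row["ts"], "%Y-%m-%d %H:%M:%S"),
            "type": row["type"],
            "region": row["region"],
            "value": float(row["value"]),
        }

records = list(parse_rows(SAMPLE))

# Consecutive measurements should be exactly 15 minutes apart
steps = {b["ts"] - a["ts"] for a, b in zip(records, records[1:])}
print(len(records), steps == {timedelta(minutes=15)})  # 8 True
```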

# Kafka - Ingress Responsibilities

* Always available for new messages
* Route all incoming messages to the correct recipients
* Enable asynchronous delivery

# Kafka - Delivery Paradigm

### Classic system: Message Queue
* Delivers a message from one sender to one recipient
* A message must be sent separately to each recipient
* The sender must know the recipients beforehand
* Messages are buffered separately for each recipient

### Publish and Subscribe Approach
* A sender publishes a message to a category
* Recipients can subscribe to any number of categories
* Senders and recipients only need to know the categories, not each other
* Messages are buffered only once and (at least) until every subscriber has received them
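The publish-subscribe idea can be illustrated with a hypothetical in-memory `ToyPubSubBroker` (not Kafka's implementation): each topic's messages are stored once, every subscriber keeps its own read position, and a message is discarded only after all subscribers have received it. Kafka itself keeps messages for a retention period instead of deleting them on delivery.

```python
from collections import defaultdict

class ToyPubSubBroker:
    """In-memory illustration of publish/subscribe (not Kafka's implementation)."""

    def __init__(self):
        self.log = defaultdict(list)        # topic -> buffered messages, stored once
        self.base = defaultdict(int)        # topic -> absolute index of log[topic][0]
        self.positions = defaultdict(dict)  # topic -> {subscriber: next absolute index}

    def subscribe(self, topic, subscriber):
        # New subscribers start at the current end of the topic
        self.positions[topic][subscriber] = self.base[topic] + len(self.log[topic])

    def publish(self, topic, message):
        # The sender only names the topic; it does not know the recipients
        self.log[topic].append(message)

    def poll(self, topic, subscriber):
        start = self.positions[topic][subscriber] - self.base[topic]
        messages = self.log[topic][start:]
        self.positions[topic][subscriber] += len(messages)
        self._drop_delivered(topic)
        return messages

    def _drop_delivered(self, topic):
        # Discard messages that every subscriber has already received
        oldest = min(self.positions[topic].values())
        del self.log[topic][: oldest - self.base[topic]]
        self.base[topic] = oldest

broker = ToyPubSubBroker()
broker.subscribe("generation", "dashboard")
broker.subscribe("generation", "archive")
broker.publish("generation", "solar,DE,8.0")
broker.publish("generation", "solar,DE,30.0")
print(broker.poll("generation", "dashboard"))  # ['solar,DE,8.0', 'solar,DE,30.0']
print(len(broker.log["generation"]))           # 2: 'archive' has not read them yet
print(broker.poll("generation", "archive"))    # ['solar,DE,8.0', 'solar,DE,30.0']
print(len(broker.log["generation"]))           # 0: every subscriber has them now
```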

# Kafka - Delivery Paradigm

<img src="img/kafka_pubsub_1.png" alt="PubSub1" width="100%"/>

# Kafka - Delivery Paradigm

<img src="img/kafka_pubsub_2.png" alt="PubSub2" width="100%"/>

# Kafka - Delivery Paradigm

<img src="img/kafka_pubsub_3.png" alt="PubSub3" width="100%"/>

![kafka_logo](https://kafka.apache.org/images/logo.png)

* Adapts the publish-subscribe paradigm to clusters
* Developed by LinkedIn and made open source
* Scalable through distributed design
  * LinkedIn: 1100 nodes, receives 175TB and distributes 650TB daily
  * Netflix: 4000 nodes, 700 billion messages daily

# Kafka - Features

* Persists messages for a configurable retention period
* Messages are not pushed immediately; consumers request (pull) them
* Allows groups of recipients to read in parallel
* Uses replication for fault tolerance
* Messages are organized in topics, which can be split into partitions
* Messages are delivered in order of arrival (guaranteed only within a partition)


# Kafka - Detail: Commit Log

* Messages are committed to and read from a log
* Messages are deleted after a certain duration or once the log exceeds a certain size
* One independent commit log per topic and partition

![kafka_commit](img/kafka_commitlog.png)
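A toy model of one partition's commit log, assuming a message-count limit (`max_messages`, hypothetical) as a stand-in for Kafka's time- and size-based retention settings. Appending returns the message's offset, reading never removes anything, and each reader can start at any retained offset:

```python
class ToyCommitLog:
    """Append-only log for one topic partition with size-based retention (illustration only)."""

    def __init__(self, max_messages):
        self.max_messages = max_messages
        self.start_offset = 0  # offset of the oldest retained message
        self.entries = []

    def append(self, message):
        """Append a message and return its offset."""
        self.entries.append(message)
        offset = self.start_offset + len(self.entries) - 1
        # Retention: drop the oldest entries once the log exceeds its limit
        overflow = len(self.entries) - self.max_messages
        if overflow > 0:
            del self.entries[:overflow]
            self.start_offset += overflow
        return offset

    def read(self, offset, max_count=10):
        """Return (offset, message) pairs starting at `offset`; reading deletes nothing."""
        offset = max(offset, self.start_offset)  # offsets before the retention window are gone
        begin = offset - self.start_offset
        return [(self.start_offset + i, m)
                for i, m in enumerate(self.entries[begin:begin + max_count], start=begin)]

log = ToyCommitLog(max_messages=3)
for value in ["0.0", "8.0", "30.0", "74.0", "137.0"]:
    log.append(value)
print(log.start_offset)  # 2
print(log.read(0))       # [(2, '30.0'), (3, '74.0'), (4, '137.0')]
print(log.read(4))       # [(4, '137.0')]
```

Two readers calling `read` with different offsets do not interfere with each other, which is what lets several consumer groups read the same topic independently.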

# Kafka - Detail: Consumers and Producers 

### Producers
* Send messages to a Kafka cluster for a specific topic
* Can ensure that related messages end up in the same partition (e.g., by giving them the same key)
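A sketch of key-based partitioning, assuming a hash of the message key modulo the number of partitions. `choose_partition` is a hypothetical helper, and CRC32 only stands in for the murmur2 hash used by Kafka's default partitioner; with kafka-python, a key is attached via the `key` argument of `KafkaProducer.send`:

```python
import zlib

def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition; equal keys always map to the same partition."""
    # Stand-in hash: Kafka's default partitioner uses murmur2, not CRC32
    return zlib.crc32(key) % num_partitions

keys = [b"DE/solar", b"DE/wind", b"DE/solar", b"DE/wind"]
partitions = [choose_partition(k, num_partitions=4) for k in keys]
print(partitions[0] == partitions[2], partitions[1] == partitions[3])  # True True
```

Because ordering is only guaranteed per partition, routing all messages of one series through the same key keeps that series in order.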

### Consumers
* Request messages from a specific topic
* Partitions of the topic are assigned automatically
  * If the consumer is part of a group, each partition is assigned to only one consumer of the group
  * Otherwise, the consumer is assigned all partitions
* Can choose the offset in the commit log from which to read
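The group rule can be sketched with a simplified round-robin assignment. `assign_partitions` is hypothetical; Kafka offers several configurable assignment strategies (e.g., range and round-robin) that also handle consumers joining and leaving:

```python
def assign_partitions(partitions, consumers):
    """Spread partitions over one group's consumers; each partition goes to exactly one consumer."""
    consumers = sorted(consumers)
    assignment = {c: [] for c in consumers}
    for i, partition in enumerate(sorted(partitions)):
        # Round-robin: the i-th partition goes to consumer i mod (group size)
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment

group = assign_partitions(range(5), ["consumer-a", "consumer-b"])
print(group)       # {'consumer-a': [0, 2, 4], 'consumer-b': [1, 3]}

# A consumer without a group reads all partitions
standalone = assign_partitions(range(5), ["solo"])
print(standalone)  # {'solo': [0, 1, 2, 3, 4]}
```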

# Kafka - Detail: Connect

* A very common task is reading from and writing into a database
* Special interface to continuously produce/consume with a database as source/sink
* Ready-to-use connectors are available for most popular databases and other systems
  * E.g., Hadoop, MySQL, Cassandra, Oracle, Twitter, FTP

# Kafka - Demonstration
![kafka_demo](img/kafka_demoVisualization.png)

In [None]:
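from kafka import KafkaConsumer
# Join consumer group 'consumerGroupA' on topic 'generation'; 'broker' is the Kafka host of the demo setup
consumer = KafkaConsumer('generation', group_id='consumerGroupA', bootstrap_servers='broker')
# Block and print each message as topic:partition:offset followed by its UTF-8 payload
for message in consumer:
    print("%s:%d:%d    %s" % (message.topic, message.partition, message.offset, message.value.decode('utf-8')))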

In [None]:
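from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='broker')
# Read lines from the notebook prompt and publish each one to the 'generation' topic
while True:
    # input() already strips the trailing newline; Kafka expects bytes, so encode the text
    producer.send('generation', value=input('message: ').encode('utf-8'))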

# Architecture - Software Frameworks
<img src="img/pipeline_overview_5.png" alt="Pipeline5" width="100%"/>

![logo](https://upload.wikimedia.org/wikipedia/commons/5/5e/Cassandra_logo.svg)

### Apache Cassandra Database

* Distributed and Decentralized: no single point of failure
* Highly scalable
    * Apple: 75,000 nodes, 10PB data
    * Netflix: 420TB data, trillion requests per day
* Row-oriented storage
* Well suited for temporal data



# Cassandra Overview

* Data Model
* Distribution Strategy
* Querying Data
* (Read and Write Path)
* Demo Data Model

# Topology of a Cassandra Cluster

![cluster](img/cassandra_cluster+kafka.png)

# Cassandra Data Model 

#### Basic entity: Key-Value pairs
* temperature $\rightarrow $ 25
* humidity $\rightarrow $ 53 

#### More complex data is composition of Key-Value pairs
* temperature $\rightarrow $ 25
* unit $\rightarrow $ Celsius
* location $\rightarrow $ Passau

#### Primary Key
* Identifies a set of key-value pairs
* Consists of a partition key and an optional clustering key
* Controls data locality (the partition key determines which nodes store the data)

# Cassandra Data Model
### Regular Table
![dm1](img/cassandra_datamodel_1.png)

# Example Regular Table

* Table: Environment-Information
    * Primary Key: Karlsruhe
      * temperature $\rightarrow $ 28
      * temperature_unit $\rightarrow $ Celsius
    * Primary Key: Passau
      * temperature $\rightarrow $ 25
      * temperature_unit $\rightarrow $ Celsius
      * waterlevel $\rightarrow $ 700 
      * waterlevel_unit $\rightarrow $ cm

# Keyspace

* Keyspaces group tables
* A keyspace defines the replication strategy of its tables

# Cassandra Data Model
## Wide Rows

![dm2](img/cassandra_datamodel_2.png)

# Example wide row

* Table: Environment-Information
* Partition Key: Location, Clustering Key: Date
* Primary Key: Composite of Location + Date
  * Location $\rightarrow$ Karlsruhe
    * Date $\rightarrow $ 2017-09-11; temperature $\rightarrow $ 28
    * Date $\rightarrow $ 2017-09-12; temperature $\rightarrow $ 24

## So far so good - But why not just good old relational DB? 


# Distribution of Partition Keys on Cluster

![](img/cassandra_cluster_2.png)

# Replication

<img src="https://blog.gft.com/wp-content/uploads/2017/01/cassandra-ring.png" alt="cassandra-ring" width="100%"/>
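A sketch of placing a partition on the ring and replicating it in the style of `SimpleStrategy`: the first replica goes to the node owning the key's token, further replicas to the following nodes clockwise. The MD5-based `token` function, the node tokens, and the key string `"DE:solar"` are assumptions for illustration; Cassandra's default partitioner uses Murmur3:

```python
import bisect
import hashlib

def token(key: str) -> int:
    """Stand-in token function: first 8 bytes of MD5 (Cassandra defaults to Murmur3)."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest()[:8], "big")

class ToyRing:
    """Token ring where each node owns the token range ending at its own token."""

    def __init__(self, node_tokens):
        # node_tokens maps node name -> token; keep the ring sorted by token
        self.ring = sorted((t, node) for node, t in node_tokens.items())
        self.tokens = [t for t, _ in self.ring]

    def replicas(self, partition_key: str, replication_factor: int):
        """SimpleStrategy-style placement: owner first, then the next nodes clockwise."""
        # First node whose token is >= the key's token; wrap around past the largest token
        start = bisect.bisect_left(self.tokens, token(partition_key)) % len(self.ring)
        return [self.ring[(start + i) % len(self.ring)][1] for i in range(replication_factor)]

# Four nodes with evenly spaced (hypothetical) tokens in [0, 2**64)
ring = ToyRing({"node1": 0, "node2": 2**62, "node3": 2**63, "node4": 3 * 2**62})
print(ring.replicas("DE:solar", replication_factor=2))  # two adjacent nodes on the ring
```

Since every node can compute this placement from the key alone, any node can route a request without a central coordinator.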

# Replication and Consistency

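* Brewer's CAP theorem: a distributed system can guarantee at most two of
  * Consistency
  * Availability
  * Partition tolerance
* Cassandra: AP, but with tunable consistency
  * Trade consistency for availability (and latency) per query via the consistency level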
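Tunable consistency is often reasoned about with a simple overlap rule: if a read contacts R replicas and a write is acknowledged by W replicas out of a replication factor RF, then R + W > RF guarantees that every read includes at least one replica holding the latest acknowledged write. A minimal sketch that ignores failures and concurrent writes; the helper names are hypothetical, while ONE and QUORUM are real Cassandra consistency levels:

```python
def quorum(replication_factor: int) -> int:
    """Number of replicas in a QUORUM: a strict majority."""
    return replication_factor // 2 + 1

def reads_see_latest_write(read_replicas: int, write_replicas: int, replication_factor: int) -> bool:
    """True if every read replica set must overlap every write replica set (R + W > RF)."""
    return read_replicas + write_replicas > replication_factor

rf = 3
print(quorum(rf))                                          # 2
print(reads_see_latest_write(quorum(rf), quorum(rf), rf))  # True: QUORUM reads and writes
print(reads_see_latest_write(1, 1, rf))                    # False: ONE/ONE is faster but may read stale data
```

With the demo keyspace's `replication_factor` of 2, `quorum(2)` is 2, so QUORUM there means all replicas.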

# Benefits from this architecture

* Horizontally Scalable
* Fault Tolerance
* Particularly well suited for energy data (timeseries)

# The Cassandra Query Language (CQL)

* Declarative language to query data
* Similar to SQL
* _But_
  * No arbitrary WHERE predicates (only on partition key columns and, in order, clustering columns)
  * No arbitrary ORDER BY (only on clustering columns)
  * No joins; GROUP BY only on primary key columns (Cassandra 3.10+)
* Other options
  * _ALLOW FILTERING_
  * _Secondary indexes_
  * ...
  * **Better think about your data model first**
* The data model is query-oriented
* Denormalization is an important concept
* Use Spark for analytical queries
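Being query-oriented means designing tables so that each query addresses one partition and reads a contiguous slice of its clustering rows. The hypothetical helper below checks that property for the wide-row example (partition key `location`, clustering key `date`); it is a simplification, not a reimplementation of CQL's validation rules:

```python
# Hypothetical key layout following the wide-row example: PRIMARY KEY ((location), date)
PARTITION_KEY = ("location",)
CLUSTERING_KEY = ("date",)

def is_single_partition_slice(restrictions):
    """Check whether a WHERE clause reads one partition and a contiguous run of its rows.

    `restrictions` maps a column name to "eq" (=) or "range" (<, >, ...).
    Simplified: ignores IN, token(), secondary indexes, and ALLOW FILTERING.
    """
    # The partition key must be fully fixed with equality to address exactly one partition
    if any(restrictions.get(col) != "eq" for col in PARTITION_KEY):
        return False
    # Clustering columns may be restricted only as a prefix of their declared order ...
    restricted = [col for col in CLUSTERING_KEY if col in restrictions]
    if restricted != list(CLUSTERING_KEY[: len(restricted)]):
        return False
    # ... and only the last restricted clustering column may be a range
    if any(restrictions[col] != "eq" for col in restricted[:-1]):
        return False
    # Non-key columns would require filtering rows after reading them
    return all(col in PARTITION_KEY + CLUSTERING_KEY for col in restrictions)

print(is_single_partition_slice({"location": "eq", "date": "range"}))         # True
print(is_single_partition_slice({"date": "range"}))                           # False: no partition given
print(is_single_partition_slice({"location": "eq", "temperature": "range"}))  # False: non-key column
```

A query that fails this check is a hint to add a denormalized table keyed for that query, or to move it to Spark.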

# Cassandra Write Path
![write_path](https://docs.datastax.com/en/cassandra/3.0/cassandra/images/dml_write-process_12.png)

### Compaction

* SSTables are merged and rewritten for efficient retrieval
* Reads are sequential for a partition key in order of the clustering key
  * Fast for time series!
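The write path and compaction can be mimicked with a small log-structured store. `ToyLSMStore` is a hypothetical single-node simplification: a logical counter replaces Cassandra's write timestamps, deletes (tombstones) are not modeled, and compaction simply merges all SSTables. As in Cassandra, the newest write for a key wins:

```python
class ToyLSMStore:
    """Single-node toy of Cassandra's write path: commit log + memtable -> SSTables -> compaction."""

    def __init__(self, memtable_limit):
        self.memtable_limit = memtable_limit  # hypothetical flush threshold (number of cells)
        self.commit_log = []  # append-only record of writes, replayed after a crash
        self.memtable = {}    # (partition, clustering) -> (write_time, value), in memory
        self.sstables = []    # immutable lists sorted by (partition, clustering)
        self.clock = 0        # logical clock standing in for write timestamps

    def write(self, partition, clustering, value):
        self.clock += 1
        self.commit_log.append((partition, clustering, value))        # 1. durable append
        self.memtable[(partition, clustering)] = (self.clock, value)  # 2. in-memory update
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def flush(self):
        # 3. Write the memtable once as a sorted, immutable SSTable
        self.sstables.append(sorted(self.memtable.items()))
        self.memtable = {}
        self.commit_log = []  # flushed writes no longer need the log

    def compact(self):
        # Merge all SSTables into one; for duplicate keys the newest write wins
        merged = {}
        for table in self.sstables:
            for key, (write_time, value) in table:
                if key not in merged or merged[key][0] < write_time:
                    merged[key] = (write_time, value)
        self.sstables = [sorted(merged.items())]

    def read_partition(self, partition):
        """Rows of one partition in clustering order, merging SSTables and the memtable."""
        newest = {}
        for table in self.sstables + [sorted(self.memtable.items())]:
            for (p, c), (write_time, value) in table:
                if p == partition and (c not in newest or newest[c][0] < write_time):
                    newest[c] = (write_time, value)
        return [(c, value) for c, (_, value) in sorted(newest.items())]

store = ToyLSMStore(memtable_limit=2)
store.write("DE/solar", "07:00", 8.0)
store.write("DE/solar", "07:15", 30.0)   # memtable full -> SSTable 1
store.write("DE/solar", "07:00", 9.0)    # newer value for an existing cell
store.write("DE/wind", "07:00", 3517.0)  # memtable full -> SSTable 2
print(len(store.sstables))               # 2
store.compact()
print(len(store.sstables))               # 1
print(store.read_partition("DE/solar"))  # [('07:00', 9.0), ('07:15', 30.0)]
```

Before compaction a read may have to consult several SSTables; afterwards one sorted run per partition remains, which is why reads along the clustering key become sequential.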

### Cassandra Read Path

![read_path](https://docs.datastax.com/en/cassandra/3.0/cassandra/images/dml_caching-reads_12.png)

https://docs.datastax.com/en/cassandra/3.0/cassandra/dml/dmlAboutReads.html

### Back to our Use-Case

**ts**|**type**|**region**|**value**
:-----:|:-----:|:-----:|:-----:
2011-12-31 23:00:00|solar|DE|0.0
2011-12-31 23:15:00|solar|DE|0.0
2011-12-31 23:30:00|solar|DE|0.0
...|...|...|...
2010-01-01 00:00:00|wind|DE|3517.0
2010-01-01 00:15:00|wind|DE|3512.0
2010-01-01 00:30:00|wind|DE|3438.0

### Back to our Use-Case

```SQL 
CREATE KEYSPACE IF NOT EXISTS energydata WITH REPLICATION = {'class' : 'SimpleStrategy',
'replication_factor' : 2 };
```


```SQL
CREATE TABLE IF NOT EXISTS energydata.generation (
    ts timestamp,
    type text,
    region text,
    value double,
    PRIMARY KEY ((region, type), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
```

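```SQL
AND compaction = {'class': 'DateTieredCompactionStrategy'};
```

* Note: `DateTieredCompactionStrategy` is deprecated since Cassandra 3.8; `TimeWindowCompactionStrategy` is the recommended replacement for time series data.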
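A sketch of how `PRIMARY KEY ((region, type), ts)` with `CLUSTERING ORDER BY (ts DESC)` arranges the example rows: `(region, type)` selects a partition and the rows within it are sorted newest first, so "the latest n values" is a prefix of a single partition. The dictionary layout and the `latest` helper are illustrative assumptions, not how Cassandra stores data on disk:

```python
from collections import defaultdict
from datetime import datetime

# Rows from the use-case table above: (ts, type, region, value)
rows = [
    ("2011-12-31 23:00:00", "solar", "DE", 0.0),
    ("2011-12-31 23:15:00", "solar", "DE", 0.0),
    ("2011-12-31 23:30:00", "solar", "DE", 0.0),
    ("2010-01-01 00:00:00", "wind", "DE", 3517.0),
    ("2010-01-01 00:15:00", "wind", "DE", 3512.0),
    ("2010-01-01 00:30:00", "wind", "DE", 3438.0),
]

# PRIMARY KEY ((region, type), ts): (region, type) selects the partition ...
partitions = defaultdict(list)
for ts, type_, region, value in rows:
    partitions[(region, type_)].append((datetime.fromisoformat(ts), value))

# ... and CLUSTERING ORDER BY (ts DESC) keeps each partition newest-first
for key in partitions:
    partitions[key].sort(reverse=True)

def latest(region, type_, n):
    """Analogue of: SELECT ts, value FROM energydata.generation
    WHERE region = ? AND type = ? LIMIT n  (a prefix of one partition)."""
    return partitions[(region, type_)][:n]

print(latest("DE", "wind", 2))  # newest two wind values: 00:30 -> 3438.0, then 00:15 -> 3512.0
```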

# Cassandra Demonstration

## Cassandra References

* Cassandra Datastax Documentation: http://docs.datastax.com/en/cql/3.3/index.html
* Cassandra - The Definitive Guide (Jeff Carpenter and Eben Hewitt, O'Reilly 2016)

# Architecture - Software Frameworks
<img src="img/pipeline_overview_5.png" alt="Pipeline5" width="100%"/>

![](https://spark.apache.org/images/spark-logo-trademark.png)

### Spark - Analytics Framework

* Berkeley AMPLab, Apache Project since 2014
* Successor of MapReduce
* In-memory processing instead of writing intermediate results to disk
* Bindings to several languages

### How does it fit to Cassandra

* Shortcomings of Cassandra
  * No filtering on non-key columns
  * Not suited for analytical functions, e.g., a simple aggregate per sensor over time

### Integration with Cassandra

![cluster](img/cassandra_cluster+spark.png)

### Basic Idea of Distributed Computing Frameworks

* Bring the computation/function to the data
* Simple Example: show all locations where the temperature is over 30°C
  * Filter Function: temperature > 30
  * _Possibility 1_: load all data onto one node, then apply the filter
  * _Possibility 2_: filter on each node, then combine the results
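The two possibilities can be compared with plain Python lists standing in for the data on each node. The node names and all readings except Karlsruhe and Passau are made up for illustration:

```python
# Made-up readings per node: (location, temperature in °C)
nodes = {
    "node1": [("Karlsruhe", 28), ("Passau", 25)],
    "node2": [("Freiburg", 31), ("Berlin", 22)],
    "node3": [("Mannheim", 33)],
}

def over_30(reading):
    """Filter function from the example: temperature > 30."""
    _, temperature = reading
    return temperature > 30

# Possibility 1: ship every reading to one node, then filter there
shipped_1 = [r for readings in nodes.values() for r in readings]
result_1 = [r for r in shipped_1 if over_30(r)]

# Possibility 2: filter locally on each node, then ship and combine only the matches
local_matches = {name: [r for r in readings if over_30(r)] for name, readings in nodes.items()}
result_2 = [r for matches in local_matches.values() for r in matches]

print(result_1 == result_2)           # True: same answer
print(len(shipped_1), len(result_2))  # 5 2: readings sent over the network
```

The answer is identical, but possibility 2 moves only the matching readings, and the per-node filters can run in parallel.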

### Major Design Principle: Data Locality

### The Spark Workflow

* User friendly interface (API)
  * bindings in Scala, Java, Python, R, ...
  * high level transformations on the data (filter, map, aggregate, ...)
  * lazy evaluation
* Transformations are translated into an execution plan of operators
* Efficient execution, in many cases transparent to the user
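Lazy evaluation can be illustrated with a hypothetical `ToyDataset` class (not Spark's API): transformations such as `map` and `filter` only record a step, and nothing is computed until an action such as `collect` runs. In PySpark, `rdd.map(...).filter(...).collect()` follows the same pattern:

```python
class ToyDataset:
    """Toy stand-in for a lazily evaluated dataset (not Spark's API)."""

    def __init__(self, source, steps=()):
        self.source = source
        self.steps = steps  # recorded transformations, not yet executed

    # Transformations only record what to do and return a new dataset
    def map(self, func):
        return ToyDataset(self.source, self.steps + (("map", func),))

    def filter(self, pred):
        return ToyDataset(self.source, self.steps + (("filter", pred),))

    # An action finally runs the whole pipeline, one item at a time
    def collect(self):
        result = []
        for item in self.source:
            keep = True
            for kind, func in self.steps:
                if kind == "map":
                    item = func(item)
                elif not func(item):
                    keep = False
                    break
            if keep:
                result.append(item)
        return result

calls = []

def double(x):
    calls.append(x)  # record that the function actually ran
    return 2 * x

ds = ToyDataset([0.0, 8.0, 30.0, 74.0]).map(double).filter(lambda v: v > 20)
print(calls)         # []: nothing has been computed yet
print(ds.collect())  # [60.0, 148.0]
print(len(calls))    # 4
```

Deferring execution lets a framework see the whole chain first and plan it, for example by fusing `map` and `filter` into a single pass as done here.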

### Spark Cassandra Connector

* Preserve Data Locality
* Alignment of Spark Partitions with Cassandra partitions

# Jupyter Notebook

![jupyter_logo](http://blog.jupyter.org/content/images/2015/02/jupyter-sq-text.png)

* Interactive Data Science 
* Web-based
* Support of many programming languages

# Spark Demo

# Summary

* Use-Case: large scale energy data collection and analysis
* Technology Stack: Kafka, Cassandra, Spark, Jupyter 
* System design principles 
* Easy-to-set-up Docker-based playground

### Technology readily available

# Running the Workshop Examples

<img src="https://assets-cdn.github.com/images/modules/logos_page/GitHub-Mark.png" alt="githublogo" width="30%"/>

### All resources are available at https://github.com/holtri/energydata-docker