# Data Ingestion
#### Agenda
- Loading batch data
- Connecting to databases
- Connecting to a streaming server

<img src="https://github.com/awantik/machine-learning-slides/blob/master/ML-Pipeline-business.png?raw=true" alt="Machine learning pipeline (business view)">

## 1. Loading Batch Data
<hr>
* Historical data in any supported format (e.g. CSV, JSON, Parquet) can be loaded.
* The result of loading is a DataFrame.

In [2]:
# Load the wine-quality CSV into a DataFrame. The first row supplies the column
# names, and inferSchema makes Spark take an extra pass over the file to derive
# column types instead of reading every column as a string.
data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('/FileStore/tables/winequality.csv')
)

In [3]:
# printSchema is a method: it must be called, otherwise the cell only shows the
# bound-method object instead of printing the inferred column names and types.
data.printSchema()

In [4]:
# Render the DataFrame loaded from winequality.csv for visual inspection
# (rendered output shown below).
display(data)

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality,recommend
7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8,6,False
6.3,0.3,0.34,1.6,0.049,14.0,132.0,0.994,3.3,0.49,9.5,6,False
8.1,0.28,0.4,6.9,0.05,30.0,97.0,0.9951,3.26,0.44,10.1,6,False
7.2,0.23,0.32,8.5,0.0579999999999999,47.0,186.0,0.9956,3.19,0.4,9.9,6,False
7.2,0.23,0.32,8.5,0.0579999999999999,47.0,186.0,0.9956,3.19,0.4,9.9,6,False
8.1,0.28,0.4,6.9,0.05,30.0,97.0,0.9951,3.26,0.44,10.1,6,False
6.2,0.32,0.16,7.0,0.045,30.0,136.0,0.9949,3.18,0.47,9.6,6,False
7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8,6,False
6.3,0.3,0.34,1.6,0.049,14.0,132.0,0.994,3.3,0.49,9.5,6,False
8.1,0.22,0.43,1.5,0.044,28.0,129.0,0.9938,3.22,0.45,11.0,6,False


## 2. Connecting to Databases
<hr>
- Creating tables and querying them with Spark SQL

In [6]:
# Query the wine-quality data through Spark SQL.
# The `winequality` table may already exist in the catalog (e.g. created outside
# this notebook). If it does not, register the CSV loaded in section 1 (`data`)
# as a temporary view so this cell also works on a fresh cluster / Run All.
# listTables() includes temporary views, so re-running this cell is a no-op.
if "winequality" not in {table.name for table in spark.catalog.listTables()}:
    data.createOrReplaceTempView("winequality")

df = spark.sql("SELECT * from winequality")
display(df)

fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality,recommend
7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8,6,False
6.3,0.3,0.34,1.6,0.049,14.0,132.0,0.994,3.3,0.49,9.5,6,False
8.1,0.28,0.4,6.9,0.05,30.0,97.0,0.9951,3.26,0.44,10.1,6,False
7.2,0.23,0.32,8.5,0.0579999999999999,47.0,186.0,0.9956,3.19,0.4,9.9,6,False
7.2,0.23,0.32,8.5,0.0579999999999999,47.0,186.0,0.9956,3.19,0.4,9.9,6,False
8.1,0.28,0.4,6.9,0.05,30.0,97.0,0.9951,3.26,0.44,10.1,6,False
6.2,0.32,0.16,7.0,0.045,30.0,136.0,0.9949,3.18,0.47,9.6,6,False
7.0,0.27,0.36,20.7,0.045,45.0,170.0,1.001,3.0,0.45,8.8,6,False
6.3,0.3,0.34,1.6,0.049,14.0,132.0,0.994,3.3,0.49,9.5,6,False
8.1,0.22,0.43,1.5,0.044,28.0,129.0,0.9938,3.22,0.45,11.0,6,False


## 3. Connecting to a Streaming Server
<hr>

In [8]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
# NOTE(review): star imports kept for compatibility with any cells relying on
# them; prefer explicit imports (e.g. `from pyspark.sql.types import StructType`).

# Directory of JSON event files that is read as a stream.
inputPath = "/databricks-datasets/structured-streaming/events/"

# File-based streaming sources require an explicit schema by default (Spark does
# not infer it for streams), so declare it up front: each event has a timestamp
# and an action string; both fields are nullable.
jsonSchema = StructType([
    StructField("time", TimestampType(), True),
    StructField("action", StringType(), True),
])

# `readStream` (rather than `read`) turns the directory into an unbounded source.
streamingInputDF = (
  spark
    .readStream
    .schema(jsonSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

# Confirms this is a streaming DataFrame (expected: True).
streamingInputDF.isStreaming

In [9]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

# A query name must be unique among the session's active streaming queries, so
# re-running this cell would fail while the previous "counts" query is still
# running. Stop it first so the cell can be re-executed safely.
for active_query in spark.streams.active:
    if active_query.name == "counts":
        active_query.stop()

# Writes streamingInputDF as-is (no aggregation happens here, despite the name).
query = (
  streamingInputDF
    .writeStream
    .format("memory")        # memory sink: output is kept in an in-memory table on the driver; meant for debugging small data
    .queryName("counts")     # also the name of the in-memory table read via `SELECT * from counts`
    .start()
)

In [10]:
# Read the in-memory table named "counts" that the streaming query writes into.
# NOTE: the query keeps running, so the printed count and the displayed rows come
# from two separate reads of a growing table and may not match exactly.
df = spark.table("counts")
row_count = df.count()
print(row_count)
display(df)

time,action
2016-07-26T02:45:07.000+0000,Open
2016-07-26T02:45:47.000+0000,Open
2016-07-26T02:46:42.000+0000,Open
2016-07-26T02:46:59.000+0000,Open
2016-07-26T02:47:05.000+0000,Open
2016-07-26T02:47:14.000+0000,Open
2016-07-26T02:47:25.000+0000,Open
2016-07-26T02:47:26.000+0000,Open
2016-07-26T02:47:28.000+0000,Open
2016-07-26T02:47:36.000+0000,Open
