## Data Ingestion
#### Agenda
<hr>
1. Loading batched data
2. Connecting to databases
3. Connecting to a stream

<hr>

<img src="https://github.com/awantik/machine-learning-slides/blob/master/ML-Pipeline-business.png?raw=true">

### 1. Loading Batched Data
<hr>
* Historical data in formats such as CSV, JSON, and Parquet can be loaded in batch.
* Loading returns a Spark DataFrame.

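```python
# header=True: use the first row as column names
# inferSchema=True: make one extra pass over the data to detect column types
house_data = spark.read.csv('/FileStore/tables/house_rental_data.csv', header=True, inferSchema=True)
```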

```python
house_data.printSchema()  # printSchema is a method; without () it only returns the bound method
```

```python
display(house_data)  # display() is a Databricks notebook helper; house_data.show() works elsewhere
```

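| _c0 | Sqft | Floor | TotalFloor | Bedroom | Living.Room | Bathroom | Price |
|---|---|---|---|---|---|---|---|
| 1 | 1177.698 | 2 | 7 | 2 | 2 | 2 | 62000 |
| 2 | 2134.8 | 5 | 7 | 4 | 2 | 2 | 78000 |
| 3 | 1138.56 | 5 | 7 | 2 | 2 | 1 | 58000 |
| 4 | 1458.78 | 2 | 7 | 3 | 2 | 2 | 45000 |
| 5 | 967.776 | 11 | 14 | 3 | 2 | 2 | 45000 |
| 6 | 1127.886 | 11 | 12 | 4 | 2 | 2 | 148000 |
| 7 | 1352.04 | 5 | 7 | 3 | 2 | 1 | 58000 |
| 8 | 757.854 | 5 | 14 | 1 | 0 | 1 | 48000 |
| 9 | 1152.792 | 10 | 12 | 3 | 2 | 2 | 45000 |
| 10 | 1423.2 | 4 | 5 | 4 | 2 | 2 | 65000 |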


### 2. Connecting to Databases
<hr>
* Create a table in Databricks (here, `hr_data`), then query it with Spark SQL.

```python
# hr_data must already exist as a table in the Databricks metastore
df = spark.sql("SELECT * FROM hr_data")
```

```python
display(df)
```

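| satisfaction_level | last_evaluation | number_project | average_montly_hours | time_spend_company | Work_accident | left | promotion_last_5years | sales | salary |
|---|---|---|---|---|---|---|---|---|---|
| 0.38 | 0.53 | 2 | 157 | 3 | 0 | 1 | 0 | sales | low |
| 0.8 | 0.86 | 5 | 262 | 6 | 0 | 1 | 0 | sales | medium |
| 0.11 | 0.88 | 7 | 272 | 4 | 0 | 1 | 0 | sales | medium |
| 0.72 | 0.87 | 5 | 223 | 5 | 0 | 1 | 0 | sales | low |
| 0.37 | 0.52 | 2 | 159 | 3 | 0 | 1 | 0 | sales | low |
| 0.41 | 0.5 | 2 | 153 | 3 | 0 | 1 | 0 | sales | low |
| 0.1 | 0.77 | 6 | 247 | 4 | 0 | 1 | 0 | sales | low |
| 0.92 | 0.85 | 5 | 259 | 5 | 0 | 1 | 0 | sales | low |
| 0.89 | 1.0 | 5 | 224 | 5 | 0 | 1 | 0 | sales | low |
| 0.42 | 0.53 | 2 | 142 | 3 | 0 | 1 | 0 | sales | low |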


### 3. Connecting to a Stream
<hr>

```python
from pyspark.sql.types import StructType, StructField, TimestampType, StringType

inputPath = "/databricks-datasets/structured-streaming/events/"

# The data format is known in advance, so define the schema explicitly
# (file-based streaming sources require a schema unless schema inference is enabled)
jsonSchema = StructType([
    StructField("time", TimestampType(), True),
    StructField("action", StringType(), True),
])

# Same as a batch read, but using `readStream` instead of `read`
streamingInputDF = (
  spark
    .readStream
    .schema(jsonSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

streamingInputDF.isStreaming  # True for a DataFrame created with readStream
```

```python
spark.conf.set("spark.sql.shuffle.partitions", "2")  # use few shuffle partitions (default is 200) for this small demo

query = (
  streamingInputDF
    .writeStream
    .format("memory")        # memory sink: keeps results in an in-memory table on the driver (testing/debugging only)
    .queryName("counts")     # name of the in-memory table, queryable with spark.sql
    .start()
)
```

```python
# Query the in-memory table that the streaming query writes to
df = spark.sql("SELECT * FROM counts")
```

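```python
df.count()
```

Running the count again a little later typically returns a larger number. The query picks up one file per trigger and keeps appending rows to the `counts` table.

```python
df.count()
```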

```python
display(df)
```

| time | action |
|---|---|
| 2016-07-26T02:45:07.000+0000 | Open |
| 2016-07-26T02:45:47.000+0000 | Open |
| 2016-07-26T02:46:42.000+0000 | Open |
| 2016-07-26T02:46:59.000+0000 | Open |
| 2016-07-26T02:47:05.000+0000 | Open |
| 2016-07-26T02:47:14.000+0000 | Open |
| 2016-07-26T02:47:25.000+0000 | Open |
| 2016-07-26T02:47:26.000+0000 | Open |
| 2016-07-26T02:47:28.000+0000 | Open |
| 2016-07-26T02:47:36.000+0000 | Open |
