# Stream to a data pool using the MSSQL Spark Connector
The MSSQL Spark connector provides efficient writes to the SQL Server master instance and the SQL Server data pool in Big Data Clusters. This sample shows how to use the connector to stream files to an external table in a data pool.

The sample is divided into two parts:
- In Part 1, we stream all files under a given directory to an external table in a data pool.
- In Part 2, we read the table to see the data.

Prerequisites:
- The sample uses a SQL database named "MyTestDatabase". Create it before you run this sample. The database can be created as follows:
    ```sql
    CREATE DATABASE MyTestDatabase
    GO
    ```
- Download [AdultCensusIncome.csv](https://amldockerdatasets.azureedge.net/AdultCensusIncome.csv) to your local machine. Create an HDFS folder named /file_streaming (the value of `sourceDir` in Part 1) and upload the file there.
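
Once a Spark session is available in the notebook, a quick check that the upload landed where the stream expects it can look like the following. This is a minimal sketch, not part of the original sample: it assumes `spark` is the notebook's predefined session and that the folder is `/file_streaming`, the `sourceDir` value used in Part 1. `uploadDir` and `fs` are names introduced here for illustration.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Assumption: this matches the sourceDir value configured in Part 1.
val uploadDir = new Path("/file_streaming")

// Reuse the Hadoop configuration of the notebook's Spark session.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

if (fs.exists(uploadDir)) {
  // Print the name and size of each entry in the folder.
  fs.listStatus(uploadDir).foreach { status =>
    println(s"${status.getPath.getName}\t${status.getLen} bytes")
  }
} else {
  println(s"$uploadDir does not exist yet; create it and upload the CSV first.")
}
```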


## Part 1 - Stream all files under a given directory to an external table in data pools
 

### Configure the user, password, database, sourceDir, datapool_table, schema, and datasource_name per your needs




In [3]:
import org.apache.spark.sql.types._

//Change per your installation
val user="sa"
val password="****"
val database =  "MyTestDatabase"
val sourceDir = "/file_streaming"
val datapool_table = "streaming_DataPoolTable"
val datasource_name = "test_data_src"
val schema = StructType(Seq(
   StructField("age",StringType,true), StructField("workclass",StringType,true), StructField("fnlwgt",StringType,true), StructField("education",StringType,true), 
   StructField("education-num",StringType,true), StructField("marital-status",StringType,true), StructField("occupation",StringType,true), 
   StructField("relationship",StringType,true), StructField("race",StringType,true), StructField("sex",StringType,true), StructField("capital-gain",StringType,true), 
   StructField("capital-loss",StringType,true), StructField("hours-per-week",StringType,true), StructField("native-country",StringType,true), StructField("income",StringType,true)
))

val hostname = "master-0.master-svc"
val port = 1433
val url = s"jdbc:sqlserver://${hostname}:${port};database=${database};user=${user};password=${password};"

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
74,application_1568069140269_0144,spark,idle,Link,Link,✔


SparkSession available as 'spark'.


import org.apache.spark.sql.types._
user: String = sa
password: String = ****
database: String = MyTestDatabase
sourceDir: String = /file_streaming
datapool_table: String = streaming_DataPoolTable
datasource_name: String = test_data_src
schema: org.apache.spark.sql.types.StructType = StructType(StructField(age,StringType,true), StructField(workclass,StringType,true), StructField(fnlwgt,StringType,true), StructField(education,StringType,true), StructField(education-num,StringType,true), StructField(marital-status,StringType,true), StructField(occupation,StringType,true), StructField(relationship,StringType,true), StructField(race,StringType,true), StructField(sex,StringType,true), StructField(capital-gain,StringType,true), StructField(capital-loss,StringType,true), StructField(hours-per-week,StringType,true), StructField(native-country,StringType,true), StructField(income,StringType,true))
hostname: String = master-0.master-svc
port: Int = 1433
url: String = jdbc:sqlserver://master-
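
Because every column in the schema above is a nullable string, the same `StructType` can also be derived from a list of column names. The following is a minimal sketch, not part of the original sample; `columnNames` and `compactSchema` are hypothetical names introduced here, and the column names are copied from the configuration cell.

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Column names copied from the schema defined in the configuration cell.
val columnNames = Seq(
  "age", "workclass", "fnlwgt", "education", "education-num",
  "marital-status", "occupation", "relationship", "race", "sex",
  "capital-gain", "capital-loss", "hours-per-week", "native-country", "income"
)

// Every field is a nullable StringType, matching the hand-written schema.
val compactSchema = StructType(
  columnNames.map(name => StructField(name, StringType, nullable = true))
)
```

If the configuration cell has already run, the two definitions can be compared with `compactSchema == schema` before swapping one for the other.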

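### Run Spark Ingestion Job

This Spark ingestion job is built from a **readStream** and a **writeStream**: `readStream` picks up the CSV files in `sourceDir`, and `writeStream` uses `foreachBatch` to write each micro-batch to the data pool table.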

In [8]:
import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}

// Read every CSV file that lands in sourceDir as a stream.
val df = spark.readStream.format("csv").schema(schema).option("header", true).load(sourceDir)

// Write each micro-batch to the data pool table through the MSSQL Spark connector.
val query = df.writeStream.outputMode("append").foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write
    .format("com.microsoft.sqlserver.jdbc.spark")
    .mode("append")
    .option("url", url)
    .option("dbtable", datapool_table)
    .option("user", user)
    .option("password", password)
    .option("dataPoolDataSource", datasource_name)
    .save()
}.start()

// Block until the files currently in sourceDir are processed,
// then wait up to 40 seconds for the query to terminate.
query.processAllAvailable()
query.awaitTermination(40000)


import org.apache.spark.sql.{SparkSession, SaveMode, Row, DataFrame}
df: org.apache.spark.sql.DataFrame = [age: string, workclass: string ... 13 more fields]
query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@1033421d
res14: Boolean = false
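
The `false` result above comes from `awaitTermination(40000)` and means the query was still running when the 40-second wait expired. The following is a minimal sketch for inspecting and stopping it, not part of the original sample. It assumes `query` is the `StreamingQuery` started in the cell above and uses the standard `StreamingQuery` members `isActive`, `status`, `lastProgress`, and `stop`.

```scala
// Assumes `query` is the StreamingQuery started in the ingestion cell.
println(s"Active: ${query.isActive}")
println(s"Status: ${query.status.message}")

// lastProgress is null until the first micro-batch has completed,
// so wrap it in Option before reading its fields.
Option(query.lastProgress).foreach { progress =>
  println(s"Batch ${progress.batchId} read ${progress.numInputRows} rows")
}

// Uncomment to stop the stream once ingestion is no longer needed.
// query.stop()
```

Leaving `query.stop()` commented out keeps the stream alive, so files added to the source directory later are still ingested.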


## Part 2 - Read the external table using the MSSQL Spark Connector
You are now streaming data from the source directory to the data pool table. An external table has been created in the target database specified above. You can view the table in the explorer tree and query it using T-SQL.

To view the current count of records in the external table, use the code below. The stream will append new data to the external table as it arrives. Try creating another census CSV file with 100 rows and adding it to the source directory.

In [9]:
// Read a table through the MSSQL Spark connector.
// user and password come from the configuration cell in Part 1.
def df_read(dbtable: String,
            url: String,
            dataPoolDataSource: String = ""): DataFrame = {
  spark.read
    .format("com.microsoft.sqlserver.jdbc.spark")
    .option("url", url)
    .option("dbtable", dbtable)
    .option("user", user)
    .option("password", password)
    .option("dataPoolDataSource", dataPoolDataSource)
    .load()
}

val new_df = df_read(datapool_table, url, dataPoolDataSource = datasource_name)
println("Number of rows is " + new_df.count)

df_read: (dbtable: String, url: String, dataPoolDataSource: String)org.apache.spark.sql.DataFrame
new_df: org.apache.spark.sql.DataFrame = [age: string, workclass: string ... 13 more fields]
Number of rows is 618665
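
To try the suggestion of adding another census file, one option is to write a 100-row sample back into the source directory from Spark. This is a minimal sketch under stated assumptions, not part of the original sample: it assumes the uploaded file kept the name `AdultCensusIncome.csv`, that the streaming query from Part 1 is still active, and that `schema`, `sourceDir`, and `df_read` are defined as in the cells above. `sampleRows` is a hypothetical name introduced here.

```scala
// Assumption: the prerequisite upload kept the original file name.
val sampleRows = spark.read
  .schema(schema)
  .option("header", true)
  .csv(s"$sourceDir/AdultCensusIncome.csv")
  .limit(100)

// Append the sample to the streamed directory as one CSV part file with a header row.
sampleRows.coalesce(1)
  .write
  .mode("append")
  .option("header", true)
  .csv(sourceDir)

// Re-run the count after the stream has had time to process the new file.
val updated_df = df_read(datapool_table, url, dataPoolDataSource = datasource_name)
println("Number of rows is " + updated_df.count)
```

The count only grows once the running stream has picked up the new part file, so the printed value may lag by a micro-batch.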
