## Set up Connection to Azure Event Hubs

In [0]:
%scala
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }

// An Event Hub connection needs the EntityPath segment in its connection string.
// The string copied from the Azure portal is assumed to lack it, so the hub name
// is supplied through the builder before the final string is produced.
val connectionString =
  ConnectionStringBuilder("{EVENT HUB CONNECTION STRING FROM AZURE PORTAL}")
    .setEventHubName("{EVENT HUB NAME}")
    .build

// Connector configuration; reading starts from the end of the stream.
val eventHubsConf =
  EventHubsConf(connectionString).setStartingPosition(EventPosition.fromEndOfStream)

// Streaming DataFrame fed by the Event Hub described in eventHubsConf.
var streamingInputDF =
  spark.readStream.format("eventhubs").options(eventHubsConf.toMap).load()

## streamingInputDF.printSchema

<pre>
root
 |-- body: binary (nullable = true)
 |-- offset: string (nullable = false)
 |-- sequenceNumber: long (nullable = false)
 |-- enqueuedTime: timestamp (nullable = false)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)
</pre>

## Sample Event Payload
The `body` column is provided as a `binary`. After applying the `cast("string")` operation, a sample could look like:
<pre>
{
  "city": "&lt;CITY&gt;",
  "country": "United States",
  "countryCode": "US",
  "isp": "&lt;ISP&gt;",
  "lat": 0.00, "lon": 0.00,
  "query": "&lt;IP&gt;",
  "region": "CA",
  "regionName": "California",
  "status": "success",
  "hittime": "2017-02-08T17:37:55-05:00",
  "zip": "38917"
}
</pre>

## GroupBy, Count

In [0]:
%scala
import org.apache.spark.sql.functions._

// Number of events per zip code; `zip` is extracted from the JSON text carried in `body`.
var streamingSelectDF = {
  val payload = $"body".cast("string")
  val zip = get_json_object(payload, "$.zip").alias("zip")

  streamingInputDF.select(zip).groupBy($"zip").count()
}

## Window

In [0]:
%scala
import org.apache.spark.sql.functions._

// Number of events per zip code and time window, keyed on the `hittime` field of the payload.
var streamingSelectDF = {
  val payload = $"body".cast("string")
  val zip = get_json_object(payload, "$.zip").alias("zip")
  val hittime = get_json_object(payload, "$.hittime").alias("hittime")
  // Arguments after the time column: window duration, slide duration, start-time offset.
  val hitWindow = window($"hittime".cast("timestamp"), "10 minute", "5 minute", "2 minute")

  streamingInputDF.select(zip, hittime).groupBy($"zip", hitWindow).count()
}


## Memory Output

In [0]:
%scala
import org.apache.spark.sql.streaming.Trigger.ProcessingTime

// Writes the complete result of streamingSelectDF to the memory sink under the
// query name "sample", triggering every 25 seconds.
val query = {
  val sink = streamingSelectDF.writeStream
  sink
    .queryName("sample")
    .format("memory")
    .trigger(ProcessingTime("25 seconds"))
    .outputMode("complete")
    .start()
}

## Console Output

In [0]:
%scala
import org.apache.spark.sql.streaming.Trigger.ProcessingTime

// Prints the complete result of streamingSelectDF to the console sink, triggering every 25 seconds.
val query = {
  val sink = streamingSelectDF.writeStream
  sink
    .trigger(ProcessingTime("25 seconds"))
    .outputMode("complete")
    .format("console")
    .start()
}

## File Output with Partitions

In [0]:
%scala
import org.apache.spark.sql.functions._

// Per-zip event count, exposed as a typed Dataset of (zip, count) string pairs.
// NOTE(review): `hittime` and `day` are selected here but do not survive the
// groupBy/count below, so the result carries only `zip` and `count`.
var streamingSelectDF = {
  val payload = $"body".cast("string")
  val hitTimeJson = get_json_object(payload, "$.hittime")

  streamingInputDF
    .select(
      get_json_object(payload, "$.zip").alias("zip"),
      hitTimeJson.alias("hittime"),
      date_format(hitTimeJson, "dd.MM.yyyy").alias("day"))
    .groupBy($"zip")
    .count()
    .as[(String, String)]
}

In [0]:
%scala
import org.apache.spark.sql.streaming.Trigger.ProcessingTime

// Streams individual hits to Parquet files under /mnt/sample/test-data,
// partitioned by `zip` and `day`, triggering every 25 seconds.
//
// The rows are projected directly from streamingInputDF instead of reusing
// streamingSelectDF: that frame is a per-zip count holding only `zip` and
// `count`, so it has no `day` column to partition by, and the file sink only
// supports append output, which an aggregation without a watermark cannot use.
val query = {
  import org.apache.spark.sql.functions.{date_format, get_json_object}

  val payload = $"body".cast("string")
  val hitTime = get_json_object(payload, "$.hittime")

  streamingInputDF
    .select(
      get_json_object(payload, "$.zip").alias("zip"),
      hitTime.alias("hittime"),
      date_format(hitTime, "dd.MM.yyyy").alias("day"))
    .writeStream
    .format("parquet")
    .option("path", "/mnt/sample/test-data")
    .option("checkpointLocation", "/mnt/sample/check")
    .partitionBy("zip", "day")
    .trigger(ProcessingTime("25 seconds"))
    .start()
}

##### Create Table

In [0]:
%sql
-- External Parquet table over /mnt/sample/test-data; zip and day are partition columns.
CREATE EXTERNAL TABLE test_par (
  hittime STRING
)
PARTITIONED BY (zip STRING, day STRING)
STORED AS PARQUET
LOCATION '/mnt/sample/test-data'

## JDBC Sink

In [0]:
%scala
import java.sql._

/**
 * ForeachWriter that inserts every (String, String) pair it receives as one row
 * of the `zip_test` table over JDBC.
 *
 * The connection and its prepared INSERT are created in `open` and released in
 * `close`; both fields stay unset until `open` runs.
 *
 * @param url  JDBC URL handed to DriverManager
 * @param user database user name
 * @param pwd  database password
 */
class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[(String, String)] {
  val driver = "com.mysql.jdbc.Driver"
  var connection: Connection = _
  var statement: PreparedStatement = _

  /** Loads the driver, connects, and prepares the INSERT; always accepts the partition. */
  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    // Bound parameters keep the row values out of the SQL text, so quotes or
    // other special characters in the data cannot alter the statement.
    statement = connection.prepareStatement("INSERT INTO zip_test VALUES (?, ?)")
    true
  }

  /** Inserts one row built from the two elements of `value`. */
  def process(value: (String, String)): Unit = {
    statement.setString(1, value._1)
    statement.setString(2, value._2)
    statement.executeUpdate()
  }

  /** Releases the statement and the connection, tolerating either being unset. */
  def close(errorOrNull: Throwable): Unit = {
    try {
      Option(statement).foreach(_.close())
    } finally {
      // Runs even if closing the statement throws, so the connection is not leaked.
      Option(connection).foreach(_.close())
    }
  }
}


In [0]:
%scala
// Placeholder connection settings for the target MySQL database.
val url = "jdbc:mysql://<mysqlserver>:3306/test"
val user = "user"
val pwd = "pwd"

val writer = new JDBCSink(url, user, pwd)

// Pushes updated rows of streamingSelectDF through the JDBC writer every 25 seconds.
val query = streamingSelectDF.writeStream
  .outputMode("update")
  .trigger(ProcessingTime("25 seconds"))
  .foreach(writer)
  .start()
    

## EventHubs Sink

In [0]:
%scala
// The connection string for the Event Hub you will WRITE to.
val connectionString = "{EVENT HUB CONNECTION STRING}"
val eventHubsConfWrite = EventHubsConf(connectionString)

// Sends updated rows of streamingSelectDF to the Event Hub every 25 seconds.
// NOTE(review): no checkpointLocation is set here, unlike the rate-source
// example in this notebook -- confirm one is configured for the cluster.
val query = {
  import org.apache.spark.sql.functions.{struct, to_json}

  streamingSelectDF
    // The Event Hubs sink requires a `body` column, which streamingSelectDF does
    // not have; serialize every column of each row into one JSON string instead.
    .select(to_json(struct($"*")).alias("body"))
    .writeStream
    .format("eventhubs")
    .outputMode("update")
    .options(eventHubsConfWrite.toMap)
    .trigger(ProcessingTime("25 seconds"))
    .start()
}

## EventHubs Sink - Write test data with Rate Source

In [0]:
%scala
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql.streaming.Trigger.ProcessingTime

// The connection string for the Event Hub you will WRITE to.
val connString = "{EVENT HUB CONNECTION STRING}"
val eventHubsConfWrite = EventHubsConf(connString)

// Test data: the rate source's `value` column, renamed to `body` and cast to string.
val source = {
  val rateStream = spark.readStream.format("rate").option("rowsPerSecond", 100).load()
  rateStream.withColumnRenamed("value", "body").select($"body".cast("string"))
}

// Sends the generated rows to the Event Hub every 25 seconds.
val query = {
  val sink = source.writeStream
  sink
    .options(eventHubsConfWrite.toMap)
    .option("checkpointLocation", "/checkpoint/")
    .format("eventhubs")
    .outputMode("update")
    .trigger(ProcessingTime("25 seconds"))
    .start()
}