#### Spark Streaming Sources

In [None]:
// MongoDB source 

// Spark Structured Streaming Chapter 8. MongoDB Data Source

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import org.apache.spark.sql.streaming.Trigger


// Session with the MongoDB Spark connector on the classpath.
// NOTE(review): if a SparkSession already exists in the kernel, getOrCreate() reuses it and
// spark.jars.packages will not take effect — confirm the connector is already loaded.
val spark:SparkSession = SparkSession
    .builder()
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector:10.1.1")
    .master("local[*]")
    .appName("Hand-On-Spark3_File_Data_Source_MongoDB")
    .getOrCreate()

val sc = spark.sparkContext

sc.setLogLevel("ERROR")

// Placeholder connection string: <user>/<password> must be filled in before running.
val mongoDBURI = "mongodb+srv://<user>:<password>@hands-on-spark3.abcdefg.mongodb.net/?retryWrites=true&w=majority"

// Shape of the patient documents; only applied if the .schema(...) line below is re-enabled.
val PatientsSchema = StructType(Array(
     StructField("_id", StringType),
     StructField("NSS", StringType),
     StructField("Nom", StringType),
     StructField("DID", IntegerType),
     StructField("DNom", StringType),
     StructField("Fecha", StringType)
         )
    )
val columsOfInterest = List("NSS","Nom","DID","DNom","Fecha","_id")

// Define a streaming query over the collection's change stream, emitting only the full documents.
val df = spark.readStream
  .format("mongodb")
  .option("spark.mongodb.connection.uri", mongoDBURI)
  .option("spark.mongodb.database", "MongoDB_Data_Source")
  .option("spark.mongodb.collection", "MongoDB_Data_Source")
  .option("spark.mongodb.change.stream.publish.full.document.only", "true")
  //.schema(PatientsSchema)
  .load()

df.printSchema()

if (df.isStreaming) printf(" ----- Streaming is running -----! \n")

import spark.implicits._

// select(String, String*) takes column names directly, so no functions.col import is needed.
val groupDF = df.select(columsOfInterest.head, columsOfInterest.tail: _*) // Here you could do data transformation
    //.groupBy("DID").count()

groupDF.printSchema()

// A checkpoint directory belongs to exactly one streaming query, so this one is not shared
// with any other query in the notebook.
groupDF.writeStream
    .outputMode("append")
    .format("console")
    .option("checkpointLocation", "file:///tmp/checkpoint_mongodb_source")
    //.trigger(Trigger.ProcessingTime("30 seconds"))
    .trigger(Trigger.Continuous("30 seconds"))
    .start()
    .awaitTermination()

#### Spark Streaming Sinks

In [None]:
// File Sink to CSV

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType,DoubleType,LongType}
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
import java.io.IOException
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.{GroupState,GroupStateTimeout,OutputMode}


/** Schema of one JSON patient record arriving on the socket.
  * Every field is nullable (the StructType.add default), matching plain StructField(name, type).
  */
val PatientsSchema: StructType = new StructType()
    .add("NSS", StringType)
    .add("Nom", StringType)
    .add("DID", IntegerType)
    .add("DNom", StringType)
    .add("Fecha", StringType)

/** Typed patient record produced by `.as[Patient]` from rows shaped like PatientsSchema.
  * NOTE(review): DID is Option[Long] while PatientsSchema declares IntegerType; this relies on
  * Spark up-casting Int to Long when building the Dataset — confirm.
  */
case class Patient(NSS: String, Nom: String, DID: Option[Long], DNom: String, Fecha: String)

val spark:SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("Hand-On-Spark3_Socket_Data_Source")
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._
import scala.util.control.NonFatal

val host = "localhost"
val port = 9999
// A checkpoint directory must not be shared between different streaming queries,
// so this CSV query gets a directory of its own.
val checkpointDir = "/tmp/streaming_checkpoint_csv"

try {
    // Parse each socket line as a JSON patient record and view it as a typed Dataset.
    val PatientDS = spark.readStream
        .format("socket")
        .option("host",host)
        .option("port",port)
        .load()
        .select(from_json(col("value"), PatientsSchema).as("patient"))
        .selectExpr("patient.*")
        .as[Patient]
    
    printf("\n Listening and ready... \n")
    
    val PatientDF = PatientDS.select("*")
    
    // truncate/numRows are console-sink options and do not apply to a file sink.
    PatientDF.writeStream
      .format("csv")
      .option("path", "/tmp/streaming_output/csv")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .option("checkpointLocation", checkpointDir)
      .outputMode("append")
      .start()
      .awaitTermination()
    
} catch {
    // The socket is opened on the query's execution thread, so a refused connection
    // is reported by awaitTermination() wrapped in a StreamingQueryException.
    case e: StreamingQueryException if e.getCause.isInstanceOf[java.net.ConnectException] =>
        println("Error establishing connection to " + host + ":" + port)
    case e: java.net.ConnectException => println("Error establishing connection to " + host + ":" + port)
    case e: IOException => println("IOException occurred")
    // NonFatal lets fatal errors and interrupts (e.g. stopping the cell) propagate.
    case NonFatal(e) => println(s"Error receiving data: $e")
} finally {
    println("In finally block")
}


In [None]:
// File Sink to Parquet

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType,DoubleType,LongType}
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
import java.io.IOException
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.{GroupState,GroupStateTimeout,OutputMode}


/** Schema of one JSON patient record arriving on the socket.
  * Every field is nullable (the StructType.add default), matching plain StructField(name, type).
  */
val PatientsSchema: StructType = new StructType()
    .add("NSS", StringType)
    .add("Nom", StringType)
    .add("DID", IntegerType)
    .add("DNom", StringType)
    .add("Fecha", StringType)

/** Typed patient record produced by `.as[Patient]` from rows shaped like PatientsSchema.
  * NOTE(review): DID is Option[Long] while PatientsSchema declares IntegerType; this relies on
  * Spark up-casting Int to Long when building the Dataset — confirm.
  */
case class Patient(NSS: String, Nom: String, DID: Option[Long], DNom: String, Fecha: String)

val spark:SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("Hand-On-Spark3_Socket_Data_Source")
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._
import scala.util.control.NonFatal

val host = "localhost"
val port = 9999
// A checkpoint directory must not be shared between different streaming queries,
// so this Parquet query gets a directory of its own.
val checkpointDir = "/tmp/streaming_checkpoint_parquet"

try {
    // Parse each socket line as a JSON patient record and view it as a typed Dataset.
    val PatientDS = spark.readStream
        .format("socket")
        .option("host",host)
        .option("port",port)
        .load()
        .select(from_json(col("value"), PatientsSchema).as("patient"))
        .selectExpr("patient.*")
        .as[Patient]
    
    printf("\n Listening and ready... \n")
    
    val PatientDF = PatientDS.select("*")
    
    // truncate/numRows are console-sink options and do not apply to a file sink.
    PatientDF.writeStream
      .format("parquet")
      .option("path", "/tmp/streaming_output/parquet")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .option("checkpointLocation", checkpointDir)
      .outputMode("append")
      .start()
      .awaitTermination()
    
} catch {
    // The socket is opened on the query's execution thread, so a refused connection
    // is reported by awaitTermination() wrapped in a StreamingQueryException.
    case e: StreamingQueryException if e.getCause.isInstanceOf[java.net.ConnectException] =>
        println("Error establishing connection to " + host + ":" + port)
    case e: java.net.ConnectException => println("Error establishing connection to " + host + ":" + port)
    case e: IOException => println("IOException occurred")
    // NonFatal lets fatal errors and interrupts (e.g. stopping the cell) propagate.
    case NonFatal(e) => println(s"Error receiving data: $e")
} finally {
    println("In finally block")
}

In [None]:
// File Sink to CSV with foreachBatch()

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType,DoubleType,LongType}
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
import java.io.IOException
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.{GroupState,GroupStateTimeout,OutputMode}
import org.apache.spark.sql.DataFrame


/** Schema of one JSON patient record arriving on the socket.
  * Every field is nullable (the StructType.add default), matching plain StructField(name, type).
  */
val PatientsSchema: StructType = new StructType()
    .add("NSS", StringType)
    .add("Nom", StringType)
    .add("DID", IntegerType)
    .add("DNom", StringType)
    .add("Fecha", StringType)

/** Typed patient record produced by `.as[Patient]` from rows shaped like PatientsSchema.
  * NOTE(review): DID is Option[Long] while PatientsSchema declares IntegerType; this relies on
  * Spark up-casting Int to Long when building the Dataset — confirm.
  */
case class Patient(NSS: String, Nom: String, DID: Option[Long], DNom: String, Fecha: String)

/** foreachBatch handler: appends one micro-batch as CSV files under /tmp/streaming_output/foreachBatch.
  *
  * Every row is tagged with a `timeStamp` column holding the current date formatted as yyyyMMdd
  * (the processing date, not an event time). The second argument is the micro-batch id that
  * foreachBatch supplies; it is not used here.
  */
def saveToCSV: (DataFrame, Long) => Unit = { (batchDF, batchId) =>
    val stamped = batchDF.withColumn("timeStamp", date_format(current_date(), "yyyyMMdd"))
    val writer = stamped.write
        .format("csv")
        .option("path", "/tmp/streaming_output/foreachBatch")
        .mode("append")
    writer.save()
}

val spark:SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("Hand-On-Spark3_Socket_Data_Source")
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._
import scala.util.control.NonFatal

val host = "localhost"
val port = 9999
// A checkpoint directory must not be shared between different streaming queries,
// so this foreachBatch query gets a directory of its own.
val checkpointDir = "/tmp/streaming_checkpoint_foreachBatch"

try {
    // Parse each socket line as a JSON patient record and view it as a typed Dataset.
    val PatientDS = spark.readStream
        .format("socket")
        .option("host",host)
        .option("port",port)
        .load()
        .select(from_json(col("value"), PatientsSchema).as("patient"))
        .selectExpr("patient.*")
        .as[Patient]
    
    printf("\n Listening and ready... \n")
    
    val PatientDF = PatientDS.select("*")
    
    // Each micro-batch is handed to saveToCSV, which performs a regular batch write.
    PatientDF.writeStream
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .option("checkpointLocation", checkpointDir)
      .outputMode("append")
      .foreachBatch(saveToCSV)
      .start()
      .awaitTermination()
    
} catch {
    // The socket is opened on the query's execution thread, so a refused connection
    // is reported by awaitTermination() wrapped in a StreamingQueryException.
    case e: StreamingQueryException if e.getCause.isInstanceOf[java.net.ConnectException] =>
        println("Error establishing connection to " + host + ":" + port)
    case e: java.net.ConnectException => println("Error establishing connection to " + host + ":" + port)
    case e: IOException => println("IOException occurred")
    // NonFatal lets fatal errors and interrupts (e.g. stopping the cell) propagate.
    case NonFatal(e) => println(s"Error receiving data: $e")
} finally {
    println("In finally block")
}

In [None]:
// Console Sink with foreach()

import org.apache.spark.sql.{Column, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType,DoubleType,LongType}
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
import java.io.IOException
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.{GroupState,GroupStateTimeout,OutputMode}
import org.apache.spark.sql.{DataFrame,ForeachWriter}

/** Schema of one JSON patient record arriving on the socket.
  * Every field is nullable (the StructType.add default), matching plain StructField(name, type).
  */
val PatientsSchema: StructType = new StructType()
    .add("NSS", StringType)
    .add("Nom", StringType)
    .add("DID", IntegerType)
    .add("DNom", StringType)
    .add("Fecha", StringType)

/** Typed patient record produced by `.as[Patient]` from rows shaped like PatientsSchema.
  * NOTE(review): DID is Option[Long] while PatientsSchema declares IntegerType; this relies on
  * Spark up-casting Int to Long when building the Dataset — confirm.
  */
case class Patient(NSS: String, Nom: String, DID: Option[Long], DNom: String, Fecha: String)

/** ForeachWriter that prints each streamed row to stdout as a single
  * "NSS: … Nom: … DID: … DNom: … Fecha : …" line. Null fields print as "null".
  */
val customWriterToConsole: ForeachWriter[Row] = new ForeachWriter[Row] {

    // No per-partition resources are needed, so every partition is accepted.
    override def open(partitionId: Long, version: Long): Boolean = true

    override def process(record: Row): Unit = {
        // Alternative: record.toSeq.foreach(println) prints each column on its own line.
        def field(name: String): Any = record.getAs[Any](name)
        val line =
            s"NSS: ${field("NSS")} Nom: ${field("Nom")} DID: ${field("DID")} DNom: ${field("DNom")} Fecha : ${field("Fecha")}"
        println(line)
    }

    // Nothing was opened, so there is nothing to release.
    override def close(errorOrNull: Throwable): Unit = {}
}

val spark:SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("Hand-On-Spark3_Socket_Data_Source")
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._
import scala.util.control.NonFatal

val host = "localhost"
val port = 9999
// A checkpoint directory must not be shared between different streaming queries,
// so this foreach query gets a directory of its own.
val checkpointDir = "/tmp/streaming_checkpoint_foreach"

try {
    // Parse each socket line as a JSON patient record and view it as a typed Dataset.
    val PatientDS = spark.readStream
        .format("socket")
        .option("host",host)
        .option("port",port)
        .load()
        .select(from_json(col("value"), PatientsSchema).as("patient"))
        .selectExpr("patient.*")
        .as[Patient]
    
    printf("\n Listening and ready... \n")
    
    val PatientDF = PatientDS.select("*")
    
    // Every row is handed to customWriterToConsole, which prints it.
    PatientDF.writeStream
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .option("checkpointLocation", checkpointDir)
      .outputMode("append")
      .foreach(customWriterToConsole)
      .start()
      .awaitTermination()
    
} catch {
    // The socket is opened on the query's execution thread, so a refused connection
    // is reported by awaitTermination() wrapped in a StreamingQueryException.
    case e: StreamingQueryException if e.getCause.isInstanceOf[java.net.ConnectException] =>
        println("Error establishing connection to " + host + ":" + port)
    case e: java.net.ConnectException => println("Error establishing connection to " + host + ":" + port)
    case e: IOException => println("IOException occurred")
    // NonFatal lets fatal errors and interrupts (e.g. stopping the cell) propagate.
    case NonFatal(e) => println(s"Error receiving data: $e")
} finally {
    println("In finally block")
}


Intitializing Scala interpreter ...

Spark Web UI available at http://tucan:4040
SparkContext available as 'sc' (version = 3.3.0, master = local[*], app id = local-1679141558379)
SparkSession available as 'spark'


23/03/18 13:12:48 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

 Listening and ready... 
NSS: 1234 Nom: María DID: 10 DNom: Cardio Fecha : 01-09-2022
NSS: 2345 Nom: Emilio DID: 20 DNom: Neuro Fecha : 01-09-2022
NSS: 3456 Nom: Marta DID: 30 DNom: Endo Fecha : 01-09-2022
NSS: 4567 Nom: Marcos DID: 40 DNom: Gastro Fecha : 01-09-2022
NSS: 5678 Nom: Sonia DID: 50 DNom: Gineco Fecha : 01-09-2022
NSS: 6789 Nom: Eduardo DID: 10 DNom: Cardio Fecha : 01-09-2022
NSS: 1001 Nom: Lorena DID: 10 DNom: Cardio Fecha : 01-09-2022
NSS: 1006 Nom: Sara DID: 20 DNom: Neuro Fecha : 01-09-2022
NSS: 1002 Nom: Teresa DID: 10 DNom: Cardio Fecha : 01-09-2022
NSS: 1003 Nom: Luis DID: 20 DNom: Neuro Fecha : 01-09-2022


In [None]:
// MongoDB Sink 

// Spark Structured Streaming Chapter 8. MongoDB Data Sink


import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}


// Session with the MongoDB Spark connector on the classpath.
// NOTE(review): if a SparkSession already exists in the kernel, getOrCreate() reuses it and
// spark.jars.packages will not take effect — confirm the connector is already loaded.
val spark:SparkSession = SparkSession
    .builder()
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector:10.1.1")
    .master("local[*]")
    .appName("Hand-On-Spark3_File_Data_Source_MongoDB_Sink")
    .getOrCreate()

val sc = spark.sparkContext

sc.setLogLevel("ERROR")

// Placeholder connection string: <user>/<password> must be filled in before running.
val mongoDBURI = "mongodb+srv://<user>:<password>@hands-on-spark3.abcdefg.mongodb.net/?retryWrites=true&w=majority"

// Schema of the JSON patient files dropped into the watched directory.
val PatientsSchema = StructType(Array(
     StructField("NSS", StringType),
     StructField("Nom", StringType),
     StructField("DID", IntegerType),
     StructField("DNom", StringType),
     StructField("Fecha", StringType)
         )
    )

// Stream new JSON files from the directory. checkpointLocation is a sink (writer) setting,
// so it is configured on writeStream below, not here.
val df = spark.readStream
    .schema(PatientsSchema)
    .json("file:///tmp/stream_mongo")

df.printSchema()

val groupDF = df.select("*") // Here you could do data transformation
    //.groupBy("DID").count()

groupDF.printSchema()

// A checkpoint directory belongs to exactly one streaming query, so this sink does not
// reuse the directory of any other query in the notebook.
groupDF.writeStream
    .format("mongodb")
    .option("checkpointLocation", "file:///tmp/checkpoint_mongodb_sink")
    .option("spark.mongodb.connection.uri", mongoDBURI)
    .option("spark.mongodb.database", "Hands-On-Spark3-Book")
    .option("spark.mongodb.collection", "Hands-On-Spark3-Book")
    .outputMode("append")
    .start()
    .awaitTermination()


