# Write to a file

In this exercise we reuse the code from the previous exercise, but instead of grouping the data, we simply save the resulting dataset as CSV files.

In [None]:
import com.amazonaws.services.kinesis.model.PutRecordRequest
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import collection.JavaConverters._

In [None]:
// Connection settings for the exercise. Each value is taken from the environment when the
// corresponding variable is set, so real credentials do not have to be pasted into (and saved
// with) the notebook. Otherwise the placeholder is used and must be replaced before running.
val awsAccessKeyId = sys.env.getOrElse("AWS_ACCESS_KEY_ID", "YOUR ACCESS KEY ID")
val awsSecretKey = sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", "YOUR SECRET KEY")
val kinesisStreamName = sys.env.getOrElse("KINESIS_STREAM_NAME", "YOUR STREAM NAME")
val kinesisRegion = sys.env.getOrElse("AWS_REGION", "YOUR REGION")

In [None]:
// Hand the same static AWS credentials to Hadoop's S3A filesystem, in order: access key, then secret.
// NOTE(review): assumes the s3:// paths used by the sink are served by S3A (as on Databricks) — confirm.
Seq(
  "fs.s3a.access.key" -> awsAccessKeyId,
  "fs.s3a.secret.key" -> awsSecretKey
).foreach { case (configKey, configValue) => sc.hadoopConfiguration.set(configKey, configValue) }

### TODO: Implement a Spark Structured Streaming query that writes CSV files to S3

In [None]:
// Streaming DataFrame over the Kinesis stream, starting from the oldest retained record
// (initialPosition = TRIM_HORIZON) and authenticating with the static keys above.
// No checkpointLocation is set here: checkpointing is a property of the streaming query and is
// configured on the sink (writeStream). It is not among the documented Kinesis source options,
// so on the reader it was at best ignored and misleadingly suggested a source-side checkpoint.
val kinesis = spark.readStream
  .format("kinesis")
  .option("streamName", kinesisStreamName)
  .option("region", kinesisRegion)
  .option("initialPosition", "TRIM_HORIZON")
  .option("awsAccessKey", awsAccessKeyId)
  .option("awsSecretKey", awsSecretKey)
  .load()


// Expected layout of each JSON order record: a list of item names plus the "from" and "to"
// cities. StructType.add creates nullable fields, matching StructField's defaults.
val schema = new StructType()
  .add("items", ArrayType(StringType))
  .add("from", StringType)
  .add("to", StringType)

// Decode each record's `data` payload (cast to a string) as a JSON order, then flatten it into
// scalar columns suitable for CSV, in this order: item_count, from, to.
val result = kinesis
  .select(from_json($"data".cast("string"), schema).as("order"))
  .select(
    size($"order.items").as("item_count"),
    $"order.from".as("from"),
    $"order.to".as("to")
  )

Instead of using `display` to trigger the stream, we use `writeStream`. It starts a streaming query that continuously writes the stream's rows as CSV files under the given output path.

In [None]:
// Start the streaming query. Each micro-batch appends only its new rows as CSV files under the
// output prefix, and query progress is stored in the checkpoint directory.
// NOTE(review): the checkpoint directory sits inside the output prefix. Tools other than Spark
// that list the output prefix will also see checkpoint files, so consider a sibling prefix.
val query = result.writeStream
  .format("csv")
  .outputMode("append")
  .option("checkpointLocation", "s3://output-databricks/checkpoint/")
  .start("s3://output-databricks/")

### Helper code to publish the events

In [None]:
import scala.util.Random
import java.lang.reflect.{Type, ParameterizedType}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.`type`.TypeReference;
import java.nio.ByteBuffer

In [None]:
/** One order event, serialized to JSON and published to Kinesis.
  *
  * @param from  city the order ships from
  * @param to    city the order ships to
  * @param items names of the ordered items
  */
case class Order(
  from: String,
  to: String,
  items: Seq[String]
)

/** Picks one element of `seq` uniformly at random.
  *
  * @param seq the candidates; must be non-empty
  * @param rng random source. Defaults to the shared `scala.util.Random`; pass a seeded
  *            instance for reproducible output.
  * @return a randomly chosen element of `seq`
  * @throws IllegalArgumentException if `seq` is empty
  */
def getRandomElement[A](seq: Seq[A], rng: scala.util.Random = scala.util.Random): A = {
  // Fail with a clear message instead of Random.nextInt(0)'s "bound must be positive".
  require(seq.nonEmpty, "getRandomElement: cannot pick from an empty sequence")
  seq(rng.nextInt(seq.length))
}
    
/** Draws between 1 and `maxElements` (inclusive) elements from `list`, uniformly at random
  * and with replacement, so the result may contain duplicates.
  *
  * @param maxElements upper bound on the number of draws; must be positive
  * @param list        the candidates; must be non-empty
  * @param rng         random source. Defaults to the shared `scala.util.Random`; pass a seeded
  *                    instance for reproducible output.
  * @return the drawn elements, in draw order
  * @throws IllegalArgumentException if `maxElements <= 0` or `list` is empty
  */
def selectNElementFromList[A](
    maxElements: Int,
    list: Seq[A],
    rng: scala.util.Random = scala.util.Random
): Seq[A] = {
  require(maxElements > 0, s"selectNElementFromList: maxElements must be positive, got $maxElements")
  require(list.nonEmpty, "selectNElementFromList: cannot draw from an empty list")
  // nextInt(maxElements) is in [0, maxElements), so +1 gives a count in [1, maxElements].
  val count = rng.nextInt(maxElements) + 1
  Vector.fill(count)(list(rng.nextInt(list.length)))
}

// Kinesis producer client for the configured region, authenticated with the same static keys
// that the streaming reader uses.
val kinesisClient = {
  val staticCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretKey)
  AmazonKinesisClientBuilder.standard()
    .withCredentials(new AWSStaticCredentialsProvider(staticCredentials))
    .withRegion(kinesisRegion)
    .build()
}

println(s"Putting words onto stream $kinesisStreamName")

// Sequence number of the most recently published record. The publishing loop updates it after
// each put; it is null before the first one.
var lastSequenceNumber: String = null

// Sample data that random orders are drawn from.
val SOURCES = Seq("Paris", "Lyon", "Marseille", "Bordeau")
val DESTINATIONS = Seq("Berlin", "Madrid", "Rome", "London")
val ITEMS = Seq(
  "Adidas Kampung", "Ballet shoe", "Pointe shoe", "Bast shoe",
  "Blucher shoe", "Boat shoe", "Brogan", "Brogue shoe",
  "Brothel creeper", "Bucks", "Cantabrian albarcas", "Chelsea boot",
  "Chopine", "Chukka boot", "Climbing shoe", "Clog",
  "Court shoe", "Cross country running shoe", "Derby shoe", "Desert Boot",
  "Diabetic shoe", "Dress shoe", "Duckbill shoe", "Driving moccasins",
  "Earth shoe", "Elevator shoes", "Espadrille", "Fashion boot",
  "Galesh", "Geta", "Giveh", "High-heeled"
)

// JSON serializer with the Jackson Scala module registered, so Scala case classes and
// collections (e.g. Order and its Seq of items) serialize as JSON objects and arrays.
val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

// Publish 11 random orders (iterations 0 to 10 inclusive), roughly one every 10 seconds.
// Every record uses the same partition key, and each put after the first passes the previous
// record's sequence number as SequenceNumberForOrdering, so sequence numbers follow publish order.
for (_ <- 0 to 10) {
  val startedAt = System.currentTimeMillis

  val order = Order(getRandomElement(SOURCES), getRandomElement(DESTINATIONS), selectNElementFromList(5, ITEMS))
  val payload = mapper.writeValueAsString(order)
  // Encode explicitly as UTF-8. The consumer decodes with CAST(data AS STRING), which reads
  // UTF-8, whereas the no-argument getBytes() would use the JVM's platform default charset.
  val request = new PutRecordRequest()
    .withStreamName(kinesisStreamName)
    .withPartitionKey("some-key")
    .withData(ByteBuffer.wrap(payload.getBytes(java.nio.charset.StandardCharsets.UTF_8)))

  // lastSequenceNumber is null until the first put completes; chain ordering only once it exists.
  Option(lastSequenceNumber).foreach(previous => request.setSequenceNumberForOrdering(previous))

  // Named putResult so it is not confused with the streaming `result` DataFrame.
  val putResult = kinesisClient.putRecord(request)
  lastSequenceNumber = putResult.getSequenceNumber()

  // Sleep for whatever remains of the 10-second interval (never a negative duration).
  Thread.sleep(math.max(10000 - (System.currentTimeMillis - startedAt), 0))
}