#REPLACE storage account with <storageAccountName>

# Send Rate Stream Data to Bronze Delta Table
To run this notebook, import it into Azure Synapse and attach it to an Apache Spark Pool.
Choose the "Small" Node Size, and choose "3" as the Number of Nodes.

In [None]:
%%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

## Configure the Storage Account
Be sure to replace `<storageAccountName>` with the name of your storage account.

In [None]:
%%spark
// Storage account that hosts the data lake; the placeholder must be replaced before running.
val storageAccountName = "<storageAccountName>"
// abfss URI of the "bronzeSynapse" path inside the "datalake" container of that account.
val bronzeDataLocation: String = s"abfss://datalake@${storageAccountName}.dfs.core.windows.net/bronzeSynapse"

## Choose a Topic
Choose a topic by uncommenting the line of code for the desired topic. Comment out the line of code for the unwanted topic.
To generate data for both topics, run this notebook once with one topic selected, then run it again with the other topic selected.

In [None]:
%%spark
// Topic whose sample payload is generated by the schema cell below.
// Keep exactly one of the two assignments active.
var topic: String = "VAUsage"
// var topic: String = "Telemetry"

## Create the Streaming DataFrame
The streaming DataFrame generates data using the rate streaming source, producing one row every second.

In [None]:
%%spark
// Streaming source that emits one row per second. Declared as `var` because the
// schema cell below reassigns `df` while reshaping it.
var df = (spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load())

## Configure the Schema
The schema is configured to match the schema of the Bronze Delta Table.

In [None]:
%%spark
// Shapes the rate-source rows into the bronze layout: adds ProcessedTimestamp, ProcessedDate,
// ProcessedHour, VehicleId, UserId, Properties and Body, then removes the source's `timestamp`
// and `value` columns. `topic` selects which JSON payload goes into Body; any other value fails
// fast here, before `df` is modified, instead of silently producing a frame that lacks the
// Properties and Body columns.

// JSON text for the Body column, chosen by topic. It is a Column expression, so it is evaluated
// per row from `value` (and from `VehicleId`, which is added to `df` before Body below).
val bodyJson: org.apache.spark.sql.Column = topic match {
  case "VAUsage" =>
    // Even `value` -> "VolumeUp", odd `value` -> "VolumeDown".
    val action = when(col("value") % 2 === 0, "\"VolumeUp\"").otherwise("\"VolumeDown\"")
    concat(
      lit("{ "),
      lit("\"Object\":\"SteeringWheelVolume\", "),
      lit("\"Action\":"),
      action,
      lit(" }")
    )

  case "Telemetry" =>
    // Sample readings computed deterministically from `value` (not random).
    val engineTemp = (col("value") % 2 + 200).cast("String")
    val batteryVoltage = (col("value") % 20 / 10 + 16.0).cast("String")
    concat(
      lit("{ "),
      lit("\"VehicleId\":"), col("VehicleId"), lit(", "),
      lit("\"EngineTemp\":"), engineTemp, lit(", "),
      lit("\"BatteryVoltage\":"), batteryVoltage, lit(", "),
      lit("\"DaysSinceLastServicing\": 360"), lit(", "),
      lit("\"Mileage\": 90000"),
      lit(" }")
    )

  case other =>
    throw new IllegalArgumentException(
      s"Unsupported topic '$other'; expected 'VAUsage' or 'Telemetry'"
    )
}

// The chain is parenthesized so that it is read as a single expression.
df = (df
  .withColumn("ProcessedTimestamp", col("timestamp"))
  .withColumn("ProcessedDate", to_date(col("timestamp")))
  // NOTE(review): "%2d" pads single-digit hours with a space (e.g. " 7"), not a zero.
  // Kept as-is; confirm this matches the ProcessedHour values already in the bronze table.
  .withColumn("ProcessedHour", format_string("%2d", hour(col("timestamp"))))
  // VehicleId and UserId use the same expression, so they always carry the same value per row.
  .withColumn("VehicleId", (col("value") % 10 + 1).cast("String"))
  .withColumn("UserId", (col("value") % 10 + 1).cast("String"))
  .withColumn("Properties", typedLit(Map("topic" -> topic)))
  .withColumn("Body", bodyJson.cast("Binary"))
  // The rate-source columns are no longer needed once the derived columns exist.
  .drop("timestamp", "value"))

df.printSchema()

## Write Data to Bronze Delta Table

In [None]:
%%spark
// Starts the streaming write of `df` to the bronze location in append mode, partitioned by
// ProcessedDate and ProcessedHour. The checkpoint is kept under the same location.
val bronzeQuery = (df.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", s"$bronzeDataLocation/checkpoint")
  .partitionBy("ProcessedDate", "ProcessedHour")
  .start(bronzeDataLocation))

Run the following cell when you would like to stop generating data.

In [None]:
%%spark
// Stops the streaming query started in the write cell above.
bronzeQuery.stop()

## View the Data

In [None]:
%%spark 
// Batch (non-streaming) read of whatever has been written to the bronze location so far.
val bronzeViewDF = spark.read.format("delta").load(bronzeDataLocation)
// Print the rows with the latest ProcessedTimestamp first.
val bronzeNewestFirst = bronzeViewDF.orderBy(desc("ProcessedTimestamp"))
bronzeNewestFirst.show()

In [None]:
%%spark
// Prints the schema of the data read back from the bronze location.
bronzeViewDF.printSchema()

In [None]:
%%spark
// Passes the bronze rows, ordered by ProcessedTimestamp descending, to the notebook's display function.
display(bronzeViewDF.orderBy(col("ProcessedTimestamp").desc))