In [0]:
%scala
dbutils.widgets.removeAll()


dbutils.widgets.text(
  "ehub_namespace",
  "hyssh-ehubs",
  "Sample event in JSON"
)

dbutils.widgets.text(
  "ehub_name",
  "transactions",
  "Sample event in JSON"
)

dbutils.widgets.text(
  "ehub_secret_name_akv",
  "secret_name",
  "EventHub Secret Name in AKV"
)

dbutils.widgets.text(
  "sample_event",
  """{"user_profile": [{"userid": "userid_4", "username": "Keith Farrar", "gender": "male", "age": 50, "picture_url": "../images/profile/Keith Farrar.png"}], "car": [{"carid": "car_0", "make": "Honda", "year": "2020", "category": "Sedan", "model": "Accord", "color": "Black", "pride": 14000}]}""",
  "Sample event in JSON"
)

dbutils.widgets.text(
  "target_table_name",
  "clickEvents",
  "Target Table Name"
)

val ehubNamespace = dbutils.widgets.get("ehub_namespace")
val ehubName = dbutils.widgets.get("ehub_name")
val ehubSecretNameAKV = dbutils.widgets.get("ehub_secret_name_akv")
val sampleEvent = dbutils.widgets.get("sample_event")
val target_table_name = dbutils.widgets.get("target_table_name")

In [0]:
%scala
val ehubConnectionString = s"Endpoint=sb://${ehubNamespace}.servicebus.windows.net/;EntityPath=${ehubName};SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=${ehubSecretNameAKV}" 

In [0]:
%scala
import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql.functions.{ explode, split }
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.Trigger  


In [0]:
%scala
// With an entity path
val ehub_conn = sys.env.get("ehub_connection_fullstring").get

In [0]:
%scala
val eventHubsConf = EventHubsConf(ehub_conn)

val eventhubs = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()

val df = eventhubs.select($"body".cast("string").as("parsed_json"))

In [0]:
%scala
df.schema

In [0]:
%scala
val ucName = "uc_hsshin"
val schemaName = "kickthetires"
val icebergTableName = "clickEvents0"   
val streamingDataFrameSchema: StructType = df.schema // Replace result_df with your streaming DataFrame  

// spark.sql(s"DROP TABLE IF EXISTS $ucName.$schemaName.$icebergTableName")
// spark.sql(s"DROP DATABASE IF EXISTS $ucName.$schemaName")

// Create Database 
// spark.sql(s"CREATE DATABASE $ucName.$schemaName")
spark.catalog.setCurrentDatabase(schemaName) // Can't find the database

// Create an empty Delta table with the same schema as your streaming DataFrame  
spark.sql(s"""  
  CREATE TABLE IF NOT EXISTS $ucName.$schemaName.$icebergTableName (  
    ${streamingDataFrameSchema.toDDL}  
  )  
  TBLPROPERTIES(
  'delta.universalFormat.enabledFormats' = 'iceberg');
""")  


In [0]:
%scala
val icebergcheckpointLocation = "/mnt/bronze/iceberg_checkpoint/"

dbutils.fs.mkdirs(icebergcheckpointLocation)

In [0]:
%scala
import org.apache.spark.sql.streaming.Trigger  
  
val streamQuery = df.writeStream  
  .format("iceberg")  
  .outputMode("append")
  .option("checkpointLocation", icebergcheckpointLocation) // Choose an appropriate checkpoint location  
  .trigger(Trigger.ProcessingTime("30 seconds")) // Data will be written every 30 seconds
  .table(s"$ucName.$schemaName.$icebergTableName") // Write to the iceberg table  
  
streamQuery.awaitTermination()


In [0]:
%scala
// val ucName = "uc_hsshin"
// val schemaName = "kickthetires"
// val icebergTableName = "clickEvents"   

display(spark.sql(s"SELECT * FROM $ucName.$schemaName.$icebergTableName"))

In [0]:
%sql
SELECT * FROM uc_hsshin.kickthetires.clickEvents0

parsed_json
"{""user_profile"": [{""userid"": ""userid_0"", ""username"": ""Maria Dandridge"", ""gender"": ""female"", ""age"": 20, ""picture_url"": ""../images/profile/Maria Dandridge.png""}], ""car"": [{""carid"": ""car_2"", ""make"": ""Nissan"", ""year"": ""2018"", ""category"": ""Sedan"", ""model"": ""Altima"", ""color"": ""Gold"", ""pride"": 13000}]}"
"{""user_profile"": [{""userid"": ""userid_3"", ""username"": ""Owen Milford"", ""gender"": ""male"", ""age"": 30, ""picture_url"": ""../images/profile/Owen Milford.png""}], ""car"": [{""carid"": ""car_8"", ""make"": ""Ford"", ""year"": ""2016"", ""category"": ""Truck"", ""model"": ""F-150"", ""color"": ""Brown"", ""pride"": 17000}]}"
"{""user_profile"": [{""userid"": ""userid_4"", ""username"": ""Keith Farrar"", ""gender"": ""male"", ""age"": 50, ""picture_url"": ""../images/profile/Keith Farrar.png""}], ""car"": [{""carid"": ""car_3"", ""make"": ""Toyota"", ""year"": ""2012"", ""category"": ""Sedan"", ""model"": ""Camry"", ""color"": ""Silver"", ""pride"": 11000}]}"
"{""user_profile"": [{""userid"": ""userid_2"", ""username"": ""Nathan Finley"", ""gender"": ""male"", ""age"": 20, ""picture_url"": ""../images/profile/Nathan Finley.png""}], ""car"": [{""carid"": ""car_10"", ""make"": ""Ferarri"", ""year"": ""2015"", ""category"": ""SportCar"", ""model"": ""Spider"", ""color"": ""Red"", ""pride"": 70000}]}"
"{""user_profile"": [{""userid"": ""userid_3"", ""username"": ""Owen Milford"", ""gender"": ""male"", ""age"": 30, ""picture_url"": ""../images/profile/Owen Milford.png""}], ""car"": [{""carid"": ""car_0"", ""make"": ""Honda"", ""year"": ""2020"", ""category"": ""Sedan"", ""model"": ""Accord"", ""color"": ""Black"", ""pride"": 14000}]}"
"{""user_profile"": [{""userid"": ""userid_0"", ""username"": ""Maria Dandridge"", ""gender"": ""female"", ""age"": 20, ""picture_url"": ""../images/profile/Maria Dandridge.png""}], ""car"": [{""carid"": ""car_2"", ""make"": ""Nissan"", ""year"": ""2018"", ""category"": ""Sedan"", ""model"": ""Altima"", ""color"": ""Gold"", ""pride"": 13000}]}"
"{""user_profile"": [{""userid"": ""userid_3"", ""username"": ""Owen Milford"", ""gender"": ""male"", ""age"": 30, ""picture_url"": ""../images/profile/Owen Milford.png""}], ""car"": [{""carid"": ""car_1"", ""make"": ""Hyundai"", ""year"": ""2017"", ""category"": ""Sedan"", ""model"": ""Sonata"", ""color"": ""Gray"", ""pride"": 14000}]}"
"{""user_profile"": [{""userid"": ""userid_2"", ""username"": ""Nathan Finley"", ""gender"": ""male"", ""age"": 20, ""picture_url"": ""../images/profile/Nathan Finley.png""}], ""car"": [{""carid"": ""car_3"", ""make"": ""Toyota"", ""year"": ""2012"", ""category"": ""Sedan"", ""model"": ""Camry"", ""color"": ""Silver"", ""pride"": 11000}]}"
"{""user_profile"": [{""userid"": ""userid_4"", ""username"": ""Keith Farrar"", ""gender"": ""male"", ""age"": 50, ""picture_url"": ""../images/profile/Keith Farrar.png""}], ""car"": [{""carid"": ""car_3"", ""make"": ""Toyota"", ""year"": ""2012"", ""category"": ""Sedan"", ""model"": ""Camry"", ""color"": ""Silver"", ""pride"": 11000}]}"
"{""user_profile"": [{""userid"": ""userid_3"", ""username"": ""Owen Milford"", ""gender"": ""male"", ""age"": 30, ""picture_url"": ""../images/profile/Owen Milford.png""}], ""car"": [{""carid"": ""car_10"", ""make"": ""Ferarri"", ""year"": ""2015"", ""category"": ""SportCar"", ""model"": ""Spider"", ""color"": ""Red"", ""pride"": 70000}]}"


In [0]:
%scala
val updateddf = eventhubs.select(
  $"enqueuedTime".as("enqueued_time"),
  $"body".cast("string").as("json_string")
  )

In [0]:
%scala
// Create an empty Delta table with the same schema as your streaming DataFrame  
spark.sql(s"""  
  ALTER TABLE $ucName.$schemaName.$icebergTableName
    ADD COLUMNS (enqueued_time timestamp);
""")  

In [0]:
%scala
val icebergcheckpointLocation = "/mnt/bronze/iceberg_checkpoint/"

dbutils.fs.mkdirs(icebergcheckpointLocation)

val streamQuery = updateddf.writeStream  
  .format("iceberg")  
  .outputMode("append")
  .option("checkpointLocation", icebergcheckpointLocation) // Choose an appropriate checkpoint location  
  .option("mergeSchema", "true")
  .trigger(Trigger.ProcessingTime("30 seconds")) // Data will be written every 30 seconds
  .table(s"$ucName.$schemaName.$icebergTableName") // Write to the iceberg table  
  
streamQuery.awaitTermination()

In [0]:
%sql
--spark.sql(s"SELECT * FROM $ucName.$schemaName.$icebergTableName")
SELECT * FROM  uc_hsshin.kickthetires.clickevents0

parsed_json,enqueued_time,json_string
"{""user_profile"": [{""userid"": ""userid_0"", ""username"": ""Maria Dandridge"", ""gender"": ""female"", ""age"": 20, ""picture_url"": ""../images/profile/Maria Dandridge.png""}], ""car"": [{""carid"": ""car_2"", ""make"": ""Nissan"", ""year"": ""2018"", ""category"": ""Sedan"", ""model"": ""Altima"", ""color"": ""Gold"", ""pride"": 13000}]}",,
"{""user_profile"": [{""userid"": ""userid_3"", ""username"": ""Owen Milford"", ""gender"": ""male"", ""age"": 30, ""picture_url"": ""../images/profile/Owen Milford.png""}], ""car"": [{""carid"": ""car_8"", ""make"": ""Ford"", ""year"": ""2016"", ""category"": ""Truck"", ""model"": ""F-150"", ""color"": ""Brown"", ""pride"": 17000}]}",,
"{""user_profile"": [{""userid"": ""userid_4"", ""username"": ""Keith Farrar"", ""gender"": ""male"", ""age"": 50, ""picture_url"": ""../images/profile/Keith Farrar.png""}], ""car"": [{""carid"": ""car_3"", ""make"": ""Toyota"", ""year"": ""2012"", ""category"": ""Sedan"", ""model"": ""Camry"", ""color"": ""Silver"", ""pride"": 11000}]}",,
"{""user_profile"": [{""userid"": ""userid_2"", ""username"": ""Nathan Finley"", ""gender"": ""male"", ""age"": 20, ""picture_url"": ""../images/profile/Nathan Finley.png""}], ""car"": [{""carid"": ""car_10"", ""make"": ""Ferarri"", ""year"": ""2015"", ""category"": ""SportCar"", ""model"": ""Spider"", ""color"": ""Red"", ""pride"": 70000}]}",,
"{""user_profile"": [{""userid"": ""userid_3"", ""username"": ""Owen Milford"", ""gender"": ""male"", ""age"": 30, ""picture_url"": ""../images/profile/Owen Milford.png""}], ""car"": [{""carid"": ""car_0"", ""make"": ""Honda"", ""year"": ""2020"", ""category"": ""Sedan"", ""model"": ""Accord"", ""color"": ""Black"", ""pride"": 14000}]}",,
"{""user_profile"": [{""userid"": ""userid_0"", ""username"": ""Maria Dandridge"", ""gender"": ""female"", ""age"": 20, ""picture_url"": ""../images/profile/Maria Dandridge.png""}], ""car"": [{""carid"": ""car_2"", ""make"": ""Nissan"", ""year"": ""2018"", ""category"": ""Sedan"", ""model"": ""Altima"", ""color"": ""Gold"", ""pride"": 13000}]}",,
"{""user_profile"": [{""userid"": ""userid_3"", ""username"": ""Owen Milford"", ""gender"": ""male"", ""age"": 30, ""picture_url"": ""../images/profile/Owen Milford.png""}], ""car"": [{""carid"": ""car_1"", ""make"": ""Hyundai"", ""year"": ""2017"", ""category"": ""Sedan"", ""model"": ""Sonata"", ""color"": ""Gray"", ""pride"": 14000}]}",,
"{""user_profile"": [{""userid"": ""userid_2"", ""username"": ""Nathan Finley"", ""gender"": ""male"", ""age"": 20, ""picture_url"": ""../images/profile/Nathan Finley.png""}], ""car"": [{""carid"": ""car_3"", ""make"": ""Toyota"", ""year"": ""2012"", ""category"": ""Sedan"", ""model"": ""Camry"", ""color"": ""Silver"", ""pride"": 11000}]}",,
"{""user_profile"": [{""userid"": ""userid_4"", ""username"": ""Keith Farrar"", ""gender"": ""male"", ""age"": 50, ""picture_url"": ""../images/profile/Keith Farrar.png""}], ""car"": [{""carid"": ""car_3"", ""make"": ""Toyota"", ""year"": ""2012"", ""category"": ""Sedan"", ""model"": ""Camry"", ""color"": ""Silver"", ""pride"": 11000}]}",,
"{""user_profile"": [{""userid"": ""userid_3"", ""username"": ""Owen Milford"", ""gender"": ""male"", ""age"": 30, ""picture_url"": ""../images/profile/Owen Milford.png""}], ""car"": [{""carid"": ""car_10"", ""make"": ""Ferarri"", ""year"": ""2015"", ""category"": ""SportCar"", ""model"": ""Spider"", ""color"": ""Red"", ""pride"": 70000}]}",,


In [0]:
%scala
// End of Notebook