## Stream Processing To Delta Lake
Prerequisite: data must already be streaming to the topic "video_usage".

*Note: If the stream is not working, try changing the GROUP_ID and Consumer Group values to reset.*

### Shared imports and variables
Run this cell first, since most cells below need at least one of these imports or variables.

In [0]:
from pyspark.sql.functions import col, desc, regexp_replace, substring, to_date, from_json, explode, expr
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, IntegerType, BooleanType, TimestampType

# Timestamp pattern string (not referenced by the cells visible in this notebook).
date_format = "yyyy-MM-dd HH:mm:ss"

# Base path of the video usage Delta table; a run-version suffix is appended before writing.
video_views_delta_path = "/mnt/adlsdemo/usage/video"

# Schema used to parse the JSON payload of each incoming record.
# Built field by field with StructType.add(); each field keeps the default nullable=True,
# exactly as a StructField created without an explicit nullable argument.
usage_schema = (
    StructType()
    .add("usageId", IntegerType())
    .add("user", StringType())
    .add("completed", BooleanType())
    .add("durationSeconds", IntegerType())
    .add("eventTimestamp", TimestampType())
)


### Stream Load of incoming data - Video Views
Read streaming data from Confluent Cloud or Event Hubs (using Apache Kafka API) and save in the same delta location within Azure Data Lake Storage (ADLS).

In [0]:
# Set spark.sql.legacy.timeParserPolicy to LEGACY for this Spark session.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Version tag baked into the consumer group id below, and into the checkpoint and
# output paths further down; changing it makes a run use fresh names for all three.
run_version = "v0.5"

# Topic to subscribe to, and the consumer group id derived from the version tag.
topic = "video_usage"
GROUP_ID = "tst-group-video-usage-" + run_version

# To set up a Key Vault-backed secret scope for the first time, replace the placeholder in the URL below and follow the instructions:
#   https://<databricks-instance>/#secrets/createScopeSetup

def get_event_hub_config():
  """Build the Kafka source options for reading `topic` from Azure Event Hubs.

  The password is read from the "demo" secret scope under the key
  "eh-sasl-<topic>". It is really an Event Hub connection string, for example:
  Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=ReadWriteTmp;SharedAccessKey=vhNXxXXXXXxxxXXXXXXXxx=;EntityPath=demo-message-1

  Returns:
    A dict of options that can be passed as .options(**config) to a Kafka reader.
  """
  # NOTE(review): if the "demo" scope is Key Vault-backed, confirm this key name is
  # valid -- Key Vault secret names allow only alphanumerics and hyphens, and the
  # module-level `topic` is interpolated into the key as-is.
  connection_string = dbutils.secrets.get(scope="demo", key=f"eh-sasl-{topic}")

  # The user name is the literal "$ConnectionString"; the connection string itself
  # is supplied as the password of the JAAS login module.
  jaas_config = (
      'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required '
      f'username="$ConnectionString" password="{connection_string}";'
  )

  return {
      'kafka.bootstrap.servers': 'dustin-demo-eh.servicebus.windows.net:9093',
      'kafka.security.protocol': 'SASL_SSL',
      'kafka.sasl.mechanism': 'PLAIN',
      'kafka.group.id': GROUP_ID,
      'kafka.request.timeout.ms': "60000",
      'kafka.session.timeout.ms': "20000",
      'kafka.heartbeat.interval.ms': "10000",
      'kafka.sasl.jaas.config': jaas_config,
      'subscribe': topic,
  }


def get_confluent_cloud_config():
  """Build the Kafka source options for reading `topic` from Confluent Cloud.

  Broker list, user name and password are all read from the "demo" secret scope
  (keys "confluent-cloud-brokers", "confluent-cloud-user" and
  "confluent-cloud-password").

  Returns:
    A dict of options that can be passed as .options(**config) to a Kafka reader.
  """
  def secret(key):
    # Every credential lives in the same secret scope.
    return dbutils.secrets.get(scope="demo", key=key)

  brokers = secret("confluent-cloud-brokers")
  user = secret("confluent-cloud-user")
  secret_value = secret("confluent-cloud-password")

  # JAAS login-module line carrying the user name and password.
  jaas_config = (
      'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required '
      f'username="{user}" password="{secret_value}";'
  )

  return {
      'kafka.bootstrap.servers': brokers,
      'kafka.security.protocol': 'SASL_SSL',
      'kafka.sasl.mechanism': 'PLAIN',
      'kafka.group.id': GROUP_ID,
      'kafka.request.timeout.ms': "60000",
      'kafka.session.timeout.ms': "20000",
      'kafka.heartbeat.interval.ms': "10000",
      'kafka.sasl.jaas.config': jaas_config,
      'subscribe': topic,
  }


# Connection options for the Kafka source (Confluent Cloud variant).
consumer_config = get_confluent_cloud_config()

# Open a streaming read on the Kafka source; each row is a Kafka record.
input_df = (
    spark.readStream
    .format("kafka")
    .options(**consumer_config)
    .load()
)

# Cast the record value to a string, then:
#   json      - that string parsed with from_json against usage_schema
#   value_raw - the same string, kept unparsed
json_df = input_df.select(
    from_json(col("value").cast("string"), usage_schema).alias("json"),
    col("value").cast("string").alias("value_raw"),
)

# Expand every field of the parsed struct into its own column and keep the raw string.
transformed_df = json_df.select("json.*", "value_raw")

# display(transformed_df)

# display(transformed_df)


## Azure Storage as a destination
* One option for streaming output is to write directly to your data lake storage (Azure Data Lake Storage Gen 2 or standard Azure Blob Storage).
* The Databricks Delta / Delta Lake file format makes this more efficient, but you could also use Parquet, Avro or other formats.

In [0]:
# Output location: the base Delta path with the run version appended.
video_views_delta_path_2 = f"{video_views_delta_path}_{run_version}"

# Start a streaming query that appends the transformed rows to the Delta location.
# The checkpoint directory also carries the run version in its name.
# Left as a bare expression so the notebook cell still shows the returned query.
(
    transformed_df.writeStream
    .format("delta")
    .outputMode("append")
    .queryName("StreamingVideoViewsDelta")
    .option("checkpointLocation", f"/delta/events/_checkpoints/streaming_video_views_{run_version}")
    .trigger(processingTime="5 seconds")
    .start(video_views_delta_path_2)
)

In [0]:
# Read data out of delta table
# delta_stream_df = spark.readStream.format("delta").load(video_views_delta_path_2)
# display(delta_stream_df)

### Alternatively: Send transformed data to Event Hubs for next steps in pipeline

In [0]:
# topic2 = 'demo-message-transformed'

# producer_config = consumer_config
# producer_config.pop('subscribe')
# producer_config['topic'] = topic2

# kafka_output_df = transformed_df.selectExpr(
#     "CAST(usageId as STRING) as key",
#     "to_json(struct(*)) as value")

# # display(kafka_output_df)
# kafka_output_df.writeStream \
#   .format("kafka") \
#   .options(**producer_config) \
#   .option("checkpointLocation", f"/delta/events/_checkpoints/cp_{run_version}") \
#   .start()