### Creating some global variables

In [9]:
kafka_broker = "localhost:9092" #Yet to figure out how the endpoint links to spark
bucket_prefix = "my-company-bucket-prefix-"

### Setting up Locations

To write the Delta table, we need 3 settings: the location of the delta table, the location of the checkpoints and the location of the schema file.

In [10]:
# path_to_bucket = "/mnt/10ac-batch-5/week9/g3"
bucket = "/mnt/10ac-batch-5/week9/g3/speech-to-text-delta"


delta_location = bucket + "/delta-table"
checkpoint_location = bucket + "/checkpoints"
schema_location = bucket + "/kafka_schema.json"


### The Schema
- Assuming the streaming data from kafka is in json format. To properly read this data into spark, we have to provide a schema.
- For efficiency, we will infer the schema one and save it to an s3 location so that every time we save data into the delta lake, we only have to infer rather than re-reading the schema

In [11]:
# ! pip install pyspark==3.3.0

In [15]:
## Making necessary imports
import json, os, re

# from delta.tables import *

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *

from delta.tables import *


ModuleNotFoundError: No module named 'delta'

In [16]:
##Method to infer the schema of kafka topic and return it in the json format

def infer_topic_schema_json(topic):

    df_json = (spark.read
               .format("kafka")
               .option("kafka.bootstrap.servers", kafka_broker)
               .option("subscribe", topic)
               .option("startingOffsets", "earliest")
               .option("endingOffsets", "latest")
               .option("failOnDataLoss", "false")
               .load()
               # filter out empty values
               .withColumn("value", expr("string(value)"))
               .filter(col("value").isNotNull())
               # get latest version of each record
               .select("key", expr("struct(offset, value) r"))
               .groupBy("key").agg(expr("max(r) r")) 
               .select("r.value"))
    
    # decode the json values
    df_read = spark.read.json(
      df_json.rdd.map(lambda x: x.value), multiLine=True)
    
    # drop corrupt records
    if "_corrupt_record" in df_read.columns:
        df_read = (df_read
                   .filter(col("_corrupt_record").isNotNull())
                   .drop("_corrupt_record"))
 
    return df_read.schema.json()

Now we are ready to write the schema to aws. We will be wriring this schema in the *schema_location* as determined in the *setting up loations* section

In [21]:

infer_schema = False #No schema available in s3 bucket

if not infer_schema:
  try:
      topic_schema_txt = dbutils.fs.head(schema_location) #Just directly read from the present schemab
  except:
    infer_schema = True
    pass

if infer_schema:
  # topic_schema_txt = infer_topic_schema_json(topic)
  
  dbutils.fs.rm(schema_location) #Remove the schema that was present
  dbutils.fs.put(schema_location, topic_schema_txt)# Puts a new schema in the specified location


  # Topic not yet read from kafka
  # Dbutils not yet installed

NameError: name 'dbutils' is not defined

### Reading Kafka Stream
Here, we will implement the usage of *readStream* which uses strutured streaming

In [None]:
def read_stream_kafka_topic(topic, schema):
    return (spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", kafka_broker)
            .option("subscribe", topic)
            .option("startingOffsets", "earliest")
            .option("failOnDataLoss", "false")
            .load()
            # filter out empty values
            .withColumn("value", expr("string(value)"))
            .filter(col("value").isNotNull())
            .select(
              # offset must be the first field, due to aggregation
              expr("offset as kafka_offset"),
              expr("timestamp as kafka_ts"),
              expr("string(key) as kafka_key"),
              "value"
            )
            # get latest version of each record
            .select("kafka_key", expr("struct(*) as r"))
            .groupBy("kafka_key")
            .agg(expr("max(r) r"))
            # convert to JSON with schema
            .withColumn('value', 
                        from_json(col("r.value"), topic_schema))
            .select('r.kafka_key', 
                    'r.kafka_offset', 
                    'r.kafka_ts', 
                    'value.*'
            

### Loading into dataframe
Now to the part we have all been waiting for!! The actual loading of the data in the format we are used to, *hehehe*

In [None]:
df = read_stream_kafka_topic(topic, topic_schema)

### Create Delta Table

Now, we have a ready dataframe. We will use the schema of this dataframe to make an empty dataframe. The empty dataframe created will just act as a blueprint to the creation of the delta table ie **The created empty dataframe can be used if a delta table doesn't already exist**

In [None]:
(spark
 .createDataFrame([], df.schema)
 .write
 .option("mergeSchema", "true")
 .format("delta")
 .mode("append")
 .save(delta_location))

### Clean Up

- We start this cleanup stage  early because this code will be executed everytime we run the script. Because we are using streaming we do not know when the update is finished. That’s why we prefer to clean the delta table before we start the streaming process.
- This is done by first optimizing the delta table where we coallace smaller files into larger ones *Optimize* then use *bin packing* to produce evenly-balanced data files with respect to their disk size


In [1]:
sql = "OPTIMIZE delta.`{}`".format(delta_location)
spark.sql(sql)
None

NameError: name 'delta_location' is not defined

Next, we vaccuum the delta to table to throw away data that is older than 7 days. This, however would depend with your specification of the days you'd rather have data retained in the delta table. Delta table grow way too large if at all we do not vaccuum it

In [None]:
sql = "VACUUM delta.`{}`".format(delta_location)
spark.sql(sql)
None