In [0]:
%run /Users/hilla.abramov@gmail.com/utils/stream_utils/extract_aws_credentials

In [0]:
from collections.abc import Callable # for type-hinting
from pyspark.sql.types import StructType # for type-hinting
from pyspark.sql.functions import from_json


class StreamProcessor:
    """
    Class containing ETL methods to be used on a data stream within the Stream Layer
    of the pipeline.

    Parameters:
    ----------
    kinesis_stream_name: str
        The name of the Kinesis data stream to read data from. This should match
        the naming of the data streams created for this dataset on the AWS Kinesis console.

    delta_table_name: str
        The name to give the Delta table the processed stream data should be appended to.

    json_schema: pyspark.sql.types.StructType
        The StructType JSON input schema to define typing of new stream data after ingestion.

    cleaning_function: Callable
        The function defining how the data should be cleaned/transformed before being stored.
        It is called with the parsed streaming DataFrame and must return a DataFrame that
        supports `.writeStream` (i.e. a streaming DataFrame).

    region: str, optional, keyword-only
        AWS region hosting the Kinesis stream. Defaults to 'us-east-1'.

    initial_position: str, optional, keyword-only
        Value passed to the Kinesis source's 'initialPosition' option. Defaults to 'earliest'.

    Attributes:
    ----------
    _kinesis_stream_name: str
        Protected; the name of the Kinesis data stream to read data from.

    _delta_table_name: str
        Protected; the name of the Delta Table the processed stream data is appended to.

    _json_schema: pyspark.sql.types.StructType
        Protected; the StructType JSON input type schema according to which the new stream data is mapped immediately after ingestion.

    _input_streaming_df: NoneType or pyspark.sql.DataFrame
        Protected; initialised as None; assigned pyspark.sql.DataFrame when reading of stream begins.

    _cleaned_streaming_df: NoneType or pyspark.sql.DataFrame
        Protected; initialised as None; assigned pyspark.sql.DataFrame when cleaning of stream begins.

    _cleaning_function: Callable
        Protected; the function defining how the data should be cleaned/transformed before being stored.

    _region: str
        Protected; the AWS region of the Kinesis stream.

    _initial_position: str
        Protected; the Kinesis 'initialPosition' reader option.

    _streaming_query: NoneType or object
        Protected; initialised as None; assigned the value returned by the stream writer's
        `.table(...)` call when writing starts (presumably the StreamingQuery handle, which
        can be used to monitor or stop the stream -- TODO confirm on the target runtime).
    """

    # Internal alias for the parsed JSON struct column. A fixed name is used instead of
    # the stream name because `select("<alias>.*")` splits on '.', so a stream name
    # containing a period would be misread as a nested field path.
    _PARSED_ALIAS = "parsed_data"

    def __init__(
        self,
        kinesis_stream_name: str,
        delta_table_name: str,
        json_schema: StructType,
        cleaning_function: Callable,
        *,
        region: str = "us-east-1",
        initial_position: str = "earliest",
    ):
        """
        See help(StreamProcessor) for an accurate signature.
        """
        self._kinesis_stream_name = kinesis_stream_name
        self._delta_table_name = delta_table_name
        self._json_schema = json_schema
        self._input_streaming_df = None
        self._cleaned_streaming_df = None
        self._cleaning_function = cleaning_function
        self._region = region
        self._initial_position = initial_position
        self._streaming_query = None

    def _read_stream_from_kinesis_to_spark_df(self) -> None:
        """
        Protected; method which initialises the structured streaming within Spark DataFrames, assigning a stream
        DataFrame to the object's _input_streaming_df attribute, to which it reads the incoming data from Kinesis.
        Applies the input JSON schema stored at the object's _json_schema attribute to incoming data.

        Relies on the notebook-level globals `spark`, `ACCESS_KEY` and `SECRET_KEY`
        (the latter two presumably provided by the `%run` credentials notebook).
        """
        raw_df = (
            spark.readStream
            .format('kinesis')
            .option('streamName', self._kinesis_stream_name)
            .option('initialPosition', self._initial_position)
            .option('region', self._region)
            .option('awsAccessKey', ACCESS_KEY)
            .option('awsSecretKey', SECRET_KEY)
            .load()
        )
        # The Kinesis payload arrives in the binary `data` column: decode it to a string,
        # parse it with the configured schema, then flatten the struct into top-level columns.
        self._input_streaming_df = (
            raw_df
            .selectExpr("cast (data as STRING) jsonData")
            .select(from_json("jsonData", self._json_schema).alias(self._PARSED_ALIAS))
            .select(f"{self._PARSED_ALIAS}.*")
        )

    def _clean_stream_data(self) -> None:
        """
        Protected; method which applies the predefined cleaning function to the incoming stream of data, and saves
        the cleaned data to a pyspark.sql.DataFrame at the object's _cleaned_streaming_df attribute.

        Raises:
        ------
        RuntimeError
            If the input stream has not been read yet.
        """
        if self._input_streaming_df is None:
            raise RuntimeError(
                "No input stream: call _read_stream_from_kinesis_to_spark_df() before cleaning."
            )
        self._cleaned_streaming_df = self._cleaning_function(self._input_streaming_df)

    def _write_stream_to_delta_table(self) -> None:
        """
        Protected; method which appends the cleaned stream data to its assigned Delta table.

        Any existing checkpoint for this table is removed first, and the handle returned by the
        stream writer is stored at the object's _streaming_query attribute.

        Raises:
        ------
        RuntimeError
            If the stream has not been cleaned yet.
        """
        if self._cleaned_streaming_df is None:
            raise RuntimeError(
                "No cleaned stream: call _clean_stream_data() before writing."
            )

        checkpoint_location = f"/tmp/delta/{self._delta_table_name}/_checkpoints/"

        # First remove the checkpoint location if it already exists. rm is called directly
        # instead of testing membership in dbutils.fs.ls(...): ls returns FileInfo objects
        # rather than path strings, so such a test never matches. rm with recurse=True
        # reports (rather than raises on) a missing path -- NOTE(review): confirm on the
        # target Databricks runtime.
        # NOTE(review): discarding the checkpoint while reading from 'earliest' replays the
        # retained Kinesis data on every run, appending duplicates to the Delta table.
        dbutils.fs.rm(checkpoint_location, True)

        # write stream to delta table
        self._streaming_query = (
            self._cleaned_streaming_df.writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", checkpoint_location)
            .table(self._delta_table_name)
        )

    def run_pipeline(self) -> None:
        """
        Method which runs the full ETL pipeline for the stream (reading from the data stream in Kinesis,
        cleaning the data and writing it to its Delta table).
        """
        self._read_stream_from_kinesis_to_spark_df()
        self._clean_stream_data()
        self._write_stream_to_delta_table()