# How state store works in Spark
When a late record arrives, Spark uses its event timestamp to look up the matching window in the state store and updates that window's aggregate with the late record.

With watermarking, there is a limit on how long Spark retains the state of past windows:
- If the watermark is 6 hours, windows that ended more than 6 hours before the latest event time seen so far are dropped from the state store
- If late data belonging to such a dropped window arrives, Spark simply ignores it; no update is made

# How to choose the appropriate watermarking
- What is the maximum possible delay?
- When are late samples no longer relevant?

e.g. 1: A dashboard of transaction data that records transactions from 9:00 to 17:00 (8 hours) &rarr; the watermark should be 8 hours

e.g. 2: The same dashboard, but it only needs 99% accuracy, just enough to get a feel for the transaction flow during the day.

You might want to analyse how late transactions typically arrive. **How long must we keep accepting late transactions so that 99% of them make it onto the dashboard?** The answer could be 30 seconds or 30 minutes.

In [1]:
import findspark
findspark.init()

import pyspark
from delta import *

In [2]:

# Session builder with the Delta Lake SQL extension and catalog switched on.
builder = (
    pyspark.sql.SparkSession.builder
    .appName('Trade Summarizer')
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip is expected to come from the `from delta import *` above.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [3]:
class TradeSumary():
    """Streaming job that turns raw trade events into windowed buy/sell totals.

    Reads the ``kafka_bz`` table as a stream, parses the JSON held in its
    ``value`` column, sums buy and sell amounts per event-time window and
    writes the result to the ``trade_summary`` Delta table.

    Relies on a module-level ``spark`` session.
    """

    def __init__(self, base_dir='..', watermark_delay='30 minutes', window_duration='15 minutes'):
        """Configure the job.

        Args:
            base_dir: Root directory under which the checkpoint folder lives.
            watermark_delay: Interval string passed to ``withWatermark``.
            window_duration: Interval string passed to ``window``.
        """
        self.BASE_DIR = base_dir
        self.watermark_delay = watermark_delay
        self.window_duration = window_duration

    def get_schema(self):
        """Return the schema of the JSON trade payload."""
        from pyspark.sql.types import StructType, StructField, StringType, DoubleType

        return StructType([
            # Kept as a string here; get_trade() converts it to a timestamp.
            StructField('created_time', StringType(), nullable=True),
            StructField('type', StringType(), nullable=True),
            StructField('amount', DoubleType(), nullable=True),
            StructField('broker_code', StringType(), nullable=True),
        ])

    def read_bronze(self):
        """Return a streaming DataFrame over the ``kafka_bz`` table."""
        return spark.readStream.table('kafka_bz')

    def get_trade(self, kafka_df):
        """Parse the JSON ``value`` column and derive ``buy`` / ``sell`` amounts.

        Args:
            kafka_df: DataFrame with a ``value`` column holding the JSON payload.

        Returns:
            DataFrame with the payload fields flattened, ``created_time`` parsed
            as a timestamp, plus ``buy`` and ``sell`` columns. Each holds
            ``amount`` when ``type`` matches and 0 otherwise.
        """
        # Namespaced import avoids shadowing Python builtins such as `sum`.
        from pyspark.sql import functions as F

        parsed = kafka_df.select(
            F.from_json(kafka_df.value, self.get_schema()).alias('value')
        )
        return (
            parsed.select('value.*')
            .withColumn('created_time', F.expr("to_timestamp(created_time, 'yyyy-MM-dd HH:mm:ss')"))
            .withColumn('buy', F.expr("CASE WHEN type == 'BUY' THEN amount ELSE 0 END"))
            .withColumn('sell', F.expr("CASE WHEN type == 'SELL' THEN amount ELSE 0 END"))
        )

    def aggregate_trade(self, trade_df):
        """Sum buy and sell amounts per window of ``created_time``.

        Args:
            trade_df: DataFrame produced by ``get_trade``.

        Returns:
            DataFrame with columns ``start``, ``end``, ``total_buy``, ``total_sell``.
        """
        from pyspark.sql import functions as F

        return (
            trade_df.withWatermark('created_time', self.watermark_delay)
            .groupBy(F.window(trade_df['created_time'], self.window_duration))
            .agg(
                F.sum('buy').alias('total_buy'),
                F.sum('sell').alias('total_sell'),
            )
            .select('window.start', 'window.end', 'total_buy', 'total_sell')
        )

    def process(self):
        """Start the streaming pipeline and return the resulting query handle.

        The caller is responsible for calling ``stop()`` on the returned query.
        """
        print('Starting trade data extracting stream...', end='')

        raw_kafka_df = self.read_bronze()
        trade_df = self.get_trade(raw_kafka_df)
        aggregated_df = self.aggregate_trade(trade_df)

        # NOTE(review): the Structured Streaming guide says watermark-based state
        # cleanup requires append or update output mode. With 'complete' every
        # window is kept, so the watermark probably never drops late data here.
        # Confirm before relying on the watermark to bound state.
        streaming_query = (
            aggregated_df.writeStream
            .queryName('trade_summary')
            .format('delta')
            .option('checkpointLocation', f'{self.BASE_DIR}/checkpoint/trade_summary')
            .outputMode('complete')
            .toTable('trade_summary')
        )

        print(' Done.\n')

        return streaming_query

In [4]:
class TradeSummaryTestSuite():
    """End-to-end check of the ``TradeSumary`` stream against a scratch ``kafka_bz`` table.

    Relies on a module-level ``spark`` session and on ``TradeSumary`` being defined.
    """

    def __init__(self):
        # Root directory under which the warehouse and checkpoint folders live.
        self.BASE_DIR = '..'

    @staticmethod
    def _reset_dir(path):
        """Leave ``path`` as an existing, empty directory."""
        import contextlib
        import os
        import shutil

        # On a fresh environment there is nothing to delete yet; that is not an error.
        with contextlib.suppress(FileNotFoundError):
            shutil.rmtree(path)
        os.makedirs(path, exist_ok=True)

    def clean_up_for_testing(self):
        """Drop the test tables, empty their directories and recreate ``kafka_bz``."""
        print('Starting cleaning...', end='')

        spark.sql('DROP TABLE IF EXISTS kafka_bz')
        spark.sql('DROP TABLE IF EXISTS trade_summary')

        stale_dirs = (
            f'{self.BASE_DIR}/notebooks/spark-warehouse/kafka_bz',
            f'{self.BASE_DIR}/notebooks/spark-warehouse/trade_summary',
            f'{self.BASE_DIR}/checkpoint/trade_summary',
        )
        for path in stale_dirs:
            self._reset_dir(path)

        spark.sql('CREATE TABLE kafka_bz(key String, value String) USING delta')

        print(' Done.')

    def wait_for_microbatch(self, sleep_time=15):
        """Block for ``sleep_time`` seconds to give the stream time to process new rows."""
        import time

        print(f'\tWaiting for {sleep_time} seconds...', end='')
        time.sleep(sleep_time)

        print(' Done.')

    def assert_trade_summary(self, start, end, expected_buy, expected_sell):
        """Assert the totals stored in ``trade_summary`` for one window.

        Args:
            start: Window start formatted as ``yyyy-MM-dd HH:mm:ss``.
            end: Window end formatted as ``yyyy-MM-dd HH:mm:ss``.
            expected_buy: Expected ``total_buy`` for the window.
            expected_sell: Expected ``total_sell`` for the window.

        Raises:
            AssertionError: If the window row is missing or a total differs.
        """
        print('\tStarting Trade Summary validation...', end='')

        result = spark.sql(
            f'''
            SELECT total_buy, total_sell
            FROM trade_summary
            WHERE date_format(start, 'yyyy-MM-dd HH:mm:ss') = '{start}'
            AND date_format(end, 'yyyy-MM-dd HH:mm:ss') = '{end}';
            '''
        ).collect()

        # Fail with a readable message instead of an IndexError when the window is absent.
        assert result, f'Test failed! No trade_summary row found for window {start} - {end}.'

        actual_buy = result[0][0]
        actual_sell = result[0][1]

        assert expected_buy == actual_buy, f'Test failed! Expected buy is {expected_buy}. Got {actual_buy} instead.'
        assert expected_sell == actual_sell, f'Test failed! Expected sell is {expected_sell}. Got {actual_sell} instead.'

        print(' Done.\n')

    def run_stream_tests(self):
        """Run the full scenario: two on-time batches followed by a batch with a late event."""
        # Sleep time between extract and transform operation
        sleep_time = 30  # Only works if sleep_time >= 30
        self.clean_up_for_testing()

        trade_summary_stream = TradeSumary()
        trade_summary_streaming_query = trade_summary_stream.process()

        try:
            print('Testing first 2 events...')
            spark.sql(
                '''
                INSERT INTO kafka_bz VALUES
                    ('2019-02-05', '{"created_time": "2019-02-05 10:05:00", "type": "BUY", "amount": 500, "broker_code": "ABX"}'),
                    ('2019-02-05', '{"created_time": "2019-02-05 10:12:00", "type": "BUY", "amount": 300, "broker_code": "ABX"}');
                '''
            )
            self.wait_for_microbatch(sleep_time=sleep_time)
            self.assert_trade_summary('2019-02-05 10:00:00', '2019-02-05 10:15:00', 800, 0)

            print('Testing 3rd and 4th events...')
            spark.sql(
                '''
                INSERT INTO kafka_bz VALUES
                    ('2019-02-05', '{"created_time": "2019-02-05 10:20:00", "type": "BUY", "amount": 600, "broker_code": "ABX"}'),
                    ('2019-02-05', '{"created_time": "2019-02-05 10:40:00", "type": "BUY", "amount": 900, "broker_code": "ABX"}');
                '''
            )
            self.wait_for_microbatch(sleep_time=sleep_time)
            self.assert_trade_summary('2019-02-05 10:15:00', '2019-02-05 10:30:00', 600, 0)
            self.assert_trade_summary('2019-02-05 10:30:00', '2019-02-05 10:45:00', 900, 0)

            print('Testing late event...')
            spark.sql(
                '''
                INSERT INTO kafka_bz VALUES
                    ('2019-02-05', '{"created_time": "2019-02-05 10:48:00", "type": "SELL", "amount": 500, "broker_code": "ABX"}'),
                    ('2019-02-05', '{"created_time": "2019-02-05 10:25:00", "type": "SELL", "amount": 400, "broker_code": "ABX"}');
                '''
            )
            self.wait_for_microbatch(sleep_time=sleep_time)
            self.assert_trade_summary('2019-02-05 10:45:00', '2019-02-05 11:00:00', 0, 500)
            # The 10:25 event arrives after 10:48 but must still land in its own window.
            self.assert_trade_summary('2019-02-05 10:15:00', '2019-02-05 10:30:00', 600, 400)

            print('All tests completed.\n')
        finally:
            # Stop the stream even when an assertion fails, so it is not left running.
            trade_summary_streaming_query.stop()


In [5]:
# Instantiate the test suite and run the end-to-end streaming tests.
trade_summary_tester = TradeSummaryTestSuite()
trade_summary_tester.run_stream_tests()

Starting cleaning... Done.
Starting trade data extracting stream... Done.

Testing first 2 events...
	Waiting for 30 seconds... Done.
	Starting Trade Summary validation... Done.

Testing 3rd and 4th events...
	Waiting for 30 seconds... Done.
	Starting Trade Summary validation... Done.

	Starting Trade Summary validation... Done.

Testing late event...
	Waiting for 30 seconds... Done.
	Starting Trade Summary validation... Done.

	Starting Trade Summary validation... Done.

All tests completed.



In [6]:
# Shut down the Spark session created earlier in this notebook.
spark.stop()