In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
from datetime import datetime
from pyspark.sql.window import Window
import json
import random

In [None]:
def create_spark_session(python_path="C:/Users/Administrator/AppData/Local/Programs/Python/Python311/python.exe"):
    """Create (or reuse) the SparkSession named 'transactions'.

    The PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON environment variables are
    exported *before* the session is built. PySpark's SparkContext reads
    PYSPARK_PYTHON when it is constructed to decide which interpreter the
    worker processes run. Setting it after getOrCreate() (as the previous
    version did) therefore has no effect on the session returned.

    NOTE(review): if a SparkSession already exists in this kernel,
    getOrCreate() returns it unchanged and these variables still will not
    apply to it; restart the kernel to change interpreters.

    Args:
        python_path: Path of the Python interpreter Spark should use. Defaults
            to the machine-specific path this notebook was written against;
            pass a different path on other machines.

    Returns:
        The active SparkSession.
    """
    import os
    from pyspark.sql import SparkSession

    # Must happen before the SparkContext is created to take effect.
    os.environ['PYSPARK_PYTHON'] = python_path
    os.environ['PYSPARK_DRIVER_PYTHON'] = python_path

    spark = SparkSession.builder.appName('transactions').getOrCreate()
    return spark

In [None]:
def create_datastreamReader():
    """Build the streaming DataFrame of transactions read from JSON files in ./data.

    Requires the module-level ``spark`` session to exist at call time. Each
    micro-batch picks up at most one new file (maxFilesPerTrigger=1). The
    string column 'Time Stamp' is parsed into the timestamp column 'Times',
    and the original string column is dropped. As a side effect, the
    session's shuffle partition count is set to 1.

    Returns:
        Streaming DataFrame with columns 'Acc No', 'Transaction', 'Amount', 'Times'.
    """
    transaction_schema = StructType([
        StructField("Acc No", IntegerType(), True),
        StructField("Transaction", StringType(), True),
        StructField("Time Stamp", StringType(), True),
        StructField("Amount", IntegerType(), True),
    ])

    raw_transactions = (
        spark.readStream
        .schema(transaction_schema)
        .option('maxFilesPerTrigger', 1)
        .json('./data')
    )

    parsed_time = to_timestamp(col('Time Stamp'))
    transactions = raw_transactions.withColumn('Times', parsed_time)
    transactions = transactions.drop('Time Stamp')

    # Small local job: a single shuffle partition avoids many tiny tasks.
    spark.conf.set("spark.sql.shuffle.partitions", 1)

    return transactions

In [None]:
def get_aggregate_count_lessthan5L():
    """Stream a running per-account count of transactions below 500000 to the console.

    Reads the module-level ``accountDF`` streaming DataFrame, keeps rows whose
    'Amount' is below 500000 (5 lakh) and counts them per 'Acc No'. The query
    uses 'complete' output mode, so every 20-second trigger reprints the whole
    per-account table. Blocks the caller until the query terminates.
    """
    small_transactions = (
        accountDF
        .select('Acc No', 'Amount', 'Times')
        .where(col('Amount') < 500000)
    )
    counts_per_account = small_transactions.groupBy('Acc No').count()

    writer = (
        counts_per_account.writeStream
        .queryName('Min_Amount_complete')
        .trigger(processingTime="20 seconds")
        .format('console')
        .outputMode('complete')
    )
    writer.start().awaitTermination()

In [None]:
def get_trans_details_perTrigger():
    """Print newly arrived transactions below 500000 to the console.

    Uses 'append' output mode on the module-level ``accountDF`` with no
    aggregation, so each 20-second trigger prints only the qualifying rows
    that arrived since the previous trigger. Blocks the caller until the
    query terminates.
    """
    below_threshold = col('Amount') < 500000
    qualifying_rows = accountDF.select('Acc No', 'Amount', 'Times').where(below_threshold)

    streaming_query = (
        qualifying_rows.writeStream
        .trigger(processingTime="20 seconds")
        .format('console')
        .outputMode('append')
        .start()
    )
    streaming_query.awaitTermination()
    

In [None]:
# Windowed (20-second event-time) counts of transactions below 500000, printed to the console.
def transactionFilter_withWaterMark_console():
    """Print per-account counts of transactions below 500000 per 20-second event-time window.

    This was previously a second definition named
    ``transactionFilter_withWaterMark``. The CSV-sink function of that name,
    defined later, silently shadowed it, so it could never be called. It now
    has its own name.

    Reads the module-level ``accountDF``, whose 'Times' column is already a
    timestamp (see create_datastreamReader). A 1-minute watermark bounds the
    aggregation state. In 'append' mode, a window's final count is printed
    once the watermark passes the window's end, and data arriving later
    than the watermark may be dropped. Blocks the caller until the query
    terminates.
    """
    windowed_counts = (
        accountDF
        .withWatermark("Times", "1 minute")
        .where(col("Amount") < 500000)
        .groupBy("Acc No", window("Times", "20 seconds"))
        .count()
    )

    query = (
        windowed_counts.writeStream
        .outputMode("append")
        .format("console")
        .option("truncate", "false")  # show full window start/end values
        .trigger(processingTime="20 seconds")
        .start()
    )

    query.awaitTermination()
    

In [None]:
# Windowed (20-second event-time) counts of transactions below 500000, written as CSV files.
def transactionFilter_withWaterMark(output_path="./outputDir/", checkpoint_location="./checkpoint/"):
    """Write per-account counts of transactions below 500000 per 20-second window to CSV.

    Reads the module-level ``accountDF``, whose 'Times' column is already a
    timestamp (see create_datastreamReader). A 1-minute watermark bounds the
    aggregation state. In 'append' mode (the only mode the file sink
    supports), a window's row is written once the watermark passes the
    window's end.

    The CSV data source cannot serialise struct columns, and ``window()``
    produces a struct<start, end> column. Writing it directly makes the
    query fail, so it is flattened into ``window_start`` / ``window_end``
    timestamp columns first.

    Args:
        output_path: Directory the CSV part files are written to.
        checkpoint_location: Directory for streaming offsets/state. Must be
            unique to this query and persist across restarts.

    Blocks the caller until the query terminates.
    """
    windowed_counts = (
        accountDF
        .withWatermark("Times", "1 minute")
        .where(col("Amount") < 500000)
        .groupBy("Acc No", window("Times", "20 seconds"))
        .count()
        # Flatten the struct-typed window column; CSV only accepts flat columns.
        .select(
            "Acc No",
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            "count",
        )
    )

    query = (
        windowed_counts.writeStream
        .outputMode("append")
        .format("csv")
        .trigger(processingTime="20 seconds")
        .option("checkpointLocation", checkpoint_location)
        .option("path", output_path)
        .start()
    )

    query.awaitTermination()
    

In [None]:
# Build the session and the shared streaming source, then start one job.
# Every job blocks on awaitTermination(), so only one can run per kernel session.
# Other jobs: get_aggregate_count_lessthan5L, get_trans_details_perTrigger.
spark = create_spark_session()
accountDF = create_datastreamReader()

selected_job = transactionFilter_withWaterMark
selected_job()
