In [0]:
from pyspark.sql import functions as fn
from pyspark.sql.window import Window
from datetime import datetime
import pandas as pd
from io import StringIO
import boto3
# Captured once when this module-level code runs; process_and_log stamps this
# same value on every detection row as the "YStarttime" column.
Ystarttime = datetime.now()
# S3 locations and the upload chunk size shared by process_and_log's helpers.
_BUCKET = "databricksbucket101"
_DELTA_PATH = "s3://databricksbucket101/delta/"
_CUSTOMER_IMPORTANCE_PATH = "s3://databricksbucket101/datafiles/CustomerImportance.csv"
_DETECTION_CHUNK_ROWS = 50


def _with_percentile(df, partition_col, order_col):
    """Add ``pct_rank`` (0-1) and ``percentile`` (0-100) of ``order_col`` within ``partition_col``."""
    window = Window.partitionBy(partition_col).orderBy(order_col)
    return df.withColumn("pct_rank", fn.percent_rank().over(window)).withColumn(
        "percentile", fn.col("pct_rank") * 100
    )


def _tag_detections(df, pattern_id, action_type, detected_at):
    """Append the patternId, ActionType, YStarttime and detectiontime columns to ``df``."""
    return (
        df.withColumn("patternId", fn.lit(pattern_id))
        .withColumn("ActionType", fn.lit(action_type))
        .withColumn("YStarttime", fn.lit(Ystarttime))
        .withColumn("detectiontime", fn.lit(detected_at))
    )


def _load_low_weight_pairs():
    """Load CustomerImportance.csv and keep rows at or below the 10th weight percentile per Target."""
    cust_imp = (
        spark.read.format("csv")
        .options(header="true", inferSchema="true")
        .load(_CUSTOMER_IMPORTANCE_PATH)
    )
    # Strip single quotes from Source/Target before they are compared with
    # customer/merchant in the pattern-1 join.
    cust_imp = cust_imp.withColumn(
        "Target", fn.regexp_replace(cust_imp.Target, "'", "")
    ).withColumn("Source", fn.regexp_replace(cust_imp.Source, "'", ""))
    return _with_percentile(cust_imp, "Target", "weight").filter(
        fn.col("percentile") <= 10
    )


def _detect_upgrade(pair_stats, customer_totals, low_weight_pairs):
    """Pattern 1 (UPGRADE): pairs above the 90th txn-count percentile per merchant that are also low-weight."""
    busiest = _with_percentile(pair_stats, "merchant", "txn_cnt").filter(
        fn.col("percentile") > 90
    )
    # NOTE(review): the customer totals are joined in, but none of their columns
    # are selected afterwards -- confirm whether a threshold on
    # total_customer_txns was intended.
    matched = busiest.join(customer_totals, "customer").join(
        low_weight_pairs,
        (busiest["customer"] == low_weight_pairs["Source"])
        & (busiest["merchant"] == low_weight_pairs["Target"]),
    )
    # Timestamp is taken when the query is defined, before toPandas() runs it.
    detected_at = datetime.now()
    return _tag_detections(
        matched.select("customer", "merchant"), "1", "UPGRADE", detected_at
    )


def _detect_child(pair_stats):
    """Pattern 2 (CHILD): pairs with an average amount below 23 and at least 80 transactions."""
    matched = pair_stats.filter(
        (fn.col("avg_amt") < 23) & (fn.col("txn_cnt") >= 80)
    )
    detected_at = datetime.now()
    return _tag_detections(
        matched.select("customer", "merchant"), "2", "CHILD", detected_at
    )


def _detect_dei_needed(txns):
    """Pattern 3 (DEI-NEEDED): merchants with more "M" than "F" rows and more than 100 "F" rows."""
    male = (
        txns.filter(fn.col("gender") == "M")
        .groupby("merchant")
        .agg(fn.count("*").alias("male_cnt"))
    )
    female = (
        txns.filter(fn.col("gender") == "F")
        .groupby("merchant")
        .agg(fn.count("*").alias("female_cnt"))
    )
    matched = male.join(female, "merchant").filter(
        (fn.col("male_cnt") > fn.col("female_cnt")) & (fn.col("female_cnt") > 100)
    )
    detected_at = datetime.now()
    # This pattern is merchant-level, so the customer column is left empty.
    return _tag_detections(
        matched.select("merchant").withColumn("customer", fn.lit("")),
        "3",
        "DEI-NEEDED",
        detected_at,
    )


def _upload_detections(detections, batch_id):
    """Upload ``detections`` to S3 as CSV objects of at most ``_DETECTION_CHUNK_ROWS`` rows each."""
    try:
        s3_client = boto3.client("s3")
        total_rows = len(detections)
        print(total_rows)
        for start in range(0, total_rows, _DETECTION_CHUNK_ROWS):
            chunk = detections.iloc[start:start + _DETECTION_CHUNK_ROWS]
            s3_client.put_object(
                Bucket=_BUCKET,
                Key=f"detections/detection_{start}.csv",
                # to_csv() without a target returns the CSV text directly.
                Body=chunk.to_csv(index=False),
            )
            print(start + _DETECTION_CHUNK_ROWS)
    except Exception as e:
        # Deliberately best-effort: the batch has already been appended to the
        # Delta table, so the failure is reported rather than re-raised.
        print(f"batch {batch_id}: detection upload failed: {e!r}")


def process_and_log(batchdf, batchId):
    """Persist one micro-batch and refresh the detection files on S3.

    Appends ``batchdf`` to the Delta table, then re-reads the whole table and
    recomputes three detection patterns over it:

    1. ``UPGRADE``    -- (customer, merchant) pairs above the 90th percentile
       of transaction count for their merchant whose CustomerImportance
       weight is at or below the 10th percentile for that merchant.
    2. ``CHILD``      -- pairs with average amount < 23 and >= 80 transactions.
    3. ``DEI-NEEDED`` -- merchants with more "M" than "F" transaction rows
       and more than 100 "F" rows.

    The combined rows are uploaded as ``detections/detection_<offset>.csv``
    in chunks of 50 rows. Keys depend only on the row offset, so each call
    writes to the same keys as the previous one.

    Args:
        batchdf: Micro-batch DataFrame handed over by ``foreachBatch``.
        batchId: Micro-batch id; used only to label an upload failure message.

    Returns:
        None. Upload errors are printed, not raised.
    """
    # NOTE(review): a plain append is not idempotent if this batch is retried;
    # confirm whether duplicate rows in the Delta table are acceptable.
    batchdf.write.format("delta").mode("append").save(_DELTA_PATH)

    low_weight_pairs = _load_low_weight_pairs()

    txns = spark.read.format("delta").load(_DELTA_PATH)
    customer_totals = txns.groupby("customer").agg(
        fn.count("*").alias("total_customer_txns")
    )
    pair_stats = txns.groupby("customer", "merchant").agg(
        fn.count("*").alias("txn_cnt"), fn.avg("amount").alias("avg_amt")
    )

    # Each pattern is collected before the next one is built, so the three
    # detection timestamps are taken in sequence, as before.
    detections = _detect_upgrade(
        pair_stats, customer_totals, low_weight_pairs
    ).toPandas()
    detections = pd.concat(
        [detections, _detect_child(pair_stats).toPandas()], ignore_index=True
    )
    detections = pd.concat(
        [detections, _detect_dei_needed(txns).toPandas()], ignore_index=True
    )

    _upload_detections(detections, batchId)

# Streaming source: CSV files under the landing zone, read through the
# "cloudFiles" format with its schema tracked under the schema/ prefix.
stream = spark.readStream.format("cloudFiles").options(
  **{
    "cloudFiles.format": "csv",
    "cloudFiles.schemaLocation": "s3://databricksbucket101/schema/",
  }
).load("s3://databricksbucket101/landing_zone/")

# Hand every micro-batch to process_and_log, then block until the query
# terminates.
query = (
  stream.writeStream.foreachBatch(process_and_log)
  .trigger(availableNow=True)
  .option("checkpointLocation", "s3://databricksbucket101/checkpoint/")
  .start()
)
query.awaitTermination()