# **Importing the Libraries required**

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from io import StringIO
from datetime import datetime as t
import time,boto3,os,psycopg2,pandas as pd
from pyspark.sql import Row

# PostgreSQL password used both by the Spark JDBC `properties` below and by
# the psycopg2 helper `postgres_cur`.
# NOTE(review): this credential is hard-coded in the notebook source; the
# notebook already reads another secret through `dbutils.secrets.get`, so
# moving this value into the same secret scope is recommended.
postgres_pwd = 'Sanidhya2424#'

# Register the PostgreSQL JDBC driver jar with the running Spark context.
jar_path = "dbfs:/FileStore/postgresql_42_7_6.jar"
sc._jsc.addJar(jar_path)

# JDBC endpoint used by every spark.read.jdbc / DataFrame.write.jdbc call.
jdbc_url = "jdbc:postgresql://database-1.czy4sq8iitbm.ap-south-1.rds.amazonaws.com:5432/postgres"

# Connection properties passed alongside `jdbc_url`.
properties = {
    "user": "postgres",
    "password": postgres_pwd,
    "driver": "org.postgresql.Driver"
}


# OAuth client secret, fetched from the Databricks secret scope "gsrscope".
secret = dbutils.secrets.get(scope = "gsrscope", key = "secret")
# Interpolated into the login.microsoftonline.com token endpoint below
# (presumably the Azure AD tenant ID - confirm).
t_ID	 =  'a940799b-8d78-44ef-861d-12ad29d1d758' 
# Passed as the OAuth2 client id below.
app_ID   =  'dae49eca-5bcd-4cb7-bcc1-b90a3c8d4a3d' 

storage_account_name = 'devdolphins'

# Configure OAuth client-credentials access to the storage account's
# dfs.core.windows.net endpoint for this Spark session.
spark.conf.set(f"fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{storage_account_name}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account_name}.dfs.core.windows.net",app_ID)
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account_name}.dfs.core.windows.net", secret)
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{storage_account_name}.dfs.core.windows.net", f"https://login.microsoftonline.com/{t_ID}/oauth2/token")


### **Creating a DataFrame from a table that stays constant throughout the process.**

In [0]:
# Read the `bottom_1pct_weights` table once over JDBC; pat1 joins against it.
bottom_1pct_weights_df = spark.read.jdbc(jdbc_url, "bottom_1pct_weights", properties=properties)

### **Defining the functions required**

In [0]:
def update_postgres_data(df):
    """Fold one chunk of transactions into the running per-pair totals in PostgreSQL.

    The chunk is aggregated per (customer, merchant), written to the staging
    table ``staging_customer_merchant_txn_counts`` through Spark JDBC, merged
    into ``customer_merchant_txn_gender_counts`` with an upsert, and the
    staging table is then dropped.

    Args:
        df: Spark DataFrame holding the chunk; the columns ``customer``,
            ``merchant``, ``amount`` and ``gender`` are read.

    Raises:
        Exception: whatever Spark or psycopg2 raises. The open transaction is
            rolled back and the connection is always closed before the error
            propagates.
    """
    # Group by customer and merchant, aggregate transaction count and average amount
    grouped_df = df.groupBy(col('customer'), col('merchant')).agg(
        F.count("*").alias("txn_count"),
        F.avg("amount").alias("avg_amount"),
        F.first("gender").alias("gender")
    )

    # Write to a staging table in PostgreSQL
    grouped_df.write.jdbc(
        url=jdbc_url,
        table="staging_customer_merchant_txn_counts",
        mode="overwrite",
        properties=properties
    )

    # Upsert the data into the main table. On conflict the counts are summed
    # and avg_amount becomes the count-weighted mean of the stored and the new
    # average; gender is only set when the row is first inserted.
    upsert_sql = """
        INSERT INTO customer_merchant_txn_gender_counts (customer, merchant,txn_count,avg_amount,gender)
        SELECT customer,merchant,txn_count, avg_amount,gender
        FROM staging_customer_merchant_txn_counts
        ON CONFLICT (customer, merchant)
        DO UPDATE 
        SET txn_count = customer_merchant_txn_gender_counts.txn_count + EXCLUDED.txn_count,
            avg_amount = (
                ((customer_merchant_txn_gender_counts.avg_amount *     customer_merchant_txn_gender_counts.txn_count) + 
                 (EXCLUDED.avg_amount * EXCLUDED.txn_count)) /
                (customer_merchant_txn_gender_counts.txn_count + EXCLUDED.txn_count)
            );
    """

    # Open the psycopg2 connection only once the Spark write has finished, so
    # it is not held idle during the Spark job, and release it on every path.
    conn, cur = postgres_cur()
    try:
        cur.execute(upsert_sql)
        conn.commit()

        # Drop the staging table
        cur.execute("DROP TABLE IF EXISTS staging_customer_merchant_txn_counts;")
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
        conn.close()

# Pattern 1: Top 1% high-transacting customers per merchant

def pat1(jdbc_url, properties,df):
    """Pattern 1 (UPGRADE): top 1% transacting customers per merchant that also have a weight row.

    Args:
        jdbc_url: unused; kept so all pattern functions share one signature.
        properties: unused; kept so all pattern functions share one signature.
        df: Spark DataFrame with at least ``customer``, ``merchant`` and
            ``txn_count`` columns.

    Returns:
        DataFrame with columns ``ystarttime``, ``detectiontime``,
        ``patternid`` ("PatId1"), ``actiontype`` ("UPGRADE"), ``customer``
        and ``merchant``.
    """
    customer_merchant_txn_counts = df

    # 1. Create window for ranking. The order must be ASCENDING: percent_rank
    #    is 0.0 for the first row of the ordering and 1.0 for the last, so
    #    only an ascending order puts the highest txn_count at the top
    #    percentile. (Ordering descending made the >= 0.99 filter below pick
    #    the lowest-transacting customers instead.)
    window_spec_by_merchant = Window.partitionBy("merchant").orderBy(F.col("txn_count").asc())

    # 2. Percent_Rank customers by txn_count within each merchant
    ranked_df = customer_merchant_txn_counts.withColumn(
        "percentile_rank", F.percent_rank().over(window_spec_by_merchant)
    )

    # 3. Filter top 1 percentile (percentile_rank>=0.99)
    top_1pct_customers_df = ranked_df.filter(col('percentile_rank') >= 0.99).select("customer", "merchant")

    # Join with weight data: only pairs present in the module-level
    # bottom_1pct_weights_df survive the inner join.
    joined_df = top_1pct_customers_df.alias("t").join(
        bottom_1pct_weights_df.alias("b"),
        on=["customer", "merchant"],
        how="inner"
    )

    # Add metadata columns
    pattern_matches_df = joined_df.withColumn("ystarttime", F.current_timestamp())
    pattern_matches_df = pattern_matches_df.withColumn("detectiontime", F.current_timestamp())
    pattern_matches_df = pattern_matches_df.withColumn("patternid", F.lit("PatId1"))
    pattern_matches_df = pattern_matches_df.withColumn("actiontype", F.lit("UPGRADE"))

    # Select final output columns
    pattern_matches_df = pattern_matches_df.select(
        "ystarttime", "detectiontime", "patternid", "actiontype", "customer", "merchant"
    )

    return pattern_matches_df


    

# Pattern 2: Child customers based on avg amount and txn count
def pat2(jdbc_url, properties,df):
    """Pattern 2 (CHILD): pairs with a low average amount but many transactions.

    Keeps the rows of ``df`` whose ``avg_amount`` is below 23 and whose
    ``txn_count`` is at least 80, then tags them with detection metadata.
    ``jdbc_url`` and ``properties`` are accepted but not used.

    Returns:
        DataFrame with columns ``ystarttime``, ``detectiontime``,
        ``patternid`` ("PatId2"), ``actiontype`` ("CHILD"), ``customer``
        and ``merchant``.
    """
    low_average = col("avg_amount") < 23
    many_txns = col("txn_count") >= 80

    matches = df.filter(low_average & many_txns)

    # Tag the matches with metadata and project the published column set.
    return (
        matches
        .withColumn("ystarttime", F.current_timestamp())
        .withColumn("detectiontime", F.current_timestamp())
        .withColumn("patternid", F.lit("PatId2"))
        .withColumn("actiontype", F.lit("CHILD"))
        .select("ystarttime", "detectiontime", "patternid", "actiontype", "customer", "merchant")
    )

def pat3(jdbc_url, properties,df):
    """Pattern 3 (DEI-NEEDED): merchants with more male than female rows and over 100 female rows.

    Rows of ``df`` are counted per merchant by ``gender``. The gender values
    are compared against the literals ``'M'`` and ``'F'`` *including* the
    single quotes. ``jdbc_url`` and ``properties`` are accepted but not used.

    Returns:
        DataFrame with columns ``ystarttime``, ``detectiontime``,
        ``patternid`` ("PatId3"), ``actiontype`` ("DEI-NEEDED"), ``customer``
        (always the empty string) and ``merchant``.
    """
    is_male = col("gender") == "'M'"
    is_female = col("gender") == "'F'"

    per_merchant = df.groupBy("merchant").agg(
        F.sum(F.when(is_male, 1).otherwise(0)).alias("male"),
        F.sum(F.when(is_female, 1).otherwise(0)).alias("female"),
    )

    # Merchants where males outnumber females while females still exceed 100.
    flagged = per_merchant.filter((col('male') > col('female')) & (col('female') > 100))

    # The pattern is merchant-level, so the customer column is left empty.
    return (
        flagged
        .withColumn("ystarttime", F.current_timestamp())
        .withColumn("detectiontime", F.current_timestamp())
        .withColumn("patternid", F.lit("PatId3"))
        .withColumn("actiontype", F.lit("DEI-NEEDED"))
        .select(
            "ystarttime", "detectiontime", "patternid", "actiontype",
            F.lit("").alias("customer"),
            "merchant",
        )
    )

# PostgreSQL connection helper
def postgres_cur():
    """Open a new psycopg2 connection to the PostgreSQL instance.

    Returns:
        A ``(connection, cursor)`` tuple; the caller is responsible for
        closing both.
    """
    connection = psycopg2.connect(
        host="database-1.czy4sq8iitbm.ap-south-1.rds.amazonaws.com",
        port="5432",
        database="postgres",
        user="postgres",
        password=postgres_pwd,
    )
    cursor = connection.cursor()
    return connection, cursor

# Get next data chunk to process
def get_next_chunk():
    """Return the lowest-indexed chunk whose status is 'written'.

    Returns:
        An ``(id, chunk_index, s3_path)`` tuple, or ``None`` when no chunk
        with status 'written' is available.

    NOTE(review): the row lock taken by ``FOR UPDATE SKIP LOCKED`` only lasts
    for this function's transaction, which ends when the connection is closed
    below. It therefore does not stop another consumer from picking the same
    chunk before its status is changed.
    """
    conn, cur = postgres_cur()
    try:
        cur.execute("""
                    SELECT id, chunk_index, s3_path 
                    FROM chunk_tracking 
                    WHERE status = 'written' 
                    ORDER BY chunk_index ASC 
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED;
                """)
        return cur.fetchone()
    finally:
        # Release the connection even when the query fails.
        cur.close()
        conn.close()

# De-duplicate entries in final_output per (customer, merchant), keeping the row with the earliest ystarttime
def dedupe_final_output():
    """Delete duplicate rows from ``final_output``.

    Rows are grouped by (customer, merchant); within each group only the row
    with the earliest ``ystarttime`` is kept and the others are deleted.

    Returns:
        ``True`` once the delete has been committed.

    Raises:
        Exception: whatever psycopg2 raises. The transaction is rolled back
            and the connection is closed before the error propagates.
    """
    conn, cur = postgres_cur()
    try:
        cur.execute("""
                    WITH ranked_rows AS (
                        SELECT ctid,
                            ROW_NUMBER() OVER (PARTITION BY customer, merchant ORDER BY ystarttime ASC) AS rn
                        FROM final_output
                    )
                    DELETE FROM final_output
                    WHERE ctid IN ( SELECT ctid FROM ranked_rows WHERE rn > 1);
                """)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
        conn.close()
    return True

# Update the status of a chunk after processing
def update_chunk_status(chunk_index, status):
    """Set the status and ``updated_at`` timestamp of one ``chunk_tracking`` row.

    Args:
        chunk_index: value matched against ``chunk_tracking.chunk_index``.
        status: new status string to store (the notebook uses 'processed'
            and 'failed').

    Raises:
        Exception: whatever psycopg2 raises. The transaction is rolled back
            and the connection is closed before the error propagates.
    """
    conn, cur = postgres_cur()
    try:
        # psycopg2 opens a transaction implicitly on the first execute, so no
        # explicit BEGIN/COMMIT statements are needed. The UPDATE itself takes
        # the row lock, which makes a preceding SELECT ... FOR UPDATE
        # redundant.
        cur.execute("""
                    UPDATE chunk_tracking 
                    SET status = %s, updated_at = %s 
                    WHERE chunk_index = %s;
                """, (status, t.utcnow(), chunk_index))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
        conn.close()

# Generate the final output to be published
def generate_output():
    """Mark a batch of unpublished detections as published and return them.

    The query selects up to 50 rows of ``final_output`` whose ``published``
    is NULL (oldest ``ystarttime`` first) and flags them as 'published' only
    when a full batch of 50 was found; otherwise nothing is updated.

    Returns:
        Spark DataFrame with columns ``ystarttime``, ``detectiontime``,
        ``patternid``, ``actiontype``, ``customer`` and ``merchant``. It is
        empty when fewer than 50 unpublished rows were available.

    Raises:
        Exception: whatever psycopg2 or Spark raises. The transaction is
            rolled back and the connection is closed before the error
            propagates.

    NOTE(review): the UPDATE matches on (merchant, customer), so if
    ``final_output`` holds several rows for the same pair, more than 50 rows
    can be flagged and returned.
    """
    # The final SELECT lists the columns explicitly so the row tuples line up
    # with the schema below regardless of the table's physical column order.
    query = """         
        WITH to_publish AS (
            SELECT merchant, customer
            FROM final_output
            WHERE published IS NULL
            ORDER BY ystarttime
            LIMIT 50
            FOR UPDATE SKIP LOCKED
        ),
        check_count AS (
            SELECT COUNT(*) AS cnt FROM to_publish
        ),
        update_rows AS (
            UPDATE final_output
            SET published = 'published'
            WHERE (merchant, customer) IN (
                SELECT merchant, customer FROM to_publish
            )
            AND EXISTS (
                SELECT 1 FROM check_count WHERE cnt = 50
            )
            RETURNING *
        )
        SELECT ystarttime, detectiontime, patternid, actiontype,
               customer, merchant, published
        FROM update_rows;
    """

    conn, cur = postgres_cur()
    try:
        # psycopg2 starts the transaction implicitly; commit through the
        # connection instead of issuing raw BEGIN/COMMIT statements.
        cur.execute(query)
        published_rows = cur.fetchall()
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
        conn.close()

    schema = StructType([
        StructField("ystarttime", TimestampType(), True),
        StructField("detectiontime", TimestampType(), True),
        StructField("patternid", StringType(), True),
        StructField("actiontype", StringType(), True),
        StructField("customer", StringType(), True),
        StructField("merchant", StringType(), True),
        StructField("published", StringType(), True)
    ])

    # Convert to Spark DataFrame and drop the bookkeeping `published` column
    df = spark.createDataFrame(published_rows, schema=schema)
    df = df.select(
        col("ystarttime"),
        col("detectiontime"),
        col("patternid"),
        col("actiontype"),
        col("customer"),
        col("merchant")
    )

    return df

def get_filename(storage_account: str, container: str, folder_path: str) -> str:
    """Return the path of the single ``.csv`` file inside an ABFS folder.

    Args:
        storage_account: storage account name used in the abfss URL.
        container: container name used in the abfss URL.
        folder_path: folder path inside the container.

    Returns:
        The full path of the only entry whose path ends with ``.csv``.

    Raises:
        Exception: if the folder does not contain exactly one ``.csv`` entry.
    """
    base_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{folder_path}"

    # Collect every listed entry that is a CSV file.
    csv_files = []
    for entry in dbutils.fs.ls(base_path):
        if entry.path.endswith(".csv"):
            csv_files.append(entry.path)

    if len(csv_files) == 1:
        return csv_files[0]

    raise Exception(f"Expected exactly 1 CSV file in {base_path}, but found {len(csv_files)}")


### **Producer process that reads transactions.csv and loads it into the container incrementally, chunk by chunk.**

In [0]:
# Producer: split the source CSV into fixed-size chunks, write each chunk to
# the `raw` container and record it in the `chunk_tracking` table.

# Parameters
# NOTE(review): `input` shadows the Python builtin; the name is kept because
# other notebook cells may read it.
input = "transactions.csv"   # Change to your CSV file path

chunk_size = 10000

# Load CSV
df = spark.read.format('csv').option('header',True).option('inferSchema',True).load(f'abfss://source@devdolphins.dfs.core.windows.net/{input}')

# Adding Index: zipWithIndex attaches a consecutive 0-based row_id to each row
df_with_index = df.rdd.zipWithIndex().map(lambda row_index: Row(**row_index[0].asDict(), row_id=row_index[1]))
# Cache the indexed frame: every chunk below runs a count and a write against
# it, and without caching each of those actions re-reads and re-indexes the
# whole CSV.
df_indexed = spark.createDataFrame(df_with_index).cache()

# Total row count
total_rows = df_indexed.count()
total_chunks = (total_rows + chunk_size - 1) // chunk_size

# Open the tracking connection only once the loop is about to start, and
# close it even if the cell is cancelled or a Spark action fails mid-loop.
conn,cur = postgres_cur()
try:
    # Loop through chunks
    for i in range(total_chunks):
        start = i * chunk_size
        end  = start + chunk_size

        chunk_df = df_indexed.filter((col("row_id")>=start) & (col("row_id")<end)).drop('row_id')
        record_count = chunk_df.count()

        # Write to container
        output_path = f"abfss://raw@devdolphins.dfs.core.windows.net/partitions/chunk_{i}"
        chunk_df.coalesce(1).write.option("header", "true").mode("overwrite").csv(output_path)

        # Insert into PostgreSQL; a failed insert is logged and rolled back so
        # the remaining chunks are still produced.
        try:
            cur.execute("""
                            INSERT INTO chunk_tracking (chunk_index, s3_path, record_count, status, created_at)
                            VALUES (%s, %s, %s, %s, %s)
                    """,(i, output_path, record_count, 'written',t.utcnow()))

            conn.commit()
            print(f"[{i+1}/{total_chunks}] Written to container and logged to DB: {output_path}")

        except Exception as e:
            print(f"DB insert failed for chunk {i}: {e}")
            conn.rollback()
finally:
    cur.close()
    conn.close()
    df_indexed.unpersist()

[1/60] Written to container and logged to DB: abfss://raw@devdolphins.dfs.core.windows.net/partitions/chunk_0
[2/60] Written to container and logged to DB: abfss://raw@devdolphins.dfs.core.windows.net/partitions/chunk_1
[3/60] Written to container and logged to DB: abfss://raw@devdolphins.dfs.core.windows.net/partitions/chunk_2
[4/60] Written to container and logged to DB: abfss://raw@devdolphins.dfs.core.windows.net/partitions/chunk_3
[5/60] Written to container and logged to DB: abfss://raw@devdolphins.dfs.core.windows.net/partitions/chunk_4
[6/60] Written to container and logged to DB: abfss://raw@devdolphins.dfs.core.windows.net/partitions/chunk_5
[7/60] Written to container and logged to DB: abfss://raw@devdolphins.dfs.core.windows.net/partitions/chunk_6
[8/60] Written to container and logged to DB: abfss://raw@devdolphins.dfs.core.windows.net/partitions/chunk_7
[9/60] Written to container and logged to DB: abfss://raw@devdolphins.dfs.core.windows.net/partitions/chunk_8
[10/60] Wr

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

### **Consumer process that reads chunks from the lake sequentially and detects the patterns**

In [0]:
# Consumer: take 'written' chunks in order, fold each into the running
# per-(customer, merchant) totals, re-evaluate the three patterns and publish
# detections in batches of 50.

# PostgreSQL setup
# NOTE(review): this connection is not used inside the loop (every helper
# opens its own); it is kept and closed in `finally` so the cell's globals
# stay unchanged.
pg_conn,pg_cursor = postgres_cur()
output_index = 0


try:
    while True:

        chunk = get_next_chunk()
        if not chunk:
            print("No more chunks to process.")
            break

        chunk_id, chunk_index, s3_path = chunk
        print(f"Picked chunk {chunk_index} from {s3_path}")

        # Load the chunk. On failure mark it failed and move on, rather than
        # falling through and processing a stale `df` from an earlier
        # iteration or cell.
        try:
            file_name = get_filename('devdolphins','raw',f'partitions/chunk_{chunk_index}')
            df = spark.read.format('csv').option('header',True).option('inferSchema',True).load(file_name)
        except Exception as e:
            print(f'Can\'t load DF {chunk_index}:{e}')
            update_chunk_status(chunk_index, 'failed')
            continue

        try:
            update_postgres_data(df)
            counts_df = spark.read.jdbc(
                url=jdbc_url,
                table="customer_merchant_txn_gender_counts",
                properties=properties
            )

            pat1_df = pat1(jdbc_url,properties,counts_df)
            pat2_df = pat2(jdbc_url,properties,counts_df)
            pat3_df = pat3(jdbc_url,properties,counts_df)

            union_df = pat1_df.unionByName(pat2_df).unionByName(pat3_df)

            # Append, then de-duplicate. mode="overwrite" drops and recreates
            # final_output from this six-column frame, which discards the
            # `published` column that generate_output() filters on together
            # with the record of what was already published. The dedupe keeps
            # the earliest row per (customer, merchant), so re-detections from
            # this pass are removed again.
            # Assumes final_output already exists with a nullable `published`
            # column - TODO confirm.
            union_df.write.jdbc(url=jdbc_url,table="final_output", mode="append",properties=properties)
            dedupe_final_output()

            output_df = generate_output()
            if output_df.count() == 50:
                output_index = output_index + 1
                output_path = f"abfss://processed@devdolphins.dfs.core.windows.net/chunks/output_{output_index}"
                # Write the published batch itself (previously this wrote the
                # producer cell's leftover `chunk_df`).
                output_df.coalesce(1).write.option("header", "true").mode("overwrite").csv(output_path)
                print(f'output written {output_index}')

            update_chunk_status(chunk_index, 'processed')
            print(f"Chunk {chunk_index} processed.")

        except Exception as e:
            print(f'Processing failed {chunk_index}:{e}')
            # If this status update itself fails the error propagates and
            # ends the loop, instead of re-picking the same chunk forever.
            update_chunk_status(chunk_index, 'failed')

finally:
    pg_cursor.close()
    pg_conn.close()