In [0]:
import os
from io import StringIO
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import time
from datetime import datetime

Solution X — Mechanism X: read transactions.csv and split it into 10,000-row CSV chunk files

In [0]:
def ingest_transactions(
    transactions_path="/Volumes/workspace/default/myprojectdata/transactions.csv",
    output_dir="/Volumes/workspace/default/myprojectdata/raw_chunks",
    chunk_size=10000,
    delay_seconds=1,
):
    """
    Ingests transactions from a CSV file and splits them into smaller chunk files.

    The whole CSV at ``transactions_path`` is read with pandas. Consecutive slices of
    ``chunk_size`` rows are then written to ``output_dir/chunk_<i>.csv`` (i = 0, 1, ...),
    and the function waits ``delay_seconds`` between chunks.

    Exactly ceil(rows / chunk_size) files are written, so no trailing empty chunk is
    produced. An empty input produces no chunk files.

    Args:
        transactions_path (str): Path of the source transactions CSV.
        output_dir (str): Directory for the chunk files; created if missing.
        chunk_size (int): Number of rows per chunk; must be positive.
        delay_seconds (float): Pause between consecutive chunks (0 disables it).

    Returns:
        None

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    print("Mechanism X started...")
    print("Processing transactions...")
    full_df = pd.read_csv(transactions_path)

    os.makedirs(output_dir, exist_ok=True)

    # Ceiling division: a row count that is an exact multiple of chunk_size
    # must not yield an extra, empty chunk.
    total_chunks = -(-len(full_df) // chunk_size)

    for i in range(total_chunks):
        chunk = full_df.iloc[i * chunk_size:(i + 1) * chunk_size]
        chunk.to_csv(os.path.join(output_dir, f"chunk_{i}.csv"), index=False)

        # Pace the "stream" between chunks; there is nothing to wait for after the last one.
        if delay_seconds > 0 and i < total_chunks - 1:
            time.sleep(delay_seconds)
    print("Mechanism X completed successfully.")

Defining the detection patterns (PatId1, PatId2, PatId3)

In [0]:
def process_pattern1(df):
    """
    Detects pattern 1 (PatId1, action "UPGRADE") in the given DataFrame and saves the detections.

    Steps:
    1. Ranks each merchant's transactions by ``amount`` (descending) with
       ``percent_rank`` and keeps the top 1% (``percent_rank <= 0.01``).
    2. Inner-joins those rows with CustomerImportance.csv on
       ``customer == Source`` and ``merchant == Target``.
    3. Keeps rows whose ``weight`` (cast to double) is ``<= 0.01``.
    4. Keeps only merchants with more than 50,000 transactions in ``df``.
    5. Writes columns (YStartTime, detectionTime, patternId, ActionType,
       customer, MerchantId) as CSV to a timestamped directory under output_bucket.

    NOTE(review): merchant totals are counted over ``df`` only. If ``df`` is a single
    10,000-row chunk, no merchant can exceed 50,000 transactions, so this pattern
    cannot fire. Detecting it needs cumulative data across chunks.

    Args:
        df (DataFrame): Spark DataFrame of transactions with at least
            ``customer``, ``merchant`` and ``amount`` columns.

    Returns:
        None
    """
    importance_path = "/Volumes/workspace/default/myprojectdata/CustomerImportance.csv"
    output_root = "/Volumes/workspace/default/myprojectdata/output_bucket"

    window = Window.partitionBy("merchant").orderBy(col("amount").desc())
    ranked_df = df.withColumn("percent_rank", percent_rank().over(window))
    top_1_percent = ranked_df.filter(col("percent_rank") <= 0.01)

    customer_importance = spark.read.csv(importance_path, header=True)
    joined_df = top_1_percent.join(
        customer_importance,
        (top_1_percent.customer == customer_importance.Source)
        & (top_1_percent.merchant == customer_importance.Target),
    )

    # The importance CSV is read without inferSchema, so every column is a string;
    # cast explicitly so the threshold comparison is numeric.
    bottom_1_weight = joined_df.filter(col("weight").cast("double") <= 0.01)

    merchant_totals = df.groupBy("merchant").agg(count("*").alias("totalTransactions"))
    valid_merchants = merchant_totals.filter(col("totalTransactions") > 50000)

    final_detections = bottom_1_weight.join(valid_merchants, "merchant")

    # One timestamp for both time columns and the output path, so they agree.
    now = datetime.now()
    output = final_detections.select(
        lit(now).alias("YStartTime"),
        lit(now).alias("detectionTime"),
        lit("PatId1").alias("patternId"),
        lit("UPGRADE").alias("ActionType"),
        col("customer"),
        col("merchant").alias("MerchantId"),
    )

    # Microseconds in the suffix stop two calls within the same second from
    # overwriting each other's results (mode="overwrite").
    timestamp = now.strftime("%Y%m%d%H%M%S%f")
    output_path = f"{output_root}/Pattern_1_detections_{timestamp}"
    # Persist the detections, not the raw input frame.
    output.write.csv(output_path, mode="overwrite", header=True)
    print("Pattern 1 processed successfully.")

In [0]:
def process_pattern2(df):
    """
    Detects pattern 2 (PatId2, action "CHILD") in the given DataFrame and saves the detections.

    Steps:
    1. Computes the average ``amount`` and the transaction count for each
       (customer, merchant) pair in ``df``.
    2. Keeps pairs with average amount ``< 23`` and count ``>= 80``.
    3. Keeps only pairs whose customer appears as ``Source`` in CustomerImportance.csv.
       A semi-join is used, so each pair appears once.
    4. Writes columns (YStartTime, detectionTime, patternId, ActionType,
       customer, MerchantId) as CSV to a timestamped directory under output_bucket.

    NOTE(review): the statistics are computed over ``df`` only (e.g. a single chunk),
    not over all transactions seen so far.

    Args:
        df (DataFrame): Spark DataFrame of transactions with at least
            ``customer``, ``merchant`` and ``amount`` columns.

    Returns:
        None
    """
    importance_path = "/Volumes/workspace/default/myprojectdata/CustomerImportance.csv"
    output_root = "/Volumes/workspace/default/myprojectdata/output_bucket"

    customer_stats = df.groupBy("customer", "merchant").agg(
        avg("amount").alias("avgAmount"),
        count("*").alias("transactionCount"),
    )

    pattern2_detections = customer_stats.filter(
        (col("avgAmount") < 23) & (col("transactionCount") >= 80)
    )

    customers = spark.read.csv(importance_path, header=True)
    # left_semi keeps each (customer, merchant) pair at most once when the customer
    # exists in the importance data. An inner join on Source alone would emit the
    # pair once for every importance row belonging to that customer.
    final_detections = pattern2_detections.join(
        customers, pattern2_detections.customer == customers.Source, "left_semi"
    )

    # One timestamp for both time columns and the output path, so they agree.
    now = datetime.now()
    output = final_detections.select(
        lit(now).alias("YStartTime"),
        lit(now).alias("detectionTime"),
        lit("PatId2").alias("patternId"),
        lit("CHILD").alias("ActionType"),
        col("customer"),
        # Same column name as the other pattern outputs.
        col("merchant").alias("MerchantId"),
    )

    # Microseconds in the suffix stop two calls within the same second from
    # overwriting each other's results (mode="overwrite").
    timestamp = now.strftime("%Y%m%d%H%M%S%f")
    output_path = f"{output_root}/Pattern_2_detections_{timestamp}"
    # Persist the detections, not the raw input frame.
    output.write.csv(output_path, mode="overwrite", header=True)
    print("Pattern 2 processed successfully.")

In [0]:
def process_pattern3(df):
    """
    Detects pattern 3 (PatId3, action "DEI-NEEDED") in the given DataFrame and saves the detections.

    Steps:
    1. Counts distinct customers per (merchant, gender).
    2. Pivots gender into ``Male`` and ``Female`` columns. A merchant with no customers
       of one gender gets 0 for that column rather than null.
    3. Keeps merchants with more male than female customers.
    4. Writes columns (YStartTime, detectionTime, patternId, ActionType,
       customer, MerchantId) as CSV to a timestamped directory under output_bucket.
       ``customer`` is always empty because this is a merchant-level pattern.

    NOTE(review): the pivot values are hard-coded to "Male" and "Female"; any other
    labels in the ``gender`` column are ignored. Confirm the data uses these labels.

    Args:
        df (DataFrame): Spark DataFrame of transactions with at least
            ``merchant``, ``gender`` and ``customer`` columns.

    Returns:
        None
    """
    output_root = "/Volumes/workspace/default/myprojectdata/output_bucket"

    gender_counts = df.groupBy("merchant", "gender").agg(countDistinct("customer").alias("count"))
    pivoted = (
        gender_counts.groupBy("merchant")
        .pivot("gender", ["Male", "Female"])
        .sum("count")
        # A missing gender shows up as null after the pivot, and `x > null` is null,
        # which would drop male-only merchants. Treat a missing gender as 0.
        .fillna(0, subset=["Male", "Female"])
    )
    pattern3_detections = pivoted.filter(col("Male") > col("Female"))

    # One timestamp for both time columns and the output path, so they agree.
    now = datetime.now()
    output = pattern3_detections.select(
        lit(now).alias("YStartTime"),
        lit(now).alias("detectionTime"),
        lit("PatId3").alias("patternId"),
        lit("DEI-NEEDED").alias("ActionType"),
        # Same column name as the other pattern outputs.
        lit("").alias("customer"),
        col("merchant").alias("MerchantId"),
    )

    # Microseconds in the suffix stop two calls within the same second from
    # overwriting each other's results (mode="overwrite").
    timestamp = now.strftime("%Y%m%d%H%M%S%f")
    output_path = f"{output_root}/Pattern_3_detections_{timestamp}"
    # Persist the detections, not the raw input frame.
    output.write.csv(output_path, mode="overwrite", header=True)
    print("Pattern 3 processed successfully.")

Solution Y — Mechanism Y: run the pattern detectors on each chunk file

In [0]:
def _chunk_sort_key(file_name):
    """Sort key: ``chunk_<n>.csv`` files ordered by numeric ``n``; other names afterwards, alphabetically."""
    stem = os.path.splitext(file_name)[0]
    suffix = stem.rpartition("_")[2]
    if suffix.isdecimal():
        return (0, int(suffix), file_name)
    return (1, 0, file_name)


def detect_patterns():
    """
    Runs all pattern detectors over every chunk file in the raw_chunks directory.

    Steps:
    1. Lists the ``.csv`` files in ``/Volumes/workspace/default/myprojectdata/raw_chunks/``.
    2. Sorts them by chunk number (``chunk_2.csv`` before ``chunk_10.csv``).
       ``os.listdir`` alone returns names in arbitrary order.
    3. Reads each file into a Spark DataFrame (``header=True, inferSchema=True``).
    4. Calls ``process_pattern1``, ``process_pattern2`` and ``process_pattern3`` on it.

    Files are neither moved nor marked as processed. Every call therefore re-processes
    every chunk currently in the directory, and each chunk is analysed independently.

    Returns:
        None
    """

    print("\nMechanism Y started...")
    print("Calculating Patterns...")

    local_path = "/Volumes/workspace/default/myprojectdata/raw_chunks/"
    chunks = sorted(
        (f for f in os.listdir(local_path) if f.endswith(".csv")),
        key=_chunk_sort_key,
    )
    if not chunks:
        print(f"No chunk files found in {local_path}")

    for file_name in chunks:
        file_path = os.path.join(local_path, file_name)

        df = spark.read.csv(file_path, header=True, inferSchema=True)

        process_pattern1(df)
        process_pattern2(df)
        process_pattern3(df)

    print("Mechanism Y completed successfully.")



In [0]:
if __name__ == "__main__":
    # Run the two stages in order: Mechanism X writes the chunk files that
    # Mechanism Y then scans for patterns.
    for stage in (ingest_transactions, detect_patterns):
        stage()

Mechanism X started...
Processing transactions...
Mechanism X completed successfully.

Mechanism Y started...
Calculating Patterns...
Pattern 1 processed successfully.
Pattern 2 processed successfully.
Pattern 3 processed successfully.
Pattern 1 processed successfully.
Pattern 2 processed successfully.
Pattern 3 processed successfully.
Pattern 1 processed successfully.
Pattern 2 processed successfully.
Pattern 3 processed successfully.
Pattern 1 processed successfully.
Pattern 2 processed successfully.
Pattern 3 processed successfully.
Pattern 1 processed successfully.
Pattern 2 processed successfully.
Pattern 3 processed successfully.
Pattern 1 processed successfully.
Pattern 2 processed successfully.
Pattern 3 processed successfully.
Pattern 1 processed successfully.
Pattern 2 processed successfully.
Pattern 3 processed successfully.
Pattern 1 processed successfully.
Pattern 2 processed successfully.
Pattern 3 processed successfully.
Pattern 1 processed successfully.
Pattern 2 proces