# Retail Transaction Analysis Backend (Spark + Flask + Ngrok)

This notebook runs the backend analysis service.
1. Be sure to run all cells.
2. Provide your own ngrok authtoken for the final cell, and keep it out of any shared or committed copy of this notebook.
3. Copy the URL printed at the end (e.g., `http://xxxx.ngrok-free.app`) to your local dashboard.

In [None]:
# Install dependencies
!pip install pyspark flask flask-cors pyngrok

In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.functions import col, collect_set, struct, to_date, to_timestamp, sum as _sum, count, countDistinct, desc, date_format, coalesce, expr
from flask import Flask, request, jsonify
from flask_cors import CORS
from pyngrok import ngrok
import shutil

# Build (or reuse) the Spark session shared by every analysis function below.
# It runs in local mode on all cores with a 4g driver, and uses the LEGACY
# time parser policy for date/time pattern handling.
spark = (
    SparkSession.builder
    .appName("RetailAnalysis")
    .master("local[*]")
    .config("spark.driver.memory", "4g")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

print("Spark Initialized")

In [None]:
# Analysis Functions

def standardize_columns(df):
    """Rename known column-name variants to the canonical names used below.

    Header names are compared after stripping surrounding whitespace and
    lower-casing, so ``" unitprice "``, ``"Price"`` and ``"UNITPRICE"`` all
    become ``UnitPrice``. A canonical name that is already present verbatim is
    never touched, and no other column is renamed onto it.

    Args:
        df: Object exposing ``columns`` and ``withColumnRenamed(old, new)``
            (a Spark DataFrame in this notebook).

    Returns:
        The DataFrame with the recognised columns renamed.
    """
    # canonical name -> normalized aliases, in priority order. The variant of
    # the canonical name itself comes first so it wins over looser aliases
    # such as "price" or "invoice".
    canonical_aliases = (
        ('UnitPrice', ('unitprice', 'price')),
        ('InvoiceNo', ('invoiceno', 'invoice')),
        ('InvoiceDate', ('invoicedate',)),
        ('Quantity', ('quantity',)),
        ('Description', ('description',)),
        ('StockCode', ('stockcode',)),
        ('Country', ('country',)),
    )

    # normalized header -> original header (last one wins on collisions,
    # matching the previous dict-comprehension behaviour).
    col_map = {c.strip().lower(): c for c in df.columns}

    for canonical, aliases in canonical_aliases:
        if canonical in df.columns:
            # Already correct; renaming an alias would create a duplicate.
            continue
        for alias in aliases:
            source = col_map.get(alias)
            if source is not None:
                df = df.withColumnRenamed(source, canonical)
                break

    return df

def check_columns(df, required):
    """Report which of the ``required`` column names are absent from ``df``.

    The comparison against ``df.columns`` is exact (case-sensitive).

    Returns:
        ``None`` when every required column is present, otherwise a message
        listing the missing columns followed by the columns that were found.
    """
    present = df.columns
    absent = []
    for name in required:
        if name not in present:
            absent.append(name)

    if not absent:
        return None
    return f"Missing columns: {', '.join(absent)}. Found: {', '.join(present)}"

def get_common_df(df_path):
    """Load the CSV at ``df_path`` with Spark and standardize its column names.

    The file is read with a header row and schema inference, then passed
    through ``standardize_columns``.

    Returns:
        ``(df, None)`` on success, or ``(None, message)`` where ``message`` is
        the string form of whatever exception was raised while loading or
        standardizing.
    """
    try:
        raw = spark.read.csv(df_path, header=True, inferSchema=True)
        standardized = standardize_columns(raw)
    except Exception as exc:
        return None, str(exc)
    return standardized, None

def analyze_basket(df_path, min_support=0.01, min_confidence=0.1):
    """Mine association rules from the transactions stored at ``df_path``.

    Items are identified by ``Description``; ``StockCode`` is used instead
    when ``Description`` is absent and ``StockCode`` is present. Rows lacking
    an invoice number or item value are dropped, items are grouped per
    ``InvoiceNo``, and FP-Growth is fitted with the given thresholds.

    Returns:
        ``(records, None)`` with up to 50 rules sorted by descending lift, or
        ``(None, message)`` when loading or column validation fails.
    """
    df, err = get_common_df(df_path)
    if err:
        return None, err

    available = df.columns
    use_stock_code = 'Description' not in available and 'StockCode' in available
    item_col = 'StockCode' if use_stock_code else 'Description'

    key_cols = ['InvoiceNo', item_col]
    err = check_columns(df, key_cols)
    if err:
        return None, err

    # One row per invoice, holding the distinct set of items bought together.
    baskets = (
        df.dropna(subset=key_cols)
        .groupBy("InvoiceNo")
        .agg(collect_set(item_col).alias("items"))
    )

    miner = FPGrowth(itemsCol="items", minSupport=min_support, minConfidence=min_confidence)
    fitted = miner.fit(baskets)

    top_rules = fitted.associationRules.sort(col("lift").desc()).limit(50)
    return top_rules.toPandas().to_dict(orient='records'), None

def analyze_sales(df_path):
    """Compute total sales per calendar day for the CSV at ``df_path``.

    Requires ``InvoiceDate``, ``Quantity`` and ``UnitPrice``; rows with nulls
    in any of them are dropped. ``TotalAmount`` is derived as
    ``Quantity * UnitPrice`` when the column is not already present.
    ``InvoiceDate`` is parsed by trying several patterns in order and keeping
    the first that succeeds; rows where none matches are discarded.

    Returns:
        ``(records, None)`` with ``Date`` (string) and ``DailySales`` per day,
        ordered by date, or ``(None, message)`` on failure or when no row has
        a parseable date.
    """
    df, err = get_common_df(df_path)
    if err:
        return None, err

    needed = ['InvoiceDate', 'Quantity', 'UnitPrice']
    err = check_columns(df, needed)
    if err:
        return None, err
    df = df.dropna(subset=needed)

    if 'TotalAmount' not in df.columns:
        df = df.withColumn("TotalAmount", col("Quantity") * col("UnitPrice"))

    # Candidate timestamp patterns, tried in this order by coalesce().
    date_patterns = (
        'dd-MM-yyyy HH:mm',
        'M/d/yyyy H:m',
        'dd/MM/yyyy HH:mm',
        'yyyy-MM-dd HH:mm:ss',
        'yyyy-MM-dd',
        'M/d/yyyy',
        'dd/MM/yyyy',
    )
    attempts = [
        expr(f"try_to_timestamp(InvoiceDate, '{pattern}')")
        for pattern in date_patterns
    ]

    df = df.withColumn("ParsedDate", coalesce(*attempts))
    df = df.withColumn("Date", to_date(col("ParsedDate")))
    df = df.filter(col("Date").isNotNull())

    per_day = (
        df.groupBy("Date")
        .agg(_sum("TotalAmount").alias("DailySales"))
        .orderBy("Date")
    )
    pdf = per_day.toPandas()
    if pdf.empty:
        return None, "No valid dates found."

    # Dates are stringified so the records can be serialized to JSON.
    pdf['Date'] = pdf['Date'].astype(str)
    return pdf.to_dict(orient='records'), None

def analyze_summary(df_path):
    """Compute headline totals for the CSV at ``df_path``.

    ``InvoiceNo`` and ``Quantity`` are required. ``UnitPrice`` is required
    only when ``TotalAmount`` has to be derived as ``Quantity * UnitPrice``.
    The customer column (``Customer ID`` or ``CustomerID``) is optional; when
    it is absent ``total_customers`` is 0.

    Returns:
        ``(summary, None)`` where ``summary`` holds ``total_revenue``,
        ``total_transactions``, ``total_items`` and ``total_customers``, or
        ``(None, message)`` when loading or column validation fails.
    """
    df, err = get_common_df(df_path)
    if err:
        return None, err

    # Validate up front so a missing column yields a readable error tuple
    # instead of an unhandled Spark exception in the aggregations below.
    required = ['InvoiceNo', 'Quantity']
    if 'TotalAmount' not in df.columns:
        required.append('UnitPrice')
    err = check_columns(df, required)
    if err:
        return None, err

    if 'TotalAmount' not in df.columns:
        df = df.withColumn("TotalAmount", col("Quantity") * col("UnitPrice"))

    # Aggregations
    total_revenue = df.agg(_sum("TotalAmount")).collect()[0][0]
    total_transactions = df.select("InvoiceNo").distinct().count()
    total_items = df.agg(_sum("Quantity")).collect()[0][0]

    customer_col = next(
        (name for name in ("Customer ID", "CustomerID") if name in df.columns),
        None,
    )
    total_customers = 0
    if customer_col is not None:
        # Drop NULL ids first: distinct() would otherwise count NULL as one
        # more customer.
        total_customers = df.select(customer_col).dropna().distinct().count()

    return {
        "total_revenue": total_revenue or 0,
        "total_transactions": total_transactions,
        "total_items": total_items or 0,
        "total_customers": total_customers
    }, None

def analyze_top_products(df_path):
    """Return the five products with the highest revenue in ``df_path``.

    Products are grouped by ``Description``, or by ``StockCode`` when
    ``Description`` is absent. ``TotalAmount`` is derived as
    ``Quantity * UnitPrice`` when the column is not already present.

    Returns:
        ``(records, None)`` with the item column and ``Revenue`` for the top
        five items, or ``(None, message)`` when loading or column validation
        fails.
    """
    df, err = get_common_df(df_path)
    if err:
        return None, err

    item_col = 'Description' if 'Description' in df.columns else 'StockCode'

    # Validate like the other analyses do, so missing columns produce an
    # error tuple rather than an unhandled Spark exception.
    required = [item_col]
    if 'TotalAmount' not in df.columns:
        required += ['Quantity', 'UnitPrice']
    err = check_columns(df, required)
    if err:
        return None, err

    if 'TotalAmount' not in df.columns:
        df = df.withColumn("TotalAmount", col("Quantity") * col("UnitPrice"))

    top = (
        df.groupBy(item_col)
        .agg(_sum("TotalAmount").alias("Revenue"))
        .orderBy(desc("Revenue"))
        .limit(5)
    )
    return top.toPandas().to_dict(orient='records'), None

def analyze_by_country(df_path):
    """Return the ten countries with the highest revenue in ``df_path``.

    ``TotalAmount`` is derived as ``Quantity * UnitPrice`` when the column is
    not already present.

    Returns:
        ``(records, None)`` with ``Country`` and ``Revenue`` for the top ten
        countries; ``([], message)`` when the ``Country`` column is missing;
        or ``(None, message)`` when loading or validating the other columns
        fails.
    """
    df, err = get_common_df(df_path)
    if err:
        return None, err

    if 'Country' not in df.columns:
        return [], "'Country' column missing"

    if 'TotalAmount' not in df.columns:
        # Both inputs must exist before TotalAmount can be derived; report a
        # readable error instead of letting Spark raise on the expression.
        err = check_columns(df, ['Quantity', 'UnitPrice'])
        if err:
            return None, err
        df = df.withColumn("TotalAmount", col("Quantity") * col("UnitPrice"))

    countries = (
        df.groupBy("Country")
        .agg(_sum("TotalAmount").alias("Revenue"))
        .orderBy(desc("Revenue"))
        .limit(10)
    )
    return countries.toPandas().to_dict(orient='records'), None

In [None]:
# Flask API
# Uploaded data lives in a single folder; CURRENT_FILE holds the path of the
# most recent upload (None until the first successful upload).
UPLOAD_FOLDER = "/content/uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
CURRENT_FILE = None

app = Flask(__name__)
# Allow cross-origin calls from any origin on every route, including the
# header used to skip ngrok's browser warning page.
CORS(
    app,
    resources={
        r"/*": {
            "origins": "*",
            "allow_headers": ["Content-Type", "ngrok-skip-browser-warning"],
        },
    },
)

@app.route('/')
def home():
    """Health-check endpoint: report that the service is up."""
    payload = {"status": "running"}
    return jsonify(payload)

@app.route('/upload', methods=['POST'])
def upload_file():
    """Store the uploaded CSV and make it the dataset used by the analyses.

    Expects a multipart form field named ``file``. The file is always saved
    as ``transaction_data.csv`` inside ``UPLOAD_FOLDER``, replacing any
    earlier upload, and ``CURRENT_FILE`` is updated to that path.

    Returns:
        JSON with a success message and the stored path, or a 400 JSON error
        when the request carries no usable file.
    """
    global CURRENT_FILE

    # Validate the request BEFORE touching the upload folder: a malformed
    # request must not delete the dataset CURRENT_FILE still points at.
    if 'file' not in request.files:
        return jsonify({"error": "No file part"}), 400
    file = request.files['file']
    if not file.filename:
        return jsonify({"error": "No selected file"}), 400

    # Start from an empty folder so only the newest upload is kept.
    if os.path.exists(UPLOAD_FOLDER):
        shutil.rmtree(UPLOAD_FOLDER)
    os.makedirs(UPLOAD_FOLDER, exist_ok=True)

    filepath = os.path.join(UPLOAD_FOLDER, "transaction_data.csv")
    file.save(filepath)
    CURRENT_FILE = filepath
    return jsonify({"message": "File uploaded successfully", "path": filepath})

@app.route('/analyze/basket', methods=['GET'])
def get_basket_analysis():
    """Run market-basket analysis on the currently uploaded file.

    Query parameters:
        min_support: FP-Growth minimum support, a number in [0, 1]
            (default 0.01).
        min_confidence: minimum rule confidence, a number in [0, 1]
            (default 0.1).

    Returns:
        JSON list of rules; a 400 JSON error when no file has been uploaded
        or a parameter is invalid; a 500 JSON error when the analysis reports
        a failure.
    """
    if not CURRENT_FILE:
        return jsonify({"error": "No file uploaded"}), 400

    # Query parameters are untrusted text: reject bad values with a 400
    # instead of letting float() raise and surface as an unhandled error.
    try:
        min_sup = float(request.args.get('min_support', 0.01))
        min_conf = float(request.args.get('min_confidence', 0.1))
    except ValueError:
        return jsonify({"error": "min_support and min_confidence must be numbers"}), 400

    # FPGrowth only accepts thresholds in [0, 1]; this also rejects NaN.
    if not (0 <= min_sup <= 1 and 0 <= min_conf <= 1):
        return jsonify({"error": "min_support and min_confidence must be between 0 and 1"}), 400

    results, error = analyze_basket(CURRENT_FILE, min_sup, min_conf)
    if error:
        return jsonify({"error": error}), 500
    return jsonify(results)

@app.route('/analyze/sales', methods=['GET'])
def get_sales_analysis():
    """Serve the daily sales series for the currently uploaded file.

    Responds 400 when nothing has been uploaded and 500 when the analysis
    reports an error.
    """
    dataset = CURRENT_FILE
    if not dataset:
        return jsonify({"error": "No file uploaded"}), 400

    payload, failure = analyze_sales(dataset)
    if failure:
        return jsonify({"error": failure}), 500
    return jsonify(payload)
    
@app.route('/analyze/summary', methods=['GET'])
def get_summary():
    """Serve the headline totals for the currently uploaded file.

    Responds 400 when nothing has been uploaded and 500 when the analysis
    reports an error.
    """
    dataset = CURRENT_FILE
    if not dataset:
        return jsonify({"error": "No file uploaded"}), 400

    payload, failure = analyze_summary(dataset)
    if failure:
        return jsonify({"error": failure}), 500
    return jsonify(payload)
    
@app.route('/analyze/top_products', methods=['GET'])
def get_top_products():
    """Serve the top-revenue products for the currently uploaded file.

    Responds 400 when nothing has been uploaded and 500 when the analysis
    reports an error.
    """
    dataset = CURRENT_FILE
    if not dataset:
        return jsonify({"error": "No file uploaded"}), 400

    payload, failure = analyze_top_products(dataset)
    if failure:
        return jsonify({"error": failure}), 500
    return jsonify(payload)
    
@app.route('/analyze/by_country', methods=['GET'])
def get_by_country():
    """Serve revenue per country for the currently uploaded file.

    Responds 400 when nothing has been uploaded and 500 when the analysis
    reports an error.
    """
    dataset = CURRENT_FILE
    if not dataset:
        return jsonify({"error": "No file uploaded"}), 400

    payload, failure = analyze_by_country(dataset)
    if failure:
        return jsonify({"error": failure}), 500
    return jsonify(payload)


In [None]:
# Start Ngrok and Flask
# The ngrok authtoken is a credential and must never be stored in the
# notebook. It is read from the NGROK_AUTH_TOKEN environment variable, with a
# hidden interactive prompt as the fallback.
import getpass

NGROK_AUTH_TOKEN = os.environ.get("NGROK_AUTH_TOKEN", "").strip()
if not NGROK_AUTH_TOKEN:
    NGROK_AUTH_TOKEN = getpass.getpass("Enter your ngrok authtoken: ").strip()

# The old placeholder value is still treated as "not configured".
if not NGROK_AUTH_TOKEN or NGROK_AUTH_TOKEN == "ENTER_YOUR_TOKEN_HERE":
    print("WARNING: No ngrok authtoken provided. Set the NGROK_AUTH_TOKEN "
          "environment variable or enter the token when prompted.")
else:
    ngrok.set_auth_token(NGROK_AUTH_TOKEN)
    # Expose the local Flask port publicly, then start serving on it.
    public_url = ngrok.connect(5000)
    print(f"\n\n>>> PUBLIC URL: {public_url} <<<\n\n")
    app.run(port=5000)