In [0]:
# CREATE MARKET BASKET FORMAT
# Register the `market_baskets` temp view: one row per 'prior' order (basket)
# with its user, the collected product names and product ids, and the number of
# line items. Orders with fewer than two line items are dropped by the HAVING.
market_baskets_sql = """
CREATE OR REPLACE TEMP VIEW market_baskets AS
SELECT 
    o.order_id,
    o.user_id,
    COLLECT_LIST(p.product_name) as products,
    COLLECT_LIST(op.product_id) as product_ids,
    COUNT(*) as basket_size
FROM workspace.instacart.orders o
JOIN workspace.instacart.order_products_prior op ON o.order_id = op.order_id
JOIN workspace.instacart.products p ON op.product_id = p.product_id
WHERE o.eval_set = 'prior'
GROUP BY o.order_id, o.user_id
HAVING basket_size >= 2
"""
spark.sql(market_baskets_sql)
display(spark.table("market_baskets"))

In [0]:

# ============================================
# PART 2: PYSPARK ASSOCIATION ANALYSIS
# ============================================


from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.types import *
import pandas as pd

# Obtain the SparkSession. On Databricks a session already exists, and
# getOrCreate() hands back that active session rather than starting a new one.
spark = (
    SparkSession.builder
    .appName("InstacartAssociation")
    .getOrCreate()
)

# 1. Prepare transaction data for FP-Growth
def prepare_transactions_for_fpgrowth(limit=100000):
    """
    Build the transaction DataFrame in the format FP-Growth expects.

    Each row is one 'prior' order containing at least two distinct products:
    - order_id: the order identifier
    - items: array of the order's product_ids, cast to STRING

    Parameters:
    - limit: maximum number of orders to return (default 100000, a subset for
      faster processing). Pass None to use every qualifying order.

    Returns:
    - A Spark DataFrame with columns (order_id, items).
    """
    # FP-Growth rejects transactions that contain duplicate items, so collect a
    # set instead of a list and apply the size filter on distinct products.
    query = """
        SELECT
            o.order_id,
            COLLECT_SET(CAST(op.product_id AS STRING)) AS items
        FROM workspace.instacart.orders o
        JOIN workspace.instacart.order_products_prior op ON o.order_id = op.order_id
        WHERE o.eval_set = 'prior'
        GROUP BY o.order_id
        HAVING COUNT(DISTINCT op.product_id) >= 2
    """
    if limit is not None:
        # A bare LIMIT may pick a different subset on every evaluation (e.g.
        # count() vs. fit()); ordering first makes the subset reproducible.
        # int() guards the value that is spliced into the SQL text.
        query += f"ORDER BY o.order_id LIMIT {int(limit)}"

    return spark.sql(query)

# 2. Run FP-Growth Algorithm
def run_fpgrowth_analysis(transactions_df, min_support=0.001, min_confidence=0.1):
    """
    Fit FP-Growth on a transactions DataFrame and return its outputs.

    Parameters:
    - transactions_df: DataFrame with an ``items`` array column, one row per
      transaction
    - min_support: minimum support threshold (default 0.001, i.e. 0.1% of
      transactions)
    - min_confidence: minimum confidence an association rule must reach

    Returns:
    - (frequent_itemsets, association_rules, model): the fitted model's
      freqItemsets and associationRules DataFrames, followed by the model itself
    """
    fitted = FPGrowth(
        itemsCol="items",
        minSupport=min_support,
        minConfidence=min_confidence,
    ).fit(transactions_df)

    return fitted.freqItemsets, fitted.associationRules, fitted

# 3. Analyze Association Rules with Product Names
def analyze_rules_with_names(association_rules):
    """
    Attach product names to FP-Growth association rules.

    Parameters:
    - association_rules: DataFrame with the FP-Growth rule columns antecedent,
      consequent, confidence, lift and support. The antecedent/consequent arrays
      are expected to hold product_ids as strings (as produced by
      prepare_transactions_for_fpgrowth).

    Returns:
    - DataFrame with antecedent_ids, consequent_ids, confidence, lift, support,
      plus antecedent_names and consequent_names: product names in the same
      order as the id arrays. Ids with no matching product get a null name.
    """
    from pyspark.sql import functions as F

    # product_id is cast to STRING so it matches the string item ids in the rules.
    products_lookup = spark.sql("""
        SELECT
            CAST(product_id AS STRING) as product_id,
            product_name,
            aisle_id,
            department_id
        FROM workspace.instacart.products
    """)
    product_names = products_lookup.select("product_id", "product_name")

    rules_expanded = association_rules.select(
        F.col("antecedent").alias("antecedent_ids"),
        F.col("consequent").alias("consequent_ids"),
        F.col("confidence"),
        F.col("lift"),
        F.col("support"),
    )

    # An (antecedent, consequent) pair identifies a single rule, so it serves as
    # the key for regrouping exploded items back onto their rule.
    rule_key = ["antecedent_ids", "consequent_ids"]

    def _names_for(ids_col, names_col):
        # Explode the id array with positions, look up each name, then rebuild
        # the array sorted by original position so names line up with the ids.
        return (
            rules_expanded
            .select(*rule_key, F.posexplode(ids_col).alias("_pos", "product_id"))
            .join(F.broadcast(product_names), "product_id", "left")
            .groupBy(*rule_key)
            .agg(F.sort_array(F.collect_list(F.struct("_pos", "product_name"))).alias("_named"))
            .select(*rule_key, F.col("_named.product_name").alias(names_col))
        )

    return (
        rules_expanded
        .join(_names_for("antecedent_ids", "antecedent_names"), rule_key, "left")
        .join(_names_for("consequent_ids", "consequent_names"), rule_key, "left")
    )

# 4. Find Product Associations for Specific Products
def find_product_associations(product_name_pattern, min_co_occurrence=50, top_n=20):
    """
    Find product pairs where at least one product's name matches a pattern.

    Pair statistics are computed over all prior orders:
    - co_occurrence_count: orders containing both products
    - confidence_1_to_2 / confidence_2_to_1: co-occurrence divided by each
      product's order count
    - lift: co-occurrence * total orders / (count_1 * count_2)

    Parameters:
    - product_name_pattern: substring matched with SQL LIKE (case-sensitive);
      LIKE wildcards (% and _) inside it are still honored
    - min_co_occurrence: a pair must appear together in more than this many
      orders (default 50)
    - top_n: number of pairs to return, highest lift first (default 20)

    Returns:
    - A Spark DataFrame with one row per matching product pair.
    """
    # Escape backslashes first, then single quotes, so the pattern stays inside
    # its SQL string literal: names such as "Ben & Jerry's" would otherwise end
    # the literal early, and arbitrary input could inject SQL.
    escaped = str(product_name_pattern).replace("\\", "\\\\").replace("'", "\\'")
    min_co_occurrence = int(min_co_occurrence)
    top_n = int(top_n)

    query = f"""
    WITH product_pairs AS (
        SELECT
            op1.product_id as product1_id,
            op2.product_id as product2_id,
            COUNT(DISTINCT op1.order_id) as co_occurrence_count
        FROM workspace.instacart.order_products_prior op1
        JOIN workspace.instacart.order_products_prior op2
            ON op1.order_id = op2.order_id
            AND op1.product_id < op2.product_id
        GROUP BY op1.product_id, op2.product_id
        HAVING co_occurrence_count > {min_co_occurrence}
    ),
    product_support AS (
        SELECT
            product_id,
            COUNT(DISTINCT order_id) as support_count
        FROM workspace.instacart.order_products_prior
        GROUP BY product_id
    )
    SELECT
        p1.product_name as product_1,
        p2.product_name as product_2,
        pp.co_occurrence_count,
        ps1.support_count as product1_count,
        ps2.support_count as product2_count,
        ROUND(pp.co_occurrence_count * 1.0 / ps1.support_count, 4) as confidence_1_to_2,
        ROUND(pp.co_occurrence_count * 1.0 / ps2.support_count, 4) as confidence_2_to_1,
        ROUND(pp.co_occurrence_count * 1.0 *
            (SELECT COUNT(DISTINCT order_id) FROM workspace.instacart.order_products_prior) /
            (ps1.support_count * ps2.support_count), 4) as lift
    FROM product_pairs pp
    JOIN workspace.instacart.products p1 ON pp.product1_id = p1.product_id
    JOIN workspace.instacart.products p2 ON pp.product2_id = p2.product_id
    JOIN product_support ps1 ON pp.product1_id = ps1.product_id
    JOIN product_support ps2 ON pp.product2_id = ps2.product_id
    WHERE p1.product_name LIKE '%{escaped}%'
       OR p2.product_name LIKE '%{escaped}%'
    ORDER BY lift DESC
    LIMIT {top_n}
    """

    return spark.sql(query)

# 5. Department-Level Association Analysis
def department_association_analysis(min_co_occurrence=1000, top_n=20):
    """
    Measure how often pairs of departments appear in the same prior order.

    Parameters:
    - min_co_occurrence: a department pair must share more than this many
      orders (default 1000)
    - top_n: number of pairs to return, most frequent first (default 20)

    Returns:
    - A Spark DataFrame with columns dept1, dept2, co_occurrence, support_pct
      (share of all prior orders containing both, in percent),
      confidence_dept1_to_dept2, confidence_dept2_to_dept1 and lift.
    """
    min_co_occurrence = int(min_co_occurrence)
    top_n = int(top_n)

    # Per-department and total order counts are computed once as CTEs and
    # joined, instead of a correlated scalar subquery evaluated per output row.
    query = f"""
    WITH dept_baskets AS (
        SELECT
            o.order_id,
            d.department,
            COUNT(DISTINCT op.product_id) as items_from_dept
        FROM workspace.instacart.orders o
        JOIN workspace.instacart.order_products_prior op ON o.order_id = op.order_id
        JOIN workspace.instacart.products p ON op.product_id = p.product_id
        JOIN workspace.instacart.departments d ON p.department_id = d.department_id
        WHERE o.eval_set = 'prior'
        GROUP BY o.order_id, d.department
    ),
    total_orders AS (
        SELECT COUNT(DISTINCT order_id) as n_orders FROM dept_baskets
    ),
    dept_support AS (
        SELECT department, COUNT(DISTINCT order_id) as dept_orders
        FROM dept_baskets
        GROUP BY department
    ),
    dept_pairs AS (
        SELECT
            db1.department as dept1,
            db2.department as dept2,
            COUNT(DISTINCT db1.order_id) as co_occurrence
        FROM dept_baskets db1
        JOIN dept_baskets db2 ON db1.order_id = db2.order_id
        WHERE db1.department < db2.department
        GROUP BY db1.department, db2.department
        HAVING co_occurrence > {min_co_occurrence}
    )
    SELECT
        dp.dept1,
        dp.dept2,
        dp.co_occurrence,
        ROUND(dp.co_occurrence * 1.0 / t.n_orders * 100, 2) as support_pct,
        ROUND(dp.co_occurrence * 1.0 / s1.dept_orders, 4) as confidence_dept1_to_dept2,
        ROUND(dp.co_occurrence * 1.0 / s2.dept_orders, 4) as confidence_dept2_to_dept1,
        ROUND(dp.co_occurrence * 1.0 * t.n_orders / (s1.dept_orders * s2.dept_orders), 4) as lift
    FROM dept_pairs dp
    JOIN dept_support s1 ON dp.dept1 = s1.department
    JOIN dept_support s2 ON dp.dept2 = s2.department
    CROSS JOIN total_orders t
    ORDER BY dp.co_occurrence DESC, dp.dept1, dp.dept2
    LIMIT {top_n}
    """

    return spark.sql(query)
    

In [0]:
# ============================================
# PART 4: RECOMMENDATION SYSTEM
# ============================================

def create_product_recommendations(user_id, n_recommendations=10):
    """
    Build co-occurrence-based product recommendations for one user.

    Candidates are products the user has never bought that appear in the same
    prior orders (from any customer) as products the user has bought. They are
    ranked by the number of such shared orders (association_strength). This is
    a raw co-occurrence heuristic; it does not use the FP-Growth rules.

    Parameters:
    - user_id: the user's id (anything int() accepts)
    - n_recommendations: number of recommendations to return (default 10)

    Returns:
    - (user_history, recommendations) Spark DataFrames:
      user_history has product_id, product_name, purchase_count (most bought
      first); recommendations has product_name, product_id, aisle, department,
      association_strength and recommendation_rank.

    Raises:
    - ValueError / TypeError if user_id or n_recommendations is not integer-like.
    """
    # Both values are spliced into SQL text; int() rejects anything that is not
    # a plain number instead of letting it alter the query.
    uid = int(user_id)
    limit = int(n_recommendations)

    # User's purchase history (GROUP BY already yields one row per product).
    user_history_query = f"""
    SELECT p.product_id, p.product_name, COUNT(*) as purchase_count
    FROM workspace.instacart.orders o
    JOIN workspace.instacart.order_products_prior op ON o.order_id = op.order_id
    JOIN workspace.instacart.products p ON op.product_id = p.product_id
    WHERE o.user_id = {uid}
    GROUP BY p.product_id, p.product_name
    ORDER BY purchase_count DESC, p.product_id
    """

    # Products frequently bought alongside the user's products, excluding
    # anything the user already bought; product_id breaks ties deterministically.
    recommendations_query = f"""
    WITH user_products AS (
        SELECT DISTINCT op.product_id
        FROM workspace.instacart.orders o
        JOIN workspace.instacart.order_products_prior op ON o.order_id = op.order_id
        WHERE o.user_id = {uid}
    ),
    associated_products AS (
        SELECT
            op2.product_id as recommended_product,
            COUNT(DISTINCT op1.order_id) as association_strength
        FROM workspace.instacart.order_products_prior op1
        JOIN workspace.instacart.order_products_prior op2
            ON op1.order_id = op2.order_id
        WHERE op1.product_id IN (SELECT product_id FROM user_products)
          AND op2.product_id NOT IN (SELECT product_id FROM user_products)
        GROUP BY op2.product_id
    )
    SELECT
        p.product_name,
        p.product_id,
        a.aisle,
        d.department,
        ap.association_strength,
        RANK() OVER (ORDER BY ap.association_strength DESC) as recommendation_rank
    FROM associated_products ap
    JOIN workspace.instacart.products p ON ap.recommended_product = p.product_id
    JOIN workspace.instacart.aisles a ON p.aisle_id = a.aisle_id
    JOIN workspace.instacart.departments d ON p.department_id = d.department_id
    ORDER BY ap.association_strength DESC, p.product_id
    LIMIT {limit}
    """

    user_history = spark.sql(user_history_query)
    recommendations = spark.sql(recommendations_query)

    return user_history, recommendations

In [0]:
# ============================================
# PART 5: EVALUATION METRICS
# ============================================

def calculate_recommendation_metrics(top_n=10):
    """
    Evaluate a "most-frequently-bought" baseline recommender against the train set.

    For each user, the top_n products they bought most often in prior orders
    are the recommendations; the products in their train-set order are the
    ground truth.

    Parameters:
    - top_n: number of recommended products per user (default 10)

    Returns:
    - A one-row Spark DataFrame: total_users, avg_correct_predictions,
      avg_precision and avg_recall (averaged over users present in both sets).
    """
    top_n = int(top_n)

    # product_id breaks purchase-count ties so the top-N list is deterministic.
    metrics_query = f"""
    WITH train_baskets AS (
        SELECT
            o.user_id,
            COLLECT_SET(op.product_id) as actual_products
        FROM workspace.instacart.orders o
        JOIN workspace.instacart.order_products_train op ON o.order_id = op.order_id
        GROUP BY o.user_id
    ),
    prior_frequent AS (
        SELECT
            o.user_id,
            op.product_id,
            COUNT(*) as purchase_frequency,
            ROW_NUMBER() OVER (
                PARTITION BY o.user_id ORDER BY COUNT(*) DESC, op.product_id
            ) as freq_rank
        FROM workspace.instacart.orders o
        JOIN workspace.instacart.order_products_prior op ON o.order_id = op.order_id
        GROUP BY o.user_id, op.product_id
    ),
    recommendations AS (
        SELECT
            user_id,
            COLLECT_LIST(product_id) as recommended_products
        FROM prior_frequent
        WHERE freq_rank <= {top_n}
        GROUP BY user_id
    )
    SELECT
        COUNT(*) as total_users,
        AVG(SIZE(array_intersect(t.actual_products, r.recommended_products))) as avg_correct_predictions,
        AVG(SIZE(array_intersect(t.actual_products, r.recommended_products)) / SIZE(r.recommended_products)) as avg_precision,
        AVG(SIZE(array_intersect(t.actual_products, r.recommended_products)) / SIZE(t.actual_products)) as avg_recall
    FROM train_baskets t
    JOIN recommendations r ON t.user_id = r.user_id
    """

    return spark.sql(metrics_query)


In [0]:
# ============================================
# PART 6: MAIN EXECUTION
# ============================================

def main():
    """
    Run the complete analysis end to end, printing each stage's results.

    Stages: prepare transactions, fit FP-Growth, show the top rules by lift,
    department-level associations, product associations for 'Banana', and
    recommendation metrics.

    Returns:
    - (freq_items, rules, model) as returned by run_fpgrowth_analysis.
    """
    print("1. Preparing transaction data...")
    # Cache the transactions so count() and the FP-Growth fit read the same
    # materialized rows instead of re-running the query twice (a LIMIT without
    # ORDER BY is not guaranteed to return the same subset each time).
    # Deliberately left cached: the returned itemsets/rules may still be derived
    # lazily from it. NOTE(review): confirm before adding an unpersist().
    transactions = prepare_transactions_for_fpgrowth().cache()
    print(f"   Total transactions: {transactions.count()}")

    print("\n2. Running FP-Growth algorithm...")
    freq_items, rules, model = run_fpgrowth_analysis(transactions)
    print(f"   Frequent itemsets found: {freq_items.count()}")
    print(f"   Association rules found: {rules.count()}")

    print("\n3. Top association rules by lift:")
    rules.orderBy(col("lift").desc()).show(20, truncate=False)

    print("\n4. Department-level associations:")
    dept_associations = department_association_analysis()
    dept_associations.show(20, truncate=False)

    print("\n5. Sample product associations (for 'Banana'):")
    banana_associations = find_product_associations('Banana')
    banana_associations.show(20, truncate=False)

    print("\n6. Calculating recommendation metrics:")
    metrics = calculate_recommendation_metrics()
    metrics.show()

    return freq_items, rules, model

# Execute the analysis
# Uncomment the line below to run the complete analysis
# frequent_itemsets, association_rules, fpgrowth_model = main()

# ============================================
# PART 7: VISUALIZATIONS (Using Databricks Display)
# ============================================

# The view below queries the Instacart tables directly, so it can be built and displayed without running main() first:

# Build the `top_associations` temp view: the 30 product pairs bought together
# most often (a pair must co-occur in more than 100 prior orders). `support` is
# the share of all prior orders that contain both products.
# Pairs are unordered (product1_id < product2_id only de-duplicates them), so the
# label joins the names with '+' rather than an arrow that would imply a rule
# direction. product_pair breaks co_occurrence ties so the LIMIT is deterministic.
spark.sql("""
CREATE OR REPLACE TEMP VIEW top_associations AS
WITH product_pairs AS (
    SELECT
        op1.product_id as product1_id,
        op2.product_id as product2_id,
        COUNT(DISTINCT op1.order_id) as co_occurrence
    FROM workspace.instacart.order_products_prior op1
    JOIN workspace.instacart.order_products_prior op2
        ON op1.order_id = op2.order_id
        AND op1.product_id < op2.product_id
    GROUP BY op1.product_id, op2.product_id
    HAVING co_occurrence > 100
)
SELECT
    CONCAT(p1.product_name, ' + ', p2.product_name) as product_pair,
    pp.co_occurrence,
    ROUND(pp.co_occurrence * 1.0 /
        (SELECT COUNT(DISTINCT order_id) FROM workspace.instacart.order_products_prior), 4) as support
FROM product_pairs pp
JOIN workspace.instacart.products p1 ON pp.product1_id = p1.product_id
JOIN workspace.instacart.products p2 ON pp.product2_id = p2.product_id
ORDER BY co_occurrence DESC, product_pair
LIMIT 30
""")

# Display the results
display(spark.table("top_associations"))


In [0]:
# ============================================
# PART 8: EXPORT RESULTS
# ============================================

def export_association_rules(rules_df, output_path="/tmp/association_rules",
                             table_name="workspace.instacart.association_rules"):
    """
    Export association rules to a CSV file and to a saved table.

    Parameters:
    - rules_df: Spark DataFrame of rules (e.g. FP-Growth ``associationRules``)
    - output_path: CSV destination; ".csv" is appended unless already present
    - table_name: table to overwrite with the rules
      (default "workspace.instacart.association_rules")

    Returns:
    - The rules as a pandas DataFrame.

    Note: toPandas() collects every row to the driver, so this only suits rule
    sets that fit in driver memory.
    """
    # Avoid producing "rules.csv.csv" when the caller already passed a .csv path.
    csv_path = output_path if output_path.endswith(".csv") else f"{output_path}.csv"

    rules_pd = rules_df.toPandas()
    rules_pd.to_csv(csv_path, index=False)
    print(f"Association rules exported to {csv_path}")

    # Also persist as a table for later reuse.
    rules_df.write.mode("overwrite").saveAsTable(table_name)
    print(f"Association rules saved as Delta table: {table_name}")

    return rules_pd