<a href="https://colab.research.google.com/github/gauravraidata/IITJ-projects/blob/main/GPU_ASS_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [40]:
import argparse, time, os, math, sys
from typing import Dict, List, Optional
import numpy as np
import pandas as pd
from scipy import sparse
import matplotlib.pyplot as plt

import cupy as cp
import cupyx.scipy.sparse as cpx_sparse

In [41]:
def parse_transaction_data(file_path: str, region: Optional[str] = None):
    """Load a retail transaction CSV and build a product x basket incidence matrix.

    Cleaning pipeline, in order:
      1. drop rows with a missing value in any mandatory field;
      2. drop invoices whose number starts with "C";
      3. keep only rows with Quantity > 0 and UnitPrice > 0;
      4. optionally keep only rows whose ``Country`` equals ``region``.

    Products (``StockCode``) and baskets (``InvoiceNo``) are then encoded as
    consecutive integer ids in order of first appearance.

    Args:
        file_path: Path of the CSV file (read with ISO-8859-1 encoding).
        region: Optional country name to filter on. If the dataset has no
            ``Country`` column the filter cannot be applied; a
            ``RuntimeWarning`` is emitted and the unfiltered data is used.

    Returns:
        A tuple ``(product_basket_matrix, product_metadata, total_baskets)``:
          * ``product_basket_matrix``: SciPy CSR matrix (int32) of shape
            ``(total_products, total_baskets)`` holding 1 where the product
            occurs in the basket;
          * ``product_metadata``: ``{product_id: {"stock_code", "description"}}``
            where the description is the most frequent one for that product;
          * ``total_baskets``: number of distinct invoices after cleaning.

    Raises:
        FileNotFoundError: ``file_path`` does not exist.
        RuntimeError: a mandatory column is missing from the CSV.
    """
    import warnings  # local import: only needed for the region-filter warning

    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File does not exist: {file_path}")

    transactions_df = pd.read_csv(file_path, encoding="ISO-8859-1")
    mandatory_fields = ["InvoiceNo", "StockCode", "Description", "Quantity", "UnitPrice"]
    for field in mandatory_fields:
        if field not in transactions_df.columns:
            raise RuntimeError(f"Missing mandatory field '{field}' in dataset")

    # Data cleaning and filtering pipeline
    transactions_df = transactions_df.dropna(subset=mandatory_fields)
    transactions_df = transactions_df[~transactions_df["InvoiceNo"].astype(str).str.startswith("C")]
    transactions_df = transactions_df[transactions_df["Quantity"] > 0]
    transactions_df = transactions_df[transactions_df["UnitPrice"] > 0.0]
    if region:
        if "Country" in transactions_df.columns:
            transactions_df = transactions_df[transactions_df["Country"] == region]
        else:
            # Previously the filter was skipped without any signal, so the caller
            # could not tell that the result was not restricted to `region`.
            warnings.warn(
                f"Region filter '{region}' ignored: dataset has no 'Country' column",
                RuntimeWarning,
                stacklevel=2,
            )

    # Encode products and invoices as integer ids in order of first appearance.
    product_ids, distinct_products = pd.factorize(transactions_df["StockCode"].astype(str))
    basket_ids, distinct_invoices = pd.factorize(transactions_df["InvoiceNo"].astype(str))
    # assign() returns a new frame, so the filtered slice above is never mutated.
    transactions_df = transactions_df.assign(product_id=product_ids, basket_id=basket_ids)

    # A product bought several times in one basket counts once.
    unique_pairs = transactions_df[["product_id", "basket_id"]].drop_duplicates()

    # Build sparse matrix representation
    row_indices = unique_pairs["product_id"].to_numpy(dtype=np.int32)
    col_indices = unique_pairs["basket_id"].to_numpy(dtype=np.int32)
    values = np.ones(len(unique_pairs), dtype=np.int32)

    total_products = len(distinct_products)
    total_baskets = len(distinct_invoices)

    product_basket_matrix = sparse.csr_matrix((values, (row_indices, col_indices)),
                                              shape=(total_products, total_baskets),
                                              dtype=np.int32)

    # Extract product metadata. "Description" is a mandatory, NaN-free column at
    # this point, so every group has at least one mode value.
    product_metadata: Dict[int, Dict] = {}
    for product_idx, descriptions in transactions_df.groupby("product_id")["Description"]:
        product_metadata[int(product_idx)] = {
            "stock_code": str(distinct_products[int(product_idx)]),
            "description": str(descriptions.mode().iloc[0]),
        }

    return product_basket_matrix, product_metadata, total_baskets

In [42]:
def convert_scipy_to_cupy_sparse(scipy_matrix: sparse.csr_matrix):
    """Copy a SciPy sparse matrix to the GPU and return it as a CuPy CSR matrix.

    The matrix is expanded to coordinate (COO) triplets on the host, the three
    arrays are uploaded with ``cp.asarray`` (values are cast to float32), and
    the CSR layout is rebuilt on the device.
    """
    host_coo = scipy_matrix.tocoo()
    device_values = cp.asarray(host_coo.data, dtype=cp.float32)
    device_coordinates = (cp.asarray(host_coo.row), cp.asarray(host_coo.col))
    device_coo = cpx_sparse.coo_matrix((device_values, device_coordinates),
                                       shape=scipy_matrix.shape)
    return device_coo.tocsr()

In [43]:

def compute_similarities_gpu_batched(product_matrix_gpu, top_k: int, batch_count: int = 64):
    """Find the top-K most similar products for every product, batch by batch on the GPU.

    Similarity between products i and j is
    ``cooccurrence[i, j] / (sqrt(max(d_i, 1)) * sqrt(max(d_j, 1)))`` where ``d``
    is the row-wise sum of squared entries of ``product_matrix_gpu`` (for a 0/1
    matrix this is the number of baskets containing the product). A product's
    similarity with itself is forced to zero.

    Args:
        product_matrix_gpu: CuPy sparse product x basket matrix.
        top_k: Number of neighbours to keep per product.
        batch_count: Number of product rows processed per batch (a batch
            *size*, despite the name).

    Returns:
        ``(top_indices, top_scores, diagonal_cpu, elapsed_seconds)`` where the
        first two are lists with one int32 / float32 NumPy array per product,
        ``diagonal_cpu`` is the per-product ``d`` vector copied to the host, and
        ``elapsed_seconds`` covers the batched loop (the diagonal computation is
        not included).
    """
    total_products = int(product_matrix_gpu.shape[0])

    # Per-product frequency: diagonal of the full cooccurrence matrix.
    squared_entries = product_matrix_gpu.multiply(product_matrix_gpu)
    diagonal_values_gpu = squared_entries.sum(axis=1).ravel().astype(cp.int32)
    diagonal_values_cpu = cp.asnumpy(diagonal_values_gpu)

    top_indices_collection = [None] * total_products
    top_scores_collection = [None] * total_products

    computation_start = time.time()
    row_norms_gpu = cp.sqrt(cp.maximum(diagonal_values_gpu.astype(cp.float32), 1.0))
    product_indices = np.arange(total_products, dtype=np.int32)

    for batch_start in range(0, total_products, batch_count):
        batch_end = min(batch_start + batch_count, total_products)
        current_batch = product_indices[batch_start:batch_end]
        batch_length = len(current_batch)
        batch_ids_gpu = cp.asarray(current_batch, dtype=cp.int32)

        # Dense (batch x products) cooccurrence counts for this slice of rows.
        cooccurrence_dense = (
            product_matrix_gpu[current_batch, :]
            .dot(product_matrix_gpu.T)
            .toarray()
            .astype(cp.float32)
        )

        batch_norms = row_norms_gpu[batch_ids_gpu].astype(cp.float32)
        denominator_matrix = batch_norms[:, None] * row_norms_gpu[None, :]
        similarity_batch = cp.where(denominator_matrix > 0.0,
                                    cooccurrence_dense / denominator_matrix,
                                    0.0).astype(cp.float32)

        # A product must never be recommended for itself.
        similarity_batch[cp.arange(batch_length, dtype=cp.int32), batch_ids_gpu] = 0.0

        # Rank by descending similarity (ascending on the negated scores).
        negated_scores = -similarity_batch
        if top_k >= total_products:
            top_k_indices = cp.argsort(negated_scores, axis=1)[:, :top_k]
            top_k_scores = cp.take_along_axis(similarity_batch, top_k_indices, axis=1)
        else:
            # Select K candidates first, then order only those K.
            candidate_indices = cp.argpartition(negated_scores, top_k, axis=1)[:, :top_k]
            candidate_scores = cp.take_along_axis(similarity_batch, candidate_indices, axis=1)
            candidate_order = cp.argsort(-candidate_scores, axis=1)
            top_k_indices = cp.take_along_axis(candidate_indices, candidate_order, axis=1)
            top_k_scores = cp.take_along_axis(candidate_scores, candidate_order, axis=1)

        # Bring the batch results back to the host.
        top_k_indices_cpu = cp.asnumpy(top_k_indices.astype(cp.int32))
        top_k_scores_cpu = cp.asnumpy(top_k_scores.astype(cp.float32))

        # The batch covers the contiguous product ids batch_start..batch_end-1.
        top_indices_collection[batch_start:batch_end] = [
            index_row.astype(np.int32) for index_row in top_k_indices_cpu
        ]
        top_scores_collection[batch_start:batch_end] = [
            score_row.astype(np.float32) for score_row in top_k_scores_cpu
        ]

        elapsed_time = time.time() - computation_start
        print(f"[GPU processing] batch {batch_start}-{batch_end-1} complete (size={batch_length}), time={elapsed_time:.2f}s")

    total_gpu_time = time.time() - computation_start
    return top_indices_collection, top_scores_collection, diagonal_values_cpu, total_gpu_time

In [44]:
def compute_similarities_cpu_baseline(product_matrix_scipy, top_k: int):
    """Row-by-row CPU reference for the top-K product similarity computation.

    Similarity between products i and j is
    ``cooccurrence[i, j] / (sqrt(max(d_i, 1)) * sqrt(max(d_j, 1)))`` where ``d``
    is the row-wise sum of squared entries of the matrix. Clamping both norms to
    at least 1 mirrors the GPU implementation; a product with ``d == 0`` has no
    cooccurrences, so its similarities are 0 either way. Self-similarity is
    forced to zero.

    Args:
        product_matrix_scipy: SciPy CSR product x basket matrix.
        top_k: Number of neighbours to keep per product.

    Returns:
        ``(top_indices, top_scores, diagonal_values, elapsed_seconds)`` where the
        first two are lists with one int32 / float32 NumPy array per product
        (ordered by descending similarity) and ``diagonal_values`` is the int32
        ``d`` vector.
    """
    total_products = product_matrix_scipy.shape[0]
    diagonal_values = np.asarray(
        product_matrix_scipy.multiply(product_matrix_scipy).sum(axis=1)
    ).ravel().astype(np.int32)
    top_indices_collection = [None] * total_products
    top_scores_collection = [None] * total_products

    computation_start = time.time()
    # Loop-invariant work: the norm vector and the transpose are the same for
    # every product, so compute them once instead of once per row.
    sqrt_diagonal = np.sqrt(np.maximum(diagonal_values, 1.0))
    basket_product_matrix = product_matrix_scipy.T

    for product_id in range(total_products):
        product_vector = product_matrix_scipy.getrow(product_id)
        cooccurrence_array = (
            product_vector.dot(basket_product_matrix).toarray().ravel().astype(np.float32)
        )

        # Every denominator is >= 1 thanks to the clamp, so no division guard is needed.
        denominator = sqrt_diagonal[product_id] * sqrt_diagonal
        similarity_vector = (cooccurrence_array / denominator).astype(np.float32)
        similarity_vector[product_id] = 0.0

        if top_k >= similarity_vector.size:
            sorted_indices = np.argsort(-similarity_vector)[:top_k]
        else:
            # Select K candidates first, then order only those K.
            partition = np.argpartition(-similarity_vector, top_k)[:top_k]
            sorted_indices = partition[np.argsort(-similarity_vector[partition])]

        top_indices_collection[product_id] = sorted_indices.astype(np.int32)
        top_scores_collection[product_id] = similarity_vector[sorted_indices].astype(np.float32)

        if (product_id + 1) % 500 == 0 or (product_id + 1) == total_products:
            elapsed_time = time.time() - computation_start
            print(f"[CPU processing] {product_id+1}/{total_products} products done, time={elapsed_time:.2f}s")

    total_cpu_time = time.time() - computation_start
    return top_indices_collection, top_scores_collection, diagonal_values, total_cpu_time

In [45]:

def generate_recommendation_report(reference_product: int, top_indices_list: List[np.ndarray],
                                  top_scores_list: List[np.ndarray],
                                  product_matrix_scipy: sparse.csr_matrix,
                                  diagonal_values: np.ndarray,
                                  product_metadata: Dict[int, Dict],
                                  total_baskets: int) -> pd.DataFrame:
    """Build a per-neighbour report for one reference product.

    Cooccurrence counts and similarities are recomputed on the CPU from
    ``product_matrix_scipy``; ``top_indices_list`` only supplies which products
    to report and in what order (``top_scores_list`` is accepted but not read).

    Each row holds: rank, item_index, stock_code, description, cooccurrence,
    similarity, lift and sessions_with_other. Lift is
    ``P(both) / (P(reference) * P(other))`` with probabilities taken over
    ``total_baskets``, or 0.0 when either product has no sessions. Products
    missing from ``product_metadata`` get empty stock code and description.
    """
    reference_row = product_matrix_scipy.getrow(reference_product)
    cooccurrence_array = (
        reference_row.dot(product_matrix_scipy.T).toarray().ravel().astype(np.int32)
    )

    reference_sessions = diagonal_values[reference_product]
    denominator = math.sqrt(reference_sessions) * np.sqrt(np.maximum(diagonal_values, 1.0))
    with np.errstate(divide='ignore', invalid='ignore'):
        raw_ratio = cooccurrence_array.astype(np.float32) / denominator
        similarity_exact = np.where(denominator > 0.0, raw_ratio, 0.0).astype(np.float32)
    similarity_exact[reference_product] = 0.0

    report_rows = []
    position = 0
    for raw_index in top_indices_list[reference_product]:
        position += 1
        other = int(raw_index)
        other_sessions = diagonal_values[other]
        cooccurrence_count = int(cooccurrence_array[other])

        lift_value = 0.0
        if diagonal_values[reference_product] > 0 and other_sessions > 0:
            prob_both = cooccurrence_count / float(total_baskets)
            prob_reference = diagonal_values[reference_product] / float(total_baskets)
            prob_related = other_sessions / float(total_baskets)
            expected_joint = prob_reference * prob_related
            if expected_joint > 0:
                lift_value = prob_both / expected_joint
            else:
                lift_value = 0.0

        has_metadata = other in product_metadata
        report_rows.append({
            "rank": position,
            "item_index": other,
            "stock_code": product_metadata[other]["stock_code"] if has_metadata else "",
            "description": product_metadata[other]["description"] if has_metadata else "",
            "cooccurrence": cooccurrence_count,
            "similarity": float(similarity_exact[other]),
            "lift": lift_value,
            "sessions_with_other": int(other_sessions),
        })
    return pd.DataFrame(report_rows)

In [46]:
def visualize_performance_comparison(cpu_duration, gpu_transfer_time, gpu_compute_time):
    """Show a bar chart comparing CPU time with GPU transfer, compute and combined time.

    Args:
        cpu_duration: CPU computation time in seconds.
        gpu_transfer_time: Host-to-GPU transfer time in seconds.
        gpu_compute_time: GPU computation time in seconds.

    The chart is displayed with ``plt.show()``; nothing is returned.
    """
    gpu_combined = gpu_transfer_time + gpu_compute_time
    category_labels = ["CPU (SciPy) computation", "GPU data transfer", "GPU computation", "GPU combined"]
    duration_values = [cpu_duration, gpu_transfer_time, gpu_compute_time, gpu_combined]

    fig, ax = plt.subplots(figsize=(8, 4))
    bar_chart = ax.bar(category_labels, duration_values)
    ax.set_ylabel("Execution Time (seconds)")
    ax.set_title("Performance Analysis: CPU vs GPU")
    # Leave headroom above the tallest bar so its label stays inside the axes.
    ax.margins(y=0.15)
    for bar in bar_chart:
        height = bar.get_height()
        # Offset the label in screen points rather than by a fixed number of
        # seconds: a data-space offset (previously +0.3s) puts labels far from
        # the bars whenever the timings themselves are sub-second.
        ax.annotate(f"{height:.2f}s",
                    xy=(bar.get_x() + bar.get_width() / 2, height),
                    xytext=(0, 3), textcoords="offset points",
                    ha='center', va='bottom', fontsize=9)
    fig.tight_layout()
    plt.show()

In [47]:
def _export_recommendations(output_path, top_indices, top_scores, product_metadata):
    """Write every product's ranked neighbours (index, codes, similarity) to a CSV file."""
    export_rows = []
    for prod_idx, (indices, scores) in enumerate(zip(top_indices, top_scores)):
        item_meta = product_metadata[prod_idx]
        for rank_pos, (related_idx, score_val) in enumerate(zip(indices, scores), start=1):
            related_meta = product_metadata[int(related_idx)]
            export_rows.append({
                "item_index": prod_idx,
                "stock_code_item": item_meta["stock_code"],
                "item_description": item_meta["description"],
                "rank": rank_pos,
                "other_index": int(related_idx),
                "other_stock_code": related_meta["stock_code"],
                "other_description": related_meta["description"],
                "similarity": float(score_val),
            })
    pd.DataFrame(export_rows).to_csv(output_path, index=False)


def _save_performance_plot(cpu_duration, transfer_duration, gpu_compute_duration,
                           output_path="performance_analysis.png"):
    """Save the CPU-vs-GPU timing bar chart to ``output_path`` without displaying it."""
    method_labels = ["CPU (SciPy) row-wise", "GPU (CuPy) data transfer",
                     "GPU (CuPy) computation", "GPU (CuPy) combined"]
    durations = [cpu_duration, transfer_duration, gpu_compute_duration,
                 transfer_duration + gpu_compute_duration]
    fig, ax = plt.subplots(figsize=(8, 4))
    bar_plot = ax.bar(method_labels, durations)
    ax.set_ylabel("Execution Time (seconds)")
    ax.set_title("Performance Comparison: CPU vs GPU")
    # Headroom so the label above the tallest bar is not clipped.
    ax.margins(y=0.15)
    for bar in bar_plot:
        height = bar.get_height()
        # Point-based offset keeps labels next to the bars at any time scale
        # (a fixed +0.5s data offset does not when timings are sub-second).
        ax.annotate(f"{height:.2f}s",
                    xy=(bar.get_x() + bar.get_width() / 2, height),
                    xytext=(0, 3), textcoords="offset points",
                    ha='center', va='bottom', fontsize=9)
    fig.tight_layout()
    fig.savefig(output_path, dpi=150)
    plt.close(fig)


def main(argv=None):
    """Run the full CPU-vs-GPU recommendation benchmark.

    Phases: load and preprocess the CSV, transfer the sparse matrix to the GPU,
    compute top-K similarities on the GPU (batched) and on the CPU (row-wise),
    print recommendation reports for a reference product and for the two most
    frequent products, optionally export all recommendations to CSV, then print
    a timing summary and show or save a timing chart.

    Args:
        argv: Command-line arguments (without the program name). ``None`` lets
            argparse read ``sys.argv``.

    Raises:
        ValueError: ``--anchor_by id`` was given without ``--anchor_id``, or the
            id is outside the range of product indices.
        RuntimeError: no products remain after preprocessing.
        SystemExit: argparse rejected the arguments (including non-positive
            ``--k`` or ``--batch_size``).
    """
    argument_parser = argparse.ArgumentParser()
    argument_parser.add_argument("--csv", type=str, default="OnlineRetail_cached.csv", help="Input CSV file path")
    argument_parser.add_argument("--k", type=int, default=10, help="Number of top recommendations")
    argument_parser.add_argument("--batch_size", type=int, default=64, help="Batch size for GPU processing")
    argument_parser.add_argument("--anchor_by", choices=["most_popular","random","id"], default="most_popular")
    argument_parser.add_argument("--anchor_id", type=int, default=None)
    argument_parser.add_argument("--country", type=str, default=None)
    argument_parser.add_argument("--save_all", action="store_true", help="Export complete recommendation datasets")
    argument_parser.add_argument("--inline_plot", action="store_true", help="Display performance visualization inline")
    config = argument_parser.parse_args(argv)

    # Fail fast on unusable arguments, before any expensive computation starts.
    if config.k < 1:
        argument_parser.error("--k must be a positive integer")
    if config.batch_size < 1:
        argument_parser.error("--batch_size must be a positive integer")
    if config.anchor_by == "id" and config.anchor_id is None:
        raise ValueError("Must specify anchor_id when using anchor_by='id'")

    input_file = config.csv
    recommendations_count = config.k
    processing_batch_size = config.batch_size

    print("Phase 1: Loading and preprocessing transaction dataset.")
    start_timestamp = time.time()
    product_basket_matrix_cpu, product_metadata, total_baskets = parse_transaction_data(input_file, region=config.country)
    end_timestamp = time.time()
    print(f" Dataset loaded: products={product_basket_matrix_cpu.shape[0]}, baskets={product_basket_matrix_cpu.shape[1]}, duration={end_timestamp-start_timestamp:.2f}s")

    total_products = product_basket_matrix_cpu.shape[0]
    if total_products == 0:
        raise RuntimeError("No products remain after preprocessing; nothing to recommend")
    if config.anchor_by == "id" and not 0 <= config.anchor_id < total_products:
        raise ValueError(f"anchor_id must be in [0, {total_products - 1}], got {config.anchor_id}")

    print("\nPhase 2: Transferring sparse matrix to GPU memory.")
    start_timestamp = time.time()
    product_basket_matrix_gpu = convert_scipy_to_cupy_sparse(product_basket_matrix_cpu)
    # Wait for the device so the measured time covers the whole transfer.
    cp.cuda.Stream.null.synchronize()
    transfer_duration = time.time() - start_timestamp
    print(f" Transfer completed: {transfer_duration:.2f}s")

    print("\nPhase 3: Computing recommendations using GPU (batched processing).")
    gpu_top_indices, gpu_top_scores, gpu_diagonal, gpu_compute_duration = compute_similarities_gpu_batched(
        product_basket_matrix_gpu, top_k=recommendations_count, batch_count=processing_batch_size)
    gpu_total_duration = transfer_duration + gpu_compute_duration
    print(f" GPU execution (transfer+compute): {gpu_total_duration:.2f}s (compute only: {gpu_compute_duration:.2f}s)")

    print("\nPhase 4: Computing recommendations using CPU (baseline reference).")
    cpu_top_indices, cpu_top_scores, cpu_diagonal, cpu_duration = compute_similarities_cpu_baseline(
        product_basket_matrix_cpu, top_k=recommendations_count)
    print(f" CPU execution duration: {cpu_duration:.2f}s")

    # Select reference product for demonstration
    if config.anchor_by == "most_popular":
        reference_product = int(np.argmax(cpu_diagonal))
    elif config.anchor_by == "random":
        # Fixed seed so the "random" anchor is reproducible between runs.
        reference_product = int(np.random.default_rng(1).integers(0, total_products))
    else:
        reference_product = config.anchor_id

    reference_stock_code = product_metadata[reference_product]['stock_code']
    reference_description = product_metadata[reference_product]['description']

    print(f"\nReference product: index={reference_product}; "
          f"stock_code={reference_stock_code}; "
          f"description={reference_description}; "
          f"basket_occurrences={int(cpu_diagonal[reference_product])}")

    # Generate recommendation reports
    gpu_recommendations = generate_recommendation_report(reference_product, gpu_top_indices, gpu_top_scores,
                                                        product_basket_matrix_cpu, gpu_diagonal,
                                                        product_metadata, total_baskets)
    cpu_recommendations = generate_recommendation_report(reference_product, cpu_top_indices, cpu_top_scores,
                                                        product_basket_matrix_cpu, cpu_diagonal,
                                                        product_metadata, total_baskets)

    print("\nRecommendations (GPU-generated):")
    print(gpu_recommendations.to_string(index=False))

    print("\nRecommendations (CPU-generated):")
    print(cpu_recommendations.to_string(index=False))

    # Display sample recommendations for top products. This is a best-effort
    # extra: a failure here is reported but does not abort the benchmark.
    try:
        # Plain ints so the printed list and the metadata lookups use Python integers.
        top_two_products = [int(idx) for idx in np.argsort(-cpu_diagonal)[:2]]
        print(f"\n\n=== Sample recommendations for top 2 products (indices): {top_two_products} ===")
        for sample_product in top_two_products:
            sample_stock = product_metadata[sample_product]['stock_code']
            sample_desc = product_metadata[sample_product]['description']
            print(f"\nSample Product {sample_product} (stock_code={sample_stock}, description={sample_desc}, baskets={int(cpu_diagonal[sample_product])})")
            sample_gpu_recs = generate_recommendation_report(sample_product, gpu_top_indices, gpu_top_scores,
                                                            product_basket_matrix_cpu, gpu_diagonal,
                                                            product_metadata, total_baskets)
            sample_cpu_recs = generate_recommendation_report(sample_product, cpu_top_indices, cpu_top_scores,
                                                            product_basket_matrix_cpu, cpu_diagonal,
                                                            product_metadata, total_baskets)
            print("\nGPU recommendations (top 5):")
            print(sample_gpu_recs.head(5).to_string(index=False))
            print("\nCPU recommendations (top 5):")
            print(sample_cpu_recs.head(5).to_string(index=False))
    except Exception as error:
        print("Unable to generate sample recommendations:", error)

    if config.save_all:
        print("\nExporting complete recommendation datasets. This may take time for large catalogs.")
        _export_recommendations("recommendations_complete_cpu.csv",
                                cpu_top_indices, cpu_top_scores, product_metadata)
        _export_recommendations("recommendations_complete_gpu.csv",
                                gpu_top_indices, gpu_top_scores, product_metadata)
        print("Exported recommendations_complete_cpu.csv and recommendations_complete_gpu.csv")

    # Performance summary
    print("\nPERFORMANCE SUMMARY:")
    summary_data = pd.DataFrame([
        {"computation_method": "CPU (SciPy) row-wise", "execution_time_sec": round(cpu_duration, 3)},
        {"computation_method": "GPU (CuPy) data transfer", "execution_time_sec": round(transfer_duration, 3)},
        {"computation_method": "GPU (CuPy) computation", "execution_time_sec": round(gpu_compute_duration, 3)},
        {"computation_method": "GPU (CuPy) combined", "execution_time_sec": round(gpu_total_duration, 3)}
    ])
    speedup_factor = (cpu_duration / gpu_total_duration) if (gpu_total_duration > 0) else float('nan')
    summary_display = summary_data.copy()
    if not np.isnan(speedup_factor):
        # NOTE: this row is a ratio, not a duration, although it shares the column.
        summary_display.loc[len(summary_display)] = {
            "computation_method": "Speedup (CPU/GPU)",
            "execution_time_sec": round(speedup_factor, 3)
        }
    print(summary_display.to_string(index=False))

    # Visualization (best-effort: a plotting failure must not fail the run)
    if config.inline_plot:
        try:
            visualize_performance_comparison(cpu_duration, transfer_duration, gpu_compute_duration)
        except Exception as error:
            print("Warning: inline visualization failed:", error)
    else:
        try:
            _save_performance_plot(cpu_duration, transfer_duration, gpu_compute_duration,
                                   output_path="performance_analysis.png")
            print("Performance visualization saved: performance_analysis.png")
        except Exception as error:
            print("Warning: visualization generation failed:", error)
    print("\nExecution completed successfully.")

if __name__ == '__main__':
    # A Jupyter/Colab kernel starts through a launcher script, so sys.argv then
    # holds the kernel's own arguments rather than ours: run with defaults there.
    # Otherwise forward the real command-line arguments.
    if not (hasattr(sys, 'argv') and len(sys.argv) > 0
            and sys.argv[0].endswith(('ipykernel_launcher.py', 'colab_kernel_launcher.py'))):
        main(argv=sys.argv[1:])
    else:
        main(argv=[])

Phase 1: Loading and preprocessing transaction dataset.
 Dataset loaded: products=3922, baskets=19960, duration=2.14s

Phase 2: Transferring sparse matrix to GPU memory.
 Transfer completed: 0.03s

Phase 3: Computing recommendations using GPU (batched processing).
[GPU processing] batch 0-63 complete (size=64), time=0.02s
[GPU processing] batch 64-127 complete (size=64), time=0.04s
[GPU processing] batch 128-191 complete (size=64), time=0.06s
[GPU processing] batch 192-255 complete (size=64), time=0.08s
[GPU processing] batch 256-319 complete (size=64), time=0.12s
[GPU processing] batch 320-383 complete (size=64), time=0.14s
[GPU processing] batch 384-447 complete (size=64), time=0.16s
[GPU processing] batch 448-511 complete (size=64), time=0.17s
[GPU processing] batch 512-575 complete (size=64), time=0.23s
[GPU processing] batch 576-639 complete (size=64), time=0.27s
[GPU processing] batch 640-703 complete (size=64), time=0.32s
[GPU processing] batch 704-767 complete (size=64), time=0