In [None]:
# PySpark version - no additional pip installs needed if Spark is configured
# Spark should be available via docker-compose or local installation

Note: you may need to restart the kernel to use updated packages.


In [None]:
# Standard library
import ast
import json
from collections import defaultdict
from typing import Dict, List, Tuple, Optional

# Third-party
import numpy as np

# PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, explode, split, regexp_replace, lower, trim,
    collect_list, collect_set, array, array_join, udf, when, isnan, isnull,
    sum as spark_sum, count, countDistinct, avg, max as spark_max, min as spark_min,
    row_number, lit, array_contains, size, regexp_extract, coalesce, concat_ws
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType,
    ArrayType, MapType, FloatType
)
from pyspark.ml.feature import (
    Tokenizer, RegexTokenizer, StopWordsRemover, CountVectorizer, IDF,
    VectorAssembler, Normalizer, StringIndexer
)
from pyspark.ml.recommendation import ALS
from pyspark.ml.fpm import FPGrowth
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors, SparseVector, DenseVector
from pyspark.sql.window import Window


  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# -------------------------
# Initialize Spark Session
# -------------------------
# Build (or reuse, via getOrCreate) the notebook-wide session. Adaptive query
# execution with post-shuffle partition coalescing is switched on, and Kryo is
# used as the JVM-side serializer.
spark = (
    SparkSession.builder
    .appName("MusicRecommendationSystem")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Keep cell output readable: only WARN and above from Spark's logger.
spark.sparkContext.setLogLevel("WARN")
print("✓ Spark session initialized")

# -------------------------
# 1. Load data functions (PySpark)
# -------------------------
def load_interactions_spark(spark: SparkSession, path: Optional[str], hdfs_path: Optional[str] = None,
                            hdfs_uri: str = "hdfs://namenode:9000"):
    """
    Load tab-delimited (user, track_id, playcount) triplets from HDFS or a local path.

    Args:
        spark: Active SparkSession used for reading.
        path: Any Spark-readable path; used only when ``hdfs_path`` is falsy.
            The full pipeline passes ``None`` here and reads from HDFS instead.
        hdfs_path: Absolute path inside HDFS (e.g. "/data/train_triplets.txt").
            When given it takes precedence over ``path`` and is prefixed with ``hdfs_uri``.
        hdfs_uri: Scheme and authority of the HDFS namenode.

    Returns:
        DataFrame with columns ``user`` (string), ``track_id`` (string) and
        ``playcount`` (double). Rows where any of the three is null are dropped.

    Raises:
        ValueError: if neither ``path`` nor ``hdfs_path`` is provided.
    """
    if hdfs_path:
        full_path = f"{hdfs_uri}{hdfs_path}"
        print(f"Loading interactions from HDFS: {full_path}")
    elif path:
        full_path = path
        print(f"Loading interactions from local: {full_path}")
    else:
        # Fail early with a clear message instead of a cryptic reader error on csv(None).
        raise ValueError("Either `path` or `hdfs_path` must be provided")

    schema = StructType([
        StructField("user", StringType(), True),
        StructField("track_id", StringType(), True),
        StructField("playcount", DoubleType(), True)
    ])

    df = spark.read \
        .option("sep", "\t") \
        .option("header", "false") \
        .schema(schema) \
        .csv(full_path)

    # Drop incomplete rows. Spark's default PERMISSIVE CSV mode turns values that do
    # not fit the schema (e.g. a non-numeric playcount) into nulls, so those go too.
    df = df.dropna(how="any", subset=["user", "track_id", "playcount"])

    # One aggregation pass instead of three separate scans of the input.
    stats = df.agg(
        count(lit(1)).alias("n_rows"),
        countDistinct("user").alias("n_users"),
        countDistinct("track_id").alias("n_tracks"),
    ).first()

    print(f"✓ Loaded {stats['n_rows']:,} interactions")
    print(f"  - Unique users: {stats['n_users']:,}")
    print(f"  - Unique tracks: {stats['n_tracks']:,}")

    return df

def load_tracks_from_hdfs(spark: SparkSession, hdfs_path: str = "/data/lastfm_data.csv",
                          hdfs_uri: str = "hdfs://namenode:9000"):
    """
    Load track metadata from an HDFS CSV file (created from the Last.fm JSON files).

    The ``tags`` and ``similars`` columns hold Python-literal strings such as
    "[['classic rock', '100'], ['60s', '76']]" and "[['TRMLOXQ12903CF06BB', 1], ...]".
    They are parsed into ``tag_list`` (array<string>, tag names only) and
    ``similars_parsed`` (array<struct<track_id: string, score: double>>).

    Args:
        spark: Active SparkSession.
        hdfs_path: Absolute path of the CSV inside HDFS.
        hdfs_uri: Scheme and authority of the HDFS namenode.

    Returns:
        DataFrame with the CSV columns plus ``tag_list`` and ``similars_parsed``.
        Null ``artist``/``title`` are replaced by "", and one row is kept per ``track_id``.
    """
    print(f"\n{'='*60}")
    print(f"[STEP 1] Loading tracks metadata from HDFS...")
    print(f"{'='*60}")

    full_path = f"{hdfs_uri}{hdfs_path}"
    print(f"Reading from: {full_path}")

    # NOTE(review): if this CSV was written by pandas (the displayed sample has an
    # `Unnamed: 0` index column), embedded quotes are doubled ("") while Spark's
    # default CSV escape is backslash; `.option("escape", '"')` may be needed — confirm.
    df = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv(full_path)

    print(f"✓ Tracks CSV loaded successfully")
    print(f"  - Total records: {df.count():,}")
    print(f"  - Columns: {df.columns}")

    # ast.literal_eval only accepts Python literals, so a malformed or hostile cell
    # cannot execute code the way eval() could. These are the errors it (or the
    # conversions below) can raise on bad input; anything else should surface.
    literal_errors = (ValueError, SyntaxError, TypeError, MemoryError, RecursionError)

    def parse_tags_udf(tags_str):
        """Return tag names from a literal like "[['rock', '100'], ...]"; [] when unparsable."""
        if not tags_str:
            return []
        try:
            parsed = ast.literal_eval(tags_str) if isinstance(tags_str, str) else tags_str
        except literal_errors:
            return []
        if not isinstance(parsed, list):
            return []
        # Entries are usually [tag, weight]; keep only the tag name.
        return [str(t[0]) if isinstance(t, (list, tuple)) and len(t) > 0 else str(t)
                for t in parsed if t]

    parse_tags = udf(parse_tags_udf, ArrayType(StringType()))

    def parse_similars_udf(similars_str):
        """Return [(track_id, score), ...] from a literal like "[['TR...', 1], ...]"; [] when unparsable."""
        if not similars_str:
            return []
        try:
            parsed = ast.literal_eval(similars_str) if isinstance(similars_str, str) else similars_str
            if not isinstance(parsed, list):
                return []
            # A single non-numeric score invalidates the whole list, as before.
            return [(str(s[0]), float(s[1]))
                    for s in parsed
                    if isinstance(s, (list, tuple)) and len(s) >= 2]
        except literal_errors:
            return []

    parse_similars = udf(parse_similars_udf, ArrayType(StructType([
        StructField("track_id", StringType()),
        StructField("score", DoubleType())
    ])))

    df = df.withColumn("tag_list", parse_tags(col("tags"))) \
           .withColumn("similars_parsed", parse_similars(col("similars")))

    # Only the string columns need filling: both UDFs return [] for null input, so the
    # array columns are never null (and DataFrame.fillna does not accept list values).
    df = df.fillna("", subset=["artist", "title"])

    df = df.dropDuplicates(["track_id"])

    print(f"✓ Tags and similars processed")
    print(f"  - Final records: {df.count():,}")
    print(f"✓ Track metadata loading complete!\n")

    return df


In [None]:
# -------------------------
# Build track dictionary (PySpark version)
# -------------------------
def build_track_dict_spark(tracks_df):
    """
    Build a driver-side {track_id: metadata} dict from the tracks DataFrame and broadcast it.

    Args:
        tracks_df: DataFrame with columns ``track_id``, ``artist``, ``title``,
            ``tag_list`` (array<string>) and ``similars_parsed``
            (array<struct<track_id, score>>), as produced by ``load_tracks_from_hdfs``.

    Returns:
        Broadcast variable whose ``.value`` maps ``str(track_id)`` to
        ``{'artist': str, 'title': str, 'tags': List[str], 'similars': List[Tuple[str, float]]}``.
        Rows with a missing/empty ``track_id`` are skipped.
    """
    print(f"\n{'='*60}")
    print(f"[STEP 2] Building track metadata dictionary...")
    print(f"{'='*60}")

    def as_text(value):
        # Null-safe string conversion: missing metadata becomes "".
        return '' if value is None else str(value)

    track_dict = {}
    invalid_count = 0

    # Stream rows partition by partition instead of materialising a pandas copy
    # (the dict itself still lives on the driver, so this suits small/medium data).
    rows = tracks_df.select(
        "track_id", "artist", "title", "tag_list", "similars_parsed"
    ).toLocalIterator()

    for row in rows:
        tid = row["track_id"]
        if not tid:
            invalid_count += 1
            continue

        # Spark struct rows are tuples, so (track_id, score) pairs unpack directly.
        similars = []
        raw_similars = row["similars_parsed"]
        if raw_similars and isinstance(raw_similars, (list, tuple)):
            try:
                similars = [(str(s[0]), float(s[1])) if isinstance(s, (list, tuple)) and len(s) >= 2
                            else (str(s), 0.0) for s in raw_similars]
            except (TypeError, ValueError):
                similars = []

        track_dict[str(tid)] = {
            'artist': as_text(row["artist"]),
            'title': as_text(row["title"]),
            'tags': [str(t) for t in (row["tag_list"] or [])],
            'similars': similars
        }

    print(f"✓ Track dict built successfully")
    print(f"  - Valid tracks indexed: {len(track_dict):,}")
    print(f"  - Invalid/missing track_ids skipped: {invalid_count}")
    print(f"✓ Track metadata dictionary complete!\n")

    # Broadcast through the DataFrame's own SparkContext rather than a notebook global.
    return tracks_df.rdd.context.broadcast(track_dict)



Found 617295 JSON files to load...
Loaded 10000/617295 files...
Loaded 20000/617295 files...
Loaded 30000/617295 files...
Loaded 40000/617295 files...
Loaded 50000/617295 files...
Loaded 60000/617295 files...
Loaded 70000/617295 files...
Loaded 80000/617295 files...
Loaded 90000/617295 files...
Loaded 100000/617295 files...
Loaded 110000/617295 files...
Loaded 120000/617295 files...
Loaded 130000/617295 files...
Loaded 140000/617295 files...
Loaded 150000/617295 files...
Loaded 160000/617295 files...
Loaded 170000/617295 files...
Loaded 180000/617295 files...
Loaded 190000/617295 files...
Loaded 200000/617295 files...
Loaded 210000/617295 files...
Loaded 220000/617295 files...
Error loading lastfm_train/S/D/F/TRSDFHO12903CA5A09.json: Expecting value: line 1 column 1 (char 0)
Error loading lastfm_train/S/D/F/TRSDFDK128F427A947.json: Expecting value: line 1 column 1 (char 0)
Error loading lastfm_train/S/D/F/TRSDFOV128F4245F72.json: Expecting value: line 1 column 1 (char 0)
Error loading

KeyboardInterrupt: 

In [None]:
# Example: Load tracks from HDFS
# tracks_df = load_tracks_from_hdfs(spark, hdfs_path="/data/lastfm_data.csv")
# Or load from local if testing:
# tracks_df = spark.read.option("header", "true").option("inferSchema", "true").csv("tracks.csv")


In [None]:
# Display sample tracks
# tracks_df.show(5, truncate=False)

Unnamed: 0,artist,timestamp,similars,tags,track_id,title
0,Computer Truck,2011-08-08 18:30:39.698667,[],[],TRRRRIR128F933C8FE,Gamegirl power
1,Lack of Limits,2011-08-02 10:04:50.589618,[],[],TRRRRUH128F14ABD68,30 Summers
2,The Kinks,2011-08-02 05:18:14.083535,"[['TRMLOXQ12903CF06BB', 1], ['TRCOWHF128F93216...","[['classic rock', '100'], ['60s', '76'], ['bri...",TRRRRCH128F9342C72,A Well Respected Man
3,Everclear,2011-08-15 10:19:29.458998,[],[],TRRRRNT128E0782AF2,Misery Whip (Explicit)
4,Paula Abdul,2011-08-04 11:50:10.764331,"[['TRALFWK128F1458532', 1], ['TRRMRMZ128F14585...","[['pop', '100'], ['female vocalists', '66'], [...",TRRRRMR128F145852B,Rock House


In [None]:
"""
PySpark Hybrid Recommender System
Uses Spark MLlib for:
- ALS Collaborative Filtering
- TF-IDF Content-Based Features  
- FP-Growth Association Rules

Data sources:
- HDFS: /data/lastfm_data.csv (track metadata)
- HDFS: /data/train_triplets.txt (user-track interactions)
"""


In [None]:
# ---------------------------
# --- Build user/item mappings with StringIndexer ---
# ---------------------------

def _indexer_labels(indexer_model) -> List[str]:
    """Return a fitted StringIndexerModel's labels; position i holds the label encoded as index i."""
    labels_array = getattr(indexer_model, "labelsArray", None)  # available in Spark >= 3.0.2
    if labels_array:
        return list(labels_array[0])
    return list(indexer_model.labels)


def build_mappings_and_user_item_spark(interactions_df, spark: SparkSession):
    """
    Build user/item indexers and prepare the interactions DataFrame for ALS.

    Args:
        interactions_df: DataFrame with ``user``, ``track_id`` and ``playcount`` columns.
            Assumed free of nulls (as returned by ``load_interactions_spark``) so that
            every fitted label occurs in the indexed output.
        spark: Unused; kept for call-site compatibility.

    Returns:
        ``(interactions_indexed, user2idx, item2idx, idx2user, idx2item,
        user_indexer_model, item_indexer_model)``. ``interactions_indexed`` adds integer
        ``user_idx`` / ``item_idx`` columns, and the dicts map raw ids <-> indices.
    """
    print(f"\n{'='*60}")
    print(f"[STEP 3] Building user/item mappings and interaction matrix...")
    print(f"{'='*60}")

    print(f"\n[Processing] Creating user and item indexers...")

    user_indexer = StringIndexer(inputCol="user", outputCol="user_idx", handleInvalid="skip")
    item_indexer = StringIndexer(inputCol="track_id", outputCol="item_idx", handleInvalid="skip")

    print(f"  - Fitting user indexer...")
    user_indexer_model = user_indexer.fit(interactions_df)
    print(f"  - Fitting item indexer...")
    item_indexer_model = item_indexer.fit(interactions_df)

    print(f"  - Transforming interactions with indexers...")
    interactions_indexed = (
        item_indexer_model.transform(user_indexer_model.transform(interactions_df))
        .filter(col("user_idx").isNotNull() & col("item_idx").isNotNull())
        # StringIndexer emits doubles; ALS and the dict lookups want integers.
        .withColumn("user_idx", col("user_idx").cast(IntegerType()))
        .withColumn("item_idx", col("item_idx").cast(IntegerType()))
    )

    # The fitted models already store index -> label in order, so the mappings come
    # from them directly instead of two distinct()+collect() passes over all interactions.
    idx2user = dict(enumerate(_indexer_labels(user_indexer_model)))
    idx2item = dict(enumerate(_indexer_labels(item_indexer_model)))
    user2idx = {user: idx for idx, user in idx2user.items()}
    item2idx = {track: idx for idx, track in idx2item.items()}

    total_interactions = interactions_indexed.count()
    unique_users = len(user2idx)
    unique_items = len(item2idx)

    print(f"\n✓ Indexing complete!")
    print(f"  - Unique users: {unique_users:,}")
    print(f"  - Unique items: {unique_items:,}")
    print(f"  - Total interactions: {total_interactions:,}")

    print(f"✓ User/item mapping complete!\n")

    return interactions_indexed, user2idx, item2idx, idx2user, idx2item, user_indexer_model, item_indexer_model


In [None]:
# ---------------------------
# --- TF-IDF content vectors (Spark MLlib) ---
# ---------------------------

def build_tfidf_content_matrix_spark(tracks_df, item2idx: Dict[str, int], max_features: int = 50000,
                                     spark: Optional[SparkSession] = None):
    """
    Build L2-normalised TF-IDF vectors over "artist title tags" text with Spark MLlib.

    Args:
        tracks_df: Track metadata with ``track_id``, ``artist``, ``title`` and ``tag_list``
            columns (as returned by ``load_tracks_from_hdfs``).
        item2idx: Mapping track_id -> item index. Only tracks present here are kept, and
            each output row carries its ``item_idx``.
        max_features: Vocabulary size cap for CountVectorizer.
        spark: Session used to build the item-index DataFrame. Defaults to
            ``SparkSession.builder.getOrCreate()``.

    Returns:
        ``(tfidf_normalized_df, cv_model, idf_model, normalizer)``. The DataFrame carries
        ``words``, ``raw_features``, ``tfidf_features`` and ``tfidf_normalized`` columns.
    """
    session = spark if spark is not None else SparkSession.builder.getOrCreate()

    print(f"\n{'='*60}")
    print(f"[STEP 4] Building TF-IDF content matrix...")
    print(f"{'='*60}")

    print(f"\n[Processing] Constructing text corpus from track metadata...")

    # Built-in column functions instead of a Python UDF: same "artist title tags"
    # string, but evaluated in the JVM without per-row Python serialization.
    text_expr = trim(concat_ws(
        " ",
        coalesce(col("artist").cast(StringType()), lit("")),
        coalesce(col("title").cast(StringType()), lit("")),
        coalesce(array_join(col("tag_list"), " "), lit("")),
    ))
    tracks_with_text = tracks_df.withColumn("text", text_expr).filter(col("text") != "")

    # Explicit schema so an empty mapping still yields a (joinable) empty DataFrame.
    item_idx_df = session.createDataFrame(
        list(item2idx.items()),
        schema="track_id string, item_idx long"
    )

    tracks_indexed = tracks_with_text.join(item_idx_df, on="track_id", how="inner")

    print(f"  - Tracks with text: {tracks_indexed.count():,}")

    print(f"\n[Training] Tokenizing and building TF-IDF (max_features={max_features:,})...")
    # Tokenizer splits on a single whitespace char, so "artist  tags" (empty title)
    # would yield an "" token. RegexTokenizer on \s+ also lowercases but drops empties.
    tokenizer = RegexTokenizer(inputCol="text", outputCol="words",
                               pattern="\\s+", gaps=True, toLowercase=True)
    words_df = tokenizer.transform(tracks_indexed)

    cv = CountVectorizer(
        inputCol="words",
        outputCol="raw_features",
        vocabSize=max_features,
        minDF=1.0
    )
    cv_model = cv.fit(words_df)
    cv_df = cv_model.transform(words_df)

    print(f"  - Vocabulary size: {len(cv_model.vocabulary):,}")

    idf = IDF(inputCol="raw_features", outputCol="tfidf_features")
    idf_model = idf.fit(cv_df)
    tfidf_df = idf_model.transform(cv_df)

    # L2-normalise so a dot product between rows equals cosine similarity.
    normalizer = Normalizer(inputCol="tfidf_features", outputCol="tfidf_normalized", p=2.0)
    tfidf_normalized_df = normalizer.transform(tfidf_df)

    print(f"\n✓ TF-IDF matrix built successfully!")
    print(f"  - Tracks processed: {tfidf_normalized_df.count():,}")
    print(f"✓ Content matrix building complete!\n")

    return tfidf_normalized_df, cv_model, idf_model, normalizer


In [None]:
# ---------------------------
# --- FP-Growth association rules (Spark MLlib) ---
# ---------------------------

def build_fpgrowth_rules_spark(interactions_indexed_df,
                               min_support: float = 0.001,
                               sample_frac: float = 0.2,
                               min_confidence: float = 0.2):
    """
    Build frequent itemsets & association rules using Spark MLlib FP-Growth.

    Each user is one transaction: the set of item indices they played. When
    ``sample_frac < 1`` a random subset of *transactions* (users) is kept (seed 42),
    so every sampled basket stays complete.

    Args:
        interactions_indexed_df: DataFrame with integer ``user_idx`` and ``item_idx``.
        min_support: Minimum fraction of transactions an itemset must appear in.
        sample_frac: Fraction of users to keep (1.0 = all).
        min_confidence: Minimum confidence for generated rules.

    Returns:
        ``(rules_df, freq_itemsets, model)``: the association rules and frequent itemsets
        DataFrames, plus the fitted FPGrowthModel.
    """
    print(f"\n{'='*60}")
    print(f"[STEP 5] Building FP-Growth association rules...")
    print(f"{'='*60}")
    print(f"  - min_support: {min_support}")
    print(f"  - sample_frac: {sample_frac}")
    print(f"  - min_confidence: {min_confidence}")

    print(f"\n[Processing] Grouping items by user to create transactions...")
    # FP-Growth rejects transactions containing repeated items, so build each basket as a set.
    user_itemsets = interactions_indexed_df.groupBy("user_idx") \
        .agg(collect_set("item_idx").alias("items"))

    if sample_frac < 1.0:
        print(f"\n[Sampling] Sampling {sample_frac*100:.1f}% of transactions...")
        user_itemsets = user_itemsets.sample(False, sample_frac, seed=42)

    # FP-Growth only needs the item arrays.
    transactions = user_itemsets.select("items")

    print(f"  - Transactions created: {transactions.count():,}")

    print(f"\n[Training] Running FP-Growth algorithm...")
    fpgrowth = FPGrowth(
        itemsCol="items",
        minSupport=min_support,
        minConfidence=min_confidence
    )

    model = fpgrowth.fit(transactions)

    freq_itemsets = model.freqItemsets
    print(f"✓ Frequent itemsets generated: {freq_itemsets.count():,}")

    rules_df = model.associationRules
    n_rules = rules_df.count()
    print(f"✓ Association rules generated: {n_rules:,}")

    if n_rules > 0:
        rules_stats = rules_df.agg(
            spark_min("confidence").alias("min_conf"),
            spark_max("confidence").alias("max_conf"),
            spark_min("lift").alias("min_lift"),
            spark_max("lift").alias("max_lift")
        ).collect()[0]

        print(f"  - Confidence range: [{rules_stats.min_conf:.4f}, {rules_stats.max_conf:.4f}]")
        print(f"  - Lift range: [{rules_stats.min_lift:.4f}, {rules_stats.max_lift:.4f}]")

    print(f"✓ FP-Growth rules generation complete!\n")

    return rules_df, freq_itemsets, model


In [None]:
# ---------------------------
# --- ALS training (Spark MLlib) ---
# ---------------------------

def train_als_model_spark(interactions_indexed_df,
                         factors: int = 64,
                         regularization: float = 0.01,
                         iterations: int = 15,
                         implicit_prefs: bool = True,
                         alpha: float = 40.0):
    """
    Train an ALS model using Spark MLlib.

    For implicit feedback, Spark ALS itself derives the confidence ``1 + alpha * |rating|``
    from the raw ratings. Therefore ``alpha`` is passed to ALS and playcounts are left
    untouched; scaling them here as well would apply the confidence transform twice.

    Args:
        interactions_indexed_df: DataFrame with integer ``user_idx`` / ``item_idx`` and
            a numeric ``playcount`` column.
        factors: Rank of the latent factors.
        regularization: ALS ``regParam``.
        iterations: ALS ``maxIter``.
        implicit_prefs: Use the implicit-feedback variant.
        alpha: Confidence scaling for implicit feedback (ignored by ALS when explicit).

    Returns:
        ``(model, als_data)``: the fitted ALSModel and the (user, item, rating) training
        DataFrame, where ``rating`` is the raw playcount as a double.
    """
    print(f"\n{'='*60}")
    print(f"[STEP 6] Training Spark MLlib ALS Model...")
    print(f"{'='*60}")
    print(f"  - Factors (latent dimensions): {factors}")
    print(f"  - Regularization: {regularization}")
    print(f"  - Iterations: {iterations}")
    print(f"  - Implicit preferences: {implicit_prefs}")
    if implicit_prefs:
        print(f"  - Alpha (confidence scaling): {alpha}")

    als_data = interactions_indexed_df.select(
        col("user_idx").alias("user"),
        col("item_idx").alias("item"),
        col("playcount").cast(DoubleType()).alias("rating")
    )

    print(f"  - Training interactions: {als_data.count():,}")

    als = ALS(
        maxIter=iterations,
        regParam=regularization,
        rank=factors,
        implicitPrefs=implicit_prefs,
        alpha=alpha,
        userCol="user",
        itemCol="item",
        ratingCol="rating",
        coldStartStrategy="drop",
        nonnegative=True
    )

    print(f"\n[Training] Fitting ALS model (this may take a while)...")
    model = als.fit(als_data)

    print(f"\n✓ ALS training finished successfully!")
    print(f"✓ ALS model training complete!\n")

    return model, als_data


In [None]:
# ---------------------------
# --- Helper functions for similarity calculations ---
# ---------------------------

def compute_item_similarities_spark(interactions_indexed_df, item2idx: Dict[str, int]):
    """
    Count item-item co-occurrences: how many distinct users interacted with both items.

    No cosine normalisation is applied; the result holds raw co-occurrence counts.
    Each unordered pair appears twice, as (i, j) and (j, i), so filtering on
    ``item_idx_1`` alone finds every partner of an item.

    Args:
        interactions_indexed_df: DataFrame with integer ``user_idx`` and ``item_idx`` columns.
        item2idx: Unused; kept for call-site compatibility.

    Returns:
        Persisted DataFrame ``(item_idx_1, item_idx_2, cooccurrence_count)``. It is persisted
        because the recommenders filter it repeatedly; call ``.unpersist()`` when done.
    """
    print(f"\n[STEP 7] Computing item-item similarities...")

    # One row per (user, item), so each pair count below is a count of distinct users.
    user_items = interactions_indexed_df.select("user_idx", "item_idx").distinct()

    left = user_items.alias("item1")
    right = user_items.alias("item2")

    # NOTE(review): this self-join grows with the square of each user's history length;
    # very active users dominate the cost on large datasets.
    cooccurrences = (
        left.join(
            right,
            (col("item1.user_idx") == col("item2.user_idx")) &
            (col("item1.item_idx") != col("item2.item_idx")),
            "inner"
        )
        .select(
            col("item1.item_idx").alias("item_idx_1"),
            col("item2.item_idx").alias("item_idx_2")
        )
        .groupBy("item_idx_1", "item_idx_2")
        .count()
        .withColumnRenamed("count", "cooccurrence_count")
        # Without persisting, every later filter re-runs the whole self-join.
        .persist()
    )

    print(f"✓ Co-occurrence matrix computed: {cooccurrences.count():,} pairs")

    return cooccurrences


In [None]:
# ---------------------------
# --- Hybrid Recommender Class (PySpark) ---
# ---------------------------

class HybridRecommenderSpark:
    """
    Hybrid recommender system using PySpark.

    Combines ALS collaborative filtering, item co-occurrence counts and track-metadata
    "similars". A TF-IDF DataFrame and association rules are accepted and stored, but
    they are not used for scoring yet.
    """

    # Default blend weights; callers may override any subset of keys.
    _DEFAULT_USER_WEIGHTS = {'als': 0.6, 'coocc': 0.25, 'content': 0.15}
    _DEFAULT_SONG_WEIGHTS = {'similars': 0.4, 'coocc': 0.2, 'content': 0.2}

    def __init__(self, spark: SparkSession,
                 interactions_indexed_df,
                 user2idx: Dict[str, int],
                 item2idx: Dict[str, int],
                 idx2user: Dict[int, str],
                 idx2item: Dict[int, str],
                 track_dict_broadcast,
                 als_model=None,
                 tfidf_df=None,
                 rules_df=None,
                 cooccurrences_df=None):
        self.spark = spark
        self.interactions_indexed_df = interactions_indexed_df
        self.user2idx = user2idx
        self.item2idx = item2idx
        self.idx2user = idx2user
        self.idx2item = idx2item
        self.track_dict_broadcast = track_dict_broadcast
        self.als_model = als_model
        self.tfidf_df = tfidf_df
        self.rules_df = rules_df
        self.cooccurrences_df = cooccurrences_df

    def recommend_for_user(self, user_id: str, top_k: int = 10, weights: Dict[str, float] = None):
        """
        Generate recommendations for a user.

        Args:
            user_id: Raw user id. Unknown users get the most-played items (cold start).
            top_k: Number of recommendations to return.
            weights: Optional overrides for the 'als' / 'coocc' / 'content' blend weights;
                missing keys fall back to the defaults.

        Returns:
            List of ``(track_id, score)`` tuples, best first, excluding tracks already played.
        """
        weights = {**self._DEFAULT_USER_WEIGHTS, **(weights or {})}

        if user_id not in self.user2idx:
            # Cold start: return popular items
            popular = self.interactions_indexed_df.groupBy("item_idx") \
                .agg(spark_sum("playcount").alias("total_plays")) \
                .orderBy(col("total_plays").desc()) \
                .limit(top_k) \
                .collect()
            return [(self.idx2item[row.item_idx], float(row.total_plays)) for row in popular]

        user_idx = self.user2idx[user_id]
        scores = defaultdict(float)

        if self.als_model is not None:
            try:
                user_df = self.spark.createDataFrame([(user_idx,)], ["user"])
                als_recs = self.als_model.recommendForUserSubset(user_df, top_k * 5)

                recs = als_recs.select("recommendations").collect()
                if recs:
                    for rec_row in recs[0].recommendations:
                        track_id = self.idx2item.get(rec_row.item)
                        if track_id:
                            scores[track_id] += weights['als'] * float(rec_row.rating)
            except Exception as e:
                print(f"ALS recommendation error: {e}")

        user_item_set = {
            row.item_idx
            for row in self.interactions_indexed_df.filter(col("user_idx") == user_idx)
            .select("item_idx").distinct().collect()
        }

        if self.cooccurrences_df is not None and user_item_set:
            seed_items = list(user_item_set)[:100]  # Limit for performance
            # One Spark job for all seeds: top 50 partners per seed item by co-occurrence.
            per_seed = Window.partitionBy("item_idx_1").orderBy(col("cooccurrence_count").desc())
            coocc_rows = (
                self.cooccurrences_df
                .filter(col("item_idx_1").isin(seed_items))
                .withColumn("_rank", row_number().over(per_seed))
                .filter(col("_rank") <= 50)
                .select("item_idx_2", "cooccurrence_count")
                .collect()
            )
            n_listened = len(user_item_set)
            for row in coocc_rows:
                # Compare indices with indices (user_item_set holds item_idx ints).
                if row.item_idx_2 in user_item_set:
                    continue
                track_id = self.idx2item.get(row.item_idx_2)
                if track_id:
                    scores[track_id] += weights['coocc'] * float(row.cooccurrence_count) / n_listened

        # TODO: content-based scoring from self.tfidf_df (needs vector similarity).

        listened_tracks = {self.idx2item[i] for i in user_item_set}
        final_scores = [(tid, sc) for tid, sc in scores.items() if tid not in listened_tracks]
        final_scores.sort(key=lambda x: x[1], reverse=True)

        return final_scores[:top_k]

    def recommend_similar_song(self, track_id: str, top_k: int = 10, weights: Dict[str, float] = None):
        """
        Find songs similar to a given track.

        Args:
            track_id: Raw track id to find neighbours for.
            top_k: Number of results.
            weights: Optional overrides for the 'similars' / 'coocc' / 'content' weights;
                missing keys fall back to the defaults.

        Returns:
            List of ``(track_id, score)`` tuples, best first, excluding ``track_id`` itself.
        """
        weights = {**self._DEFAULT_SONG_WEIGHTS, **(weights or {})}

        scores = defaultdict(float)
        track_dict = self.track_dict_broadcast.value

        md = track_dict.get(track_id, {})
        for sid, sscore in md.get('similars', []):
            scores[sid] += weights['similars'] * float(sscore)

        item_idx = self.item2idx.get(track_id)
        if item_idx is not None and self.cooccurrences_df is not None:
            coocc_recs = self.cooccurrences_df.filter(col("item_idx_1") == item_idx) \
                .orderBy(col("cooccurrence_count").desc()) \
                .limit(200) \
                .collect()
            for row in coocc_recs:
                sid = self.idx2item.get(row.item_idx_2)
                if sid:
                    scores[sid] += weights['coocc'] * float(row.cooccurrence_count)

        scores.pop(track_id, None)
        ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        return ranked[:top_k]


In [None]:
# ---------------------------
# --- Hybrid recommender class ---
# ---------------------------

class HybridRecommender:
    """
    In-memory (non-Spark) hybrid recommender over a sparse user x item matrix.

    Scores blend:
    - an ALS model exposing ``recommend(uidx, user_item, N=..., filter_already_liked_items=True)``,
      or a Surprise-style ``(algo, trainset)`` fallback;
    - item co-occurrence and item cosine similarity;
    - TF-IDF content similarity;
    - association rules from a pandas ``rules_df``.

    NOTE(review): relies on ``precompute_item_norms``, ``topk_cooccurrence`` and
    ``topk_item_cosine`` helpers that are not defined in the visible part of this
    notebook. They must be defined before instances are created or queried.
    """
    def __init__(self, user_item: "csr_matrix", user2idx: Dict[str,int], item2idx: Dict[str,int],
                 track_dict: Dict[str,Dict], als_model=None, fallback_mf=None, content_matrix=None,
                 tfidf_vectorizer=None, rules_df=None, item_norms=None):
        # The csr_matrix annotation is quoted: scipy is not imported in this notebook, and
        # an unquoted name would raise NameError as soon as this class is defined.
        self.user_item = user_item
        self.user2idx = user2idx
        self.item2idx = item2idx
        self.idx2item = {v: k for k, v in item2idx.items()}
        self.track_dict = track_dict
        self.als_model = als_model
        self.fallback_mf = fallback_mf
        self.content_matrix = content_matrix
        self.tfidf_vectorizer = tfidf_vectorizer
        self.rules_df = rules_df
        self.item_norms = item_norms if item_norms is not None else precompute_item_norms(user_item)

    def recommend_similar_song(self, track_id: str, top_k: int = 10, weights: Dict[str,float] = None):
        """
        Rank tracks similar to ``track_id``.

        Returns:
            Up to ``top_k`` ``(track_id, score)`` pairs, best first, excluding ``track_id``.
        """
        if weights is None:
            weights = {'similars': 0.4, 'coocc': 0.2, 'cosine': 0.2, 'content': 0.2}
        scores = defaultdict(float)
        # metadata similars
        md = self.track_dict.get(track_id, {})
        for sid, sscore in md.get('similars', []):
            scores[sid] += weights['similars'] * float(sscore)
        item_idx = self.item2idx.get(track_id, None)
        if item_idx is not None:
            for idx, conf in topk_cooccurrence(item_idx, self.user_item, top_k=200):
                scores[self.idx2item[idx]] += weights['coocc'] * conf
            for idx, sc in topk_item_cosine(item_idx, self.user_item, self.item_norms, top_k=200):
                scores[self.idx2item[idx]] += weights['cosine'] * sc
            # Same bounds guard as recommend_for_user: the content matrix may cover fewer items.
            if self.content_matrix is not None and item_idx < self.content_matrix.shape[0]:
                q = self.content_matrix[item_idx]
                sims = q.dot(self.content_matrix.T).toarray().ravel()
                sims[item_idx] = 0.0
                n_top = min(200, len(sims) - 1)
                topk = np.argpartition(-sims, n_top)[:n_top]
                topk = topk[np.argsort(-sims[topk])]
                for idx in topk:
                    scores[self.idx2item[idx]] += weights['content'] * float(sims[idx])
        # rules: consequents whose antecedent is a subset of the track's metadata similars
        if self.rules_df is not None and not self.rules_df.empty:
            user_items = set([s[0] for s in md.get('similars', [])])
            for _, row in self.rules_df.iterrows():
                antecedent = set(row['antecedents'])
                consequent = set(row['consequents'])
                if antecedent <= user_items:
                    for c in consequent:
                        scores[c] += 0.1 * row['confidence']
        scores.pop(track_id, None)
        ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        return ranked[:top_k]

    def recommend_for_user(self, user_id: str, top_k: int = 10, weights: Dict[str,float] = None):
        """
        Recommend tracks for ``user_id``.

        Unknown users get the ``top_k`` items with the largest column sums of ``user_item``.

        Returns:
            Up to ``top_k`` ``(track_id, score)`` pairs, best first, excluding tracks the
            user already has in ``user_item``.
        """
        if weights is None:
            weights = {'als': 0.6, 'coocc': 0.25, 'content': 0.15}
        scored = defaultdict(float)
        if user_id not in self.user2idx:
            # cold user fallback: popular items
            pops = np.array(self.user_item.sum(axis=0)).ravel()
            if pops.size == 0 or top_k <= 0:
                return []
            # argpartition needs kth < n; clamp so top_k >= n_items returns every item.
            kth = min(top_k, pops.size - 1)
            top_idx = np.argpartition(-pops, kth)[:top_k]
            top_idx = top_idx[np.argsort(-pops[top_idx])]
            return [(self.idx2item[i], float(pops[i])) for i in top_idx]
        uidx = self.user2idx[user_id]
        user_items = self.user_item[uidx].nonzero()[1].tolist()
        if self.als_model is not None:
            try:
                recs = self.als_model.recommend(uidx, self.user_item, N=top_k*5, filter_already_liked_items=True)
                for idx, sc in recs:
                    scored[self.idx2item[idx]] += weights['als'] * float(sc)
            except Exception as e:
                # Best-effort, but no longer silent (matches HybridRecommenderSpark).
                print(f"ALS recommendation error: {e}")
        elif self.fallback_mf is not None:
            # fallback MF: use Surprise predictions (slow)
            algo, trainset = self.fallback_mf
            try:
                u_inner = trainset.to_inner_uid(user_id)
                user_rated = set([trainset.to_raw_iid(i) for (i, _) in trainset.ur[u_inner]])
            except Exception:
                user_rated = set()
            candidates = [it for it in self.item2idx.keys() if it not in user_rated]
            preds = []
            for it in candidates:
                try:
                    preds.append((it, algo.predict(user_id, it).est))
                except Exception:
                    continue
            preds.sort(key=lambda x: x[1], reverse=True)
            for it, score in preds[:top_k*5]:
                scored[it] += weights['als'] * float(score)
        # otherwise: no CF model available -> skip the ALS part
        n_user_items = max(1, len(user_items))
        # cooccurrence aggregated
        for ui in user_items:
            for idx, conf in topk_cooccurrence(ui, self.user_item, top_k=50):
                scored[self.idx2item[idx]] += weights['coocc'] * conf * (1.0 / n_user_items)
        # content aggregated
        if self.content_matrix is not None:
            for ui in user_items:
                if ui >= self.content_matrix.shape[0]:
                    continue
                q = self.content_matrix[ui]
                sims = q.dot(self.content_matrix.T).toarray().ravel()
                n_top = min(50, len(sims) - 1)
                topk = np.argpartition(-sims, n_top)[:n_top]
                topk = topk[np.argsort(-sims[topk])]
                for idx in topk:
                    scored[self.idx2item[idx]] += weights['content'] * float(sims[idx]) * (1.0 / n_user_items)
        # rules-based boost using rules_df if available
        listened = set([self.idx2item[i] for i in user_items])
        if self.rules_df is not None and not self.rules_df.empty:
            for _, row in self.rules_df.iterrows():
                antecedent = set(row['antecedents'])
                consequent = set(row['consequents'])
                if antecedent <= listened:
                    for c in consequent:
                        if c not in listened:
                            scored[c] += 0.1 * row['confidence']
        final = [(tid, sc) for tid, sc in scored.items() if tid not in listened]
        final.sort(key=lambda x: x[1], reverse=True)
        return final[:top_k]


In [None]:
# ---------------------------
# --- Full pipeline builder (PySpark) ---
# ---------------------------

def _print_pipeline_phase(title: str) -> None:
    """Print a phase header framed by 60-asterisk rules, in the pipeline's log format."""
    print(f"\n\n{'*'*60}")
    print(title)
    print(f"{'*'*60}\n")


def build_and_run_pipeline_main_spark(spark: SparkSession,
                                     interactions_hdfs_path: str = "/data/train_triplets.txt",
                                     tracks_hdfs_path: str = "/data/lastfm_data.csv",
                                     fpgrowth_min_support: float = 0.001,
                                     fpgrowth_sample_frac: float = 0.2,
                                     als_factors: int = 64,
                                     als_iterations: int = 15,
                                     tfidf_max_features: int = 50000,
                                     fpgrowth_min_confidence: float = 0.2,
                                     als_regularization: float = 0.01,
                                     als_alpha: float = 40.0):
    """Build every recommender component and wrap them in a HybridRecommenderSpark.

    Phases:
      1. Load track metadata and broadcast a track dictionary (required).
      2. Load interactions and build user/item index mappings (required).
      3. TF-IDF content features (optional).
      4. Item co-occurrence similarities (optional).
      5. FP-Growth association rules (optional).
      6. Implicit-feedback ALS model (optional).
      7. Assemble the hybrid recommender.

    Phases 1-2 let exceptions propagate. Phases 3-6 are best-effort: on failure
    the error is printed and that component is passed to the recommender as None.

    Args:
        spark: Active SparkSession.
        interactions_hdfs_path: HDFS path of the interactions file.
        tracks_hdfs_path: HDFS path of the track metadata file.
        fpgrowth_min_support: Minimum itemset support for FP-Growth.
        fpgrowth_sample_frac: Fraction of data sampled for FP-Growth.
        als_factors: ALS latent factor count.
        als_iterations: ALS iteration count.
        tfidf_max_features: ``max_features`` passed to the TF-IDF builder.
        fpgrowth_min_confidence: Minimum rule confidence for FP-Growth.
        als_regularization: ALS regularization strength.
        als_alpha: ALS implicit-feedback confidence scaling.

    Returns:
        Tuple ``(hreco, rules_df)``: the HybridRecommenderSpark instance and the
        association rules (None if FP-Growth failed).
    """
    print(f"\n\n{'#'*60}")
    print(f"{'#'*60}")
    print("# PYSPARK HYBRID RECOMMENDATION SYSTEM - FULL PIPELINE")
    print(f"{'#'*60}")
    print(f"{'#'*60}\n")

    print("[CONFIG] Pipeline parameters:")
    print(f"  - Interactions HDFS path: {interactions_hdfs_path}")
    print(f"  - Tracks HDFS path: {tracks_hdfs_path}")
    print(f"  - FP-Growth min support: {fpgrowth_min_support}")
    print(f"  - FP-Growth sample fraction: {fpgrowth_sample_frac}")
    print(f"  - FP-Growth min confidence: {fpgrowth_min_confidence}")
    print(f"  - TF-IDF max features: {tfidf_max_features}")
    print(f"  - ALS factors: {als_factors}")
    print(f"  - ALS iterations: {als_iterations}")
    print(f"  - ALS regularization: {als_regularization}")
    print(f"  - ALS alpha: {als_alpha}")

    # 1) Load track metadata from HDFS (required: failure aborts the pipeline)
    _print_pipeline_phase("PHASE 1: LOAD TRACK METADATA FROM HDFS")
    tracks_df = load_tracks_from_hdfs(spark, tracks_hdfs_path)
    track_dict_broadcast = build_track_dict_spark(tracks_df)

    # 2) Load interactions and build mappings (required)
    _print_pipeline_phase("PHASE 2: LOAD INTERACTIONS AND BUILD MAPPINGS")
    # Local path is None so the loader reads from the HDFS path instead.
    interactions_df = load_interactions_spark(spark, None, interactions_hdfs_path)
    (interactions_indexed, user2idx, item2idx, idx2user, idx2item,
     _user_idx_model, _item_idx_model) = build_mappings_and_user_item_spark(interactions_df, spark)

    # 3) Build TF-IDF content features (optional)
    _print_pipeline_phase("PHASE 3: BUILD CONTENT-BASED FEATURES (TF-IDF)")
    try:
        tfidf_df, _cv_model, _idf_model, _normalizer = build_tfidf_content_matrix_spark(
            tracks_df, item2idx, max_features=tfidf_max_features
        )
        print("✓ Content matrix successfully built!")
    except Exception as e:
        print(f"\n✗ TF-IDF build failed: {e}")
        print("  - Continuing without content features...")
        tfidf_df = None

    # 4) Compute item similarities via co-occurrence (optional)
    _print_pipeline_phase("PHASE 4: COMPUTE ITEM SIMILARITIES")
    try:
        cooccurrences_df = compute_item_similarities_spark(interactions_indexed, item2idx)
        print("✓ Item similarities computed!")
    except Exception as e:
        print(f"\n✗ Similarity computation failed: {e}")
        print("  - Continuing without co-occurrence similarities...")
        cooccurrences_df = None

    # 5) Build FP-Growth association rules (optional)
    _print_pipeline_phase("PHASE 5: BUILD ASSOCIATION RULES (FP-GROWTH)")
    try:
        rules_df, _freq_itemsets, _fpgrowth_model = build_fpgrowth_rules_spark(
            interactions_indexed,
            min_support=fpgrowth_min_support,
            sample_frac=fpgrowth_sample_frac,
            min_confidence=fpgrowth_min_confidence,
        )
        print("✓ Rules generated successfully!")
    except Exception as e:
        print(f"\n✗ FP-Growth rules generation failed: {e}")
        print("  - Continuing without association rules...")
        rules_df = None

    # 6) Train ALS collaborative filtering model (optional)
    _print_pipeline_phase("PHASE 6: TRAIN ALS COLLABORATIVE FILTERING MODEL")
    try:
        als_model, _als_data = train_als_model_spark(
            interactions_indexed,
            factors=als_factors,
            regularization=als_regularization,
            iterations=als_iterations,
            implicit_prefs=True,
            alpha=als_alpha,
        )
        print("✓ ALS training successful!")
    except Exception as e:
        print(f"\n✗ ALS training failed: {e}")
        print("  - Continuing without ALS model...")
        als_model = None

    # 7) Assemble the hybrid recommender from whatever components succeeded
    _print_pipeline_phase("PHASE 7: INITIALIZE HYBRID RECOMMENDER OBJECT")
    print("[Setup] Creating HybridRecommenderSpark with all components...")

    hreco = HybridRecommenderSpark(
        spark=spark,
        interactions_indexed_df=interactions_indexed,
        user2idx=user2idx,
        item2idx=item2idx,
        idx2user=idx2user,
        idx2item=idx2item,
        track_dict_broadcast=track_dict_broadcast,
        als_model=als_model,
        tfidf_df=tfidf_df,
        rules_df=rules_df,
        cooccurrences_df=cooccurrences_df
    )

    print("\n✓ HybridRecommenderSpark object created successfully!")
    print("\n[COMPONENTS SUMMARY]")
    print(f"  - Interactions DataFrame: {'✓' if hreco.interactions_indexed_df is not None else '✗'}")
    print(f"  - ALS model: {'✓' if hreco.als_model is not None else '✗'}")
    print(f"  - TF-IDF features: {'✓' if hreco.tfidf_df is not None else '✗'}")
    print(f"  - Association rules: {'✓' if hreco.rules_df is not None else '✗'}")
    print(f"  - Co-occurrences: {'✓' if hreco.cooccurrences_df is not None else '✗'}")
    print(f"  - Track dictionary: {'✓' if hreco.track_dict_broadcast is not None else '✗'}")
    print("\n✓ Pipeline built successfully!\n")

    return hreco, rules_df


In [None]:
# ---------------------------
# --- Entry point example (PySpark) ---
# ---------------------------

# Example usage:
# hreco, rules = build_and_run_pipeline_main_spark(
#     spark=spark,
#     interactions_hdfs_path="/data/train_triplets.txt",
#     tracks_hdfs_path="/data/lastfm_data.csv",
#     fpgrowth_min_support=0.001,
#     fpgrowth_sample_frac=0.2,
#     als_factors=64,
#     als_iterations=15
# )

# Test recommendations:
# sample_user = next(iter(hreco.user2idx.keys()))
# sample_track = next(iter(hreco.item2idx.keys()))
# user_recs = hreco.recommend_for_user(sample_user, top_k=10)
# similar_tracks = hreco.recommend_similar_song(sample_track, top_k=10)



STARTING RECOMMENDATION SYSTEM - MAIN ENTRY POINT

[VALIDATION] Checking input files...
  - Interactions path: train_triplets.txt
  - Tracks CSV path: tracks.csv
  ✓ Interactions file exists
  ✓ Tracks CSV file exists

[BUILDING] Initializing recommendation pipeline...
  This will take time and memory - please be patient...



############################################################
############################################################
# HYBRID RECOMMENDATION SYSTEM - FULL PIPELINE INITIALIZATION
############################################################
############################################################

[CONFIG] Pipeline parameters:
  - Interactions file: train_triplets.txt
  - Tracks CSV file: tracks.csv
  - Chunk size: 2,000,000
  - FP-Growth min support: 0.001
  - FP-Growth sample fraction: 0.2
  - USE_IMPLICIT: True
  - FP_AVAILABLE: True


************************************************************
PHASE 1: LOAD TRACK METADATA
***************************