# Hybrid Core-based + Stratified Sampling

In [1]:
import json
import pandas as pd
import numpy as np
from collections import Counter, defaultdict
import random
import glob
import os
from typing import Dict, List, Tuple, Set, Iterator
import gc
import psutil
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

In [2]:
# Seed both random number generators used in this notebook so runs are repeatable
np.random.seed(42)
random.seed(42)

# Notebook banner, emitted in a single call (one argument per output line;
# the trailing empty string produces the final blank line)
print(
    "🎵 HYBRID CORE-BASED + STRATIFIED SAMPLING",
    "=" * 70,
    "💾 Memory-efficient streaming approach for large datasets",
    "🎯 Combines: Core filtering + Stratified sampling + Priority scoring",
    "",
    sep="\n",
)

🎵 HYBRID CORE-BASED + STRATIFIED SAMPLING
💾 Memory-efficient streaming approach for large datasets
🎯 Combines: Core filtering + Stratified sampling + Priority scoring



## 1. Memory Monitoring Utilities

In [3]:
def get_memory_usage() -> float:
    """Return the resident set size (RSS) of the current process in MB.

    Memory monitoring is best-effort: if the figure cannot be obtained for
    any ordinary reason, 0.0 is returned instead of raising, so a monitoring
    problem never aborts the sampling run.

    Returns:
        RSS in megabytes, or 0.0 when it could not be determined.
    """
    try:
        rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    except Exception:
        # Deliberately broad, but not a bare ``except``: KeyboardInterrupt and
        # SystemExit must still propagate so the user can stop a long run.
        return 0.0
    return rss_bytes / 1024 / 1024  # bytes -> MB

def print_memory_status(stage: str):
    """Report the process memory footprint, labelled with the stage just finished.

    Args:
        stage: Human-readable name of the pipeline step that has completed.
    """
    print(f"💾 Memory usage after {stage}: {get_memory_usage():.1f} MB")

# Cell-execution marker: printed once the memory helpers above are defined.
print("✅ Memory monitoring utilities loaded")

✅ Memory monitoring utilities loaded


## 2. Streaming Sampler Class

In [10]:
class HybridStreamingSampler:
    """
    Hybrid Core-Based + Stratified Streaming Sampler

    - Pass 1: Core-based filtering (length, user activity, track frequency)
    - Pass 2: Stratified sampling with priority scoring

    Between the passes only lightweight per-playlist metadata is kept in
    ``self.playlist_stats``; the full playlist objects of the final selection
    are re-read from their source files at the end.

    Note: ``track_counts``, ``user_counts`` and ``playlist_stats`` accumulate
    across calls to ``pass1_core_filtering`` on the same instance. Use a fresh
    sampler per run.
    """

    def __init__(self,
                 target_playlists: int = 50000,
                 batch_size: int = 20,
                 min_playlist_length: int = 10,
                 max_playlist_length: int = 100,
                 min_track_frequency: int = 5,
                 min_user_playlists: int = 3):
        """Store the configuration and print a summary of it.

        Args:
            target_playlists: Number of playlists the sample should contain.
            batch_size: Number of files grouped into one progress batch.
            min_playlist_length: Minimum number of tracks (inclusive).
            max_playlist_length: Maximum number of tracks (inclusive).
            min_track_frequency: Minimum number of appearances for a track to
                count as a "core" track.
            min_user_playlists: Minimum number of playlists for a (derived)
                user to count as active.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")

        self.target_playlists = target_playlists
        self.batch_size = batch_size
        self.min_playlist_length = min_playlist_length
        self.max_playlist_length = max_playlist_length
        self.min_track_frequency = min_track_frequency
        self.min_user_playlists = min_user_playlists

        # Statistics collectors
        self.track_counts = Counter()   # track_uri -> appearances in candidate playlists
        self.user_counts = Counter()    # user_id -> candidate playlists
        self.playlist_stats = []        # metadata dicts of candidate playlists

        print("🎯 Hybrid Streaming Sampler Configuration:")
        print(f"   • Target playlists: {target_playlists:,}")
        print(f"   • Batch size: {batch_size} files at a time")
        print(f"   • Playlist length: {min_playlist_length}-{max_playlist_length} tracks")
        print(f"   • Min track frequency: {min_track_frequency} playlists")
        print(f"   • Min user playlists: {min_user_playlists} playlists")
        print()

    # ------------------------------------------------------------------
    # File handling
    # ------------------------------------------------------------------

    def get_file_batches(self, file_pattern: str) -> List[List[str]]:
        """Split the files matching ``file_pattern`` into batches.

        Args:
            file_pattern: Glob pattern of the input JSON files.

        Returns:
            Sorted file paths, chunked into lists of ``batch_size`` entries
            (the last batch may be shorter).

        Raises:
            FileNotFoundError: If the pattern matches no file.
        """
        file_paths = sorted(glob.glob(file_pattern))

        if not file_paths:
            raise FileNotFoundError(f"No files found: {file_pattern}")

        print(f"📁 Found {len(file_paths)} files")

        batches = [
            file_paths[start:start + self.batch_size]
            for start in range(0, len(file_paths), self.batch_size)
        ]

        print(f"📦 Created {len(batches)} batches of ~{self.batch_size} files each")
        return batches

    def _read_playlists(self, file_path: str) -> List[Dict]:
        """Return the playlist dicts stored in one JSON file.

        Loading is best-effort: an unreadable or malformed file is reported
        with a warning and treated as empty so that one bad file does not
        abort the whole run.
        """
        try:
            # JSON is UTF-8 by specification; do not depend on the platform's
            # default encoding (playlist names routinely contain non-ASCII text).
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"   ⚠️  Error loading {os.path.basename(file_path)}: {e}")
            return []

        playlists = data.get('playlists', []) if isinstance(data, dict) else None
        if not isinstance(playlists, list):
            print(f"   ⚠️  Error loading {os.path.basename(file_path)}: unexpected JSON structure")
            return []

        return [playlist for playlist in playlists if isinstance(playlist, dict)]

    def _extract_user_id(self, playlist: Dict) -> str:
        """Derive a pseudo user identifier for a playlist.

        The first word of the lower-cased playlist name is used, reduced to
        its alphanumeric characters and truncated to 20 characters
        ('anonymous' if nothing remains). Playlists without a name are
        grouped by ``pid % 10000`` instead.
        """
        # Method 1: Use name-based grouping
        raw_name = playlist.get('name')
        name = str(raw_name).lower().strip() if raw_name is not None else ''
        words = name.split()
        if words:
            cleaned = ''.join(c for c in words[0] if c.isalnum())[:20]
            return cleaned or 'anonymous'

        # Method 2: Use PID-based grouping (if no name)
        try:
            bucket = int(playlist.get('pid') or 0) % 10000  # Group into ~10k users
        except (TypeError, ValueError):
            bucket = 0
        return f"user_{bucket}"

    # ------------------------------------------------------------------
    # Pass 1
    # ------------------------------------------------------------------

    @staticmethod
    def _percent(part: int, whole: int) -> float:
        """Return ``part`` as a percentage of ``whole`` (0.0 when ``whole`` is 0)."""
        return part / whole * 100 if whole else 0.0

    def _collect_active_users(self, batches: List[List[str]]) -> Set[str]:
        """Sub-pass 1a: count playlists per user and return the active users."""
        print("📊 Sub-pass 1a: Collecting user activity statistics...")
        user_playlist_count = Counter()

        for batch_idx, file_batch in enumerate(batches):
            if batch_idx % 20 == 0:
                print(f"   User stats progress: {batch_idx + 1}/{len(batches)} batches")

            for file_path in file_batch:
                for playlist in self._read_playlists(file_path):
                    user_playlist_count[self._extract_user_id(playlist)] += 1

        return {
            user for user, count in user_playlist_count.items()
            if count >= self.min_user_playlists
        }

    def _evaluate_playlist(self, playlist: Dict, file_path: str,
                           active_users: Set[str], stage_counts: Dict):
        """Apply the length and user filters to one playlist.

        Updates ``stage_counts``, ``track_counts`` and ``user_counts`` and
        returns the metadata dict of the playlist, or ``None`` if it was
        filtered out.
        """
        # CORE FILTER 1: Playlist length
        tracks = playlist.get('tracks') or []
        playlist_length = len(tracks)
        if not (self.min_playlist_length <= playlist_length <= self.max_playlist_length):
            return None
        stage_counts['passed_length_filter'] += 1

        # CORE FILTER 2: User activity
        user_id = self._extract_user_id(playlist)
        if user_id not in active_users:
            return None
        stage_counts['passed_user_filter'] += 1

        # Count tracks for frequency analysis (every occurrence is counted)
        playlist_tracks = set()
        for track in tracks:
            track_uri = track.get('track_uri', '')
            if track_uri:
                self.track_counts[track_uri] += 1
                playlist_tracks.add(track_uri)

        self.user_counts[user_id] += 1

        return {
            'file_path': file_path,
            'pid': playlist.get('pid'),
            'length': playlist_length,
            'modified_at': playlist.get('modified_at') or 0,
            'user_id': user_id,
            'track_uris': list(playlist_tracks),
            'name': playlist.get('name', ''),
            'collaborative': playlist.get('collaborative', False),
            'num_followers': playlist.get('num_followers', 0)
        }

    def _collect_candidates(self, batches: List[List[str]],
                            active_users: Set[str], stage_counts: Dict) -> None:
        """Sub-pass 1b: stream all files and keep metadata of passing playlists.

        Files are processed one at a time, so at most one file's playlists
        are held in memory in addition to the collected metadata.
        """
        print("🔍 Sub-pass 1b: Applying all core filters...")

        for batch_idx, file_batch in enumerate(batches):
            print(f"📦 Processing batch {batch_idx + 1}/{len(batches)}")

            for file_path in file_batch:
                for playlist in self._read_playlists(file_path):
                    stage_counts['total_seen'] += 1
                    metadata = self._evaluate_playlist(
                        playlist, file_path, active_users, stage_counts)
                    if metadata is not None:
                        self.playlist_stats.append(metadata)

            # Release the parsed files of this batch before starting the next
            gc.collect()

            if batch_idx % 10 == 0:
                print(f"   Progress: {stage_counts['total_seen']:,} seen, {len(self.playlist_stats):,} valid so far")
                print_memory_status(f"batch {batch_idx + 1}")

    def pass1_core_filtering(self, file_pattern: str) -> Dict:
        """
        PASS 1: Core-based filtering + Statistics collection

        Reads every file twice: once to determine the active users and once
        to apply the length and user filters while counting track
        appearances. Finally playlists without any core track are dropped.

        Args:
            file_pattern: Glob pattern of the input JSON files.

        Returns:
            Dict with ``total_playlists``, ``valid_playlists``,
            ``core_tracks`` (set), ``active_users`` (set), ``unique_tracks``
            and the per-stage ``stage_counts``.

        Raises:
            FileNotFoundError: If the pattern matches no file.
        """
        print("🔍 PASS 1: HYBRID CORE-BASED FILTERING")
        print("=" * 60)

        batches = self.get_file_batches(file_pattern)

        # Stage counts
        stage_counts = {
            'total_seen': 0,
            'passed_length_filter': 0,
            'passed_user_filter': 0,
            'passed_track_frequency_filter': 0,
            'final_valid': 0
        }

        print("🚀 Applying Core-Based Filters:")
        print(f"   ✅ Step 1: Playlist length ({self.min_playlist_length}-{self.max_playlist_length} tracks)")
        print(f"   ✅ Step 2: User activity (≥{self.min_user_playlists} playlists per user)")
        print(f"   ✅ Step 3: Track frequency (≥{self.min_track_frequency} appearances)")
        print()

        active_users = self._collect_active_users(batches)
        print(f"   ✅ Identified {len(active_users):,} active users")
        print()

        self._collect_candidates(batches, active_users, stage_counts)

        # CORE FILTER 3: Track frequency
        print("\n🔍 Applying final core filter: Track frequency")

        core_tracks = {
            track for track, count in self.track_counts.items()
            if count >= self.min_track_frequency
        }

        print(f"   ✅ Identified {len(core_tracks):,} core tracks")

        # Keep playlists that contain at least one core track
        self.playlist_stats = [
            meta for meta in self.playlist_stats
            if not core_tracks.isdisjoint(meta['track_uris'])
        ]
        stage_counts['passed_track_frequency_filter'] = len(self.playlist_stats)
        stage_counts['final_valid'] = len(self.playlist_stats)

        total_seen = stage_counts['total_seen']
        funnel = [
            ("Length filter", 'passed_length_filter'),
            ("User filter", 'passed_user_filter'),
            ("Track frequency", 'passed_track_frequency_filter'),
            ("🎯 FINAL VALID", 'final_valid'),
        ]

        print("\n✅ CORE-BASED FILTERING COMPLETE:")
        print("   📊 Filtering Funnel:")
        print(f"      • Total playlists: {total_seen:,}")
        for label, key in funnel:
            count = stage_counts[key]
            # _percent guards against total_seen == 0 (no readable playlists)
            print(f"      • {label}: {count:,} ({self._percent(count, total_seen):.1f}%)")
        print()

        return {
            'total_playlists': total_seen,
            'valid_playlists': stage_counts['final_valid'],
            'core_tracks': core_tracks,
            'active_users': active_users,
            'unique_tracks': len(self.track_counts),
            'stage_counts': stage_counts
        }

    # ------------------------------------------------------------------
    # Stratification
    # ------------------------------------------------------------------

    def create_strata(self) -> Dict[str, List[int]]:
        """Create comprehensive strata for stratified sampling.

        Every playlist in ``self.playlist_stats`` is assigned to one of 12
        strata: length (short ≤30, medium ≤60, long) × age (old/recent,
        split at the median ``modified_at``) × user activity (casual/active,
        split at the median playlists-per-user).

        Returns:
            Mapping ``"<length>_<time>_<activity>"`` to the list of indices
            into ``self.playlist_stats``; all 12 keys are always present.
        """
        print("📊 CREATING STRATIFIED SAMPLING STRATA")
        print("=" * 50)

        # Temporal split (playlists without a timestamp are ignored here)
        timestamps = [p['modified_at'] for p in self.playlist_stats if p['modified_at'] > 0]
        median_time = np.median(timestamps) if timestamps else 1500000000  # Default

        # User activity split, based on the playlists that survived pass 1
        user_playlist_counts = Counter(meta['user_id'] for meta in self.playlist_stats)
        if user_playlist_counts:
            user_activity_median = np.median(list(user_playlist_counts.values()))
        else:
            user_activity_median = 5

        print(f"   📅 Temporal split at timestamp: {median_time}")
        print(f"   👥 User activity split at: {user_activity_median} playlists per user")

        # Create 12 comprehensive strata (key order: length, time, activity)
        strata = {
            f"{length_cat}_{time_cat}_{activity_cat}": []
            for length_cat in ('short', 'medium', 'long')
            for time_cat in ('old', 'recent')
            for activity_cat in ('casual', 'active')
        }

        for i, playlist_meta in enumerate(self.playlist_stats):
            length = playlist_meta['length']

            if length <= 30:
                length_cat = 'short'
            elif length <= 60:
                length_cat = 'medium'
            else:
                length_cat = 'long'

            time_cat = 'recent' if playlist_meta['modified_at'] >= median_time else 'old'

            user_activity = user_playlist_counts.get(playlist_meta['user_id'], 1)
            activity_cat = 'active' if user_activity >= user_activity_median else 'casual'

            strata[f"{length_cat}_{time_cat}_{activity_cat}"].append(i)

        # Print strata distribution
        print("   📋 Strata Distribution:")
        total_playlists = len(self.playlist_stats)

        for stratum, indices in strata.items():
            if indices:  # Only show non-empty strata
                percentage = len(indices) / total_playlists * 100
                print(f"      • {stratum:20s}: {len(indices):6,} ({percentage:4.1f}%)")

        print()
        return strata

    def _calculate_priority_score(self, playlist_meta: Dict) -> float:
        """Calculate the priority score used to rank playlists in a stratum.

        The score is a sum of five factors (maximum points in brackets):
        track diversity (3.0), followers on a log scale (7.5), a meaningful
        name (2.0), collaborative flag (1.0) and a length of 20-80 tracks
        (1.5).

        Args:
            playlist_meta: Metadata dict as stored in ``self.playlist_stats``.

        Returns:
            The score as a plain Python float (0.0 to 15.0).
        """
        score = 0.0

        # Factor 1: Track diversity - share of distinct tracks in the playlist
        playlist_length = playlist_meta['length']
        if playlist_length > 0:
            score += len(playlist_meta['track_uris']) / playlist_length * 3.0

        # Factor 2: User engagement - log10 of followers, capped at 3.0
        num_followers = playlist_meta.get('num_followers') or 0
        if num_followers > 0:
            score += float(min(np.log10(num_followers + 1), 3.0)) * 2.5

        # Factor 3: Playlist completeness - a name that is not a placeholder
        raw_name = playlist_meta.get('name')
        name = str(raw_name) if raw_name is not None else ''
        if len(name.strip()) > 3 and not name.lower().startswith('my playlist'):
            score += 2.0

        # Factor 4: Collaborative playlists bonus.
        # The flag may arrive as the string "true"/"false"; a plain truthiness
        # test would treat "false" as True and reward every playlist.
        collaborative = playlist_meta.get('collaborative', False)
        if isinstance(collaborative, str):
            collaborative = collaborative.strip().lower() == 'true'
        if collaborative:
            score += 1.0

        # Factor 5: Length balance bonus
        if 20 <= playlist_length <= 80:  # Sweet spot
            score += 1.5

        return score

    # ------------------------------------------------------------------
    # Pass 2
    # ------------------------------------------------------------------

    def _sample_stratum(self, stratum: str, indices: List[int],
                        sampling_ratio: float) -> List[int]:
        """Pick playlists from one non-empty stratum and print a summary line.

        The stratum quota is ``len(indices) * sampling_ratio`` (at least 1).
        70% of the quota is filled with the highest-scored playlists, the
        rest is drawn at random from the remaining ones.
        """
        stratum_target = min(max(1, int(len(indices) * sampling_ratio)), len(indices))

        # Score playlists; the sort is stable, so ties keep their input order
        scored_playlists = sorted(
            ((idx, self._calculate_priority_score(self.playlist_stats[idx])) for idx in indices),
            key=lambda pair: pair[1],
            reverse=True,
        )

        # Hybrid: 70% top-scored + 30% random
        top_count = int(stratum_target * 0.7)
        random_count = stratum_target - top_count

        selected = [idx for idx, _ in scored_playlists[:top_count]]

        if random_count > 0:
            # stratum_target <= len(indices) guarantees enough remaining items
            remaining = [idx for idx, _ in scored_playlists[top_count:]]
            selected.extend(random.sample(remaining, random_count))

        # Average over the playlists actually selected (including random picks)
        score_by_index = dict(scored_playlists)
        avg_score = np.mean([score_by_index[idx] for idx in selected])
        print(f"      • {stratum:20s}: {len(selected):4,} / {len(indices):5,} (avg score: {avg_score:.2f})")

        return selected

    def pass2_stratified_sampling(self, strata: Dict[str, List[int]]) -> List[Dict]:
        """
        PASS 2: Stratified sampling with priority scoring

        If no more playlists are available than requested, all of them are
        taken. Otherwise every non-empty stratum contributes proportionally
        to its size (at least one playlist).

        Args:
            strata: Result of ``create_strata``.

        Returns:
            The selected playlists, loaded in full from their source files.
        """
        print("🎲 PASS 2: STRATIFIED SAMPLING WITH PRIORITY SCORING")
        print("=" * 60)

        total_available = len(self.playlist_stats)

        if total_available <= self.target_playlists:
            print(f"   📝 Available ({total_available:,}) ≤ target ({self.target_playlists:,})")
            selected_indices = list(range(total_available))
        else:
            sampling_ratio = self.target_playlists / total_available
            chosen = set()

            print(f"   📊 Global sampling ratio: {sampling_ratio:.3f}")
            print("   🏆 Using priority scoring within strata")
            print()

            for stratum, indices in strata.items():
                if indices:
                    chosen.update(self._sample_stratum(stratum, indices, sampling_ratio))

            # Sorted so that the output order is reproducible and follows
            # the order in which the playlists were read
            selected_indices = sorted(chosen)

        print(f"\n   🎯 Selected {len(selected_indices):,} playlists for final loading")

        # Load selected playlists
        return self._load_selected_playlists(selected_indices)

    def _load_selected_playlists(self, selected_indices: List[int]) -> List[Dict]:
        """Load only the selected playlists from files.

        Each source file is read once. Every returned playlist gets a
        ``_sampling_score`` entry with its priority score. Files that can no
        longer be read are reported and skipped.
        """
        print("   📁 Loading selected playlists...")

        # Group by file
        file_to_playlists = defaultdict(list)
        for idx in selected_indices:
            playlist_meta = self.playlist_stats[idx]
            file_to_playlists[playlist_meta['file_path']].append(playlist_meta)

        print(f"   📂 Loading from {len(file_to_playlists)} files")

        # Load playlists
        final_playlists = []

        for file_idx, (file_path, playlist_metas) in enumerate(file_to_playlists.items()):
            if file_idx % 100 == 0:
                print(f"      📖 File {file_idx + 1}/{len(file_to_playlists)}")

            pid_to_playlist = {p.get('pid'): p for p in self._read_playlists(file_path)}

            for meta in playlist_metas:
                playlist = pid_to_playlist.get(meta['pid'])
                if playlist is None:
                    continue
                playlist['_sampling_score'] = self._calculate_priority_score(meta)
                final_playlists.append(playlist)

        print(f"   ✅ Loaded {len(final_playlists):,} final playlists")
        return final_playlists

    # ------------------------------------------------------------------
    # Workflow
    # ------------------------------------------------------------------

    def run_hybrid_sampling(self, file_pattern: str) -> Tuple[List[Dict], Dict]:
        """Main method: Complete hybrid sampling workflow.

        Args:
            file_pattern: Glob pattern of the input JSON files.

        Returns:
            Tuple of the sampled playlists and a statistics dict describing
            the run.

        Raises:
            FileNotFoundError: If the pattern matches no file.
        """
        print("🚀 STARTING HYBRID CORE-BASED + STRATIFIED SAMPLING")
        print("=" * 70)

        print_memory_status("start")

        # Pass 1: Core-based filtering
        stats = self.pass1_core_filtering(file_pattern)
        print_memory_status("pass 1 complete")

        # Create strata
        strata = self.create_strata()
        print_memory_status("strata created")

        # Pass 2: Stratified sampling
        final_playlists = self.pass2_stratified_sampling(strata)
        print_memory_status("pass 2 complete")

        # Rates are reported as 0.0 when no playlist could be read at all
        original_total = stats['total_playlists']
        retention_rate = len(final_playlists) / original_total if original_total else 0.0
        core_retention = stats['valid_playlists'] / original_total if original_total else 0.0

        # Final statistics
        final_stats = {
            'methodology': 'hybrid_core_based_stratified_streaming',
            'original_total': original_total,
            'final_sampled': len(final_playlists),
            'retention_rate': retention_rate,
            'core_filtering_retention': core_retention,
            'unique_tracks': stats['unique_tracks'],
            'core_tracks_count': len(stats['core_tracks']),
            'active_users_count': len(stats['active_users']),
            'stage_counts': stats['stage_counts']
        }

        print("\n🎉 HYBRID SAMPLING COMPLETE!")
        print("=" * 70)
        print(f"📊 Results: {original_total:,} → {len(final_playlists):,} playlists")
        print(f"📈 Overall retention: {retention_rate:.1%}")

        return final_playlists, final_stats

# Cell-execution marker: printed once the class definition above has run.
print("✅ HybridStreamingSampler class loaded")

✅ HybridStreamingSampler class loaded


## 3. Convenience Functions

In [11]:
def run_hybrid_sampling(file_pattern: str,
                       target_size: int = 50000,
                       batch_size: int = 20,
                       min_playlist_length: int = 10,
                       max_playlist_length: int = 100,
                       min_track_frequency: int = 5,
                       min_user_playlists: int = 3,
                       output_dir: str = '../data/processed'):
    """
    One-function call to run complete hybrid sampling and save the result

    Args:
        file_pattern: e.g., "../data/raw/data/mpd.slice.*.json"
        target_size: Target number of playlists
        batch_size: Files to process at once (adjust for RAM)
        min_playlist_length: Minimum tracks per playlist (inclusive)
        max_playlist_length: Maximum tracks per playlist (inclusive)
        min_track_frequency: Minimum appearances for a core track
        min_user_playlists: Minimum playlists for an active user
        output_dir: Directory the result JSON is written to (created if missing)

    Returns:
        Tuple of (sampled playlists, sampling statistics, output file path)
    """
    # Single source of truth for the filter thresholds: the same values are
    # passed to the sampler and recorded in the output metadata, so the two
    # can never drift apart.
    filter_params = {
        'min_playlist_length': min_playlist_length,
        'max_playlist_length': max_playlist_length,
        'min_track_frequency': min_track_frequency,
        'min_user_playlists': min_user_playlists
    }

    # Initialize sampler
    sampler = HybridStreamingSampler(
        target_playlists=target_size,
        batch_size=batch_size,
        **filter_params
    )

    # Run sampling
    sampled_playlists, stats = sampler.run_hybrid_sampling(file_pattern)

    # Assemble results
    output_data = {
        'info': {
            'generated_on': datetime.now().isoformat(),
            'sampling_method': 'hybrid_core_based_stratified_streaming',
            'parameters': {
                'target_playlists': target_size,
                'batch_size': batch_size,
                **filter_params
            }
        },
        'sampling_stats': stats,
        'playlists': sampled_playlists
    }

    # Save
    os.makedirs(output_dir, exist_ok=True)
    # Joined with '/' so the default path string is unchanged on every platform
    output_file = f'{output_dir}/spotify_hybrid_sampled_{target_size}.json'

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(output_data, f, indent=2)

    file_size_mb = os.path.getsize(output_file) / (1024 * 1024)
    print(f"\n💾 Saved to: {output_file}")
    print(f"📦 File size: {file_size_mb:.1f} MB")

    return sampled_playlists, stats, output_file

# Cell-execution marker: printed once the convenience function above is defined.
print("✅ Convenience functions loaded")

✅ Convenience functions loaded


In [12]:
# --- Run configuration (edit for your machine) ---
# Glob pattern locating the raw slice files
file_pattern = "../data/raw/data/mpd.slice.*.json"
# Number of playlists wanted in the sample
target_size = 50000
# Files per batch; lower it on small machines:
#   20 for 8GB+ RAM, 10 for 4GB RAM, 5 for 2GB RAM
batch_size = 20

# Execute the full pipeline; returns the sample, its statistics and the saved file path
sampled_playlists, stats, output_file = run_hybrid_sampling(
    file_pattern, target_size, batch_size
)

🎯 Hybrid Streaming Sampler Configuration:
   • Target playlists: 50,000
   • Batch size: 20 files at a time
   • Playlist length: 10-100 tracks
   • Min track frequency: 5 playlists
   • Min user playlists: 3 playlists

🚀 STARTING HYBRID CORE-BASED + STRATIFIED SAMPLING
💾 Memory usage after start: 39.9 MB
🔍 PASS 1: HYBRID CORE-BASED FILTERING
📁 Found 1000 files
📦 Created 50 batches of ~20 files each
🚀 Applying Core-Based Filters:
   ✅ Step 1: Playlist length (10-100 tracks)
   ✅ Step 2: User activity (≥3 playlists per user)
   ✅ Step 3: Track frequency (≥5 appearances)

📊 Sub-pass 1a: Collecting user activity statistics...
   User stats progress: 1/50 batches
   User stats progress: 21/50 batches
   User stats progress: 41/50 batches
   ✅ Identified 8,526 active users

🔍 Sub-pass 1b: Applying all core filters...
📦 Processing batch 1/50
   Progress: 20,000 seen, 14,818 valid so far
💾 Memory usage after batch 1: 4096.2 MB
📦 Processing batch 2/50
📦 Processing batch 3/50
📦 Processing batch