In [1]:
pwd


'/Users/spartan/Downloads/kafka_2.13-3.8.0/spotify_realtime/enhanced_spotify_streaming'

In [7]:
# spotify_fm_analysis.py

import mmh3
import numpy as np
from tabulate import tabulate
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import time

class SpotifyFM:
    def __init__(self, client_id, client_secret):
        """Initialize with Spotify credentials"""
        self.spotify = spotipy.Spotify(
            client_credentials_manager=SpotifyClientCredentials(
                client_id=client_id,
                client_secret=client_secret
            )
        )
        self.fm_estimator = FlajoletMartin(num_estimators=32)
        
    def get_playlist_tracks(self, playlist_id):
        """Get tracks from a Spotify playlist"""
        results = self.spotify.playlist_tracks(playlist_id)
        tracks = []
        
        print("\nFetching playlist data...")
        for item in results['items']:
            if item['track']:
                track_info = {
                    'name': item['track']['name'],
                    'artist': item['track']['artists'][0]['name'],
                    'popularity': item['track']['popularity']
                }
                tracks.append(track_info)
                print(f"Found track: {track_info['name']} by {track_info['artist']}")
        
        return tracks
    
    def analyze_unique_tracks(self, playlist_id):
        """Analyze unique tracks in a playlist using FM algorithm"""
        tracks = self.get_playlist_tracks(playlist_id)
        
        print("\n=== Playlist Analysis ===")
        print(f"Total tracks fetched: {len(tracks)}")
        
        # Get actual unique count
        unique_tracks = set(track['name'] for track in tracks)
        print(f"Actual unique tracks: {len(unique_tracks)}")
        
        # Use FM algorithm
        for track in tracks:
            self.fm_estimator.add(track['name'])
        
        estimate = self.fm_estimator.estimate()
        error_percentage = abs(estimate - len(unique_tracks)) / len(unique_tracks) * 100
        
        print("\nFlajolet-Martin Analysis Results:")
        print(f"Estimated unique tracks: {estimate:.2f}")
        print(f"Error percentage: {error_percentage:.2f}%")
        
        # Show top artists
        artist_counts = {}
        for track in tracks:
            artist = track['artist']
            artist_counts[artist] = artist_counts.get(artist, 0) + 1
        
        print("\nTop Artists in Playlist:")
        sorted_artists = sorted(artist_counts.items(), key=lambda x: x[1], reverse=True)[:5]
        for artist, count in sorted_artists:
            print(f"- {artist}: {count} tracks")

class FlajoletMartin:
    """Flajolet-Martin Algorithm Implementation"""
    def __init__(self, num_estimators=32):
        self.num_estimators = num_estimators
        self.max_zeros = [0] * num_estimators
        self.item_history = {}
    
    def add(self, item):
        if item not in self.item_history:
            self.item_history[item] = []
        
        for i in range(self.num_estimators):
            hash_val = mmh3.hash(str(item), seed=i)
            binary = bin(hash_val)[2:] if hash_val >= 0 else bin(hash_val)[3:]
            trailing_zeros = len(binary) - len(binary.rstrip('0'))
            
            self.item_history[item].append({
                'estimator': i,
                'hash_value': hash_val,
                'trailing_zeros': trailing_zeros
            })
            
            self.max_zeros[i] = max(self.max_zeros[i], trailing_zeros)
    
    def estimate(self):
        avg_zeros = sum(self.max_zeros) / self.num_estimators
        return 2 ** avg_zeros
    
    def explain_estimate(self):
        """Explain the estimation process"""
        print("\n=== Estimation Process Details ===")
        print(f"Number of estimators used: {self.num_estimators}")
        print(f"Average trailing zeros: {sum(self.max_zeros) / self.num_estimators:.2f}")
        print("\nSample of processed items:")
        
        # Show details for first 3 items
        for i, (item, hashes) in enumerate(list(self.item_history.items())[:3]):
            print(f"\nItem {i+1}: {item}")
            data = []
            for h in hashes[:3]:  # Show first 3 hash functions
                data.append([
                    h['estimator'],
                    h['hash_value'],
                    h['trailing_zeros']
                ])
            print(tabulate(data, 
                         headers=['Hash Function', 'Hash Value', 'Trailing Zeros'],
                         tablefmt='grid'))

def main():
    # Your Spotify API credentials
    CLIENT_ID = '95a8aadaabde43e2977306a50410c381'
    CLIENT_SECRET = 'dcb6c9927af244d2a293d52d6634c21d'
    
    # Initialize SpotifyFM analyzer
    spotify_fm = SpotifyFM(CLIENT_ID, CLIENT_SECRET)
    
    # Analyze different playlists
    playlists = [
        ('37i9dQZEVXbMDoHDwVN2tF', 'Global Top 50'),
        ('37i9dQZEVXbLRQDuF5jeBp', 'US Top 50'),
    ]
    
    for playlist_id, name in playlists:
        print(f"\n=== Analyzing Playlist: {name} ===")
        spotify_fm.analyze_unique_tracks(playlist_id)
        spotify_fm.fm_estimator.explain_estimate()
        time.sleep(1)  # Respect API rate limits

if __name__ == "__main__":
    main()


=== Analyzing Playlist: Global Top 50 ===

Fetching playlist data...
Found track: APT. by ROSÉ
Found track: Die With A Smile by Lady Gaga
Found track: luther (with sza) by Kendrick Lamar
Found track: squabble up by Kendrick Lamar
Found track: BIRDS OF A FEATHER by Billie Eilish
Found track: tv off (feat. lefty gunplay) by Kendrick Lamar
Found track: That’s So True by Gracie Abrams
Found track: Who by Jimin
Found track: wacced out murals by Kendrick Lamar
Found track: All I Want for Christmas Is You by Mariah Carey
Found track: Sailor Song by Gigi Perez
Found track: Running Wild by Jin
Found track: Last Christmas - Single Version by Wham!
Found track: Tu Boda by Oscar Maydon
Found track: hey now (feat. dody6) by Kendrick Lamar
Found track: reincarnated by Kendrick Lamar
Found track: man at the garden by Kendrick Lamar
Found track: WILDFLOWER by Billie Eilish
Found track: Espresso by Sabrina Carpenter
Found track: dodger blue (feat. wallie the sensei, siete7x, roddy ricch) by Kendrick L

In [None]:
# dgim_spotify.py

from kafka import KafkaConsumer
import json
from collections import deque
import time

class DGIMCounter:
    def __init__(self, window_size):
        self.window_size = window_size
        self.buckets = deque()
        self.timestamp = 0
        self.current_window = []
    
    def add(self, bit, value_info=None):
        # Add to current window
        if len(self.current_window) >= self.window_size:
            self.current_window.pop(0)
        self.current_window.append((bit, value_info))
        
        # Add new bucket if bit is 1
        if bit == 1:
            self.buckets.append((self.timestamp, 1))
            self._merge_buckets()
        
        # Remove old buckets
        while self.buckets and self.timestamp - self.buckets[0][0] >= self.window_size:
            self.buckets.popleft()
        
        self.timestamp += 1
    
    def _merge_buckets(self):
        i = len(self.buckets) - 1
        while i > 0:
            if len(self.buckets) < 2:
                break
            if self.buckets[i][1] == self.buckets[i-1][1]:
                ts = min(self.buckets[i][0], self.buckets[i-1][0])
                size = self.buckets[i][1] * 2
                self.buckets.pop()
                self.buckets.pop()
                self.buckets.append((ts, size))
                i -= 2
            else:
                i -= 1
    
    def count_ones(self):
        return sum(bucket[1] for bucket in self.buckets)
    
    def get_actual_count(self):
        return sum(1 for bit, _ in self.current_window if bit == 1)

class SpotifyDGIMAnalyzer:
    def __init__(self, window_size=100):
        self.popularity_counter = DGIMCounter(window_size)
        self.energy_counter = DGIMCounter(window_size)
        self.dance_counter = DGIMCounter(window_size)
        
        # Thresholds
        self.popularity_threshold = 80
        self.energy_threshold = 0.8
        self.dance_threshold = 0.8
    
    def process_track(self, track_data):
        try:
            # Extract track info
            track_info = f"{track_data['name']} by {track_data['artist']}"
            
            # Process popularity
            popularity_bit = 1 if track_data['popularity'] >= self.popularity_threshold else 0
            self.popularity_counter.add(popularity_bit, track_info)
            
            # Process energy
            energy_bit = 1 if track_data['energy'] >= self.energy_threshold else 0
            self.energy_counter.add(energy_bit, track_info)
            
            # Process danceability
            dance_bit = 1 if track_data['danceability'] >= self.dance_threshold else 0
            self.dance_counter.add(dance_bit, track_info)
            
            # Print analysis
            self._print_analysis(track_info)
            
        except Exception as e:
            print(f"Error processing track: {e}")
    
    def _print_analysis(self, current_track):
        print(f"\nProcessing track: {current_track}")
        print("-" * 50)
        
        print("\nPopular Tracks (>= 80 popularity):")
        print(f"Estimated count: {self.popularity_counter.count_ones()}")
        print(f"Actual count: {self.popularity_counter.get_actual_count()}")
        
        print("\nHigh Energy Tracks (>= 0.8 energy):")
        print(f"Estimated count: {self.energy_counter.count_ones()}")
        print(f"Actual count: {self.energy_counter.get_actual_count()}")
        
        print("\nDanceable Tracks (>= 0.8 danceability):")
        print(f"Estimated count: {self.dance_counter.count_ones()}")
        print(f"Actual count: {self.dance_counter.get_actual_count()}")

def main():
    print("Starting Spotify DGIM Analysis...")
    
    try:
        # Create Kafka consumer
        consumer = KafkaConsumer(
            'spotify_stream',
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='latest',
            enable_auto_commit=True,
            value_deserializer=lambda x: json.loads(x.decode('utf-8'))
        )
        
        # Initialize analyzer
        analyzer = SpotifyDGIMAnalyzer(window_size=100)
        
        print("Waiting for messages...")
        
        # Process messages
        for message in consumer:
            track_data = message.value
            analyzer.process_track(track_data)
            
    except KeyboardInterrupt:
        print("\nAnalysis stopped by user")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        if 'consumer' in locals():
            consumer.close()

if __name__ == "__main__":
    main()

Starting Spotify DGIM Analysis...
Waiting for messages...

Processing track: APT. by ROSÉ
--------------------------------------------------

Popular Tracks (>= 80 popularity):
Error processing track: [NOT_COLUMN_OR_STR] Argument `col` should be a Column or str, got generator.
Error processing track: min() takes 1 positional argument but 2 were given

Processing track: luther (with sza) by Kendrick Lamar
--------------------------------------------------

Popular Tracks (>= 80 popularity):
Error processing track: [NOT_COLUMN_OR_STR] Argument `col` should be a Column or str, got generator.
Error processing track: min() takes 1 positional argument but 2 were given
Error processing track: min() takes 1 positional argument but 2 were given
Error processing track: min() takes 1 positional argument but 2 were given
Error processing track: min() takes 1 positional argument but 2 were given
Error processing track: min() takes 1 positional argument but 2 were given
Error processing track: min()

In [None]:
import mmh3  # for Bloom Filter
import math
import random
from collections import defaultdict
            
class BloomFilter:
    def __init__(self, size, num_hash_functions):
        self.size = size  
        self.num_hash_functions = num_hash_functions
        self.bit_array = [0] * size
    
    def add(self, item):
        for seed in range(self.num_hash_functions):
            index = mmh3.hash(str(item), seed) % self.size
            self.bit_array[index] = 1
    
    def check(self, item):
        for seed in range(self.num_hash_functions):
            index = mmh3.hash(str(item), seed) % self.size   
            if self.bit_array[index] == 0:
                return False
        return True
    
class ReservoirSampling:
    def __init__(self, k):
        self.k = k
        self.reservoir = []
        self.count = 0
        
    def add(self, item):
        self.count += 1
        if len(self.reservoir) < self.k: 
            self.reservoir.append(item)
        else:
            j = random.randrange(self.count)
            if j < self.k:
                self.reservoir[j] = item
     
    def get_sample(self):
        return self.reservoir

def apply_streaming_algorithms(df):
    """Apply streaming algorithms to the data"""
    # Initialize Bloom Filter for popular songs (popularity > 90)
    bloom = BloomFilter(size=1000, num_hash_functions=3)

    # Initialize Reservoir Sampling for random song selection
    reservoir = ReservoirSampling(k=10)

    # Process each song
    for song in df.collect():
        # Add highly popular songs to Bloom Filter
        if song.popularity > 90:
            bloom.add(song.name)

        # Add all songs to reservoir sampling
        reservoir.add({
            'name': song.name,
            'artist': song.artist,
            'popularity': song.popularity
        })
        
    return {
        'bloom_filter': bloom,
        'reservoir_sample': reservoir.get_sample()
    }
    
        

        