In [None]:
!pip install pyspark

In [None]:
import pandas as pd
from kafka import KafkaProducer
from datetime import datetime
import time
import random
import numpy as np

# pip install kafka-python

KAFKA_TOPIC_NAME_CONS = "songTopic"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost:9092'

# Spotify tracks CSV to stream, minimum popularity a track needs to be sent
# (tune to your interest), and pause between two messages in seconds.
SONGS_CSV_PATH = "tracks.csv"
MIN_POPULARITY = 50
SEND_INTERVAL_S = 1

# Field order of every Kafka message. The Spark consumer parses messages with a
# positional CSV schema, so this order must stay in sync with that schema.
SONG_MESSAGE_FIELDS = (
    "order_id", "id", "name", "popularity", "duration_ms", "explicit",
    "artists", "id_artists", "release_date", "danceability", "energy", "key",
    "loudness", "mode", "speechiness", "acousticness", "instrumentalness",
    "liveness", "valence", "tempo", "time_signature",
)


def format_csv_field(value):
    """Render one value as a CSV field readable by Spark's default CSV options.

    The value is converted with ``str()``. Text containing a comma, a double
    quote or a line break is wrapped in double quotes, and inner double quotes
    are written as ``\\"`` (Spark's default ``quote``/``escape`` characters).
    As a result, e.g. a song title with a comma no longer shifts the following
    columns.
    """
    text = str(value)
    if any(ch in text for ch in ',"\r\n'):
        text = '"' + text.replace('"', '\\"') + '"'
    return text


def format_song_message(song):
    """Serialize one song record (a dict) into a CSV line in SONG_MESSAGE_FIELDS order.

    Raises:
        KeyError: if ``song`` lacks one of SONG_MESSAGE_FIELDS.
    """
    return ','.join(format_csv_field(song[field]) for field in SONG_MESSAGE_FIELDS)


if __name__ == "__main__":
    print("Kafka Producer Application Started ... ")

    # This is the csv which has Spotify data.
    songs_df = pd.read_csv(SONGS_CSV_PATH)
    # Stream only popular songs. .copy() makes the filtered frame independent,
    # so the column assignments below do not act on a view (SettingWithCopyWarning).
    songs_df = songs_df[songs_df['popularity'] > MIN_POPULARITY].copy()
    songs_df['order_id'] = np.arange(len(songs_df))

    # Pre-processing for clean data: remove every character that is not an
    # ASCII letter. regex=True is required, because since pandas 2.0
    # str.replace matches the pattern literally by default, which made these
    # calls silent no-ops.
    # NOTE(review): this also strips digits, so id_artists no longer holds
    # valid Spotify ids — confirm that is intended.
    songs_df['artists'] = songs_df['artists'].str.replace('[^a-zA-Z]', '', regex=True)
    songs_df['id_artists'] = songs_df['id_artists'].str.replace('[^a-zA-Z]', '', regex=True)

    song_list = songs_df.to_dict(orient="records")

    kafka_producer_obj = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS,
                                       value_serializer=lambda x: x.encode('utf-8'))
    try:
        for song in song_list:
            message = format_song_message(song)
            print("Message Type: ", type(message))
            print("Message: ", message)
            kafka_producer_obj.send(KAFKA_TOPIC_NAME_CONS, message)
            time.sleep(SEND_INTERVAL_S)
    finally:
        # send() only queues the message; flush() waits until everything
        # queued has been delivered, then close() releases the connection.
        kafka_producer_obj.flush()
        kafka_producer_obj.close()

    print("Kafka Producer Application Completed. ")

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler
import random

import time

kafka_topic_name = "songTopic"
kafka_bootstrap_servers = 'localhost:9092'

# Spark's "kafka" streaming source is not bundled with PySpark. It is fetched
# from Maven through spark.jars.packages, and the artifact must match the
# running Spark version and its Scala build (PyPI PySpark 3.x is built for
# Scala 2.12, 4.x for Scala 2.13). Without it, readStream.format("kafka")
# fails with "Failed to find data source: kafka".
# The setting only takes effect when this cell creates the session; restart
# the kernel if a SparkSession already exists.
import pyspark

_spark_major_version = int(pyspark.__version__.split(".")[0])
_scala_binary_version = "2.13" if _spark_major_version >= 4 else "2.12"
KAFKA_SQL_PACKAGE = (
    f"org.apache.spark:spark-sql-kafka-0-10_{_scala_binary_version}:{pyspark.__version__}"
)

spark = SparkSession \
        .builder \
        .appName("Spotify Streaming Reccomendation System") \
        .master("local[*]") \
        .config("spark.jars.packages", KAFKA_SQL_PACKAGE) \
        .getOrCreate()

# Only show errors so Spark's lower-level logging does not flood the notebook output.
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# Construct a streaming DataFrame that reads from test-topic
# Streaming DataFrame subscribed to the Kafka topic in kafka_topic_name,
# beginning at the latest offsets (only messages produced from now on).
songs_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic_name)
    .option("startingOffsets", "latest")
    .load()
)

# Keep the message payload as text next to the Kafka record timestamp.
songs_df1 = songs_df.selectExpr("CAST(value AS STRING)", "timestamp")

# DDL schema of one CSV message; the column order must equal the order in
# which the producer writes the fields.
songs_schema_string = (
    "order_id INT,id STRING, name STRING,popularity INT, duration_ms DOUBLE, explicit INT, "
    "artists STRING, id_artists STRING, release_date STRING, "
    "danceability DOUBLE,"
    "energy DOUBLE, key INT, loudness DOUBLE, "
    "mode INT,"
    "speechiness DOUBLE,"
    "acousticness DOUBLE, instrumentalness DOUBLE, liveness DOUBLE, "
    "valence DOUBLE, tempo DOUBLE, time_signature DOUBLE"
)

# Parse every CSV line into a struct column named "song".
songs_df2 = songs_df1.select(
    from_csv(col("value"), songs_schema_string).alias("song"),
    "timestamp",
)

# Flatten the struct so each schema field becomes a top-level column.
songs_df3 = songs_df2.select("song.*", "timestamp")

Favorite-song data: the user's saved tracks, fetched with the Spotify Web API.

In [None]:
import pandas as pd
from spotify_api import getSong
song_data = getSong.passs()
# Drop the columns the streamed feature frame does not have, so that the
# remaining columns (name, artists, popularity and audio features) can be
# unioned with it.
song_data = song_data.drop(['id', 'added_at', 'time_signature', 'duration_s'], axis='columns')

# Pick one saved song uniformly at random. randrange(n) yields 0..n-1, so every
# row is reachable. The head(randint(0, n-1))[-1:] form returned an empty frame
# whenever 0 was drawn and could never return the last row.
import random

if song_data.empty:
    raise ValueError("No saved songs were returned by the Spotify API; cannot pick a favorite.")
rand_n = random.randrange(len(song_data))
add_df = song_data.iloc[[rand_n]]

In [None]:
#!/usr/bin/env python
# coding: utf-8


import os
#import my_spotify_credentials as credentials
import numpy as np
import pandas as pd
import ujson
import spotipy
import spotipy.util
import seaborn as sns

# Spotify credentials are read from the environment: export SPOTIPY_CLIENT_ID,
# SPOTIPY_CLIENT_SECRET and SPOTIPY_REDIRECT_URI (or fill the defaults below).
# setdefault never overwrites a value that is already set.
os.environ.setdefault("SPOTIPY_CLIENT_ID", '')
os.environ.setdefault("SPOTIPY_CLIENT_SECRET", '')
os.environ.setdefault("SPOTIPY_REDIRECT_URI", '')

scope = 'user-library-read'
username = ''

token = spotipy.util.prompt_for_user_token(username, scope)

if not token:
    # Every later API call needs the token (and saved_tracks_resp would be
    # undefined), so stop here with a clear error instead of a NameError.
    raise RuntimeError('Couldn\'t get token for that username')

spotipy_obj = spotipy.Spotify(auth=token)
# First page of the user's saved tracks (50 per request); later pages are
# followed through the response's 'next' link.
saved_tracks_resp = spotipy_obj.current_user_saved_tracks(limit=50)

number_of_tracks = saved_tracks_resp['total']
print('%d tracks' % number_of_tracks)

def save_only_some_fields(track_response):
    """Reduce one saved-track item to the fields this notebook uses.

    Args:
        track_response: one element of a saved-tracks page's ``items``; it
            must hold an ``added_at`` value and a ``track`` dict with ``id``,
            ``name``, ``artists`` (dicts with ``name``), ``duration_ms`` and
            ``popularity``.

    Returns:
        A dict with keys id, name (both as ``str``), artists (list of artist
        names), duration_ms, popularity and added_at.
    """
    track = track_response['track']
    artist_names = [entry['name'] for entry in track['artists']]
    return dict(
        id=str(track['id']),
        name=str(track['name']),
        artists=artist_names,
        duration_ms=track['duration_ms'],
        popularity=track['popularity'],
        added_at=track_response['added_at'],
    )

tracks = [save_only_some_fields(track) for track in saved_tracks_resp['items']]

# The saved-tracks response is paged: follow 'next' links until exhausted.
while saved_tracks_resp['next']:
    saved_tracks_resp = spotipy_obj.next(saved_tracks_resp)
    tracks.extend(save_only_some_fields(track) for track in saved_tracks_resp['items'])


tracks_df = pd.DataFrame(tracks)
pd.set_option('display.max_rows', len(tracks))

# Keep only the first-listed artist (None when a track lists no artist) and
# convert the duration from milliseconds to seconds.
tracks_df['artists'] = tracks_df['artists'].apply(lambda artists: artists[0] if artists else None)
tracks_df['duration_ms'] = tracks_df['duration_ms'].apply(lambda duration: duration/1000)

tracks_df = tracks_df.rename(columns = {'duration_ms':'duration_s'})

# Audio-feature columns added to tracks_df, in insertion order.
AUDIO_FEATURE_COLUMNS = [
    'acousticness', 'speechiness', 'key', 'liveness', 'instrumentalness',
    'energy', 'tempo', 'time_signature', 'loudness', 'danceability', 'valence',
]
# spotipy documents a maximum of 100 track ids per audio_features request.
AUDIO_FEATURES_BATCH_SIZE = 100

# Fetch audio features in batches rather than one request per track. The
# returned list is aligned with the requested ids; an entry may be None when
# Spotify has no features for that track.
audio_features = {}
track_ids = tracks_df['id'].tolist()
for start in range(0, len(track_ids), AUDIO_FEATURES_BATCH_SIZE):
    batch_ids = track_ids[start:start + AUDIO_FEATURES_BATCH_SIZE]
    audio_features.update(zip(batch_ids, spotipy_obj.audio_features(batch_ids)))


def _audio_feature(track_id, column):
    """Return one audio feature of a track, or None if it has no features.

    'key' is returned as text (e.g. '5'), the way this frame stores it.
    """
    features = audio_features.get(track_id)
    if features is None:
        return None
    value = features[column]
    return str(value) if column == 'key' else value


for column in AUDIO_FEATURE_COLUMNS:
    tracks_df[column] = [_audio_feature(track_id, column) for track_id in tracks_df['id']]


class getSong():
    """Accessor for the module-level saved-tracks DataFrame ``tracks_df``."""

    def __init__(self):
        super(getSong, self).__init__()

    @staticmethod
    def passs():
        """Return the module-level ``tracks_df``.

        Declared static so it works both as ``getSong.passs()`` and on an
        instance (``getSong().passs()``); without it, an instance call passes
        the instance as an unexpected argument and raises TypeError.
        """
        return tracks_df


In [None]:
# NOTE(review): no cell in this notebook creates "testedTable5"; presumably it
# is the in-memory sink of a streaming query over songs_df3. Confirm it exists
# before running this cell.
df = spark.sql("SELECT * FROM testedTable5")

# Sort by release_date descending (it is a STRING column, so this is a string sort).
df = df.sort(df.release_date.desc())

# All streamed columns, kept before the drop below.
df_stream = df

# Keep only name, popularity, artists and the audio features.
df = df.drop('order_id',
 'id',
 'explicit',
  'mode',
 'release_date',
 'id_artists',
 'time_signature',
 'duration_ms',
 'timestamp')

# Append the randomly chosen favorite song. add_df has the same columns but in
# a different order (name, artists, popularity, acousticness, ...). union()
# matches columns by position and would put its values in the wrong columns
# (e.g. acousticness into danceability); unionByName matches them by name.
df_sp = spark.createDataFrame(add_df)
df = df.unionByName(df_sp)

from pyspark.ml.feature import VectorAssembler

# Audio features combined into the single 'features' vector column.
FEATURE_COLUMNS = [
 'danceability',
 'energy',
 'loudness',
 'speechiness',
 'acousticness',
 'instrumentalness',
 'liveness',
 'valence',
 'tempo']
assembler = VectorAssembler(inputCols=FEATURE_COLUMNS, outputCol='features')
# "skip" filters out rows whose feature columns hold null/NaN instead of failing.
assembled_data = assembler.setHandleInvalid("skip").transform(df)

In [None]:
from pyspark.ml.feature import StandardScaler
# Rescale the assembled feature vector into 'standardized' (StandardScaler
# defaults: divide by the std, no mean centering). This presumably keeps
# large-range features such as tempo from dominating the k-means distances.
scale = StandardScaler(
    inputCol='features',
    outputCol='standardized',
)
data_scale = scale.fit(assembled_data)  # learns the per-feature statistics
df = data_scale.transform(assembled_data)

K-means clustering


In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Silhouette scores of the fitted clusterings (higher = better-separated clusters).
silhouette_score = []
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='standardized',
                                metricName='silhouette', distanceMeasure='squaredEuclidean')

# Number of clusters, plus an explicit seed. PySpark's default seed is derived
# from hash() of the class name, and Python randomizes str hashes per process,
# so cluster assignments would otherwise change across kernel restarts.
N_CLUSTERS = 3
KMEANS_SEED = 42

KMeans_algo = KMeans(featuresCol='standardized', k=N_CLUSTERS, seed=KMEANS_SEED)

KMeans_fit = KMeans_algo.fit(df)

output_df = KMeans_fit.transform(df)

# Score this clustering with the silhouette evaluator configured above.
silhouette_score.append(evaluator.evaluate(output_df))
silhouette_score