In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly.express as px
import seaborn as sns
import findspark
findspark.init()

import pyspark
findspark.find()
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, desc , col, max
from pyspark.ml.feature import  StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
import os
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [2]:
# Shared Spark session and the two pre-trained ALS models served by the
# Anvil callables further down.
SONG_MODEL_PATH = "model_recommend_song_for_user"
ARTIST_MODEL_PATH = "model_recommend_artist_for_user"

spark = SparkSession.builder.appName("WebApp").getOrCreate()
# Song model: recommends songs for users and users for songs.
my_model = ALSModel.load(SONG_MODEL_PATH)
# Artist model: recommends artists for users.
recommend_artist_model = ALSModel.load(ARTIST_MODEL_PATH)


In [3]:
# Load song metadata and the (user_id, song_id, listen_count) triplets.
data_song = pd.read_csv("song_data.csv")
data_user = pd.read_csv("10000.txt", sep="\t", header=None)
data_user.columns = ["user_id", "song_id", "listen_count"]
data_user = data_user.dropna()
data_song = data_song.dropna()

df_listenings_agg = spark.createDataFrame(data_user)
df_song = spark.createDataFrame(data_song)
# Only the first 20000 listening rows are indexed. NOTE(review): presumably
# this (and the indexing below) must match how the saved ALS models were
# trained, otherwise *_index values will not line up with model ids — confirm.
df_listenings_agg = df_listenings_agg.limit(20000)


def _index_string_columns(df, exclude=("listen_count",)):
    """Return ``df`` with a ``<col>_index`` column for every column not in ``exclude``.

    One StringIndexer per column is fitted once through a Pipeline.
    ``handleInvalid="keep"`` makes unseen labels map to an extra index instead
    of raising. The numeric count column is excluded by default: it is the
    value being predicted, not a categorical id.
    """
    indexers = [
        StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid="keep")
        for name in df.columns
        if name not in exclude
    ]
    return Pipeline(stages=indexers).fit(df).transform(df)


# Song model data: adds user_id_index and song_id_index. Cached because every
# web request filters/joins this frame.
data_temp = _index_string_columns(df_listenings_agg).cache()

# Artist model data: total listens per (user, artist), then user_id_index and
# artist_name_index, with listen_count cast to double.
song_artist = df_song.select("song_id", "artist_name").distinct()
df_artist_listen = (
    df_listenings_agg.join(song_artist, on="song_id", how="inner")
    .groupBy("user_id", "artist_name")
    .agg(F.sum("listen_count").alias("listen_count"))
)
df_artist_listen = _index_string_columns(df_artist_listen)
df_artist_listen = df_artist_listen.withColumn(
    "listen_count", df_artist_listen["listen_count"].cast("double")
).cache()

  for column, series in pdf.iteritems():


In [8]:
## Functions for recommending songs to a user

def get_index_of_user_id(user_id):
    """Look up the song-model index of ``user_id``.

    Returns the ``user_id_index`` value from ``data_temp`` for ``user_id``,
    or -1 when ``user_id`` does not occur in ``data_temp``.
    """
    # One ``first()`` is a single Spark job; isEmpty() + collect() scanned
    # twice and materialised every matching row just to read one value.
    row = (
        data_temp.filter(data_temp["user_id"] == user_id)
        .select("user_id_index")
        .first()
    )
    return -1 if row is None else row[0]


def get_recs_for_user(recs):
    """Turn a user's song-model recommendation row into song details.

    Parameters
    ----------
    recs : pyspark.sql.DataFrame
        Song-model user recommendations filtered to a single user; its
        ``recommendations`` array holds structs with ``song_id_index`` and
        ``rating``.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``song_id, title, release, artist_name, year, prediction``,
        where ``prediction`` is ``abs(round(rating, 0))``. Empty (instead of
        raising) when ``recs`` has no rows.
    """
    # Explode in Spark rather than calling toPandas() twice: that evaluated
    # the recommendation plan twice and raised IndexError on an empty frame.
    scored = recs.select(F.explode("recommendations").alias("rec")).select(
        F.col("rec.song_id_index").alias("song_id_index"),
        F.abs(F.round(F.col("rec.rating"), 0)).alias("prediction"),
    )
    # Map the model's numeric index back to the original song_id.
    song_ids = data_temp.select("song_id", "song_id_index").distinct()
    scored = scored.join(song_ids, on="song_id_index", how="inner").select(
        "song_id", "prediction"
    )
    return df_song.join(scored, on="song_id", how="inner").select(
        "song_id", "title", "release", "artist_name", "year", "prediction"
    )

## Functions for recommending users for a song
def get_index_of_song_id(user_id):
    """Look up the song-model index of a song.

    ``user_id`` is a misnomer kept for backward compatibility: it is the
    ``song_id`` to look up.

    Returns the ``song_id_index`` value from ``data_temp``, or -1 when the
    song does not occur in ``data_temp``.
    """
    # One ``first()`` is a single Spark job instead of isEmpty() + collect().
    row = (
        data_temp.filter(data_temp["song_id"] == user_id)
        .select("song_id_index")
        .first()
    )
    return -1 if row is None else row[0]


def get_recs_for_a_song(recs):
    """Turn a song's song-model recommendation row into recommended users.

    Parameters
    ----------
    recs : pyspark.sql.DataFrame
        Song-model item recommendations filtered to a single song; its
        ``recommendations`` array holds structs with ``user_id_index`` and
        ``rating``.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``user_id, prediction``, where ``prediction`` is
        ``abs(round(rating, 0))``. Empty (instead of raising) when ``recs``
        has no rows.
    """
    # Explode in Spark rather than calling toPandas() twice (double plan
    # evaluation, IndexError on empty input).
    scored = recs.select(F.explode("recommendations").alias("rec")).select(
        F.col("rec.user_id_index").alias("user_id_index"),
        F.abs(F.round(F.col("rec.rating"), 0)).alias("prediction"),
    )
    # Map the model's numeric index back to the original user_id.
    user_ids = data_temp.select("user_id", "user_id_index").distinct()
    return scored.join(user_ids, on="user_id_index", how="inner").select(
        "user_id", "prediction"
    )

## Functions for recommending artists to a user
def get_index_of_user_id_for_artist(user_id):
    """Look up ``user_id``'s index in the artist-model data.

    Returns the ``user_id_index`` value from ``df_artist_listen``, or -1 when
    the user has no rows there. This is the same sentinel the song lookups
    use and the one ``get_recommend_artist_for_a_user`` checks, so an
    unknown user yields an empty result rather than an IndexError.
    """
    row = (
        df_artist_listen.filter(df_artist_listen["user_id"] == user_id)
        .select("user_id_index")
        .first()
    )
    return -1 if row is None else row[0]

def get_recs_artist_for_user(recs):
    """Turn a user's artist-model recommendation row into recommended artists.

    Parameters
    ----------
    recs : pyspark.sql.DataFrame
        Artist-model user recommendations filtered to a single user; its
        ``recommendations`` array holds structs with ``artist_name_index``
        and ``rating``.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``artist_name, prediction``, where ``prediction`` is
        ``abs(round(rating, 0))``. Empty (instead of raising) when ``recs``
        has no rows.
    """
    # Explode in Spark rather than calling toPandas() twice (double plan
    # evaluation, IndexError on empty input).
    scored = recs.select(F.explode("recommendations").alias("rec")).select(
        F.col("rec.artist_name_index").alias("artist_name_index"),
        F.abs(F.round(F.col("rec.rating"), 0)).alias("prediction"),
    )
    # Map the model's numeric index back to the artist name.
    artists = df_artist_listen.select("artist_name", "artist_name_index").distinct()
    return scored.join(artists, on="artist_name_index", how="inner").select(
        "artist_name", "prediction"
    )

In [5]:
import os
from getpass import getpass

import anvil.server

# The uplink key is a credential for the Anvil app and must not be stored in
# the notebook. Read it from the environment, or prompt for it interactively.
# NOTE(review): the key previously hardcoded here was committed and should be
# rotated.
ANVIL_UPLINK_KEY = os.environ.get("ANVIL_UPLINK_KEY") or getpass("Anvil uplink key: ")
anvil.server.connect(ANVIL_UPLINK_KEY)


Connecting to wss://anvil.works/uplink
Anvil websocket open
Connected to "Default environment" as SERVER


In [6]:
@anvil.server.callable
def get_recommend_for_a_user(user_id):
    """Anvil endpoint: recommend up to 9 songs for ``user_id``.

    Returns a list of rows in ``get_recs_for_user``'s column order
    (song_id, title, release, artist_name, year, prediction), highest
    prediction first. Returns ``[]`` when ``user_id`` is not found in
    ``data_temp``.
    """
    num_recs = 9
    user_id_index = get_index_of_user_id(user_id)
    if user_id_index < 0:
        return []
    # Score only this user: recommendForAllUsers() + filter computed
    # recommendations for every user on each request.
    target = spark.createDataFrame([(user_id_index,)], ["user_id_index"])
    recs_user = my_model.recommendForUserSubset(target, num_recs)
    recs = get_recs_for_user(recs_user).orderBy(F.desc("prediction"))
    return recs.toPandas().to_numpy().tolist()

@anvil.server.callable
def get_recommend_for_a_song(song_id):
    """Anvil endpoint: recommend up to 9 users for ``song_id``.

    Returns a list of ``[user_id, prediction]`` rows, highest prediction
    first. Returns ``[]`` when ``song_id`` is not found in ``data_temp``.
    """
    num_recs = 9
    song_id_index = get_index_of_song_id(song_id)
    if song_id_index < 0:
        return []
    # Score only this song: recommendForAllItems() + filter computed
    # recommendations for every song on each request.
    target = spark.createDataFrame([(song_id_index,)], ["song_id_index"])
    recs_song = my_model.recommendForItemSubset(target, num_recs)
    # Best recommendations first; the previous ascending sort listed the
    # weakest matches at the top.
    recs = get_recs_for_a_song(recs_song).orderBy(F.desc("prediction"))
    return recs.toPandas().to_numpy().tolist()

@anvil.server.callable
def get_recommend_artist_for_a_user(user_id):
    """Anvil endpoint: recommend up to 9 artists for ``user_id``.

    Returns a list of ``[artist_name, prediction]`` rows, highest prediction
    first. Returns ``[]`` when ``get_index_of_user_id_for_artist`` reports an
    unknown user (negative index).
    """
    num_recs = 9
    user_id_index = get_index_of_user_id_for_artist(user_id)
    if user_id_index < 0:
        return []
    # Score only this user: recommendForAllUsers() + filter computed
    # recommendations for every user on each request.
    target = spark.createDataFrame([(user_id_index,)], ["user_id_index"])
    recs_user = recommend_artist_model.recommendForUserSubset(target, num_recs)
    # Best recommendations first; the previous ascending sort listed the
    # weakest matches at the top.
    recs = get_recs_artist_for_user(recs_user).orderBy(F.desc("prediction"))
    return recs.toPandas().to_numpy().tolist()