In [1]:
import pyspark.sql.types as T
import pyspark.sql.functions as F

from pyspark import SparkContext

from pyspark.sql import SparkSession

# Obtain the active session (or create one) explicitly so this notebook does
# not depend on a `spark` global injected by the kernel and still runs after
# Restart Kernel -> Run All.
spark = SparkSession.builder.getOrCreate()

# Chart data; later cells read its `songs`, `country` and `day` columns.
charts_df = spark.table("weekly_charts_table")
charts_df.show(3)
charts_df.printSchema()

In [2]:
# UDF that extracts the song ids of a chart as an array of strings (one "document" of id tokens per chart)
from pyspark.sql.functions import udf

@udf(T.ArrayType(T.StringType()))
def get_ids(songs):
  """Collect the ``id`` field of every entry in ``songs``.

  Args:
    songs: iterable of entries that support ``entry["id"]``, or None when the
      column value is null.

  Returns:
    List of ids in their original order. A null ``songs`` value gives an
    empty list, and null entries or null ids are skipped, so the UDF does
    not raise inside the Python worker on incomplete rows.
  """
  if songs is None:
    return []
  ids = []
  for s in songs:
    if s is None:
      continue
    song_id = s["id"]
    if song_id is not None:
      ids.append(song_id)
  return ids

# Adds (or replaces) the `songs_array` column; re-running this cell is harmless.
charts_df = charts_df.withColumn("songs_array", get_ids(charts_df.songs))

In [3]:
# Row count of charts_df before any day filter is applied.
charts_df.count()

In [4]:
# Chart day analysed by the rest of the notebook; change it here to run the
# same analysis for another day.
CHART_DAY = "2019-05-23"

# Same predicate as the SQL string "day = '<CHART_DAY>'", built without
# string interpolation.
charts_time_df = charts_df.filter(F.col("day") == CHART_DAY)
charts_time_df.count()

In [5]:
# Length of the id array for each selected chart row, shown next to its country.
songs_per_country = charts_time_df.select(
    "country",
    F.size(charts_time_df.songs_array).alias('n'),
)
songs_per_country.show()

In [6]:
# Print one row's id array without truncation to eyeball the UDF output.
single_row = charts_time_df.limit(1)
single_row.select("songs_array").show(truncate=False)

In [7]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.ml.feature import CountVectorizer

# Term counts per chart: every song id in `songs_array` is one term.
count_vectorizer = CountVectorizer(inputCol="songs_array", outputCol="features")
# NOTE: `cv` is the *fitted* model (it exposes `.vocabulary`), not the estimator.
cv = count_vectorizer.fit(charts_time_df)
cv_result = cv.transform(charts_time_df)
cv_result.show()

In [8]:
# Terms learned by the fitted CountVectorizer model; a later cell looks terms
# up in this list by their feature-vector index.
cv.vocabulary

In [9]:
from pyspark.ml.feature import IDF

# Rescale the raw term counts by inverse document frequency.
idf_estimator = IDF(inputCol="features", outputCol="tfidf")
# NOTE: `idf` is the fitted model, `tfidf` the transformed DataFrame.
idf = idf_estimator.fit(cv_result)
tfidf = idf.transform(cv_result)
tfidf.show()

In [10]:
@udf(T.MapType(T.StringType(), T.DoubleType()))
def map_weights(tfidf_vector, vocab):
  """Map every term stored in a TF-IDF vector to its weight.

  Args:
    tfidf_vector: vector exposing parallel ``indices`` and ``values``
      sequences.
    vocab: the vocabulary as ONE comma-joined string; the term at position
      ``i`` after splitting belongs to vector index ``i``.

  Returns:
    dict of term -> weight as float, e.g. ``{"<term>": 0.9162907318741551}``.
  """
  terms = vocab.split(",")
  weights_by_term = {}
  for position, value in zip(tfidf_vector.indices, tfidf_vector.values):
    weights_by_term[terms[position]] = float(value)
  return weights_by_term

# The vocabulary travels to the UDF as a single comma-joined string literal
# (workaround noted by the original author: the list itself could not be used
# as a literal). Assumes no term contains a comma -- TODO confirm.
vocab_csv = ",".join(cv.vocabulary)
tfidf = tfidf.withColumn("weights", map_weights(tfidf.tfidf, F.lit(vocab_csv)))
tfidf.select("country", "day", "weights").show()

In [11]:
# One output row per (term, weight) map entry, carrying the row's country and day.
weight_entry = F.explode(tfidf.weights).alias("id", "weight")
eDF = tfidf.select(weight_entry, "country", "day")
eDF.show()

In [12]:
# Number of (id, weight) rows produced by the explode.
eDF.count()

In [13]:
# Audio-feature table; it is joined to the exploded weights on `s_id` further down.
audio_df = spark.table("song_features_table")
audio_df.show()
audio_df.printSchema()

In [14]:
# Keep only the join key and the audio features aggregated later on.
audio_columns = [
    "s_id",
    "acousticness", "danceability", "energy", "liveness",
    "loudness", "speechiness", "tempo", "valence",
]
audio_df = audio_df.select(*audio_columns)

In [15]:
# Left join: every weight row is kept; rows whose id has no match in audio_df
# get nulls in the audio columns.
# NOTE: this reassigns eDF, so running the cell twice joins twice.
on_song_id = eDF.id == audio_df.s_id
eDF = eDF.join(audio_df, on=on_song_id, how='left')
eDF.show()

In [16]:
# Discard rows the left join could not match (their s_id is null), i.e. songs
# without audio features.
eDF = eDF.where("s_id is not NULL")
eDF.show(10)

In [17]:
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def weighted_average(audio_feaures, weight):
  """Weighted mean of one audio feature within a group.

  Args:
    audio_feaures: pandas Series with the feature values of the group.
    weight: pandas Series with the weight of each value, aligned with
      ``audio_feaures``.

  Returns:
    ``sum(value * weight) / sum(weight)`` as a float, computed over the rows
    where both value and weight are present. NaN when no such row exists or
    when the usable weights sum to zero.
  """
  # Ignore rows with a missing value or weight: a single null would otherwise
  # turn the whole group result into NaN.
  usable = audio_feaures.notna() & weight.notna()
  total_weight = weight[usable].sum()
  if total_weight == 0:
    # No usable rows or all-zero weights: the average is undefined.
    return float("nan")
  # Vectorized reductions instead of Python's builtin sum over the Series.
  weighted_total = (audio_feaures[usable] * weight[usable]).sum()
  return float(weighted_total / total_weight)

# Audio features to aggregate; each one becomes an output column of the same name.
feature_names = [
    "acousticness", "danceability", "energy", "liveness",
    "loudness", "speechiness", "tempo", "valence",
]
# One weighted-average aggregate per feature, all weighted by the `weight` column.
feature_aggs = [
    weighted_average(eDF[name], eDF["weight"]).alias(name)
    for name in feature_names
]
weighted_df = eDF.groupBy("country").agg(*feature_aggs)
# Shows up to 62 rows -- presumably enough for every country; TODO confirm.
weighted_df.show(62, truncate=False)

In [18]:
# TODO: consider scaling the aggregated features before clustering
# TODO: apply K-means to the per-country feature vectors