In [7]:
# Make a local Spark installation importable as `pyspark` (via findspark).
# This must run before the pyspark imports in the next cell.
import findspark
findspark.init()

In [8]:
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import *

In [10]:
# Create (or reuse) the SparkSession shared by every later cell.
spark = (
    SparkSession.builder
    .master('local')
    .appName('Generating Genre Labels with LDA')
    .config('spark.executor.memory', '16g')
    .config('spark.driver.memory', '16g')
    .getOrCreate()
)

# Keep a handle to the underlying SparkContext and silence everything below ERROR.
sc = spark.sparkContext
sc.setLogLevel('ERROR')

In [11]:

# Helper functions:
def id2word(topic, num_words, vocab):
    """Translate a topic's term ids into the vocabulary words they stand for.

    Parameters
    ----------
    topic : sequence
        One ``describeTopics()`` row as a tuple; ``topic[1]`` holds the
        topic's term indices (``termIndices``).
    num_words : int
        Maximum number of words to return.
    vocab : sequence of str
        The CountVectorizer vocabulary; ``vocab[i]`` is the word with id ``i``.

    Returns
    -------
    list of str
        Up to ``num_words`` words, in the order their ids appear in ``topic[1]``.
        If the topic carries fewer ids than ``num_words``, all of them are
        returned instead of raising IndexError.
    """
    term_indices = topic[1]
    # Slice rather than index 0..num_words-1 so short topics don't raise IndexError.
    return [vocab[idx] for idx in term_indices[:num_words]]

def label(topic_dist, topics=None, genre_map=None):
    """Return the genre name of the most probable topic in ``topic_dist``.

    Parameters
    ----------
    topic_dist : iterable of float
        Per-topic probabilities for one row (the ``topicDistribution`` column).
    topics : list of list of str, optional
        Word list per topic index. Defaults to the notebook-level
        ``df_topics_final``.
    genre_map : dict of str -> list of str, optional
        Genre name -> topic word list. Defaults to the notebook-level ``genres``.

    Returns
    -------
    str or None
        The first genre whose word list equals the winning topic's words.
        Returns ``None`` when the distribution is empty or no genre matches.
    """
    if topics is None:
        topics = df_topics_final
    if genre_map is None:
        genre_map = genres

    probs = list(topic_dist)
    if not probs:
        return None

    # max() returns the first maximal index, matching list.index(max(...)) tie-breaking.
    best_idx = max(range(len(probs)), key=probs.__getitem__)
    best_words = topics[best_idx]
    return next((name for name, words in genre_map.items() if words == best_words), None)

In [5]:
# Load the track table and build TF-IDF features from each artist's leading terms.
print('Reading in Data...')
df = spark.read.parquet('MSD_FINAL_me.parquet')

df.select('artist_terms').show()

NUM_ARTIST_TERMS = 5  # number of leading artist terms kept as tokens per track

# Null-safe truncation: a missing `artist_terms` array becomes an empty token list
# instead of raising TypeError inside the Python UDF.
limit_five = f.udf(lambda terms: terms[:NUM_ARTIST_TERMS] if terms is not None else [],
                   ArrayType(StringType()))
df = df.withColumn('tokens', limit_five(df['artist_terms']))

df.select('tokens').show(truncate=False)

# Bag-of-words counts over the token vocabulary.
vector = CountVectorizer(inputCol='tokens', outputCol='count_vectors')
df_cvec_model = vector.fit(df)
df_counts = df_cvec_model.transform(df)  # adds count_vectors

df_counts.select('tokens', 'count_vectors').show()

# Re-weight the counts by inverse document frequency; `features` is the LDA input.
tfidf = IDF(inputCol='count_vectors', outputCol='features')
df_tfidf_model = tfidf.fit(df_counts)
df_tfidf = df_tfidf_model.transform(df_counts)  # adds features

df_tfidf.select('tokens', 'count_vectors', 'features').show()

Reading in Data...


NameError: name 'spark' is not defined

In [None]:
# Fit LDA for several topic counts and print the top words of each topic.
# Only the model output and topic words from the LAST entry in `number_of_topics`
# survive this cell. The genre-labelling cell hard-codes one name per topic for
# k=10, so keep 10 as the final entry.
number_of_topics = [3, 5, 8, 10]
max_iter = 50
num_words = 5  # number of words to show per topic
df_vocab = df_cvec_model.vocabulary  # independent of k, so read it once

df_topics_final = []
df_lda_transformed = None

for num_topics in number_of_topics:
    lda = LDA(seed=1, optimizer="em", k=num_topics, maxIter=max_iter)
    df_lda_model = lda.fit(df_tfidf)
    df_lda_transformed = df_lda_model.transform(df_tfidf)

    # describeTopics() has only k rows, so collect it and map ids -> words on the driver.
    # Sorting by topic id guarantees df_topics_final[i] describes topic i, which label()
    # relies on when it indexes by the argmax of `topicDistribution`.
    topic_rows = sorted(df_lda_model.describeTopics(maxTermsPerTopic=num_words).collect(),
                        key=lambda row: row['topic'])
    df_topics_final = [id2word(tuple(row), num_words, df_vocab) for row in topic_rows]

    for i, words in enumerate(df_topics_final, start=1):
        print("Topic " + str(i) + ":")
        print(words)
        print("\n\n")

In [None]:
# Hard-coded, human-readable names for the topics printed by the LDA cell.
# The mapping is positional (name i <-> topic i), so it is only valid for the
# k=10 model left over from that cell.
GENRE_NAMES = ['reggae', 'blues', 'electronic', 'chill', 'metal',
               'country', 'rock', 'pop', 'hip hop', 'latin']
assert len(df_topics_final) == len(GENRE_NAMES), (
    f"expected {len(GENRE_NAMES)} topics, got {len(df_topics_final)}; "
    "re-run the LDA cell with 10 as the last entry of number_of_topics")
genres = dict(zip(GENRE_NAMES, df_topics_final))

# label() returns a genre name (or None), so declare a string result explicitly.
get_genre = f.udf(label, StringType())

df_labeled = df_lda_transformed.withColumn('genre', get_genre(df_lda_transformed['topicDistribution']))

df_labeled.select('genre').show(truncate=False)

df_final = df_labeled.drop('tokens', 'count_vectors', 'features', 'topicDistribution')

df_final.select('track_id', 'genre').show()

# The default save mode errors if the path already exists; overwrite so the cell is re-runnable.
df_final.write.mode('overwrite').parquet('MSD_GENRE_me.parquet')
