# Import Packages and Create the Spark Session

In [None]:
import pandas as pd
import numpy as np 
import re as re
import databricks.koalas as ks

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.ml.clustering import LDA, LDAModel
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Word2Vec

from nltk.stem.wordnet import WordNetLemmatizer
from nltk.corpus import stopwords

In [None]:
# Local Spark session on every available core. Driver memory is raised because
# the full podcast table is later pulled onto the driver for scoring.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("SpotifyPodcastClassification")
    .config('spark.driver.memory', '24g')
    .getOrCreate()
)

# 1. Read full data
## 1.1 Podcasts data

In [None]:
# Read the podcast table.
# Without `multiLine` and `escape`, Spark split some quoted records mid-field.
# This shifted values into the wrong columns: the scored output below contains
# `episode_uri` values such as "George Khalife" and "['en']".
# - `multiLine` lets a quoted field span line breaks.
# - `escape='"'` accepts doubled quotes (`""`), the escaping pandas' `to_csv`
#   emits. This assumes the file was written that way; TODO confirm.
podcasts_df = (
    spark.read
    .options(inferSchema='True', delimiter=',', header='True',
             multiLine='True', escape='"')
    .csv("../script_output/02_final_dat.csv")
)
podcasts_df.count()

105153

In [None]:
# Only the episode id and the show-level description are used downstream.
keep_cols = ['episode_uri', 'show_description']
podcasts_df = podcasts_df.select(*keep_cols)
podcasts_df.show(5)

+--------------------+--------------------+
|         episode_uri|    show_description|
+--------------------+--------------------+
|spotify:episode:0...|A 20-something bl...|
|spotify:episode:0...|Ever wonder what ...|
|spotify:episode:0...|Inside the 18 is ...|
|spotify:episode:0...|Your favorite pod...|
|spotify:episode:0...|The comedy podcas...|
+--------------------+--------------------+
only showing top 5 rows


## 1.2 LDA result data (topics in each cluster)

In [None]:
# LDA output: one column per topic (topic_0 ... topic_4), each row holding one
# of that topic's terms (see the preview below).
lda_topics_path = '../script_output/02_LDA_topics.csv'
clusters_df = (spark.read
               .options(inferSchema='True', delimiter=',', header='True')
               .csv(lda_topics_path))
clusters_kdf = clusters_df.to_koalas()
clusters_kdf.head()

	topic_0	topic_1	topic_2	topic_3	topic_4
0	fun facts	social media	true crime	mental health	pop culture
1	stories fun	talk things	cover art	think optimal	law attraction
2	kid kid	personal development	audio experience	daily audioblog	weight loss
3	parent time	answer question	help revise	audioblog blogcast	best sell
4	kid hear	tip trick	core components	daily audioblog blogcast	relate topics


# 2. Functions

In [None]:
def cossim(v1, v2):
    """Cosine similarity between two dense vectors.

    Parameters
    ----------
    v1, v2 : array-like
        Equal-length vectors (numpy arrays, lists, or Spark ``DenseVector``).

    Returns
    -------
    float
        ``dot(v1, v2) / (|v1| * |v2|)``, symmetric in its arguments and within
        [-1, 1] up to rounding. Returns 0.0 when either vector has zero norm
        (e.g. an all-zero averaged word vector) instead of dividing by zero.
    """
    a = np.asarray(v1, dtype=float)
    b = np.asarray(v2, dtype=float)
    denom = np.linalg.norm(a) * np.linalg.norm(b)
    if denom == 0:
        # No direction to compare against; treat as "no similarity".
        return 0.0
    return float(np.dot(a, b) / denom)


# 3. Apply the Regex Tokenizer and Stop-Word Remover

In [None]:
# Split descriptions into lower-cased word tokens (runs of \w characters),
# then drop Spark's default English stop words.
# The pattern is a raw string: '\w' in a plain literal is an invalid escape
# sequence (DeprecationWarning / SyntaxWarning on newer Pythons).
# NOTE(review): this cell overwrites `podcasts_df`. Re-running it alone should
# fail because the `tokens` output column already exists.
regextok = RegexTokenizer(gaps=False, pattern=r'\w+',
                          inputCol='show_description', outputCol='tokens')
stopwrmv = StopWordsRemover(inputCol='tokens', outputCol='tokens_sw_removed')

podcasts_df = regextok.transform(podcasts_df)
podcasts_df = stopwrmv.transform(podcasts_df)
podcasts_df.show()

+--------------------+--------------------+--------------------+--------------------+
|         episode_uri|    show_description|              tokens|   tokens_sw_removed|
+--------------------+--------------------+--------------------+--------------------+
|spotify:episode:0...|A 20-something bl...|[a, 20, something...|[20, something, b...|
|spotify:episode:0...|Ever wonder what ...|[ever, wonder, wh...|[ever, wonder, mu...|
|spotify:episode:0...|Inside the 18 is ...|[inside, the, 18,...|[inside, 18, sour...|
|spotify:episode:0...|Your favorite pod...|[your, favorite, ...|[favorite, podcas...|
|spotify:episode:0...|The comedy podcas...|[the, comedy, pod...|[comedy, podcast,...|
|spotify:episode:0...|Podcasts useful f...|[podcasts, useful...|[podcasts, useful...|
|spotify:episode:0...|Enter the world o...|[enter, the, worl...|[enter, world, do...|
|spotify:episode:0...|We are four, 30 s...|[we, are, four, 3...|[four, 30, someth...|
|spotify:episode:0...|Get ready to whit...|[get, ready, to, ...|[get, ready, whit...|
|spotify:episode:0...|Let’s be real and...|[let, s, be, real...|[let, real, hones...|
|spotify:episode:0...|Behaviorbabe (aka...|[behaviorbabe, ak...|[behaviorbabe, ak...|
|spotify:episode:0...|In dance, the poc...|[in, dance, the, ...|[dance, pocket, p...|
|spotify:episode:0...|Welcome to the of...|[welcome, to, the...|[welcome, officia...|
|spotify:episode:0...|Let other student...|[let, other, stud...|[let, students, h...|
|spotify:episode:0...|Welcome to The Po...|[welcome, to, the...|[welcome, pole, d...|
|spotify:episode:0...|The eCommerce Lif...|[the, ecommerce, ...|[ecommerce, lifes...|
|spotify:episode:0...|Because there’s a...|[because, there, ...|[million, ways, m...|
|spotify:episode:0...|Does music make y...|[does, music, mak...|[music, make, fee...|
|spotify:episode:0...|Too much analytic...|[too, much, analy...|[much, analytics,...|
|spotify:episode:0...|What does it take...|[what, does, it, ...|[take, powerful, ...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 20 rows


# 4. Train Word2Vec Model

In [None]:
# Word2VecModel.transform averages the vectors of a row's tokens into a single
# `vectorSize`-dimensional document vector.
# The seed is pinned because PySpark's default seed is derived from Python's
# `hash()` of the class name. That hash is randomized per process unless
# PYTHONHASHSEED is set, so unseeded runs are not reproducible.
W2V_SEED = 42
word2vec = Word2Vec(vectorSize=10, minCount=2, seed=W2V_SEED,
                    inputCol='tokens_sw_removed', outputCol='wordvectors')
model = word2vec.fit(podcasts_df)
wordvectors = model.transform(podcasts_df)
wordvectors.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|         episode_uri|    show_description|              tokens|   tokens_sw_removed|         wordvectors|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|spotify:episode:0...|A 20-something bl...|[a, 20, something...|[20, something, b...|[-0.0134726116795...|
|spotify:episode:0...|Ever wonder what ...|[ever, wonder, wh...|[ever, wonder, mu...|[0.50170180455167...|
|spotify:episode:0...|Inside the 18 is ...|[inside, the, 18,...|[inside, 18, sour...|[-0.4880954223793...|
|spotify:episode:0...|Your favorite pod...|[your, favorite, ...|[favorite, podcas...|[-0.0872882489974...|
|spotify:episode:0...|The comedy podcas...|[the, comedy, pod...|[comedy, podcast,...|[-0.3465767683404...|
+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows

In [None]:
# Keep only the columns needed for scoring. `select` already yields a
# DataFrame, so no RDD round trip (and schema re-inference) is needed.
podcasts_desc = wordvectors.select('episode_uri', 'show_description', 'wordvectors')
podcasts_desc.show(5)

+--------------------+--------------------+--------------------+
|         episode_uri|    show_description|         wordvectors|
+--------------------+--------------------+--------------------+
|spotify:episode:0...|A 20-something bl...|[-0.0134726116795...|
|spotify:episode:0...|Ever wonder what ...|[0.50170180455167...|
|spotify:episode:0...|Inside the 18 is ...|[-0.4880954223793...|
|spotify:episode:0...|Your favorite pod...|[-0.0872882489974...|
|spotify:episode:0...|The comedy podcas...|[-0.3465767683404...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Materialize every (episode_uri, show_description, wordvectors) row on the
# driver in a single job (`take(count())` needs a separate counting job first).
chunk = podcasts_desc.collect()

# 5. Export Per-Topic Similarity Results to the Desktop Due to RAM Limitations

In [None]:
# Score every episode against each LDA topic:
# 1. Join the topic's terms into a pseudo-description.
# 2. Embed it with the same tokenizer, stop-word remover and Word2Vec model
#    used for the episodes.
# 3. Compare it to every collected episode vector with `cossim`.
# Each topic's scores go to their own parquet output, read back in section 6.
topic_columns = clusters_kdf.columns.tolist()
n_topics = len(topic_columns)

for idx, topic in enumerate(topic_columns):
    # Build the search query from the topic's terms. Missing cells are skipped
    # so `' '.join` never receives a None.
    search_query = ' '.join(clusters_kdf[topic].dropna().tolist())

    # Embed the query exactly like the episode descriptions.
    query_df = spark.createDataFrame([(1, search_query)], ['index', 'show_description'])
    query_swr = stopwrmv.transform(regextok.transform(query_df))
    query_vec = model.transform(query_swr).select('wordvectors').collect()[0][0]

    # Cosine similarity on the driver against every collected episode row.
    # Distinct names keep the loop variable `topic` unshadowed.
    sim_rows = [(uri, desc, float(cossim(query_vec, vec))) for uri, desc, vec in chunk]
    sim_df = spark.createDataFrame(
        sim_rows, ['episode_uri', 'show_description', f'query_{idx}_similarity']
    ).to_koalas()

    # NOTE(review): absolute, user-specific path. It must match the paths read
    # back in section 6; consider a configurable output directory.
    sim_df.to_parquet(f'/Users/christianwiloejo/Desktop/similarity_matrix_{idx}.parquet')
    print(f"Done with loop {idx+1}/{n_topics}")
    

21/12/11 12:24:09 WARN TaskSetManager: Stage 83 contains a task of very large size (2956 KiB). The maximum recommended task size is 1000 KiB.
21/12/11 12:24:09 WARN TaskSetManager: Stage 84 contains a task of very large size (2956 KiB). The maximum recommended task size is 1000 KiB.
                                                                                
Done with loop 1/5
21/12/11 12:24:14 WARN TaskSetManager: Stage 89 contains a task of very large size (2956 KiB). The maximum recommended task size is 1000 KiB.
21/12/11 12:24:14 WARN TaskSetManager: Stage 90 contains a task of very large size (2956 KiB). The maximum recommended task size is 1000 KiB.
Done with loop 2/5
21/12/11 12:24:17 WARN TaskSetManager: Stage 95 contains a task of very large size (2956 KiB). The maximum recommended task size is 1000 KiB.
21/12/11 12:24:18 WARN TaskSetManager: Stage 96 contains a task of very large size (2956 KiB). The maximum recommended task size is 1000 KiB.
Done with loop 3/5
21/12/11 12:24:20 WARN TaskSetManager: Stage 101 contains a task of very large size (2956 KiB). The maximum recommended task size is 1000 KiB.
21/12/11 12:24:21 WARN TaskSetManager: Stage 102 contains a task of very large size (2956 KiB). The maximum recommended task size is 1000 KiB.
Done with loop 4/5
21/12/11 12:24:24 WARN TaskSetManager: Stage 107 contains a task of very large size (2956 KiB). The maximum recommended task size is 1000 KiB.
21/12/11 12:24:24 WARN TaskSetManager: Stage 108 contains a task of very large size (2956 KiB). The maximum recommended task size is 1000 KiB.
Done with loop 5/5

# 6. Import All Results and Concatenate by Row Index

In [None]:
# Re-load the per-topic similarity outputs (topics 0-4) in topic order.
result0, result1, result2, result3, result4 = (
    pd.read_parquet(f'/Users/christianwiloejo/Desktop/similarity_matrix_{k}.parquet')
    for k in range(5)
)

In [None]:
# The per-topic frames are placed side by side, aligned on their row index.
# First check that every frame lists the same episodes in the same order;
# otherwise scores from different episodes would land on one row.
topic_frames = [result0, result1, result2, result3, result4]
for k, frame in enumerate(topic_frames[1:], start=1):
    assert frame['episode_uri'].equals(result0['episode_uri']), \
        f'similarity_matrix_{k} rows are not aligned with similarity_matrix_0'
result_df = pd.concat(topic_frames, axis=1)
result_df.shape

(105153, 15)

In [None]:
# After the concat, columns repeat as (episode_uri, show_description,
# query_k_similarity) for k = 0..4. Keep the first id/description pair plus
# the five similarity columns, and name the latter after their LDA topic.
keep_positions = [0, 1] + list(range(2, 15, 3))
result_df = result_df.iloc[:, keep_positions]
result_df.columns = ['episode_uri', 'show_description'] + [f'topic_{k}' for k in range(5)]
result_df

	episode_uri	show_description	topic_0	topic_1	topic_2	topic_3	topic_4
0	spotify:episode:000A9sRBYdVh66csG2qEdj	A 20-something blunt female takes on the world...	0.325469	0.311075	0.165217	0.135384	-0.031386
1	spotify:episode:000HP8n3hNIfglT2wSI2cA	Ever wonder what murder took place on today in...	0.054971	0.154958	0.141471	-0.327679	-0.588722
2	spotify:episode:001UfOruzkA3Bn1SPjcdfa	Inside the 18 is your source for all things Go...	-0.373154	-0.072368	0.297827	-0.170043	-0.164945
3	spotify:episode:001i89SvIQgDuuyC53hfBm	Your favorite podcast for everything @Chiefs! ...	0.214444	0.214337	-0.019471	0.216056	-0.143863
4	spotify:episode:0025RWNwe2lnp6HcnfzwzG	The comedy podcast about toxic characters, wri...	-0.228455	-0.241206	-0.255342	0.029290	-0.022333
...	...	...	...	...	...	...	...
105148	spotify:episode:7zzQnjBXqDApvnm1hLPzVY	Each week, John Rocha and Matt Knost breakdown...	-0.097527	0.403032	0.216955	-0.088191	0.078084
105149	George Khalife	"After connecting with someone, what's the nex...	0.225570	0.241627	0.254389	0.157477	-0.109310
105150	['en']	"Life & Peak Performance Coach. I Teach Self-R...	0.012979	0.627565	0.258692	0.183251	0.429806
105151	spotify:episode:7zzoT4r0Rhffyegk2HJ9N8	The best & funniest independent cricket podcas...	-0.119022	-0.243818	-0.151341	-0.446955	-0.556114
105152	spotify:episode:7zzq8sPfKAO4V7uIyhpbVd	Join this Mother-Daughter team for their weekl...	0.103875	0.013520	-0.349083	-0.053459	-0.086903
105153 rows × 7 columns

## 6.1 Label each podcast to a cluster based on max similarity score

In [None]:
# Assign each episode to the topic with the highest similarity score.
# Topic columns are selected by name rather than by position, so re-running
# this cell after `cluster` exists does not feed that string column into
# idxmax.
topic_scores = result_df.filter(regex=r'^topic_\d+$')
result_df['cluster'] = topic_scores.idxmax(axis=1)
result_df

	episode_uri	show_description	topic_0	topic_1	topic_2	topic_3	topic_4	cluster
0	spotify:episode:000A9sRBYdVh66csG2qEdj	A 20-something blunt female takes on the world...	0.325469	0.311075	0.165217	0.135384	-0.031386	topic_0
1	spotify:episode:000HP8n3hNIfglT2wSI2cA	Ever wonder what murder took place on today in...	0.054971	0.154958	0.141471	-0.327679	-0.588722	topic_1
2	spotify:episode:001UfOruzkA3Bn1SPjcdfa	Inside the 18 is your source for all things Go...	-0.373154	-0.072368	0.297827	-0.170043	-0.164945	topic_2
3	spotify:episode:001i89SvIQgDuuyC53hfBm	Your favorite podcast for everything @Chiefs! ...	0.214444	0.214337	-0.019471	0.216056	-0.143863	topic_3
4	spotify:episode:0025RWNwe2lnp6HcnfzwzG	The comedy podcast about toxic characters, wri...	-0.228455	-0.241206	-0.255342	0.029290	-0.022333	topic_3
...	...	...	...	...	...	...	...	...
105148	spotify:episode:7zzQnjBXqDApvnm1hLPzVY	Each week, John Rocha and Matt Knost breakdown...	-0.097527	0.403032	0.216955	-0.088191	0.078084	topic_1
105149	George Khalife	"After connecting with someone, what's the nex...	0.225570	0.241627	0.254389	0.157477	-0.109310	topic_2
105150	['en']	"Life & Peak Performance Coach. I Teach Self-R...	0.012979	0.627565	0.258692	0.183251	0.429806	topic_1
105151	spotify:episode:7zzoT4r0Rhffyegk2HJ9N8	The best & funniest independent cricket podcas...	-0.119022	-0.243818	-0.151341	-0.446955	-0.556114	topic_0
105152	spotify:episode:7zzq8sPfKAO4V7uIyhpbVd	Join this Mother-Daughter team for their weekl...	0.103875	0.013520	-0.349083	-0.053459	-0.086903	topic_0
105153 rows × 8 columns

# 7. Join the Full Metadata for Manual Evaluation

In [None]:
# Show/episode metadata plus transcripts, used for manual evaluation.
metadata_cols = ['show_uri', 'show_name', 'publisher', 'episode_uri',
                 'episode_name', 'episode_description', 'transcript']
full_data = pd.read_csv('../script_output/episode_transcript_data_w_metadata.csv')[metadata_cols]
full_data.head()

In [None]:
# Attach metadata to every scored episode. `validate='many_to_one'` raises
# if an episode_uri appears more than once in the metadata, so the left join
# cannot silently duplicate scored rows.
final_result = pd.merge(result_df, full_data, on='episode_uri', how='left',
                        validate='many_to_one')
print(final_result.shape)
final_result.head()

In [None]:
final_result_path = '../script_output/03a_lda_word2vec_final_result.csv'
# NOTE(review): no index=False here (unlike the annotation export), so the
# DataFrame index is written as an unnamed first column. Confirm that
# downstream readers expect it.
final_result.to_csv(final_result_path)

In [None]:
# Export the first rows for manual annotation of the assigned clusters.
# `.head(n)` returns exactly n rows; label slicing `.loc[:50]` is
# end-inclusive and would return 51.
ANNOTATION_SAMPLE_SIZE = 50
manual_annotate_podcasts = final_result.head(ANNOTATION_SAMPLE_SIZE)[['episode_uri', 'show_description', 'cluster']]
manual_annotate_podcasts.to_csv('../script_output/03a_lda_word2vec_manually_annotate.csv', index=False)