In [50]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
from customTransformer import KonlpTokenizer
from pyspark.ml.feature import CountVectorizer

from pyspark.sql.functions import *
from pyspark.ml.feature import IDF
from pyspark.ml.clustering import LDA

from pyspark.sql.types import ArrayType, StringType, FloatType, IntegerType

In [2]:
conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession.builder.appName("korean lyric analysis").getOrCreate()

In [3]:
df = spark.read\
.format("com.databricks.spark.csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("lyric_ko_pre.csv")

In [4]:
df.show(3)

+--------------------+-----------+--------------------+
|               title|     artist|               lyric|
+--------------------+-----------+--------------------+
|             그때 또 다시|         유노|돌이켜보면 너같은 사람 나에게 ...|
|      Love Love Love|선비(SunBee..|Are you there my ...|
|Butterfly (Prod. ...|  LambC(램씨)|There is a time t...|
+--------------------+-----------+--------------------+
only showing top 3 rows



In [5]:
df.count()

78036

In [6]:
tokenizer = KonlpTokenizer(inputCol="lyric", outputCol="words")

In [7]:
tokensDF = tokenizer.transform(df)

In [12]:
tokensDF = tokensDF.filter(size(col("words")) > 3).cache()

In [13]:
vectorizer = CountVectorizer(inputCol="words", outputCol="rawFeatures")
cvmodel = vectorizer.fit(tokensDF)
featurizedData = cvmodel.transform(tokensDF)

vocab = cvmodel.vocabulary
vocab_broadcast = sc.broadcast(vocab)

In [14]:
len(vocab)

81288

In [15]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [20]:
lda = LDA(k=5, seed=123, optimizer="em", featuresCol="features")

ldamodel = lda.fit(rescaledData)
ldatopics = ldamodel.describeTopics()

In [21]:
def map_termID_to_Word(termIndices):
    words = []
    for termID in termIndices:
        words.append(vocab_broadcast.value[termID])
    
    return words

In [22]:
udf_map_termID_to_Word = udf(map_termID_to_Word , ArrayType(StringType()))
ldatopics_mapped = ldatopics.withColumn("topic_desc", udf_map_termID_to_Word(ldatopics.termIndices))
ldatopics_mapped.select(ldatopics_mapped.topic, ldatopics_mapped.topic_desc).show(50,False)

+-----+-----------------------------------------+
|topic|topic_desc                               |
+-----+-----------------------------------------+
|0    |[그대, 사랑, 다시, 시간, 우리, 눈물, 없는, 내게, 기억, 사람] |
|1    |[오늘, 너무, 우리, 그냥, 싶어, 같은, 지금, 있어, 그대, 몰라] |
|2    |[그대, 사랑, 사랑해, 눈물, 당신, 사람, 나를, 마음, 다시, 없는]|
|3    |[예뻐, 모두, 인생, 없어, 우린, 여자, 오늘, 그냥, 지금, 이제] |
|4    |[주님, 사랑, 노래, 당신, 세상, 주의, 그대, 우리, 하나님, 하늘]|
+-----+-----------------------------------------+



In [23]:
ldaResults = ldamodel.transform(rescaledData)

In [24]:
def breakout_array(index_number, record):
    vectorlist = record.tolist()
    return vectorlist[index_number]

udf_breakout_array = udf(breakout_array, FloatType())

In [68]:
ldaResults.select("title", "topicDistribution").show()

+--------------------+--------------------+
|               title|   topicDistribution|
+--------------------+--------------------+
|             그때 또 다시|[0.18073939857700...|
|      Love Love Love|[0.34864345792893...|
|            GOOD BAD|[0.25979247691762...|
|             너무 원했기에|[0.37514571741299...|
|               이별중이야|[0.60426807783482...|
|                Time|[0.26737701644818...|
|우유부단 (Prod. by K ...|[0.13485250946308...|
|  기억이 난다 (Feat. 강슬기)|[0.19592398153157...|
|                  그냥|[0.21556569050354...|
|                  이유|[0.10228298743847...|
|               그대이기에|[0.39238697832568...|
|                Dawn|[0.14727578965663...|
|party (SHUT DOWN)...|[0.36854286068614...|
|               그렇게 또|[0.22076263620240...|
|                 매화향|[0.15274717539087...|
| 어디로 가나요 (Feat. 신지윤)|[0.07603512771547...|
|               데이트할까|[0.12788500930243...|
|             Show Me|[0.30439044678478...|
| Save me (Feat. 김환수)|[0.27919822092340...|
|            시간이 춤을 춰|[0.2112045

In [25]:
enrichedData = ldaResults\
.withColumn("Topic_0", udf_breakout_array(lit(0), ldaResults.topicDistribution))\
.withColumn("Topic_1", udf_breakout_array(lit(1), ldaResults.topicDistribution))\
.withColumn("Topic_2", udf_breakout_array(lit(2), ldaResults.topicDistribution))\
.withColumn("Topic_3", udf_breakout_array(lit(3), ldaResults.topicDistribution))\
.withColumn("Topic_4", udf_breakout_array(lit(4), ldaResults.topicDistribution))

In [26]:
enrichedData.createOrReplaceTempView("enrichedData")

In [35]:
res = spark.sql("SELECT COUNT(*) as TOPIC2 FROM ENRICHEDDATA WHERE TOPIC_2 >= 0.5")

In [24]:
res = spark.sql("SELECT TITLE, TOPIC_5 FROM ENRICHEDDATA WHERE TOPIC_5 >= 0.25 ORDER BY TOPIC_5 DESC")

In [69]:
res.show(10, False)

+------+
|TOPIC2|
+------+
|20814 |
+------+



In [77]:
getMainTopicIdx = udf(lambda l: int(np.argmax([float(x) for x in l])), IntegerType())

In [78]:
countTopDocs = (ldaResults
                .select(getMainTopicIdx("topicDistribution").alias("idxMainTopic"))
                .groupBy("idxMainTopic").count().sort("idxMainTopic"))

In [79]:
countTopDocs.show()

+------------+-----+
|idxMainTopic|count|
+------------+-----+
|           0|18542|
|           1| 9596|
|           2|14526|
|           3| 8552|
|           4| 9465|
+------------+-----+



In [87]:
cluster_lda = ldaResults.withColumn("cluster", getMainTopicIdx("topicDistribution"))

In [139]:
getMainTopicWeight = udf(lambda l: float(max([float(x) for x in l]) * 10), FloatType())
clusterdf = cluster_lda\
.select("title", "artist", "lyric", "cluster", getMainTopicValue("topicDistribution").alias("weight"))\
.toPandas()

In [143]:
clusterdf.count()

title      60681
artist     60681
lyric      60681
cluster    60681
weight     60681
dtype: int64

In [142]:
clusterdf.to_csv("LDAcluster.csv", header=True, index=False)