In [262]:
from pyspark.sql import SparkSession
from operator import add

# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.111:7077") \
        .appName("lyrics_mapreduce")\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores", 4)\
        .config("spark.cores.max", 8)\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

spark_context.setLogLevel("INFO")

In [263]:
import pyspark.sql.functions as F

lyrics = spark_session.read\
           .option("header", "true")\
           .csv("hdfs://192.168.2.111:9000/user/ubuntu/lyrics_database.csv")

lastfm = spark_session.read\
           .json("hdfs://192.168.2.111:9000/user/ubuntu/lastfm/lastfm_test/*/*/*")\
            .repartition(30)\
           .cache()

In [264]:
from pyspark.sql.types import *

genre = lastfm.filter(F.size(lastfm["tags"]) > 0 )\
        .select("tags", "track_id").cache()

In [None]:
lyrics.show(3)
genre.show(4)
genre.printSchema()
lyrics.printSchema()
lyrics.count()

In [265]:
paired_songs = lyrics.join(genre, "track_id").cache()
paired_songs.show(4)

paired_songs.count()

+------------------+----+-----+--------------------+
|          track_id|word|count|                tags|
+------------------+----+-----+--------------------+
|TRAGRAZ128F4219FB4|   i|   13|[[ska, 100], [ska...|
|TRAGRAZ128F4219FB4| the|    1|[[ska, 100], [ska...|
|TRAGRAZ128F4219FB4| you|    9|[[ska, 100], [ska...|
|TRAGRAZ128F4219FB4|  to|    3|[[ska, 100], [ska...|
+------------------+----+-----+--------------------+
only showing top 4 rows



1622597

In [266]:
from stop_words import get_stop_words

stopwords = get_stop_words("english")

songs_lite = paired_songs.filter(paired_songs['word'].isin(stopwords)==False)\
                        .select(paired_songs["tags"], \
                                paired_songs["word"],\
                                paired_songs["count"],\
                                F.explode(paired_songs["tags"]))\
                        .drop("tags")\
                        .withColumnRenamed("col","genre")\
                        .cache()
    
songs_lite.show(3)

+----+-----+--------------+
|word|count|         genre|
+----+-----+--------------+
|know|    1|    [ska, 100]|
|know|    1|[ska punk, 70]|
|know|    1|    [rock, 20]|
+----+-----+--------------+
only showing top 3 rows



In [267]:
import pyspark.sql.types


def remove_similarity(genre_tuple):
    genre, _ = genre_tuple
            
    return genre

tags_function = F.udf(remove_similarity, StringType())


wordcount_genre = songs_lite.withColumn("genre", tags_function(songs_lite["genre"]))\
                            .withColumn("wordcount", songs_lite["count"].cast(IntegerType()))\
                            .drop("count")\
                            .cache()

wordcount_genre.show(2)

+----+--------+---------+
|word|   genre|wordcount|
+----+--------+---------+
|know|     ska|        1|
|know|ska punk|        1|
+----+--------+---------+
only showing top 2 rows



In [268]:
wordcount_genre.groupBy(wordcount_genre["word"], wordcount_genre["wordcount"], wordcount_genre["genre"])\
                .count()\
                .sort("count",ascending=False)\
                .drop("wordcount")\
                .show()

+-----+-----+-----+
| word|genre|count|
+-----+-----+-----+
| time| rock|  934|
| know| rock|  899|
|  see| rock|  872|
|  now| rock|  855|
|  can| rock|  820|
|   go| rock|  812|
| come| rock|  790|
|  one| rock|  779|
| feel| rock|  724|
| make| rock|  721|
| take| rock|  720|
|  way| rock|  704|
|  eye| rock|  700|
|  say| rock|  700|
|  get| rock|  685|
|  day| rock|  667|
| time|  pop|  651|
|never| rock|  649|
| back| rock|  646|
| away| rock|  643|
+-----+-----+-----+
only showing top 20 rows



In [246]:
wordcount_genre.groupBy("genre", "word").count().sort("count", ascending=False).show(100)

+----------------+------+-----+
|           genre|  word|count|
+----------------+------+-----+
|            rock|  know| 2253|
|            rock|   now| 1879|
|            rock|  time| 1850|
|            rock|   can| 1767|
|             pop|  know| 1713|
|            rock|    go| 1695|
|            rock|   see| 1690|
|            rock|  come| 1627|
|            rock|   one| 1592|
|            rock|  love| 1566|
|            rock|  feel| 1526|
|            rock|   get| 1517|
|             pop|  love| 1496|
|            rock| never| 1413|
|            rock|  make| 1392|
|            rock|   say| 1386|
|            rock|   way| 1360|
|             pop|   can| 1356|
|            rock|  take| 1355|
|             pop|  time| 1352|
|            rock|   got| 1339|
|             pop|   now| 1308|
|            rock|    ca| 1297|
|             pop|    go| 1290|
|             pop|   see| 1253|
|     alternative|  know| 1238|
|            rock|  want| 1234|
|            rock|   day| 1214|
|       

In [247]:
wordcount_genre.groupBy("genre").count().sort("count", ascending=False).show()

+-----------------+------+
|            genre| count|
+-----------------+------+
|             rock|270730|
|              pop|194131|
|      alternative|151691|
|            indie|133500|
| female vocalists|118503|
|          Hip-Hop|117014|
|            metal|112044|
|        favorites|110182|
|          hip hop| 97384|
|              rap| 97082|
|              00s| 94792|
|             Love| 92879|
| alternative rock| 88059|
|        seen live| 79477|
|        beautiful| 73125|
|   male vocalists| 72457|
|       indie rock| 72045|
|          Awesome| 70776|
|singer-songwriter| 70399|
|       electronic| 68566|
+-----------------+------+
only showing top 20 rows



In [269]:

import pandas as pd
import matplotlib.pyplot as plt

top_genres = {"rock", "pop", "alternative", "indie", "Hip-Hop"}
genre_top_words = []

for genre in top_genres:
    #genre_top_words += 
    wordcount_genre.filter(wordcount_genre["genre"] == genre)\
            .groupBy("genre", "word")\
            .count()\
            .sort("count", ascending=False)\
            .limit(10)\
            .show()

#print(genre_top_words)

#df = pd.DataFrame({'word': ['word1', 'word2'], 'count': [12898, 4861]})
#df.plot.bar(x='word', y='count', rot=0)

+-----+----+-----+
|genre|word|count|
+-----+----+-----+
|indie|know| 1125|
|indie| now|  918|
|indie| can|  893|
|indie|time|  850|
|indie| see|  834|
|indie|  go|  806|
|indie|come|  783|
|indie| get|  749|
|indie| one|  748|
|indie|love|  736|
+-----+----+-----+

+-----------+----+-----+
|      genre|word|count|
+-----------+----+-----+
|alternative|know| 1238|
|alternative| now| 1016|
|alternative|time|  996|
|alternative| can|  982|
|alternative| see|  938|
|alternative|  go|  903|
|alternative|feel|  885|
|alternative|come|  867|
|alternative| one|  847|
|alternative| get|  840|
+-----------+----+-----+

+-------+----+-----+
|  genre|word|count|
+-------+----+-----+
|Hip-Hop| get|  582|
|Hip-Hop|know|  550|
|Hip-Hop| got|  543|
|Hip-Hop| now|  497|
|Hip-Hop| one|  464|
|Hip-Hop| can|  452|
|Hip-Hop|back|  422|
|Hip-Hop| see|  422|
|Hip-Hop|come|  420|
|Hip-Hop|make|  416|
+-------+----+-----+

+-----+----+-----+
|genre|word|count|
+-----+----+-----+
| rock|know| 2253|
| rock| now

In [270]:
spark_context.stop()