In [0]:
import string
import math
from pyspark.sql import Row
from pyspark.sql.functions import col, desc
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [0]:
# Translation table that deletes every ASCII punctuation character plus the
# typographic apostrophe (’), which string.punctuation does not contain.
remove_punctuation = str.maketrans(dict.fromkeys(string.punctuation + "’"))


In [0]:
# Length of the fully-qualified DBFS prefix of the song directory (scheme,
# directory and trailing slash), i.e. how many leading characters precede the
# bare file name in a listed file path.
len('dbfs:/FileStore/tables/hw3/song dataset/')

Out[141]: 40

In [0]:
# Build one RDD of (song id, token, term frequency) triples covering every
# song file found in the dataset directory.
N = 0  # total number of documents (songs); also used to hand out song ids
song_token_freq = spark.sparkContext.emptyRDD()
directory_path = '/FileStore/tables/hw3/song dataset'
song_id_name = {}  # song id (1-based, in listing order) -> display name

for path in dbutils.fs.ls(directory_path):
    N += 1
    # Per-iteration copy of the id. It is bound into the Spark lambda below as
    # a default argument so the result never depends on *when* Spark pickles
    # the closure relative to the next loop iteration.
    song_id = N

    # Take the file name from the listing entry itself instead of slicing a
    # hard-coded number of characters off the full path, so the loop keeps
    # working if directory_path changes.
    song_id_name[song_id] = path.name.replace('_', ' ')

    song = spark.sparkContext.textFile(path.path)  # read song file
    indexed_lines = song.zipWithIndex()  # (line, line index) pairs, built once

    # Index of the first line containing a run of underscores; everything
    # before it is kept as the lyric, everything from it onward is dropped.
    # take(1) returns the first match without collecting every matching line.
    marker = indexed_lines.filter(lambda x: '___' in x[0]).take(1)
    if not marker:
        raise ValueError(
            "No '___' separator line found in song file: " + path.path
        )
    und_idx = marker[0][1]

    # Keep only the lyric lines (those before the separator).
    song_filtered = (
        indexed_lines
        .filter(lambda x, end=und_idx: x[1] < end)
        .map(lambda x: x[0])
    )

    # Strip punctuation, lower-case, and split each line on single spaces.
    parsed_song = (
        song_filtered
        .map(lambda x: x.translate(remove_punctuation).lower())
        .map(lambda x: x.split(' '))
    )

    # Count occurrences of every non-empty token in this song.
    k_v_pairs = (
        parsed_song
        .flatMap(lambda tokens: [(token, 1) for token in tokens if token != ''])
        .reduceByKey(lambda accum, n: accum + n)
    )

    # Append this song's (song id, token, frequency) triples to the corpus RDD.
    song_token_freq = song_token_freq.union(
        k_v_pairs.map(lambda x, song_id=song_id: (song_id, x[0], x[1]))
    )

In [0]:
# Peek at the first ten (song id, token, frequency) records.
song_token_freq.take(10)

Out[143]: [(1, 'shadow', 1),
 (1, 'feel', 1),
 (1, 'us', 2),
 (1, 'away', 1),
 (1, 'is', 1),
 (1, 'out', 1),
 (1, 'of', 4),
 (1, 'wanna', 1),
 (1, 'alight', 1),
 (1, 'where', 12)]

In [0]:
def _attach_idf(token_postings):
    """Expand one token's postings into (song id, token, tf, idf) tuples.

    ``token_postings`` is a ``(token, iterable of (song id, tf))`` pair as
    produced by ``groupByKey``. The IDF is ``log(N / document frequency)``,
    where the document frequency is the number of postings for the token.
    """
    token, postings = token_postings
    postings = list(postings)
    idf = math.log(float(N) / len(postings))
    return [(song_id, token, tf, idf) for song_id, tf in postings]


# Re-key by token, gather each token's per-song counts, and emit one record
# per (song, token) with the token's IDF attached.
song_token_freq_idf = (
    song_token_freq
    .map(lambda rec: (rec[1], (rec[0], rec[2])))
    .groupByKey()
    .flatMap(_attach_idf)
)


In [0]:
# Peek at ten (song id, token, frequency, idf) records.
song_token_freq_idf.take(10)

Out[145]: [(1, 'away', 1, 1.0986122886681098),
 (7, 'away', 2, 1.0986122886681098),
 (9, 'away', 1, 1.0986122886681098),
 (11, 'away', 2, 1.0986122886681098),
 (15, 'away', 1, 1.0986122886681098),
 (1, 'of', 4, 0.06899287148695142),
 (2, 'of', 4, 0.06899287148695142),
 (3, 'of', 4, 0.06899287148695142),
 (4, 'of', 1, 0.06899287148695142),
 (5, 'of', 3, 0.06899287148695142)]

In [0]:
def _append_tfidf(record):
    """Return the (song id, token, tf, idf) record extended with tf * idf."""
    song_id, token, tf, idf = record
    return (song_id, token, tf, idf, tf * idf)


# Add the TF-IDF weight as a fifth field on every record.
song_token_freq_idf_tfidf = song_token_freq_idf.map(_append_tfidf)

In [0]:
# Convert the RDD of 5-tuples into a DataFrame; no schema is supplied, so the
# columns get positional names (_1 ... _5).
df = spark.createDataFrame(song_token_freq_idf_tfidf)

In [0]:
# Inspect the DataFrame's schema (column names and types).
df.schema

Out[148]: StructType([StructField('_1', LongType(), True), StructField('_2', StringType(), True), StructField('_3', LongType(), True), StructField('_4', DoubleType(), True), StructField('_5', DoubleType(), True)])

In [0]:
 # Replace the positional column names (_1 ... _5) with descriptive headers.
 df = (
     df
     .withColumnRenamed('_1', 'Song ID')
     .withColumnRenamed('_2', 'Token ID')
     .withColumnRenamed('_3', 'Term Frequency')
     .withColumnRenamed('_4', 'Inverse Document Frequency')
     .withColumnRenamed('_5', 'TF-IDF')
 )

In [0]:
# Render the per-song, per-token TF-IDF table in the notebook.
display(df)

Song ID,Token ID,Term Frequency,Inverse Document Frequency,TF-IDF
1,away,1,1.0986122886681098,1.0986122886681098
7,away,2,1.0986122886681098,2.19722457733622
9,away,1,1.0986122886681098,1.0986122886681098
11,away,2,1.0986122886681098,2.19722457733622
15,away,1,1.0986122886681098,1.0986122886681098
1,of,4,0.0689928714869514,0.2759714859478057
2,of,4,0.0689928714869514,0.2759714859478057
3,of,4,0.0689928714869514,0.2759714859478057
4,of,1,0.0689928714869514,0.0689928714869514
5,of,3,0.0689928714869514,0.2069786144608542


In [0]:
# Save the full TF-IDF table as CSV with a header row, from a single
# partition, replacing any output already at this location.
(
    df.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("/FileStore/tables/Hw3_Q1_output.csv")
)

In [0]:
# Highest TF-IDF value reached by any token in each song.
max_song_word_tfidf = (
    df.groupBy('Song ID')
    .max('TF-IDF')
    .withColumnRenamed('max(TF-IDF)', 'TF-IDF')
)

# Join back onto the full table to recover which token(s) achieve that
# maximum; a song with ties yields one row per tied token.
# Sorting by 'Token ID' as a secondary key makes the order of tied rows
# deterministic, so the displayed table and the exported CSV are reproducible.
result = (
    max_song_word_tfidf
    .join(df, ['Song ID', 'TF-IDF'])
    .select('Song ID', 'Token ID', 'TF-IDF')
    .orderBy('Song ID', 'Token ID')
)

In [0]:
# Render the highest-TF-IDF token(s) of each song.
display(result)

Song ID,Token ID,TF-IDF
1,faded,29.788552212124312
2,winds,8.12415060330663
3,hello,24.37245180991989
4,impossible,43.32880321763536
5,kryptonite,10.83220080440884
5,might,10.83220080440884
5,superhuman,10.83220080440884
5,superman,10.83220080440884
6,daddy,27.0805020110221
7,play,165.19106226723483


In [0]:
# Save the per-song top-token table as CSV with a header row, from a single
# partition, replacing any output already at this location.
(
    result.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("/FileStore/tables/Hw3_Q1_top_tfidf.csv")
)

In [0]:
def rank_score(song_id, keywords):
    """Return the sum of the TF-IDF weights of ``keywords`` in one song.

    Args:
        song_id: Value matched against the 'Song ID' column of ``df``.
        keywords: Iterable of tokens matched against the 'Token ID' column.
            A keyword listed more than once is counted once per occurrence.

    Returns:
        float: The summed TF-IDF; keywords absent from the song contribute
        0, and an empty keyword list yields 0.0.
    """
    keywords = list(keywords)
    if not keywords:
        return 0.0

    # Fetch every matching (token, tf-idf) row for this song in a single
    # Spark job instead of launching one job per keyword.
    rows = (
        df.filter((col('Song ID') == song_id) & col('Token ID').isin(keywords))
        .select('Token ID', 'TF-IDF')
        .collect()
    )
    tfidf_by_token = {row[0]: row[1] for row in rows}

    # Accumulate in keyword order so repeated keywords count each time.
    score = 0.0
    for word in keywords:
        score += tfidf_by_token.get(word, 0.0)

    return score
        

In [0]:
# Column layout for the keyword-ranking table: a nullable string song name
# and a nullable float rank score.
schema = (
    StructType()
    .add('Song Name', StringType(), True)
    .add('Rank Score', FloatType(), True)
)


In [0]:
# Score every loaded song against the query keywords and collect the results
# into one DataFrame.
keywords = ['tear', 'feel', 'hate']

# Iterate over the song ids that were actually registered instead of a
# hard-coded range, and pass the `keywords` list defined above rather than
# repeating the literal.
rank_rows = [
    (song_id_name[song_id], rank_score(song_id, keywords))
    for song_id in sorted(song_id_name)
]

# Build the DataFrame in a single call from all rows rather than unioning one
# single-row DataFrame per song. Column types are inferred from the rows, as
# they were for the per-song frames.
q1_pt3_result_df = spark.createDataFrame(rank_rows, ['Song Name', 'Rank Score'])

In [0]:
# Render the rank score computed for every song.
display(q1_pt3_result_df)

Song Name,Rank Score
Faded,0.5108256237659907
Girl from the North Country,0.0
Hello,8.12415060330663
Impossible,0.0
Kryptonite,0.5108256237659907
Mockingbird,1.0216512475319814
Murder Most Foul,2.0149030205422647
Numb,2.5541281188299534
Photograph,0.5108256237659907
Run To You,1.0216512475319814


In [0]:
# Keep the three songs with the highest rank score.
q1_pt3_result_df_top3 = (
    q1_pt3_result_df
    .sort(col('Rank Score').desc())
    .limit(3)
)

In [0]:
# Save the top-3 ranking as CSV with a header row, from a single partition,
# replacing any output already at this location.
(
    q1_pt3_result_df_top3.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("/FileStore/tables/Hw3_Q1_top_rankscore.csv")
)

In [0]:
# Print the three top-ranked songs as a text table.
q1_pt3_result_df_top3.show()

+------------------+------------------+
|         Song Name|        Rank Score|
+------------------+------------------+
|             Hello|  8.12415060330663|
|Somewhere I Belong| 5.108256237659907|
|              Numb|2.5541281188299534|
+------------------+------------------+

