In [1]:
# There are essentially two methods to use Spark with jupyter notebook.
# 1. Use findspark to load pyspark on a regular jupyter notebook or any other IDE
# 2. Configure pyspark to always use jupyter notebook
# We will use the first method here. findspark needs to be installed first (pip install findspark)

import findspark
findspark.init()   # has to run before the pyspark import below

# Local master: this is not running on any cluster
from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster('local').setAppName('Songs')

# getOrCreate returns the already-active context when there is one, so
# re-running this cell in a live kernel no longer fails with
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

In [4]:
# Read the playlist file and build a dictionary with the band name as the key
# and the list of that band's songs as the value.
#
# Layout the parser relies on: a band name on its own line, immediately
# followed by a line containing '---', then one song title per line.
# Lines that are exactly '\n' are skipped.

# The context manager closes the file even if reading raises.
with open('List.txt', 'r') as playlist_file:
    list_of_names = playlist_file.readlines()

bands = {}
key = None
songs = None   # song list of the band being read; None until the first header

total_lines = len(list_of_names)
counter = 0
while counter < total_lines:
    current = list_of_names[counter]
    # Guarding the look-ahead (instead of stopping one line early) lets the
    # final line of the file be recorded as a song too.
    next_is_rule = (counter + 1 < total_lines) and ('---' in list_of_names[counter + 1])

    if next_is_rule:
        # Band header: this line is the name, the next one is its '---' rule.
        key = current.split('\n')[0]
        songs = []
        bands[key] = songs
        counter = counter + 2
    elif current != '\n' and songs is not None:
        # Song title; `songs` is the same list object stored in bands[key].
        songs.append(current.split('\n')[0])
        counter = counter + 1
    else:
        # Blank line, or text that appears before the first band header.
        counter = counter + 1

# Creating a pair rdd from the dictionary.
# items() and the parenthesised print work on both Python 2 and Python 3.
rdd_from_coll = sc.parallelize(list(bands.items()))

# each pair is of the form (Bandname, [list of songs])
print(rdd_from_coll.collect())

[('Primal Fear', ['New religion', 'Running in the dust', 'World on fire', 'Face the emptiness']), ('Excalion', ['Between the lines', 'Arriving as the dark', 'Wingman', 'Losing time', 'Enter a life', 'I failed you', 'Streams of madness']), ('Kamelot', ['Center of the Universe', 'March of Mephisto', 'This pain']), ('Within Temptation', ['Our solemn hour', 'Let us burn', 'Silver moonlight', "It's the fear", 'And we run', 'Faster', 'Hand of sorrow', 'Memories', 'Iron', 'What have you done', 'Memories', 'Sinead', 'Shot in the dark', 'Paradise', 'An we run', "A demon's fate", 'The truth beneath the rose']), ('Powerwolf', ['Shot in the dark', 'Army of the night', 'We are the wild', 'Lust for blood', 'We drink your blood', 'Armata strigoi', 'Amen and attack', 'Sanctus dominus', 'Sanctified with dynamite', 'Higher than heaven', 'Night of the werewolves', "Vampires don't die", "Dead boys don't cry"]), ('Nightwish', ['Dark chest of wonders', 'Stargazers', 'Nemo', 'Everdream', 'Ghost love score', 

In [10]:
# A simple wordcount exercise on the song list to find the most common words
# in the song titles. The first flatMap takes the values of the
# (band, [songs]) pairs and yields one element per song title, map lower-cases
# each title, and the last flatMap splits every title on single spaces.
rdd_from_coll_1 = (
    rdd_from_coll
    .flatMap(lambda pair: pair[1])
    .map(lambda title: title.lower())
    .flatMap(lambda title: title.split(' '))
)

# This is really naive and in the next versions I hope to use something better
# (maybe TF-IDF) to leave out commonly occurring words. Basically this is a
# list of words which should not be counted. The empty string is included
# because splitting on ' ' produces '' wherever two spaces are adjacent.
ignore = ['the', 'and', 'a', 'an', 'of', 'to', 'for', 'with', 'in', 'on', 'at', 'we', 'me', 'i',
          'us', 'my', '', 'now', 'this', 'that', 'as', 'all', "don't", 'are', 'is']
# Constant-time membership test for the filter below; `ignore` stays a list.
ignore_lookup = frozenset(ignore)

# Filtering out the words that match something in the ignore list and then
# mapping each remaining word to a key value pair with value 1
words = (
    rdd_from_coll_1
    .filter(lambda word: word not in ignore_lookup)
    .map(lambda word: (word, 1))
)

# standard wordcount statements...this can be combined together with
# countByValue() action on the rdd. Pairs are flipped to (count, word) so that
# both sortByKey and top() order by count. The swap indexes the pair instead
# of using a tuple-parameter lambda, which is a syntax error on Python 3.
words_count = (
    words
    .reduceByKey(lambda x, y: x + y)
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey(ascending=False)
)
print(words_count.top(10))

[(8, 'die'), (7, 'world'), (5, 'dark'), (4, 'new'), (4, 'last'), (4, 'blood'), (3, 'time'), (3, 'run'), (3, 'night'), (3, 'metal')]


In [6]:
# Top words include 'die', 'dark', 'blood'... Seems like I have to try out some lighter and happier songs :)

In [8]:
# Let's see which bands had the most songs in the list.
# Each (band, [songs]) pair becomes (band, song count) and is then flipped to
# (song count, band) so that sortByKey orders by the count, largest first.
# The swap indexes the pair instead of using a tuple-parameter lambda, which
# is a syntax error on Python 3.
fav_bands = (
    rdd_from_coll
    .mapValues(len)
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey(ascending=False)
)
print(fav_bands.collect())

[(38, 'Sabaton'), (25, 'Nightwish'), (22, 'Gamma Ray'), (20, 'Iron Maiden'), (17, 'Within Temptation'), (14, 'Unheilig'), (13, 'Powerwolf'), (13, 'Xandria'), (13, 'Eisbrecher'), (12, 'Lyriel'), (11, 'Krypteria'), (10, 'Altaria'), (8, 'Stahlmann'), (7, 'Excalion'), (7, 'Edenbridge'), (5, 'Gaia Epicus'), (4, 'Primal Fear'), (4, 'Wizard'), (4, 'Black Majesty'), (3, 'Kamelot')]
