In [224]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Build (or reuse, if one already exists) a local Spark session that uses
# every available core on this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("MillionSongSubSet")
    .getOrCreate()
)

# Low-level entry point for the RDD API.
sc = spark.sparkContext

## Reading csv file as RDD

In [314]:
# Load the sample CSV. NOTE: despite its name, `rdd_data` is a DataFrame at
# this point; it is converted to an RDD in the following cell.
rdd_data = spark.read.csv(
    "sampledataMS.csv",
    sep=",",
    escape='"',          # a double quote is the escape character inside quoted fields
    multiLine=False,     # each record is expected to sit on a single line
    inferSchema=False,   # no type inference: every column is read as a string
    header=True,         # the first line supplies the column names
)

In [315]:
# Convert the DataFrame to an RDD of Row objects.
# getattr() keeps this cell idempotent: a DataFrame exposes `.rdd`, an RDD does
# not, so re-running the cell leaves an already-converted RDD untouched instead
# of raising AttributeError.
rdd_data = getattr(rdd_data, "rdd", rdd_data)

In [348]:
# Peek at the first record to check that the header row was parsed into named Row fields.
rdd_data.take(1)

[Row(analysis_sample_rate='22050', audio_md5='af86941e0f2094ce0788670bc25fcb27', danceability='0.0', duration='253.04771', end_of_fade_in='0.061', energy='0.0', idx_bars_confidence='0', idx_bars_start='0', idx_beats_confidence='0', idx_beats_start='0', idx_sections_confidence='0', idx_sections_start='0', idx_segments_confidence='0', idx_segments_loudness_max='0', idx_segments_loudness_max_time='0', idx_segments_loudness_start='0', idx_segments_pitches='0', idx_segments_start='0', idx_segments_timbre='0', idx_tatums_confidence='0', idx_tatums_start='0', key='11', key_confidence='0.614', loudness='-13.54', mode='0', mode_confidence='0.674', start_of_fade_out='245.69', tempo='109.997', time_signature='4', time_signature_confidence='0.822', track_id='TRAKKPN12903CF7C4D', analyzer_version=None, artist_7digitalid='191898', artist_familiarity='0.5030023057537399', artist_hotttnesss='0.0', artist_id='ARZIKR01187B98DF08', artist_latitude=None, artist_location=None, artist_longitude=None, artist

## Word count on Song Titles

To normalize the data, we remove punctuation and convert the text to lower case.

In [None]:
# Characters removed from every token (the ASCII punctuation marks).
_PUNCTUATION = '!"#$%&\'()*+,./:;<=>?@[\\]^_`{|}~-'
# Deletion table built once, so each call is a single pass over the string
# instead of one str.replace() per punctuation character.
_PUNCTUATION_TABLE = str.maketrans('', '', _PUNCTUATION)


def lower_clean_str(x):
    """Return ``x`` lower-cased with all ASCII punctuation removed.

    Args:
        x: The string to normalize (e.g. a single word from a song title).

    Returns:
        The lower-cased string with every character in ``_PUNCTUATION``
        deleted. The result may be empty if ``x`` was only punctuation.
    """
    return x.lower().translate(_PUNCTUATION_TABLE)

The word count uses the following RDD operations on our dataset: <b>map, flatMap, key/value pairs, reduceByKey, sortBy, filter</b> and lambda functions.

In [367]:
# Song titles only, skipping rows whose title is missing (None) or empty.
song_titles = rdd_data.map(lambda row: row["title"]).filter(lambda title: bool(title))

# One (word, 1) pair per token: split each title on single spaces, normalize
# the token with lower_clean_str, and drop tokens that end up empty.
rdd_data_wc = (
    song_titles
    .flatMap(lambda title: title.split(" "))
    .map(lambda token: (lower_clean_str(token), 1))
    .filter(lambda pair: pair[0] != '')
)

# Sum the 1s per word to get each word's number of occurrences.
rdd_data_wc_count = rdd_data_wc.reduceByKey(lambda left, right: left + right)

# Most frequent words first; show the top ten.
rdd_data_wc_count_result = rdd_data_wc_count.sortBy(lambda pair: pair[1], ascending=False)
rdd_data_wc_count_result.take(10)

[('the', 1296),
 ('version', 773),
 ('you', 495),
 ('album', 466),
 ('of', 448),
 ('a', 437),
 ('in', 409),
 ('i', 392),
 ('me', 349),
 ('love', 323)]

## Counting  The Songs by Release Year

Counting songs per release year uses the following RDD operations on our dataset: <b>map, key/value pairs, reduceByKey, sortBy, filter</b> and lambda functions.

In [366]:
# One (year, 1) pair per song with a usable release year.
# Years are strings here, so "0" is compared as a string. Rows with a missing
# year (None) are dropped as well as "0"; otherwise None would be counted as
# if it were a release year.
rdd_data_year = (
    rdd_data
    .map(lambda row: row["year"])
    .filter(lambda year: year is not None and year != "0")
    .map(lambda year: (year, 1))
)

# Sum the 1s per year to get the number of songs released in each year.
rdd_data_year_count = rdd_data_year.reduceByKey(lambda left, right: left + right)

# Years with the most songs first; show the top ten.
rdd_data_year_count_result = rdd_data_year_count.sortBy(lambda pair: pair[1], ascending=False)
rdd_data_year_count_result.take(10)

[('2006', 320),
 ('2005', 304),
 ('2007', 285),
 ('2004', 270),
 ('2003', 254),
 ('2008', 253),
 ('2009', 250),
 ('2001', 217),
 ('2002', 198),
 ('2000', 192)]