<a href="https://colab.research.google.com/github/clutchkingasiimov/BigDataAlgorithms/blob/main/ass4_map_reduce_spark_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h1><center>Big Data Algorithms Techniques & Platforms</center></h1>
<h2>
<hr style=" border:none; height:3px;">
<center>Assignment 4 - MapReduce and Spark</center>
<hr style=" border:none; height:3px;">
</h2>

# Introduction


<p align="justify">
<font size="3">
In this exercise you is asked to use Spark for implementing an algorithm that applies computations on documents and dataframes.
</font>
</p>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**Execute the following cell in order to initialize Spark**</font>
<hr style=" border:none; height:2px;">
</p>

In [1]:
# !apt-get update
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
# !tar zxvf spark-3.0.3-bin-hadoop2.7.tgz
# !pip install -q findspark

import os
import re
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"


import findspark
findspark.init()

#import of the SparkSession
import pyspark 
from pyspark.sql import SparkSession

#inizialization of the Spark Session
spark = SparkSession \
    .builder \
    .appName("Assignment4") \
    .getOrCreate()

import pandas as pd

# Analysing documents


<p align="justify">
<font size="3">
We have already seen that MapReduce procedures are good in analyzing text-files.
    
The provided data comes from a scraping operation on the website https://www.vagalume.com.br/ and is available on kaggle:
    
https://www.kaggle.com/neisse
    

    
The assignment is divided in 2 parts:
    
* Part 1 is focused on MapReduce 
    
* Part 2  is focuses on dataframes
    </font>
    </p>
    
<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>Notice that  dataset is noisy and shows all the typical issues related with data coming from this procedure (duplicated entries, etc).</font>
<hr style=" border:none; height:2px;">
</p>

# Part 1 -  MapReduce
<p align="justify">
<font size="3">
In the provided folder you can find a set of documents/files containing  descriptions of songs (lyrics and additional informations). Specifically in each file:

- the first line is the idiom/language
- the second line is the title of a song
- the third line is the relative url of the song of the original website
- from fourth line on the text you find the lyrics of the song.
    </font>
    </p>

## Exercise 1 - (2 points) - Song's lyrics 

<p align="justify">
<font size="3">
Provide a Spark MapReduce procedure that reads the documents and checks how many song's lyrics appear at least two times.

In the data-interpretation of this exercise you can consider that two files represent the same lyric if the url (3rd line of each file) is the same.

 </font>
</p>

<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>Notice that  you can reuse any code that was made available for the previous labs/assignments or that you already developed in these contexts.</font>
<hr style=" border:none; height:2px;">
</p>

In [None]:
#Unzipping the file to extract the folder 
!unzip /content/lyrics_files_idioms.zip

In [2]:
#Coroutine function for loading the files 
#This coroutine file will work as a mapper function 
def file_opener(file_input):
    fle = file_input[1]
    contents = re.compile(r'\n').split(fle)
    url = contents[1]
    return url, 1

In [11]:
#Let's see a random file inside the folder 
import shutil 
path = "/content/lyrics_files_idioms"
dir_list = os.listdir(path)
first_1500 = dir_list[:15000]

# new_dir = os.mkdir("/content/lyrics_files_idioms_small")
for files in first_1500:
    shutil.copy("/content/lyrics_files_idioms/"+files, "/content/lyrics_files_idioms_small")

# len(os.listdir("content/lyrics_files_idioms_small"))

In [19]:
### Write here your code
from pyspark import SparkConf, SparkContext 
conf = SparkConf().setMaster("local")
sc = SparkContext.getOrCreate(conf=conf)

#wholeTextFiles is a method that allows us to read text files quickly
lyrics_file = sc.wholeTextFiles("/content/lyrics_files_idioms/*")
links = lyrics_file.map(file_opener) #We use the coroutine mentioned above
duplicates = links.reduceByKey(lambda x,y: x+y).filter(lambda x: x[1] > 1)
print(f"Duplicate links: {duplicates.count()}")

Duplicate links: 38096


In [None]:
duplicates.count()

1

## Exercise 2

### 2.1 - (1 point) - Distinct songs
Provide a Spark MapReduce procedure that provides how many distinct song's lyrics are present.

Also in this case consider the uri as key: two files represent the same lyric if the url is equal.

### 2.2 - (1 point) - Chaining MapReduce steps
According to your implementation of Exercise 1, can you chain MapReduce additional MapReduce steps for solving Exercise 2.1? 

Provide the code for 2.1 and anwer for 2.2

In [None]:
#Show all the links in the text files
for link in links.collect():
    print(link)

As seen above, the text file consists of the first line as the language.

In [22]:
### Write here your code followed by the answer to question 2.1
links.reduceByKey(lambda x,y: x+y).count()

167499

In [None]:
links.count()

1500

For Question 2, a convenient method to detect the number of duplicates is to take the total count of the documents and subtract the duplicates from the document. The solution has already been implemented in Question 1 already.

# Exercise 3

### 3.1 - (3 points) - Most common word for language

Now that you discovered the duplicated documents consider just one occurence of each song's lyric and define a MapReduce procedure that finds the most common word for each language (of course you must remove stop words).




In [77]:
### Write here your code

#First we load a stopword file in a json format 
import json 
from collections import defaultdict

with open("/content/stopwords-all.json","r") as stop_file:
    stopword = json.load(stop_file)

#Storing the stopwords in a defaultdict to track them. 
stopwords = defaultdict(list)
for key, value in stopword.items():
    stopwords[key] = value

#Storing the ISO codes in a defaultdict 
iso_codes = defaultdict(str)
with open("/content/iso_codes.txt","r") as codes:
    lines = codes.readlines()
    for line in lines: 
        iso_code = line.split()[0].replace("'",'').replace("(",'')\
        .replace(")",'').replace(",",'')
        language = line.split()[1].replace("'",'').replace("(",'')\
        .replace(")",'').replace(",",'').replace(";",'')
        iso_codes[language] = iso_code

#Define a coroutine for finding the song words from each song 
def song_words(file_input):
    fle = file_input[1]
    contents = re.compile(r'\n').split(fle)
    lang = contents[0].title()
    #Find the ISO code that reflects the language 
    iso_code = iso_codes[lang]
    
            
    # # #Extract the song lyrics and lower string, and remove special characters
    lyrics = contents[3].lower()
    lyrics = re.compile(r"[.:;,\s\?!\[\]\(\)\"&\*/]+").split(lyrics)
    if iso_code != '':
        song_stopwords = stopwords[iso_code]
        cleaned_words = [word for word in lyrics if word not in song_stopwords]
        return lang, tuple(cleaned_words)
    else:
        return 'Na',tuple()

top_words = lyrics_file.map(song_words)\
    .filter(lambda x : x[0] != 'Na')\
    .distinct()\
    .flatMapValues(lambda x: x)\
    .map(lambda x: ((x[0], x[1]), 1))\
    .reduceByKey(lambda x, y: x + y)\
    .map(lambda x: (x[0][0], (x[0][1], x[1])))\
    .sortBy(lambda x: x[1][1],ascending=False)\
    .groupByKey()\
    .mapValues(list)\
    .mapValues(lambda x: (x[0]))

top_words.collect()

[('Portuguese', ('pra', 146459)),
 ('English', ('love', 144789)),
 ('Kinyarwanda', ('ni', 712)),
 ('German', ('komm', 192)),
 ('Danish', ('forever', 105)),
 ('Tagalog', ('ang', 79)),
 ('Irish', ('me', 67)),
 ('Swedish', ('a', 54)),
 ('Malagasy', ('me', 38)),
 ('Norwegian', ('vestido', 32)),
 ('Catalan', ('tu', 29)),
 ('Swahili', ('pouss', 28)),
 ('Russian', ('стиле', 25)),
 ('Turkish', ('loco', 19)),
 ('Korean', ('you', 16)),
 ('Bosnian', ('je', 8)),
 ('Polish', ('policz', 6)),
 ('Hungarian', ('virágom', 4)),
 ('Romanian', ("pe'", 3)),
 ('Spanish', ('amor', 6698)),
 ('French', ('the', 755)),
 ('Italian', ("c'è", 417)),
 ('Icelandic', ('og', 128)),
 ('Japanese', ('huh', 66)),
 ('Dutch', ('nimma', 40)),
 ('Indonesian', ('i', 40)),
 ('Galician', ('paso', 36)),
 ('Sundanese', ('you', 34)),
 ('Finnish', ('the', 34)),
 ('Arabic', ('y', 21)),
 ('Estonian', ('jää', 20)),
 ('Slovak', ('uncoolohol', 16)),
 ('Czech', ('senta', 13)),
 ('Serbian', ('i', 13)),
 ('Basque', ('nire', 12)),
 ('Kurdish',

In [None]:
# os.chdir("")

# os.listdir("..")

# f = open("/content/lyrics_files_idioms_small", "r")
# contents = re.compile(r'\n').split(f.read())
# lol = stopwords[iso_codes[contents[0].title()]]
# lyrics =re.compile(r"[.:;,\s\?!\[\]\(\)\"&\*/]+").split(contents[3].lower())
# cleaned_words = [word for word in lyrics if word not in lol]

### 3.2 - (3 points) - Most common end/start words

Finally discover, for each language, the most common ending and starting word (of course, also in this case) you must remove stop words).

In [73]:
### Write here your code

#First we load a stopword file in a json format 
import json 
from collections import defaultdict

with open("/content/stopwords-all.json","r") as stop_file:
    stopword = json.load(stop_file)

#Storing the stopwords in a defaultdict to track them. 
stopwords = defaultdict(list)
for key, value in stopword.items():
    stopwords[key] = value

#Storing the ISO codes in a defaultdict 
iso_codes = defaultdict(str)
with open("/content/iso_codes.txt","r") as codes:
    lines = codes.readlines()
    for line in lines: 
        iso_code = line.split()[0].replace("'",'').replace("(",'')\
        .replace(")",'').replace(",",'')
        language = line.split()[1].replace("'",'').replace("(",'')\
        .replace(")",'').replace(",",'').replace(";",'')
        iso_codes[language] = iso_code

#Define a coroutine for finding the song words from each song 
def song_firstlast_words(file_input):
    fle = file_input[1]
    contents = re.compile(r'\n').split(fle)
    lang = contents[0].title()
    #Find the ISO code that reflects the language 
    iso_code = iso_codes[lang]
    # # #Extract the song lyrics and lower string, and remove special characters
    lyrics = contents[3].lower()
    lyrics = re.compile(r"[.:;,\s\?!\[\]\(\)\"&\*/]+").split(lyrics)
    cleaned_lyrics = []
    if iso_code != '':
        song_stopwords = stopwords[iso_code]
        cleaned_lyrics = [word for word in lyrics if word != '']
        cleaned_words = [word for word in cleaned_lyrics if word not in song_stopwords]
        if cleaned_words != []:
            return lang, str(tuple(cleaned_words)[0]), str(tuple(cleaned_words)[-1])
        else:
            return lang, None, None 
    else:
        return 'Na',tuple()


#Extract the common words into a list 
common_words = lyrics_file.map(song_firstlast_words)\
    .filter(lambda x: x[0] != 'Na')\
    .filter(lambda x: x[2] != None)\
    .distinct()\
    .map(lambda x: (x[0], (x[1], x[2])))\
    .flatMapValues(lambda x: [("F", x[0]), ("L", x[1])])\
    .map(lambda x: ((x[0], x[1][0], x[1][1]), 1))\
    .reduceByKey(lambda x,y : x+y)\
    .map(lambda x: ((x[0][0], x[0][1]),(x[0][2],x[1])))\
    .reduceByKey(lambda x,y: x if x[1] >= y[1] else y)\
    .sortBy(lambda x: x[0][0],ascending=False)
    # .groupByKey()\
    # .mapValues(list)


    # .map(lambda x: ((x[0][0], x[0][1]), (x[0][2], x[1])))
    # .reduceByKey(lambda x, y: x[1] >= y[1])


first_last_common_words = common_words.collect()


In [74]:
first_last_common_words

[(('Welsh', 'F'), ('sterling', 1)),
 (('Welsh', 'L'), ('mae', 1)),
 (('Turkish', 'L'), ('geci', 1)),
 (('Turkish', 'F'), ('just', 1)),
 (('Tagalog', 'L'), ('2x', 1)),
 (('Tagalog', 'F'), ('noong', 1)),
 (('Swedish', 'L'), ('minns', 1)),
 (('Swedish', 'F'), ('vet', 2)),
 (('Swahili', 'L'), ('day', 1)),
 (('Swahili', 'F'), ('so', 1)),
 (('Sundanese', 'F'), ('mayakovsky', 1)),
 (('Sundanese', 'L'), ('geoya', 1)),
 (('Spanish', 'F'), ('quiero', 83)),
 (('Spanish', 'L'), ('amor', 144)),
 (('Slovak', 'F'), ('the', 1)),
 (('Slovak', 'L'), ('uncool', 1)),
 (('Serbian', 'F'), ('srce', 1)),
 (('Serbian', 'L'), ('vjeènim', 1)),
 (('Russian', 'L'), ('a-ma-super-super-star', 1)),
 (('Russian', 'F'), ('муз', 2)),
 (('Romanian', 'L'), ('core', 1)),
 (('Romanian', 'F'), ('tenímmoce', 1)),
 (('Portuguese', 'L'), ('amor', 961)),
 (('Portuguese', 'F'), ('vou', 800)),
 (('Polish', 'L'), ('x2', 1)),
 (('Polish', 'F'), ('s³ucham', 1)),
 (('Norwegian', 'L'), ('veeei', 1)),
 (('Norwegian', 'F'), ('drøm', 1)),


<p align="justify">
<hr style=" border:none; height:2px;">
 <font  size="3" color='#91053d'>**DataFrames**</font>
<hr style=" border:none; height:2px;">
</p>

# Part 2 - Dataframes

In this part you can use Pandas Dataframes or  Spark Dataframes.  I suggest to use a Spark Dataframe
end exploit the Pandas functionalities as we have seen in the 2nd assignment. Download the two available datasets at the link:

https://www.kaggle.com/neisse/scrapped-lyrics-from-6-genres

You can find two .cvs files: 

* artists-data.csv

* lyrics-data.csv


# Import artist data.
<p align="justify">
<font size="3">
The artist data in the .csv file can be stored in a dataframe. 
    
Each row of the .csv file describes an artist and the columns represent the following data:
    
* Artist - The artist's name
* Popularity - Popularity score at the date of scrapping
* ALink - The link to the artist's page
* AGenre - Primary musical genre of the artist
* AGenres - A list (pay attention to the format) of genres the artist fits in
    
</font>
</p>


# Import song's lyrics data.


<p align="justify">
<font size="3">
    
Each row of the .csv file describes a lyric and the columns represent the following data:
    
* ALink - The link to the webpage of the artist
* SLink - The link to the webpage of the song
* Idiom - The idiom of the lyric
* Lyric - The lyrics
* SName - The name of the song

    

</font>
</p>


In [106]:
#Artists dataframe
artists = spark.read.option("header","true")\
    .csv('/content/artists-data.csv')


#  Exercise 4 - (3 points) - Artist's genre

<p align="justify">
<font size="3">
Provide a program that finds the artists for which the genre is not specified.

</font>
</p>






In [107]:
#First 5 rows of Artists dataframe
artists.show(5)

+-----------------+-----+----------+-------------------+-----+--------------------+
|           Artist|Songs|Popularity|               Link|Genre|              Genres|
+-----------------+-----+----------+-------------------+-----+--------------------+
|    10000 Maniacs|  110|       0.3|    /10000-maniacs/| Rock|Rock; Pop; Electr...|
|        12 Stones|   75|       0.3|        /12-stones/| Rock|Rock; Gospel/Reli...|
|              311|  196|       0.5|              /311/| Rock|Rock; Surf Music;...|
|    4 Non Blondes|   15|       7.5|    /4-non-blondes/| Rock|Rock; Pop/Rock; R...|
|A Cruz Está Vazia|   13|         0|/a-cruz-esta-vazia/| Rock|                Rock|
+-----------------+-----+----------+-------------------+-----+--------------------+
only showing top 5 rows



In [108]:
#We use a MySQL query approach to filter the artists 
artists.filter("Genre IS NULL").show(5)

+------+-----+----------+----+-----+------+
|Artist|Songs|Popularity|Link|Genre|Genres|
+------+-----+----------+----+-----+------+
+------+-----+----------+----+-----+------+



#  Exercise 5 - (3 points) - Duplicates

<p align="justify">
<font size="3">
Provide a program that removes the duplicates in the artists (also in this case the URL is the key).

</font>
</p>




In [109]:
### Write here your code
unique_artists = artists.dropDuplicates(['Link'])
unique_artists.show(5)

+--------------------+-----+----------+--------------------+---------+--------------------+
|              Artist|Songs|Popularity|                Link|    Genre|              Genres|
+--------------------+-----+----------+--------------------+---------+--------------------+
|           DJ Khaled|  108|       2.7|         /dj-khaled/|  Hip Hop|Hip Hop; Rap; Bla...|
|       Dying Kingdom|   10|         0|     /dying-kingdom/|     Rock|Rock; Hard Rock; ...|
|         ExaltaSamba|  238|        13|      /exalta-samba/|    Samba|               Samba|
|   Gabriella Caetano|    2|         0| /gabriella-caetano/|Sertanejo|           Sertanejo|
|Luiz Henrique & F...|   43|         0|/luiz-henrique-e-...|Sertanejo|           Sertanejo|
+--------------------+-----+----------+--------------------+---------+--------------------+
only showing top 5 rows



#  Exercise 6 - (4 points)

<p align="justify">
<font size="3">
Provide a program that using dataframe return the 100 most popular artists and the lyrics of their songs.
</font>
</p>

In [111]:
### Write here your code

#Load in the lyrics data first 
lyrics_data = spark.read.option("header","true")\
    .csv('/content/lyrics-data.csv')


#We perform inner join of the top artists with their songs using Alink as the key

best_artists = unique_artists.sort(unique_artists.Popularity.desc())\
    .limit(100).select("Link","Artist")

best_artists_lyrics = best_artists.join(lyrics_data, best_artists.Link == lyrics_data.ALink)
artists_lyrics = best_artists_lyrics.select("Artist","SName","Lyric")

artists_lyrics.show(5)

+-------------+--------------------+--------------------+
|       Artist|               SName|               Lyric|
+-------------+--------------------+--------------------+
|4 Non Blondes|           What's Up|Twenty-five years...|
|4 Non Blondes|            Spaceman|Starry night brin...|
|4 Non Blondes|     Pleasantly Blue|Every time you wa...|
|4 Non Blondes|               Train|What ya gonna do ...|
|4 Non Blondes|Calling All The P...|How can you tell,...|
+-------------+--------------------+--------------------+
only showing top 5 rows



# 2 - Bonus 


<p align="justify">
<font size="3">
Using the approach you prefer (just Dataframes, hybrid approach)  :
    
* the 10 most common words in the lyrics of each artist
* the 10 most common words for each genre. For this question we can use the primary genre of the artist.

</font>
</p>




In [None]:
# Write here your code and the detailed description of the MapReduce algorithm.




















