# EmoteDet: Emotion Detection for Artist Discography


## The idea

This project implements a **Data Pipeline** that analyzes the discography of an **Artist**. It uses Machine Learning to detect the **emotions** associated with the lyrics of each track, then gathers and visualizes the results to give a general idea of the emotions the artist conveys in their music. New music is added in **real time** as it is released.

## The Implementation

### First steps

The first problem to solve is deciding where to gather the data from, meaning the lyrics of an artist's entire discography.
The choice fell on a crawler written in **Python** that uses two services to gather the data: **Spotify and Genius.com**.

<img src="notebook/crawler.png" align="middle">

## Why two APIs?

The question now is: why use two services when you could use just one?  
Well, weirdly, the answer is **efficiency**.  
  
Using only the Genius APIs, which are the ones that actually get the lyrics into our pipeline, the search times for all the songs were far too long. The crawler would first search for the artist name and then, from the returned object, download the lyrics one by one, searching each track by artist and saving everything only at the end of the last search.  
  
That would be pretty bad for a real-time project like this one. The solution was to get all the track titles before searching for the lyrics, but this was not possible with the Python API interface for Genius (**lyricsgenius**). Enter Spotify.  
  
Using **Spotipy**, the Python API interface for Spotify, the crawler first collects all the track titles and then passes them to the Genius APIs, which fetch the lyrics track by track.  

## Let's look at some code now

It's finally time to start looking at the **code!**
The crawler is a small Python script that uses lyricsgenius and spotipy to get our data.
Let's go through the **functions** written to make the pieces work together.

```python
def searchArtist(artistName, spotify):
    
    result = spotify.search('{' + artistName + '}',type="artist")
    return result['artists']['items'][0]['uri']

```

We start by searching for the artist in the Spotify database through its API. The search returns a **JSON response**, from which we extract the only field we need: the Spotify **URI of the artist.**
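
Indexing `[0]` directly raises an `IndexError` when the search finds nothing. A minimal sketch of a more defensive lookup could look like this; `find_artist_uri` and `FakeSpotify` are hypothetical names, and the fake client is only there so the example runs without Spotify credentials.

```python
def find_artist_uri(artist_name, spotify):
    """Return the URI of the best-matching artist, or None if nothing matches."""
    result = spotify.search(q=artist_name, type="artist", limit=1)
    items = result["artists"]["items"]
    return items[0]["uri"] if items else None


class FakeSpotify:
    """Hypothetical stand-in for a spotipy.Spotify client (offline use only)."""

    def __init__(self, items):
        self._items = items

    def search(self, q, type, limit):
        # Mimics only the part of the response shape that the crawler reads.
        return {"artists": {"items": self._items[:limit]}}


found = find_artist_uri("Some Artist", FakeSpotify([{"uri": "spotify:artist:123"}]))
missing = find_artist_uri("Nobody", FakeSpotify([]))
```

With a real client, the caller can then decide what to do when `None` comes back, for example stopping the crawler with a clear message.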

```python
def searchAlbums(artistUri, spotify):
    # Request the artist's albums (album_type='album' leaves out singles and compilations)
    sp_albums = spotify.artist_albums(artistUri, album_type='album')
    # Keep only the URI of each album
    return [album['uri'] for album in sp_albums['items']]
```

Using the artist URI, we can now get all the **albums** published by the artist we are looking for. Once again the only information we need is the URI, so we build a list of **album URIs** extracted from the API's JSON response.
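
One thing to keep in mind is that Spotify returns albums in pages (as far as we know, spotipy asks for 20 items per call by default), so a long discography may not fit in a single response. The sketch below follows the pages using spotipy's `next` helper. `all_album_uris` and `FakePagedSpotify` are hypothetical names, and the fake client only imitates the fields used here.

```python
def all_album_uris(artist_uri, spotify):
    """Collect album URIs across every result page."""
    uris = []
    page = spotify.artist_albums(artist_uri, album_type="album")
    while page:
        uris.extend(album["uri"] for album in page["items"])
        # `next` fetches the following page; stop when there is none.
        page = spotify.next(page) if page.get("next") else None
    return uris


class FakePagedSpotify:
    """Hypothetical offline client that serves pre-built pages of URIs."""

    def __init__(self, pages):
        self._pages = pages

    def artist_albums(self, artist_uri, album_type):
        return self._page(0)

    def next(self, page):
        return self._page(page["next"])

    def _page(self, index):
        has_more = index + 1 < len(self._pages)
        return {
            "items": [{"uri": uri} for uri in self._pages[index]],
            "next": index + 1 if has_more else None,
        }


uris = all_album_uris("spotify:artist:x", FakePagedSpotify([["a", "b"], ["c"]]))
```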

```python
def searchTracks(albumUris, spotify):
    trackList = []
    for uri in albumUris:
        albumTracks = spotify.album_tracks(uri)
        for track in albumTracks['items']:
            # Skip titles already collected from another album
            if track['name'] not in trackList:
                trackList.append(track['name'])
    return trackList

```

We finally have what we needed: using the URIs from the album list, the function collects all the **track titles**, again extracted from the API's JSON response, and returns them as a list, skipping duplicates (we do not want the same track to be analyzed twice). The track titles are **sanitized** right after, meaning we remove all the characters that would raise exceptions during the save process or make the search more difficult.
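
The sanitizing step is not shown here, so the following is only a sketch of what it could look like. The function name `sanitize_title` and the exact set of characters removed are assumptions, chosen because those characters are unsafe in file names on common file systems.

```python
import re

# Assumption: characters that commonly break file names (path separators, wildcards, quotes).
_UNSAFE = re.compile(r'[\\/:*?"<>|]')


def sanitize_title(title):
    """Drop file-system-unsafe characters and tidy up whitespace."""
    cleaned = _UNSAFE.sub("", title)
    return re.sub(r"\s+", " ", cleaned).strip()


example = sanitize_title("AC/DC: Who Made Who?")
```

Here `example` becomes `ACDC Who Made Who`, which is safe to use as part of the output file name.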

```python
def getTrackData(track, genius, artistName):
    # search_song returns None when Genius finds no match
    trackData = genius.search_song(title=track, artist=artistName)
    if trackData is not None:
        return trackData.to_json()
    return None
```

We are almost at the end: now that we have all the track titles, this function is called for each of them. The result is a JSON response from Genius with all the data we need for the track (**title, artist, album, lyrics, etc.**).

```python

def writeTrackData(trackData, track):
    filename = "/opt/tracks/" + track + ".json"
    # The context manager closes the file even if the write fails
    with open(filename, 'w') as file:
        file.write(trackData)
```

Finally, the crawler writes the data to a **JSON file**, ready for ingestion into our Data Pipeline.
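
To show how the pieces might fit together, here is a minimal sketch of the main loop. The name `crawl` and its `fetch`/`write` parameters are hypothetical; they stand in for `getTrackData` and `writeTrackData`, so the example runs without any API access.

```python
def crawl(track_titles, fetch, write):
    """Fetch and persist each track immediately, skipping the ones not found.

    `fetch(title)` returns the track JSON or None; `write(data, title)` stores it.
    Returns the titles that were written.
    """
    written = []
    for title in track_titles:
        data = fetch(title)
        if data is None:
            continue  # no match for this title
        write(data, title)  # saved right away, so ingestion can start early
        written.append(title)
    return written


# Offline demonstration with in-memory stand-ins for Genius and the file system.
store = {}
result = crawl(
    ["a", "missing", "b"],
    fetch=lambda title: None if title == "missing" else '{"title": "%s"}' % title,
    write=lambda data, title: store.__setitem__(title, data),
)
```

With the real functions, the call would look something like `crawl(titles, lambda t: getTrackData(t, genius, artistName), writeTrackData)`. Writing each track as soon as it is fetched is what lets the rest of the pipeline start working before the crawl is over.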

## The journey begins

It's time for our data to start traversing our Pipeline, starting with the Data Ingestion:
We will be using [**Logstash**](https://www.elastic.co/logstash) as our Data Ingestion layer.

<img src="notebook/cralog.png" align="middle">

Logstash reads the JSON files the crawler creates in real time, **logging** each new file as soon as it appears. Here's the configuration for the **Logstash Pipeline** (with a little spoiler for our next layer!):

```json
input {
    file {
        path => ["/opt/tracks/*.json"]
        codec => "json"
        mode => "read"
    }
}

filter {
    json {
        source => "message"
    }
}

output {
    kafka {
        topic_id => "emotedet"
        bootstrap_servers => "kafkaserver:9092"
        codec => json
    }
}
```
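
Since Logstash picks up files as soon as they match the `*.json` glob, a file that is still being written could in principle be read too early. The sketch below is one way to guard against that, assuming the file input treats every newline-terminated line as one event: it writes the record as a single JSON line to a temporary file and then renames it into place. `write_event` is a hypothetical helper, not part of the crawler.

```python
import json
import os
import tempfile


def write_event(directory, name, record):
    """Write `record` as a single JSON line, then move it into place atomically."""
    line = json.dumps(record, ensure_ascii=False) + "\n"
    # The temporary file ends in ".tmp", so a "*.json" glob never matches it.
    fd, tmp_path = tempfile.mkstemp(suffix=".tmp", dir=directory)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(line)
    final_path = os.path.join(directory, name + ".json")
    os.replace(tmp_path, final_path)  # atomic when both paths share a file system
    return final_path


demo_dir = tempfile.mkdtemp()
demo_path = write_event(demo_dir, "song", {"title": "A", "lyrics": "line one\nline two"})
```

Line breaks inside the lyrics are escaped by `json.dumps`, so the whole track still fits on one line.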

## Time to deliver our messages

The next step in our Pipeline is to deliver our data to the recipient (the Data Analysis layer).
This will be our **Data Streaming** layer, and the choice for it is [**Apache Kafka**](https://kafka.apache.org/), an open-source distributed event streaming platform. Logstash will write all the data ingested into a Kafka **topic**, ready to be read by the Analysis layer.

<img src="notebook/cralogkaf.png" align="middle">

## Let's detect some emotions!

We've reached the heart of the Pipeline: it's time to analyze our data and run the **Emotion Detection** on the lyrics we got from Genius.   
  
Our **Data Analysis** layer runs on [**Apache Spark**](https://spark.apache.org/), a unified analytics engine for data processing, using two Spark modules in particular: **Spark ML**, the Machine Learning library that classifies our text, and **Spark Structured Streaming**, which reads and writes data in real time.  

<img src="notebook/cralogkafspa.png" align="middle">

## Spark ML

Apache Spark offers a library called Spark ML, which provides Machine Learning functions that operate on Spark's own data container, the **DataFrame**.

We built an **ML Pipeline** to train our model first, using the **[ISEAR dataset](https://www.unige.ch/cisa/research/materials-and-online-research/research-material/)**. It comes from a study by the University of Geneva in which respondents were asked to report situations where they experienced one of seven major emotions (**joy, fear, anger, sadness, disgust, shame, and guilt**). The end result of that project was a dataset built from the data the study collected, which we'll be using for our own project.



## ML code

```python
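from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import (CountVectorizer, IndexToString, RegexTokenizer,
                                StopWordsRemover, StringIndexer)

# `spark` (the SparkSession) and `schema` are defined earlier in the script
trainingSet = spark.read.csv("/opt/spark/dataset/DATA.csv",
                            schema=schema,
                            header=True,
                            sep=';')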


trainingSet = trainingSet.na.drop(subset="lyrics")

removerToGetList = StopWordsRemover(inputCol="words", outputCol="wordsSanitized")
stopWordList = removerToGetList.getStopWords()
stopWordList.extend(['verse', 'refrain', 'intro', 'outro', 'bridge', 'chorus', '1', '2', '3', 're','pre', 'oh', 'll', 'solo', 've', 'yeah', 'm'])
```

We start by reading our dataset into a **Spark DataFrame** and preparing the first pieces of the ML Pipeline: we take the default **stop words** to remove from the text and add music-related ones.

```python

stage_1 = RegexTokenizer(inputCol="lyrics", outputCol="words", pattern='\\W')

stage_2 = StopWordsRemover(inputCol="words", outputCol="wordsSanitized", stopWords=stopWordList)

stage_3 = CountVectorizer(inputCol="wordsSanitized", outputCol="vector", minDF=1.0)

stage_4 = StringIndexer(inputCol="emotion", outputCol="emotionLabel")

model = LogisticRegression(featuresCol="vector", labelCol="emotionLabel")

#creating ML Pipeline
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, stage_4, model])
```

Then we actually build our pipeline, using five stages:  

1. We use a **RegexTokenizer** to split the phrases word by word;
2. Then a **StopWordsRemover** to remove stop words from our analysis;
3. Then we pass our words through a **CountVectorizer**, to get the corresponding Frequency Vector to use in our analysis;
4. We use a **StringIndexer** to map our emotion labels from strings to numeric indexes;
5. Then we finally use **Logistic Regression** to actually do the analysis and get our prediction.
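
To get a feel for what the first three stages do to a line of lyrics, here is a plain-Python imitation. It is not how Spark implements them, just a sketch of the same idea on a toy input; the stop-word list and the vocabulary are made up for the example.

```python
import re
from collections import Counter

STOP_WORDS = {"the", "a", "i", "oh", "yeah", "chorus"}  # tiny illustrative list


def tokenize(text):
    """Lower-case and split on non-word characters, dropping empty tokens."""
    return [token for token in re.split(r"\W", text.lower()) if token]


def remove_stop_words(tokens):
    return [token for token in tokens if token not in STOP_WORDS]


def count_vector(tokens, vocabulary):
    """Count how often each vocabulary term occurs in `tokens`."""
    counts = Counter(tokens)
    return [counts[term] for term in vocabulary]


tokens = remove_stop_words(tokenize("[Chorus] Oh, I love the rain, the rain!"))
vector = count_vector(tokens, ["rain", "love", "sun"])
```

Here `tokens` ends up as `["love", "rain", "rain"]` and `vector` as `[2, 1, 0]`: the section marker and the filler words are gone, and what reaches the classifier is just a list of word counts.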

```python
pipelineFit = pipeline.fit(trainingSet)

labels = pipelineFit.stages[3].labels

evaluationData = pipelineFit.transform(trainingSet)

reindexer = IndexToString(inputCol="prediction", outputCol="predictedEmotion", labels=labels)

evaluator = MulticlassClassificationEvaluator(labelCol="emotionLabel", predictionCol="prediction")
accuracy = evaluator.evaluate(evaluationData)
```

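Once our pipeline is built, we fit it to the training set and retrieve the labels for our emotions, so that we can show the emotion string instead of its index. We then use a **MulticlassClassificationEvaluator** to score our model: *0.9868159176517879*. Two caveats apply to this number: with no `metricName` set, the evaluator reports the F1 score rather than plain accuracy, and because the score is computed on the training set itself, it is an optimistic estimate of how the model behaves on unseen lyrics.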
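
To make the difference between plain accuracy and an F1 score concrete, here is a small pure-Python sketch on toy labels. As far as we know, Spark's `f1` metric is an F-measure weighted by how often each class occurs, which is what `weighted_f1` imitates; both function names are ours and the labels are made up.

```python
from collections import Counter


def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true label."""
    correct = sum(t == p for t, p in zip(y_true, y_pred))
    return correct / len(y_true)


def weighted_f1(y_true, y_pred):
    """Per-class F1, averaged with each class weighted by its true frequency."""
    support = Counter(y_true)
    total = 0.0
    for label, n_true in support.items():
        tp = sum(t == label and p == label for t, p in zip(y_true, y_pred))
        n_pred = sum(p == label for p in y_pred)
        precision = tp / n_pred if n_pred else 0.0
        recall = tp / n_true
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        total += f1 * n_true
    return total / len(y_true)


y_true = ["joy", "joy", "joy", "fear"]
y_pred = ["joy", "joy", "fear", "fear"]
acc = accuracy(y_true, y_pred)
f1 = weighted_f1(y_true, y_pred)
```

On this toy input the accuracy is 0.75 while the weighted F1 is about 0.767, so the two numbers are close but not interchangeable.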

```python
trackDataFrame = spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", "kafkaserver:9092") \
                .option("subscribe", "emotedet") \
                .option("startingOffsets", "earliest").load()

trackDataFrame = trackDataFrame.selectExpr("CAST(value AS STRING)") \
                .select(from_json("value", lyricsSchema).alias("data")) \
                .select("data.*")

trackDataFrame = reindexer.transform(pipelineFit.transform(trackDataFrame)) \
                .select('id', 'title', 'artist', 'lyrics', 'wordsSanitized',
                        'header_image_url', 'predictedEmotion')

trackDataFrame.withColumnRenamed('wordsSanitized', 'words') \
                .withColumn("timestamp", current_timestamp()) \
                .writeStream \
                .option("checkpointLocation", "/save/location") \
                .format("es") \
                .start("emotedet") \
                .awaitTermination()
```

Our model is ready: we can now **read** our data from Kafka in real time using **Spark Structured Streaming** and apply our model to it to **analyze** it. After the analysis, we'll write the data somewhere (no spoilers!) and we'll be ready to **visualize** it.

## Time for pie charts!

Now that our data is analyzed, we are ready to visualize our results! We'll use [**Kibana**](https://www.elastic.co/kibana), a free and open user interface that lets you visualize your data, to gain more **insight** into the artist's discography. To use Kibana, though, we need Spark to write the data to **[Elasticsearch](https://www.elastic.co/elasticsearch/)**, a distributed, JSON-based search and analytics engine, which organizes our data so that Kibana can visualize it.

<img src="notebook/almostdone.png" align="middle">

Once the project is deployed (we'll see how later!), Kibana will be ready with an **Index Pattern** to match the Elasticsearch index created by Spark, and a **Dashboard** to visualize some basic data from the project.

<img src="notebook/dashboard.png" align="middle">

## When 16 GB of memory is not enough

Our data pipeline is **complete and running** now, but how much will it **affect** our devices? Will my PC explode, or will it survive?   
   
The deployment of the pipeline includes a system to collect **metrics** from the various layers, and the choice fell on **[Metricbeat](https://www.elastic.co/beats/metricbeat)**, a lightweight way to send system and service **statistics** that is part of the ELK stack and Kibana-ready for the visualization of our metrics.

<img src="notebook/done.png" align="middle">

## Metrics

With **Metricbeat**, we can get all the metrics from the ELK stack directly in Kibana, along with data from all our Docker containers (the project is intended to be deployed using containers, which we'll discuss later on) and various metrics from our Kafka broker.
   
Both the Docker and Kafka modules have a ready-made **dashboard** to visualize their metrics, while the ones related to the ELK stack can be checked directly in Kibana's **Stack Management** section.

```yaml
metricbeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

processors:
  - add_cloud_metadata: ~
  - add_docker_metadata: ~

output.elasticsearch:
  hosts: 'http://elastic:9200'

setup.kibana.host: "kibana:5601"
metricbeat.modules:
- module: docker
  metricsets:
    - "container"
    - "cpu"
    - "diskio"
    - "event"
    - "healthcheck"
    - "info"
    - "memory"
    - "network"
  hosts: ["unix:///var/run/docker.sock"]
  period: 10s
  enabled: true

# Kafka metrics collected using the Kafka protocol
- module: kafka
  metricsets:
    - partition
    - consumergroup
  period: 10s
  hosts: ["kafkaserver:9092"]

# Metrics collected from a Kafka broker using Jolokia
- module: kafka
  metricsets:
    - broker
  period: 10s
  hosts: ["kafkaserver:8779"]

- module: logstash
  period: 10s
  hosts: ["logstash:9600"]
  xpack.enabled: true


- module: kibana
  period: 10s
  hosts: ["kibana:5601"]
  enabled: true
  xpack.enabled: true

- module: elasticsearch
  period: 10s
  hosts: ["elastic:9200"]
  xpack.enabled: true
```


# The Deployment

## Containers orchestration

The project is intended to be deployed using **[Docker containers](https://www.docker.com/resources/what-container)**, a standard unit of software that packages up code and all its dependencies so the application runs **quickly and reliably from one computing environment to another.**  
  
All the layers of the pipeline are already Docker-ready, and the project can be deployed using two different container orchestration methods: **Docker-Compose and Kubernetes**.

## Docker-Compose

The project includes a docker-compose.yml file, ready to be passed to the **[Docker-Compose CLI](https://docs.docker.com/compose/)** and deployed as a series of Docker containers. 

```yaml
version: "3"
services:
    logstash:
        build: logstash
        container_name: "logstash"
        image: "logstash:emotedet"
        depends_on:
            - "crawler"
            - "kafkaserver"
        ports:
            - "9600:9600"
        volumes:
            - "tracks:/opt/tracks"
        networks:
            - "emotedet"

    # Crawler container
    crawler:
        build: "geniusCrawler"
        image: "crawler:emotedet"
        container_name: "crawler"
        depends_on:
            - "kafkaserver"
        volumes: 
            - "tracks:/opt/tracks"
        environment:
            - "PYTHONUNBUFFERED=1"
            - "ARTIST"
    
    # Zookeeper for Kafka
    zookeeper:
        image: "confluentinc/cp-zookeeper:6.1.1"
        container_name: "zookeeper"
        ports:
            - "2181:2181"
        environment:
            ZOOKEEPER_CLIENT_PORT: "2181"
            ZOOKEEPER_SERVER_ID: "1"
        networks:
            - "emotedet"


    # Kafka Broker
    kafkaserver:
        build: "kafka"
        image: "kafka:emotedet"
        container_name: "kafkaserver"
        hostname: "kafkaServer"
        depends_on:
            - "zookeeper"
        ports:
            - "9092:9092"
            - "9101:9101"
            - "8779:8779"
        environment:
            KAFKA_BROKER_ID: 0
            KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
            KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafkaserver:9092"
            KAFKA_DEFAULT_REPLICATION_FACTOR: 1
            KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
            KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
            KAFKA_OPTS: "-javaagent:/opt/jolokia-jvm-1.6.2-agent.jar=port=8779,host=kafkaserver"
        networks:
            - "emotedet"


    # Container to create Kafka topic
    kafkatopic:
        image: "confluentinc/cp-kafka:6.1.1"
        container_name: "kafkatopic"
        depends_on:
            - "kafkaserver"
        command: bash -c "kafka-topics --create --bootstrap-server kafkaserver:9092 --replication-factor 1 --partitions 1 --topic emotedet"
        networks:
            - "emotedet"

    # Kafka webui
    webui:
        image: "provectuslabs/kafka-ui:latest"
        container_name: "kafkaWebUI"
        environment:
            KAFKA_CLUSTERS_0_NAME: "my_cluster"
            KAFKA_CLUSTERS_0_ZOOKEEPER: "zookeeper:2181"
            KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafkaServer:9092"
        ports:
            - "8080:8080"
        depends_on:
            - "kafkaserver"
        networks:
            - "emotedet"


    # Spark container
    spark:
        build: "spark"
        image: "spark:emotedet"
        ports:
            - "4040:4040"
        container_name: "spark"
        depends_on:
            - "kafkaserver"
            - "kibana"
        networks:
            - "emotedet"

    # ES container
    elastic:
        image: "docker.elastic.co/elasticsearch/elasticsearch:7.13.2"
        container_name: "elastic"
        environment:
            - "node.name=es"
            - "cluster.name=es-docker-cluster"
            - "discovery.seed_hosts=es"
            - "discovery.type=single-node"
            - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
        ports:
            - "9200:9200"
        networks:
            - "emotedet"

    # Kibana container
    kibana:
        image: "docker.elastic.co/kibana/kibana:7.12.1"
        container_name: "kibana"
        ports:
            - "5601:5601"
        depends_on:
            - "elastic"
        networks:
            - "emotedet"
        environment:
            - ELASTICSEARCH_HOSTS="http://elastic:9200"


    # Metricbeat container
    metricbeat:
        build: "metricbeat"
        image: "metricbeat:emotedet"
        container_name: "metricbeat"
        restart: "always"
        environment:
            ELASTICSEARCH_HOSTS: "elastic:9200"
        volumes:
            - "metricbeat:/usr/share/metricbeat/data"
            - "/var/run/docker.sock:/var/run/docker.sock"
        networks:
            - "emotedet"

    # Container to import Kibana dashboards and indexes via REST Api
    kibImporter:
        build: "kibImporter"
        image: "kibmporter:emotedet"
        container_name: "kibImporter"
        depends_on:
            - "kibana"
        networks:
            - "emotedet"

volumes:
    tracks:
    metricbeat:

networks:
    emotedet:

```

Aside from our pipeline layers, the compose file creates a few other containers, including:  

* A container for **[Zookeeper](https://zookeeper.apache.org/)**, which Kafka needs in order to run
* A container to create the **Kafka Topic** that Logstash writes to and Spark reads from
* A container to import all the **Kibana Dashboards and indexes** needed to visualize the data and the metrics.

With Docker installed, deploying the pipeline is pretty easy: we need just one command.

```bash
docker-compose up --build -d
```

Let's see what this does: ```docker-compose up``` tells Docker to run all the containers defined in the docker-compose file with the options indicated, ```--build``` builds all the images needed for the various containers, and ```-d``` runs the containers in detached mode, so their logs are not attached to the current terminal.

Once the containers are all up and running, we can open **Kibana** in a browser at [localhost:5601](http://localhost:5601) (port 5601 is published to the host in the compose file) and start playing with our data!
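
Kibana can take a while to start, so the page may not respond right after the command returns. As a rough sketch, the helper below polls a URL until it answers with HTTP 200. The name `wait_for_http` is ours, and using Kibana's `/api/status` endpoint on `localhost:5601` is an assumption based on the port published in the compose file.

```python
import time
import urllib.error
import urllib.request


def wait_for_http(url, timeout=120.0, interval=2.0,
                  opener=urllib.request.urlopen, sleep=time.sleep):
    """Poll `url` until it answers with HTTP 200 or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with opener(url) as response:
                if response.status == 200:
                    return True
        except (urllib.error.URLError, ConnectionError):
            pass  # service not accepting connections yet
        sleep(interval)
    return False
```

A call such as `wait_for_http("http://localhost:5601/api/status")` would then block until Kibana is reachable, or return `False` after two minutes. The `opener` and `sleep` parameters exist only to make the function easy to try out without a running stack.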

## Kubernetes

In addition to the Docker-Compose solution, the project offers all the files needed to deploy it to a **Kubernetes Cluster**.

**[Kubernetes](https://kubernetes.io/)**, also known as **K8s**, is an open-source system for automating deployment, scaling, and management of containerized applications. Using the **kubectl** CLI, and assuming that a K8s cluster is already up and running, we can deploy the project easily.

The first thing to do to deploy the pipeline to a Kubernetes cluster is to make sure all the **images** are built and up to date. We can easily do that with a single command using the docker-compose file:
```bash
docker-compose build
```

Once the images are ready, and a **Kubernetes Cluster** is online (e.g. using [**Minikube**](https://minikube.sigs.k8s.io/docs/) or the Docker Desktop Kubernetes cluster), we can deploy all the containers into **K8s pods** using kubectl:
```bash
kubectl apply -f kubernetes
```
The command applies to the cluster all the services and deployments described in the files in the **kubernetes** folder. If a **Kubernetes Dashboard** is up, we can check there that everything is running smoothly (e.g. with minikube we can start it with the command ```minikube dashboard```).

To actually access **Kibana**, we need to tell Kubernetes to forward its **port** to our host system. We can do that using kubectl.

```bash
kubectl port-forward kibana-pod-name 5601
```

Once everything is running, we can finally access our data with Kibana in our browser at [localhost:5601](http://localhost:5601).