# Multi-Stream-Live-Chat-Analysis

###### Incardona Biagio

## What is it?

Twitch-Youtube-Chat-Analysis is a university project by the student Incardona Biagio for the **TECHNOLOGIES FOR ADVANCED PROGRAMMING** class. It performs a real-time analysis of viewers' interactions during a live stream on YouTube and/or Twitch.


## Who is it for?

The project is aimed at streamers, especially those who stream on multiple platforms in parallel (in this case, Twitch and YouTube).

It can also be used by potential investors to assess whether a streamer has a good audience.



## Goal

The main goal is to provide a simple interface for analyzing viewer behavior, since the chat of a live stream can be very confusing.
The problem gets worse when the stream runs on several platforms in parallel.


A dashboard is then provided containing the main analyses, with the option of:
  1. A summary analysis covering all platforms at once
  2. An analysis focused on a single platform

## Structure

The project can be split into 5 different phases:
    
   1. [Ingestion](#Ingestion)
   2. [Streaming](#Streaming)
   3. [Processing](#Processing)
   4. [Indexing](#Indexing)
   5. [Visualization](#Visualization)

## Ingestion

The ingestion phase consists of collecting data from external sources, in this case YouTube and Twitch.

To keep the project easy to extend, data is collected through APIs and web scraping and then ingested using **Logstash**.


### Twitch

To retrieve data from Twitch, its API has been used.

### YouTube

Although YouTube exposes good APIs, they do not give access to live chat resources without a GUI. Since the project must run entirely in batch mode, **Selenium** has been used to scrape the data from YouTube.

Google Chrome's driver has been used.

#### Let's see how

```python
import json
import time

from selenium import webdriver
from selenium.webdriver.common.by import By

# Run Chrome headless so the scraper works without a GUI.
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable-extensions')
browser = webdriver.Chrome(options=chrome_options)

# Placeholder: `channel` holds the ID of the live video (the `v` parameter).
channel = "VIDEO_ID"
browser.get("https://www.youtube.com/live_chat?v=" + channel)
chats = []
count = 0
rate = 0.0
start_time = time.time()
while True:
    # Each chat message is rendered as its own custom element.
    for chat in browser.find_elements(By.CSS_SELECTOR, 'yt-live-chat-text-message-renderer'):
        usr = chat.find_element(By.CSS_SELECTOR, "#author-name").get_attribute('innerHTML')
        mex = chat.find_element(By.CSS_SELECTOR, "#message").get_attribute('innerHTML')
        # Keep only the text before the first nested tag.
        usr = str(usr).split('<')[0]

        obj = json.dumps({'author_name': usr, 'message': mex})
        x = json.loads(obj)
```

### Preprocessing

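One of our goals is to measure audience engagement.

For our purpose, we define engagement as the rate at which messages arrive: a number of messages divided by the time elapsed while they were sent.

This value is calculated with the following expression, where the constant 6 is presumably the number of messages counted between `start_time` and `end_time`.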

```python
rate = 6/(end_time-start_time)
```
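
As a rough illustration of the expression above, the sketch below computes the same kind of rate over a window of messages. The function name `message_rate` and its parameters are hypothetical and not taken from the project, and the guard against a zero-length window is an added assumption.

```python
def message_rate(message_count, start_time, end_time):
    """Messages per second over the window [start_time, end_time]."""
    elapsed = end_time - start_time
    if elapsed <= 0:
        # Avoid dividing by zero when all messages share one timestamp.
        return 0.0
    return message_count / elapsed


# Six messages received over three seconds.
print(message_rate(6, start_time=100.0, end_time=103.0))  # prints 2.0
```

A higher value means messages arrive more frequently, that is, a more active chat.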

### Logstash

Logstash is a tool for collecting, processing, and forwarding log events and messages. Collection is done through configurable input plugins. Once an input plugin has collected the data, it can be processed by any number of filters that modify and annotate the event data. Finally, Logstash routes events to output plugins, which can forward them to a variety of external programs including Elasticsearch, local files, Kafka, and others.

For our purpose, Logstash collects the data from the two scrapers in JSON format over a TCP connection and then forwards it to Kafka using the "data" topic.

So that both scrapers send homogeneous data to Logstash, they share a small message format:

```
{
    'username' : username of the sender,
    'message' : body of the message,
    'engagement' : engagement,
    'timestamp' : timestamp of when the message arrives into logstash,
    'source' : source name
}
```
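
A minimal sketch of how a scraper might send one message in this format is shown below. It assumes a Logstash TCP input that reads newline-delimited JSON; the host, the port `5000`, and the function names `encode_event` and `send_event` are hypothetical. The `timestamp` field is left out on the assumption that it is set when the message arrives in Logstash, as the format describes.

```python
import json
import socket


def encode_event(username, message, engagement, source):
    """Serialize one chat message as a newline-terminated JSON line."""
    event = {
        "username": username,
        "message": message,
        "engagement": engagement,
        "source": source,
    }
    return (json.dumps(event) + "\n").encode("utf-8")


def send_event(payload, host="localhost", port=5000):
    """Open a TCP connection to Logstash and send one encoded event."""
    with socket.create_connection((host, port)) as conn:
        conn.sendall(payload)


payload = encode_event("some_user", "hello chat", 2.0, "twitch")
print(payload)
# send_event(payload)  # needs a Logstash TCP input listening on host:port
```

Opening one connection per message keeps the sketch short; a long-running scraper would more likely keep a single connection open.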

## Streaming

Data streaming is handled by Apache Kafka.

### Apache Kafka

Apache Kafka is a distributed data streaming platform that allows you to publish, subscribe, archive, and process streams of records in real-time. It is designed to manage data streams from multiple sources by distributing them to multiple consumers. In short, it allows you to move large amounts of data from one point to another at the same time.

## Processing

Once the data has been collected through Kafka, we want to transform it and extract additional information from it.

For this purpose, I relied on Apache Spark.

### Apache Spark

Apache Spark is a distributed computing framework that, according to its own documentation, can run up to 100 times faster than Hadoop MapReduce for in-memory workloads.

Spark is well suited to iterative workloads such as machine learning, which is why I chose it.

To create and manipulate DataFrames on the fly we used PySpark, the API that Spark provides for Python.


### Machine Learning
By relying on Spark we can apply ML algorithms very quickly.

We have two goals:

  1. Sentiment analysis of chats
  2. Keyword extraction of chats

#### 1. Sentiment analysis


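Sentiment analysis is done with VADER (the `vaderSentiment` package). Its compound score ranges from -1 (most negative) to +1 (most positive).

```python
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

sid = SentimentIntensityAnalyzer()

def get_sentiment(text):
    # Keep only the compound score of the message.
    value = sid.polarity_scores(text)
    value = value['compound']
    return value

print(get_sentiment("i'm ugly :("))
print(get_sentiment("i'm pretty :)"))
```

Output:

```
-0.7351
0.7351
```
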

#### 2. Keyword extraction

To extract the keywords of each message I used **spaCy**, a widely used multi-language natural language processing library for Python.

Its models come in three sizes: small (about 11MB), medium (about 100MB), and large (about 1GB), so it can be adapted to memory constraints.

I used the "large" version.

##### Code
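```python
from collections import Counter
from string import punctuation

import spacy

# Assumption: the large English model; the text above only says "large".
nlp = spacy.load("en_core_web_lg")

def get_keyword(text):
    """Return the most frequent proper noun, adjective, or noun, or None."""
    result = []
    pos_tag = ['PROPN', 'ADJ', 'NOUN']
    doc = nlp(text.lower())
    for token in doc:
        # Skip stop words and punctuation.
        if token.text in nlp.Defaults.stop_words or token.text in punctuation:
            continue
        if token.pos_ in pos_tag:
            result.append(token.text)

    back = Counter(result).most_common(1)
    if len(back) == 0:
        return None
    return back[0][0]
```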

---

At the end of processing, a JSON document structured in the following way is produced for each message:

```
{
    'username' : username of the user,
    'timestamp' : timestamp of when the message arrived,
    'mex' : content of the message,
    'engagement' : engagement,
    'source' : name of the source,
    'mex_sentiment' : sentiment analysis value of the message,
    'keyword' : representative keyword of the message
}
```
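
The mapping from an ingested message to this output record can be sketched in plain Python as follows. The function `enrich` and its parameters are hypothetical; in the project the equivalent step presumably runs inside Spark, with `get_sentiment` and `get_keyword` in place of the stand-in callables used here. The sample values are made up for illustration.

```python
def enrich(event, sentiment_fn, keyword_fn):
    """Build the output record from a raw event and two analysis callables."""
    text = event["message"]
    return {
        "username": event["username"],
        "timestamp": event["timestamp"],
        "mex": text,
        "engagement": event["engagement"],
        "source": event["source"],
        "mex_sentiment": sentiment_fn(text),
        "keyword": keyword_fn(text),
    }


# Stand-in callables so the sketch runs without VADER or spaCy installed.
raw = {
    "username": "some_user",
    "message": "great stream",
    "engagement": 2.0,
    "timestamp": "2021-01-01T12:00:00Z",
    "source": "youtube",
}
record = enrich(raw, sentiment_fn=lambda t: 0.0, keyword_fn=lambda t: t.split()[-1])
print(record)
```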

## Indexing

Our data is indexed in order to reduce search costs. For this purpose, we use **Elasticsearch**.

### Elasticsearch

Elasticsearch is a search engine based on Apache Lucene, an open-source information retrieval library widely used for building search engines.

All functionality is exposed natively through a RESTful interface, and the information is managed as JSON documents.

In the project, we use only one index: "data".

Using a single index lets us analyze one platform at a time or all platforms together in a simple and intuitive way.
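
As an illustration of how one index can serve both views, the sketch below builds Elasticsearch Query DSL search bodies for the "data" index. The helper `build_query` is hypothetical, and the value `"twitch"` for the `source` field is an assumption about how the scrapers label their messages.

```python
def build_query(source=None):
    """Return a search body for the "data" index, optionally for one platform."""
    if source is None:
        # No filter: documents from every platform are returned.
        return {"query": {"match_all": {}}}
    # Restrict the results to documents whose "source" field matches.
    return {"query": {"match": {"source": source}}}


print(build_query())
print(build_query("twitch"))
```

Either body could be sent to the index's `_search` endpoint; the same filter on `source` is what a dashboard would apply to switch between the combined view and a single platform.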

## Visualization

Kibana is an open-source data visualization dashboard for Elasticsearch. It provides visualization capabilities on top of the content indexed in Elasticsearch. Users can create bar, line, and pie charts and maps over large volumes of data.

In addition to rendering charts, Kibana can display the data in real time as it is indexed.

<p align="center">
  <img src="./dashboard.png" width="1000" alt="Kibana dashboard" title="Kibana dashboard">
</p>