# Twitch AI-Powered Chat Analyzer
<img src="NotebookPictures/TwitchLogo.png" alt="Twitch logo" width="500" height="340"/>

## What is Twitch?

<img src="NotebookPictures/Twitch1.png" alt="Twitch Homepage" />

- Livestreams in various categories:
  - Gaming
  - Talk shows
- Users: subscribers (Sub) and non-subscribers (Not Sub)

<img src="NotebookPictures/TwitchChat.png" alt="Twitch Chat" height="720" width="1280"/>

## Project Goals

- Provide a tool for creators and "curious" users
- Enrich chat messages by analyzing their **sentiment** and **emotion**
- Offer a simple interface that summarizes the chat at a glance

<img src="NotebookPictures/AIChat.png" alt="AI Powered Chat" height="720" width="1280"/>

Provide **metrics** on how the chat evolves:
- Sentiment over time: overall, for subs, and for non-subs
- Chat activity
- Distribution of emotions
- Share of messages sent by subs and non-subs

<img src="NotebookPictures/GeneralSentiment.png" alt="General Sentiment" height="720" width="1280"/>

<img src="NotebookPictures/SubSentiment.png" alt="Sub Sentiment" height="720" width="1280"/>

<img src="NotebookPictures/NotSubSentiment.png" alt="Not Sub Sentiment" height="720" width="1280"/>

<img src="NotebookPictures/ChatActivity.png" alt="Chat Activity" height="720" width="1280"/>

<img src="NotebookPictures/SubPortion.png" alt="Sub Portion" height="720" width="1280"/>

<img src="NotebookPictures/EmotionPortion.png" alt="Emotion Portion" height="720" width="1280"/>
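The dashboards are presumably built in Kibana on top of the Elasticsearch index. The following stdlib sketch makes the four metrics concrete by computing them from a list of enriched messages. `chat_metrics` is a hypothetical helper. It assumes that each message carries `timestamp` in epoch milliseconds (as in the Elasticsearch mapping), plus `is_subscriber`, `sentiment` and `emotion`, and that buckets are one minute wide.

```python
from collections import Counter, defaultdict
from statistics import mean


def chat_metrics(messages, bucket_ms=60_000):
    """Dashboard-style metrics from enriched chat messages (hypothetical helper)."""
    sentiment = {g: defaultdict(list) for g in ("all", "sub", "not_sub")}  # group -> bucket -> values
    activity = Counter()  # bucket start -> number of messages
    emotions = Counter()  # emotion label -> number of messages
    senders = Counter()   # "sub" / "not_sub" -> number of messages

    for msg in messages:
        bucket = msg["timestamp"] // bucket_ms * bucket_ms  # start of the bucket, epoch millis
        group = "sub" if msg["is_subscriber"] else "not_sub"
        sentiment["all"][bucket].append(msg["sentiment"])
        sentiment[group][bucket].append(msg["sentiment"])
        activity[bucket] += 1
        emotions[msg["emotion"]] += 1
        senders[group] += 1

    total = len(messages) or 1  # avoid dividing by zero on an empty chat
    return {
        # Average sentiment per bucket, overall and per group
        "sentiment": {g: {b: mean(v) for b, v in sorted(d.items())} for g, d in sentiment.items()},
        "activity": dict(sorted(activity.items())),
        # Shares of each emotion and of sub / non-sub messages
        "emotion_share": {e: n / total for e, n in emotions.items()},
        "sub_share": {g: n / total for g, n in senders.items()},
    }


msgs = [
    {"timestamp": 1_714_593_605_000, "is_subscriber": True, "sentiment": 0.8, "emotion": "happy"},
    {"timestamp": 1_714_593_640_000, "is_subscriber": False, "sentiment": -0.4, "emotion": "angry"},
    {"timestamp": 1_714_593_670_000, "is_subscriber": False, "sentiment": 0.0, "emotion": "neutral"},
]
metrics = chat_metrics(msgs)
```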

# Live Demo!

# Project Structure

Technologies used:
- Logstash
- Kafka
- Kafka Streams
- Spark
- Elasticsearch
- Kibana

Other modules/packages used:
- Flask
- TwitchIO API
- Twitch REST API
- Java WatchService
- OpenAI API (GPT-3.5 Turbo)
- SocketIO
- SocketIO

<img src="NotebookPictures/Struttura1.png" alt="Project structure (1/9)" height="720" width="1280"/>

<img src="NotebookPictures/Struttura2.png" alt="Project structure (2/9)" height="720" width="1280"/>

<img src="NotebookPictures/Struttura3.png" alt="Project structure (3/9)" height="720" width="1280"/>

<img src="NotebookPictures/Struttura4.png" alt="Project structure (4/9)" height="720" width="1280"/>

<img src="NotebookPictures/Struttura5.png" alt="Project structure (5/9)" height="720" width="1280"/>

<img src="NotebookPictures/Struttura6.png" alt="Project structure (6/9)" height="720" width="1280"/>

<img src="NotebookPictures/Struttura7.png" alt="Project structure (7/9)" height="720" width="1280"/>

<img src="NotebookPictures/Struttura8.png" alt="Project structure (8/9)" height="720" width="1280"/>

<img src="NotebookPictures/Struttura9.png" alt="Project structure (9/9)" height="720" width="1280"/>

# Code Deep Dive

The whole system runs in Docker containers, which are created with Docker Compose:

In [None]:
version: '3.8'
services:

  zookeeper:
    hostname: zookeeper
    image: confluentinc/cp-zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka
    hostname: kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: host.docker.internal
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: "yes"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://host.docker.internal:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

In [None]:
  twitch-chat-ingestor:
    hostname: twitch-chat-ingestor
    image: docker.elastic.co/logstash/logstash:8.13.0
    container_name: twitch-chat-ingestor
    environment:
      XPACK_MONITORING_ENABLED: "false"
    ports:
      - 9090:9090
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    depends_on:
      - kafka

  init-kafka:
    image: confluentinc/cp-kafka
    entrypoint: ['/bin/sh', '-c']
    command: |
      "
      echo -e 'Creating kafka topics...'
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic general --replication-factor 1 --partitions 1
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic no_emotes_general --replication-factor 1 --partitions 1
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic enriched_chat --replication-factor 1 --partitions 1

      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server kafka:9092 --list
      "
    depends_on:
      - kafka

In [None]:
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
    depends_on:
      - kafka

  emote_downloader:
    build:
      context: ./python_emote_downloader
      dockerfile: Dockerfile
    container_name: emote_downloader
    hostname: emote_downloader
    ports:
      - "12345:12345"
    volumes:
      - ./python_emote_downloader:/app
    command: >
      python getEmotes.py
    depends_on:
      - kafka

In [None]:
  kafka-stream:
    hostname: kafka-stream
    container_name: kafka-stream
    build:
      context: ./filter_emotes
      dockerfile: Dockerfile
    volumes:
      - ./filter_emotes/app/build/libs/app-1.0.jar:/app/app-1.0.jar
      - ./python_emote_downloader:/app
    command: bash -c "ls && java -jar app-1.0.jar"
    depends_on:
      - emote_downloader
      - init-kafka

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.13.4
    container_name: elasticsearch
    ports: 
      - "9200:9200"
      - "9300:9300"
    environment:
      - "discovery.type=single-node"
      - "xpack.security.enabled=false"

In [None]:
  kibana:
    hostname: kibana
    image: docker.elastic.co/kibana/kibana:8.13.4
    ports:
      - 5601:5601
    environment:
      - "xpack.security.enabled=false"
    volumes:
      - ./spark_sentiment/kibana.yml:/usr/share/kibana/config/kibana.yml
    depends_on:
      - elasticsearch

  spark:
    build:
      context: ./spark_sentiment
      dockerfile: Dockerfile
    container_name: spark
    hostname: spark
    ports:
      - "4040:4040"
    volumes:
      - ./spark_sentiment:/opt/twitchsentimentanalisys
    command: >
      /opt/spark/bin/spark-submit --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.2,org.elasticsearch:elasticsearch-spark-30_2.12:8.13.4 /opt/twitchsentimentanalisys/spark_sentiment.py
    depends_on:
      init-kafka:
        condition: service_completed_successfully
      elasticsearch:
        condition: service_started

In [None]:
  flask:
    build:
      context: ./python_chat_listener
      dockerfile: Dockerfile
    ports:
      - "5000:5000"
    command: python web_interface.py
    volumes:
      - ./python_chat_listener:/app
      - ./python_emote_downloader/.env:/app/.env
    depends_on:
      spark:
        condition: service_started
      kafka:
        condition: service_started
      init-kafka:
        condition: service_completed_successfully
      elasticsearch:
        condition: service_started
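Compose starts services in dependency order. A plain `depends_on` entry only waits for the dependency's container to start, not for it to be ready. `condition: service_completed_successfully`, used here for `init-kafka`, waits for that container to exit with code 0. The sketch below derives one valid start order from the `depends_on` edges copied from the file above. It uses `graphlib`, which requires Python 3.9 or newer, and `start_order` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# depends_on edges copied from the docker-compose file above
DEPENDS_ON = {
    "zookeeper": [],
    "kafka": ["zookeeper"],
    "twitch-chat-ingestor": ["kafka"],
    "init-kafka": ["kafka"],
    "kafka-ui": ["kafka"],
    "emote_downloader": ["kafka"],
    "kafka-stream": ["emote_downloader", "init-kafka"],
    "elasticsearch": [],
    "kibana": ["elasticsearch"],
    "spark": ["init-kafka", "elasticsearch"],
    "flask": ["spark", "kafka", "init-kafka", "elasticsearch"],
}


def start_order(depends_on):
    """Return one valid start order: every service comes after the services it depends on."""
    return list(TopologicalSorter(depends_on).static_order())


print(start_order(DEPENDS_ON))
```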

## Flask Web Server

In [None]:
@app.route('/')
def home():
    return render_template('index.html')

@app.route('/script-status') # Endpoint to check whether the chat_listener process is running
def get_script_status():
    # poll() returns None while the child is alive, so a script that exited is reported as stopped
    running = process is not None and process.poll() is None
    return jsonify(status=running)

@app.route('/update-channel', methods=['POST']) # Endpoint to update the channel in the .env file
def update_channel():
    new_channel = request.json['channel']
    with open('.env', 'r') as file:
        lines = file.readlines()
    with open('.env', 'w') as file:
        for line in lines:
            if line.startswith('CHANNEL='):
                file.write(f'CHANNEL={new_channel}\n')
            else:
                file.write(line)
    return jsonify(message='Channel updated successfully!')
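`update_channel` only rewrites an existing `CHANNEL=` line. If the `.env` file has no such line, the request reports success but nothing changes. The sketch below is a hypothetical pure helper (`set_env_value`) that also appends the key when it is missing. It rejects values that contain line breaks, which could otherwise inject extra variables into the file.

```python
def set_env_value(lines, key, value):
    """Return a copy of the .env lines with `key` set to `value`, appending the key if missing."""
    if "\n" in value or "\r" in value:
        raise ValueError("value must be a single line")  # would otherwise inject extra variables
    prefix = f"{key}="
    updated, found = [], False
    for line in lines:
        if line.startswith(prefix):
            updated.append(f"{prefix}{value}\n")
            found = True
        else:
            updated.append(line)
    if not found:
        if updated and not updated[-1].endswith("\n"):
            updated[-1] += "\n"  # keep the new entry on its own line
        updated.append(f"{prefix}{value}\n")
    return updated


# Hypothetical use inside update_channel(): read lines, transform, write them back
with_channel = set_env_value(["CLIENT_ID=abc\n", "SECRET=xyz\n"], "CHANNEL", "somechannel")
```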

In [None]:
@app.route('/run-script') # Endpoint to start the chat_listener script
def run_script():
    global process
    script_path = './chat_listener.py'

    try:
        # Start the script in the background (also when a previous run has already exited)
        if process is None or process.poll() is not None:
            process = subprocess.Popen(['python', script_path])
            print(f'[WEB_INTERFACE] - Script path: {script_path}', flush=True)
            print(f'[WEB_INTERFACE] - Script started with PID: {process.pid}', flush=True)

            response = requests.get('http://host.docker.internal:12345/get_emotes') # Ask the emote_downloader service to download the channel's emotes
            if response.status_code == 200:
                print('Emotes retrieved successfully:', response.json(), flush=True)
            else:
                print('Error while requesting the emotes:', response.status_code, flush=True)

            return jsonify(message='Script started in the background.'), 200
        else:
            print('Script already running', flush=True)
            return jsonify(message='Script already running.'), 400

    except Exception as e:
        print('Error while running the script:', str(e), flush=True)
        return jsonify(message=str(e)), 500

@app.route('/stop-script') # Endpoint to stop the chat_listener script
def stop_script():
    global process
    if process is not None:
        process.terminate()
        process.wait()
        process = None
        return jsonify(message='Script stopped'), 200
    else:
        return jsonify(message='No script running'), 400
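The endpoints track the child process through a global `process` variable. The sketch below wraps the same start/stop logic in a small class. `ScriptRunner` is hypothetical. It uses `Popen.poll()` so that a script that exited on its own is no longer reported as running. It also uses a lock, because the development server may handle requests on several threads.

```python
import subprocess
import sys
import threading


class ScriptRunner:
    """Start and stop a single background script, tracking whether it is still alive."""

    def __init__(self, cmd):
        self.cmd = cmd
        self._process = None
        self._lock = threading.Lock()  # requests may arrive on different threads

    def is_running(self):
        with self._lock:
            return self._process is not None and self._process.poll() is None

    def start(self):
        with self._lock:
            if self._process is not None and self._process.poll() is None:
                return False  # already running
            self._process = subprocess.Popen(self.cmd)
            return True

    def stop(self, timeout=10):
        with self._lock:
            if self._process is None or self._process.poll() is not None:
                self._process = None
                return False  # nothing to stop
            self._process.terminate()
            try:
                self._process.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                self._process.kill()  # the script ignored the termination request
                self._process.wait()
            self._process = None
            return True


# Hypothetical usage mirroring the /run-script and /stop-script endpoints
runner = ScriptRunner([sys.executable, "./chat_listener.py"])
```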

Kafka consumer thread

In [None]:
@socketio.on('connect')
def handle_connect():
    emit('status', {'message': 'Connected to WebSocket'})

def consume_messages(): # Consume messages from the enriched_chat Kafka topic and forward them to the browser
    consumer = kafka.KafkaConsumer('enriched_chat', bootstrap_servers=['host.docker.internal:9092'])
    for message in consumer:
        socketio.emit('new_message', {'message': message.value.decode()})
        print(f'[CONSUMER] - {message.value.decode()}', flush=True)

if __name__ == '__main__':

    thread = threading.Thread(target=consume_messages) # Start the thread that consumes messages from Kafka
    thread.daemon = True  # Daemon thread: it exits together with the main program
    thread.start()

    # Start the Flask server
    socketio.run(app, debug=True, allow_unsafe_werkzeug=True, host='0.0.0.0')
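The consumer thread follows a common pattern: a daemon thread blocks on the Kafka iterator and pushes every decoded record to the connected browsers. The sketch below shows that pattern with the standard library only. `records` stands in for the `KafkaConsumer` and `emit` for `socketio.emit`. Both substitutions are assumptions made so that the example runs without a broker, and `start_forwarder` and `Record` are hypothetical names.

```python
import threading
from collections import namedtuple

# Stand-in for kafka-python's ConsumerRecord: only the `value` attribute (raw bytes) is modeled
Record = namedtuple("Record", "value")


def start_forwarder(records, emit):
    """Forward each record, decoded as UTF-8, to `emit` from a background daemon thread."""
    def run():
        for record in records:  # with a real KafkaConsumer this loop blocks waiting for new messages
            emit("new_message", {"message": record.value.decode("utf-8")})

    thread = threading.Thread(target=run, daemon=True)  # daemon: does not keep the process alive
    thread.start()
    return thread
```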

## Chat Listener Bot

In [None]:
    async def event_message(self, message):
        # Skip echo messages (sent by the bot itself): in TwitchIO they carry no author
        if message.echo:
            return

        # Build the message
        msg = { "channel" : message.channel.name,
                "timestamp" : message.timestamp.strftime('%d-%m-%Y %H:%M:%S'),
                "chatter_name" : message.author.display_name,
                "is_broadcaster" : message.author.is_broadcaster,
                "is_mod" : message.author.is_mod,
                "is_subscriber" : message.author.is_subscriber,
                "is_vip" : message.author.is_vip,
                "content" : message.content
                }

        print(msg, flush=True)

        ingestor_url = os.getenv('INGESTOR_URL')
        try:
            # Send the message to the ingestion service (Logstash)
            async with aiohttp.ClientSession() as session:
                async with session.post(ingestor_url, json=msg) as response:
                    response_text = await response.text()
                    print(f"Sent msg to '{ingestor_url}', RESPONSE = {response_text}", flush=True)
        except aiohttp.ClientError as e:
            # Log and keep listening instead of letting a network error escape the event handler
            print(f"Could not send msg to '{ingestor_url}': {e}", flush=True)
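The bot serializes every chat message into a flat JSON document. The sketch below isolates that mapping as a pure function so it can be tested without connecting to Twitch. `build_payload` is a hypothetical helper, and `SimpleNamespace` stands in for the TwitchIO author object, modeling only the attributes the bot reads. The timestamp uses a day-first pattern, so any downstream parser must use the same format.

```python
import json
from datetime import datetime
from types import SimpleNamespace


def build_payload(channel, timestamp, author, content):
    """Build the JSON document the bot POSTs to Logstash (field names taken from the bot above)."""
    return {
        "channel": channel,
        "timestamp": timestamp.strftime("%d-%m-%Y %H:%M:%S"),
        "chatter_name": author.display_name,
        "is_broadcaster": author.is_broadcaster,
        "is_mod": author.is_mod,
        "is_subscriber": author.is_subscriber,
        "is_vip": author.is_vip,
        "content": content,
    }


# Stand-in author with only the attributes read above (values are made up)
author = SimpleNamespace(display_name="viewer42", is_broadcaster=False, is_mod=False,
                         is_subscriber=True, is_vip=False)
payload = build_payload("somechannel", datetime(2024, 5, 1, 20, 15, 7), author, "gg wp")
body = json.dumps(payload)  # what aiohttp sends when called with json=payload
```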

## Logstash Configuration

In [None]:
input {
  http {
    id => "kart_http_in"
    port => 9090
  }
}

filter {
    if [http][method] != "POST"{
        drop {}
    }

    mutate {
      remove_field => ["user_agent", "event", "url", "http", "@timestamp", "host", "@version"]
    }
}

output {
  kafka {
    codec => json
    topic_id => "general"
    bootstrap_servers => "kafka:9092"
  }
}
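The filter block keeps only POST requests and strips the metadata fields that the http input adds. As a result, only the bot's own JSON fields reach the `general` topic. The following Python rendering of that logic is for illustration only. `logstash_filter` is hypothetical, and the sample event layout is an assumption about what the http input produces.

```python
# Fields removed by the mutate filter in logstash.conf
REMOVED_FIELDS = {"user_agent", "event", "url", "http", "@timestamp", "host", "@version"}


def logstash_filter(event):
    """Return the event as it would leave the filter block, or None if it is dropped."""
    if event.get("http", {}).get("method") != "POST":
        return None  # same effect as `drop {}`
    return {key: value for key, value in event.items() if key not in REMOVED_FIELDS}


sample = {
    "http": {"method": "POST"},
    "@timestamp": "2024-05-01T20:15:07Z",
    "@version": "1",
    "host": {"ip": "172.18.0.5"},
    "channel": "somechannel",
    "content": "gg wp",
}
print(logstash_filter(sample))  # only "channel" and "content" are left
```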

## Emote Downloader

In [None]:
@app.route('/get_emotes')
def get_emotes():

    emotes_list.clear()

    dotenv_file = dotenv.find_dotenv('.env')
    dotenv.load_dotenv(dotenv_file, override=True)
    client_id = os.getenv('CLIENT_ID')
    client_secret = os.getenv('SECRET')
    nome_canale = os.getenv('CHANNEL')
    print(f'Channel: {nome_canale}', flush=True) # Do not print the client secret to the logs

    channel_id = get_channel_id(nome_canale, client_id, client_secret)

    get_twitch_emotes(channel_id, client_id, client_secret)
    get_7tv_emotes(channel_id)
    print('Done!', emotes_list, flush=True)

    with open('emotes.json', 'w') as file:
        json.dump(emotes_list, file, indent=4) # Write the emote list to file

    print('Emotes written to "emotes.json"', flush=True)

    return jsonify(emotes_list), 200
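The downloader and the Kafka Streams filter share a simple contract: `emotes.json` holds a JSON array of emote names, which the Java side loads into a `Set`. The sketch below covers both ends of that contract. `write_emotes` and `read_emotes` are hypothetical helpers. Merging the Twitch and 7TV lists with duplicates removed is an assumption about what `emotes_list` ends up containing, and the emote names are sample data.

```python
import json
import os
import tempfile


def write_emotes(path, *sources):
    """Merge emote-name lists, drop duplicates, and write them as a JSON array."""
    names = sorted({name for source in sources for name in source})
    with open(path, "w", encoding="utf-8") as file:
        json.dump(names, file, indent=4)
    return names


def read_emotes(path):
    """Read the file back as a set, as the Kafka Streams filter does with Set.class."""
    with open(path, encoding="utf-8") as file:
        return set(json.load(file))


path = os.path.join(tempfile.mkdtemp(), "emotes.json")
written = write_emotes(path, ["Kappa", "PogChamp"], ["KEKW", "Kappa"])  # Twitch list, 7TV list
```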

## Kafka Streams

In [None]:
private static void watchFileChanges(Path path) throws IOException, InterruptedException {
    Path dirPath = path.toAbsolutePath().getParent();
    if (dirPath == null) {
        System.err.println("The absolute path has no parent directory.");
        return;
    }

    WatchService watchService = FileSystems.getDefault().newWatchService();
    dirPath.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);

    long lastModifiedTime = 0;
    final long debounceTime = 3000; // Debounce window in milliseconds

    while (true) {
        WatchKey key = watchService.take();
        for (WatchEvent<?> event : key.pollEvents()) {
            WatchEvent.Kind<?> kind = event.kind();
            if (kind.equals(StandardWatchEventKinds.ENTRY_MODIFY)) {
                Path changed = (Path) event.context();
                if (changed.endsWith(path.getFileName())) {
                    long currentTime = System.currentTimeMillis();
                    if (currentTime - lastModifiedTime > debounceTime) {
                        System.out.println("emotes.json modified. Restarting the service...");
                        try {
                            restartKafkaService(); // Restart the stream so that it reloads the emote list
                        } catch (Exception e) {
                            System.err.println("Error restarting the Kafka service: " + e.getMessage());
                            e.printStackTrace();
                        }
                        lastModifiedTime = currentTime;
                    }
                }
            }
        }
        if (!key.reset()) {
            break;
        }
    }
}
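A single save can produce more than one `ENTRY_MODIFY` event, for example when content and metadata are updated separately. This is why the watcher ignores events that arrive within 3 seconds of the last restart. The following Python rendering of the same rule uses a small hypothetical class, `Debouncer`, so that the rule can be tested in isolation.

```python
class Debouncer:
    """Accept an event only if more than `window_ms` have passed since the last accepted one."""

    def __init__(self, window_ms=3000):
        self.window_ms = window_ms
        self.last_accepted = 0  # mirrors `lastModifiedTime = 0` in the Java watcher

    def accept(self, now_ms):
        if now_ms - self.last_accepted > self.window_ms:
            self.last_accepted = now_ms  # like the Java code, update only when the event is accepted
            return True
        return False
```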

In [None]:
private static void startKafkaService() throws Exception {
    StreamsBuilder builder = new StreamsBuilder();
    ObjectMapper mapper = new ObjectMapper(); // ObjectMapper is thread-safe: one instance is enough

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "host.docker.internal:9092");
    properties.put("application.id", "emoji-filter-app");

    try {

        // Load the emotes from emotes.json (a JSON array of emote names)
        @SuppressWarnings("unchecked")
        final Set<String> emojis = mapper.readValue(new File("emotes.json"), Set.class);

        System.out.println(emojis);

        // Keep only the messages that contain no emotes
        builder.<String, String>stream("general", Consumed.with(Serdes.String(), Serdes.String()))
                .filter((key, value) -> {
                    String content = "";
                    try {
                        JsonNode jsonNode = mapper.readTree(value);
                        content = jsonNode.get("content").asText();
                    } catch (Exception e) {
                        e.printStackTrace(); // Unparsable record: treated as an empty message and forwarded
                    }
                    System.out.println('\n' + content + '\n');
                    return Arrays.stream(content.split("\\s+"))
                            .noneMatch(word -> emojis.contains(word));
                })
                .to("no_emotes_general");

    } catch (IOException e) {
        e.printStackTrace();
    }

    try {
        streams = new KafkaStreams(builder.build(), properties);
        streams.start();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
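The filter drops any message in which at least one whitespace-separated token exactly matches a known emote. The match is case-sensitive. A message that mixes text and emotes is discarded entirely rather than stripped, so emote-heavy chat is not sent to the model. The following Python rendering of the predicate is useful for testing the rule outside Kafka Streams. `has_no_emotes` is a hypothetical name.

```python
import json


def has_no_emotes(value, emotes):
    """Keep the record only if no whitespace-separated token of its content is a known emote."""
    try:
        content = json.loads(value).get("content") or ""
    except (ValueError, AttributeError):
        content = ""  # like the Java code, an unparsable record is treated as empty (and kept)
    return not any(word in emotes for word in content.split())
```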

## Spark

Elasticsearch configuration

In [None]:
es = Elasticsearch("http://elasticsearch:9200")
sparkConf = SparkConf().set("es.nodes", "elasticsearch") \
                        .set("es.port", "9200") \
                        .set("es.index.auto.create", "true")

mapping = {
    "mappings": {
        "properties": {
            "timestamp": {
                "type": "date",  # Map the field as a date
                "format": "epoch_millis"  # Accepted format: milliseconds since the Unix epoch
            }
        }
    }
}

elastic_index="twitchmessages"

if not es.indices.exists(index=elastic_index):
    es.indices.create(index=elastic_index, body=mapping)
else:
    # If the index already exists, update its mapping
    es.indices.put_mapping(index=elastic_index, body=mapping['mappings'])
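With `format: epoch_millis`, Elasticsearch reads the `timestamp` value as milliseconds since 1970-01-01 UTC. In this pipeline the Spark job indexes the Kafka record timestamp. To index the bot's own day-first `timestamp` string instead, you would first have to convert it. The sketch below performs that conversion. The helper names are hypothetical. Treating the bot's string as UTC is an assumption, because the string carries no time zone.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt):
    """Milliseconds since the Unix epoch for an aware datetime (integer arithmetic, no float rounding)."""
    return (dt - EPOCH) // timedelta(milliseconds=1)


def bot_timestamp_to_millis(text, tz=timezone.utc):
    """Parse the bot's 'dd-mm-YYYY HH:MM:SS' string; the time zone is an assumption (UTC by default)."""
    return to_epoch_millis(datetime.strptime(text, "%d-%m-%Y %H:%M:%S").replace(tzinfo=tz))
```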

Reading the Kafka stream

In [None]:
kafkaServer = "host.docker.internal:9092"
inputTopic = "no_emotes_general"
kafkaOutputServer = "host.docker.internal:9092"
outputTopic = "enriched_chat"

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafkaServer) \
    .option("subscribe", inputTopic) \
    .load()

Operations on the message stream

In [None]:
# Cast the Kafka value to a string and keep the Kafka record timestamp as a timestamp
df = df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

# Parse the JSON content and extract fields
json_df = df.withColumn("json_data", from_json(col("value"), json_schema)) \
            .select("json_data.*", col("timestamp"))
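`json_schema` is not shown in this excerpt. A plausible reading, and only an assumption, is a struct with the fields the bot sends minus `timestamp`, since the Kafka record timestamp is selected separately. The stdlib sketch below loosely models what the parse-and-select step does for each row. Fields missing from the message come back as null (`None`) instead of raising an error, and extra fields are discarded. `parse_record` and `SCHEMA_FIELDS` are hypothetical names.

```python
import json

# Assumed fields of json_schema: what the chat bot sends, minus its own "timestamp"
SCHEMA_FIELDS = ("channel", "chatter_name", "is_broadcaster", "is_mod",
                 "is_subscriber", "is_vip", "content")


def parse_record(value, kafka_timestamp):
    """Parse one Kafka value into a flat row: schema fields (None if absent) plus the record timestamp."""
    try:
        data = json.loads(value)
    except ValueError:
        data = None
    if not isinstance(data, dict):
        data = {}  # unparsable input yields an all-null row rather than an exception
    row = {name: data.get(name) for name in SCHEMA_FIELDS}
    row["timestamp"] = kafka_timestamp
    return row
```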

Enrichment with ChatGPT

In [None]:
def chatgpt_sentiment(text):
    if text is not None and text.strip() != "":
        client = OpenAI(api_key=OPENAI_API_KEY)
        response = client.chat.completions.create(
            model='gpt-3.5-turbo',
            messages=[{'role': 'system', 'content': 'Classify the sentiment of the following message. Sentiment is a float from -1 to 1, where -1 is negative, 0 is neutral and 1 is positive. Emotion is a word representing the emotion of the message, can be one of the following: happy, sad, angry, neutral, surprised, disgusted, fearful. The response must be a JSON with two fields: sentiment and emotion'}, {"role": "user", "content": text}],
            max_tokens=100,
            temperature=0
        )

        data = json.loads(response.model_dump_json())
        print(json.dumps(data, indent=4))

        # Extract sentiment and emotion from the model's JSON reply
        try:
            parsed_data = json.loads(data['choices'][0]['message']['content'])
            return f"{parsed_data['sentiment']}, {parsed_data['emotion']}"
        except (ValueError, KeyError, TypeError):
            # A malformed reply would otherwise raise inside the UDF, fail the micro-batch and stop the query
            return "0, neutral"
    else:
        return "0, neutral"
    
chatgpt_sentiment_udf = udf(chatgpt_sentiment, StringType())
    
json_df = json_df.withColumn("gpt_sentiment", chatgpt_sentiment_udf(col("content")))

# Split the gpt_sentiment column into sentiment and emotion columns
split_col = split(col("gpt_sentiment"), ", ")
json_df = json_df.withColumn("sentiment", split_col.getItem(0).cast("float")) \
                 .withColumn("emotion", split_col.getItem(1))

# Drop the gpt_sentiment column
json_df = json_df.drop("gpt_sentiment")
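The UDF trusts the model to return well-formed JSON with a sentiment in [-1, 1] and one of the seven listed emotions. The hypothetical post-processing helper below enforces those constraints from the prompt. It clamps the sentiment to the range, maps unexpected emotions to `neutral`, and falls back to the UDF's own `"0, neutral"` default when the reply is not valid JSON. Its output keeps the `"sentiment, emotion"` format, so the `split(..., ", ")` step still applies.

```python
import json

# Emotion labels allowed by the system prompt
EMOTIONS = {"happy", "sad", "angry", "neutral", "surprised", "disgusted", "fearful"}


def parse_gpt_reply(reply):
    """Turn the model's reply into the 'sentiment, emotion' string the UDF returns, with safe fallbacks."""
    try:
        data = json.loads(reply)
        sentiment = max(-1.0, min(1.0, float(data["sentiment"])))  # clamp to the prompt's range
        emotion = str(data["emotion"]).strip().lower()
    except (ValueError, TypeError, KeyError):
        return "0, neutral"  # same default the UDF uses for empty messages
    if emotion not in EMOTIONS:
        emotion = "neutral"
    return f"{sentiment}, {emotion}"
```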

In [None]:
kafka_compatible_df = json_df.select(to_json(struct(*json_df.columns)).alias("value")) # The Kafka sink expects a single "value" column: serialize each row as JSON

# Note: each streaming query below re-executes the whole plan, including the ChatGPT UDF,
# so every message may be classified once per sink

# Write the DataFrame to Elasticsearch
elasticQuery = json_df.writeStream \
   .option("checkpointLocation", "/tmp/es_checkpoint") \
   .format("es") \
   .start(elastic_index)

kafkaQuery = kafka_compatible_df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafkaOutputServer) \
    .option("topic", outputTopic) \
    .option("checkpointLocation", "/tmp/kafka_checkpoint") \
    .start()

# Block until either query stops: awaiting only the first query would hide a failure of the second
spark.streams.awaitAnyTermination()

# Conclusions

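- Reliability, thanks to technologies such as Logstash and Kafka

- Scalability: each stage (ingestion, emote filtering, enrichment, indexing) runs in its own container, and the stages are decoupled by Kafka topics

- The model's capabilities can be extended by switching to GPT-4

- Low cost: about $0.40/h when analyzing a livestream with average viewership (GPT-3.5)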
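The cost figure depends on chat volume, prompt length and model pricing. The sketch below does that back-of-the-envelope arithmetic. Every number in the example call is illustrative, not a measurement from this project, so take current per-token prices from OpenAI's pricing page. The `sinks` factor reflects that the Spark job writes the enriched stream to two sinks, and each streaming query may evaluate the UDF independently. `hourly_cost` is a hypothetical helper.

```python
def hourly_cost(messages_per_hour, input_tokens, output_tokens,
                usd_per_million_input, usd_per_million_output, sinks=1):
    """Estimated USD per hour for classifying every message with the chat model."""
    calls = messages_per_hour * sinks  # one API call per message for each sink that re-runs the UDF
    usd_per_call = (input_tokens * usd_per_million_input
                    + output_tokens * usd_per_million_output) / 1_000_000
    return calls * usd_per_call


# Illustrative inputs: 3,000 messages/h, ~110 prompt tokens and ~15 reply tokens per call
print(round(hourly_cost(3000, 110, 15, 0.50, 1.50), 4))
```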