<a href="https://colab.research.google.com/github/JotaBlanco/QuixStreamsNotebooks/blob/main/Conferences/PythonWebConference/Quix_Streams_PROCESS_CHAT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install Quix Streams
Just use pip install to download the Quix Streams library. 

[Quix Streams](https://github.com/quixio/quix-streams) is an open source Python library for processing streaming data. It’s aimed at people who work with time-series data streams — from developers and ML engineers to data scientists and data engineers.

In [None]:
! pip install quixstreams

# Import the libraries
We will mainly be using pandas and Quix Streams (imported as `qx`).

In [None]:
import pandas as pd
import quixstreams as qx

# 1 - Create client
Let's start by creating a Quix client that we'll use to publish and subscribe to Kafka topics.

In [None]:
# Connect with a Quix-managed SDK token (the library could also be pointed at
# your own Kafka).
# The token is taken from the `Quix__Sdk__Token` environment variable when it
# is set, so a personal token never has to be pasted into the notebook.
# NOTE(review): the fallback literal is a credential committed to the notebook;
# treat it as public and rotate it if it grants access to anything sensitive.
import os

token = os.environ.get("Quix__Sdk__Token", 'sdk-296f2b9decff4770a525ff7d8855a78d')
client = qx.QuixStreamingClient(token)
client

# 2 - Clients
Create the producer for the enriched output topic, the output stream, and the consumer for the raw chat topic.

In [None]:
# Producer for the topic that will receive the enriched chat messages.
topic_name = "chat-messages-enriched"
topic_producer = client.get_topic_producer(topic_name)
# Bare expression so the notebook displays the object.
topic_producer

In [None]:
# Output stream on the enriched topic; get_or_create_stream is called with a
# fixed id, so re-running the cell asks for the same stream again.
stream_id = "python web conference"
stream_out = topic_producer.get_or_create_stream(stream_id)
# Bare expression so the notebook displays the object.
stream_out

In [None]:
# Consumer for the raw chat topic. `topic_name` is rebound here and read
# again by the listening cell, so it must stay "chat-messages".
topic_name = "chat-messages"
topic_consumer = client.get_topic_consumer(topic_name)
# Bare expression so the notebook displays the object.
topic_consumer

# 3 - Listen to some data
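Subscribe to the `chat-messages` topic, display each batch as it arrives, and accumulate everything received into a single DataFrame.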

In [None]:
# Global accumulator: the dataframe handler appends every received batch here.
df= pd.DataFrame()

def on_stream_received_handler(stream_received: qx.StreamConsumer):
  """Attach the dataframe callback to a stream delivered by the topic consumer.

  Args:
    stream_received: Stream whose timeseries batches should be routed to
      `on_timeseries_data_received_handler`.
  """
  timeseries = stream_received.timeseries
  timeseries.on_dataframe_received = on_timeseries_data_received_handler

def on_timeseries_data_received_handler(stream: qx.StreamConsumer, df_i: pd.DataFrame):
  """Accumulate an incoming batch into the global `df` and display it.

  Args:
    stream: Stream the batch arrived on; only its `stream_id` is read.
    df_i: Batch of rows delivered by the consumer.
  """
  global df
  # DataFrame.append was removed in pandas 2.0; pd.concat performs the same
  # row-wise stacking and keeps the incoming index labels.
  df = pd.concat([df, df_i])
  print("Data from stream " + stream.stream_id)
  display(df_i)

# Create the consumer for `topic_name`, register the stream callback, then
# start the Quix app.
# NOTE(review): qx.App.run() presumably blocks until the kernel is
# interrupted — confirm against the Quix Streams docs.
topic_consumer = client.get_topic_consumer(topic_name)
topic_consumer.on_stream_received = on_stream_received_handler
qx.App.run()

In [None]:
# Show the rows accumulated in `df` by the dataframe handler.
df

# 4 - Process data with Hugging Face

In [None]:
! pip install transformers

In [None]:
from transformers import pipeline

In [None]:
# Build the Hugging Face pipeline once; the processing handlers call it for
# every batch.
# NOTE(review): no task is passed, so it is presumably inferred from the
# model — confirm.
pipeline_model = pipeline(model='siebert/sentiment-roberta-large-english')

In [None]:
# Try the model on two sample strings and tabulate what it returns.
pd.DataFrame(pipeline_model(["This is analysing text", "Two messages"]))

## 4.1 - Processing without state

In [None]:
# Producer and output stream for the enriched topic; the handler below
# publishes to `stream_out`.
topic_producer = client.get_topic_producer("chat-messages-enriched")
stream_out = topic_producer.get_or_create_stream("python web conference")

def on_stream_received_handler(stream_received: qx.StreamConsumer):
  """Attach the dataframe callback to a stream delivered by the topic consumer.

  Args:
    stream_received: Stream whose timeseries batches should be routed to
      `on_timeseries_data_received_handler`.
  """
  timeseries = stream_received.timeseries
  timeseries.on_dataframe_received = on_timeseries_data_received_handler

def on_timeseries_data_received_handler(stream_in: qx.StreamConsumer, df: pd.DataFrame):
  """Score the sentiment of each chat message and publish the enriched batch.

  Adds the pipeline's `label` and `score` columns plus a signed `sentiment`
  column (the score, negated for NEGATIVE labels), displays the result and
  publishes it to `stream_out`.

  Args:
    stream_in: Stream the batch arrived on (not used here).
    df: Incoming batch; must contain a "chat-message" column.
  """
  # An empty batch has nothing to score or publish.
  if df.empty:
    return

  # Add predictions. The pipeline returns one result per message, so label the
  # rows with the batch's own index: concat(axis=1) aligns on index labels and
  # would pad with NaN if the incoming index were not 0..n-1.
  messages = list(df["chat-message"])
  df_prediction = pd.DataFrame(pipeline_model(messages), index=df.index)
  df = pd.concat([df, df_prediction], axis=1)

  # Sentiment column: keep the score, flip its sign for NEGATIVE labels.
  df["sentiment"] = df["score"].where(df["label"] != "NEGATIVE", -df["score"])

  display(df)
  stream_out.timeseries.publish(df)

# Create the consumer for the raw chat topic, register the stream callback,
# then start the Quix app.
# NOTE(review): qx.App.run() presumably blocks until the kernel is
# interrupted — confirm against the Quix Streams docs.
topic_consumer = client.get_topic_consumer("chat-messages")
topic_consumer.on_stream_received = on_stream_received_handler
qx.App.run()

## 4.2 - Processing with state

In [None]:
# Producer and output stream for the enriched topic; the handler below
# publishes to `stream_out`.
topic_producer = client.get_topic_producer("chat-messages-enriched")
stream_out = topic_producer.get_or_create_stream("python web conference")

# State kept between handler calls: the most recent sentiment values.
last_X_sent = []

def on_stream_received_handler(stream_received: qx.StreamConsumer):
  """Attach the dataframe callback to a stream delivered by the topic consumer.

  Args:
    stream_received: Stream whose timeseries batches should be routed to
      `on_timeseries_data_received_handler`.
  """
  timeseries = stream_received.timeseries
  timeseries.on_dataframe_received = on_timeseries_data_received_handler

def on_timeseries_data_received_handler(stream_in: qx.StreamConsumer, df: pd.DataFrame):
  """Score sentiment, attach a running average, and publish the batch.

  Adds the pipeline's `label` and `score` columns, a signed `sentiment`
  column (the score, negated for NEGATIVE labels) and `average_sentiment`,
  the mean of the five most recent sentiment values. Those values are kept
  in the global `last_X_sent` between calls.

  Args:
    stream_in: Stream the batch arrived on (not used here).
    df: Incoming batch; must contain a "chat-message" column.
  """
  global last_X_sent

  # An empty batch has nothing to score, and an empty first batch would leave
  # the window empty and divide by zero below.
  if df.empty:
    return

  # Add predictions. The pipeline returns one result per message, so label the
  # rows with the batch's own index: concat(axis=1) aligns on index labels and
  # would pad with NaN if the incoming index were not 0..n-1.
  messages = list(df["chat-message"])
  df_prediction = pd.DataFrame(pipeline_model(messages), index=df.index)
  df = pd.concat([df, df_prediction], axis=1)

  # Sentiment column: keep the score, flip its sign for NEGATIVE labels.
  df["sentiment"] = df["score"].where(df["label"] != "NEGATIVE", -df["score"])

  # Average over the five most recent sentiments, including this batch.
  # Every row of the batch receives the same value.
  last_X_sent = (last_X_sent + list(df["sentiment"]))[-5:]
  df["average_sentiment"] = sum(last_X_sent) / len(last_X_sent)
  display(df)

  # Stamp the batch with the processing time before publishing.
  # NOTE(review): this is naive local time in a "Timestamp" column — confirm
  # which time column publish() expects.
  df["Timestamp"] = pd.Timestamp.now()
  stream_out.timeseries.publish(df)

# Create the consumer for the raw chat topic, register the stream callback,
# then start the Quix app.
# NOTE(review): qx.App.run() presumably blocks until the kernel is
# interrupted — confirm against the Quix Streams docs.
topic_consumer = client.get_topic_consumer("chat-messages")
topic_consumer.on_stream_received = on_stream_received_handler
qx.App.run()