
# Install Quix Streams
Install the Quix Streams library with pip.

[Quix Streams](https://github.com/quixio/quix-streams) is an open source Python library for processing streaming data. It’s aimed at people who work with time-series data streams — from developers and ML engineers to data scientists and data engineers.

In [5]:
! pip install quixstreams

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting quixstreams
  Downloading quixstreams-0.5.1-py3-none-manylinux2014_x86_64.whl (47.7 MB)
Collecting Deprecated<2,>=1.1
  Downloading Deprecated-1.2.13-py2.py3-none-any.whl (9.6 kB)
Installing collected packages: Deprecated, quixstreams
Successfully installed Deprecated-1.2.13 quixstreams-0.5.1


# Import the libraries
This notebook uses pandas and Quix Streams (imported as qx).

In [6]:
import pandas as pd
import quixstreams as qx

# 1 - Create client
Let's start by creating a Quix client that we'll use to publish and subscribe to Kafka topics.

In [7]:
token = 'sdk-296f2b9decff4770a525ff7d8855a78d'
client = qx.QuixStreamingClient(token)
client

<quixstreams.quixstreamingclient.QuixStreamingClient at 0x7f14d18a73d0>
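
Hardcoding the token in a shared notebook exposes it to anyone who opens the file. A minimal sketch of one alternative, assuming the token is stored in an environment variable. The variable name `QUIX_SDK_TOKEN` and the helper `load_token` are hypothetical and not part of Quix Streams:

```python
import os

def load_token(var_name: str = "QUIX_SDK_TOKEN") -> str:
    """Return the SDK token stored in the environment variable `var_name`.

    The variable name is an assumption made for this sketch.
    """
    token = os.environ.get(var_name, "").strip()
    if not token:
        raise RuntimeError(f"Set the {var_name} environment variable first")
    return token

# Usage in this notebook (not executed here):
# client = qx.QuixStreamingClient(load_token())
```

Failing early with a clear message is a design choice here: an empty token would otherwise only surface as a connection error later on.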

# 2 - Consumer client
To subscribe to data from a topic, we need to create a consumer client pointing to that topic.

In [8]:
topic_name = "test-topic"
topic_consumer = client.get_topic_consumer(topic_name)
topic_consumer

<quixstreams.topicconsumer.TopicConsumer at 0x7f14f8c3ae50>

# 3 - Subscribing to topics
Once you have the TopicConsumer instance you can start receiving data. These are the steps:

## 3.1 - Subscribing to streams
The TopicConsumer invokes the callback assigned to on_stream_received once for each new stream it receives. qx.App.run() starts consuming and keeps the cell running, so interrupt the cell when you want to stop listening.

In [9]:
def on_stream_received_handler(stream_received: qx.StreamConsumer):
  """Callback invoked once for each new stream received."""
  print("New stream just received: " + stream_received.stream_id)

topic_consumer = client.get_topic_consumer(topic_name)
topic_consumer.on_stream_received = on_stream_received_handler
qx.App.run()
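
The callback-attribute pattern used above can be illustrated without a broker. The following is a toy sketch, not the Quix Streams implementation: `FakeStream` and `FakeTopicConsumer` are hypothetical stand-ins that only mimic the `stream_id` attribute and the `on_stream_received` assignment from the cell above.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

@dataclass
class FakeStream:
    """Hypothetical stand-in exposing only a stream_id."""
    stream_id: str

class FakeTopicConsumer:
    """Hypothetical stand-in: calls the registered handler once per new stream."""
    def __init__(self) -> None:
        self.on_stream_received: Optional[Callable[[FakeStream], None]] = None

    def simulate_new_stream(self, stream_id: str) -> None:
        # Invoke the handler only if one has been assigned.
        if self.on_stream_received is not None:
            self.on_stream_received(FakeStream(stream_id))

seen: List[str] = []  # stream ids observed by the handler

def on_stream_received_handler(stream_received: FakeStream) -> None:
    seen.append(stream_received.stream_id)
    print("New stream just received: " + stream_received.stream_id)

consumer = FakeTopicConsumer()
consumer.on_stream_received = on_stream_received_handler
consumer.simulate_new_stream("stream-a")
consumer.simulate_new_stream("stream-b")
```

With the real client, the library rather than your code decides when the handler fires. The sketch only shows that assigning a function to the attribute is enough to register it.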

## 3.2 - Subscribing to Timeseries data
You can subscribe to time-series data from streams using the on_data_received callback of the StreamConsumer instance.

### 3.2.1 - qx.TimeseriesData
This is how you read the data in the standard TimeseriesData format:

In [None]:
def on_stream_received_handler(stream_received: qx.StreamConsumer):
  # Register the data callback on every new stream
  stream_received.timeseries.on_data_received = on_timeseries_data_received_handler

def on_timeseries_data_received_handler(stream: qx.StreamConsumer, data: qx.TimeseriesData):
  print("Data from stream " + stream.stream_id)
  # Use the data inside the with block
  with data:
    print(data)

topic_consumer = client.get_topic_consumer(topic_name)
topic_consumer.on_stream_received = on_stream_received_handler
qx.App.run()

### 3.2.2 - pd.DataFrame
This is how you read the data as a pandas DataFrame, using the on_dataframe_received callback:

In [None]:
def on_stream_received_handler(stream_received: qx.StreamConsumer):
  stream_received.timeseries.on_dataframe_received = on_timeseries_data_received_handler

def on_timeseries_data_received_handler(stream: qx.StreamConsumer, df: pd.DataFrame):
  print("Data from stream " + stream.stream_id)
  display(df)

topic_consumer = client.get_topic_consumer(topic_name)
topic_consumer.on_stream_received = on_stream_received_handler
qx.App.run()

In [None]:
# Accumulates every chunk received across callback invocations
df = pd.DataFrame()

def on_stream_received_handler(stream_received: qx.StreamConsumer):
  stream_received.timeseries.on_dataframe_received = on_timeseries_data_received_handler

def on_timeseries_data_received_handler(stream: qx.StreamConsumer, df_i: pd.DataFrame):
  global df
  # DataFrame.append was removed in pandas 2.0; pd.concat is the replacement
  df = pd.concat([df, df_i])
  print("Data from stream " + stream.stream_id)
  display(df_i)

topic_consumer = client.get_topic_consumer(topic_name)
topic_consumer.on_stream_received = on_stream_received_handler
qx.App.run()

In [None]:
df
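
Growing a DataFrame inside the callback copies all accumulated rows on every message. A possible alternative, sketched here with made-up sample data rather than a live topic, is to collect the chunks in a list and concatenate once when the result is needed. The names `chunks`, `collect_chunk`, and `df_all`, and the `timestamp` and `speed` columns, are illustrative assumptions:

```python
import pandas as pd

chunks = []  # one DataFrame per callback invocation

def collect_chunk(df_i: pd.DataFrame) -> None:
    """Store a received chunk; appending to a list does not copy earlier data."""
    chunks.append(df_i)

# Simulate two callback invocations with made-up data.
collect_chunk(pd.DataFrame({"timestamp": [1, 2], "speed": [10.0, 12.5]}))
collect_chunk(pd.DataFrame({"timestamp": [3], "speed": [11.0]}))

# Build the full DataFrame once, renumbering the rows from 0.
df_all = pd.concat(chunks, ignore_index=True)
print(df_all.shape)
```

In the notebook, the body of `collect_chunk` would go inside the on_dataframe_received handler, and the final `pd.concat` would run in a separate cell after the consumer is stopped.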