# Real-time Setup

## Steps to launch a Kafka Server and start data streaming

**Step 1**

Start the Kafka environment in Docker. In a terminal, run:

`docker-compose up -d`

The `-d` flag runs the containers in the background (detached mode).

**Step 2**

Check that the Kafka server is running by opening the web UI in your browser at `http://localhost:9021`.

**Step 3**

Start streaming data.

This streams records from `energy_data.csv` and `weather_data.csv` into the `combined_data` topic, simulating a continuous inflow of data.
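The streaming script itself is not shown in this notebook. As a rough illustration of what "streaming" means here, the sketch below turns rows from two CSVs into messages shaped like the ones the consumer in the next section prints (`Period`, `Type`, `Value`) and alternates between the two sources. The column names `period` and `value`, the helper names, and the `send` callback are assumptions; the sample rows are copied from the consumer output, and `send=print` stands in for a real Kafka producer call, which is not shown.

```python
import csv
import io
import json
import time
from itertools import zip_longest


def rows_to_messages(csv_text, msg_type, period_col="period", value_col="value"):
    """Turn CSV rows into dicts shaped like the messages on `combined_data`.

    `period_col` and `value_col` are hypothetical column names.
    """
    reader = csv.DictReader(io.StringIO(csv_text))
    return [
        {"Period": row[period_col], "Type": msg_type, "Value": float(row[value_col])}
        for row in reader
    ]


def interleave(*streams):
    """Alternate between streams (energy, temperature, energy, ...), skipping exhausted ones."""
    for group in zip_longest(*streams):
        for msg in group:
            if msg is not None:
                yield msg


def simulate(messages, delay_s=0.0, send=print):
    """Emit each message as JSON, pausing between messages.

    `send` is a placeholder for a function that produces to Kafka.
    """
    for msg in messages:
        send(json.dumps(msg))
        time.sleep(delay_s)


# Sample rows taken from the consumer output in this notebook
energy_csv = "period,value\n2024-01-01T00,5536.0\n2024-01-01T01,5417.0\n"
weather_csv = "period,value\n2024-01-01 00:00:00,-27.0\n2024-01-01 01:00:00,-26.4\n"

messages = list(interleave(
    rows_to_messages(energy_csv, "energy"),
    rows_to_messages(weather_csv, "temperature"),
))
simulate(messages)
```

Setting `delay_s` above zero and replacing `send` with a function that writes to `combined_data` would approximate the continuous inflow described in this step.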

**Step 4**

Check the web UI at `http://localhost:9021` again. Which topics do you see now?

**Step 5**

Stop the process by pressing `Ctrl+C` in the terminal.

**Want to repeat the process?**

Go to `http://localhost:9021` and delete the topics `combined_data` and `feature_store` to start over.
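If you prefer to reset from Python rather than the web UI, a minimal sketch could use `AdminClient.delete_topics` from the `confluent-kafka` package (a dependency of Quix Streams). `reset_topics` is a hypothetical helper, and the default broker address is the one the applications in this notebook use.

```python
def reset_topics(topics, bootstrap_servers="localhost:9092", timeout_s=30):
    """Hypothetical helper: delete the given Kafka topics and report the result."""
    # Imported here so this module still loads where confluent-kafka is not installed
    from confluent_kafka.admin import AdminClient

    admin = AdminClient({"bootstrap.servers": bootstrap_servers})
    # delete_topics() is asynchronous and returns {topic_name: future}
    futures = admin.delete_topics(list(topics), operation_timeout=timeout_s)
    for name, future in futures.items():
        try:
            future.result()  # returns None on success, raises on failure
            print(f"Deleted topic {name}")
        except Exception as exc:
            print(f"Could not delete topic {name}: {exc}")
```

With the pipelines stopped, calling `reset_topics(["combined_data", "feature_store"])` should remove both topics.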

## Consuming Data from Kafka using Quix

```python
# Test streaming

from quixstreams import Application

# Create an Application - the main configuration entry point
app = Application(
    broker_address="localhost:9092",
    consumer_group="demand_forecasting",
    auto_offset_reset="earliest",
)

# Define the input topic with JSON deserialization
input_topic = app.topic('combined_data', value_deserializer='json')

# Create a StreamingDataFrame connected to the input topic
sdf = app.dataframe(topic=input_topic)

# Use the .update() method to print each message to the console
sdf = sdf.update(print)

# Run the pipeline
app.run(sdf)
```

```
[2024-06-18 21:57:45,014] [INFO] : Starting the Application with the config: broker_address="localhost:9092" consumer_group="demand_forecasting" auto_offset_reset="earliest" commit_interval=5.0s
[2024-06-18 21:57:45,015] [INFO] : Topics required for this application: "combined_data"
[2024-06-18 21:57:45,039] [INFO] : Creating a new topic "combined_data" with config: "{'num_partitions': 1, 'replication_factor': 1, 'extra_config': {}}"
[2024-06-18 21:57:46,043] [INFO] : Topic "combined_data" has been created
[2024-06-18 21:57:46,044] [INFO] : Validating Kafka topics exist and are configured correctly...
[2024-06-18 21:57:46,050] [INFO] : Kafka topics validation complete
[2024-06-18 21:57:46,051] [INFO] : Initializing state directory at "/workspaces/lil-time-series/real-time/state/demand_forecasting"
[2024-06-18 21:57:46,052] [INFO] : Waiting for incoming messages
[2024-06-18 21:59:28,191] [INFO] : Committing a checkpoint force=False
[2024-06-18 21:59:28,192] [INFO] : Committed a checkpoi
{'Period': '2024-01-01T00', 'Type': 'energy', 'Value': 5536.0}
{'Period': '2024-01-01 00:00:00', 'Type': 'temperature', 'Value': -27.0}
{'Period': '2024-01-01T01', 'Type': 'energy', 'Value': 5417.0}
[2024-06-18 21:59:34,193] [INFO] : Committing a checkpoint force=False
[2024-06-18 21:59:34,195] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 01:00:00', 'Type': 'temperature', 'Value': -26.4}
{'Period': '2024-01-01T02', 'Type': 'energy', 'Value': 5257.0}
[2024-06-18 21:59:39,197] [INFO] : Committing a checkpoint force=False
[2024-06-18 21:59:39,199] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 02:00:00', 'Type': 'temperature', 'Value': -27.1}
{'Period': '2024-01-01T03', 'Type': 'energy', 'Value': 5112.0}
[2024-06-18 21:59:45,200] [INFO] : Committing a checkpoint force=False
[2024-06-18 21:59:45,202] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 03:00:00', 'Type': 'temperature', 'Value': -27.1}
{'Period': '2024-01-01T04', 'Type': 'energy', 'Value': 4956.0}
[2024-06-18 21:59:50,203] [INFO] : Committing a checkpoint force=False
[2024-06-18 21:59:50,205] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 04:00:00', 'Type': 'temperature', 'Value': -25.1}
{'Period': '2024-01-01T05', 'Type': 'energy', 'Value': 4795.0}
[2024-06-18 21:59:55,207] [INFO] : Committing a checkpoint force=False
[2024-06-18 21:59:55,209] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 05:00:00', 'Type': 'temperature', 'Value': -18.3}
{'Period': '2024-01-01T06', 'Type': 'energy', 'Value': 4650.0}
[2024-06-18 22:00:00,209] [INFO] : Committing a checkpoint force=False
[2024-06-18 22:00:00,212] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 06:00:00', 'Type': 'temperature', 'Value': -10.5}
{'Period': '2024-01-01T07', 'Type': 'energy', 'Value': 4538.0}
[2024-06-18 22:00:05,212] [INFO] : Committing a checkpoint force=False
[2024-06-18 22:00:05,214] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 07:00:00', 'Type': 'temperature', 'Value': -10.8}
{'Period': '2024-01-01T08', 'Type': 'energy', 'Value': 4396.0}
[2024-06-18 22:00:10,216] [INFO] : Committing a checkpoint force=False
[2024-06-18 22:00:10,218] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 08:00:00', 'Type': 'temperature', 'Value': -9.7}
{'Period': '2024-01-01T09', 'Type': 'energy', 'Value': 4295.0}
[2024-06-18 22:00:15,220] [INFO] : Committing a checkpoint force=False
[2024-06-18 22:00:15,222] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 09:00:00', 'Type': 'temperature', 'Value': -8.3}
{'Period': '2024-01-01T10', 'Type': 'energy', 'Value': 4227.0}
[2024-06-18 22:00:20,224] [INFO] : Committing a checkpoint force=False
[2024-06-18 22:00:20,226] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 10:00:00', 'Type': 'temperature', 'Value': -7.7}
{'Period': '2024-01-01T11', 'Type': 'energy', 'Value': 4285.0}
[2024-06-18 22:00:25,228] [INFO] : Committing a checkpoint force=False
[2024-06-18 22:00:25,230] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 11:00:00', 'Type': 'temperature', 'Value': -8.6}
{'Period': '2024-01-01T12', 'Type': 'energy', 'Value': 4412.0}
[2024-06-18 22:00:30,233] [INFO] : Committing a checkpoint force=False
[2024-06-18 22:00:30,235] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 12:00:00', 'Type': 'temperature', 'Value': -9.0}
{'Period': '2024-01-01T13', 'Type': 'energy', 'Value': 4490.0}
[2024-06-18 22:00:35,237] [INFO] : Committing a checkpoint force=False
[2024-06-18 22:00:35,239] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 13:00:00', 'Type': 'temperature', 'Value': -10.0}
{'Period': '2024-01-01T14', 'Type': 'energy', 'Value': 4613.0}
[2024-06-18 22:00:40,241] [INFO] : Committing a checkpoint force=False
[2024-06-18 22:00:40,243] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 14:00:00', 'Type': 'temperature', 'Value': -11.3}
{'Period': '2024-01-01T15', 'Type': 'energy', 'Value': 4724.0}
[2024-06-18 22:00:45,245] [INFO] : Committing a checkpoint force=False
[2024-06-18 22:00:45,247] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 15:00:00', 'Type': 'temperature', 'Value': -12.8}
{'Period': '2024-01-01T16', 'Type': 'energy', 'Value': 4855.0}
[2024-06-18 22:00:50,248] [INFO] : Committing a checkpoint force=False
[2024-06-18 22:00:50,250] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 16:00:00', 'Type': 'temperature', 'Value': -14.2}
{'Period': '2024-01-01T17', 'Type': 'energy', 'Value': 4886.0}
[2024-06-18 22:00:55,251] [INFO] : Committing a checkpoint force=False
[2024-06-18 22:00:55,253] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 17:00:00', 'Type': 'temperature', 'Value': -15.0}
{'Period': '2024-01-01T18', 'Type': 'energy', 'Value': 4927.0}
[2024-06-18 22:01:00,256] [INFO] : Committing a checkpoint force=False
[2024-06-18 22:01:00,258] [INFO] : Committed a checkpoint force=False time_elapsed=0.0s
{'Period': '2024-01-01 18:00:00', 'Type': 'temperature', 'Value': -15.7}
[2024-06-18 22:01:01,259] [INFO] : Stop processing of StreamingDataFrame
[2024-06-18 22:01:01,260] [INFO] : Committing a checkpoint force=True
[2024-06-18 22:01:01,262] [INFO] : Committed a checkpoint force=True time_elapsed=0.0s
```
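The printed messages show that the two sources encode `Period` differently: energy readings use `2024-01-01T00`, while temperature readings use `2024-01-01 00:00:00`. Matching readings for the same hour therefore means normalizing both formats first. A minimal sketch, with hypothetical helper names and sample messages copied from the output above, that builds the same `'%Y-%m-%d %H'` key the feature-store pipeline uses as its record ID:

```python
from collections import defaultdict
from datetime import datetime

# The two Period formats visible in the consumer output
PERIOD_FORMATS = ("%Y-%m-%dT%H", "%Y-%m-%d %H:%M:%S")


def hour_key(period):
    """Normalize either Period format to a 'YYYY-MM-DD HH' string."""
    for fmt in PERIOD_FORMATS:
        try:
            return datetime.strptime(period, fmt).strftime("%Y-%m-%d %H")
        except ValueError:
            continue  # try the next format
    raise ValueError(f"Unrecognized Period: {period!r}")


def group_by_hour(messages):
    """Collect {'energy': ..., 'temperature': ...} per hour from interleaved messages."""
    grouped = defaultdict(dict)
    for msg in messages:
        grouped[hour_key(msg["Period"])][msg["Type"]] = msg["Value"]
    return dict(grouped)


sample = [
    {'Period': '2024-01-01T00', 'Type': 'energy', 'Value': 5536.0},
    {'Period': '2024-01-01 00:00:00', 'Type': 'temperature', 'Value': -27.0},
    {'Period': '2024-01-01T01', 'Type': 'energy', 'Value': 5417.0},
]
hourly = group_by_hour(sample)
print(hourly)
```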


## Create Real-time Feature Store

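The following pipeline listens to incoming messages on the `combined_data` topic, keeps a rolling window of the last 25 energy readings in state, computes lag and rolling-window features, and writes them to a new topic, `feature_store`.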
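`process_data` below keeps the last 25 energy readings as a list in Quix state and trims it with `pop(0)`. The same bounded window can be expressed with `collections.deque(maxlen=...)`, which drops the oldest value automatically. This plain-Python sketch is an illustrative alternative, not what the pipeline does; `update_window` is a hypothetical helper, and it returns a list because that is the form the pipeline stores in state.

```python
from collections import deque

WINDOW_SIZE = 25  # same size as in process_data: enough history for lag_24


def update_window(window, value, size=WINDOW_SIZE):
    """Append a reading and keep only the most recent `size` values."""
    buf = deque(window, maxlen=size)
    buf.append(value)  # the oldest value is dropped once the deque is full
    return list(buf)


window = []
for reading in range(30):  # 30 dummy readings: 0.0 .. 29.0
    window = update_window(window, float(reading))

print(len(window), window[0], window[-1])  # 25 5.0 29.0
```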

```python
from quixstreams import Application
from datetime import datetime
import pandas as pd
import json


def parse_period(period_str):
    # Possible date formats (energy and temperature messages use different ones)
    date_formats = [
        '%Y-%m-%d %H:%M:%S',  # Format with seconds
        '%Y-%m-%d %H:%M',     # Format without seconds
        '%Y-%m-%dT%H',        # ISO-like format without minutes and seconds
        '%Y-%m-%dT%H:%M:%S'   # Full ISO-like format
    ]

    for date_format in date_formats:
        try:
            return datetime.strptime(period_str, date_format)
        except ValueError:
            continue  # Try the next format

    # All formats failed: log the problem and return None
    print(f"Failed to parse date: {period_str}")
    return None


def process_data(value, state, producer, feature_store_topic):
    period_dt = parse_period(value['Period'])

    if period_dt is None:
        print("Error parsing the period date.")
        return  # Skip this message because its date could not be parsed

    # Use the date and hour ('YYYY-MM-DD HH') as the feature store record ID
    feature_record_id = period_dt.strftime('%Y-%m-%d %H')

    if value['Type'] == 'temperature':
        # Produce the temperature plus calendar features directly to the feature store
        message = {
            'id': feature_record_id,
            'temperature_forecast': value['Value'],
            'hour': period_dt.hour,
            'day_of_week': period_dt.weekday(),  # Monday = 0
            'month': period_dt.month
        }

        # The producer from app.get_producer() does not apply the topic's
        # serializer, so the value is JSON-encoded here
        producer.produce(topic=feature_store_topic.name, key=feature_record_id, value=json.dumps(message))
        #print("Message for temperature:", message)

    elif value['Type'] == 'energy':
        # Keep the last 25 energy readings in state (enough history for lag_24)
        window = state.get('energy_window', [])
        window.append(value['Value'])
        if len(window) > 25:
            window.pop(0)
        state.set('energy_window', window)

        # Compute lag and rolling-window features
        features = calculate_lags(window, period_dt)

        message = {
            "id": feature_record_id,
            **features
        }
        #print("Message for energy:", message)

        # Serialize and produce the features to Kafka
        producer.produce(topic=feature_store_topic.name, key=feature_record_id, value=json.dumps(message))


def calculate_lags(window, period_dt):
    # period_dt is currently unused; the window is ordered oldest -> newest
    df = pd.DataFrame({'value': window})

    features = {
        'lag_1': float(df['value'].shift(1).iloc[-1]) if len(df) > 1 else None,
        'lag_2': float(df['value'].shift(2).iloc[-1]) if len(df) > 2 else None,
        'lag_6': float(df['value'].shift(6).iloc[-1]) if len(df) > 6 else None,
        'lag_12': float(df['value'].shift(12).iloc[-1]) if len(df) > 12 else None,
        'lag_24': float(df['value'].shift(24).iloc[-1]) if len(df) > 24 else None,
        'rolling_mean_7': float(df['value'].rolling(window=7).mean().iloc[-1]) if len(df) >= 7 else None,
        'rolling_std_7': float(df['value'].rolling(window=7).std().iloc[-1]) if len(df) >= 7 else None,
    }

    return features


# auto_offset_reset is not set, so it defaults to "latest" (see the log below):
# only messages that arrive after the app starts are processed
app = Application(broker_address='localhost:9092', consumer_group='example')

# Define a topic for real-time feature storage with JSON serialization
feature_store_topic = app.topic(name='feature_store', value_serializer='json')

# Combined data topic setup
combined_data_topic = app.topic(name='combined_data', value_deserializer='json')

# Open one producer for sending feature data and keep it open while the app runs
with app.get_producer() as producer:
    # Stateful pipeline: every combined_data message is passed to process_data
    data_df = app.dataframe(topic=combined_data_topic).update(
        lambda value, state: process_data(value, state, producer, feature_store_topic),
        stateful=True
    )
    app.run(data_df)
```

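To make the lag semantics concrete: the window is ordered oldest to newest and already contains the current reading, so `shift(n).iloc[-1]` picks the value n steps before it, and `lag_1` is the previous hour's reading rather than the current one. Below is a pandas-free sketch of the same features (`lag_features` is a hypothetical name). It uses `statistics.stdev`, which, like pandas' rolling `std()`, computes the sample standard deviation (ddof=1). It is applied to the first eight energy values from the consumer output.

```python
from statistics import mean, stdev

LAGS = (1, 2, 6, 12, 24)
ROLL = 7


def lag_features(window):
    """Pure-Python equivalent of calculate_lags; `window` is ordered oldest -> newest."""
    n = len(window)
    # lag_k is the value k steps before the newest one, if the window is long enough
    features = {f"lag_{k}": (float(window[-1 - k]) if n > k else None) for k in LAGS}
    tail = window[-ROLL:]
    features["rolling_mean_7"] = float(mean(tail)) if n >= ROLL else None
    features["rolling_std_7"] = float(stdev(tail)) if n >= ROLL else None
    return features


# First eight energy readings printed by the consumer (2024-01-01T00 .. T07)
energy = [5536.0, 5417.0, 5257.0, 5112.0, 4956.0, 4795.0, 4650.0, 4538.0]
features = lag_features(energy)
print(features)
```

With only eight readings, `lag_12` and `lag_24` are still `None`; they fill in once the window holds 13 and 25 values respectively.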
[2024-06-18 22:01:31,694] [INFO] : Topics required for this application: "feature_store", "combined_data"
[2024-06-18 22:01:31,698] [INFO] : Creating a new topic "feature_store" with config: "{'num_partitions': 1, 'replication_factor': 1, 'extra_config': {}}"
[2024-06-18 22:01:32,700] [INFO] : Topic "feature_store" has been created
[2024-06-18 22:01:32,701] [INFO] : Validating Kafka topics exist and are configured correctly...
[2024-06-18 22:01:32,708] [INFO] : Kafka topics validation complete
[2024-06-18 22:01:32,713] [INFO] : Starting the Application with the config: broker_address="localhost:9092" consumer_group="example" auto_offset_reset="latest" commit_interval=5.0s
[2024-06-18 22:01:32,714] [INFO] : Topics required for this application: "feature_store", "combined_data", "changelog__example--combined_data--default"
[2024-06-18 22:01:32,718] [INFO] : Creating a new topic "changelog__example--combined_data--default" with config: "{'num_partitions': 1, 'replication_factor': 1, 'extr