# 📈 Building an AI Real-Time Trading System with Kafka & LlamaIndex

## 1) Download EUR/USD Daily Image

In [1]:
!pip install selenium
!apt-get install chromium-driver

Collecting selenium
  Downloading selenium-4.26.1-py3-none-any.whl.metadata (7.1 kB)
Collecting trio~=0.17 (from selenium)
  Downloading trio-0.27.0-py3-none-any.whl.metadata (8.6 kB)
Collecting trio-websocket~=0.9 (from selenium)
  Downloading trio_websocket-0.11.1-py3-none-any.whl.metadata (4.7 kB)
Collecting sortedcontainers (from trio~=0.17->selenium)
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Collecting outcome (from trio~=0.17->selenium)
  Downloading outcome-1.3.0.post0-py2.py3-none-any.whl.metadata (2.6 kB)
Collecting wsproto>=0.14 (from trio-websocket~=0.9->selenium)
  Downloading wsproto-1.2.0-py3-none-any.whl.metadata (5.6 kB)
Downloading selenium-4.26.1-py3-none-any.whl (9.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m9.7/9.7 MB[0m [31m28.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading trio-0.27.0-py3-none-any.whl (481 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m481.7/481.7 kB[0m [31m22.7 MB/s

In [2]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

def initialize_web_driver():
    """Sets up and returns a configured Chrome WebDriver instance."""
    options = webdriver.ChromeOptions()
    options.add_argument('--verbose')
    options.add_argument('--no-sandbox')
    options.add_argument('--headless')
    options.add_argument('--disable-gpu')
    options.add_argument('--window-size=1920,1200')
    options.add_argument('--disable-dev-shm-usage')
    driver = webdriver.Chrome(options=options)
    return driver

driver = initialize_web_driver()

try:
    # Navigate to the page
    driver.get("https://www.tradingview.com/symbols/EURUSD/")

    # Wait a few seconds for the page to load fully
    time.sleep(4)  # Adjust sleep duration if necessary
    driver.refresh()
    time.sleep(4)

    # Locate the chart using an appropriate selector
    chart_element = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.CLASS_NAME, "container-nORFfEfo"))
    )

    # Capture a screenshot of the chart element
    time.sleep(4)
    chart_element.screenshot("eurusd_chart.png")
    print("Chart screenshot saved as 'eurusd_chart.png'.")

except Exception as e:
    print("An error occurred:", e)

finally:
    # Close the browser
    driver.quit()

Chart screenshot saved as 'eurusd_chart.png'.


## 2) Setup Kafka

In [1]:
!wget https://downloads.apache.org/kafka/3.8.1/kafka_2.13-3.8.1.tgz
!tar -xzf kafka_2.13-3.8.1.tgz
!./kafka_2.13-3.8.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.8.1/config/zookeeper.properties
!./kafka_2.13-3.8.1/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.8.1/config/server.properties
!sleep 10
#remove topic
!./kafka_2.13-3.8.1/bin/kafka-topics.sh --delete --topic eurusd_bidask --bootstrap-server localhost:9092
!./kafka_2.13-3.8.1/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic eurusd_bidask


--2024-11-12 12:25:58--  https://downloads.apache.org/kafka/3.8.1/kafka_2.13-3.8.1.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.208.237, 135.181.214.104, 2a01:4f9:3a:2c57::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|88.99.208.237|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 121178579 (116M) [application/x-gzip]
Saving to: ‘kafka_2.13-3.8.1.tgz.3’


2024-11-12 12:26:00 (83.2 MB/s) - ‘kafka_2.13-3.8.1.tgz.3’ saved [121178579/121178579]

Created topic eurusd_bidask.


Test Kafka before integrating in LlamaIndex

In [3]:
!pip install kafka-python nest_asyncio

import json
import requests
import time
import pandas as pd
from kafka import KafkaProducer, KafkaConsumer
from bs4 import BeautifulSoup
import asyncio
import nest_asyncio
import threading

# Apply nest_asyncio for environments with an already running event loop (e.g., Jupyter/Colab)
nest_asyncio.apply()

# Control variable to stop the loop
stop_flag = False

# Kafka Producer Configuration
async def kafka_producer():
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    async def fetch_and_send_bid_ask():
        url = 'https://www.investing.com/currencies/eur-usd-spreads'
        headers = {"User-Agent": "Mozilla/5.0"}
        response = requests.get(url, headers=headers)

        # Check if the response is successful
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            bid_element = soup.find("span", class_="inlineblock pid-1-bid")
            ask_element = soup.find("span", class_="inlineblock pid-1-ask")

            # Check if bid and ask elements were found
            if bid_element and ask_element:
                bid_value = float(bid_element.text.replace(',', ''))
                ask_value = float(ask_element.text.replace(',', ''))
                message = {'bid': bid_value, 'ask': ask_value}

                # Send message to Kafka
                producer.send('eurusd_bidask', value=message)
                producer.flush()
                print(f"Producer sent bid: {bid_value}, ask: {ask_value}")
            else:
                print("Error: Could not find bid/ask elements on the page.")
        else:
            print(f"Error fetching data: Status code {response.status_code}")

    # Infinite loop to capture and send data every 5 seconds
    while not stop_flag:
        await fetch_and_send_bid_ask()
        await asyncio.sleep(5)

# Kafka Consumer Configuration
def kafka_consumer_bot():
    consumer = KafkaConsumer(
        'eurusd_bidask',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        auto_offset_reset='latest',
        enable_auto_commit=False,
        group_id='my-group'
    )

    df = pd.DataFrame(columns=['Bid', 'Ask', 'Mid_Price'], dtype=float)

    print("Starting Kafka consumer... Listening for messages on the 'eurusd_bidask' topic.")

    for msg in consumer:
        if stop_flag:
            break

        # Debug message to show that a message has been received
        print(f"Consumer received raw message: {msg.value}")

        # Extract bid and ask values
        bid = float(msg.value['bid'])
        ask = float(msg.value['ask'])
        mid_price = (bid + ask) / 2

        # Add new row to DataFrame and print the updated DataFrame
        new_row = pd.DataFrame({'Bid': [bid], 'Ask': [ask], 'Mid_Price': [mid_price]})
        df = pd.concat([df, new_row], ignore_index=True)

        print(f"Consumer processed bid: {bid:.4f}, ask: {ask:.4f}, mid_price: {mid_price:.4f}")
        print("Updated DataFrame:")
        print(df)

        # Keep only the last 50 rows
        if len(df) > 50:
            df = df.iloc[-50:].reset_index(drop=True)

        # Small sleep to avoid rapid polling in this example
        time.sleep(5)

# Main function to run both producer and consumer
async def main():
    # Start the producer in the event loop
    producer_task = asyncio.create_task(kafka_producer())

    # Start the consumer in a separate thread
    consumer_thread = threading.Thread(target=kafka_consumer_bot, daemon=True)
    consumer_thread.start()

    try:
        await producer_task
    except asyncio.CancelledError:
        pass
    finally:
        consumer_thread.join()

# Run the main function
try:
    asyncio.run(main())
except KeyboardInterrupt:
    print("Program terminated by the user.")


[31mERROR: Operation cancelled by user[0m[31m
[0m^C
Starting Kafka consumer... Listening for messages on the 'eurusd_bidask' topic.
Consumer received raw message: {'bid': 1.0609, 'ask': 1.061}
Consumer processed bid: 1.0609, ask: 1.0610, mid_price: 1.0610
Updated DataFrame:
Producer sent bid: 1.0609, ask: 1.061
       Bid     Ask  Mid_Price
0   1.0610  1.0611    1.06105
1   1.0610  1.0611    1.06105
2   1.0610  1.0611    1.06105
3   1.0610  1.0611    1.06105
4   1.0610  1.0611    1.06105
5   1.0610  1.0611    1.06105
6   1.0610  1.0611    1.06105
7   1.0610  1.0611    1.06105
8   1.0610  1.0611    1.06105
9   1.0609  1.0610    1.06095
10  1.0609  1.0610    1.06095




Program terminated by the user.


## 3) Setup LlamaIndex Workflow

In [2]:
# Install necessary packages
!pip install llama-index-core llama-index-llms-openai llama-index-multi-modal-llms-openai
!pip install llama_index.readers.file
!pip install pandas_ta
!pip install kafka-python




Get the current library versions

In [4]:
!pip freeze
!pip show selenium
!pip show llama-index-core
!pip show llama-index-llms-openai
!pip show llama-index-multi-modal-llms-openai
!pip show llama_index.readers.file
!pip show pandas_ta
!pip show kafka-python


absl-py==1.4.0
accelerate==0.34.2
aiohappyeyeballs==2.4.3
aiohttp==3.10.10
aiosignal==1.3.1
alabaster==0.7.16
albucore==0.0.19
albumentations==1.4.20
altair==4.2.2
annotated-types==0.7.0
anyio==3.7.1
argon2-cffi==23.1.0
argon2-cffi-bindings==21.2.0
array_record==0.5.1
arviz==0.20.0
astropy==6.1.4
astropy-iers-data==0.2024.11.4.0.33.34
astunparse==1.6.3
async-timeout==4.0.3
atpublic==4.1.0
attrs==24.2.0
audioread==3.0.1
autograd==1.7.0
babel==2.16.0
backcall==0.2.0
beautifulsoup4==4.12.3
bigframes==1.25.0
bigquery-magics==0.4.0
bleach==6.2.0
blinker==1.4
blis==0.7.11
blosc2==2.0.0
bokeh==3.4.3
Bottleneck==1.4.2
bqplot==0.12.43
branca==0.8.0
CacheControl==0.14.1
cachetools==5.5.0
catalogue==2.0.10
certifi==2024.8.30
cffi==1.17.1
chardet==5.2.0
charset-normalizer==3.4.0
chex==0.1.87
clarabel==0.9.0
click==8.1.7
cloudpathlib==0.20.0
cloudpickle==3.1.0
cmake==3.30.5
cmdstanpy==1.2.4
colorcet==3.1.0
colorlover==0.3.0
colour==0.1.5
community==1.0.0b1
confection==0.1.5
cons==0.4.6
contourpy==1

In [3]:
import requests
from bs4 import BeautifulSoup
import json
import pandas as pd
import pandas_ta as ta
import asyncio
import threading
import nest_asyncio
import time
import os
from kafka import KafkaProducer, KafkaConsumer
from llama_index.core.program import LLMTextCompletionProgram
from llama_index.multi_modal_llms.openai import OpenAIMultiModal
from llama_index.core import SimpleDirectoryReader
from llama_index.core.workflow import Workflow, step, Event, Context
from llama_index.core.bridge.pydantic import BaseModel, Field
from llama_index.llms.openai import OpenAI
from typing import Optional, Any


# Setup for Colab environment compatibility
nest_asyncio.apply()

# Set OpenAI API key in the environment
os.environ["OPENAI_API_KEY"] = 'YOUR_OPENAI_API_KEY'  # Replace with your API


# Variable de control para detener el bucle
stop_flag = False


def stop_listener():
    global stop_flag
    while True:
        user_input = input("Write 'stop' to stop: ")
        if user_input.lower() == 'stop':
            stop_flag = True
            print("Stopping the program...")
            break

# Start a separate thread to listen for stop commands
threading.Thread(target=stop_listener, daemon=True).start()


class TradingDecisionResult(BaseModel):
    """
    Model to store the result of trading decisions.
    """
    decision: str = Field(description="Trading decision: 'buy', 'sell' o 'hold'.")
    reasoning: str = Field(description="Reasoning behind the decision.")

# Define the Workflow
class InvestmentBotWorkflow(Workflow):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.image_analysis_done = False  # Indicator to know if the image analysis has already been done

    @step
    async def analyze_image(self, ctx: Context, ev: Event) -> None:
        #  Track if the image analysis has already been completed
        if not self.image_analysis_done:
            print("Performing image analysis...")
            openai_mm_llm = OpenAIMultiModal(
                model="gpt-4o", api_key=os.environ["OPENAI_API_KEY"], max_new_tokens=512
            )
            image_path = "/content/eurusd_chart.png"
            image_documents = SimpleDirectoryReader(input_files=[image_path]).load_data()

            response = openai_mm_llm.complete(
                prompt="Analyze the following EUR/USD (daily) chart. Provide a detailed description of any patterns or trends observed, highlighting key price levels. This comment will support a real-time algorithm capturing bid-ask data every 5 seconds.",
                image_documents=image_documents,
            )



            # Extract response text and save it in the context
            response_text = response.text if hasattr(response, "text") else str(response)

            await ctx.set("image_analysis", response_text)
            self.image_analysis_done = True
            print("Image analysis completed: " + response_text)




    @step
    async def analyze_data(self, ctx: Context, ev: Event) -> None:
        """
        Perform data analysis and generate a trading decision based on technical indicators.
        """
        df = await ctx.get('df')
        if df is None or df.empty:
            print("DataFrame 'df' is unavailable or empty.")
            return

        # Retrieve stored image analysis
        image_analysis = await ctx.get("image_analysis", "Image analysis unavailable.")

        # Ensure sufficient data for technical indicators
        if len(df) < 6:
            print("Insufficient data for technical indicators.")
            return

        # Adjust indicator periods based on available data
        ema_period = min(5, len(df))
        rsi_period = min(5, len(df))
        bb_period = min(5, len(df))

        # Calculate technical indicators
        df['EMA_5'] = ta.ema(df['Mid_Price'], length=ema_period)
        df['RSI'] = ta.rsi(df['Mid_Price'], length=rsi_period)
        bb = ta.bbands(df['Mid_Price'], length=bb_period, std=1.5)

        if bb is not None and not bb.empty:
            # Get the names of the generated columns
            bb_columns = bb.columns.tolist()

            # Filter the required columns
            bbl_column = [col for col in bb_columns if col.startswith('BBL')][0]
            bbm_column = [col for col in bb_columns if col.startswith('BBM')][0]
            bbu_column = [col for col in bb_columns if col.startswith('BBU')][0]

            # Select columns and rename them
            bb_selected = bb[[bbl_column, bbm_column, bbu_column]]
            bb_selected.columns = ['BBL', 'BBM', 'BBU']

            # Concatenate with the original DataFrame
            df = pd.concat([df, bb_selected], axis=1)
        else:
            df['BBL'] = df['BBM'] = df['BBU'] = None

        # Get the last 5 prices
        last_prices = df['Mid_Price'].tail(5).tolist()

        # Get the latest technical indicators
        indicators = {}
        latest_data = df.iloc[-1]

        # Manage possible NaN in the indicators
        indicators['EMA_5'] = latest_data.get('EMA_5', 'Not available')
        indicators['RSI'] = latest_data.get('RSI', 'Not available')
        indicators['BBL'] = latest_data.get('BBL', 'Not available')
        indicators['BBM'] = latest_data.get('BBM', 'Not available')
        indicators['BBU'] = latest_data.get('BBU', 'Not available')

        for key, value in indicators.items():
            if pd.isna(value):
                indicators[key] = 'Not available'

        # Prepare the prompt for GPT-3
        prompt = (
            f"Analysis of the latest prices: {last_prices}\n"
            f"Latest technical indicators:\n"
            f"EMA_5: {indicators['EMA_5']}\n"
            f"RSI: {indicators['RSI']}\n"
            f"BBL: {indicators['BBL']}\n"
            f"BBM: {indicators['BBM']}\n"
            f"BBU: {indicators['BBU']}\n\n"
            f"EURUSD daily chart analysis:\n{image_analysis}\n\n"
            "Based on the above analysis, it provides a trading decision:('buy', 'sell' o 'hold') "
            "and explain your reasoning concisely."
        )

        # Use GPT-3.5 to obtain the decision
        try:

            llm_gpt3 = OpenAI(model="gpt-3.5-turbo", max_new_tokens=512)

            program = LLMTextCompletionProgram.from_defaults(
                output_cls=TradingDecisionResult,
                prompt_template_str=prompt,
                llm=llm_gpt3,
            )

            trading_decision_result = program()
            decision = trading_decision_result.decision
            reasoning = trading_decision_result.reasoning

            print(f"Trading decision: {decision}")
            print(f"Reasoning: {reasoning}")

        except Exception as e:
            print(f"Error during analysis with GPT-3: {e}")

# Kafka's producer configuration
async def kafka_producer():
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    async def fetch_and_send_bid_ask():
        url = 'https://www.investing.com/currencies/eur-usd-spreads'
        headers = {"User-Agent": "Mozilla/5.0"}
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            bid_element = soup.find("span", class_="inlineblock pid-1-bid")
            ask_element = soup.find("span", class_="inlineblock pid-1-ask")
            if bid_element and ask_element:
                bid_value = float(bid_element.text.replace(',', ''))
                ask_value = float(ask_element.text.replace(',', ''))
                message = {'bid': bid_value, 'ask': ask_value}
                producer.send('eurusd_bidask', value=message)
                producer.flush()
                print(f"Producer sent bid: {bid_value}, ask: {ask_value}")
        else:
            print(f"Error: {response.status_code}")

    # Infinite loop to capture and send data every 5 seconds
    while not stop_flag:
        await fetch_and_send_bid_ask()
        await asyncio.sleep(5)

# Kafka's consumer configuration
def kafka_consumer_bot():
    consumer = KafkaConsumer(
        'eurusd_bidask',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        auto_offset_reset='latest',
        enable_auto_commit=False,
        group_id='my-group'
    )

    df = pd.DataFrame(columns=['Bid', 'Ask', 'Mid_Price'], dtype=float)

    #  Initialize Workflow
    bot_workflow = InvestmentBotWorkflow()
    context = Context(workflow=bot_workflow)

    # Performing image analysis before starting the loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(bot_workflow.analyze_image(context, Event()))
    loop.close()

    # Variable to control the start time
    data_collection_start_time = time.time()
    data_collection_duration = 30

    for msg in consumer:
        if stop_flag:
            break
        bid = float(msg.value['bid'])
        ask = float(msg.value['ask'])
        mid_price = (bid + ask) / 2
        new_row = pd.DataFrame({'Bid': [bid], 'Ask': [ask], 'Mid_Price': [mid_price]})
        df = pd.concat([df, new_row], ignore_index=True)

        print(f"Consumer received bid: {bid:.4f}, ask: {ask:.4f}, mid_price: {mid_price:.4f}")
        print(df)

        # Hold only the last 50 rows
        if len(df) > 50:
            df = df.iloc[-50:].reset_index(drop=True)

        # Wait until sufficient data is available
        if time.time() - data_collection_start_time < data_collection_duration:
            print("Collecting data... Waiting to start the analysis.")
            time.sleep(5)
            continue  # Do not run the analysis until you have sufficient data
        # Perform analysis every 30 seconds
        if len(df) > 4:  # Ensure that you have sufficient data for indicators
            # Pass the DataFrame to the context and run the analysis.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            loop.run_until_complete(context.set('df', df.copy()))
            loop.run_until_complete(bot_workflow.analyze_data(context, Event()))
            loop.close()
            #  Reset timer for next analysis
            data_collection_start_time = time.time()
        else:
            print("Not enough data for analysis")

        time.sleep(5)

# Execute producer and consumer together
def main():
    # Execute the producer in the event loop
    loop = asyncio.get_event_loop()
    producer_task = loop.create_task(kafka_producer())

   # Start the consumer in a separate thread
    consumer_thread = threading.Thread(target=kafka_consumer_bot, daemon=True)
    consumer_thread.start()

    try:
        loop.run_until_complete(producer_task)
    except asyncio.CancelledError:
        pass
    finally:
        loop.close()
        consumer_thread.join()

# Runs the main process
try:
    main()
except KeyboardInterrupt:
    print("Program completed by the user.")

Performing image analysis...
Producer sent bid: 1.0625, ask: 1.0626
Producer sent bid: 1.0625, ask: 1.0626
Image analysis completed: The EUR/USD daily chart shows the following patterns and trends:

1. **Downtrend**: The overall trend is downward, with a series of lower highs and lower lows. This indicates a bearish market sentiment over the observed period.

2. **Key Resistance Levels**: 
   - Around 1.0570: The price has tested this level multiple times but failed to break above it, indicating strong resistance.

3. **Key Support Levels**:
   - Around 1.0450: The price has bounced off this level, suggesting it is a significant support zone.

4. **Recent Rebound**: There is a noticeable rebound from the support level around 1.0450, suggesting a potential short-term bullish correction within the larger downtrend.

5. **Volatility**: The chart shows periods of increased volatility, with sharp price movements, particularly during the rebounds.

6. **Potential Patterns**:
   - There might

RuntimeError: Cannot close a running event loop

Thanks for making it to the end, and happy coding! 🎉💻