In [1]:
import os
os.chdir("../../../")
from src.backtesting.run_backtest import prepare_and_run_backtest

result=prepare_and_run_backtest()
result

[*********************100%%**********************]  1 of 1 completed


{'sharpe_ratio': -58.66063557429906,
 'return': 0.0016255774658640904,
 'max_drawdown': 0.023526728407521492,
 'win_trade': 7,
 'loss_trade': 3,
 'total_trade': 11,
 'start_portfolio': 100000,
 'final_portfolio': 100162.6899433136}

In [2]:
# The backtest results dictionary
results = {
    'sharpe_ratio': -58.66063557429906,
    'return': 0.0016255774658640904,
    'max_drawdown': 0.023526728407521492,
    'win_trade': 7,
    'loss_trade': 3,
    'total_trade': 11,
    'start_portfolio': 100000,
    'final_portfolio': 100162.6899433136
}


## Send backtest results to Kafka

In [5]:
from confluent_kafka import Producer
import json
import socket  # For getting hostname

# Kafka Configuration
conf = {
    'bootstrap.servers': 'localhost:9094',  # Kafka broker address
    'client.id': socket.gethostname(),     # Unique identifier for this producer
}

# Optional Producer Configurations (Best Practices)
conf.update({
    'acks': 'all',       # Strongest delivery guarantee (all brokers acknowledge)
    'retries': 3,         # Number of retries for failed messages
    'batch.size': 16384, # Batch messages for efficiency (can be adjusted)
    'linger.ms': 100,     # Wait briefly for more messages to batch (this can be adjusted)
})

# Create the Producer
producer = Producer(conf)

# Topic Name
topic_name = 'backtest_results_topic'

# Prepare Data
message_value = json.dumps(results) # Convert dictionary to JSON string

# Send the Message
try:
    producer.produce(topic_name, value=message_value)
    producer.flush()  # Ensure all messages are delivered
    print("Message sent to KAFKA successfully!")
except Exception as e:
    print(f"Error producing message: {e}")


Message sent to KAFKA successfully!


### Kafka Consumer for backtest results

In [2]:
from confluent_kafka import Consumer
import json

# Kafka Configuration
conf = {
    'bootstrap.servers': 'localhost:9094',
    'group.id': 'backtest_results_consumer_group',
    'auto.offset.reset': 'earliest'  # Start consuming from the beginning of the topic
}

topic_name = 'backtest_results'

# Create the Consumer
consumer = Consumer(conf)
consumer.subscribe([topic_name])

# Read the Messages
try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        message_value = json.loads(msg.value())
        print(f"Received message: {message_value}")
    # process the message_value in postgres
except KeyboardInterrupt:
    pass
finally:
    consumer.close()



Received message: {'sharpe_ratio': -58.66063557429906, 'return': 0.0016255774658640904, 'max_drawdown': 0.023526728407521492, 'win_trade': 7, 'loss_trade': 3, 'total_trade': 11, 'start_portfolio': 100000, 'final_portfolio': 100162.6899433136}
Received message: {'sharpe_ratio': -58.66063557429906, 'return': 0.0016255774658640904, 'max_drawdown': 0.023526728407521492, 'win_trade': 7, 'loss_trade': 3, 'total_trade': 11, 'start_portfolio': 100000, 'final_portfolio': 100162.6899433136}
Received message: {'sharpe_ratio': -58.66063557429906, 'return': 0.0016255774658640904, 'max_drawdown': 0.023526728407521492, 'win_trade': 7, 'loss_trade': 3, 'total_trade': 11, 'start_portfolio': 100000, 'final_portfolio': 100162.6899433136}


### Reusable functions for kafka producer and consumer

In [1]:
from api.app import create_kafka_producer, send_message_to_kafka, create_kafka_consumer, consume_messages

# Create Producer
producer = create_kafka_producer()

# Send Results
topic_name = 'backtest_results'

send_message_to_kafka(producer, 'backtest_results', results)


In [5]:
# Create Consumer
consumer = create_kafka_consumer()

# Read the Messages
for message in consume_messages(consumer, 'backtest_results'):
    print(f"Received message: {message}")
    # Process the message in postgres (insert into db)

Received message: {'sharpe_ratio': -58.66063557429906, 'return': 0.0016255774658640904, 'max_drawdown': 0.023526728407521492, 'win_trade': 7, 'loss_trade': 3, 'total_trade': 11, 'start_portfolio': 100000, 'final_portfolio': 100162.6899433136}
Received message: {'sharpe_ratio': -58.66063557429906, 'return': 0.0016255774658640904, 'max_drawdown': 0.023526728407521492, 'win_trade': 7, 'loss_trade': 3, 'total_trade': 11, 'start_portfolio': 100000, 'final_portfolio': 100162.6899433136}
