Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Strategies #31

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ venv/
ENV/
env.bak/
venv.bak/
Genv/


# Spyder project settings
.spyderproject
Expand Down
21 changes: 21 additions & 0 deletions app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import JWTManager
from flask_bcrypt import Bcrypt
import threading

db = SQLAlchemy()
jwt = JWTManager()
bcrypt = Bcrypt()

from app.services.backtest_service import run_backtest_by_id
from app.services.kafka_service import kafka_service

def create_app():
app = Flask(__name__)
app.config.from_object('app.config.Config')
Expand All @@ -23,3 +27,20 @@ def create_app():
db.create_all()

return app

def consume_backtest_scenes(app):
    """Consume messages from the 'backtest_scenes' Kafka topic and run each backtest.

    Every message is handled inside the Flask application context so that
    database access performed by ``run_backtest_by_id`` works.
    """
    def handle_message(message):
        # Push an app context: SQLAlchemy sessions are bound to `app`.
        with app.app_context():
            run_backtest_by_id(message.get('backtest_id'))

    kafka_service.consume('backtest_scenes', handle_message)

def start_consumer_thread(app):
    """Run the Kafka consumer loop for `app` on a background daemon thread."""
    worker = threading.Thread(
        target=consume_backtest_scenes,
        args=(app,),
        daemon=True,  # daemon: the thread dies with the main program
    )
    worker.start()

# NOTE(review): creating the app and starting the consumer thread at module
# import time gives this package import side effects (DB setup via
# create_app's db.create_all, plus a live Kafka consumer thread). Any import
# of the package -- including by tests or CLI tools -- triggers both.
# Consider moving these behind an explicit entry point; confirm with callers.
app = create_app()
start_consumer_thread(app)
2 changes: 1 addition & 1 deletion app/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
class Config:
    """Flask / SQLAlchemy / JWT configuration."""
    # NOTE(review): two assignments appear here (diff residue: the first is
    # the removed value, the second the added one); the second wins at class
    # creation, leaving the database URI EMPTY -- SQLAlchemy will fail to
    # connect. Confirm the intended URI source (likely an env var).
    SQLALCHEMY_DATABASE_URI = 'postgresql://test_user:password@localhost/test_db'
    SQLALCHEMY_DATABASE_URI = ''
    # Disables the SQLAlchemy event system's modification tracking (saves memory).
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    # NOTE(review): hard-coded placeholder secret -- must come from secure
    # configuration in any real deployment.
    JWT_SECRET_KEY = 'your_secret_key'
15 changes: 3 additions & 12 deletions app/routes/backtest.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from flask import Blueprint, request, jsonify
from app.models.backtest import Backtest, Parameter, Result
import threading
from flask import Blueprint, request, jsonify, current_app
from app.models.backtest import Backtest, Parameter
from app import db
from flask_jwt_extended import jwt_required
from app.services.backtest_service import run_backtest_by_id
from app.services.kafka_service import kafka_service

bp = Blueprint('backtest', __name__)


@bp.route('/backtest', methods=['POST'])
@jwt_required()
def run_backtest():
Expand Down Expand Up @@ -44,12 +44,3 @@ def run_backtest():
return jsonify({"msg": "Backtest created and published to Kafka", "backtest_id": new_backtest.id}), 201


def consume_backtest_scenes():
    """Blocking loop: consume 'backtest_scenes' messages and run each backtest."""
    def callback(message):
        # NOTE(review): no Flask app context is pushed here; run_backtest_by_id
        # presumably needs one for DB access -- verify against the app factory.
        backtest_id = message.get('backtest_id')
        run_backtest_by_id(backtest_id)

    kafka_service.consume('backtest_scenes', callback)


# NOTE(review): despite the trailing comment, this call is synchronous -- it
# blocks module import inside kafka_service.consume's infinite poll loop; no
# separate thread is created here.
consume_backtest_scenes()  # Start consuming Kafka messages in a separate thread
3 changes: 2 additions & 1 deletion app/services/backtest_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ def run_backtest_by_id(backtest_id):
backtest = Backtest.query.get(backtest_id)
if not backtest:
return



# Simulate backtest processing
result = Result(
backtest_id=backtest_id,
Expand Down
61 changes: 50 additions & 11 deletions app/services/kafka_service.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,69 @@
from confluent_kafka import Producer, Consumer, KafkaException
import logging
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import Producer, Consumer, KafkaException, KafkaError
import json
from decimal import Decimal

class KafkaService:
def __init__(self, brokers):
    """Create producer, consumer, and admin clients against *brokers*."""
    self.brokers = brokers
    broker_config = {'bootstrap.servers': brokers}
    consumer_config = {
        'bootstrap.servers': brokers,
        'group.id': 'backtest_group',
        # Start from the earliest offset when no committed offset exists.
        'auto.offset.reset': 'earliest',
    }
    self.producer = Producer(broker_config)
    self.consumer = Consumer(consumer_config)
    self.admin_client = AdminClient(broker_config)

def create_topic(self, topic):
    """Create *topic* (1 partition, replication factor 1) if it does not exist.

    Creation failures are logged, not raised, so the caller proceeds either way.
    """
    topic_metadata = self.admin_client.list_topics(timeout=10)
    if topic not in topic_metadata.topics:
        logging.info(f"Creating topic {topic}")
        new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
        fs = self.admin_client.create_topics([new_topic])
        # Fix: the original reused `topic` as the loop variable, shadowing the
        # parameter; use a distinct name.
        for created_topic, future in fs.items():
            try:
                future.result()  # The result itself is None on success
                logging.info(f"Topic {created_topic} created successfully")
            except Exception as e:
                logging.error(f"Failed to create topic {created_topic}: {str(e)}")
    else:
        logging.info(f"Topic {topic} already exists")

def json_serializer(self, obj):
if isinstance(obj, Decimal):
return float(obj)
raise TypeError("Type not serializable")

def produce(self, topic, message):
    """Serialize *message* to JSON (Decimal-safe) and publish it to *topic*.

    Blocks until delivery via flush() -- acceptable for low volume, a
    throughput bottleneck otherwise.
    """
    logging.info(f"Producing message to topic {topic}: {message}")
    # Fix: the span retained the stale pre-change produce() call (diff
    # residue), which would publish each message twice -- the first time with
    # plain json.dumps that raises on Decimal values. Publish exactly once,
    # with the Decimal-aware serializer.
    serialized_message = json.dumps(message, default=self.json_serializer)
    self.producer.produce(topic, key=None, value=serialized_message)
    self.producer.flush()
    logging.info("Message produced successfully")

def consume(self, topic, callback):
    """Consume JSON messages from *topic* forever, calling callback(dict) per message.

    Ensures the topic exists before subscribing. Partition-EOF events are
    skipped; other consumer errors are raised, logged by the outer handler,
    and the consumer is always closed on exit.

    NOTE(review): the source span interleaved two diff revisions and was not
    syntactically valid; this is the reconstructed newer implementation.
    """
    self.create_topic(topic)
    self.consumer.subscribe([topic])
    logging.info(f"Subscribed to topic {topic}")
    try:
        while True:
            msg = self.consumer.poll(timeout=1.0)
            if msg is None:
                logging.debug("No message received")
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition is informational, not an error.
                    logging.info("End of partition reached")
                    continue
                else:
                    logging.error(f"Consumer error: {msg.error()}")
                    raise KafkaException(msg.error())
            logging.info(f"Received message: {msg.value()}")
            callback(json.loads(msg.value()))
    except Exception as e:
        # Any failure (including in the callback) terminates the loop.
        logging.error(f"Error in Kafka consumer: {str(e)}")
    finally:
        self.consumer.close()

kafka_service = KafkaService(brokers='localhost:9092')
2 changes: 1 addition & 1 deletion app/services/mlflow_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ def log_metrics(self, run_name, metrics):
for key, value in metrics.items():
mlflow.log_metric(key, value)

mlflow_service = MLflowService(tracking_uri='http://localhost:5000')
mlflow_service = MLflowService(tracking_uri='http://localhost:5050')
Loading