Skip to content

Commit

Permalink
Merge pull request #30 from 10-academy-w-9/frontend
Browse files Browse the repository at this point in the history
Frontend init
  • Loading branch information
AbYT101 committed Jun 22, 2024
2 parents fe815e6 + a6cbefa commit 87eec81
Show file tree
Hide file tree
Showing 36 changed files with 20,687 additions and 1,237 deletions.
21 changes: 21 additions & 0 deletions app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import JWTManager
from flask_bcrypt import Bcrypt
import threading

db = SQLAlchemy()
jwt = JWTManager()
bcrypt = Bcrypt()

from app.services.backtest_service import run_backtest_by_id
from app.services.kafka_service import kafka_service

def create_app():
app = Flask(__name__)
app.config.from_object('app.config.Config')
Expand All @@ -23,3 +27,20 @@ def create_app():
db.create_all()

return app

def consume_backtest_scenes(app):
    """Consume backtest ids from the 'backtest_scenes' Kafka topic, forever.

    Each message is handled inside the Flask application context so that
    database access performed by run_backtest_by_id works. This call blocks
    the calling thread; run it on a background thread (see
    start_consumer_thread).
    """
    def handle_message(message):
        # Messages are expected to be dicts carrying the id of the run to
        # execute. NOTE(review): a missing 'backtest_id' yields None here —
        # confirm run_backtest_by_id tolerates that.
        with app.app_context():
            run_backtest_by_id(message.get('backtest_id'))

    kafka_service.consume('backtest_scenes', handle_message)

# Start consuming Kafka messages in a separate thread.
def start_consumer_thread(app):
    """Launch the Kafka consumer loop on a background daemon thread."""
    # daemon=True allows the interpreter to exit without waiting for the
    # (infinite) consumer loop to finish.
    worker = threading.Thread(
        target=consume_backtest_scenes,
        args=(app,),
        daemon=True,
    )
    worker.start()

# Build the application and start the background Kafka consumer at import
# time. NOTE(review): this side effect runs on *any* import of this package
# (CLI tools, tests, migration scripts) — confirm that is intended.
app = create_app()
start_consumer_thread(app)
2 changes: 1 addition & 1 deletion app/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os


class Config:
    """Flask application configuration.

    Values are read from environment variables when present, falling back to
    the previous in-file defaults so existing setups keep working. The empty
    database-URI default is not usable — DATABASE_URL must be provided.
    """

    # Database connection string. The in-file default was removed (it held
    # credentials); an empty string will fail at connect time, so deployments
    # must supply DATABASE_URL.
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL', '')
    # Disable SQLAlchemy's event/modification tracking; it adds overhead and
    # is not used by this app.
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    # Key used to sign JWTs. The fallback is an insecure placeholder — set
    # JWT_SECRET_KEY in any real environment.
    JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY', 'your_secret_key')
51 changes: 3 additions & 48 deletions app/routes/backtest.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from flask import Blueprint, request, jsonify
from app.models.backtest import Backtest, Parameter, Result
import threading
from flask import Blueprint, request, jsonify, current_app
from app.models.backtest import Backtest, Parameter
from app import db
from flask_jwt_extended import jwt_required
from app.services.backtest_service import run_backtest_by_id
from app.services.kafka_service import kafka_service

bp = Blueprint('backtest', __name__)


@bp.route('/backtest', methods=['POST'])
@jwt_required()
def run_backtest():
Expand Down Expand Up @@ -44,48 +44,3 @@ def run_backtest():
return jsonify({"msg": "Backtest created and published to Kafka", "backtest_id": new_backtest.id}), 201


def consume_backtest_scenes():
def callback(message):
backtest_id = message.get('backtest_id')
run_backtest_by_id(backtest_id)

kafka_service.consume('backtest_scenes', callback)


consume_backtest_scenes() # Start consuming Kafka messages in a separate thread

# from flask import Flask, request, jsonify
# import sys
# import os
# sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))

# from scripts.backtest_runner import run_backtest, RsiBollingerBandsStrategy
# app = Flask(__name__)

# @app.route('/backtest', methods=['GET','POST'])
# def backtest():
# data = request.get_json()
# symbol = data['symbol']
# start_date = data['start_date']
# end_date = data['end_date']
# strategy_name = data['strategy']

# strategies = {
# 'rsi_bollinger': RsiBollingerBandsStrategy
# }

# strategy = strategies.get(strategy_name.lower())
# if not strategy:
# return jsonify({'error': 'Invalid strategy name'}), 400

# try:
# stats = run_backtest(strategy, symbol, start_date, end_date)
# return jsonify(stats)
# except Exception as e:
# return jsonify({'error': str(e)}), 400

# # def backtest():
# # return 'Backtest endpoint accessed successfully'

# if __name__ == '__main__':
# app.run(debug=True)
3 changes: 2 additions & 1 deletion app/services/backtest_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ def run_backtest_by_id(backtest_id):
backtest = Backtest.query.get(backtest_id)
if not backtest:
return



# Simulate backtest processing
result = Result(
backtest_id=backtest_id,
Expand Down
61 changes: 50 additions & 11 deletions app/services/kafka_service.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,69 @@
from confluent_kafka import Producer, Consumer, KafkaException
import logging
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import Producer, Consumer, KafkaException, KafkaError
import json
from decimal import Decimal

class KafkaService:
    """Thin wrapper around confluent-kafka producer, consumer and admin
    clients, sharing one broker list.

    One module-level instance (``kafka_service``) is created below and used
    across the app.
    """

    def __init__(self, brokers):
        # brokers: bootstrap server list, e.g. 'localhost:9092'.
        self.brokers = brokers
        self.producer = Producer({'bootstrap.servers': brokers})
        self.consumer = Consumer({
            'bootstrap.servers': brokers,
            'group.id': 'backtest_group',
            # Start from the earliest offset when the group has no committed
            # offset yet.
            'auto.offset.reset': 'earliest'
        })
        self.admin_client = AdminClient({'bootstrap.servers': brokers})

    def create_topic(self, topic):
        """Create *topic* (1 partition, replication factor 1) if it does not
        already exist; failures are logged, not raised."""
        topic_metadata = self.admin_client.list_topics(timeout=10)
        if topic not in topic_metadata.topics:
            logging.info(f"Creating topic {topic}")
            new_topic = NewTopic(topic, num_partitions=1, replication_factor=1)
            fs = self.admin_client.create_topics([new_topic])
            # Iterate with a distinct name so the 'topic' parameter is not
            # shadowed (the original loop reused 'topic', clobbering it and
            # mislabelling log messages after the loop).
            for created_topic, future in fs.items():
                try:
                    future.result()  # Result is None on success.
                    logging.info(f"Topic {created_topic} created successfully")
                except Exception as e:
                    logging.error(f"Failed to create topic {created_topic}: {str(e)}")
        else:
            logging.info(f"Topic {topic} already exists")

    def json_serializer(self, obj):
        """``json.dumps`` *default* hook: Decimal -> float; anything else
        raises TypeError."""
        if isinstance(obj, Decimal):
            return float(obj)
        raise TypeError("Type not serializable")

    def produce(self, topic, message):
        """Serialize *message* to JSON and synchronously publish it to
        *topic* (flush blocks until delivery)."""
        logging.info(f"Producing message to topic {topic}: {message}")
        serialized_message = json.dumps(message, default=self.json_serializer)
        self.producer.produce(topic, key=None, value=serialized_message)
        self.producer.flush()
        logging.info("Message produced successfully")

    def consume(self, topic, callback):
        """Poll *topic* forever, invoking ``callback(dict)`` per message.

        Blocks the calling thread. Partition-EOF events are skipped; any
        other consumer error or callback exception is logged and ends the
        loop, after which the consumer is closed.
        """
        self.create_topic(topic)
        self.consumer.subscribe([topic])
        logging.info(f"Subscribed to topic {topic}")
        try:
            while True:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    logging.debug("No message received")
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        logging.info("End of partition reached")
                        continue
                    else:
                        logging.error(f"Consumer error: {msg.error()}")
                        raise KafkaException(msg.error())
                logging.info(f"Received message: {msg.value()}")
                callback(json.loads(msg.value()))
        except Exception as e:
            logging.error(f"Error in Kafka consumer: {str(e)}")
        finally:
            self.consumer.close()


kafka_service = KafkaService(brokers='localhost:9092')
2 changes: 1 addition & 1 deletion app/services/mlflow_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ def log_metrics(self, run_name, metrics):
for key, value in metrics.items():
mlflow.log_metric(key, value)

mlflow_service = MLflowService(tracking_uri='http://localhost:5000')
mlflow_service = MLflowService(tracking_uri='http://localhost:5050')
25 changes: 0 additions & 25 deletions datas/binance/DOGEUSD_PERP-1h-2023-06-04.csv

This file was deleted.

Loading

0 comments on commit 87eec81

Please sign in to comment.