Skip to content

Commit

Permalink
Updates for postgresql tables
Browse files Browse the repository at this point in the history
  • Loading branch information
powprashant committed Sep 22, 2023
1 parent 57a7953 commit 0ad59e1
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 17 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ pylint==2.17.5
psycopg2==2.9.7
matplotlib==3.7.2
mplfinance==0.12.10b0
scipy==1.11.2
11 changes: 7 additions & 4 deletions resources/order_manager.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
-- create database order_manager;
-- \c order_manager

-- Define ENUM types
CREATE TYPE order_status AS ENUM (
'Placed', 'Executed', 'Cancelled', 'Rejected', 'Partially Executed'
Expand Down Expand Up @@ -57,10 +60,10 @@ CREATE OR REPLACE FUNCTION handle_order_status_change()
RETURNS TRIGGER AS $$
BEGIN
-- Create scrip entry when order status is "Placed"
IF NEW.status = 'Placed' THEN
INSERT INTO scrips (script_code, script_name, exchange, exchange_segment)
VALUES (NEW.script_code, 'Placeholder Name', 'NSE', 'Equity');
END IF;
-- IF NEW.status = 'Placed' THEN
-- INSERT INTO scrips (script_code, script_name, exchange, exchange_segment)
-- VALUES (NEW.script_code, 'Placeholder Name', 'NSE', 'Equity');
-- END IF;

-- Insert into live_scrips when order status is "Executed"
IF NEW.status = 'Executed' THEN
Expand Down
112 changes: 106 additions & 6 deletions src/common/db_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
import traceback
from enum import Enum
from typing import List, Optional, Union

Expand All @@ -14,22 +15,34 @@ class OrderStatus(Enum):
REJECTED = "Rejected"
PARTIALLY_EXECUTED = "Partially Executed"

def __str__(self):
return self.value


class ExchangeType(Enum):
NSE = "NSE"
BSE = "BSE"

def __str__(self):
return self.value


class OrderType(Enum):
REGULAR = "R"
STOPLOSS = "SL"

def __str__(self):
return self.value


class ExchangeSegmentType(Enum):
DERIVATIVE = "Derivative"
EQUITY = "Equity"
CURRENCY = "Currency"

def __str__(self):
return self.value


# pylint: disable=too-many-arguments,too-few-public-methods
class Order:
Expand Down Expand Up @@ -64,10 +77,33 @@ def __init__(
self.redis_client = redis_client
self.logger = logging.getLogger(__name__)

def insert_order(self, order: Order) -> int:
def insert_order(self, order: Order, **kwargs) -> int:
try:
with self.connection:
cursor = self.connection.cursor()

## get scrip name exchangeTye and exchange segment from kwargs, use null if not present
scrip_name = kwargs.get("scrip_name", None)
exchange_type = ExchangeType(kwargs.get("exchange_type", None))
exchange_segment = ExchangeSegmentType(
kwargs.get("exchange_segment", None)
)

## update scrips table on conflict disregard
scrip_sql = """
INSERT INTO scrips (script_code, script_name, exchange, exchange_segment)
VALUES (%s, %s, %s, %s)
ON CONFLICT (script_code) DO NOTHING
"""
cursor.execute(
scrip_sql,
(
order.script_code,
scrip_name,
exchange_type.value,
exchange_segment.value,
),
)
sql = """
INSERT INTO orders (remote_order_id, exchange_order_id,
script_code, quantity, buy_sell, avg_price,
Expand Down Expand Up @@ -101,7 +137,7 @@ def insert_order(self, order: Order) -> int:
def fetch_orders(self, search: Optional[Union[str, int]] = None) -> List[Order]:
try:
cached_orders = self.redis_client.get(self._get_cache_key(search))
if cached_orders:
if False and cached_orders:
orders = self._deserialize_orders(cached_orders)
else:
orders = self._fetch_orders_from_database(search)
Expand All @@ -125,8 +161,7 @@ def _cache_orders(self, orders: List[Order], search: Optional[Union[str, int]]):
self.logger.info("Cached orders for search: %s", search)

def _serialize_orders(self, orders: List[Order]) -> str:
serialized_orders = [json.dumps(order.__dict__) for order in orders]
return json.dumps(serialized_orders)
return json.dumps([json.dumps(order.__dict__) for order in orders])

def _deserialize_orders(self, serialized_orders: str) -> List[Order]:
orders_data = json.loads(serialized_orders)
Expand All @@ -146,6 +181,7 @@ def _fetch_orders_from_database(
else:
sql = "SELECT * FROM orders"
params = None
self.logger.info("Executing query: %s | %s", sql, params)

cursor.execute(sql, params)
rows = cursor.fetchall()
Expand All @@ -166,6 +202,31 @@ def _fetch_orders_from_database(
return orders
except Exception as exp:
self.logger.error("Failed to fetch orders from database: %s", exp)
self.logger.error("Stack Trace :%s", traceback.format_exc())
raise

def update_order(self, **kwargs):
try:
with self.connection:
cursor = self.connection.cursor()
remote_order_id = kwargs.get("remote_order_id", None)
comment = kwargs.get("comment", None)
avg_price = float(kwargs.get("avg_price", None))
qunantity = int(kwargs.get("quantity", None))
status = OrderStatus(kwargs.get("status", None))
sql = """
UPDATE orders SET status = %s, avg_price = %s, comment = %s, quantity = %s
WHERE remote_order_id = %s
"""
cursor.execute(
sql,
(status.value, avg_price, comment, qunantity, remote_order_id),
)
self._clear_cached_orders()
self.logger.info("Updated order with ID: %s", order.remote_order_id)
except Exception as exp:
self.connection.rollback()
self.logger.error("Failed to update order: %s", exp)
raise


Expand All @@ -177,8 +238,8 @@ def __new__(cls):
if cls._instance is None:
cls.db_params = {
"dbname": "order_manager",
"user": "postgres",
"password": "postgres",
"user": "admin",
"password": "admin",
"host": "localhost",
"port": "5432",
}
Expand All @@ -187,3 +248,42 @@ def __new__(cls):
password={cls.db_params['password']}"
cls._instance.connection = psycopg2.connect(conn_string)
return cls._instance


## Example usage:
if __name__ == "__main__":
db_conn = DatabaseConnection()
order_repo = OrderRepository(db_conn.connection, redis.Redis(host="127.0.0.1"))
order = Order(
remote_order_id="straddle_1234",
exchange_order_id="4242123618",
code=61842,
quantity=400,
buy_sell="S",
avg_price=123.45,
status=OrderStatus.EXECUTED,
order_type=OrderType.REGULAR,
)
# order_repo.insert_order(order, scrip_name="NIFTY CE SEP 28 2023 20500.00",
# exchange_type=ExchangeType.NSE,
# exchange_segment=ExchangeSegmentType.DERIVATIVE)
orders = order_repo.fetch_orders()
## display orders
for order in orders:
print(order.__dict__)
# orders = order_repo.fetch_orders(search=123)
# print(orders)
# orders = order_repo.fetch_orders(search="1234")
# print(orders)
# orders = order_repo.fetch_orders(search="12345")
# print(orders)
# orders = order_repo.fetch_orders(search=12345)
# print(orders)
# orders = order_repo.fetch_orders(search=1234)

order_repo.update_order(
remote_order_id="12345",
status=OrderStatus.CANCELLED,
quantity=200,
avg_price=123.45,
)
27 changes: 20 additions & 7 deletions src/simulator/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,28 @@ services:
ports:
- "27017:27017"

timescaledb:
image: timescale/timescaledb:latest-pg15
restart: always
container_name: minimal-timescaledb
ports:
- "5432:5432"
# timescaledb:
# image: timescale/timescaledb:latest-pg15
# restart: always
# container_name: minimal-timescaledb
# ports:
# - "5432:5432"
# environment:
# POSTGRES_USER: 'admin'
# POSTGRES_PASSWORD: 'admin'
# volumes:
# - /data:/var/lib/postgresql/data

postgres:
image: postgres:15.0
restart: always
container_name: minimal-postgres
environment:
POSTGRES_USER: 'admin'
POSTGRES_PASSWORD: 'admin'
volumes:
- /data:/var/lib/postgresql/data
- ./data:/var/lib/postgresql/data
ports:
- "5432:5432" # Map container port 5432 to the host


0 comments on commit 0ad59e1

Please sign in to comment.