Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel $\rightarrow$ Restart) and then **run all cells** (in the menubar, select Cell $\rightarrow$ Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE", as well as your name and collaborators below:

In [None]:
NAME = ""
COLLABORATORS = ""

---

# Assignment 9: Paper Trading - SMA Algorithm

## 0 Introduction
In this assignment, we demonstrate the paper trading step of the SMA algorithm. We define the SMA algorithm as in the previous assignment, but restructure the code slightly so that it consumes real-time prices instead of historical prices.

The live trading version of an algorithm is often closer to its paper trading version than to its backtesting version. Implementation differences between code that consumes historical prices and code that consumes real-time prices lead to performance mismatches, and deploying the algorithm to a live trading system poses further challenges.

One need in Algorithmic Trading is a framework in which the backtesting and live trading code differ minimally, or ideally not at all. Algorithmic Traders would be significantly more productive if they only had to specify (or code) an algorithm once and could use it for both backtesting and live trading. That, however, is a separate topic. This assignment shows how paper trading fits into the process of **Trading Algorithm Development**.

First, we import some of the required packages:

In [None]:
import json
import logging
import time

import kafka
import pytz

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from datetime import datetime
from logging.handlers import RotatingFileHandler
from typing import List, Tuple
from multiprocessing import Process, Queue


from kafka import KafkaConsumer, KafkaAdminClient, TopicPartition

TIMEZONE = pytz.timezone('Asia/Ho_Chi_Minh')

## 1 Kafka Configuration
We will use `Kafka` as a [message broker](https://en.wikipedia.org/wiki/Message_broker) to deliver real-time market prices to our `SMA Algorithm`. To begin, we load the `Kafka` broker configuration.

In [None]:
# Load the Kafka configuration
with open("kafka-config.json", 'r') as f:
    config = json.load(f)

# Print the configuration
print(config)
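The notebook does not show what `kafka-config.json` contains. As a sketch, assuming SASL/PLAIN authentication (an assumption; your broker may use different settings), the file is a flat JSON object whose keys are keyword arguments accepted by both kafka-python's `KafkaAdminClient` and `KafkaConsumer`, which is why `**config` can be unpacked into either. All values below are placeholders. Because `print(config)` would also print the credentials, the sketch adds a hypothetical `redact` helper that masks password fields before printing.

```python
import json
import os
import tempfile

# Hypothetical contents of kafka-config.json. The keys are keyword arguments that
# kafka-python's KafkaAdminClient and KafkaConsumer both accept; the values are
# placeholders, not the course's real broker address or credentials.
example_config = {
    "bootstrap_servers": ["broker.example.com:9092"],
    "security_protocol": "SASL_PLAINTEXT",
    "sasl_mechanism": "PLAIN",
    "sasl_plain_username": "<username>",
    "sasl_plain_password": "<password>",
}


def redact(config: dict) -> dict:
    """Returns a copy of the config that is safe to print (password values masked)."""
    return {key: ("***" if "password" in key else value) for key, value in config.items()}


# Write the example to a temporary file and load it the same way the notebook does
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "kafka-config.json")
    with open(path, "w") as f:
        json.dump(example_config, f, indent=2)
    with open(path, "r") as f:
        loaded_config = json.load(f)

print(redact(loaded_config))
```

In the notebook itself, `print(redact(config))` could replace `print(config)` so that credentials do not end up in the saved output.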

### 1.1 Connect to Kafka
We connect to Kafka with a `KafkaAdminClient` to check that the broker is reachable and to list the topics we can subscribe to.

In [None]:
# First, we connect to Kafka as a KafkaAdminClient
admin_client = KafkaAdminClient(**config)

# Then we get the topics
topics = admin_client.list_topics()

# Show the first five topics:
topics[:5]

To find the topic name of the instrument `VN30F1M`, we filter the list of topics:

In [None]:
# Select the correct topic name of the instrument VN30F1M only:
list(filter(lambda x: 'VN30F1M' in x, topics))

### 1.2 Connect to the Message Channel of `VN30F1M` through Kafka

In [None]:
# NOTE: Change the ticker symbol to the appropriate futures contract name based on the date.
# For example, on 25.02.2025 the VN30F1M (front-month) contract is VN30F2503.

# The topic name is a convention of this system: 'HNXDS' is the code of the derivatives exchange, and `VN30F1M` is the ticker symbol.
F1M_TOPIC = 'HNXDS.VN30F1M'
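Picking the contract code by hand is easy to get wrong around expiry. A minimal sketch of a hypothetical helper, assuming VN30 futures are named `VN30FYYMM` and expire on the third Thursday of the contract month (holiday adjustments to the expiry date are ignored), could compute the front-month code for a given date:

```python
from datetime import date, timedelta


def third_thursday(year: int, month: int) -> date:
    """Returns the third Thursday of the given month."""
    first = date(year, month, 1)
    # date.weekday(): Monday == 0, ..., Thursday == 3
    days_to_first_thursday = (3 - first.weekday()) % 7
    return first + timedelta(days=days_to_first_thursday + 14)


def front_month_vn30f(today: date) -> str:
    """Returns the front-month VN30 futures code, e.g. 'VN30F2503'.

    Assumption: contracts are named VN30FYYMM and expire on the third Thursday
    of the contract month; holiday shifts of the expiry date are not handled.
    """
    year, month = today.year, today.month
    # After this month's expiry, the front month rolls to next month's contract
    if today > third_thursday(year, month):
        month += 1
        if month == 13:
            year, month = year + 1, 1
    return f"VN30F{year % 100:02d}{month:02d}"


print(front_month_vn30f(date(2025, 2, 25)))  # VN30F2503
```

If the topic list contains contract-coded names, a topic could then be built as, for example, `f"HNXDS.{front_month_vn30f(date.today())}"`; check it against the filtered topic list above before relying on it.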

Then, we define an example function that creates a consumer and reads messages from the topic.

In [None]:
# Define an example function to consume messages from Kafka
def consume_message(
    config: dict,
    topic: str
):
    """Consumes messages from Kafka.

    Run for one minute, then stop. Demonstration only.

    Args:
        config (dict): The Kafka configuration.
        topic (str): The topic to stream the price message.
    """
    # Create the Kafka consumer
    consumer = KafkaConsumer(
        topic,  # specify topic
        **config,
        group_id="algotrade",
        auto_offset_reset="earliest",
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),  # Deserialize the message from Kafka
    )

    print("Consumer started, waiting for messages...")
    # Consume the message

    i = 0
    for message in consumer:
        # The consumer information
        print(f"Topic: {message.topic}")
        print(f"Partition: {message.partition}")
        print(f"Offset: {message.offset}")

        # The message value
        # print(f"Value: {message.value}")
        print('latest_matched_price', message.value['latest_matched_price']['value'])
        print('latest_matched_quantity', message.value['latest_matched_quantity']['value'])
        print("---")

        # Sleep for one second so as not to flood the Jupyter Notebook output.
        # When running on other platforms, this line can be removed.
        time.sleep(1)

        # Stop after 60 messages (about one minute, given the one-second sleep)
        i += 1
        if i >= 60:
            consumer.close()
            return

Next, we use the same consumer structure to stream prices from `Kafka` into our algorithm for paper trading. This requires a paper trading version of the algorithm plus a mechanism to receive real-time prices, which the `Kafka` consumer provides. The next section lays out the details.
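To test the algorithm without a live broker, it can help to fabricate messages with the same shape. The sketch below is hypothetical: `FakeRecord` stands in for kafka-python's `ConsumerRecord` and only carries the attributes this notebook reads, and `make_tick` reproduces only the fields accessed in `consume_message` and `SMA.run` (real messages likely contain more). `last_updated` is assumed to be a Unix timestamp in seconds, which is how `SMA.run` treats it.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for kafka-python's ConsumerRecord: the code in this notebook
# only reads .topic, .partition, .offset and .value, so these fields are enough.
FakeRecord = namedtuple("FakeRecord", ["topic", "partition", "offset", "value"])


def make_tick(price, last_updated, quantity=1, offset=0, topic="HNXDS.VN30F1M"):
    """Builds a fake tick message containing only the fields this notebook reads."""
    value = {
        "latest_matched_price": {"value": price, "last_updated": last_updated},
        "latest_matched_quantity": {"value": quantity},
    }
    # Round-trip through JSON bytes, mirroring the consumer's value_deserializer
    raw = json.dumps(value).encode("utf-8")
    return FakeRecord(topic, 0, offset, json.loads(raw.decode("utf-8")))


# A short, made-up price path with strictly increasing timestamps
ticks = [
    make_tick(price, 1740450000 + i, offset=i)
    for i, price in enumerate([1300.0, 1301.5, 1299.8])
]
```

Once the `sma` object from Section 2.1 exists, feeding it `for tick in ticks: sma.run(tick)` exercises the parsing, deduplication, and signal logic offline. The timestamps must increase, otherwise `run` skips the message as stale.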

## 2 The SMA Algorithm class
To set up the SMA algorithm conveniently for paper trading, we restructure the code into a class. A class groups the variables the SMA algorithm needs under one namespace, which makes them easy to access and keeps the code lean. It also makes it easy to define a `callback` function (here, the `run` method) that consumes price messages from the message broker.

In [None]:
class SMA:
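    """Defines the SMA Trading Algorithm.

    Attributes:
        sma_window_length (int): The SMA window length of the trading algorithm.
        take_profit_thres (float): Take-profit threshold, in price points per contract.
        cut_loss_thres (float): Cut-loss threshold (negative), in price points per contract.
        nav_value (float): The initial NAV value, updated as positions are closed.
        logger (logging.Logger): Logger for debugging output.
        data_queue (Queue): Optional queue for sharing date, price, SMA, and NAV snapshots.
    """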
    def __init__(
        self,
        sma_window_length: int,
        take_profit_thres: float,
        cut_loss_thres: float,
        nav_value: float = 1000,
        logger = None,
        data_queue: Queue = None
    ):
        self.sma_window_length = sma_window_length
        self.take_profit_thres = take_profit_thres
        self.cut_loss_thres = cut_loss_thres
        self.nav_value = nav_value
        self.logger = logger

        if not self.logger:
            self.logger = logging.getLogger(__name__)
            
        # Date list to store the date data
        self.date_list = []
        # The price line
        self.price_line = []
        # The sma_line
        self.sma_line = []
        # The holdings to store the current positions of the algorithm
        self.holdings = []
        
        # NAV history object to store the evolution of the asset
        self.nav_history = []
        # A queue to share date, price, SMA, and NAV snapshots with another process
        self.data_queue = data_queue
        # Timestamp of the latest processed price, used to skip stale or duplicate messages
        self.latest_price_timestamp = None
    
    # Open a position when there is a trading signal, and add it to the holdings.
    # The entry point is the price at which the position is opened.
    def open_position(
        self,
        position_type: str,
        entry_point: float,
    ):
        """Opens a position and adds it to the holdings.
    
        Args:
            position_type (str): The position type. The algorithm has only two positions: `LONG` or `SHORT`.
            entry_point (float): The entry point of the position, i.e., the price at which the position is opened.
        """
        # Add the position (type, entry point) to the holdings
        position = [position_type, entry_point]
        self.holdings.append(position)

    # Close positions that have reached a threshold and remove them from the holdings.
    def close_positions(
        self,
        cur_price: float
    ) -> Tuple[float, float]:
        """Closes positions that hit the take-profit or cut-loss threshold and removes them from the holdings.
        Also performs accounting tasks such as calculating realized and unrealized profit and loss (PnL).
        
        Args:
            cur_price (float): Current price point.
    
        Returns:
            The total realized and unrealized profit and loss.
        """
    
        total_realized_pnl = 0
        total_unrealized_pnl = 0
    
        # Loop through the open positions (over a copy, since we remove items),
        # check whether each position has reached take_profit_thres or cut_loss_thres,
        # and if so, close (remove) it from the holdings.
        for position_type, entry_point in self.holdings[:]:
            self.logger.info(f'{position_type}, {entry_point}')
            if position_type == 'LONG':
                unrealized_pnl = (cur_price - entry_point)
    
            if position_type == 'SHORT':
                unrealized_pnl = -(cur_price - entry_point)
            
            if (
                unrealized_pnl >= self.take_profit_thres or
                unrealized_pnl < self.cut_loss_thres
            ):
                self.logger.info(f'{unrealized_pnl}, {self.take_profit_thres}, {self.cut_loss_thres}')
                # Remove the position from the holdings; the current price is the exit point
                self.holdings.remove([position_type, entry_point])
    
                total_realized_pnl += unrealized_pnl
            else:
                total_unrealized_pnl += unrealized_pnl
        
        return total_realized_pnl, total_unrealized_pnl
                
    def calculate_sma(
        self,
        cur_price: float
    ) -> float:
        """Calculates the SMA with the current price.
        
        Args:
            cur_price (float): The current price.

        Returns:
            The current SMA value, or None if fewer than `sma_window_length` prices are available.
        """
        if len(self.price_line) < self.sma_window_length:
            return

        cur_sma = round(np.average(self.price_line[-self.sma_window_length:]), 2)
        
        return cur_sma
    
    def run(self, message):
        """Runs the SMA algorithm on the current price from the message broker.
        This can also be used as a callback in Message Broker (e.g., Kafka, Redis) Pub/Sub APIs.

        In this example, we use Kafka as a message broker.

        Args:
            message: A Kafka `ConsumerRecord` whose `value` is the deserialized tick data (dict).
        """
        cur_price = message.value['latest_matched_price']['value']

        # Skip if the current price is None
        if cur_price is None:
            return

        # Check that the price is actually newer than the last one processed,
        # since messages can carry duplicated information
        latest_price_timestamp = message.value['latest_matched_price']['last_updated']
        if not self.latest_price_timestamp:
            self.latest_price_timestamp = latest_price_timestamp
        else:
            if latest_price_timestamp <= self.latest_price_timestamp:
                return
            else:
                self.latest_price_timestamp = latest_price_timestamp

        # Convert the Unix timestamp (in seconds) to a timezone-aware datetime
        now = datetime.fromtimestamp(latest_price_timestamp, tz=TIMEZONE)

        self.date_list.append(now)
        self.price_line.append(cur_price)
        # Calculate the SMA
        cur_sma = self.calculate_sma(cur_price)

        if not cur_sma:
            self.logger.info(f'{now} Calculating SMA({self.sma_window_length})...')
        
        self.sma_line.append(cur_sma)

        # If required information is not provided, skip for now
        if len(self.price_line) < 2 or len(self.sma_line) < 2 or cur_sma is None:
            return

        # Got the required information
        prev_price = self.price_line[-2]
        prev_sma = self.sma_line[-2]

        # If the previous SMA is not available yet, skip:
        if not prev_sma:
            return

        # Log for debugging
        self.logger.info(
            f'{now} price(t-1)={prev_price}; '
            f'sma(t-1)={prev_sma}; '
            f'price(t)={cur_price}; '
            f'sma(t)={cur_sma}'
        )        

        # Close positions
        # Determine if any positions need to be closed
        total_realized_pnl, total_unrealized_pnl = self.close_positions(cur_price)
        # Update the NAV with the realized PnL
        self.nav_value = self.nav_value + total_realized_pnl
        nav_history = self.nav_value
        
        if total_realized_pnl == 0:
            nav_history = self.nav_value + total_unrealized_pnl

        shared_data = {
            'date': now,
            'price': cur_price,
            'sma': cur_sma,
            'nav': nav_history
        }
        
        self.nav_history.append(nav_history)
        # Put the snapshot in data_queue to share it, if a queue is available
        if self.data_queue:
            self.data_queue.put(shared_data)
        
        # NOTE: open one contract only
        if self.holdings:
            self.logger.info(f'{now} UNREALIZED PNL: {round(total_unrealized_pnl, 2)}')
            return

        # Open a LONG position when there are signals and add to the holdings
        if prev_price < prev_sma and cur_price >= cur_sma:
            self.open_position('LONG', cur_price)

        # Open a SHORT position when there are signals and add to the holdings
        if prev_price > prev_sma and cur_price <= cur_sma:
            self.open_position('SHORT', cur_price)
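The entry and exit rules are embedded in `run` and `close_positions`. Restating them as small pure functions makes them easy to unit-test without Kafka. The function names below are hypothetical and not part of the assignment; the sketch mirrors the class as written, including the asymmetric threshold checks (`>=` for take profit, strict `<` for cut loss) and PnL measured in price points per contract.

```python
from typing import Optional


def crossover_signal(prev_price: float, prev_sma: float,
                     cur_price: float, cur_sma: float) -> Optional[str]:
    """Returns 'LONG', 'SHORT' or None, using the same crossover rule as SMA.run."""
    if prev_price < prev_sma and cur_price >= cur_sma:
        return "LONG"   # price crossed above the SMA
    if prev_price > prev_sma and cur_price <= cur_sma:
        return "SHORT"  # price crossed below the SMA
    return None


def position_pnl(position_type: str, entry_point: float, cur_price: float) -> float:
    """Per-contract PnL in price points, as in SMA.close_positions."""
    if position_type == "LONG":
        return cur_price - entry_point
    if position_type == "SHORT":
        return entry_point - cur_price
    raise ValueError(f"unknown position type: {position_type}")


def should_close(pnl: float, take_profit_thres: float, cut_loss_thres: float) -> bool:
    """Mirrors SMA.close_positions: close at or above take profit, strictly below cut loss."""
    return pnl >= take_profit_thres or pnl < cut_loss_thres
```

With the thresholds used in Section 2.1 (8.8 and -5.0), a `LONG` entered at 1300.0 is closed at 1309.0 (PnL 9.0), while a `SHORT` entered at 1300.0 stays open at 1304.0 (PnL -4.0) and is closed at 1305.5 (PnL -5.5).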

### 2.1 Create The SMA object and a Consumer

We can create the consumer as follows:

In [None]:
# Define the consumer
# Set group_id to your own ID (see the note below) before consuming messages
consumer = KafkaConsumer(
    F1M_TOPIC,
    **config,
    # ***IMPORTANT NOTE***
    # You should change the group_id into your student ID (e.g., "21125157" (str)) to correctly consume the message.
    # Each student will have a different stream from which to consume messages to not interfere with each other.
    group_id="paper-trading-01",
    auto_offset_reset="latest",
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),  # Deserialize the message from Kafka
)

# This consumer subscribes to the topic and starts from the latest messages.
# Do not call consumer.assign() on it: kafka-python does not allow mixing a topic
# subscription with manual partition assignment. To replay old messages, use the
# playback consumer defined below instead.

Kafka also lets you "play back" earlier messages from a topic so that you can test your algorithm on past messages. To do so, define and configure the consumer as in the code snippet below. It relies on the offsets of a Kafka partition: instead of subscribing to the topic, it assigns partition 0 explicitly and seeks to the beginning of the stream. You can read more about offsets [here](https://www.redpanda.com/guides/kafka-architecture-kafka-offset).

In [None]:
# Define the consumer slightly differently from the one above
# Uncomment to use this code. Define the consumer in only one of the two ways:
# if you choose this one, comment out or remove the other consumer definition.

# consumer = KafkaConsumer(
#     **config,
#     # ***IMPORTANT NOTE***
#     # You should change the group_id into your student id (e.g., "21125157" (str)) to correctly consume the message.
#     # Each student will have a different stream from which to consume messages to not interfere with each other.
#     group_id="paper-trading-01",
#     value_deserializer=lambda v: json.loads(v.decode('utf-8')),  # Serialize the message from Kafka
# )

# # Define the TopicPartition for the consumer
# tp = TopicPartition(topic=F1M_TOPIC, partition=0)
# # Assign the TopicPartition to the consumer.
# consumer.assign([tp])
# # Reset the offset to the beginning of the stream
# consumer.seek_to_beginning()
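As a variation on `seek_to_beginning()`, you may want to replay from a specific time rather than from the very first retained message. A minimal sketch using kafka-python's `offsets_for_times` and `seek` (the helper name `seek_to_time` is hypothetical), assuming the consumer has been assigned the partition as in the snippet above and the broker supports timestamp lookups (Kafka 0.10.1 or later):

```python
from datetime import datetime


def seek_to_time(consumer, tp, start: datetime) -> bool:
    """Moves the consumer's position on `tp` to the first message at or after `start`.

    `consumer` is expected to be a kafka-python KafkaConsumer that has already been
    assigned `tp` via consumer.assign([tp]). Returns False, leaving the position
    unchanged, if the partition holds no message that recent.
    """
    # Kafka message timestamps are milliseconds since the Unix epoch;
    # `start` should be timezone-aware so that .timestamp() is unambiguous.
    ts_ms = int(start.timestamp() * 1000)
    offsets = consumer.offsets_for_times({tp: ts_ms})
    found = offsets.get(tp)  # an OffsetAndTimestamp, or None if nothing matches
    if found is None:
        return False
    consumer.seek(tp, found.offset)
    return True
```

For example, after `consumer.assign([tp])`, calling `seek_to_time(consumer, tp, TIMEZONE.localize(datetime(2025, 2, 25, 9, 0)))` would start playback from 9:00 on that day, provided the broker still retains those messages.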

After defining the consumer, we create an `SMA` object that holds the SMA algorithm, together with a file logger and a queue for sharing results:

In [None]:
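file_handler = RotatingFileHandler('sma.log', maxBytes=1024*100, backupCount=1)
logger = logging.getLogger(__name__)
# force=True replaces any handlers that Jupyter may already have attached to the root logger
logging.basicConfig(level=logging.INFO, handlers=[file_handler], force=True)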

q = Queue()

sma = SMA(
    sma_window_length=5,
    take_profit_thres=8.8,
    cut_loss_thres=-5.0,
    logger=logger,
    data_queue=q
)
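`logging.basicConfig` configures the root logger, and without `force=True` it does nothing if the root logger already has handlers, which can silently leave `sma.log` empty in a notebook. A hypothetical alternative, `make_file_logger`, attaches the rotating file handler directly to a named logger instead; it is a sketch, not part of the assignment.

```python
import logging
import os
from logging.handlers import RotatingFileHandler


def make_file_logger(name: str, path: str,
                     max_bytes: int = 100 * 1024, backup_count: int = 1) -> logging.Logger:
    """Returns a logger that writes INFO and above to a size-rotated file."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    target = os.path.abspath(path)
    # Avoid stacking duplicate handlers when the notebook cell is re-run
    already_attached = any(
        isinstance(h, RotatingFileHandler) and h.baseFilename == target
        for h in logger.handlers
    )
    if not already_attached:
        handler = RotatingFileHandler(path, maxBytes=max_bytes, backupCount=backup_count)
        handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
        logger.addHandler(handler)
    # Do not also pass records to the root logger (e.g. handlers installed by Jupyter)
    logger.propagate = False
    return logger
```

In the cell above, `logger = make_file_logger(__name__, 'sma.log')` could then replace the three logging lines before the logger is passed to `SMA`.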

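Then, we create a function that runs the SMA algorithm with the Kafka consumer. We will run this function in a separate process, which makes it easy to start and stop. Because the algorithm then lives in the child process, the `sma` object in the notebook is not updated; results are sent back through `data_queue` instead.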


In [None]:
def run_sma_with_kafka(
    sma: SMA,
    kafka_consumer: KafkaConsumer
):
    """Runs the SMA with Kafka Consumer.

    Args:
        sma (SMA): The SMA algorithm object.
        kafka_consumer (KafkaConsumer): The message consumer of Kafka.
    """
    for message in kafka_consumer:
        sma.run(message)

Then we wrap the procedure in a Python process and start it:

In [None]:
# Define the SMA process
sma_process = Process(
    target=run_sma_with_kafka,
    args=(sma, consumer,),
    daemon=True
)

In [None]:
# Run the SMA process
sma_process.start()

In [None]:
# Let the process run for a while, then terminate if necessary
# If you want to paper trade continuously, you can disable this by commenting this section
# time.sleep(600)
# sma_process.terminate()
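Since the child process communicates only through the queue, the notebook has to drain it to see results. The multiprocessing documentation notes that `Queue.empty()` is not reliable, so a loop on `empty()` can stop early while items are still being flushed by the child. A minimal sketch of a hypothetical `drain_queue` helper that instead waits briefly on `get()` and stops on `queue.Empty`, which both `multiprocessing.Queue` and `queue.Queue` raise on timeout:

```python
import queue


def drain_queue(data_queue, timeout: float = 0.1) -> list:
    """Collects every item currently available on `data_queue`.

    Works with multiprocessing.Queue and queue.Queue alike: both raise
    queue.Empty when get() times out. The short timeout gives items that
    are still in transit from the child process a chance to arrive.
    """
    items = []
    while True:
        try:
            items.append(data_queue.get(timeout=timeout))
        except queue.Empty:
            return items
```

For example, `rows = drain_queue(q)` returns the snapshot dicts (`date`, `price`, `sma`, `nav`) that `SMA.run` has put on the queue so far.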

## 3 Evaluation

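Because the algorithm runs in a separate process, the `sma` object in the notebook does not see its updates (`holdings`, `price_line`, `sma_line`, `nav_history`). Instead, we collect the date, price, SMA, and NAV snapshots that the process puts on the queue and use them to assess the performance of the algorithm.

We define an `Evaluator` class to calculate the desired metrics: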

In [None]:
class Evaluator:
    """Evaluator class contains the possible evaluation of a trading algorithm"""

    def __init__(
        self,
        date_list: List = [],
        price_line: List = [],
        sma_line: List = [],
        nav: List = []
    ):
        self.data = pd.DataFrame.from_dict(
            {
                date: [price_line[i], sma_line[i], nav[i]]
                for i, date in enumerate(date_list)
            },
            orient='index', columns=['Price', 'SMA', 'NAV']
        )

    def plot_price_sma(self):
        """Plots Price vs SMA"""
        fig = plt.figure()
        ax = fig.add_subplot(111)
        self.data['Price'].plot(kind='line', figsize=(8, 4), title='Price vs. SMA', ax=ax)
        self.data['SMA'].plot(kind='line', figsize=(8, 4), ax=ax, legend=True)
        plt.gca().spines[['top', 'right']].set_visible(False)

    def plot_nav(self):
        """Plots the NAV chart"""
        self.data['NAV'].plot(kind='line', figsize=(8, 4), title='NAV Chart')
        plt.gca().spines[['top', 'right']].set_visible(False)

    def get_period_return(self):
        """Gets the period return"""
        cur_asset_value = self.data['NAV'].iloc[-1]
        init_asset_value = self.data['NAV'].iloc[0]

        accum_return_rate = (cur_asset_value / init_asset_value - 1) * 100

        return round(accum_return_rate, 2)

    def get_mdd(self):
        """Gets the MDD"""
        # For each timestamp, calculate the peak NAV since inception
        nav = self.data['NAV'].astype(float)
        self.data['peak'] = nav.cummax()
        
        # For each timestamp, calculate the drawdown from that peak
        self.data['drawdown'] = nav / self.data['peak'] - 1
        
        # max drawdown is the most negative value
        mdd = self.data['drawdown'].min() * 100

        return round(mdd, 2)

    def update_data_from_queue(self, data_queue: Queue):
        """Updates data from queue elements.

            Queue element format:
            {
                "date": <datetime>,
                "price": float,
                "sma": float,
                "nav": float
            }
        """
        while not data_queue.empty():
            data = data_queue.get()
            # A float Series keeps the Price/SMA/NAV columns numeric for plotting
            self.data.loc[data['date']] = pd.Series({
                'Price': data['price'],
                'SMA': data['sma'],
                'NAV': data['nav']
            }, dtype=float)

In [None]:
# And we can initiate the evaluator
evaluator = Evaluator()

In [None]:
evaluator.update_data_from_queue(q)

### 3.1 Price and SMA
We can plot the Price and SMA chart

In [None]:
evaluator.plot_price_sma()

### 3.2 The NAV Chart

Then we can also plot the NAV Chart

In [None]:
evaluator.plot_nav()

### 3.3 The period return

We can show the period return

In [None]:
print(f'Period Return: {evaluator.get_period_return()}%')
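For reference, `get_period_return` compares the latest recorded NAV with the first one taken from the queue:

$$
R = \left(\frac{\mathrm{NAV}_T}{\mathrm{NAV}_0} - 1\right) \times 100\%
$$

where $\mathrm{NAV}_0$ is the first NAV in `evaluator.data` and $\mathrm{NAV}_T$ the latest. For a hypothetical NAV going from 1000 to 1012.5, this gives $(1012.5 / 1000 - 1) \times 100\% = 1.25\%$.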

### 3.4 The MDD

And we can also get the MDD

In [None]:
print(f'MDD: {evaluator.get_mdd()}%')
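To make the MDD computation concrete, here is a standalone sketch of the same calculation `get_mdd` performs (running peak, drawdown relative to that peak, most negative value in percent), applied to a made-up NAV path. The function name `max_drawdown_pct` is hypothetical.

```python
import pandas as pd


def max_drawdown_pct(nav: pd.Series) -> float:
    """Maximum drawdown in percent, computed as in Evaluator.get_mdd."""
    peak = nav.cummax()         # highest NAV seen so far at each point
    drawdown = nav / peak - 1   # relative distance below that peak (<= 0)
    return round(drawdown.min() * 100, 2)


# Hypothetical NAV path: peak of 1010, trough of 970, then a new high
nav = pd.Series([1000.0, 1010.0, 990.0, 1005.0, 970.0, 1020.0])
print(max_drawdown_pct(nav))  # -3.96
```

The worst point is 970 against the earlier peak of 1010, so the MDD is $970 / 1010 - 1 \approx -3.96\%$; the later recovery to 1020 does not undo it.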