# Stock Trading

The cell below defines the **abstract class** whose API you need to implement. **Do NOT modify it** - use the dedicated cell further below for your implementation instead.

In [1]:
# DO NOT MODIFY THIS CELL

from abc import ABC, abstractmethod  
      

# abstract class to represent a stock trading platform
class AbstractStockTradingPlatform(ABC):
    
    # constructor
    @abstractmethod
    def __init__(self):
        pass           
        
    # adds transactionRecord to the set of completed transactions
    @abstractmethod
    def logTransaction(self, transactionRecord):
        pass

    # returns a list with all transactions of a given stockName,
    # sorted by increasing trade value. 
    # stockName : str
    @abstractmethod
    def sortedTransactions(self, stockName): 
        sortedList = []
        return sortedList    
    
    # returns a list of transactions of a given stockName with minimum trade value
    # stockName : str
    @abstractmethod
    def minTransactions(self, stockName): 
        minList = []
        return minList    
    
    # returns a list of transactions of a given stockName with maximum trade value
    # stockName : str
    @abstractmethod
    def maxTransactions(self, stockName): 
        maxList = []
        return maxList    

    # returns a list of transactions of a given stockName, 
    # with the largest trade value below a given thresholdValue.  
    # stockName : str
    # thresholdValue : double
    @abstractmethod
    def floorTransactions(self, stockName, thresholdValue): 
        floorList = []
        return floorList    

    # returns a list of transactions of a given stockName, 
    # with the smallest trade value above a given thresholdValue.  
    # stockName : str
    # thresholdValue : double
    @abstractmethod
    def ceilingTransactions(self, stockName, thresholdValue): 
        ceilingList = []
        return ceilingList    

        
    # returns a list of transactions of a given stockName,  
    # whose trade value is within the range [fromValue, toValue].
    # stockName : str
    # fromValue : double
    # toValue : double
    @abstractmethod
    def rangeTransactions(self, stockName, fromValue, toValue): 
        rangeList = []
        return rangeList    

Use the cell below to define any data structure and auxiliary python function you may need. Leave the implementation of the main API to the next code cell instead.

In [2]:
# ADD AUXILIARY DATA STRUCTURE DEFINITIONS AND HELPER CODE HERE

class TransactionRecord:
    def __init__(self, price: float, quantity: int, time: str) -> None:
        self.price = price
        self.quantity = quantity
        self.time = time

    def trade_value(self) -> float:
        """
        Returns the value of the trade
        """
        return self.price * self.quantity

class AVLNode:
    def __init__(self,  initial_pair = None):
        if initial_pair is None:
            self._key = None
            self._values = []
            self._left = None
            self._right = None
        else:
            self._key = initial_pair[0]
            self._values = [initial_pair[1]]
            self._left = AVLNode(None)
            self._right = AVLNode(None)


    def __repr__(self) -> str:
        def _internal_repr(node: AVLNode, indent: int) -> str:
            if node.is_leaf():
                return "None"

            spaces = " " * indent
            string = "\n"
            l_repr = _internal_repr(node._left, indent + 4)
            r_repr = _internal_repr(node._right, indent + 4)

            string += f"{spaces}{node._key} : {node._values}\n"
            string += f"{spaces}left: {l_repr}\n"
            string += f"{spaces}right: {r_repr}"

            return string

        return _internal_repr(self, 0)


    def balance(self, insert_path: [int]) -> None:
        """
        Recursively balances a tree
        """
        if len(insert_path) < 2:
            return

        *remaining_insert_path, child_insert_direction = insert_path
        grandchild_insert_direction = remaining_insert_path[-1]

        if child_insert_direction == -1:
            self._left.balance(remaining_insert_path)
        else:
            self._right.balance(remaining_insert_path)

        left_height  = self._left.height()
        right_height = self._right.height()

        # If unbalanced, rotate according to the path
        if abs(left_height - right_height) > 1:
            if child_insert_direction == -1:
                if grandchild_insert_direction == -1:
                    self.right_rotate()
                else:
                    self.left_right_rotate()
            else:
                if grandchild_insert_direction == -1:
                    self.right_left_rotate()
                else:
                    self.left_rotate()


    def floor_values(self, key) -> (float, list):
        """
        returns all values whose key the lowest that is above 'key' 
        """
        if self.is_leaf():
            return (None, [])
        elif self._key == key:
            return self._values
        elif self._key > key:
            return self._left.floor_values(key)
        else:
            (r_floor_key, r_floor_vals) = self._right.floor_values()

            if r_floor_key is not None and r_floor_key <= key:
                return (r_floor_key, r_floor_vals)
            else:
                return (self._key, self._values)


    def height(self) -> int:
        """
        Gets the height of a tree
        """
        if self.is_leaf():
            return 0
        else:
            return 1 + max(self._left.height(), self._right.height())


    def insert(self, key, value, comp) -> [int]:
        """
        Inserts a key/value pair into the tree

        returns a list containing the insertion path,
        adding amember as follows:
            if inserted into the left branch: -1
            if inserted into the right branch: 1
            otherwise, returns an empty list
        """
        path = []

        if self.is_leaf():
            self._key = key
            self._values = [value]
            self._left  = AVLNode(None)
            self._right = AVLNode(None)
        else:
            x = comp(key)
            y = comp(self._key)
            path = []

            if x < y:
                path = self._left.insert(key, value, comp)
                path.append(-1)
            elif x > y:
                path = self._right.insert(key, value, comp)
                path.append(1)
            else:
                self._values.append(value)

        return path


    def is_leaf(self) -> bool:
        """
        Leaves are internally represented as nodes whose value is None.
        This returns whether self._value is None
        """
        return self._key is None


    def left_rotate(self) -> None:
        """
        Performs a left rotation on self
        """
        if not self.is_leaf() and not self._right.is_leaf():
            new_left = AVLNode((self._key, self._values[0]))
            new_left._values = [v for v in self._values]
            new_left._left   = self._left
            new_left._right  = self._right._left

            self._key    = self._right._key
            self._values = [v for v in self._right._values]
            self._right  = self._right._right
            self._left   = new_left


    def right_rotate(self) -> None:
        """
        Performs a right rotation on self
        """
        if not self.is_leaf() and not self._left.is_leaf():
            new_right = AVLNode((self._key, self._values[0]))
            new_right._values = [v for v in self._values]
            new_right._right  = self._right
            new_right._left   = self._left._right

            self._key    = self._left._key
            self._values = [v for v in self._left._values]
            self._left   = self._left._left
            self._right  = new_right


    def left_right_rotate(self) -> None:
        """
        Performs a left right rotation on self
        """
        if not self.is_leaf() and not self._left.is_leaf():
            self._left.left_rotate()
            self.right_rotate()


    def right_left_rotate(self) -> None:
        """
        Performs a right left rotation on self
        """
        if not self.is_leaf() and not self._right.is_leaf():
            self._right.right_rotate()
            self.left_rotate()


class AVLTree:
    def __init__(self, comp = lambda x : x) -> None:
        self.root_node = AVLNode(None)
        self.comp = comp


    def __repr__(self) -> str:
        return f"{self.root_node}"


    def insert(self, key, value) -> None:
        """
        Adds a value to the tree 
        """
        path = self.root_node.insert(key, value, self.comp)
        self.root_node.balance(path)


    def above_eq_certain_key(self, key) -> list:
        """
        Returns all elements in the list >= 'key'
        """
        def rec_acv(node: AVLNode, comp, collected: [AVLNode]) -> None:
            if node.is_leaf():
                return

            comp_node_key = comp(node._key)

            if comp_node_key < key:
                rec_acv(node._right, comp, collected)
            elif comp_node_key == key:
                collected.append(node)
                rec_acv(node._right, comp, collected)
            else:
                rec_acv(node._left, comp, collected)
                collected.append(node)
                rec_acv(node._right, comp, collected)

        collected_nodes = []
        rec_acv(self.root_node, self.comp, collected_nodes)

        return collected_nodes


    def below_eq_certain_key(self, key) -> list:
        """
        Returns all elements in the list >= 'key'
        """
        def rec_acv(node: AVLNode, comp, collected: [AVLNode]) -> None:
            if node.is_leaf():
                return

            comp_node_key = comp(node._key)

            if comp_node_key > key:
                rec_acv(node._left, comp, collected)
            elif comp_node_key == key:
                collected.append(node)
                rec_acv(node._left, comp, collected)
            else:
                rec_acv(node._left, comp, collected)
                collected.append(node)
                rec_acv(node._right, comp, collected)

        collected_nodes = []
        rec_acv(self.root_node, self.comp, collected_nodes)

        return collected_nodes


    def make_sorted_list(self) -> list:
        """
        Creates a list from the tree in ascending order
        """
        def rec_msl(node: AVLNode, collected: [AVLNode]) -> None:
            if not node.is_leaf():
                rec_msl(node._left, collected)
                collected.append(node)
                rec_msl(node._right, collected)

        nodes = []
        rec_msl(self.root_node, nodes)

        return nodes


Use the cell below to implement the requested API. 

In [3]:
# IMPLEMENT HERE THE REQUESTED API

class StockTradingPlatform(AbstractStockTradingPlatform):
    
    def __init__(self):
        self.record_dict: { str : AVLTree } = {}
        

    def logTransaction(self, transactionRecord):
        """
        Adds a transaction to record_dict
        
        The stock name is used as the key in self.record_dict
        """
        name, *values = transactionRecord
        transaction = TransactionRecord(*values)

        if name not in self.record_dict:
            self.record_dict[name] = AVLTree()

        self.record_dict[name].insert(transaction.trade_value(), transaction)


    def sortedTransactions(self, stockName):
        """
        Returns a list of all transactions in order of their trade value
        """
        if stockName in self.record_dict:
            node_list = self.record_dict[stockName].make_sorted_list()
            return [tr for node in node_list for tr in node._values]
        else:
            return []

    
    def minTransactions(self, stockName):
        """
        Returns all transactions with the lowest trade value
        """
        if (tree := self.record_dict.get(stockName)) is not None:
            node_cursor = tree.root_node

            while not node_cursor._left.is_leaf():
                node_cursor = node_cursor._left

            return node_cursor._values
        else:
            return []

    
    def maxTransactions(self, stockName): 
        """
        Returns all transactions with the highest trade value
        """
        if (tree := self.record_dict.get(stockName)) is not None:
            node_cursor = tree.root_node

            while not node_cursor._right.is_leaf():
                node_cursor = node_cursor._right

            return node_cursor._values
        else:
            return []


    def floorTransactions(self, stockName, thresholdValue): 
        """
        Returns the transactions of stockName with the largest trade
        value below thresholdValue
        """
        if (tree := self.record_dict.get(stockName)) is not None:
            return tree.root_node.floor_values(thresholdValue)[1]
        else:
            return []


    def ceilingTransactions(self, stockName, thresholdValue): 
        """
        Returns the transactions of stockName with the lowest trade
        value above thresholdValue
        """
        if (tree := self.record_dict.get(stockName)) is not None:
            raise NotImplementedError()
        else:
            return []

    
    def rangeTransactions(self, stockName, fromValue, toValue): 
        rangeList = []
        # ADD YOUR CODE HERE

        
        
        
        return rangeList    

SyntaxError: invalid syntax (3289931267.py, line 91)

The cell below provides helper code that you can use within your experimental framework to generate random transaction data. **Do NOT modify it**.

In [None]:
# DO NOT MODIFY THIS CELL

import random
from datetime import timedelta
from datetime import datetime

class TransactionDataGenerator:
    def __init__(self):
        self.stockNames = ["Barclays", "HSBA", "Lloyds Banking Group", "NatWest Group", 
                      "Standard Chartered", "3i", "Abrdn", "Hargreaves Lansdown", 
                      "London Stock Exchange Group", "Pershing Square Holdings", 
                      "Schroders", "St. James's Place plc."]
        self.minTradeValue = 500.00
        self.maxTradeValue = 100000.00
        self.startDate = datetime.strptime('1/1/2022 1:00:00', '%d/%m/%Y %H:%M:%S')
        random.seed(20221603)
          
    # returns the name of a traded stock at random
    def getStockName(self):
        return random.choice(self.stockNames)

    # returns the trade value of a transaction at random
    def getTradeValue(self):
        return round(random.uniform(self.minTradeValue, self.maxTradeValue), 2)
    
    # returns a list of N randomly generated transactions,
    # where each transaction is represented as a list [stock name, price, quantity, timestamp]
    # N : int
    def generateTransactionData(self, N):   
        listTransactions = [[]]*N
        listDates = [self.startDate + timedelta(seconds=3*x) for x in range(0, N)]
        listDatesFormatted = [x.strftime('%d/%m/%Y %H:%M:%S') for x in listDates]
        for i in range(N):
            stockName = random.choice(self.stockNames)
            price = round(random.uniform(50.00, 100.00), 2)
            quantity = random.randint(10,1000)
            listTransactions[i] = [stockName, price, quantity, listDatesFormatted[i]]   
        return listTransactions

Use the cell below for the python code needed to realise your **experimental framework** (i.e., to generate test data, to instante the `StockTrading` class, to thorouhgly experiment with its API functions, and to experimentally measure their performance). You may use the previously provided ``TransactionDataGenerator`` class to generate random transaction data.

In [None]:
import random
import timeit

# ADD YOUR EXPERIMENTAL FRAMEWORK CODE HERE
platform = StockTradingPlatform()
test_data_generator = TransactionDataGenerator()
test_data = test_data_generator.generateTransactionData(10_000)

for transaction in test_data:
    platform.logTransaction(transaction)

trade_values = [t.trade_value() for t in platform.sortedTransactions("Barclays")[:20]]
print(trade_values)
print([t.trade_value() for t in platform.floorTransactions("Barclays", trade_values[3])])
print([t.trade_value() for t in platform.ceilingTransactions("Barclays", trade_values[3])])


The cell below exemplifies **debug** code I will invoke on your submission - it does not represent an experimental framework (which should me much more comprehensive). **Do NOT modify it**. 

In [None]:
# DO NOT MODIFY THIS CELL

import timeit

testPlatform = StockTradingPlatform()
testDataGen = TransactionDataGenerator()

numTransactions = 1000000
testData = testDataGen.generateTransactionData(numTransactions)

numRuns = 100

print("Examples of transactions:", testData[0], testData[numTransactions//2], testData[numTransactions-1])

#
# testing the logTransaction() API 
#
starttime = timeit.default_timer()
for i in range(numTransactions):
    testPlatform.logTransaction(testData[i])
endtime = timeit.default_timer()
print("\nExecution time to load", numTransactions, "transactions:", round(endtime-starttime,4))

#
# testing the various API functions
#
starttime = timeit.default_timer()
for i in range(numRuns):
    output = testPlatform.sortedTransactions(testDataGen.getStockName())
endtime = timeit.default_timer()
print("\nMean execution time sortedTransactions:", round((endtime-starttime)/numRuns,4))

starttime = timeit.default_timer()
for i in range(numRuns):
    output = testPlatform.minTransactions(testDataGen.getStockName())
endtime = timeit.default_timer()
print("\nMean execution time minTransactions:", round((endtime-starttime)/numRuns,4))

starttime = timeit.default_timer()
for i in range(numRuns):
    output = testPlatform.maxTransactions(testDataGen.getStockName())
endtime = timeit.default_timer()
print("\nMean execution time maxTransactions:", round((endtime-starttime)/numRuns,4))


starttime = timeit.default_timer()
for i in range(numRuns):
    output = testPlatform.floorTransactions(testDataGen.getStockName(), testDataGen.getTradeValue())
endtime = timeit.default_timer()
print("\nMean execution time floorTransactions:", round((endtime-starttime)/numRuns,4))


starttime = timeit.default_timer()
for i in range(numRuns):
    output = testPlatform.ceilingTransactions(testDataGen.getStockName(), testDataGen.getTradeValue())
endtime = timeit.default_timer()
print("\nMean execution time ceilingTransactions:", round((endtime-starttime)/numRuns,4))


starttime = timeit.default_timer()
for i in range(numRuns):
    rangeValues = sorted([testDataGen.getTradeValue(), testDataGen.getTradeValue()])
    output = testPlatform.rangeTransactions(testDataGen.getStockName(), rangeValues[0], rangeValues[1])
endtime = timeit.default_timer()
print("\nMean execution time rangeTransactions:", round((endtime-starttime)/numRuns,4))