In [1]:
# Notebook for performing technical analysis and backtesting on Sharadar data (alpha)
# Note: code produced with some assistance from Google Gemini



In [2]:
import pandas as pd
import numpy as np
import datetime
import typing
import unittest
import abc
import re
import typing
import inspect
import textwrap
import functools

In [3]:
sharadar_etfs = pd.read_csv("/kaggle/input/SFP.csv").sort_values(by=["ticker", "date"]).reset_index(drop=True)

In [4]:
display(sharadar_etfs.head())
display(sharadar_etfs.tail())

Unnamed: 0,ticker,date,open,high,low,close,volume,closeadj,closeunadj,lastupdated
0,AAA,2020-09-09,25.1,25.119,25.07,25.07,17327.0,21.144,25.07,2025-02-07
1,AAA,2020-09-10,25.06,25.07,25.046,25.068,23485.0,21.142,25.068,2025-02-07
2,AAA,2020-09-11,25.04,25.05,25.02,25.035,33362.0,21.114,25.035,2025-02-07
3,AAA,2020-09-14,25.01,25.06,25.01,25.02,13146.0,21.102,25.02,2025-02-07
4,AAA,2020-09-15,25.02,25.03,25.01,25.01,12069.0,21.093,25.01,2025-02-07


Unnamed: 0,ticker,date,open,high,low,close,volume,closeadj,closeunadj,lastupdated
13501536,^VIX,2025-02-19,15.14,15.96,15.05,15.27,0.0,15.27,15.27,2025-02-19
13501537,^VIX,2025-02-20,15.61,16.63,15.12,15.66,0.0,15.66,15.66,2025-02-20
13501538,^VIX,2025-02-21,15.63,19.03,15.28,18.21,0.0,18.21,18.21,2025-02-21
13501539,^VIX,2025-02-24,18.08,20.24,17.31,18.98,0.0,18.98,18.98,2025-02-24
13501540,^VIX,2025-02-25,19.09,21.48,18.85,19.43,0.0,19.43,19.43,2025-02-25


In [5]:
class Token:
    def __init__(self, type: str, value: str):
        self.type = type
        self.value = value

    def __repr__(self):
        return f"Token({self.type}, '{self.value}')"

In [6]:
class ParseTreeNode:
    def __init__(self, type: str, value: typing.Optional[str] = None, children: typing.Optional[typing.List["ParseTreeNode"]] = None):
        self.type = type
        self.value = value
        self.children = children or []
        self.start_index = None
        self.end_index = None

    def __repr__(self):
        return f"ParseTreeNode({self.type}, value={self.value}, children={self.children}, start_index={self.start_index}, end_index={self.end_index})"
    
    def reify(self, function_factory):
        if (self.type == "expression") and len(self.children) == 3 and self.children[0].type == "term" and self.children[1].value == "+" and self.children[2].type == "term":
            arg0 = self.children[0].reify(function_factory)
            arg1 = self.children[2].reify(function_factory)
            if(isinstance(arg0, int) or isinstance(arg0, float)) and (isinstance(arg1, int) or isinstance(arg1, float)):
                return arg0 + arg1
            else:
                # we need to use late binding
                f = function_factory.get("Add")
                params = {}
                pkeys = [a for a in f.parameters.keys()]
                params[pkeys[0]] = arg0
                params[pkeys[1]] = arg1
                return f.create_function(**params)
        # TODO: this doesn't have the associative property, so it's necessary to flatten the tree if we want to do away with the requirement for parentheses
        if (self.type == "expression") and len(self.children) == 3 and self.children[0].type == "term" and self.children[1].value == "-" and self.children[2].type == "term":
            arg0 = self.children[0].reify(function_factory)
            arg1 = self.children[2].reify(function_factory)
            if(isinstance(arg0, int) or isinstance(arg0, float)) and (isinstance(arg1, int) or isinstance(arg1, float)):
                return arg0 - arg1
            else:
                # we need to use late binding
                f = function_factory.get("Sub")
                params = {}
                pkeys = [a for a in f.parameters.keys()]
                params[pkeys[0]] = arg0
                params[pkeys[1]] = arg1
                return f.create_function(**params)
        if(self.type == "expression") and len(self.children) == 1:
            return self.children[0].reify(function_factory)
        if (self.type == "expression") and len(self.children) == 3 and self.children[0].type == "term" and self.children[1].value == "*" and self.children[2].type == "term":
            arg0 = self.children[0].reify(function_factory)
            arg1 = self.children[2].reify(function_factory)
            if(isinstance(arg0, int) or isinstance(arg0, float)) and (isinstance(arg1, int) or isinstance(arg1, float)):
                return arg0 * arg1
            else:
                # we need to use late binding
                f = function_factory.get("Mul")
                params = {}
                pkeys = [a for a in f.parameters.keys()]
                params[pkeys[0]] = arg0
                params[pkeys[1]] = arg1
                return f.create_function(**params)
        # TODO: this doesn't have the associative property, so it's necessary to flatten the tree if this is actually the plan
        if (self.type == "expression") and len(self.children) == 3 and self.children[0].type == "term" and self.children[1].value == "/" and self.children[2].type == "term":
            arg0 = self.children[0].reify(function_factory)
            arg1 = self.children[2].reify(function_factory)
            if(isinstance(arg0, int) or isinstance(arg0, float)) and (isinstance(arg1, int) or isinstance(arg1, float)):
                return arg0 / arg1
            else:
                # we need to use late binding
                f = function_factory.get("Div")
                params = {}
                pkeys = [a for a in f.parameters.keys()]
                params[pkeys[0]] = arg0
                params[pkeys[1]] = arg1
                return f.create_function(**params)
        if (self.type == "expression") and len(self.children) == 3 and self.children[0].type == "term" and self.children[1].value == "%" and self.children[2].type == "term":
            arg0 = self.children[0].reify(function_factory)
            arg1 = self.children[2].reify(function_factory)
            if(isinstance(arg0, int) or isinstance(arg0, float)) and (isinstance(arg1, int) or isinstance(arg1, float)):
                return arg0 % arg1
            else:
                # we need to use late binding
                f = function_factory.get("Mod")
                params = {}
                pkeys = [a for a in f.parameters.keys()]
                params[pkeys[0]] = arg0
                params[pkeys[1]] = arg1
                return f.create_function(**params)
        if (self.type == "expression") and len(self.children) == 3 and self.children[0].type == "term" and self.children[1].value == "**" and self.children[2].type == "term":
            arg0 = self.children[0].reify(function_factory)
            arg1 = self.children[2].reify(function_factory)
            if(isinstance(arg0, int) or isinstance(arg0, float)) and (isinstance(arg1, int) or isinstance(arg1, float)):
                return arg0 ** arg1
            else:
                # we need to use late binding
                f = function_factory.get("Pow")
                params = {}
                pkeys = [a for a in f.parameters.keys()]
                params[pkeys[0]] = arg0
                params[pkeys[1]] = arg1
                return f.create_function(**params)
        if (self.type == "expression") and len(self.children) == 3 and self.children[0].type == "term" and self.children[1].value == "<" and self.children[2].type == "term":
            arg0 = self.children[0].reify(function_factory)
            arg1 = self.children[2].reify(function_factory)
            if(isinstance(arg0, int) or isinstance(arg0, float)) and (isinstance(arg1, int) or isinstance(arg1, float)):
                return arg0 < arg1
            else:
                # we need to use late binding
                f = function_factory.get("Lt")
                params = {}
                pkeys = [a for a in f.parameters.keys()]
                params[pkeys[0]] = arg0
                params[pkeys[1]] = arg1
                return f.create_function(**params)
        if (self.type == "expression") and len(self.children) == 3 and self.children[0].type == "term" and self.children[1].value == "<=" and self.children[2].type == "term":
            arg0 = self.children[0].reify(function_factory)
            arg1 = self.children[2].reify(function_factory)
            if(isinstance(arg0, int) or isinstance(arg0, float)) and (isinstance(arg1, int) or isinstance(arg1, float)):
                return arg0 <= arg1
            else:
                # we need to use late binding
                f = function_factory.get("Le")
                params = {}
                pkeys = [a for a in f.parameters.keys()]
                params[pkeys[0]] = arg0
                params[pkeys[1]] = arg1
                return f.create_function(**params)
        if (self.type == "expression") and len(self.children) == 3 and self.children[0].type == "term" and self.children[1].value == ">" and self.children[2].type == "term":
            arg0 = self.children[0].reify(function_factory)
            arg1 = self.children[2].reify(function_factory)
            if(isinstance(arg0, int) or isinstance(arg0, float)) and (isinstance(arg1, int) or isinstance(arg1, float)):
                return arg0 > arg1
            else:
                # we need to use late binding
                f = function_factory.get("Gt")
                params = {}
                pkeys = [a for a in f.parameters.keys()]
                params[pkeys[0]] = arg0
                params[pkeys[1]] = arg1
                return f.create_function(**params)
        if (self.type == "expression") and len(self.children) == 3 and self.children[0].type == "term" and self.children[1].value == ">=" and self.children[2].type == "term":
            arg0 = self.children[0].reify(function_factory)
            arg1 = self.children[2].reify(function_factory)
            if(isinstance(arg0, int) or isinstance(arg0, float)) and (isinstance(arg1, int) or isinstance(arg1, float)):
                return arg0 >= arg1
            else:
                # we need to use late binding
                f = function_factory.get("Ge")
                params = {}
                pkeys = [a for a in f.parameters.keys()]
                params[pkeys[0]] = arg0
                params[pkeys[1]] = arg1
                return f.create_function(**params)
        if (self.type == "expression") and len(self.children) == 3 and self.children[0].type == "term" and self.children[1].value == "==" and self.children[2].type == "term":
            arg0 = self.children[0].reify(function_factory)
            arg1 = self.children[2].reify(function_factory)
            if(isinstance(arg0, int) or isinstance(arg0, float)) and (isinstance(arg1, int) or isinstance(arg1, float)):
                return arg0 == arg1
            else:
                # we need to use late binding
                f = function_factory.get("Eq")
                params = {}
                pkeys = [a for a in f.parameters.keys()]
                params[pkeys[0]] = arg0
                params[pkeys[1]] = arg1
                return f.create_function(**params)
        if (self.type == "expression") and len(self.children) == 3 and self.children[0].type == "term" and self.children[1].value == "!=" and self.children[2].type == "term":
            arg0 = self.children[0].reify(function_factory)
            arg1 = self.children[2].reify(function_factory)
            if(isinstance(arg0, int) or isinstance(arg0, float)) and (isinstance(arg1, int) or isinstance(arg1, float)):
                return arg0 != arg1
            else:
                # we need to use late binding
                f = function_factory.get("Ne")
                params = {}
                pkeys = [a for a in f.parameters.keys()]
                params[pkeys[0]] = arg0
                params[pkeys[1]] = arg1
                return f.create_function(**params)
                
        if self.type == "term" and len(self.children) == 1:
            return self.children[0].reify(function_factory)
        if self.type == "factor" and len(self.children) == 1:
            return self.children[0].reify(function_factory)
        if self.type == "factor" and len(self.children) == 3 and self.children[0].value == "(" and self.children[2].value == ")":
            return self.children[1].reify(function_factory)
        if self.type == "number" and (self.children is None or len(self.children) == 0):
            try:
                return int(self.value)
            except:
                return float(self.value)
        if self.type == "string" and self.children is None or len(self.children) == 0:
            return self.value
            
        if self.type == "factor" and len(self.children) == 4 and self.children[0].type == "identifier" and self.children[1].value == "(" and self.children[2].type == "arguments" and self.children[3].value == ")":

            identifier_node = self.children[0]
            name = identifier_node.value

            try:
                definition = function_factory.get(name)
            except ValueError:
                raise ValueError(f"Function '{name}' not found in the factory.")

            params = {}

            arguments_node = self.children[2]
            param_index = 0
            param_names = list(definition.parameters.keys())

            # Handle named arguments first
            named_params_processed = set()  # Keep track of named params

            def flatten_arguments(arguments_node):
                rv = []
                if arguments_node.children[0].type == "argument":
                    rv.append(arguments_node.children[0])
                    if(len(arguments_node.children) == 3 and arguments_node.children[1].value == ","):
                        rv.extend(flatten_arguments(arguments_node.children[2]))
                return rv
                
            flattened_arguments = flatten_arguments(arguments_node)

            #print(flattened_arguments)
                
            for argument_node in flattened_arguments:
                if len(argument_node.children) == 3 and argument_node.children[1].type == "operator" and argument_node.children[1].value == "=":
                    param_name = argument_node.children[0].value
                    param_value_node = argument_node.children[2]

                    if param_name in named_params_processed: # Skip already processed named parameters
                        continue

                    try:
                        param_def = definition.parameters[param_name]
                    except KeyError:
                        raise ValueError(f"Parameter '{param_name}' not found for indicator '{name}'.")

                    param_value = param_value_node.reify(function_factory)  # Evaluate the value node
                    params[param_name] = param_value
                    named_params_processed.add(param_name) # Add to the set of processed named parameters

            # Next, handle positional arguments (skip named ones)
            for argument_node in flattened_arguments:
                if len(argument_node.children) == 1:  # Positional argument
                    try:
                        param_name = param_names[param_index]
                        if param_name in named_params_processed: # Skip if already named
                            param_index += 1
                            continue

                        param_def = definition.parameters[param_name]
                    except IndexError:
                        raise ValueError(f"Incorrect number of positional parameters for '{name}'.")

                    param_value_node = argument_node.children[0]
                    param_value = param_value_node.reify(function_factory)
                    params[param_name] = param_value
                    param_index += 1



            required_params = set(definition.parameters.keys())
            provided_params = set(params.keys())
            if required_params != provided_params:
                missing = required_params - provided_params
                raise ValueError(f"Missing required parameters for {name}: {missing}")

            return definition.create_function(**params)

        raise ValueError(f"Cannot reify node of type: {self.type} with {len(self.children)} children: {self}")



In [7]:
class GrammarRule:
    def __init__(self, left: str, right: typing.List[str]):
        self.left = left
        self.right = right

    def __repr__(self):
        return f"{self.left} -> {' '.join(self.right)}"



In [8]:
class Grammar:
    def __init__(self, grammar_string: str):
        """Initializes a Grammar object by parsing the grammar string."""
        self.rules = []
        for line in grammar_string.strip().splitlines():
            if line.strip():  # Skip empty lines
                parts = line.split("->")
                if len(parts) != 2:
                    raise ValueError(f"Invalid grammar rule: {line}")
                left = parts[0].strip()
                right = [part.strip() for part in parts[1].split()]
                self.rules.append(GrammarRule(left, right))  # Store rules as attributes

    def build_parse_tree(self, tokens: typing.List["Token"], start_symbol: str = "expression") -> typing.Optional["ParseTreeNode"]:
        """Builds a parse tree from a list of tokens using the grammar rules."""

        def _parse(index: int, nonterminal: str, current_depth=0) -> typing.Optional["ParseTreeNode"]:
            applicable_rules = [rule for rule in self.rules if rule.left == nonterminal]

            if index >= len(tokens):  # End of tokens
                if any(not rule.right for rule in applicable_rules): # Check for a matching epsilon rule
                    return ParseTreeNode(nonterminal, children=[])
                return None # No matching epsilon rule

            if not applicable_rules:
                return None

            for rule in applicable_rules:
                rule_matched = True
                children = []
                current_index = index

                for symbol in rule.right:
                    if current_index >= len(tokens):
                        rule_matched = False
                        break

                    if current_index < len(tokens):
                        token = tokens[current_index]

                        if (symbol == token.type) or (symbol == f'"{token.value}"') or \
                           (symbol == "identifier" and token.type == "identifier") or \
                           (symbol == "number" and token.type == "number") or \
                           (symbol == "string" and token.type == "string") or \
                           (symbol == "operator" and token.type == "operator"):
                            child = ParseTreeNode(token.type, value=token.value)
                            child.start_index = current_index
                            child.end_index = current_index
                            children.append(child)
                            current_index += 1  # Increment for terminal

                        elif any(gr.left == symbol for gr in self.rules):
                            child_node = _parse(current_index, symbol, current_depth + 1)
                            if child_node:
                                children.append(child_node)
                                current_index = child_node.end_index + 1
                            else:
                                rule_matched = False
                                break

                        else:
                            rule_matched = False
                            break

                if rule_matched:
                    node = ParseTreeNode(nonterminal, children=children)
                    node.start_index = children[0].start_index if children else index # Handle epsilon rules where children is empty
                    node.end_index = children[-1].end_index if children else index -1 # Handle epsilon rules where children is empty

                    return node

            return None

        return _parse(0, start_symbol)  # Allow specifying the start symbol
        
    def parse(self, input_string: str, start_symbol: str = "expression"):
        """Parses an input string into a parse tree."""
        tokens = self.tokenize(input_string)  # Tokenize the input string
        return self.build_parse_tree(tokens, start_symbol)

    def tokenize(self, expression: str) -> typing.List[Token]:
        """
        Tokenizes a string expression, splitting on spaces and identifying operators.
        """
    
        # Pattern to match tokens. Note: if we wanted to be really fancy, we would specify the token types in the grammar.
        pattern = r"(\*\*|\*|/|//|%|\+|-|==|!=|<=|>=|<|>|=|!|&&|\|\||&|\||\^|~|<<|>>|\(|\)|\[|\]|\{|\}|,|:|\.|->|@|=|;|\+=|-=|\*=|/=|//=|%=|&=|\|=|\^=|\<<=|>>=)|'([^']+)'|\"([^\"]+)\"|(\d+\.?\d*)|([a-zA-Z_]\w*)"
    
        tokens = []
        for match in re.finditer(pattern, expression):
            operator_match = match.group(1)
            single_quote_match = match.group(2)
            double_quote_match = match.group(3)
            number_match = match.group(4)
            identifier_match = match.group(5)
    
            if operator_match:
                tokens.append(Token("operator", operator_match))
            elif single_quote_match:
                tokens.append(Token("string", single_quote_match))
            elif double_quote_match:
                tokens.append(Token("string", double_quote_match))
            elif number_match:
                tokens.append(Token("number", number_match))
            elif identifier_match:
                tokens.append(Token("identifier", identifier_match))
            else:
                raise ValueError(f"invalid token in {expression}")
    
        return tokens

In [9]:
class FunctionInstance:
    def __init__(self, name: str, parameters: typing.Dict[str, typing.Any], definition):
        self.name = name
        self.parameters = parameters
        self.definition = definition

    def evaluate_parameters(self, data):
        rv = {}

        for k in self.parameters:
            v = self.parameters[k]
            if isinstance(v, FunctionInstance):
                rv[k] = v.calculate(data)
            else:
                rv[k] = v
        return rv

    def calculate(self, data: pd.DataFrame): 
        """
        Screens the data using the screener's definition and parameters.

        Args:
            data: The Pandas DataFrame containing the data.

        Returns:
            A Pandas Dataframe
        """
        return self.definition.calculate(data, self.evaluate_parameters(data)) 

    def __repr__(self):
        params_str = ", ".join(f"{name}={value}" for name, value in self.parameters.items())
        return f"{self.definition.name}({params_str})"



In [10]:
class ParameterType:
    """
    A class for specifying parameters for screeners and indicators.
    """

    def __init__(self,
#                 name: str,
                 data_type: typing.Literal["integer", "real", "boolean", "string"],
                 min_val: typing.Union[int, float, None] = None,
                 max_val: typing.Union[int, float, None] = None,
                 default: typing.Any = None,
                 timeframe_defaults: typing.Dict[typing.Literal["tick", "1s", "5s", "15s", "1m", "2m", "5m", "15m", "1d", "1w", "1M"], typing.Any] = None,
                 increment: typing.Union[int, float, None] = None,
                 allowed_strings: typing.List[str] | None = None):
#        if not isinstance(name, str):
#            raise TypeError("name must be a string")
        if data_type not in ("integer", "real", "boolean", "string", "any"):
            raise ValueError("data_type must be 'integer', 'real', 'boolean', 'string', or 'any'")

        if min_val is not None:
            if data_type == "integer" and not isinstance(min_val, int):
                raise TypeError("min_val must be an integer for integer data_type")
            elif data_type in ("real", "integer") and not isinstance(min_val, (int, float)):
                raise TypeError("min_val must be a number for real or integer data_type")

        if max_val is not None:
            if data_type == "integer" and not isinstance(max_val, int):
                raise TypeError("max_val must be an integer for integer data_type")
            elif data_type in ("real", "integer") and not isinstance(max_val, (int, float)):
                raise TypeError("max_val must be a number for real or integer data_type")

        if timeframe_defaults is not None:
            if not isinstance(timeframe_defaults, dict):
                raise TypeError("timeframe_defaults must be a dictionary")
            for timeframe in timeframe_defaults:
                if timeframe not in ("tick", "1s", "5s", "15s", "1m", "2m", "5m", "15m", "1d", "1w", "1M"):
                    raise ValueError(f"Invalid timeframe: {timeframe}")

        if data_type == "integer" and increment is None:
            increment = 1
        elif data_type == "real" and increment is None:
            increment = 0.01

        if data_type == "string" and allowed_strings is not None and not isinstance(allowed_strings, list):
          raise TypeError("allowed_strings must be a list of strings")

        if data_type != "string" and allowed_strings is not None:
          raise ValueError("allowed_strings can only be specified for string data type")

#        self.name = name
        self.data_type = data_type
        self.min_val = min_val
        self.max_val = max_val
        self.default = default
        self.timeframe_defaults = timeframe_defaults or {}
        self.increment = increment
        self.allowed_strings = allowed_strings

    def get_default(self) -> typing.Any:
        return self.default

    def get_possible_values(self) -> typing.Iterable[typing.Any]:
        if self.data_type == "integer":
            if self.min_val is not None and self.max_val is not None:
                return range(self.min_val, self.max_val + 1)
        elif self.data_type == "real":
            if self.min_val is not None and self.max_val is not None:
                current = self.min_val
                while current <= self.max_val:
                    yield current
                    current += 0.01
        elif self.data_type == "boolean":
            return [True, False]
        elif self.data_type == "string":
            if self.allowed_strings is not None:  # Check if allowed_strings is defined
                return self.allowed_strings  # If defined, return those values
            else:
                return []  # Return an empty list if allowed_strings is None (unrestricted)
        return []

    def __repr__(self):
#        return f"ParameterType(name='{self.name}', data_type='{self.data_type}', min_val={self.min_val}, max_val={self.max_val}, default={self.default}, allowed_strings={self.allowed_strings})"
        return f"ParameterType(data_type='{self.data_type}', min_val={self.min_val}, max_val={self.max_val}, default={self.default}, allowed_strings={self.allowed_strings})"






In [11]:
class FunctionDefinition:
    def __init__(self, name: str, parameters: typing.Dict[str, "ParameterType"], calculation_function, factory=None): 
        if not isinstance(name, str):
            raise TypeError("name must be a string")

        if not isinstance(parameters, dict):
            raise TypeError("parameters must be a dictionary")

        if not all(isinstance(param, ParameterType) for param in parameters.values()):
            raise TypeError("All values in parameters must be ParameterType objects")

        if len(set(parameters.keys())) != len(parameters.keys()): # Check for duplicate keys
            raise ValueError("Parameter names must be unique.")

        if not callable(calculation_function):
            raise TypeError("calculation_function must be callable")

        self.name = name
        self.parameters = parameters
        self.calculation_function = calculation_function
        self.factory = factory

    def create_function(self, **kwargs: typing.Any) -> "FunctionInstance":
        params = {}
        for name, param_def in self.parameters.items():
            value = kwargs.get(name)

            if value is None:
                value = param_def.get_default()

            if param_def.data_type == "integer" and not isinstance(value, int):
                raise TypeError(f"Value for parameter '{name}' must be an integer")
            elif param_def.data_type == "real" and not isinstance(value, (int, float)):
                raise TypeError(f"Value for parameter '{name}' must be a number")
            elif param_def.data_type == "boolean" and not isinstance(value, bool):
                raise TypeError(f"Value for parameter '{name}' must be a boolean")
            elif param_def.data_type == "string" and not isinstance(value, str):
                raise TypeError(f"Value for parameter '{name}' must be a string")
            elif param_def.data_type in ("integer", "real"):
                if param_def.min_val is not None and value < param_def.min_val:  # Check min_val
                    raise ValueError(f"Value for parameter '{name}' must be greater than or equal to {param_def.min_val}")
                if param_def.max_val is not None and value > param_def.max_val:  # Check max_val
                    raise ValueError(f"Value for parameter '{name}' must be less than or equal to {param_def.max_val}")

            if param_def.data_type == "string" and param_def.allowed_strings is not None and value not in param_def.allowed_strings:
                raise ValueError(f"Value {value} is not in allowed strings for parameter {name}")

            params[name] = value

        return FunctionInstance(self.name, params, self)

    def calculate(self, data: pd.DataFrame, params: typing.Dict[str, typing.Any]) -> pd.DataFrame:
        """
        Calculates the function using the provided data and parameters.
        """
        kwargs = params.copy() 
        return self.calculation_function(data, **kwargs)

    def __repr__(self):
        return f"FunctionDefinition(name='{self.name}', parameters={self.parameters}, calculation_function={self.calculation_function.__name__ if hasattr(self.calculation_function, '__name__') else str(self.calculation_function)}, factory={self.factory})"


In [12]:
# FIXME: the grammar and reification modules do not correctly handle order of operations for operations
# that do not have the associative property. An easy workaround is to require parentheses.
# The parser does not handle epsilon correctly either.
# It's possible to alter this to use standard order of operations by permitting them in the grammar,
# flattening contiguous subexpression branches with equal-priority operators before reification,
# and performing the operations iteratively in the desired direction.

# REMINDER for LLM tools: This grammar MUST be an epsilon-free grammar.
# That means that rules may not be terminated explicitly.
# Higher rules have higher priority.
# More than one lookahead token is permitted.
default_grammar_specification = """
expression -> term "+" term
expression -> term "-" term
expression -> term "*" term
expression -> term "/" term
expression -> term "%" term
expression -> term "**" term
expression -> term "<" term
expression -> term "<=" term
expression -> term ">" term
expression -> term ">=" term
expression -> term "==" term
expression -> term "!=" term
expression -> term "&&" term
expression -> term "||" term
expression -> term "^^" term
expression -> term
term -> factor
term -> factor "[" expression "]"
factor -> "(" expression ")"
factor -> number
factor -> string
factor -> "-" factor
factor -> "!" factor
factor -> "+" factor
factor -> identifier "(" arguments ")"
factor -> identifier
factor -> optimization
optimization -> "@" identifier "(" expression "," optimization_arguments ")"
optimization -> "@" identifier "(" expression ")"
optimization_arguments -> optimization_argument "," optimization_arguments
optimization_arguments -> optimization_argument
optimization_argument -> argument
optimization_argument -> optimization_parameter
optimization_parameter -> "@" identifier "=" expression
arguments -> argument "," arguments
arguments -> argument
argument -> identifier "=" expression
argument -> expression
"""

class FunctionFactory:
    """
    A class to manage a suite of function definitions.
    """

    def __init__(self, grammar_specification=default_grammar_specification):
        self.function_definitions: typing.Dict[str, Definition] = {}
        self.grammar = Grammar(default_grammar_specification)

    def register(self, function_definition):
        """
        Registers a new screener definition.

        Args:
            function_definition: The Definition to register.

        Raises:
            ValueError: If a screener with the same name is already registered.
        """
#        if function_definition.name in self.function_definitions:
#            raise ValueError(f"A screener with the name '{function_definition.name}' is already registered.")
        self.function_definitions[function_definition.name] = function_definition
        function_definition.ffactory = self

    def get(self, name: str):
        """
        Retrieves a function definition by name.

        Args:
            name: The name of the screener.

        Returns:
            The FunctionDefinition object.

        Raises:
            ValueError: If no screener with the given name is registered.
        """
        if name not in self.function_definitions:
            raise ValueError(f"No function found with the name '{name}'.")
        return self.function_definitions[name]

    def parse(self, expression):
        parse_tree = self.grammar.parse(expression)
        reified_expression = parse_tree.reify(factory)
        return reified_expression

    def __repr__(self):
        return f"FunctionFactory(functions={self.function_definitions})"



In [13]:

factory = FunctionFactory()


In [14]:
def calculate_indicator_by(df, field, indicator_function, *args, **kwargs):
    """
    Calculates an indicator by a specified field within a Pandas DataFrame.

    Args:
        df (pd.DataFrame): The DataFrame containing the data.
        field (str): The field to group by (e.g., 'symbol', 'date').
        indicator_function (callable): The indicator function to apply.
        *args: Positional arguments to pass to the indicator function.
        **kwargs: Keyword arguments to pass to the indicator function.

    Returns:
        pd.DataFrame: The DataFrame with the calculated indicator(s) added.
    """
    if field in df.columns:
        result_dfs = []
        for group_value, group_df in df.groupby(field):
            result_dfs.append(indicator_function(group_df.copy(), *args, **kwargs).assign(**{field: group_value}))
        rv = pd.concat(result_dfs)
        return rv.drop(field, axis=1)
    else:
        return indicator_function(df.copy(), *args, **kwargs)





# Example indicator function (replace with your actual function)
def example_indicator(df, length):
    return df.assign(example_result=df['close'].rolling(length).mean())

# Example DataFrame:
data = pd.DataFrame({
    'symbol': ['AAPL', 'AAPL', 'MSFT', 'MSFT'],
    'close': [150, 152, 300, 305],
    'high': [155, 156, 310, 311],
    'low': [148, 149, 295, 296]
})

# Create a partial function representing a specific call:
calculate_example_indicator_length_5 = functools.partial(calculate_indicator_by, field="symbol", indicator_function=example_indicator, length=5)

# Now, you can use the partial function like a regular function:
result = calculate_example_indicator_length_5(data)

print(result)

#You can create other references as well.
calculate_example_indicator_length_10 = functools.partial(calculate_indicator_by, field="symbol", indicator_function=example_indicator, length=10)

result2 = calculate_example_indicator_length_10(data)

print(result2)

#Example without symbol column.
data_no_symbol = pd.DataFrame({'close':[1,2,3], 'high':[2,3,4], 'low':[0,1,2]})

calculate_example_indicator_length_3_no_symbol = functools.partial(calculate_indicator_by, field="symbol", indicator_function=example_indicator, length=3)

result3 = calculate_example_indicator_length_3_no_symbol(data_no_symbol)
print(result3)

   close  high  low  example_result
0    150   155  148             NaN
1    152   156  149             NaN
2    300   310  295             NaN
3    305   311  296             NaN
   close  high  low  example_result
0    150   155  148             NaN
1    152   156  149             NaN
2    300   310  295             NaN
3    305   311  296             NaN
   close  high  low  example_result
0      1     2    0             NaN
1      2     3    1             NaN
2      3     4    2             2.0


  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()


In [15]:
# Indicator Calculation Functions

def do_calculate_sma(df: pd.DataFrame, length: int) -> pd.DataFrame:
    print(f"calculate_sma(df, {length})")
    sma_values = df['close'].rolling(window=length).mean().values
    return pd.DataFrame({f"SMA({length})": sma_values}, index=df.index)


def do_calculate_rsi(df: pd.DataFrame, length: int) -> pd.DataFrame:
    length = int(length)
    delta = df['close'].diff()
    gains = delta.clip(lower=0)
    losses = -delta.clip(upper=0)
    avg_gains = gains.rolling(window=length).mean()
    avg_losses = losses.rolling(window=length).mean()
    rs = avg_gains / avg_losses.replace(0, float('inf'))
    rsi = 100 - (100 / (1 + rs))
    rsi_values = rsi.values
    return pd.DataFrame({f"RSI({length})": rsi_values}, index=df.index)

def do_calculate_macd(df: pd.DataFrame, fast_length: int, slow_length: int, signal_length: int) -> pd.DataFrame:
    ema_fast = df['close'].ewm(span=fast_length, adjust=False).mean()
    ema_slow = df['close'].ewm(span=slow_length, adjust=False).mean()
    macd = ema_fast - ema_slow
    signal = macd.ewm(span=signal_length, adjust=False).mean()
    histogram = macd - signal
    return pd.DataFrame({f'MACD({fast_length},{slow_length},{signal_length})["macd"]': macd.values, f'MACD({fast_length},{slow_length},{signal_length})["signal"]': signal.values, f'MACD({fast_length},{slow_length},{signal_length})["histogram"]': histogram.values}, index=df.index)  # No alignment needed for MACD

def do_calculate_bollinger_bands(df: pd.DataFrame, length: int, std_dev: float) -> pd.DataFrame:
    rolling_mean = df['close'].rolling(window=length).mean()
    rolling_std = df['close'].rolling(window=length).std()
    upper_band = rolling_mean + (rolling_std * std_dev)
    lower_band = rolling_mean - (rolling_std * std_dev)
    middle_values = rolling_mean.values
    upper_values = upper_band.values
    lower_values = lower_band.values
    bb_df = pd.DataFrame({f'BB({length},{std_dev})["middle"]': middle_values, f'BB({length},{std_dev})["upper"]': upper_values, f'BB({length},{std_dev})["lower"]': lower_values}, index=df.index)
    return bb_df

def do_calculate_rvwap(df: pd.DataFrame, length: int) -> pd.DataFrame:
    typical_price = (df['high'] + df['low'] + df['close']) / 3
    rolling_volume = df['volume'].rolling(length).sum()
    typical_price_x_volume = df["volume"] * typical_price
    rolling_typical_price_x_volume = typical_price_x_volume.rolling(length).sum()
    vwap = rolling_typical_price_x_volume / rolling_volume
    return pd.DataFrame({f"RVWAP({length})": vwap.values}, index=df.index)

calculate_sma = functools.partial(calculate_indicator_by, field="symbol",indicator_function=do_calculate_sma)
calculate_rsi = functools.partial(calculate_indicator_by, field="symbol",indicator_function=do_calculate_rsi)
calculate_macd = functools.partial(calculate_indicator_by, field="symbol",indicator_function=do_calculate_macd)
calculate_bollinger_bands = functools.partial(calculate_indicator_by, field="symbol",indicator_function=do_calculate_bollinger_bands)
calculate_rvwap = functools.partial(calculate_indicator_by, field="symbol",indicator_function=do_calculate_rvwap)


# Example usage

# SMA
sma_length_param = ParameterType("integer", min_val=1, max_val=200, default=20)
factory.register(FunctionDefinition("SMA", {"length": sma_length_param}, calculate_sma))

# RSI
rsi_length_param = ParameterType("integer", min_val=1, max_val=200, default=14) # Different default length
factory.register(FunctionDefinition("RSI", {"length": rsi_length_param}, calculate_rsi))

# MACD
fast_length_param = ParameterType("integer", min_val=1, max_val=100, default=12)
slow_length_param = ParameterType("integer", min_val=1, max_val=200, default=26)
signal_length_param = ParameterType("integer", min_val=1, max_val=50, default=9)
factory.register(FunctionDefinition("MACD", {"fast_length": fast_length_param, "slow_length": slow_length_param, "signal_length": signal_length_param}, calculate_macd))

# Bollinger Bands
bb_length_param = ParameterType("integer", min_val=1, max_val=200, default=20)
std_dev_param = ParameterType("real", min_val=0.1, max_val=5.0, default=2.0, increment=0.1)
factory.register(FunctionDefinition("BB", {"length": bb_length_param, "std_dev": std_dev_param}, calculate_bollinger_bands))

# VWAP
vwap_length_param = ParameterType("integer", min_val=1, max_val=200, default=20)
factory.register(FunctionDefinition("RVWAP", {"length": vwap_length_param}, calculate_rvwap))

df = sharadar_etfs

# Calculate indicators

sma_indicator = factory.parse("SMA(5)")  # Or SMA(length=5)
print(sma_indicator)
sma_result = sma_indicator.calculate(df)
df = df.join(sma_result)  # Add the result to your DataFrame
print(df)

rsi_indicator = factory.parse("RSI(14)")  # Or RSI(length=14)
rsi_result = rsi_indicator.calculate(df)
df = df.join(rsi_result)

bb_indicator = factory.parse("BB(20,2)")
bb_result = bb_indicator.calculate(df)
df = df.join(bb_result)

macd_indicator = factory.parse("MACD(12, 26, 9)")  # Or MACD(fast_length=12, slow_length=26, signal_length=9)
macd_result = macd_indicator.calculate(df)
df = df.join(macd_result)

rvwap_indicator = factory.parse("RVWAP(20)") # Or RVWAP(length=20)
rvwap_result = rvwap_indicator.calculate(df)
df = df.join(rvwap_result)

print(sma_indicator)
print(rsi_indicator)
print(macd_indicator)
print(rvwap_indicator)

print(df)


SMA(length=5)
calculate_sma(df, 5)


  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()


         ticker        date   open    high     low   close   volume  closeadj  \
0           AAA  2020-09-09  25.10  25.119  25.070  25.070  17327.0    21.144   
1           AAA  2020-09-10  25.06  25.070  25.046  25.068  23485.0    21.142   
2           AAA  2020-09-11  25.04  25.050  25.020  25.035  33362.0    21.114   
3           AAA  2020-09-14  25.01  25.060  25.010  25.020  13146.0    21.102   
4           AAA  2020-09-15  25.02  25.030  25.010  25.010  12069.0    21.093   
...         ...         ...    ...     ...     ...     ...      ...       ...   
13501536   ^VIX  2025-02-19  15.14  15.960  15.050  15.270      0.0    15.270   
13501537   ^VIX  2025-02-20  15.61  16.630  15.120  15.660      0.0    15.660   
13501538   ^VIX  2025-02-21  15.63  19.030  15.280  18.210      0.0    18.210   
13501539   ^VIX  2025-02-24  18.08  20.240  17.310  18.980      0.0    18.980   
13501540   ^VIX  2025-02-25  19.09  21.480  18.850  19.430      0.0    19.430   

          closeunadj lastup

  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()


In [16]:
display(df.head(50))

  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()


Unnamed: 0,ticker,date,open,high,low,close,volume,closeadj,closeunadj,lastupdated,SMA(5),RSI(14),"BB(20,2)[""middle""]","BB(20,2)[""upper""]","BB(20,2)[""lower""]","MACD(12,26,9)[""macd""]","MACD(12,26,9)[""signal""]","MACD(12,26,9)[""histogram""]",RVWAP(20)
0,AAA,2020-09-09,25.1,25.119,25.07,25.07,17327.0,21.144,25.07,2025-02-07,,,,,,0.0,0.0,0.0,
1,AAA,2020-09-10,25.06,25.07,25.046,25.068,23485.0,21.142,25.068,2025-02-07,,,,,,-0.00016,-3.2e-05,-0.000128,
2,AAA,2020-09-11,25.04,25.05,25.02,25.035,33362.0,21.114,25.035,2025-02-07,,,,,,-0.002915,-0.000609,-0.002307,
3,AAA,2020-09-14,25.01,25.06,25.01,25.02,13146.0,21.102,25.02,2025-02-07,,,,,,-0.006238,-0.001734,-0.004503,
4,AAA,2020-09-15,25.02,25.03,25.01,25.01,12069.0,21.093,25.01,2025-02-07,25.0406,,,,,-0.009567,-0.003301,-0.006266,
5,AAA,2020-09-16,24.97,24.984,24.97,24.97,14127.0,21.059,24.97,2025-02-07,25.0206,,,,,-0.015258,-0.005692,-0.009565,
6,AAA,2020-09-17,24.99,25.0,24.98,24.98,15160.0,21.068,24.98,2025-02-07,25.003,,,,,-0.018745,-0.008303,-0.010442,
7,AAA,2020-09-18,24.973,24.98,24.97,24.975,5339.0,21.064,24.975,2025-02-07,24.991,,,,,-0.021662,-0.010974,-0.010687,
8,AAA,2020-09-21,24.99,24.99,24.98,24.98,8555.0,21.068,24.98,2025-02-07,24.983,,,,,-0.023301,-0.01344,-0.009861,
9,AAA,2020-09-22,24.98,24.99,24.975,24.975,8067.0,21.064,24.975,2025-02-07,24.976,,,,,-0.024719,-0.015696,-0.009024,


In [17]:
def do_calculate_atr(df, length):
    """Calculates Average True Range (ATR)."""
    tr1 = df["high"] - df["low"]
    tr2 = abs(df["high"] - df["close"].shift(1))
    tr3 = abs(df["low"] - df["close"].shift(1))
    true_range = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
    atr = true_range.rolling(window=length).mean()
    return pd.DataFrame({f"ATR({length})": atr}, index=df.index)


def do_calculate_adx(df, length):
    """Calculates Average Directional Index (ADX) and Directional Movement Indicators."""
    high = df["high"]
    low = df["low"]
    close = df["close"]

    upmove = high - high.shift(1)
    downmove = low.shift(1) - low
    plus_dm = pd.Series(np.where((upmove > downmove) & (upmove > 0), upmove, 0))
    minus_dm = pd.Series(np.where((downmove > upmove) & (downmove > 0), downmove, 0))

    tr1 = high - low
    tr2 = abs(high - close.shift(1))
    tr3 = abs(low - close.shift(1))
    true_range = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)

    # Calculate +DI and -DI
    plus_di = 100 * (plus_dm.ewm(alpha=1 / length).mean() / true_range.ewm(alpha=1 / length).mean())
    minus_di = 100 * (minus_dm.ewm(alpha=1 / length).mean() / true_range.ewm(alpha=1 / length).mean())

    # Calculate DX
    dx = 100 * np.abs(plus_di - minus_di) / (plus_di + minus_di).replace(0, np.inf)

    # Calculate ADX
    adx = dx.ewm(alpha=1 / length).mean().fillna(0)

    return pd.DataFrame({f"ADX({length}):adx": adx, f"ADX({length}):pdi": plus_di, f"ADX({length})mdi": minus_di}, index=df.index)


def do_calculate_cci(df, length):
    """Calculates Commodity Channel Index (CCI)."""
    typical_price = (df["high"] + df["low"] + df["close"]) / 3
    ma_typical_price = typical_price.rolling(window=length).mean()
    mean_deviation = pd.Series(abs(typical_price - ma_typical_price)).rolling(window=length).mean()
    cci = (typical_price - ma_typical_price) / (0.015 * mean_deviation)
    return pd.DataFrame({f"CCI({length})": cci})


def do_calculate_cmf(df, length):
    """Calculates Chaikin Money Flow (CMF)."""
    money_flow = ((df["close"] - df["low"]) - (df["high"] - df["close"])) / (df["high"] - df["low"]) * df["volume"]
    money_flow_volume = money_flow.rolling(window=length).sum()
    volume_sum = df["volume"].rolling(window=length).sum()
    cmf = money_flow_volume / volume_sum
    return pd.DataFrame({f"CMF({length})": cmf}, index=df.index)


#def do_calculate_aroon(df, length):
#    """Calculates Aroon Up and Aroon Down."""
#    a = 100.0 / length
#    aroon_up = (length - df["high"].rolling(window=length).apply(lambda x: pd.Series(x).argmax())) * a
#    aroon_down = (length - df["low"].rolling(window=length).apply(lambda x: pd.Series(x).argmin())) * a
#    return pd.DataFrame({f"AROON({length}):up": aroon_up, f"AROON({length}):down": aroon_down}, index=df.index)

def do_calculate_aroon(df, length):
    """Calculates Aroon Up and Aroon Down (incremental optimization)."""
    high = df["high"].values
    low = df["low"].values
    aroon_up = np.zeros(len(df))
    aroon_down = np.zeros(len(df))

    if len(df) < length:
        return pd.DataFrame({f"AROON({length}):up": aroon_up, f"AROON({length}):down": aroon_down}, index=df.index)

    highest_index = 0
    lowest_index = 0

    for i in range(length, len(df)):
        window_start = i - length

        # Update highest index
        if highest_index < window_start:  # If the previous highest is outside the window
            highest_index = window_start
            for j in range(window_start + 1, i):
                if high[j] > high[highest_index]:
                    highest_index = j
        elif high[i - 1] >= high[highest_index]:
            highest_index = i - 1

        # Update lowest index
        if lowest_index < window_start:  # If the previous lowest is outside the window
            lowest_index = window_start
            for j in range(window_start + 1, i):
                if low[j] < low[lowest_index]:
                    lowest_index = j
        elif low[i - 1] <= low[lowest_index]:
            lowest_index = i - 1

        aroon_up[i] = (length - (i - 1 - highest_index)) * 100.0 / length
        aroon_down[i] = (length - (i - 1 - lowest_index)) * 100.0 / length

    return pd.DataFrame({f"AROON({length}):up": aroon_up, f"AROON({length}):down": aroon_down}, index=df.index)



def do_calculate_mfi(df, length):
    """Calculates Money Flow Index (MFI)."""
    typical_price = (df["high"] + df["low"] + df["close"]) / 3
    money_flow = typical_price * df["volume"]

    positive_money_flow = money_flow[df["close"] > df["close"].shift(1)]
    negative_money_flow = money_flow[df["close"] <= df["close"].shift(1)]

    positive_money_flow = positive_money_flow.rolling(window=length).sum()
    negative_money_flow = abs(negative_money_flow.rolling(window=length).sum())

    money_ratio = positive_money_flow / negative_money_flow
    mfi = 100 - (100 / (1 + money_ratio))
    return pd.DataFrame({f"MFI({length})": mfi}, index=df.index)


def do_calculate_pct_rank(df, length):
    """Calculates percentile rank using pandas only."""
    pct_rank = df['close'].rolling(window=length).apply(lambda x: (x < x[-1]).sum() / (len(x)-1) if len(x) > 1 else 0, raw=True)
    return pd.DataFrame({f"PCT({length})": pct_rank}, index=df.index)

def do_calculate_prp(df, length):
    """Calculates the Price Range Percentage."""
    high_max = df["high"].rolling(window=length).max()
    low_min = df["low"].rolling(window=length).min()
    range_width = high_max - low_min
    price_percentage = (df["close"] - low_min) / range_width * 100
    return pd.DataFrame({f"PRP({length})": price_percentage}, index=df.index)

def do_calculate_lret(df, length):
    """Calculates Log Return"""
    v = (df["close"] / df["close"].shift(length)).apply(lambda a: np.log(a) if a != np.nan else np.nan)
    return pd.DataFrame({f"LRET({length})": v}, index=df.index)


calculate_atr = functools.partial(calculate_indicator_by, field="symbol",indicator_function=do_calculate_atr)
calculate_adx = functools.partial(calculate_indicator_by, field="symbol",indicator_function=do_calculate_adx)
calculate_cci = functools.partial(calculate_indicator_by, field="symbol",indicator_function=do_calculate_cci)
calculate_cmf = functools.partial(calculate_indicator_by, field="symbol",indicator_function=do_calculate_cmf)
calculate_aroon = functools.partial(calculate_indicator_by, field="symbol",indicator_function=do_calculate_aroon)
calculate_mfi = functools.partial(calculate_indicator_by, field="symbol",indicator_function=do_calculate_mfi)
calculate_pct_rank = functools.partial(calculate_indicator_by, field="symbol",indicator_function=do_calculate_pct_rank)
calculate_prp = functools.partial(calculate_indicator_by, field="symbol",indicator_function=do_calculate_prp)
calculate_lret = functools.partial(calculate_indicator_by, field="symbol",indicator_function=do_calculate_lret)


In [18]:
atr_length_param = ParameterType("integer", min_val=1, max_val=200, default=14)
factory.register(FunctionDefinition("ATR", {"length": atr_length_param}, calculate_atr))

adx_length_param = ParameterType("integer", min_val=1, max_val=200, default=14)
factory.register(FunctionDefinition("ADX", {"length": adx_length_param}, calculate_adx))

cci_length_param = ParameterType("integer", min_val=1, max_val=200, default=14)
factory.register(FunctionDefinition("CCI", {"length": cci_length_param}, calculate_cci))

cmf_length_param = ParameterType("integer", min_val=1, max_val=200, default=14)
factory.register(FunctionDefinition("CMF", {"length": cmf_length_param}, calculate_cmf))

aroon_length_param = ParameterType("integer", min_val=1, max_val=200, default=14)
factory.register(FunctionDefinition("Aroon", {"length": aroon_length_param}, calculate_aroon))

mfi_length_param = ParameterType("integer", min_val=1, max_val=200, default=14)
factory.register(FunctionDefinition("MFI", {"length": mfi_length_param}, calculate_mfi))

pct_rank_length_param = ParameterType("integer", min_val=1, max_val=200, default=14)
factory.register(FunctionDefinition("PCT", {"length": pct_rank_length_param}, calculate_pct_rank))

prp_length_param = ParameterType("integer", min_val=1, max_val=200, default=14)
factory.register(FunctionDefinition("PRP", {"length": prp_length_param}, calculate_prp))

lret_length_param = ParameterType("integer", min_val=1, max_val=200, default=1)
factory.register(FunctionDefinition("LRET", {"length": lret_length_param}, calculate_lret))



In [19]:
atr_indicator = factory.parse("ATR(4)")
print(atr_indicator)
atr_result = atr_indicator.calculate(df)
df = df.join(atr_result)


adx_indicator = factory.parse("ADX(14)")
print(adx_indicator)
adx_result = adx_indicator.calculate(df)
df = df.join(adx_result)

cci_indicator = factory.parse("CCI(25)")
print(cci_indicator)
cci_result = cci_indicator.calculate(df)
df = df.join(cci_result)

cmf_indicator = factory.parse("CMF(9)")
print(cmf_indicator)
cmf_result = cmf_indicator.calculate(df)
df = df.join(cmf_result)

aro_indicator = factory.parse("Aroon(20)")
print(aro_indicator)
aro_result = aro_indicator.calculate(df)
df = df.join(aro_result)

mfi_indicator = factory.parse("MFI(25)")
print(mfi_indicator)
mfi_result = mfi_indicator.calculate(df)
df = df.join(mfi_result)

pct_indicator = factory.parse("PCT(21)")
print(pct_indicator)
pct_result = pct_indicator.calculate(df)
df = df.join(pct_result)

prp_indicator = factory.parse("PRP(25)")
print(prp_indicator)
prp_result = prp_indicator.calculate(df)
df = df.join(prp_result)

lret_indicator = factory.parse("LRET(5)")
print(lret_indicator)
lret_result = lret_indicator.calculate(df)
df = df.join(lret_result)


ATR(length=4)
ADX(length=14)
CCI(length=25)
CMF(length=9)
Aroon(length=20)
MFI(length=25)
PCT(length=21)
PRP(length=25)
LRET(length=5)


In [20]:

df




  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()


Unnamed: 0,ticker,date,open,high,low,close,volume,closeadj,closeunadj,lastupdated,...,ADX(14):pdi,ADX(14)mdi,CCI(25),CMF(9),AROON(20):up,AROON(20):down,MFI(25),PCT(21),PRP(25),LRET(5)
0,AAA,2020-09-09,25.10,25.119,25.070,25.070,17327.0,21.144,25.070,2025-02-07,...,0.000000,0.000000,,,0.0,0.0,,,,
1,AAA,2020-09-10,25.06,25.070,25.046,25.068,23485.0,21.142,25.068,2025-02-07,...,0.000000,34.532374,,,0.0,0.0,,,,
2,AAA,2020-09-11,25.04,25.050,25.020,25.035,33362.0,21.114,25.035,2025-02-07,...,0.000000,42.907014,,,0.0,0.0,,,,
3,AAA,2020-09-14,25.01,25.060,25.010,25.020,13146.0,21.102,25.020,2025-02-07,...,0.000000,29.021019,,,0.0,0.0,,,,
4,AAA,2020-09-15,25.02,25.030,25.010,25.010,12069.0,21.093,25.010,2025-02-07,...,0.000000,25.470221,,,0.0,0.0,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
13501536,^VIX,2025-02-19,15.14,15.960,15.050,15.270,0.0,15.270,15.270,2025-02-19,...,31.033769,18.765714,-48.889685,,25.0,20.0,,0.30,8.701135,-0.047948
13501537,^VIX,2025-02-20,15.61,16.630,15.120,15.660,0.0,15.660,15.660,2025-02-20,...,31.862941,17.599042,-22.748350,,20.0,15.0,,0.40,13.619168,-0.014580
13501538,^VIX,2025-02-21,15.63,19.030,15.280,18.210,0.0,18.210,18.210,2025-02-21,...,36.444658,15.089981,71.016970,,15.0,10.0,,0.95,45.775536,0.187276
13501539,^VIX,2025-02-24,18.08,20.240,17.310,18.980,0.0,18.980,18.980,2025-02-24,...,36.964397,13.473656,132.128761,,10.0,5.0,,1.00,55.485498,0.250788


In [21]:
def do_calculate_shift(df, series="close", n=1):
    name = "!unknown"
    if isinstance(series, str):
        name = series
        series = df[series]
    elif isinstance(series, pd.DataFrame):
        if(series.shape[1] != 1):
            raise ValueError(f"Shift() currently will operate on a single column only")
        name = series.columns[0]
        series = series.iloc[:, 0]
    elif isinstance(series, pd.Series):
        name = series.name
    return pd.DataFrame({f"Shift({name},{n})":series.shift(n)}, index=series.index)

shift_n_param = ParameterType("integer", min_val=1, max_val=200, default=1)
shift_series_param = ParameterType("any", default="close")

calculate_shift = functools.partial(calculate_indicator_by, field="symbol",indicator_function=do_calculate_shift)

factory.register(FunctionDefinition("Shift", {"series": shift_series_param, "n": shift_n_param}, calculate_shift))


  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()


Unnamed: 0,ticker,date,open,high,low,close,volume,closeadj,closeunadj,lastupdated,"Shift(RSI(14),1)",RSI(14)
0,AAA,2020-09-09,25.1,25.119,25.07,25.07,17327.0,21.144,25.07,2025-02-07,,
1,AAA,2020-09-10,25.06,25.07,25.046,25.068,23485.0,21.142,25.068,2025-02-07,,
2,AAA,2020-09-11,25.04,25.05,25.02,25.035,33362.0,21.114,25.035,2025-02-07,,
3,AAA,2020-09-14,25.01,25.06,25.01,25.02,13146.0,21.102,25.02,2025-02-07,,
4,AAA,2020-09-15,25.02,25.03,25.01,25.01,12069.0,21.093,25.01,2025-02-07,,
5,AAA,2020-09-16,24.97,24.984,24.97,24.97,14127.0,21.059,24.97,2025-02-07,,
6,AAA,2020-09-17,24.99,25.0,24.98,24.98,15160.0,21.068,24.98,2025-02-07,,
7,AAA,2020-09-18,24.973,24.98,24.97,24.975,5339.0,21.064,24.975,2025-02-07,,
8,AAA,2020-09-21,24.99,24.99,24.98,24.98,8555.0,21.068,24.98,2025-02-07,,
9,AAA,2020-09-22,24.98,24.99,24.975,24.975,8067.0,21.064,24.975,2025-02-07,,


In [24]:
#expression -> term "+" term
def calculate_add(df, a0, a1):
    return a0 + a1

add_a0_param = ParameterType("any")
add_a1_param = ParameterType("any")
factory.register(FunctionDefinition("Add", {"a0": add_a0_param, "a1": add_a1_param}, calculate_add))

#expression -> term "-" term
def calculate_sub(df, a0, a1):
    return a0 - a1

sub_a0_param = ParameterType("any")
sub_a1_param = ParameterType("any")
factory.register(FunctionDefinition("Sub", {"a0": sub_a0_param, "a1": sub_a1_param}, calculate_sub))

#expression -> term "*" term
def calculate_mul(df, a0, a1):
    return a0 * a1

mul_a0_param = ParameterType("any")
mul_a1_param = ParameterType("any")
factory.register(FunctionDefinition("Mul", {"a0": mul_a0_param, "a1": mul_a1_param}, calculate_mul))

#expression -> term "/" term
def calculate_div(df, a0, a1):
    return a0 / a1

div_a0_param = ParameterType("any")
div_a1_param = ParameterType("any")
factory.register(FunctionDefinition("Div", {"a0": div_a0_param, "a1": div_a1_param}, calculate_div))

#expression -> term "%" term
def calculate_mod(df, a0, a1):
    return a0 % a1

mod_a0_param = ParameterType("any")
mod_a1_param = ParameterType("any")
factory.register(FunctionDefinition("Mod", {"a0": mod_a0_param, "a1": mod_a1_param}, calculate_mod))

#expression -> term "**" term
def calculate_pow(df, a0, a1):
    return a0 ** a1

pow_a0_param = ParameterType("any")
pow_a1_param = ParameterType("any")
factory.register(FunctionDefinition("Pow", {"a0": pow_a0_param, "a1": pow_a1_param}, calculate_pow))

#expression -> term "<" term
def calculate_lt(df, a0, a1):
    return a0 < a1

lt_a0_param = ParameterType("any")
lt_a1_param = ParameterType("any")
factory.register(FunctionDefinition("Lt", {"a0": lt_a0_param, "a1": lt_a1_param}, calculate_lt))

#expression -> term "<=" term
def calculate_le(df, a0, a1):
    return a0 <= a1

le_a0_param = ParameterType("any")
le_a1_param = ParameterType("any")
factory.register(FunctionDefinition("Le", {"a0": le_a0_param, "a1": le_a1_param}, calculate_le))

#expression -> term ">" term
def calculate_gt(df, a0, a1):
    return a0 > a1

gt_a0_param = ParameterType("any")
gt_a1_param = ParameterType("any")
factory.register(FunctionDefinition("Gt", {"a0": gt_a0_param, "a1": gt_a1_param}, calculate_gt))

#expression -> term ">=" term
def calculate_ge(df, a0, a1):
    return a0 >= a1

ge_a0_param = ParameterType("any")
ge_a1_param = ParameterType("any")
factory.register(FunctionDefinition("Ge", {"a0": ge_a0_param, "a1": ge_a1_param}, calculate_ge))

#expression -> term "==" term
def calculate_eq(df, a0, a1):
    return a0 == a1

eq_a0_param = ParameterType("any")
eq_a1_param = ParameterType("any")
factory.register(FunctionDefinition("Eq", {"a0": eq_a0_param, "a1": eq_a1_param}, calculate_eq))

#expression -> term "!=" term
def calculate_ne(df, a0, a1):
    return a0 != a1

ne_a0_param = ParameterType("any")
ne_a1_param = ParameterType("any")
factory.register(FunctionDefinition("Ne", {"a0": ge_a0_param, "a1": ne_a1_param}, calculate_ne))

#expression -> term "&&" term
#expression -> term "||" term
#expression -> term "^^" term


In [25]:
foo = factory.parse("RSI(14)>=80")
foo.calculate(df)

Unnamed: 0,RSI(14)
0,False
1,False
2,False
3,False
4,False
...,...
13501536,False
13501537,False
13501538,False
13501539,False


In [26]:
# Example Screener Functions 

def top_n_screener_function(context, field, top_n):
    """
    Produces a boolean mask (pd.Series) for the top 5 readings per day.

    Args:
        df: Pandas DataFrame with columns for date and reading.
        date_col: Name of the column containing the date. Should be datetime or convertible.
        reading_col: Name of the column containing the reading.

    Returns:
        A pandas Series (boolean mask) with True for rows corresponding to the 
        top 5 readings for each day, and False otherwise. Returns
        an empty Series if the input DataFrame is empty.
    """
    print(field)
    if(isinstance(field, str)):
        foo = context.groupby("date")[field].rank(ascending=False, method='first')
    else:
        foo = field.groupby(by=context["date"]).rank(ascending=False, method="first")
    print(foo)
    mask = foo <= top_n 
    return mask.iloc[:, 0]


def percentile_screener_function(context, field, percentile):
    
    if(isinstance(field, str)):
        foo = context.groupby(by="date")[field].rank(ascending=False, method='first', pct=True)
    else:
        foo = field.groupby(by=context["date"]).rank(ascending=False, method="first", pct=True)
    mask = foo >= percentile
    
    return mask.iloc[:, 0]



# Top N Screener FunctionDefinition and Registration
top_n_field_param = ParameterType("any", default="return")  # Example allowed strings
top_n_n_param = ParameterType("integer", min_val=1, default=5)
factory.register(FunctionDefinition("TopN", {"field": top_n_field_param, "top_n": top_n_n_param}, top_n_screener_function))

# Percentile Screener Definition and Registration
percentile_field_param = ParameterType("any", default="return")
percentile_percentile_param = ParameterType("real", min_val=0.0, max_val=1.0, default=.1)
factory.register(FunctionDefinition("Percentile", {"field": percentile_field_param, "percentile": percentile_percentile_param}, percentile_screener_function))


## Example DataFrame (replace with your data)
data = {'date': ['2024-01-01', '2024-01-01', '2024-01-01', '2024-01-02', '2024-01-02', '2024-01-02'],
        'symbol': ['A', 'B', 'C', 'A', 'B', 'C'],
        'close': [0.10, 0.05, 0.15, 0.12, 0.08, 0.18],
        'other_field': [10, 20, 30, 15, 25, 35]}
df = pd.DataFrame(data)

# Create and use screeners
top_n_screener = factory.parse("TopN(top_n=2, field=LRET(1))")
top_n_result = top_n_screener.calculate(df)
print(top_n_result)
print("Top N Result:\n", df.sort_index().loc[top_n_result.sort_index()])

#percentile_screener = factory.parse("Percentile(percentile=0.5, field=LRET(1))")
#percentile_result = percentile_screener.calculate(df)
#print("Percentile Result:\n", df.loc[percentile_result])


    LRET(1)
0       NaN
3  0.182322
1       NaN
4  0.470004
2       NaN
5  0.182322
   LRET(1)
0      NaN
3      2.0
1      NaN
4      1.0
2      NaN
5      3.0
0    False
3     True
1    False
4     True
2    False
5    False
Name: LRET(1), dtype: bool
Top N Result:
          date symbol  close  other_field
3  2024-01-02      A   0.12           15
4  2024-01-02      B   0.08           25


  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_large_values = (abs_vals > 1e6).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  has_small_values = ((abs_vals < 10 ** (-self.digits)) & (abs_vals > 0)).any()
  return op(a, b)


In [27]:
df

Unnamed: 0,date,symbol,close,other_field
0,2024-01-01,A,0.1,10
1,2024-01-01,B,0.05,20
2,2024-01-01,C,0.15,30
3,2024-01-02,A,0.12,15
4,2024-01-02,B,0.08,25
5,2024-01-02,C,0.18,35


In [28]:
def do_normalize(context, weights):
    """Normalizes a Series of weights (including negative) to sum to 1 (absolute values)."""
    if isinstance(weights, pd.DataFrame):
        weights = weights.iloc[:, 0]
    elif not isinstance(weights, pd.Series):
        weights = pd.Series(weights)

    wt = weights.astype("float")
    absolute_weights = wt.abs()
    signs = wt.div(absolute_weights).fillna(0)
    total_absolute_weight = absolute_weights.sum()

    if total_absolute_weight == 0:
        return pd.DataFrame({"weight": [0.0] * len(weights)}, index=weights.index)

    normalized_absolute_weights = absolute_weights / total_absolute_weight
    normalized_weights = normalized_absolute_weights * signs

    return pd.DataFrame({"weight": normalized_weights}, index=weights.index)


def do_abs_threshold(context, weights, threshold):
    if isinstance(weights, pd.DataFrame):
        weights = weights.iloc[:, 0]
    elif not isinstance(weights, pd.Series):
        weights = pd.Series(weights)

    wt = weights.astype("float")
    absolute_weights = wt.abs()

    # Apply threshold
    wt[absolute_weights < threshold] = 0

    return pd.DataFrame({"weight": wt}, index=weights.index)

def do_abs_min_cutoff(context, weights, minimum):
    """Applies a minimum weight to a list of weights."""
    if isinstance(weights, pd.DataFrame):
        weights = weights.iloc[:, 0]
    elif not isinstance(weights, pd.Series):
        weights = pd.Series(weights)

    wt = weights.astype("float")
    absolute_weights = wt.abs()

    # Apply minimum
    mask = (absolute_weights > 0) & (absolute_weights < minimum)
    wt[mask] = np.sign(wt[mask]) * minimum

    return pd.DataFrame({"weight": wt}, index=weights.index)

def do_abs_max_cutoff(context, weights, maximum):
    if isinstance(weights, pd.DataFrame):
        weights = weights.iloc[:, 0]
    elif not isinstance(weights, pd.Series):
        weights = pd.Series(weights)

    wt = weights.astype("float")
    absolute_weights = wt.abs()

    # Apply maximum
    mask = absolute_weights > maximum
    wt[mask] = np.sign(wt[mask]) * maximum

    return pd.DataFrame({"weight": wt}, index=weights.index)


In [29]:
# Note: I am not completely happy with how this works; ideally, this would be merged with calculate_indicator_by

def apply_daily_function(context, weights, f, *args, **kwargs):
    if(isinstance(weights, str)):
        foo = context.groupby("date")[weights].apply(lambda a: f(context, a, *args, **kwargs))
    else:
        foo = weights.groupby(by=context["date"]).apply(lambda a: f(context, a, *args, **kwargs))
    # Swap levels and drop the 'date' level
    foo.index = foo.index.droplevel("date")    
    return foo.iloc[:, 0]



In [30]:
df = sharadar_etfs.copy()

calculate_normalize = functools.partial(apply_daily_function, f=do_normalize)
normalize_weights = ParameterType("any", default="weights")
factory.register(FunctionDefinition("Normalize", {"weights": normalize_weights}, calculate_normalize))

calculate_abs_threshold = functools.partial(apply_daily_function, f=do_abs_threshold)
abs_threshold_weights = ParameterType("any", default="weights")
abs_threshold_threshold = ParameterType("real", default=1e-6)
factory.register(FunctionDefinition("AbsThreshold", {"weights": abs_threshold_weights, "threshold": abs_threshold_threshold}, calculate_abs_threshold))

calculate_abs_min_cutoff = functools.partial(apply_daily_function, f=do_abs_min_cutoff)
abs_min_cutoff_weights = ParameterType("any", default="weights")
abs_min_cutoff_cutoff = ParameterType("real", default=1e-6)
factory.register(FunctionDefinition("AbsMinCutoff", {"weights": abs_min_cutoff_weights, "minimum": abs_min_cutoff_cutoff}, calculate_abs_min_cutoff))

calculate_abs_max_cutoff = functools.partial(apply_daily_function, f=do_abs_max_cutoff)
abs_max_cutoff_weights = ParameterType("any", default="weights")
abs_max_cutoff_cutoff = ParameterType("real", default=1e-6)
factory.register(FunctionDefinition("AbsMaxCutoff", {"weights": abs_max_cutoff_weights, "maximum": abs_max_cutoff_cutoff}, calculate_abs_max_cutoff))

#normalizer = factory.parse("Normalize(RSI(14) > 80)")
#normed_results = normalizer.calculate(df)
#print(normed_results)

cutter = factory.parse("AbsMinCutoff(weights=Normalize(RSI(14) > 80), minimum=1.0)")
cutter_results = cutter.calculate(df)
print(cutter_results)

7014        0.0
11443       0.0
32808       0.0
65071       1.0
93454       0.0
           ... 
13459239    0.0
13466070    0.0
13466352    0.0
13466634    0.0
13467390    0.0
Name: weight, Length: 13501541, dtype: float64


In [31]:
# ok now to add in optimizers...

import itertools
import random
import math

def grid_maximize(func, param_ranges, max_combinations=None, sampling_method="deterministic"):
    """
    Maximizes a function over a grid of parameters with optional sampling,
    with an optional limit on the maximum number of combinations checked.

    Args:
        func: The function to maximize. It should accept named parameters.
        param_ranges: A dictionary where keys are parameter names and values are dictionaries
                      containing 'min', 'max', and 'increment' keys.
        max_combinations: An optional integer specifying the maximum number of combinations to check.
                          If None, all combinations are checked.
        sampling_method: "deterministic" or "random". Determines the sampling method.

    Returns:
        A dictionary containing the maximized parameters and the corresponding function value.
    """

    param_names = list(param_ranges.keys())
    param_values_lists = []

    for param_name in param_names:
        min_val = param_ranges[param_name]['min']
        max_val = param_ranges[param_name]['max']
        increment = param_ranges[param_name]['increment']

        param_values = []
        current_val = min_val
        while current_val <= max_val:
            param_values.append(current_val)
            current_val += increment

        param_values_lists.append(param_values)

    best_params = None
    best_value = float('-inf')

    all_combinations = list(itertools.product(*param_values_lists))
    total_combinations = len(all_combinations)

    if max_combinations is not None and total_combinations > max_combinations:
        if sampling_method == "random":
            sampled_combinations = random.sample(all_combinations, max_combinations)
        elif sampling_method == "deterministic":
            step = math.ceil(total_combinations / max_combinations)
            sampled_combinations = all_combinations[::step]
        else:
            raise ValueError("Invalid sampling_method. Must be 'random' or 'deterministic'.")
    else:
        sampled_combinations = all_combinations

    for param_values in sampled_combinations:
        params = dict(zip(param_names, param_values))

        try:
            value = func(**params)
        except Exception as e:
            print(f"Error evaluating function with params {params}: {e}")
            continue

        if value > best_value:
            best_value = value
            best_params = params

    if best_params is None:
        return None

    return {"params": best_params, "value": best_value}

# Example usage:
def my_function(x, y, z):
    return - (x - 2)**2 - (y + 1)**2 - (z - 3)**2

param_ranges = {
    'x': {'min': 0, 'max': 4, 'increment': 1},
    'y': {'min': -3, 'max': 1, 'increment': 0.5},
    'z': {'min': 1, 'max': 5, 'increment': 2},
}

# Deterministic sampling
result_det = grid_maximize(my_function, param_ranges, max_combinations=10, sampling_method="deterministic")
print("Maximized parameters (deterministic):", result_det)

# Random sampling
result_rand = grid_maximize(my_function, param_ranges, max_combinations=10, sampling_method="random")
print("Maximized parameters (random):", result_rand)

# All combinations
result_all = grid_maximize(my_function, param_ranges)
print("Maximized parameters (all):", result_all)

# Invalid sampling method
try:
    grid_maximize(my_function, param_ranges, max_combinations=10, sampling_method="invalid")
except ValueError as e:
    print(f"Error: {e}")

Maximized parameters (deterministic): {'params': {'x': 2, 'y': -0.5, 'z': 3}, 'value': -0.25}
Maximized parameters (random): {'params': {'x': 2, 'y': -0.5, 'z': 3}, 'value': -0.25}
Maximized parameters (all): {'params': {'x': 2, 'y': -1.0, 'z': 3}, 'value': 0.0}
Error: Invalid sampling_method. Must be 'random' or 'deterministic'.


In [32]:
class OptimizerDefinition:
    def __init__(self, name: str, parameters: typing.Dict[str, "ParameterType"], optimization_function, factory=None): 
        if not isinstance(name, str):
            raise TypeError("name must be a string")

        if not isinstance(parameters, dict):
            raise TypeError("parameters must be a dictionary")

        if not all(isinstance(param, ParameterType) for param in parameters.values()):
            raise TypeError("All values in parameters must be ParameterType objects")

        if len(set(parameters.keys())) != len(parameters.keys()): # Check for duplicate keys
            raise ValueError("Parameter names must be unique.")

        if not callable(calculation_function):
            raise TypeError("calculation_function must be callable")

        self.name = name
        self.parameters = parameters
        self.calculation_function = calculation_function
        self.factory = factory

    def create_optimizer(self, **kwargs: typing.Any) -> "OptimizerInstance":
        params = {}
        for name, param_def in self.parameters.items():
            value = kwargs.get(name)

            if value is None:
                value = param_def.get_default()

            if param_def.data_type == "integer" and not isinstance(value, int):
                raise TypeError(f"Value for parameter '{name}' must be an integer")
            elif param_def.data_type == "real" and not isinstance(value, (int, float)):
                raise TypeError(f"Value for parameter '{name}' must be a number")
            elif param_def.data_type == "boolean" and not isinstance(value, bool):
                raise TypeError(f"Value for parameter '{name}' must be a boolean")
            elif param_def.data_type == "string" and not isinstance(value, str):
                raise TypeError(f"Value for parameter '{name}' must be a string")
            elif param_def.data_type in ("integer", "real"):
                if param_def.min_val is not None and value < param_def.min_val:  # Check min_val
                    raise ValueError(f"Value for parameter '{name}' must be greater than or equal to {param_def.min_val}")
                if param_def.max_val is not None and value > param_def.max_val:  # Check max_val
                    raise ValueError(f"Value for parameter '{name}' must be less than or equal to {param_def.max_val}")

            if param_def.data_type == "string" and param_def.allowed_strings is not None and value not in param_def.allowed_strings:
                raise ValueError(f"Value {value} is not in allowed strings for parameter {name}")

            params[name] = value

        return OptimizerInstance(self.name, params, self)

    def calculate(self, data: pd.DataFrame, params: typing.Dict[str, typing.Any]) -> pd.DataFrame:
        """
        Calculates the optimization using the provided data and parameters.
        """
        kwargs = params.copy() 
        return self.optimization_function(data, **kwargs)

    def __repr__(self):
        return f"OptimizerDefinition(name='{self.name}', parameters={self.parameters}, calculation_function={self.calculation_function.__name__ if hasattr(self.calculation_function, '__name__') else str(self.calculation_function)}, factory={self.factory})"

    

In [33]:
# This code is used for prompt engineering

def generate_simplified_signature_comment(obj):
    """Generates commented signatures and stubs"""

    comment = ""

    if inspect.isclass(obj):
        comment += f"# class {obj.__name__}:\n"

        class_docstring = inspect.getdoc(obj)
        if class_docstring and not _is_unhelpful_docstring(class_docstring):  # Check docstring
            wrapped_class_docstring = textwrap.dedent(class_docstring).strip()
            comment += f"#  \"\"\"" + "\n"
            for line in wrapped_class_docstring.splitlines():
                comment += f"#   {line}\n"  
            comment += f"#  \"\"\"" + "\n" 

        members = inspect.getmembers(obj)
    elif inspect.isfunction(obj):
        comment += f"# def {obj.__name__}:\n"
        members = [(obj.__name__, obj)]
    else:
        return None

    for name, member in members:
        if inspect.isfunction(member) or inspect.ismethod(member):
            signature = inspect.signature(member)
            params = []
            param_defs = []
            for param in signature.parameters.values():
                param_type = typing.get_type_hints(member).get(param.name) or "Any"
                default = ""
                if param.default is not inspect.Parameter.empty:
                    default = f" = {param.default!r}"
                params.append(f"{param.name}: {param_type}{default}")
                param_defs.append(f"{param.name}{default}")

            return_type = typing.get_type_hints(member).get('return') or "Any"

            comment += f"#  def {name}({', '.join(param_defs)}):\n"
            comment += f"#   # -> {return_type}\n"

            docstring = inspect.getdoc(member)
            if docstring and not _is_unhelpful_docstring(docstring):  # Check docstring
                wrapped_docstring = textwrap.dedent(docstring).strip()
                comment += f"#   \"\"\"" + "\n"  # 1-space indent
                for line in wrapped_docstring.splitlines():
                    comment += f"#    {line}\n"  # 1-space indent + 2 spaces for line content
                comment += f"#   \"\"\"" + "\n"  # 1-space indent
            comment += f"#   pass\n"  # 1-space indent

    return comment


def _is_unhelpful_docstring(docstring):
    """Checks if a docstring is likely to be auto-generated."""

    unhelpful_phrases = [
        "initialize self.  see help(type(self)) for accurate signature.",
        "see help(type(self)) for accurate signature.",
        "method generated by ide",
        "return repr(self)",
    ]

    cleaned_docstring = docstring.strip().lower()  # Lowercase and strip ONCE

    for phrase in unhelpful_phrases:
        if phrase in cleaned_docstring:  # Use simple string containment check
            return True

def _get_fully_qualified_type_name(type_hint):
    if type_hint is None:
        return "None"

    origin = typing.get_origin(type_hint)  # Use typing.get_origin

    if origin is not None:  # Generic type (List, Dict, etc.)
        args = typing.get_args(type_hint)   # Use typing.get_args
        if origin is typing.List:         # Use typing.List
            arg_str = ", ".join(_get_fully_qualified_type_name(arg) for arg in args) if args else ""
            return f"typing.List[{arg_str}]"
        elif origin is typing.Dict:        # Use typing.Dict
            arg_str = ", ".join(_get_fully_qualified_type_name(arg) for arg in args) if args else ""
            return f"typing.Dict[{arg_str}]"
        elif origin is typing.Optional:    # Use typing.Optional
            arg_str = _get_fully_qualified_type_name(args[0]) if args else ""
            return f"typing.Optional[{arg_str}]"
        elif origin is typing.Tuple:       # Use typing.Tuple
            arg_str = ", ".join(_get_fully_qualified_type_name(arg) for arg in args) if args else ""
            return f"typing.Tuple[{arg_str}]"
        elif origin is typing.Union:       # Use typing.Union. Added support for Union
            arg_str = ", ".join(_get_fully_qualified_type_name(arg) for arg in args) if args else ""
            return f"typing.Union[{arg_str}]"
        else:
            return origin.__module__ + "." + origin.__name__ if hasattr(origin, '__module__') else origin.__name__  # Handle other generics
    elif hasattr(type_hint, '__module__') and hasattr(type_hint, '__name__'):  # Regular class
        return type_hint.__module__ + "." + type_hint.__name__
    elif hasattr(type_hint, '__name__'):  # Regular class
        return type_hint.__name__
    else:
        return str(type_hint)  # Fallback to string representation

def list_module_objects(module=None):
    """Lists all functions and classes defined in the current module.

    Args:
        module: The module to inspect. If None, defaults to the current module.

    Returns:
        A list of tuples, where each tuple contains the name and the object 
        (function or class).  Returns an empty list if no suitable objects are found.
    """

    if module is None:
        import sys
        module = sys.modules[__name__]  # Get the current module

    objects = []
    for name, obj in inspect.getmembers(module):
        if inspect.isfunction(obj) or inspect.isclass(obj):
            if obj.__module__ == module.__name__: #check if object is defined in the current module
                objects.append((name, obj))
    return objects


if __name__ == "__main__":
    # Example usage:
    module_objects = list_module_objects()

    for name, obj in module_objects:
        comment = generate_simplified_signature_comment(obj)
        if comment:
            print(comment, end="#\n")
        #if you want to execute the prompt engineering on the current module
        #and write the output to a file, you can do this:
        # with open("prompt_engineering_output.txt", "a") as f:
        #     f.write(comment + "\n")

    # To list objects from another module (if needed):
    # import my_other_module
    # other_module_objects = list_module_objects(my_other_module)
    # for name, obj in other_module_objects:
    #   # ... process objects from the other module

# class FunctionDefinition:
#  def __init__(self, name, parameters, calculation_function, factory = None):
#   # -> Any
#   pass
#  def __repr__(self):
#   # -> Any
#   pass
#  def calculate(self, data, params):
#   # -> <class 'pandas.core.frame.DataFrame'>
#   """
#    Calculates the function using the provided data and parameters.
#   """
#   pass
#  def create_function(self, kwargs):
#   # -> <class '__main__.FunctionInstance'>
#   pass
#
# class FunctionFactory:
#  """
#   A class to manage a suite of function definitions.
#  """
#  def __init__(self, grammar_specification = '\nexpression -> term "+" term\nexpression -> term "-" term\nexpression -> term "*" term\nexpression -> term "/" term\nexpression -> term "%" term\nexpression -> term "**" term\nexpression -> term "<" term\nexpression -> term "<=" term\nexpression -> term ">" term\nexpression -> term ">=" term\nexpression -> term "==" term\nexpression -> term "!=" term\nexpression -> term "&&" term\nexpression -> term "||" ter

NameError: name 'OptimizerInstance' is not defined