In [1]:
import re

class Node:
    pass

class WeightedAssetNode(Node):
    def __init__(self, asset, weight):
        self.asset = asset
        self.weight = weight

class OperationNode(Node):
    def __init__(self, operation, children):
        self.operation = operation  # Operation: '+', '-', '/'
        self.children = children    # List of child nodes

class NumeraireNode(Node):
    def __init__(self, numeraire):
        self.numeraire = numeraire

class FunctionNode(Node):
    def __init__(self, function, argument):
        self.function = function  # e.g., 'vega', 'delta'
        self.argument = argument  # The AST subtree representing the argument

def tokenize(expression):
    return re.findall(r'[0-9.]+\*[a-zA-Z0-9]+|[a-zA-Z0-9]+|[+\-*/()]', expression)

def parse(tokens):
    print(f"DEBUG: Parsing tokens: {tokens}")  # 调试输出

    def parse_expression(index):
        children = []
        while index < len(tokens):
            token = tokens[index]
            print(f"DEBUG: Processing token: {token}")

            if token in ['vega', 'delta', 'theta']:  # 处理 Greek 函数
                print(f"DEBUG: Detected Greek function: {token}")
                if tokens[index + 1] == '(':
                    arg_tokens = tokens[index + 2:]  # Extract everything after '('
                    argument_ast, new_index = parse_expression(index + 2)
                    children.append(FunctionNode(token.lower(), argument_ast))
                    index = new_index
            elif token == '(':
                subtree, index = parse_expression(index + 1)
                children.append(subtree)
            elif token == ')':
                if len(children) == 1:
                    return children[0], index + 1  # 如果只有一个子节点，直接返回
                else:
                    return OperationNode('sequence', children), index + 1  # 如果有多个子节点，构建 `sequence`
            elif token in '+-*/':  # 运算符
                if children:
                    left = children.pop()
                    right, index = parse_expression(index + 1)
                    children.append(OperationNode(token, [left, right]))
                else:
                    raise ValueError(f"Operator {token} without left operand")
            elif re.match(r'[0-9.]+\*[a-zA-Z0-9]+', token):  # 带权重的资产
                weight, asset = token.split('*')
                print(f"DEBUG: Weighted asset detected: {weight} * {asset}")
                children.append(WeightedAssetNode(asset, float(weight)))
            elif re.match(r'[a-zA-Z0-9]+', token):  # 计价单位或其他标记
                print(f"DEBUG: Numeraire detected: {token}")
                children.append(NumeraireNode(token))
            else:
                raise ValueError(f"Unexpected token: {token}")
            index += 1

        if len(children) == 1:  # 如果只有一个子节点，直接返回它
            return children[0], index
        return OperationNode('sequence', children), index


    ast, _ = parse_expression(0)
    print(f"DEBUG: AST constructed: {ast}")  # 输出构建的 AST
    return ast



In [2]:
import yfinance as yf

fx_rates_cache = {}

def get_fx_rate(currency_pair):
    # Check cache first
    if currency_pair in fx_rates_cache:
        return fx_rates_cache[currency_pair]

    # Fetch real-time FX rate from Yahoo Finance
    ticker = f"{currency_pair}=X"
    try:
        data = yf.Ticker(ticker)
        price = data.history(period="1d")["Close"].iloc[-1]
        fx_rates_cache[currency_pair] = price
        return price
    except Exception as e:
        raise ValueError(f"Failed to fetch FX rate for {currency_pair}: {e}")


In [3]:
def validate_fx_pair(numerator_currency, denominator_currency):
    # Add supported currencies
    supported_currencies = ["USD", "RMB", "SGD", "EUR"]
    if numerator_currency not in supported_currencies or denominator_currency not in supported_currencies:
        raise ValueError(f"Unsupported currency pair: {numerator_currency}/{denominator_currency}")


In [4]:
historical_prices = {}

def get_historical_price(asset, date):
    # Check cache
    if asset in historical_prices and date in historical_prices[asset]:
        return historical_prices[asset][date]

    # Fetch historical price from Yahoo Finance
    try:
        data = yf.Ticker(asset)
        price = data.history(start=date, end=date)["Close"].iloc[0]
        if asset not in historical_prices:
            historical_prices[asset] = {}
        historical_prices[asset][date] = price
        return price
    except Exception as e:
        raise ValueError(f"Failed to fetch historical price for {asset} on {date}: {e}")

def evaluate_historical(node, date):
    if isinstance(node, WeightedAssetNode):
        price = get_historical_price(node.asset, date)
        return node.weight * price
    elif isinstance(node, OperationNode):
        left = evaluate_historical(node.children[0], date)
        right = evaluate_historical(node.children[1], date)
        if node.operation == '+':
            return left + right
        elif node.operation == '-':
            return left - right
        elif node.operation == '/':
            return left / right
    else:
        raise ValueError(f"Unsupported node type for historical evaluation: {type(node)}")


In [5]:
def get_market_prices(asset):
    prices = {
        'aapl': 150,
        'nvda': 200
    }
    return prices.get(asset.lower(), None)

def get_exchange_rate(currency_pair):
    rates = {
        'usdsgd': 1.35
    }
    return rates.get(currency_pair.lower(), None)


In [6]:
portfolios = {}

def handle_assignment(expression):
    if '=' in expression:
        var, portfolio_expr = map(str.strip, expression.split('=', 1))
        if not var.isidentifier():
            raise ValueError(f"Invalid variable name: {var}")
        tokens = tokenize(portfolio_expr)
        ast = parse(tokens)
        portfolios[var] = ast
        print(f"DEBUG: Portfolio {var} assigned.")
        return None
    return expression


In [7]:
def print_ast(node, depth=0):
    indent = "  " * depth
    if isinstance(node, FunctionNode):
        print(f"{indent}FunctionNode(function={node.function})")
        print_ast(node.argument, depth + 1)
    elif isinstance(node, OperationNode):
        print(f"{indent}OperationNode(operation={node.operation})")
        for child in node.children:
            print_ast(child, depth + 1)
    elif isinstance(node, WeightedAssetNode):
        print(f"{indent}WeightedAssetNode(asset={node.asset}, weight={node.weight})")
    elif isinstance(node, NumeraireNode):
        print(f"{indent}NumeraireNode(numeraire={node.numeraire})")
    else:
        print(f"{indent}UnknownNode(type={type(node)})")


In [8]:
def get_option_greeks(asset):
    greeks = {
        'aapl': {'delta': 0.5, 'vega': 0.2, 'theta': -0.05},
        'nvda': {'delta': 0.6, 'vega': 0.25, 'theta': -0.04}
    }
    return greeks.get(asset.lower(), {})

def calculate_greek(node, greek):
    print(f"DEBUG: Calculating Greek ({greek}) for node: {node} (type: {type(node)})")

    if isinstance(node, WeightedAssetNode):
        greeks = get_option_greeks(node.asset)
        result = node.weight * greeks.get(greek, 0)
        print(f"DEBUG: Greek value for {node.asset}: {result}")
        return result
    elif isinstance(node, OperationNode):
        if node.operation == 'sequence':
            print(f"DEBUG: Skipping 'sequence' node: {node}")
            # 遍历所有子节点，计算它们的 Greek 值
            result = sum(calculate_greek(child, greek) for child in node.children)
            print(f"DEBUG: Aggregated Greek value for 'sequence': {result}")
            return result
        left = calculate_greek(node.children[0], greek)
        right = calculate_greek(node.children[1], greek)
        print(f"DEBUG: Left Greek value: {left}, Right Greek value: {right}")
        if node.operation == '+':
            result = left + right
        elif node.operation == '-':
            result = left - right
        elif node.operation == '/':
            right_value = evaluate(node.children[1])  # 使用 evaluate 获取比例因子
            result = left / right_value
        else:
            raise ValueError(f"Unsupported operation for Greek calculation: {node.operation}")
        print(f"DEBUG: Operation Greek result ({node.operation}): {result}")
        return result
    elif isinstance(node, NumeraireNode):
        print(f"DEBUG: NumeraireNode encountered: {node.numeraire}. Returning scaling factor.")
        return evaluate(node)  # 获取汇率作为比例因子
    else:
        raise ValueError(f"Unsupported node type for Greek calculation: {type(node)}")

def evaluate(node):
    print(f"DEBUG: Evaluating node: {node} (type: {type(node)})")  # 调试输出

    if isinstance(node, FunctionNode):
        print(f"DEBUG: Function detected: {node.function}, Argument: {node.argument}")
        greek = node.function
        result = calculate_greek(node.argument, greek)
        print(f"DEBUG: Function result ({node.function}): {result}")
        return result
    elif isinstance(node, WeightedAssetNode):
        print(f"DEBUG: Weighted asset detected: {node.asset}, Weight: {node.weight}")
        price = get_market_prices(node.asset)
        print(f"DEBUG: Asset price: {price}, Weighted value: {node.weight * price}")
        return node.weight * price
    elif isinstance(node, OperationNode):
        print(f"DEBUG: Operation detected: {node.operation}, Children: {node.children}")
        left = evaluate(node.children[0])
        right = evaluate(node.children[1])
        print(f"DEBUG: Left value: {left}, Right value: {right}")
        if node.operation == '+':
            result = left + right
        elif node.operation == '-':
            result = left - right
        elif node.operation == '/':
            result = left / right
        else:
            raise ValueError(f"Unsupported operation: {node.operation}")
        print(f"DEBUG: Operation result ({node.operation}): {result}")
        return result
    elif isinstance(node, NumeraireNode):
        print(f"DEBUG: Numeraire detected: {node.numeraire}")
        exchange_rate = get_exchange_rate(node.numeraire)
        print(f"DEBUG: Exchange rate: {exchange_rate}")
        return exchange_rate
    else:
        raise ValueError(f"Unsupported node type: {type(node)}")




In [9]:
def main():
    print("Welcome to Portfolio Calculator!")
    print("Type 'exit' or 'quit' to leave the program.")
    while True:
        try:
            expression = input("Enter expression (e.g., vega((0.4*aapl + 0.6*nvda) / usdsgd)): ").strip()
            if expression.lower() in ['exit', 'quit', 'q']:
                print("Exiting Portfolio Calculator. Goodbye!")
                break

            print(f"DEBUG: Received expression: {expression}")
            tokens = tokenize(expression)
            print(f"DEBUG: Tokens generated: {tokens}")

            ast = parse(tokens)
            print("DEBUG: AST generated:")
            print_ast(ast)  # 输出 AST 结构

            result = evaluate(ast)
            print(f"Result: {result}")
        except Exception as e:
            print(f"Error: {e}")


if __name__ == "__main__":
    main()


Welcome to Portfolio Calculator!
Type 'exit' or 'quit' to leave the program.
DEBUG: Received expression: vega((0.4*aapl + 0.6*nvda) / usdsgd
DEBUG: Tokens generated: ['vega', '(', '(', '0.4*aapl', '+', '0.6*nvda', ')', '/', 'usdsgd']
DEBUG: Parsing tokens: ['vega', '(', '(', '0.4*aapl', '+', '0.6*nvda', ')', '/', 'usdsgd']
DEBUG: Processing token: vega
DEBUG: Detected Greek function: vega
DEBUG: Processing token: (
DEBUG: Processing token: 0.4*aapl
DEBUG: Weighted asset detected: 0.4 * aapl
DEBUG: Processing token: +
DEBUG: Processing token: 0.6*nvda
DEBUG: Weighted asset detected: 0.6 * nvda
DEBUG: Processing token: )
DEBUG: Processing token: usdsgd
DEBUG: Numeraire detected: usdsgd
DEBUG: AST constructed: <__main__.FunctionNode object at 0x0000017FDB5EB3A0>
DEBUG: AST generated:
FunctionNode(function=vega)
  OperationNode(operation=sequence)
    OperationNode(operation=+)
      WeightedAssetNode(asset=aapl, weight=0.4)
      WeightedAssetNode(asset=nvda, weight=0.6)
    NumeraireNode