# Combination rule testing

Based on the rules of different TA, try to optimize the set rule to maximized profit or reduced risk.

## Intention

## Classes

In [1]:
class Element():
    def __init__(self, element):
        self.element = element
    
    @property
    def element(self):
        return self.__element
    
    @element.setter
    def element(self, value):
        self.__element = self.parse_element(value)
    
    def parse_element(self, value):
        
        try:
            value = float(value)
            self.type = "constant"
            return value

        except Exception as e:
            self.type = "column"
            return value
            # raise ValueError( f"Element {value} from rule has no valid pattern." )
        

In [2]:
import operator


In [3]:
class Rule():

    __OPERATORS = {
            '>': operator.gt,
           '<': operator.lt,
           '>=': operator.ge,
           '<=': operator.le,
           '==': operator.eq,
           '!=': operator.ne
        }

    def __init__(self, rule):
        self.error = None
        self.rule = rule
        
        self.parse_operator(rule)

        self.elements = [ Element(e.strip()) for e in rule.split( self.operator ) ]
        
        # Join types of elements in rule to determine rule type
        # Sort help us in Ruletesting
        
        types = [ e.type for e in self.elements ]
        types.sort()
        self.type = "_".join( types )
    
    def parse_operator(self, rule):

        for o, application in self.__OPERATORS.items():
            if o in rule:
                self.operator = o
                self.apply_op = application
                return

        raise ValueError( f"No operator was found on rule: {rule}" )
        
    def column_constant(self):
        return self.__OPERATORS[ self.operator ]( self.asset.df[ self.elements[ 0 ].element ], self.elements[ 1 ].element )

    def column_column(self):
        return self.__OPERATORS[ self.operator ]( self.asset.df[ self.elements[ 0 ].element ], self.asset.df[ self.elements[ 1 ].element ] )
    
    def apply(self, asset):
        self.asset = asset

        return {
            "column_constant":self.column_constant,
            "column_column":self.column_column
        }[ self.type ]()


In [4]:
from copy import deepcopy
import re
import pandas as pd
import numpy as np

In [150]:
def rule_validation(asset):
    buy = False
    price_bought = None
    price_sold = None
    returns = []
    df = asset.df[ (asset.df["buy"] == True) | ( asset.df["sell"] == True ) ]
    df = df.replace(np.nan, 0)
    df = df[ (df[ "buy" ] != df["buy"].shift()) | (df[ "sell" ] != df["sell"].shift()) ]
    for i in range(len(df)):
        if not buy and df.iloc[i]["buy"] :
            buy = True
            price_bought = df.iloc[i]["close"]
            date_bought = df.index[ i ]

            trans = [ date_bought, price_bought ]
        
        elif buy and df.iloc[i]["sell"]:
            buy = False
            price_sold = df.iloc[i]["close"]
            date_sold = df.index[ i ]
            
            trans += [ date_sold, price_sold ]
            returns.append( trans )

    df = pd.DataFrame(returns, columns = ["date_bought", "bought", "date_sold", "sold"] )
    df["returns"] = ((df["sold"] / df["bought"]) - 1).round(3)
    df[ "acc" ] = (df["sold"] / df["bought"]).cumprod().round(3)
    # df["open"] = df["date_sold"] - df["date_bought"]

    return df

In [151]:
class RuleTesting():

    __OPERATORS = [ "and", "or", "xor" ]
    
    def __init__(self, asset, rules, target = None, **kwargs):
        
        self.asset = deepcopy(asset)
        self.rules = rules
        self.target = target

    def __str__(self):
        return f"Rules: "

    @property
    def target(self): return self.__target
    
    @target.setter
    def target(self, value):

        if self.type == "both":
            self.type_target = "sell"
        elif value is None:
            raise ValueError( f"If not sell rule, then a target value must be set" )
        elif value >= 1:
            self.type_target = "places"
        elif value < 1:
            self.type_target = "pct"
        else: 
            raise ValueError( f"Target value '{value}' is not valid." )

        self.__target = value

    @property
    def rules(self): return self.__rules
    
    @rules.setter
    def rules(self, value):
        assert isinstance(value, dict), f"Rules is not dictionary, is type {type(value)}"
        assert "buy" in value or "sell" in value, f"Rules keys must be 'buy' or 'sell'."
        
        keys = list( value.keys() )
        self.type = "both" if len(keys) == 2 else keys[0]

        self.__rules = value

    def apply(self, rules, operators):

        for i, rule in enumerate(rules):
            rule_elem = Rule( rule )
            self.asset.df[ rule ] = rule_elem.apply( self.asset )
            
            if i == 0: 
                first_series = self.asset.df[ rule ]
            elif operators[i] == "and":
                first_series += self.asset.df[ rule ]
            elif operators[i] == "or":
                first_series *= self.asset.df[ rule ]
            elif operators[i] == "xor":
                first_series ^= self.asset.df[ rule ]

        return first_series

    def get_operator(self, string):
        for o in self.__OPERATORS:
            if o in string:
                return o
        
        raise ValueError( f"No operator valid on str '{string}'")

    def parse_rule(self, rule):
        rule = rule.lower()

        operators_pattern = "|".join( self.__OPERATORS )

        operators = [m.start() for m in re.finditer(operators_pattern, rule )]        
        operators = [self.get_operator( rule[o:(o+3)] ) for o in operators ]
        operators = [None] + operators

        rules =  [ r.strip() for r in re.split( operators_pattern, rule ) ]

        assert len(operators) == len(rules), f"Rule and operators do not math in rule '{rule}'."

        return rules, operators

    def buy(self):
        rules, operators = self.parse_rule( self.rules["buy"] )
        self.asset.df["buy"] = self.apply( rules, operators ) 
    
    def sell_column(self):

        if self.type_target == "places":
            self.asset.df["sell"] = self.asset.df["buy"].shift( self.target )

        elif self.type_target == "pct":
            self.asset.df["sell"] = False
            true_values = self.asset.df[ self.asset.df["buy"] == True ].index.tolist()
            l = len(self.asset.df)
            close = self.asset.df["close"]

            for i in true_values:
                close_price = close[i]
                close_aux = close[ i: ]
                close_aux = ( close_aux / close_price ) - 1
                pct_index = close_aux[ close_aux > self.target ]

                if len(pct_index) == 0:
                    raise Exception( "Testing did not prove a better return." )
                
                pct_index = pct_index.index[0]

                self.asset.df["sell"].iloc[ pct_index ] = True

    def run(self):
        if self.type == "both":
            self.buy()
            self.sell()
        
        elif self.type == "buy":
            self.buy()
            self.sell_column()
        
        return self.validate()

    def validate(self):
        self.results = rule_validation( self.asset )
        
        return self.results


In [152]:
from sklearn.model_selection import ParameterGrid
from itertools import combinations
from copy import deepcopy

In [174]:
class RulesGenerator():
    def __init__(self, asset, rules, and_or = True, columns = {}, universe = [], target = [], **kwargs):
        """  
        
            universe (list):
                NOTE: AVOID INPUT with three annidated lists.
        """
        self.asset = asset
        self.rules = rules
        self.columns = columns

        # for i, v in kwargs.items(): self.__dict__[ i ] = v
        self.universe = universe
        self.target = target

        assert len(self.rules) >= len(self.universe), f"No same length universe of params and rules"

        self.and_or = and_or

    @property
    def columns(self):
        return self.__columns
    
    @columns.setter
    def columns(self, value):
        assert isinstance(value, dict), f"Column variable must be type dict, but got '{type(value)}'"
        
        # Some type of validation at this point
        self.__columns = value

    @property
    def grid(self):
        if hasattr(self, "__grid"):
            return self.__grid
        else:
            self.grid = self.get_grid()
            return self.__grid

    @grid.setter
    def grid(self, value):
        assert isinstance(value, list), f"Grid must be a list type, but got type {type(value)}"
        self.__grid = value

    def create_grid_from_list(self, universe):
        new_universe = { i:self.get_param_universe( single_universe ) for i, single_universe in enumerate(universe) }
        return [ tuple( p.values() ) for p in ParameterGrid( new_universe )]

    def get_param_universe(self, universe):
        """ Return list from param setting """
        if isinstance( universe, tuple ):
            return range( *universe )

        elif isinstance( universe, list ):
            types = set([type(i) for i in universe ])
            if list in types or tuple in types:
                # If got a universe where inside there are more list, we form a new grid of tupples
                universe = self.create_grid_from_list( universe )

        return universe

    def prep_rules(self):
        """ Generate a list of rule's combinations, considering even different universe parameters """
        if hasattr(self, "universe") and len(self.universe) > 0: 
            # Get rules to format
            rules2format = [ i for i, rule in enumerate(self.rules) if "{" in rule ]

            # Rules that do not have formatting in them
            remainrules = [ rule for i, rule in enumerate(self.rules) if "{" not in rule ]
            assert len(self.universe) <= len(rules2format), "Formating rules and universe of parameters do not match"

            # Iterate over formatting rules, and based on their universe, form the full set of rules from them.
            modified_rules = []
            for i, universe in enumerate( self.universe ):
                                
                if isinstance(universe, tuple):
                    # if tupple, we considere the universe as parameters for the range func
                    modified_rules.append( [ self.rules[ rules2format[i] ].format( pu ) for pu in range( *universe ) ] )

                elif isinstance(universe, list):
                    # When universe of parameter has a inside list that correspond to a str with two formatting areas
                    # From universe of params, create list of them and transform the universe to a grid of tupples to feed to formatting str
                    uni_param_grid = self.create_grid_from_list( universe )
                    modified_rules.append( [ self.rules[ rules2format[i] ].format( *u ) for u in uni_param_grid ] )
                    
                else:
                    raise ValueError( f"Invalid universe of parameters")
            
            # Add list of rules to LIST of LISTS [] + [ [] + [] ]
            rules = remainrules + modified_rules
        
        else:
            rules = self.rules
        
        # if more than one rule, then we have to do a combination
        if len(rules) > 1:
            rules_grid = []
            for L in range(1, len(rules) + 1):
                # Consider the "modified rules" as one
                for subset in combinations(rules, L):
                    subset = list(subset)
                    if L == 1:
                        if isinstance(subset[0], str):
                            rules_grid += subset
                        elif isinstance( subset[0], list ):
                            rules_grid += subset[0]

                    # if combination of more than one rule, we need to concatenate unmodified rule with set of modified rules
                    # Ex: 
                    # [sma > 0], [ rsi > 5, rsi > 10, rsi > 15 ]
                    # [sma > 0 , rsi > 5], [sma > 0 , rsi > 10], [sma > 0 , rsi > 15]
                    
                    else:
                        types_str = set([type(i) for i in subset ])
                        
                        # if only str types, then we can join to final list without further modification
                        if  types_str ==set([str]):
                            rules_grid += subset

                        # in this case, a str type and a list (as in the example) are in the subset array
                        else:
                            # Create a dictionary with possible values, to later on feed to ParemeterGrid 
                            def ensure_lists(x):
                                if isinstance(x, str):
                                    return [x]
                                elif isinstance(x, list):
                                    return x

                            subset_dict = { i:ensure_lists(s) for i, s in enumerate(subset) }

                            if isinstance( self.and_or, str ):
                                # Concat the operator to the combination rules generated on the ParameterGrid
                                join_str = f" {self.and_or} "
                                [ rules_grid.append( join_str.join( map( str, list(i.values())))) for i in ParameterGrid( subset_dict ) ]
                        
        else:
            rules_grid = rules

        self.rules_grid = rules_grid

        return [{"buy":r} for r in self.rules_grid]

    def prep_asset(self, columns):
        """ Returns a list of assets if columns variable has information """
        if len(columns) == 0:
            return [ self.asset ]

        assets = []

        columns = { col_name:self.get_param_universe( param ) for col_name, param in columns.items()}

        param_universe = list( ParameterGrid( columns ) )

        for param in param_universe:
            asset = deepcopy(self.asset)

            for col_name, p in param.items():
                    
                if isinstance(p, tuple):
                    asset.df[ col_name ] = getattr( asset, col_name )( *p )
                else:
                    asset.df[ col_name ] = getattr( asset, col_name )( p )

            assets.append( asset )

        # for col_name, param_universe in columns.items():
        #     param_universe = self.get_param_universe( param_universe )

        #     for param in param_universe:
        #         asset = deepcopy(self.asset)
        #         if isinstance(param, tuple):
        #             asset.df[ col_name ] = getattr( asset, col_name )( *param )
        #         else:
        #             asset.df[ col_name ] = getattr( asset, col_name )( param )

        #         assets.append( asset )

        return assets

    def get_grid(self):        
        
        rules = self.prep_rules()
        rules_grid = []

        for rule in rules:
            rule_str = list(rule.values())[0]
            
            asset_params = { name:param for name, param in self.columns.items() if name in rule_str }

            rule_dict = {}
            rule_dict["rules"] = [ rule ]
            rule_dict["asset"] = self.prep_asset(asset_params)        

            if hasattr(self, "target"):
                rule_dict["target"] = self.target

            rule_grid = list(ParameterGrid( rule_dict ))
            rules_grid += rule_grid 

        return rules_grid

    def run(self):
        self.rules_obj = [  RuleTesting( **params ) for params in self.grid ]
        results = []
        for i, ro in enumerate(self.rules_obj):
            df_aux =  ro.run()
            
            if len(df_aux) == 0:
                results.append([])
                continue
            
            positive = len( df_aux[df_aux["returns"] > 0])
            negative = len( df_aux[df_aux["returns"] <= 0])
            mean_ = df_aux["returns"].mean()
            min_ = df_aux["returns"].min()
            std_ = df_aux["returns"].std()
            sharpe = mean_ / std_
            sortino = mean_ / df_aux[ df_aux["returns"] < 0]["returns"].std()

            results.append( [ len(df_aux),  df_aux["acc"].iloc[-1], mean_, min_, std_, sharpe, sortino, positive, negative ] )
        
        self.results = pd.DataFrame( results, columns = [ "qty_trans", "acc", "mean", "max_drawdown", "std", "sharpe", "sortino", "positive", "negative" ] )

        assert len(self.results) == len(self.rules_obj), "Results and rule objects do not match"

## Testing

In [175]:
from trading import Asset
from datetime import date

In [176]:
asset = Asset(
    symbol="eth",
    fiat = "USDT",
    start = date(2022,5,1),
    end = date(2022,11,10),
    frequency = "1d",
    broker = "binance",
    from_ = "db"
)

In [177]:
rules = [ 
    "sma > close",
    "rsi > {}",
    "rsi_smoth > 0"
]

universe = [ 
    ( 20, 51, 10 ) # 20, 30, 40, 50
]

columns = { 
    "sma":(30, 121, 30), # 30, 60, 90, 120
    "rsi":(7, 22, 7), # 7, 14, 21
    "rsi_smoth":[  [7, 14], (2, 5)  ] # 7-2, 7-3, 7-4, 14-2, 14-3, 14-4
}

In [178]:
rg = RulesGenerator( asset=asset, rules=rules, universe=universe , target = [1,2,3], and_or = "and", columns=columns)
rg.run()
rg.results

Unnamed: 0,qty_trans,acc,mean,max_drawdown,std,sharpe,sortino,positive,negative
0,7,0.713,-0.045143,-0.174,0.065920,-0.684810,-0.764902,1,6
1,7,0.882,-0.017429,-0.065,0.034894,-0.499466,-0.809465,3,4
2,6,0.824,-0.029333,-0.091,0.072586,-0.404121,-0.726331,2,4
3,7,0.909,-0.010857,-0.174,0.075329,-0.144129,-0.115369,4,3
4,7,0.973,-0.003143,-0.065,0.047150,-0.066656,-0.114481,3,4
...,...,...,...,...,...,...,...,...,...
1387,1,1.023,0.023000,0.023,,,,1,0
1388,1,1.031,0.031000,0.031,,,,1,0
1389,1,1.055,0.055000,0.055,,,,1,0
1390,1,1.023,0.023000,0.023,,,,1,0


In [179]:
rg.results.sort_values(by = "acc", ascending = False)

Unnamed: 0,qty_trans,acc,mean,max_drawdown,std,sharpe,sortino,positive,negative
211,8,1.441,0.047875,-0.025,0.052406,0.913537,2.943716,6,2
214,8,1.350,0.039375,-0.025,0.053981,0.729419,2.421072,6,2
290,6,1.318,0.055333,-0.094,0.147125,0.376097,1.164633,3,3
98,5,1.295,0.063600,-0.158,0.164704,0.386148,,4,1
212,9,1.282,0.031889,-0.107,0.095812,0.332829,0.828260,6,3
...,...,...,...,...,...,...,...,...,...
188,4,0.762,-0.064500,-0.107,0.046018,-1.401622,-3.212974,0,4
185,4,0.762,-0.064500,-0.107,0.046018,-1.401622,-3.212974,0,4
71,4,0.762,-0.064500,-0.107,0.046018,-1.401622,-3.212974,0,4
102,7,0.713,-0.045143,-0.174,0.065920,-0.684810,-0.764902,1,6


In [126]:
rg.rule_obj[ 20 ].rules

{'buy': 'rsi > 55'}

In [127]:
from trading.strategy import Strategy

In [137]:
st = Strategy( asset, tas = "rsi" )

In [138]:
st.run_tas()

In [139]:
st.asset.df

Unnamed: 0_level_0,open,high,low,close,volume,quotevolume
date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2022-05-02,2824.81,2894.22,2778.78,2856.54,5.217069e+05,1.476063e+09
2022-05-03,2856.54,2861.47,2755.19,2781.70,3.713312e+05,1.046308e+09
2022-05-04,2781.70,2965.85,2771.74,2940.64,5.487381e+05,1.572901e+09
2022-05-05,2940.65,2954.65,2683.00,2747.97,7.348453e+05,2.055752e+09
2022-05-06,2747.96,2758.18,2632.95,2692.85,7.114493e+05,1.918268e+09
...,...,...,...,...,...,...
2022-11-06,1626.98,1639.02,1564.39,1568.29,4.148640e+05,6.685695e+08
2022-11-07,1568.25,1608.04,1545.03,1568.10,6.424094e+05,1.013047e+09
2022-11-08,1568.09,1579.98,1233.00,1334.77,2.339917e+06,3.356475e+09
2022-11-09,1334.78,1337.16,1073.53,1102.73,2.494798e+06,3.021176e+09


In [140]:
asset.__dict__

{'broker': 'binance', 'verbose': 0, '_asset': Binance: eth}

In [2]:
def test(a,b):

    t = Test(a,b)

    return ( t.a, t.b )

In [3]:
class Test():
    def __init__(self, a = 1, b = 2):
        self.a = a
        self.b = b

In [4]:
d = [
    {"a":2, "b":3},
    {"a":4, "b":5}
]

In [5]:
dd = [
    (2,3),
    (4,5)
]

In [6]:
import multiprocessing as mp

In [7]:
with mp.Pool( 2 ) as pool:
    r = pool.starmap(
        test,
        dd}
    )