# Notebook to generate accessorsm module

In [7]:
import sys

import inspect
import project

import pandas as pd

from pprint import pformat
from pathlib import Path

from mintalib import core
from mintalib.utils import random_prices, random_walk



In [8]:

def core_functions(first_param=('series', 'prices')):
    """ list of core functions """

    from mintalib import core

    result = []
    for k, v in vars(core).items():
        if k.startswith("_") or not k.islower():
            continue

        if isinstance(v, type) or not callable(v):
            continue

        params = list(inspect.signature(v).parameters)

        if params[0] not in first_param:
            continue

        result.append(k)

    return result


print(core_functions())

['calc_avgprice', 'calc_typprice', 'calc_wclprice', 'calc_midprice', 'calc_price', 'crossover', 'crossunder', 'flag_above', 'flag_below', 'invert_flag', 'updown_flag', 'calc_log', 'calc_exp', 'calc_roc', 'calc_diff', 'calc_min', 'calc_max', 'calc_sum', 'calc_mad', 'calc_stdev', 'calc_sma', 'calc_ema', 'calc_rma', 'calc_wma', 'calc_dema', 'calc_tema', 'calc_ma', 'calc_rsi', 'calc_plusdi', 'calc_minusdi', 'calc_adx', 'calc_trange', 'calc_atr', 'calc_natr', 'calc_latr', 'calc_psar', 'calc_cci', 'calc_cmf', 'calc_mfi', 'calc_bop', 'calc_bbands', 'calc_keltner', 'efficiency_ratio', 'calc_kama', 'calc_macd', 'calc_ppo', 'calc_slope', 'calc_curve', 'calc_stoch', 'streak_up', 'streak_down']


In [9]:

def get_series(data, item=None, *, default_item='close'):
    if hasattr(data, 'columns'):
        if item is None:
            item = default_item

        return data[item]

    if item is not None:
        raise ValueError("Cannot specify item with %s input!", type(data).__name__)

    return data


print(inspect.getsource(get_series))


def get_series(data, item=None, *, default_item='close'):
    if hasattr(data, 'columns'):
        if item is None:
            item = default_item

        return data[item]

    if item is not None:
        raise ValueError("Cannot specify item with %s input!", type(data).__name__)

    return data


In [10]:


def make_wrapper(func, accessor=None, verbose=False):
    fname = func.__qualname__
    module = func.__module__.rpartition('.')[2]
    qname = f"{module}.{fname}"
    indent = "    " if accessor else ""


    signature = inspect.signature(func)
    parameters = list(signature.parameters.values())
    Parameter = inspect.Parameter

    if verbose:
        print("signature", signature)
        print("parameters", parameters)

    newparams = []
    extras = dict()
    ftype = parameters[0].name

    for i, p in enumerate(parameters):
        if i == 0 and accessor:
            if p.name in ('series', 'prices'):
                p = p.replace(name='self')
            else:
                raise ValueError("Expected series or prices as first parameter")

        if p.name == 'wrap':
            extras.update(wrap=True)
            continue

        if p.name in ['period']:
            p = p.replace(annotation=int)

        for typ in (int, float, bool):
            if isinstance(p.default, typ):
                p = p.replace(annotation=typ)

        newparams.append(p)

    if ftype == 'series' and accessor != 'series':
        default = 'close' if accessor else None
        p = Parameter('item', Parameter.KEYWORD_ONLY, default=default, annotation=str)
        newparams.append(p)


    if verbose:
        print("newparams", newparams)
        print("extras", extras)

    newsig = inspect.Signature(newparams)

    name = func.__name__.upper()
    if name.startswith("CALC_"):
        name = name.partition('_')[2]

    def argument(arg):
        if arg == 'self':
            return ftype
        if arg in ('series', 'prices', 'period'):
            return arg
        return f"{arg}={arg}"

    args = [argument(p.name) for p in newparams if p.name not in ['item']]
    args += [f"{k}={v!r}" for k, v in extras.items()]

    args = ", ".join(args)

    code = ""


    if name.isupper():
        code += indent + "# noinspection PyPep8Naming\n"

    wrapper = "utils.wrap_accessor" if accessor else "utils.wrap_function"

    code += indent + f"@{wrapper}({qname})\n"
    code += indent + f"def {name}" + str(newsig) + ":\n"

    if ftype == 'series':
        if accessor == 'prices':
            code += indent + "    series = self.prices[item]\n"
        elif accessor == 'series':
            code += indent + "    series = self.series\n"
        else:
            code += indent + "    series = get_series(series, item=item)\n"

    if ftype == 'prices':
        if accessor == 'prices':
            code += indent + "    prices = self.prices\n"


    code += indent + f"    return {qname}({args})\n"

    return code



print(make_wrapper(core.calc_ema, accessor='series'))
print(make_wrapper(core.calc_ema, accessor='prices'))
print(make_wrapper(core.calc_trange, accessor='prices'))




    # noinspection PyPep8Naming
    @utils.wrap_accessor(core.calc_ema)
    def EMA(self, period: int, *, adjust: bool = False):
        series = self.series
        return core.calc_ema(series, period, adjust=adjust, wrap=True)

    # noinspection PyPep8Naming
    @utils.wrap_accessor(core.calc_ema)
    def EMA(self, period: int, *, adjust: bool = False, item: str = 'close'):
        series = self.prices[item]
        return core.calc_ema(series, period, adjust=adjust, wrap=True)

    # noinspection PyPep8Naming
    @utils.wrap_accessor(core.calc_trange)
    def TRANGE(self, *, log_prices: bool = False, percent: bool = False):
        prices = self.prices
        return core.calc_trange(prices, log_prices=log_prices, percent=percent, wrap=True)


In [None]:


REGISTER_ACCESSORS = '''
def register_accessors(name='ta', *, force=False):
    """ Register Pandas Accessors """

    if force or not hasattr(pd.Series, name):
        pd.api.extensions.register_series_accessor(name)(SeriesAccessor)

    if force or not hasattr(pd.DataFrame, name):
        pd.api.extensions.register_dataframe_accessor(name)(PricesAccessor)
'''


SERIES_ACCESSOR = '''
class SeriesAccessor:
    """ Pandas Series Accessor """

    def __init__(self, series):
        assert isinstance(series, pd.Series)
        self.series = series
'''


PRICES_ACCESSOR = '''
class PricesAccessor:
    """ Pandas Prices Accessor """

    def __init__(self, prices):
        assert isinstance(prices, pd.DataFrame)
        self.prices = prices
'''


def make_accessors(relative=False):
    buffer = "# Do not edit! File generated automatically...\n\n"

    package = '.' if relative else 'mintalib'

    buffer += f"import pandas as pd\n\n"
    buffer += f"from numpy import nan\n"
    buffer += f"from {package} import core\n"
    buffer += f"from {package} import utils\n"
    buffer += "\n"

    buffer += REGISTER_ACCESSORS + "\n"

    buffer += SERIES_ACCESSOR + "\n"

    accessor = 'series'
    names = core_functions(['series'])

    for name in names:
        func = getattr(core, name)
        text = make_wrapper(func, accessor=accessor)
        buffer += text + "\n"

    buffer += PRICES_ACCESSOR + "\n"

    accessor = 'prices'
    names = core_functions(['series', 'prices'])

    for name in names:
        func = getattr(core, name)
        text = make_wrapper(func, accessor=accessor)
        buffer += text + "\n"

    return buffer

code = make_accessors()
print(code)
exec(code, {})



In [27]:

code = make_accessors()
exec(code, {})

code = make_accessors(relative=True)

#accessors = project.pkgdir / "accessors.py"
#print(f"Updating {accessors.name} ...")
#accessors.write_text(code)
