In [9]:
# %%writefile qyear.py
# Wrapper class for Q-year format string values
class QYear:
    """Wrapper for quarter-of-year labels written as ``YYYYQ#`` (e.g. ``2017Q3``).

    Attributes:
        label: the ``YYYYQ#`` string, or ``None`` for an instance created
            without a label.
        year: the four-digit year as an int (0 when unset).
        quarter: the quarter as an int in 1..4 (0 when unset).
    """

    def __init__(self, label=None):
        """Parse ``label`` into year and quarter.

        Args:
            label: a ``YYYYQ#`` string (or any object whose ``str()`` is one,
                such as another QYear). ``None`` creates an unset instance.

        Raises:
            ValueError: if the label is not four digits, a ``Q`` and a quarter
                digit between 1 and 4.
        """
        if label is None:
            self.label = None
            self.year = 0
            self.quarter = 0
            return
        label = str(label)
        well_formed = (
            len(label) == 6
            and label[0:4].isdigit()
            and label[4] == 'Q'
            and label[5] in '1234'
        )
        if not well_formed:
            raise ValueError('%s does not match correct value format: "YYYYQ#"' % label)
        self.label = label
        self.year = int(label[0:4])
        self.quarter = int(label[5])

    def __str__(self):
        # __str__ must return a string; an unset instance has label None.
        return '' if self.label is None else self.label

    def __repr__(self):
        return '<QYear year:%d, quarter:%d, label:%s>' % (self.year, self.quarter, self.label)

    @staticmethod
    def _check_quarter(quarter):
        """Raise ValueError unless ``quarter`` lies in the range 1..4."""
        if not 1 <= quarter <= 4:
            raise ValueError('Q-year quarter value must be between 1 and 4')

    def _refresh_label(self):
        """Rebuild ``label`` from the current year and quarter."""
        self.label = str(self.year) + 'Q' + str(self.quarter)

    def from_value(self, year, quarter):
        """Set year and quarter together and return ``self``.

        The quarter is validated before anything is modified, so a bad
        quarter leaves the instance untouched.

        Raises:
            ValueError: if ``quarter`` is outside 1..4.
        """
        self._check_quarter(quarter)
        self.year = year
        self.quarter = quarter
        self._refresh_label()
        return self

    def set_year(self, year):
        """Set the year, rebuild the label and return ``self``."""
        self.year = year
        self._refresh_label()
        return self

    def set_quarter(self, quarter):
        """Set the quarter, rebuild the label and return ``self``.

        Raises:
            ValueError: if ``quarter`` is outside 1..4.
        """
        self._check_quarter(quarter)
        self.quarter = quarter
        self._refresh_label()
        return self

    # shift() - adds or subtracts quarters/years from a QYear object
    def shift(self, qMove, inplace=False):
        """Return the quarter that lies ``qMove`` quarters away from this one.

        Args:
            qMove: whole number of quarters to move; negative moves back in time.
            inplace: when True, modify and return ``self``; otherwise return a
                new QYear and leave ``self`` unchanged.

        Raises:
            ValueError: if this instance has no valid quarter set.
        """
        if not 1 <= self.quarter <= 4:
            raise ValueError('Cannot shift a QYear whose quarter has not been set')
        # Count quarters on a single axis so the move is one divmod rather
        # than a step-by-step loop.
        position = self.year * 4 + (self.quarter - 1) + int(qMove)
        new_year, zero_based_quarter = divmod(position, 4)
        target = self if inplace else QYear()
        return target.from_value(new_year, zero_based_quarter + 1)

    # make_range() - lists consecutive quarter labels starting or ending at this one
    def make_range(self, count, output='list', style='string'):
        """Return consecutive quarter labels anchored on this quarter.

        Args:
            count: number of labels wanted. A positive count starts at this
                quarter and runs forward; a negative count ends at this quarter
                and reaches backward. The own label is always included, so
                counts of 0, 1 and -1 all give a single-element list.
            output: accepted for interface compatibility; only a list is produced.
            style: accepted for interface compatibility; only strings are produced.

        Returns:
            A chronologically ordered list of ``YYYYQ#`` strings.
        """
        length = max(abs(count), 1)
        if count < 0:
            offsets = range(-(length - 1), 1)
        else:
            offsets = range(length)
        return [str(self.shift(offset)) for offset in offsets]

# q_diff() - returns the number of quarters between the start of q1 and the end of q2
#   a negative return value indicates that q1 occurs after q2
#   a positive return value indicates that q1 occurs before q2
#   a return value of 0 indicates that q1 and q2 are the same quarter
def q_diff(q1, q2):
    """Return the signed number of quarters from ``q1`` to ``q2``.

    Args:
        q1: a QYear, or a ``YYYYQ#`` label string.
        q2: a QYear, or a ``YYYYQ#`` label string.

    Returns:
        An int: positive when ``q1`` is before ``q2``, negative when ``q1`` is
        after ``q2``, and 0 only when both name the same quarter.
    """
    first = QYear(str(q1))
    second = QYear(str(q2))
    # One year is four quarters, so the distance is a plain linear difference.
    # The previous cross-year formula returned 0 for e.g. 2018Q4 vs 2017Q4.
    return (second.year - first.year) * 4 + (second.quarter - first.quarter)
    
# q_fill() - returns a list of all QYear labels from X to Y
def q_fill(start, end, output='list', style='string'):
    """Return every quarter label from ``start`` to ``end``, both inclusive.

    The list runs in the direction of travel: ascending when ``start`` is
    before ``end``, descending when it is after.

    Args:
        start: a QYear, or a ``YYYYQ#`` label string.
        end: a QYear, or a ``YYYYQ#`` label string.
        output: accepted for interface compatibility; only a list is produced.
        style: accepted for interface compatibility; only strings are produced.

    Raises:
        ValueError: if either label is malformed or names a quarter outside 1..4.
    """
    cursor = QYear(str(start))
    target = QYear(str(end))
    for endpoint in (cursor, target):
        # A quarter outside 1..4 can never be reached by shifting, so the walk
        # below would not terminate.
        if not 1 <= endpoint.quarter <= 4:
            raise ValueError('Q-year quarter value must be between 1 and 4')

    goal = (target.year, target.quarter)
    # Compare (year, quarter) directly so termination does not depend on the
    # magnitude reported by q_diff.
    step = 1 if (cursor.year, cursor.quarter) < goal else -1

    labels = [cursor.label]
    while (cursor.year, cursor.quarter) != goal:
        cursor = cursor.shift(step)
        labels.append(cursor.label)
    return labels
    

Overwriting qyear.py


In [8]:
# Smoke test for the quarter helpers defined in the cell above.
# The q_diff() result is neither assigned nor printed, so only the q_fill()
# list shows up in this cell's output.
q_diff('2017Q2', '2017Q1')
# start is later than end here, so the labels come back in descending order.
test = q_fill('2018Q4', '2017Q3')
print(test)

['2018Q4', '2018Q3', '2018Q2', '2018Q1', '2017Q4', '2017Q3']


In [None]:
# %%writefile portfolio.py
class Portfolio:
    """A set of tickers evaluated against a quarterly data table.

    The data source is a pandas DataFrame indexed by ``QYEAR``. The methods
    below read its ``TICKER``, ``RETURN`` and ``ADJUSTED_BETA`` columns.

    Attributes:
        source: the DataFrame in use, or ``None`` when none has been set.
        tlist: list of held tickers (each containing ``' US Equity'``), or
            ``None`` until :meth:`add` is called.
        mode: ``'ref'`` or ``'fetch'``.
    """

    _VALID_MODES = ('ref', 'fetch')
    _TICKER_SUFFIX = ' US Equity'

    def __init__(self, source=None, mode='ref'):
        """Create a portfolio.

        Args:
            source: ``None``, a DataFrame, or anything ``pandas.read_csv``
                accepts (the file must contain a ``QYEAR`` column).
            mode: ``'ref'`` or ``'fetch'``.

        Raises:
            ValueError: if ``mode`` is not one of the two supported values.
        """
        self.source = None
        self.tlist = None
        self.set_mode(mode)
        self.set_source(source)

    def add(self, tickers):
        """Add tickers to the portfolio and return ``self``.

        Args:
            tickers: a ticker string or an iterable of ticker strings.
                ``' US Equity'`` is appended to any ticker that lacks it.
                ``None`` is a no-op.

        Duplicates are dropped; first-seen order is kept.
        """
        if tickers is None:
            return self
        if isinstance(tickers, str):
            # A bare string would otherwise be iterated character by character.
            tickers = [tickers]
        suffix = self._TICKER_SUFFIX
        normalized = [t if suffix in t else t + suffix for t in tickers]
        combined = (self.tlist or []) + normalized
        self.tlist = list(dict.fromkeys(combined))
        return self

    def set_source(self, source):
        """Replace the data source.

        A DataFrame (or ``None``) is stored as given; any other value is read
        with ``pandas.read_csv`` using ``QYEAR`` as the index column.
        """
        if source is None:
            self.source = None
            return
        import pandas as pd
        if isinstance(source, pd.DataFrame):
            self.source = source
        else:
            self.source = pd.read_csv(source, index_col='QYEAR')

    def set_mode(self, mode):
        """Set the operating mode.

        Raises:
            ValueError: if ``mode`` is neither ``'ref'`` nor ``'fetch'``.
        """
        if mode not in self._VALID_MODES:
            raise ValueError('mode must be "ref" or "fetch"')
        self.mode = mode

    def _update(self):
        """Placeholder hook; currently only checks that a source is set."""
        if self.source is None:
            raise ValueError('No source specified for Portfolio')

    def _calc_eqbeta(self):
        """Placeholder for the portfolio beta calculation; only validates the source."""
        # calc the portfolio beta
        if self.source is None:
            raise ValueError('A data source must be specified to perform Porfolio functions in "ref" mode')

    def _require_ready(self):
        """Raise ValueError unless both a source and a ticker list are present."""
        if self.source is None:
            raise ValueError('No source specified for Portfolio')
        if self.tlist is None:
            raise ValueError('No tickers have been added to the Portfolio')

    def _held_values(self, qyear, column, not_found):
        """Return ``column`` for the held tickers in quarter ``qyear``.

        ``not_found`` is the ``%``-template of the ValueError raised when
        ``qyear`` is absent from the source index.
        """
        # Use QYear class to verify format of input
        known_quarters = set(self.source.index.get_level_values(0).values)
        if qyear not in known_quarters:
            raise ValueError(not_found % qyear)
        held = self.source.loc[self.source['TICKER'].isin(self.tlist)]
        return held.loc[qyear, column]

    def qtr_return(self, qyear='all'):
        """Return the summed ``RETURN`` of held tickers as a percent string.

        Args:
            qyear: quarter label present in the source index. ``'all'`` is not
                implemented yet and returns ``None``.

        Raises:
            ValueError: if the source or ticker list is missing, or ``qyear``
                is not in the source.
        """
        self._require_ready()
        if qyear == 'all':
            return None
        returns = self._held_values(
            qyear, 'RETURN', 'Requested QYear is not in the data source, %s')
        return str((returns.sum()) * 100) + '%'

    def beta(self, qyear='all'):
        """Return the mean ``ADJUSTED_BETA`` of held tickers for ``qyear``.

        ``'all'`` is not implemented yet and returns ``None``.

        Raises:
            ValueError: if the source or ticker list is missing, or ``qyear``
                is not in the source.
        """
        self._require_ready()
        if qyear == 'all':
            return None
        betas = self._held_values(
            qyear, 'ADJUSTED_BETA', 'Requested QYear not available in data source, %s')
        return betas.mean()

    def sharpe(self, qyear='all', risk_free=0.029):
        """Return (summed return - ``risk_free``) / std of returns for ``qyear``.

        Args:
            qyear: quarter label present in the source index. ``'all'`` is not
                implemented yet and returns ``None``.
            risk_free: value subtracted from the summed return before dividing.

        Raises:
            ValueError: if the source or ticker list is missing, or ``qyear``
                is not in the source.
        """
        self._require_ready()
        if qyear == 'all':
            return None
        returns = self._held_values(
            qyear, 'RETURN', 'Requested QYear not available in data source, %s')
        return (returns.sum() - risk_free) / returns.std()

In [None]:
# %%writefile scoring.py

class CutoffScoreConfig:
    """Single-cutoff scoring configuration (``config_type`` ``'csc'``).

    After :meth:`applyto`, ``composite`` holds
    ``((config_type, direction), (1.00, threshold))``.
    """

    def __init__(self, cutoff, mag=1):
        """Record the cutoff and magnitude.

        Args:
            cutoff: a number, or a string such as ``'89%'``. A ``%`` anywhere
                in its string form marks the style as ``'percentile'``;
                otherwise the style is ``'value'``.
            mag: multiplier applied to the top entry of ``scores``.
        """
        cutoff_text = str(cutoff)
        self.style = 'percentile' if '%' in cutoff_text else 'value'
        # Keep only the numeric part in front of any '%'.
        self.cutoff = float(cutoff_text.split('%')[0])
        self.mag = mag
        self.scores = [1*mag, 0]
        self.config_type = 'csc'
        self.composite = None

    def applyto(self, array, weight='higher'):
        """Derive the threshold from ``array`` and store it in ``composite``.

        NaN entries are ignored. The threshold is the ``100 - cutoff``
        percentile for ``'higher'`` and the ``cutoff`` percentile for
        ``'lower'``.
        NOTE(review): ``self.style`` is not consulted here; the cutoff is
        always treated as a percentile.

        Args:
            array: numpy array of floats.
            weight: ``'higher'`` or ``'lower'``.

        Returns:
            ``self``, so the call can be chained.

        Raises:
            ValueError: if ``weight`` is neither ``'higher'`` nor ``'lower'``.
        """
        import numpy as np
        finite_values = array[~np.isnan(array)]
        if weight not in ('higher', 'lower'):
            raise ValueError('weight must be "higher" or "lower"')
        if weight == 'higher':
            direction, percentile_rank = 'higher', 100-self.cutoff
        else:
            direction, percentile_rank = 'lower', self.cutoff
        threshold = np.percentile(finite_values, percentile_rank)
        self.composite = ((self.config_type, direction), (1.00, threshold))
        return self
    
class ThresholdScoreConfig:
    """Multi-threshold scoring configuration (``code`` ``'tsc'``). Unfinished.

    The constructor previously referenced an undefined ``cutoff`` name and
    could not be instantiated. It now only records its arguments; no scoring
    logic exists yet.
    """

    def __init__(self, thresholds, scores, mag=1, data='v'):
        """Record thresholds and scores.

        Args:
            thresholds: iterable of numbers or strings such as ``'20%'``. If
                any entry contains ``%`` the style is ``'percentile'``,
                otherwise ``'value'``. The numeric parts are stored as floats.
            scores: iterable of scores, stored as a list exactly as given
                (not scaled by ``mag``).
            mag: magnitude, stored for later use.
            data: stored as given. NOTE(review): its meaning is not defined
                anywhere in this notebook; confirm before relying on it.
        """
        threshold_texts = [str(item) for item in thresholds]
        if any('%' in text for text in threshold_texts):
            self.style = 'percentile'
        else:
            self.style = 'value'
        self.thresholds = [float(text.split('%')[0]) for text in threshold_texts]
        self.mag = mag
        self.scores = list(scores)
        self.data = data
        self.code = 'tsc'
        # Same identifier under the attribute name the sibling configs use.
        self.config_type = self.code
        self.composite = None
    # TODO: finish
    
class BoundedRatioScoreConfig:
    """Scoring configuration bounded by a lower and an upper value (``'brsc'``).

    ``composite`` is ``((config_type,), (lower, upper))``.
    """

    def __init__(self, lower=0, upper=0, mag=1, ):
        """Record the bounds and build the composite.

        Args:
            lower: lower bound; must not exceed ``upper``.
            upper: upper bound.
            mag: multiplier applied to the top entry of ``scores``.

        Raises:
            ValueError: if ``upper`` is less than ``lower``.
        """
        if upper < lower:
            raise ValueError('Lower bound must be numerically equal to or less than upper bound')
        self.mag = mag
        self.lower = lower
        self.upper = upper
        self.scores = [1*mag, 0]
        self.config_type = 'brsc'
        # The header must be a real 1-tuple: with a bare string,
        # composite[0][0] would be the single character 'b'.
        self.composite = ((self.config_type,), (lower, upper))

    def center_on_average(self, array):
        """Not implemented yet; always returns ``None``."""
        return None
          
def score(composite, val):
    """Score ``val`` against a cutoff-style (``'csc'``) composite.

    Args:
        composite: a sequence whose first item is a header such as
            ``('csc', 'higher')`` and whose remaining items are
            ``(score, threshold)`` pairs.
        val: the value to score.

    Returns:
        The score of the last pair whose threshold ``val`` satisfies
        (``val >= threshold`` for ``'higher'``, ``val <= threshold`` for
        ``'lower'``), or 0 when no pair matches.

    Raises:
        ValueError: if the composite type is not ``'csc'``, or the header
            names neither ``'higher'`` nor ``'lower'``.
    """
    header = composite[0]
    if header[0] != 'csc':
        raise ValueError('Composite score configuartion not supported: {0} provided'.format(composite[0][0]))

    if 'higher' in header:
        # Higher values score better
        satisfies = lambda threshold: val >= threshold
    elif 'lower' in header:
        # Lower values score better
        satisfies = lambda threshold: val <= threshold
    else:
        raise ValueError('Invalid composite score')

    result = 0
    for pair in composite[1:]:
        # Later matches overwrite earlier ones, so pair order matters.
        if satisfies(pair[1]):
            result = pair[0]
    return result
        
def score_col(composite, df, col_name, name=None, inplace=False):
    """Unimplemented stub: accepts its arguments and always returns ``None``.

    None of the parameters are read yet.
    """
    # TODO: Implement a method that scores a df column and adds a score column to the df
    return None

In [None]:
#import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

test = np.random.rand(5000)*100
#print(test)
#pyplot.plot(test, '+b')
#np.percentile(test, 5)
comp = CutoffScoreConfig('89%').applyto(test, weight='lower').composite
print(comp)

In [None]:
# Hand-built composites in the layout score() reads: a (type, direction)
# header followed by (score, threshold) pairs. c1 and c2 previously listed
# some pairs as (threshold, score).
c1 = (('csc', 'higher'), (0.5, 20), (1.0, 30))
c2 = (('csc', 'lower'), (0.5, 20), (1.0, 10))
c3 = (('csc', 'lower'), (0.25, 50), (0.75, 35), (0.85, 17), (1.0, 13))

# 19 is <= 50 and <= 35 but not <= 17, so the last matching pair gives 0.75.
score(c3, 19)

In [None]:
def score_percentile(array, val, divs=5, weight='high', mag=1):
    """Score ``val`` by the percentile band of ``array`` it falls into.

    The data is cut into ``divs`` equal percentile bands. With
    ``weight='high'`` the top band scores ``mag`` and the bottom band 0; with
    ``weight='low'`` the bottom band scores ``mag`` and the top band 0.
    Intermediate bands are spaced evenly and rounded to two decimals.

    Args:
        array: pandas Series, numpy array or sequence of numbers. NaN entries
            are ignored.
        val: the real number to score (Python or numpy scalar; bools are
            rejected).
        divs: number of bands, at least 2.
        weight: ``'high'`` or ``'low'``.
        mag: multiplier applied to the score.

    Returns:
        A float between 0 and ``mag``.

    Raises:
        ValueError: for an invalid ``weight``, a non-numeric ``val``,
            ``divs < 2``, or an array with no non-NaN values.
    """
    # Imported locally so the function does not depend on another cell
    # having imported these first.
    import numbers
    import numpy as np

    if weight != 'low' and weight != 'high':
        raise ValueError('weight must be "high" or "low"')
    if isinstance(val, bool) or not isinstance(val, numbers.Real):
        raise ValueError('Found val input of ' + str(type(val)))
    if divs < 2:
        raise ValueError('At least 2 divisions must be used to score input value: ' + str(divs) + ' entered')

    values = np.asarray(array, dtype=float)
    values = values[~np.isnan(values)]
    if values.size == 0:
        raise ValueError('array contains no non-NaN values to score against')

    bands = divs
    steps = divs - 1
    band_width = 100.0/bands
    score_step = 1.0/steps
    # cuts[k] is the lower edge of band k; computed once for all comparisons.
    cuts = np.percentile(values, [k * band_width for k in range(bands)])

    if weight == 'low':
        # Compare against each band's UPPER edge (cuts[k + 1]) so the lowest
        # band earns the full score, mirroring the 'high' case.
        for k in range(steps):
            if val <= cuts[k + 1]:
                return float("{0:.2f}".format((steps - k) * score_step * mag))
        return 0.0

    for k in range(steps, -1, -1):
        if val >= cuts[k]:
            return float("{0:.2f}".format(k * score_step * mag))
    return 0.0

In [1]:
%%writefile log.py

class Log:
    """Minimal console logger switched by a status string.

    ``'OFF'`` prints nothing, ``'ON'`` prints messages as given, and
    ``'DEBUG'`` prints them with a ``'DEBUG: '`` prefix.
    """

    _STATUSES = ('OFF', 'ON', 'DEBUG')

    def __init__(self, status='OFF'):
        """Create a logger.

        Raises:
            ValueError: if ``status`` is not ``'ON'``, ``'OFF'`` or ``'DEBUG'``.
        """
        # Validate here too, so a misspelt status cannot silently mute the logger.
        self.set_status(status)

    def set_status(self, status='OFF'):
        """Change the status.

        Raises:
            ValueError: if ``status`` is not ``'ON'``, ``'OFF'`` or ``'DEBUG'``.
        """
        if status not in self._STATUSES:
            raise ValueError('Invalid status request: try "ON", "OFF", or "DEBUG"')
        self.status = status

    def log(self, string):
        """Print ``string`` according to the current status."""
        if self.status == 'ON':
            print(string)
        elif self.status == 'DEBUG':
            # str() keeps non-string messages from raising TypeError, matching
            # what print() already tolerates in the 'ON' branch.
            print('DEBUG: ' + str(string))
    # TODO: Add dump to file option

Writing log.py


In [None]:
# Demo of the Log class: with status 'ON', log() prints the message unchanged.
debug = Log(status='ON')
test = (1,2,3)
# The tuple is formatted into the message string before it reaches log().
debug.log('This is a tuple: {0}'.format(test))