In [352]:
import re

# 3rd part imports
import numpy as np
import pandas as pd
pd.options.mode.chained_assignment = None

import requests
from bs4 import BeautifulSoup as BS

from bokeh.io import output_notebook, show

output_notebook()

# COLORS FROM .XLSX

In [102]:
# native
import datetime
import os
from functools import wraps
from types import NoneType

# local
from dashboard.logic.constants import NONE_LIKE_LIST, nav_names
from dashboard.logic.io import getNewestFilename, total_assets, \
    downloadSheet, getOldestFilename, logError
 

# 3rd
import openpyxl
import numpy as np
import pandas as pd

  

def getPreviousFilename(files: list[str], ts_format:str="%Y-%m-%d_%H:%M:%S") -> str:
    """Returns the filename with oldest timestamp"""
    
    if len(files) == 1: 
        return files[0]
    
    return sorted(files, key=lambda x: datetime.datetime.strptime(x[:19], ts_format))[1] 
    
def readXlsx(route: str, newest_file: bool=True) -> DDF:

    # Open the downloaded .xlsx file with openpyxl
    path_to_file = os.path.join("dashboard", "cache", route)
    file_name = getNewestFilename(os.listdir(path_to_file)) if newest_file else getPreviousFilename(os.listdir(path_to_file))
    full_rel_path = os.path.join(path_to_file, file_name)

    # read .xlsx w/ openpyxl & select the worksheet
    workbook = openpyxl.load_workbook(full_rel_path, data_only=True, read_only=True)
    sheet_name = workbook.sheetnames[0]
    worksheet = workbook[sheet_name]

    # read .xslx with pandas & get number of rows & cols accurately
    df_pandas = pd.read_excel(io=full_rel_path, sheet_name=sheet_name, header=None)
    max_row = df_pandas.index.max() + 1
    max_col = df_pandas.columns.max() + 1

    # Read in the rows
    rows = [row for row in worksheet.iter_rows(min_row=1, max_row=max_row, max_col=max_col)]

    row_values = [] # create an empty list to hold the cell values
    # iterate over the rows and columns to capture each cell properties
    for row in rows:
        cell_values = []
        for cell in row:
            # get the cell value, color, fill & font style
            cell_value = np.nan if cell.data_type in ['f'] or cell.value is None else cell.value
            
            fill_obj = cell.fill
            cell_color = fill_obj.start_color.index if not isinstance(fill_obj, NoneType) else np.nan
            cell_color = 'FFFFFFFF' if cell_color in ['00000000', None, NoneType, np.nan] else cell_color
            
            #cell_fill = cell.fill.fill_type if cell.fill else np.nan
            
            cell_font = cell.font
            
            font_weight = 'bold' if not isinstance(cell_font, NoneType) and cell_font.b else 'normal'
            
            # append the cell value, color, and fill to the list
            cell_values.append({'value': cell_value, 
                                'color': f"#{cell_color[2:]}" if isinstance(cell_color, str) else np.nan,
                                'font-weight': font_weight})

        row_values.append(cell_values)

    # close workbook when read_only=True
    if workbook.read_only:
        workbook.close()

    # create a pandas dataframe from the list of cell values
    return DDF(row_values)

def getMetaDataDict(route_name: str) -> dict:
    """Returns sheet metadata dictionary based on route name."""
    return nav_names[[k for k,v in nav_names.items() if nav_names[k]['route_name'] == route_name][0]]

def formatToDollars(value, precision: int=2):
    """Format number to have commas separating thousans and leading $ sign."""
    if value == np.nan or pd.isna(value):
        return value
    try:
        float_value = float(value)
        return f"${float_value:,.{precision}f}"
    except ValueError:
        return value




# DECORATORS
def ioCacheAndLog(
    url: str,
    route: str,
    testing: bool=False):
    
    def my_decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            
            if not testing:
            
                # delete oldest file if 3 files present
                cache_path = os.path.join("dashboard", "cache", route)
                if not os.path.exists(cache_path):
                    os.mkdir(cache_path)
                
                files = os.listdir(cache_path)
                while len(files) > 2:
                    oldest_file = getOldestFilename(files=files)
                    files = [file for file in files if file != oldest_file]
                    os.remove(os.path.join(cache_path, oldest_file))
                
                # download google sheet as .xlsx file
                downloadSheet(url=url, file_path=cache_path, route=route)
            
            try:
                ddf = readXlsx(route=route, newest_file=True)
                result = func(ddf)
                # if any errors occured except block is executed
                return result
            
            # read data from cache
            except Exception as e:
                
                if not testing:
                    # capture and save occurred error log
                    logError(route=route, exception=e)
                else:
                    print(e)
                
                # read data from the previous cached file
                ddf = readXlsx(route=route, newest_file=False)
                newest_file = getNewestFilename(files)
                os.remove(os.path.join(cache_path, newest_file))
                
                result = func(ddf)
                return result
        return wrapper
    
    return my_decorator


In [338]:
def addButton(df: pd.DataFrame,
                  col_names: list[str]=None,
                  popover_style_str: str="",
                  button_name: str="Details",
                  button_class: str="btn btn-secondary btn-sm",
                  data_bs_html: str="true",
                  data_bs_toggle: str="popover",
                  data_bs_trigger: str="focus",
                  button_style: str="--bs-btn-font-size: .85rem;"):
        """Add button html to the value-dict."""
        def add_button(value):
            if value in NONE_LIKE_LIST:
                return value
            
            html_string = " ".join(
                (
                f'<button type="button" class="{button_class}"', 
                f'data-bs-content="<div style="{popover_style_str}">{value}</div>"',
                f'data-bs-html="{data_bs_html}" data-bs-toggle={data_bs_toggle} data-bs-trigger="{data_bs_trigger}"', 
                f'style="{button_style}">{button_name}</button>'
                )
            )
            return html_string 
        
        # apply to specified columns
        df[col_names] = df[col_names].applymap(add_button)
        
        # return entire DF
        return df

### Stocks Watchlist

In [193]:
#ddf = readXlsx('stocks_watchlist')
metadata = getMetaDataDict(route_name='stocks_watchlist')

@ioCacheAndLog(url=metadata['url'], route=metadata['route_name'], testing=True)
def stocks_watchlist_script(ddf:DDF) -> dict:
    """Return dict of {'obj_name': object}. Object can be pd.Series, DF, styler."""
    
    results = {}
    
    # get dict of DDFs
    ddfs = ddf.getDdfDict({
        'disclaimer': ('contains', 'These Valuations', 'down'),
        'watch': ('contains', "Neil's Value", 'down', 0)
        }
    )
    
    # DISCLAIMER DF
    df_disc = ddfs['disclaimer']

    # add period end of the sentence if not so
    df_disc = DDF(df_disc).v[0].to_frame().rename(columns={0: 'info'})
    df_disc.loc[:,'info'] = [x + '.' if x[-1] != '.' else x for x in df_disc['info']]

    # add color column and set it as cat for ordering purposes
    df_disc['color'] = ['warning', 'success', 'warning', 'success', 'success', 'danger']
    df_disc['color'] = pd.Categorical(df_disc['color'],
                                      categories=['success', 'warning', 'danger'],
                                      ordered=True)

    # add icon_id column
    icon_dict = {'warning': 'exclamation-triangle-fill', 'success': 'check-lg', 'danger':'exclamation-octagon-fill'}
    df_disc['icon_id'] = df_disc['color'].map(icon_dict)
    df_disc = df_disc.sort_values('color')
    
    # add 'df_disc' to results
    results['df_disc'] = df_disc
    
    # STOCKS WATCHLIST DF
    ddf_watch = ddfs['watch']
    df_watch_styled = (ddf_watch
        .setHeader() # set header
        .addButton(keys=['color', 'font-weight'], col_names=['Notes']) # add button
        .setStyle(keys=['color', 'font-weight'], subset=pd.IndexSlice[:, :'Sector'])
        .hide(axis='index')
        .set_table_attributes('class="stockwatch"')
    )
    # add to results
    results['df_watch_styled'] = df_watch_styled
    
    return results

#stocks_watchlist_script(ddf)



### Investments

In [406]:
from dashboard.logic.constants import styling_vars
from dashboard.logic.plots import pie_chart
from dashboard.logic.io import findRefRowCol

def calcTotalUSD(df: pd.DataFrame, col_name:str) -> pd.DataFrame:
    """Calculate total value if '#ERROR!' in col_name."""
    total_idx = np.where(a.apply(lambda x: x.str.contains(r"Total \(USD\)") == True))[0][0]
    exclude_cols = ['Monthly Income', np.nan]
 
    # check if total value has ERROR msg
    if df.loc[total_idx, col_name] in ["#ERROR!", "#NAME?"]:
        df.loc[total_idx, col_name] = 0
    else:
        return df
     
    # exclude error fields and monthly income
    df_mask = df[(~df[col_name].isin(['#ERROR!', '#NAME?'])) & (~df['Asset Class'].isin(exclude_cols))]
    
    # str to float and calc total sum
    df_mask.loc[col_name] = df_mask[col_name].replace(r"[\$,]", "", regex=True).astype(float)
    
    total = df_mask[col_name].sum()
    df.loc[total_idx, col_name] = "${0:,.2f}".format(total) # format numbers back to string

    return df

def getRelContainsIdx(df: pd.DataFrame, pattern: str, na=None):
    """Return relative row index where pattern is found."""
    return np.where(df.apply(lambda x: x.str.contains(pattern, na=np.nan) == True))[0][0]

class DDF(pd.DataFrame):
    
    EXCLUSIONS = NONE_LIKE_LIST
    
    def __init__(self, data=None, index=None, columns=None, dtype=None, copy=False):
        super().__init__(data=data, index=index, columns=columns, dtype=dtype, copy=copy)  
    
    @property
    def v(self):
        def extract_value(value_dict):
            return value_dict.get('value')
        return self.applymap(extract_value)

    def prop(self, property: str):
        def extract_property(value_dict):
            return value_dict.get(property)
        return self.applymap(extract_property)
    
    def getDdfDict(self, references_dict: dict[str, tuple[str,str,str]]) -> dict[str]:
        """Return dictionary of subset DDF-s based on reference dict.
        
        Args:

            references_dict (dict): Dictionary of df_name: tuple('method', 'string' , 'direction', int('col_idx1)).
                method has 2 options ['contains' and 'equals'], 'direction' has 3 options 
                ['one', 'down', 'up'] where one means only one line needs to be parsed, 'up' and 'down'
                respectively correspond to the parsing direction from the reference point.
            
        Returns:
            dict: Dictionary of {'df_name': df}"""
        
        
        def get_reference_row_col_idx(df: pd.DataFrame, pattern: str, method: str='contains') -> tuple[int, int]:
            """Find reference row and column numeric index values.

            Args:
                pattern (str): Character sequence or regular expression.
                method (str, optional): Finding reference via pattern within the text ('contains')
                    or equalling the value exactly ('equals'). Defaults to 'contains'.

            Raises:
                ValueError: Raises error if other than ['contains', 'equals'] is specified for the method.

            Returns:
                tuple[int, int]: Tuple of ['row_i', 'col_i']
            """
            
            
            # validate that method is correctly entered
            if method not in ['contains', 'equals']:
                raise ValueError(f"{method} can take only values: 'contains' or 'equals'!")
            
            if method == 'contains':
                index, column = np.where(df.apply(lambda x: x.str.contains(pattern) == True))
                return index[0], column[0]
            
            if method == 'equals':
                index, column = np.where(df == pattern)
                return index[0], column[0]
    
        def get_DDF(ddf: DDF, 
            row_idx1: int=None, 
            row_idx2:int=None, 
            col_idx1: int=None, 
            col_idx2: int=None,
            col_0: bool=True, 
            direction='down') -> DDF:
            """Slice DF till the first occuring empty row in given direction.

            Args:
                col_0 (bool, optional): If col_idx1 is actually first column. Defaults to True.
                direction (str, optional): Slice upwards or downwards from given row index. Defaults to 'down'.

            Raises:
                ValueError: If no row indices are specified.

            Returns:
                pd.DataFrame
            """
            
            # assert that at least one of the row indices is specified
            if row_idx1 is None and row_idx2 is None:
                raise ValueError(f"Both row indices can't equal {None}!") 
            
            # if column index is not the first column then None
            col_idx1 = col_idx1 if col_0 is True else None
            
            # find missing row index
            if direction == 'down':
                nan_mask = ddf.v.loc[row_idx1:,col_idx1:col_idx2].isna().all(axis='columns')
                row_idx2 = None if nan_mask.sum() == 0 else nan_mask.idxmax()

            if direction == 'up':
                nan_mask = ddf.v.loc[:row_idx2,col_idx1:col_idx2].isna().all(axis='columns')
                row_idx1 = None if nan_mask.sum() == 0 else nan_mask[::-1].idxmax() + 1
                row_idx2 += 1
                
            return DDF(ddf.loc[row_idx1:row_idx2,col_idx1:col_idx2])
        
        
        # set all cols str type, NaN -> 'nan'
        df = self.v.astype('O')
        
        # {ddf name: ddf} dictionary
        ddfs = {}
        for k,v in references_dict.items():
            
            # find reference position (row index & col index)
            row_i, col_i = get_reference_row_col_idx(df, pattern=v[1], method=v[0])
            
            # shift ref position if specified
            col_i = v[3] if len(v) == 4 else col_i
            
            if v[2] == 'one':
                ddfs[k] = DDF(self.loc[row_i,col_i:])
            if v[2] == 'down':
                ddfs[k] = get_DDF(self, row_idx1=row_i, col_idx1=col_i, direction=v[2])
            if v[2] == 'up':
                ddfs[k] = get_DDF(self, row_idx2=row_i, col_idx1=col_i, direction=v[2])

        # strip all NaN cols and rows
        ddfs_clean = {}
        for k,ddf_ in ddfs.items():
            
            # drop any NaN-s in the row
            if isinstance(ddf_.v, pd.Series):
                df_ = ddf_.v.dropna(how='any', axis='rows')
                ddfs_clean[k] = DDF(ddf_.loc[df_.index,0])
            # drop 
            else:
                df_ = (ddf_.v 
                    .dropna(how='all', axis='columns')
                    .dropna(how='all', axis='rows')
                )
                ddfs_clean[k] = DDF(ddf_.loc[df_.index, df_.columns])
        
        return ddfs_clean
    
    def addButton(self, keys: list=[],
                  col_names: list[str]=None,
                  button_name: str="Details",
                  button_class: str="btn btn-secondary btn-sm",
                  data_bs_html: str="true",
                  data_bs_toggle: str="popover",
                  data_bs_trigger: str="focus",
                  button_style: str="--bs-btn-font-size: .85rem;"):
        """Add button html to the value-dict."""
        def add_button(value_dict):
            
            # select only specified keys and values that are not none-like
            style_args = [f"{key}:{val}" for key,val in value_dict.items() \
                if key in keys and val not in DDF.EXCLUSIONS]
            
            # concat style args into one string
            style_str = ";".join(style_args) if len(style_args) > 0 else ""
            data_value = value_dict['value'] 
                      
            html_string = " ".join(
                (
                f'<button type="button" class="{button_class}"', 
                f'data-bs-content="<div style="{style_str}">{data_value}</div>"',
                f'data-bs-html="{data_bs_html}" data-bs-toggle={data_bs_toggle} data-bs-trigger="{data_bs_trigger}"', 
                f'style="{button_style}">{button_name}</button>'
                )
            )
            return {'value':html_string} if data_value not in DDF.EXCLUSIONS else {'value':""}
        
        # apply to specified columns
        self[col_names] = DDF(self[col_names]).applymap(add_button)
        
        # return entire DDF
        return DDF(self)
        
    def replaceValues(self, df:pd.DataFrame, keys: list=None):
        """Replace values in DDF based on values in DF. Return DDF with DF dimensions."""
        df_idx, df_cols = df.index, df.columns
        for i in df_idx:
            for c in df_cols:
                self.loc[i,c]['value'] = df.loc[i,c]
        return DDF(self.loc[df_idx,df_cols])

    def replaceProperties(self, subset: pd.IndexSlice, property_dict: dict):
        """Replace values in DDF based on property_dict keys and values."""
        
        def replace_values(value_dict):
            new_value_dict = {k:v for k,v in value_dict.items() if k not in property_dict.keys()}
            return new_value_dict | property_dict
        
        if isinstance(self.loc[subset], pd.Series):
            self.loc[subset] = self.loc[subset].apply(replace_values)
            return DDF(self)
        else:
            self.loc[subset] = self.loc[subset].applymap(replace_values)
            return DDF(self)
               

    def setHeader(self, header_idx: int=None, col_names: list[str]=None):
        """Returns DDF with specified header."""
        
        if col_names is not None:
            self.columns = col_names
            return self
        
        header_idx = self.index[0] if header_idx is None else header_idx
        
        columns = self.v.loc[header_idx]
        index = self.loc[header_idx+1:].index
        
        return DDF(data=self.loc[header_idx+1:].values, index=index, columns=columns)

    def setIndex(self, keys):
        index_array = self.v[keys]
        cols_to_drop = [keys] if isinstance(keys, str) else keys
        return DDF(self.set_index(index_array).drop(columns=cols_to_drop))
    
    def setStyle(self, keys: list=[], subset:pd.IndexSlice=pd.IndexSlice[:,:]):
        """Return the styled values DF."""
        def get_style_kwargs(value_dict):
            return {k:v for k,v in value_dict.items() if k in keys}

        df_style = self.v.style.format(precision=2)
        for row_i in self.index:
            for col in self.columns:
                df_style.set_properties(subset=pd.IndexSlice[row_i,col], **get_style_kwargs(self.loc[row_i, col]))
        return df_style
      

#ddf = readXlsx('investments')

metadata = getMetaDataDict(route_name='investments')

@ioCacheAndLog(url=metadata['url'], route=metadata['route_name'], testing=True)
def investments_script(ddf:DDF) -> dict:
    
    results = {}
    
    # read in sub DDFs
    ddfs = ddf.getDdfDict({
        'main' : ("contains", "Monthly Income", 'up'),
        'ads' : ("contains", 'My Finance Course', 'down'),
        'announce' : ("contains", "Jul 2022: I'm", 'up'),
        'advice' : ("contains", '3x Excellent', 'up'),
        'warning_msg' : ("contains", 'NOTE: Occasionally', 'one'),
        'risk' : ("equals", "RISK", 'down', 0),
        'historical' : ("equals", "My Historical Investments", "down"),
        'cash_pos' : ("contains", "CASH POSITION", 'one'),
        'general_notes' : ("equals", "GENERAL NOTES", "down"),
        'success' : ("equals", "Investment Success:", "down")
        }
    )
    
    # ADS
    ddf_ads = ddfs['ads']
    ddf_ads.setHeader(col_names=['text', 'hyperlink'])
    
    df_ads = ddf_ads.v
    df_ads['icon'] = df_ads.text.str.extract(r"\s*(\S)")
    df_ads['text'] = df_ads.text.str.extract(r"(\b.+[^\s])")

    # manually generated headers
    # headers = ['My Finance Course', 'My UK Property Courses', 'Mentoring', 'Metals Globally', 
    #         'Metals USA', 'Metals UK', 'Crypto Security', 'Stock Platform', 'Bank Account']
    
    # dynamically generated headers
    headers = [" ".join(string.split()[:4]) for string in df_ads['text']]
    
    icons_html_dict = {'My Finance Course' : 'bi bi-graph-up-arrow', 
                        'My UK Property Courses' : 'bi bi-house', 
                        'Mentoring' : 'fa-regular fa-handshake', 
                        'Metals Globally': 'bi bi-globe-asia-australia', 
                        'Metals USA': 'bi bi-currency-dollar', 
                        'Metals UK': 'bi bi-currency-pound', 
                        'Crypto Security': 'bi bi-currency-bitcoin', 
                        'Stock Platform': 'fa-solid fa-chart-column', 
                        'Bank Account': 'bi bi-bank'}

    df_ads['header'] = headers
    
    df_ads['new_icon_html'] = df_ads['header'].map(icons_html_dict)
    
    results['df_ads'] = df_ads # add ADS DF to results
    
    
    # MAIN DF
    ddf_main = DDF(ddfs['main'].iloc[:,:3])
    ddf_main = (ddf_main
        .setHeader(col_names=['Asset Class', 'Total Value', 'Notes']) # set header
        .addButton(keys=['color', 'font-weight'], col_names=['Notes']) # add button
    )
    
    df_main = ddf_main.v
    df_main = calcTotalUSD(df_main, col_name='Total Value') # calc total if errors
    df_plot = df_main.copy() # set indermediet variable for pie chart later
    df_main['Total Value'] = df_main['Total Value'].apply(formatToDollars) # apply $0,00.00 format
    df_main = df_main.fillna('').replace('#NAME?', '#ERROR!') # replace values if present
    
    # get styled object
    df_main_styled = (ddf_main
        .replaceValues(df_main) # replace values based on DF vals
        .replaceProperties(pd.IndexSlice[df_main.query("`Asset Class` == 'Total (USD)'").index, :], {"color": "#E2B842"})
        .replaceProperties(pd.IndexSlice[findRefRowCol(df_main, r'Monthly Income')[0], :], {"color": "grey", "font-style": "italic"})
        .replaceProperties(pd.IndexSlice[df_main.query("`Total Value` == '#ERROR!'").index, "Total Value"], {"color": "red", "opacity": "0.75"})
        .setStyle(keys=['color', 'font-weight', 'opacity', 'font-style'])
        .hide(axis='index')
    )
    
    results['df_main_styled'] = df_main_styled # add to results
    
    # PIE-CHART
    # add underscores to col names
    df_plot.columns = df_plot.columns.map(lambda x: x.replace(" ", "_"))
    
    # set 'Asset_Class' as new index
    df_plot = (df_plot
        .set_index('Asset_Class') 
        .drop(columns=['Notes'])
        .loc[:'Total (USD)'].iloc[:-1]
        .query("Total_Value != '#ERROR!'")
        .reset_index()
        .dropna()
    )
    df_plot['Total_Value'] = df_plot['Total_Value'].astype(float)
    #a = 'Total_Value'
    #return df_plot[a] / df_plot[a].sum() * 100
    
    # create plot object
    pie_chart_plot = pie_chart(
        df=df_plot,
        x='Asset_Class',
        y='Total_Value',
        background_color=styling_vars['bg-color'],
        legend_place='below',
        fig_height=720,
        label_distance=3.2,
        label_kwargs=dict(text_font_size='9pt', text_align='center', text_font_style='bold'),
        radius=0.62,
        sizing_mode='scale_width'
    )

    # ASSET RISKS
    ddf_risk = ddfs['risk']
    ddf_risk = ddf_risk.setHeader(col_names=['asset', 'risk', 'notes'])
    
    df_risk = ddf_risk.v
    idx1 = getRelContainsIdx(df_risk, 'RISK') # find relative index of pattern
    idx2 = getRelContainsIdx(df_risk, 'CODE') # find relative index of pattern
    df_risk = df_risk.iloc[idx1+1:idx2,] # cut df
    df_risk.iloc[-2].notes += ". " + df_risk.iloc[-1, -1]
    df_risk = df_risk.iloc[:-1].reset_index(drop=True)
    df_risk['risk'] = [1, 1, 2, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4]
    df_risk['risk_word'] = df_risk['risk'].replace({1:'VERY LOW', 2: 'LOW', 3: 'MEDIUM', 4: 'HIGH'})
    df_risk['li_group'] = df_risk['risk'].replace({1: 'success', 2: 'info', 3: 'warning', 4: 'danger'})
    
    return df_risk
    
    results['df_risk'] = df_risk # add to results
    
    return results

investments_script()

Unnamed: 0,asset,risk,notes,risk_word,li_group
0,Silver,1,Unlikely to be confiscated & good inflation he...,VERY LOW,success
1,Farmland,1,Holds value extremely well and can be used for...,VERY LOW,success
2,Gold,2,Risk of Confiscation under a GOLD standard sce...,LOW,info
3,Crypto,3,Risk of further/continued regulation or the cu...,MEDIUM,warning
4,Cash,3,WILL be phased out when CBDC launches (3-5 yea...,MEDIUM,warning
5,Bonds,3,Russia is a great example of paper certificate...,MEDIUM,warning
6,Businesses,3,Inflation = Consumer cut in spending = RECESSI...,MEDIUM,warning
7,Pensions,3,Collapse risk! (Watch my YT video on this here...,MEDIUM,warning
8,Real Estate,4,High risk of declines in late 2022 and through...,HIGH,danger
9,Stock Markets,4,High risk of declines once USA interest rates ...,HIGH,danger


In [357]:
    # plot_js, plot_div = components(pie_chart_plot) # create static plot objects
    # results['plot_js'] = plot_js
    # results['plot_div'] = plot_div
    
    # # ASSET RISKS
    # ddf_risk = ddfs['risk']
    # ddf_risk = ddf_risk.setHeader(col_names=['asset', 'risk', 'notes'])
    
    # df_risk = ddf_risk.v
    # idx1 = getContainsIdx(df_risk, 'RISK') # find relative index of pattern
    # idx2 = getContainsIdx(df_risk, 'CODE') # find relative index of pattern
    # df_risk = df_risk.iloc[idx1+1:idx2,] # cut df
    # df_risk.iloc[-2].notes += ". " + df_risk.iloc[-1, -1]
    # df_risk = df_risk.iloc[:-1].reset_index(drop=True)
    # df_risk['risk'] = [1, 1, 2, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4]
    # df_risk['risk_word'] = df_risk['risk'].replace({1:'VERY LOW', 2: 'LOW', 3: 'MEDIUM', 4: 'HIGH'})
    # df_risk['li_group'] = df_risk['risk'].replace({1: 'success', 2: 'info', 3: 'warning', 4: 'danger'})
    # results['df_risk'] = df_risk # add to results
    
    # # ANNOUNCEMENTS
    # df_a = ddfs['announce'].v
    # df_a = df_a[0].str.split(pat=':', n=1, expand=True)
    # df_a.columns = ['date', 'text']
    # no_date_idx = df_a.index[df_a['date'].str.contains(r"property for £980k", case=False)][0]
    # df_a.loc[no_date_idx, 'text'] = df_a.loc[no_date_idx, 'date']
    # df_a.loc[no_date_idx, 'date'] = re.search(r"[A-Z]{1}[a-z]{2}\s202\d{1}", df_a.loc[no_date_idx, 'text'])[0]
    # results['df_a'] = df_a
    
    # GENERAL ADVICE
    results['df_advice'] = ddfs['advice'].v
    
    # GENERAL NOTES
    df_gen = ddfs['general_notes'].v
    df_suc = ddfs['success'].v

    df_suc.iloc[0, 1] = df_suc.iloc[0, 1] + df_suc.iloc[-1,1]
    df_gen = pd.concat([df_gen.iloc[1:], df_suc.iloc[0,:].to_frame().T]).reset_index(drop=True)
    df_gen.columns = ['field', 'info']
    results['df_gen'] = df_gen
    
    # HISTORICAL INVESTMENTS
    df_hist = ddfs['historical'].v
    results['df_hist'] = df_hist

NameError: name 'ddfs' is not defined

### Example_portfolios

In [213]:
#ddf = readXlsx('example_portf')
metadata = getMetaDataDict(route_name='example_portf')

@ioCacheAndLog(url=metadata['url'], route=metadata['route_name'], testing=True)
def example_portf_script(ddf:DDF) -> dict:
    
    results = {}
    
    ddf = ddf.setHeader(col_names=['asset', 'percent']) # set header
    
    # chop raw cs df into dict of 'name' : {'title':, 'df':, 'extra':}
    mask_df = (ddf.v
        .astype('str')
        .apply(lambda x: x.str.startswith("IDEAL PORTFOLIO", na=np.nan))
    )
     
    # reference indices for separate DFs 
    ref_idxs = mask_df[mask_df.asset == True].index
    
    df_names = ['2023', 'crash_risk', 'high_inflation', 'normal']
    
    df_dict = {}
    for i, (name, i1) in enumerate(zip(df_names, ref_idxs)):
        i2 = None if i == len(ref_idxs)-1 else ref_idxs[i+1] - 1 
        df_ = ddf.v.loc[i1:i2,:]
        
        info_dict = {}
        info_dict['title_1'] = re.match(r"^([^\(]+)\b", df_.asset.iloc[0])[0]
        info_dict['title_2'] = re.search(r"(?![^\(]+).+", df_.asset.iloc[0])[0]
        info_dict['df'] = df_.dropna()
        info_dict['extra'] = df_.loc[info_dict['df'].index[-1]+1:,:].dropna(how='all').asset
        df_dict[name] = info_dict
        
    df_dict['2023']['extra'].iloc[0] += ' ' + df_dict['2023']['extra'].iloc[1]
    
    # prepare the DFs
    dfs = df_dict.copy()
    
    #return dfs['normal']['df']
    
    for dct in dfs.values():
        dct['df_plot'] = dct['df'].iloc[:-1,] # strip total
        dct['df_plot']['percent_n'] = (dct['df_plot']['percent'] * 100).astype(int) 
        dct['df_plot']['asset_hover'] = dct['df_plot']['asset'].apply(lambda x: re.match(r"^([^\(:]+)", x)[0] if len(x)>35 else x)
        dct['df'].columns = ['Asset Class', 'Percentage']
    
    # PLOT
    hover_tt = f"""
                    <div>
                        <p style="margin:0;font-weight:bold;color:grey;">@asset_hover</p>
                        <p style="padding:0;margin:0;font-weight:bold;">@percentage_hover{{0,0}}%</p>
                    </div>
                """

    # for dct in dfs.values():
    #     p = pie_chart(
    #         df=dct['df_plot'],
    #         x='asset_hover',
    #         y='percent_n',
    #         x_hover='asset_hover',
    #         percentage_decimal=0,
    #         label_distance=3.15,
    #         hover_tooltip=hover_tt,
    #         legend_place='below',
    #         fig_height=720,
    #         radius=0.7,
    #         background_color='#2C2B2B',
    #         label_kwargs=dict(text_font_size='12pt', text_align='center', text_font_style='bold')
    #     )
    #     dct['plot_js'], dct['plot_div'] = components(p)
        
    return dfs
#example_portf_script(ddf)

### Forecasts

In [229]:
ddf = readXlsx('forecasts')
metadata = getMetaDataDict(route_name='forecasts')

#@ioCacheAndLog(url=metadata['url'], route=metadata['route_name'], testing=True)
def forecasts_script(ddf:DDF) -> dict:
    
    results = {}
    
    # read in sub DDFs
    ddfs = ddf.getDdfDict({
        'forecasts' : ("contains", "Forecasts", "down"),
        'risks' : ("equals", "Risks", "down")
        }
    )
    
    # FORECASTS
    df_fore_styler = (ddfs['forecasts']
        .setHeader()
        .setStyle(keys=['color', 'font-weight'])
        .hide(axis='index')
    )
    results['df_fore_styler'] = df_fore_styler
    
    # RISKS
    # pallette = inferno
    
    ddf_risks = (ddfs['risks']
        .setHeader()
    )
    df_risks = ddf_risks.v
    df_risks['numeric_risk'] = (df_risks # convert percentage
        .filter(regex=(r"[Rr]isk\s[Ll]evel.*"))
        .squeeze()
        .astype('float64')
        * 100
    )
    
    df_risks = df_risks.sort_values('numeric_risk', ascending=False, ignore_index=True)
    #df_risks['color'] = df_risks.numeric_risk.apply(riskPallette, scale=get_risk_pallete(pallette))
    df_risks['css_ref'] = (df_risks
        .filter(regex=(r"[Rr]isks.*"))
        .squeeze()
        .str.extract(r"([A-Za-z0-9\s]+)")
        .squeeze()
        .str.replace('\s', '-', regex=True)
    )

    results['df_risks'] = df_risks
    
    return results
    
#forecasts_script(ddf)

### Stocks

In [347]:
#ddf = readXlsx('stocks')
#metadata = getMetaDataDict(route_name='stocks')

#@ioCacheAndLog(url=metadata['url'], route=metadata['route_name'], testing=True)
def stocks_script(ddf:DDF) -> dict:
    
    results = {}
    
    # read in sub DDFs
    ddfs = ddf.getDdfDict({
        'stocks' : ("equals", "Company", "down"),
        'analysis' : ("contains", "Analysis ratio", "down"),
        'sectors1' : ("contains", "S&P500 Index", "down"),
        'sectors2' : ("contains", "Don't just buy crap! ", "up", 0)
        }
    )
    
    ### STOCKS TABLE ###
    ddf_stocks = ddfs['stocks'].setHeader()
    df_stocks = ddf_stocks.v
    
    # check if df is empty and if so is there extra info
    all_nan = df_stocks.iloc[:,1:].isna().all(axis=1)
    all_nan_idx = all_nan.loc[all_nan].index

    df_stocks_info = df_stocks.loc[all_nan].dropna(axis='columns') # info
    results['df_stocks_info'] = df_stocks_info
    
    df_stocks = df_stocks.loc[~df_stocks.index.isin(all_nan_idx),] # df stocks
    results['df_stocks'] = df_stocks
    
    ### ANALYSIS RATIOS TABLE & TITLE ###
    ddf_ana = ddfs['analysis']
    df_ana = ddf_ana.v
    
    # title of the analysis section
    stocks_ana_title = df_ana.iloc[0,0]
    results['stocks_ana_title'] = stocks_ana_title
    

    # find header index
    header_idx = df_ana.apply(lambda x: x.str.contains(r"Ratio:") == True).idxmax()[0] 
    header_cols = (df_ana
        .fillna('')
        .loc[header_idx]
        .apply(lambda x: x.strftime("%b %d") if isinstance(x, datetime.datetime) else x)
    )
    ddf_ana = DDF(ddf_ana.loc[header_idx+1:]).setHeader(col_names=header_cols)
    df_ana = ddf_ana.v

    df_ana.fillna('', inplace=True)
    joined_nan_cols = df_ana.loc[:, ""].apply(lambda x: "".join(x.astype(str)), axis=1)
    
    # capture first nan column pos index
    nan_col_idx = [i for i,col in enumerate(df_ana.columns) if col == ""][0]
    
    # remove original nan columns
    ddf_ana = DDF(ddf_ana.drop([''], axis='columns'))
    ddf_ana.insert(nan_col_idx, "Notes", joined_nan_cols)
    ddf_ana['Notes'] = ddf_ana.Notes.apply(lambda x: {'value':x}) # wrap values into dict
    df_ana_styled = (ddf_ana
        .addButton(keys=['color', 'font-weight'], col_names=['Notes']) # add Details button
        .setStyle(keys=['color', 'font-weight'])
        .hide(axis='index')
    )
    
    results['df_ana_styled'] = df_ana_styled
    
    
    ### SUGGESTED SECTORS
    stock_sectors_title = "Suggested sectors for long term value"
    results['stock_sectors_title'] = stock_sectors_title
    
    ddf_sec1 = ddfs['sectors1'].setIndex(keys=0)
    ddf_sec2 = ddfs['sectors2'].setIndex(keys=0)
    
    # prepare sectors 1 table
    df_sec1 = ddf_sec1.v.apply(lambda x: x.str.strip()) # strip leading/trailing whitespaces
    tech_label_1 = df_sec1.index[df_sec1.index.str.contains(r"tech", case=False, regex=True)][0]
    df_sec1 = (df_sec1
        .apply(lambda x: x.str.strip())
        .apply(lambda x: x+'.' if x[-1] not in ['.', '!', '?', '%', '>'] else x, axis="rows")
        .fillna("")
        .apply(lambda x: " ".join(x.astype(str)).strip(), axis=1)
    )

    
    # prepare sectors 2 table
    df_sec2 = ddf_sec2.v
    
    # add empty index row entry to previous
    if df_sec2.index.isna()[-1]: 
        df_sec2.iloc[-2,-1] += ' ' + df_sec2.iloc[-1,-1]

    df_sec2 = df_sec2.iloc[:-1,] # remove last row
    tech_label_2 = df_sec2.index[df_sec2.index.str.contains(r"tech", case=False, regex=True)][0]
    df_sec2 = df_sec2.rename(index={tech_label_2: tech_label_1})
    df_sec2.columns = ['Percentage', 'Notes']
    
    # join and modify tables
    df_sectors = pd.concat([df_sec1, df_sec2], axis='columns')
    
    df_sectors = (df_sectors
        .drop('Percentage', axis='columns')
        .apply(lambda x: x.str.strip())
        .apply(lambda x: x+'.' if x[-1] not in ['.', '!', '?', '%', '>'] else x, axis="rows")
        .fillna("")
        .apply(lambda x: " ".join(x.astype(str)).strip(), axis=1)
    )

    df_sectors = (pd.concat([df_sectors, df_sec2['Percentage']], axis='columns')
        .rename(columns={0: 'Notes'})
        .reset_index()
        .rename(columns={0: 'Sector'})
        .fillna("")
    )

    df_sectors = addButton(df_sectors, col_names=['Notes'])
    results['df_sectors'] = df_sectors # add to results
     
    # sectors plot
    df_sectors_plot = df_sectors.copy()
    df_sectors_plot['Percentage'] *= 100
    df_sectors_plot['Percentage'] = pd.to_numeric(df_sectors['Percentage'])

    # df_sectors_plot['Percentage'] = pd.to_numeric(df_sectors['Percentage'] \
    #     .replace(r"%", "", regex=True), errors='coerce')
    
    stocks_h_tooltip = f"""
                    <div>
                        <p style="margin:0;font-weight:bold;color:grey;">@Sector</p>
                        <p style="padding:0;margin:0;font-weight:bold;text-align:center;">@percentage_hover{{0}}%</p>
                    </div>
                """

    # stocks_sectors_plot = donut_chart(
    #     df=df_sectors_plot.iloc[1:,],
    #     x='Sector',
    #     y='Percentage',
    #     sizing_mode='scale_both',
    #     background_color='#2C2B2B',
    #     percentage_decimal=0,
    #     fig_height=90,
    #     label_distance=3.1,
    #     label_kwargs=dict(text_font_size='12pt', text_align='center', text_font_style='bold'),
    #     hover_tooltip=stocks_h_tooltip,
    #     legend_place='center',
    #     fig_kwargs={'width':100}
    # )

    # stocks_plot_js, stocks_plot_div = components(stocks_sectors_plot)
    # results['stocks_plot_js'] = stocks_plot_js
    # results['stocks_plot_div'] = stocks_plot_div
    
    return results
    
#stocks_script(ddf)

## Stocks

In [28]:
from dashboard.route.stocks import stocksScript
from dashboard.logic.constants import nav_names
from dashboard.logic.io import read_gsheet, getDFs, findRefRowCol, comment_button, styleDf

df_dict = stocksScript()
df_dict.keys()

dict_keys(['df_stocks_info', 'df_stocks', 'stocks_ana_title', 'df_stocks_ana', 'stock_sectors_title', 'df_sectors', 'stocks_plot_js', 'stocks_plot_div'])

In [13]:
stocks_dict = nav_names['Stocks']

df_raw = read_gsheet(stocks_dict['url'], header=None)

# extract data into sub DF-s
references_dict = {
    'stocks' : ("equals", "Company", "down"),
    'analysis' : ("contains", "Analysis ratio", "down"),
    'sectors1' : ("contains", "S&P500 Index", "down"),
    'sectors2' : ("contains", "Don't just buy crap! ", "up", 0)
}
df_dict_stocks = getDFs(df_raw, references_dict)

In [29]:
def dummy():

    ### ANALYSIS RATIOS TABLE & TITLE ###
    df_ana = df_dict_stocks['analysis'].copy()

    # title of the analysis section
    stocks_ana_title = df_ana.iloc[0,0]

    # set header
    header_idx = findRefRowCol(df_ana.astype('str'), r"Ratio:", 'contains')[0]
    
    df_ana.columns = df_ana.loc[header_idx]
    df_ana = df_ana.loc[header_idx+1:]
    df_ana.columns = df_ana.columns.fillna('') # fill NaN headers ""
        
    # Join column values using whitespace for columns with NaN header
    df_ana = df_ana.fillna('')
    joined_nan_cols = df_ana.loc[:, ""].apply(lambda x: "".join(x.astype(str)), axis=1)

    # capture first nan column pos index
    nan_col_idx = [i for i,col in enumerate(df_ana.columns) if col == ""][0]

    # remove original nan columns
    df_ana = df_ana.drop([''], axis='columns')
    df_ana.insert(nan_col_idx, "Comment", joined_nan_cols)

    # add comment button 
    df_ana.Comment[df_ana.Comment != ''] = df_ana.Comment[df_ana.Comment != ''].apply(comment_button)

    return df_ana
    
df_ana = dummy()

styleDf(df_ana, stocks_dict['route_name'])

KeyError: "['Comment', 'Oct 22', 'Jan 23', 'Mar 23'] not in index"

# CACHING

### Download Workbook as .xlsx with timestamp

In [5]:
from dashboard.logic.constants import nav_names
from dashboard.logic.io import *

#dct = nav_names['2023 Forecasts & Risks']
dct = nav_names['Investment Allocation Examples']


path = "/home/tonu/Documents/apps/dashboard_invest/dashboard/cache/example_portf"
df = read_gsheet(, header=None)
mask_df = df.apply(lambda x: x.str.startswith("IDEAL PORTFOLIO"))


In [7]:
{key for key in dct if key != 'routes'}

{'new name', 'page', 'route_name', 'symbol_id', 'title', 'url'}

In [115]:
import datetime
import requests
import os
import traceback
import re

spreadsheet_url = "https://docs.google.com/spreadsheets/d/15-kxhuk4h1BdFuiSIueamEifJpjsG6Loi621KQ8hGuY/edit#gid"


def getTimestamp(format: str="%Y-%m-%d_%H:%M:%S") -> str:
    """Generate a timestamp string in the format YYYY-MM-DD_HH:MM:SS""" 
    return datetime.datetime.now().strftime(format)

def getOldestFilename(files: list[str], ts_format:str="%Y-%m-%d_%H:%M:%S") -> str:
    """Returns the filename with oldest timestamp"""
    ts_pattern = r"\d[\d\-_:]+\d"
    return min(files, key=lambda x: datetime.datetime.strptime(re.search(ts_pattern, x)[0][:19], ts_format))

def getNewestFilename(files: list[str], ts_format:str="%Y-%m-%d_%H:%M:%S") -> str:
    """Returns the filename with newest timestamp"""
    ts_pattern = r"\d[\d\-_:]+\d"
    return max(files, key=lambda x: datetime.datetime.strptime(re.search(ts_pattern, x)[0][:19], ts_format))

def downloadWorkbook(spreadsheet_url: str, 
                     file_path: str,
                     file_name: str='workbook') -> None:
    """Download Google Spreadsheet as an .xlsx file.
    """
    
    # Define the file name and extension
    file_ext = ".xlsx"

    # Generate a timestamp string in the format YYYY-MM-DD_HH-MM-SS
    timestamp = getTimestamp()

    # Concatenate the timestamp string with the file name and extension
    timestamped_file_name = f"{file_path}/{timestamp}_{file_name}{file_ext}"

    # SAVE GOOGLE SPREADSHEET AS .XLSX FILE
    export_url = spreadsheet_url.replace("edit#gid", "export?format=xlsx")

    response = requests.get(export_url)

    with open(timestamped_file_name, "wb") as output_file:
        output_file.write(response.content)
        

def downloadSheet(spreadsheet_url: str, 
                  file_path: str,
                  file_name: str) -> None:
    """Download Google Spreadsheet as an .xlsx file.
    """
    
    # Define the file name and extension
    file_ext = ".xlsx"

    # Generate a timestamp string in the format YYYY-MM-DD_HH-MM-SS
    timestamp = getTimestamp()

    # Concatenate the timestamp string with the file name and extension
    timestamped_file_name = f"{file_path}/{timestamp}_{file_name}{file_ext}"

    # SAVE GOOGLE SPREADSHEET AS .XLSX FILE
    download_url = spreadsheet_url.replace('/edit#','/export?format=xlsx&')

    response = requests.get(download_url)

    with open(timestamped_file_name, "wb") as output_file:
        output_file.write(response.content)


def logError(route:str, exception:Exception) -> None:
    """Save occurred error traceback to file with timestamp."""
    
    log_path = f"dashboard/logs/routes/{route}"
    if not os.path.exists(log_path):
        os.mkdir(log_path)
    
    files = os.listdir(log_path)
    if len(files) >= 10:
        oldest_file = getOldestFilename(files=files)
        os.remove(os.path.join(log_path, oldest_file))
        
    error_name = exception.__class__.__name__
    
    with open(f'{log_path}/{getTimestamp()}_{error_name}', 'a') as f: 
        traceback.print_exc(file=f)

In [120]:
# native imports
from functools import wraps

# local imports
from dashboard.logic.io import read_gsheet
from dashboard.logic.constants import GSHEETS_URL

# 3rd party imports
import pandas as pd

xlsx_test_path = "dashboard/cache/main_backup/workbook_2023-03-10_17-11-59.xlsx"



def ioCacheAndLog(
    route: str,
    gsheet_dict:dict=None, 
    excel_dict:dict=None):
    
    def my_decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # try reading data from google sheets
            try:
                io = gsheet_dict['io']
                del gsheet_dict['io']
                df = read_gsheet(io, **gsheet_dict)
                result = func(df)
                
                # download spreadsheet to cache
                cache_path = os.path.join('dashboard', 'cache', route)
                if not os.path.exists(cache_path):
                    os.mkdir(cache_path)
                    
                # don't save more than 3 files to cache
                files = os.listdir(cache_path)
                if len(files) >= 3:
                    oldest_file = getOldestFilename(files=files)
                    os.remove(os.path.join(cache_path, oldest_file))
                
                # download google sheet as .xlsx file
                downloadSheet(spreadsheet_url=io, 
                              file_path=cache_path,
                              file_name=route)
                return result
            
            # read data from cache
            except Exception as e:
                # capture and save occurred error log
                logError(route=route, exception=e)
                
                # read data from the newest cached file
                io_cache_path = os.path.join('dashboard', 'cache', route)
                newest_file_name = getNewestFilename(os.listdir(io_cache_path))
                df  = read_excel(os.path.join(io_cache_path, newest_file_name), **excel_dict)
                
                result = func(df)
                return result
        return wrapper
    
    return my_decorator

dict1 = dict(io=GSHEETS_URL, header=None)
dict2 = dict(sheet_name='Investments', header=None)

@ioCacheAndLog(route='investments', gsheet_dict=dict1, excel_dict=dict2)
def test_script(df):
    return df.iloc[0,0]
    

'Cryptocurrencies'

# ADS

In [9]:
df = read_gsheet(nav_names['Investments']['url'], header=None)
# Extract sub DF to dictionary
references_dict = {
    'main' : ("contains", "Monthly Income", 'up'),
    'ads' : ("contains", 'My Finance Course', 'down'),
    'announce' : ("contains", "Jul 2022: I'm", 'up'),
    'advice' : ("contains", '3x Excellent', 'up'),
    'warning_msg' : ("contains", 'NOTE: Occasionally', 'one'),
    'risk' : ("equals", "RISK", 'down', 0),
    'historical' : ("equals", "My Historical Investments", "down"),
    'cash_pos' : ("contains", "CASH POSITION", 'one'),
    'general_notes' : ("equals", "GENERAL NOTES", "down"),
    'success' : ("equals", "Investment Success:", "down")
}

df_dict = getDFs(df, references_dict=references_dict)

# ADS
df_ads = df_dict['ads'].copy()
df_ads.columns = ['text', 'hyperlink']
df_ads['icon'] = df_ads.text.str.extract(r"\s*(\S)")
df_ads['text'] = df_ads.text.str.extract(r"(\b.+[^\s])")

# headers = ['My Finance Course', 'My UK Property Courses', 'Mentoring', 'Metals Globally', 
#         'Metals USA', 'Metals UK', 'Crypto Security', 'Stock Platform', 'Bank Account']

headers = [" ".join(string.split()[:4]) for string in df_ads['text']]
headers
# icons_html_dict = {'My Finance Course' : 'bi bi-graph-up-arrow', 
#                     'My UK Property Courses' : 'bi bi-house', 
#                     'Mentoring' : 'fa-regular fa-handshake', 
#                     'Metals Globally': 'bi bi-globe-asia-australia', 
#                     'Metals USA': 'bi bi-currency-dollar', 
#                     'Metals UK': 'bi bi-currency-pound', 
#                     'Crypto Security': 'bi bi-currency-bitcoin', 
#                     'Stock Platform': 'fa-solid fa-chart-column', 
#                     'Bank Account': 'bi bi-bank'}

# df_ads['header'] = headers
# df_ads['new_icon_html'] = df_ads['header'].map(icons_html_dict)

['My Finance Course',
 'My UK Property Courses',
 'Private 1-on-1 Sessions With',
 'Where I Buy Allocated',
 'Where I Buy Physical',
 'How I Protect My',
 'The Stock Platform I',
 'A Global bank account']

# Forecasts

In [133]:
def set_bg_color(val, cmap: dict) -> str:
    """Map colors to DF values based on mapping dictionary.

    Args:
        val ( any type): Any value type.
        cmap (dict): Dict of the form {val:'color'}
    """
    return f'background-color: {cmap[val]}'

cmap = {i:c for i,c in zip(df_risks['Risk Level in 2023'], df_risks['color'])}



# print((df_risks.style
#     .applymap(set_bg_color, cmap=cmap, subset=['Risk Level in 2023'])
#     .set_properties(subset=pd.IndexSlice[4,'Risk Level in 2023'], **{"border-top-right-radius": "0.6em"})
#     .hide(axis='index')
# ).to_html())

In [122]:
from dashboard.logic.constants import nav_names
from dashboard.logic.io import read_gsheet, getDFs
from bokeh.palettes import inferno

def riskPallette(series: pd.Series, scale: dict) -> pd.Series:
    """Apply color based on risk level (# between 0-100).

    Args:
        scale (dict): Dictionary of the form {0:color, ... , 9:color}

    Returns:
        pd.Series: color
    """
    if series >= 90: 
        return scale["9"]
    if series < 10: 
        return scale["0"]
    else: 
        return scale[str(series)[0]]

def get_risk_pallete(pallette: dict) -> dict['int':'color']:
    """Generate risk pallete in scale 0 to 100 in steps of 10.
    Args:
        pallette (dict): Dictionary {n: ['colors'....]}
    Returns:
        Dict: Dictionary {"0": 'color'}
    """
    return {str(i):color for i,color in enumerate(pallette(10)[::-1])}

FORECASTS_URL = nav_names['2023 Forecasts & Risks']['url']

# read spreadsheet in as DF
df_fore_raw = read_gsheet(url=FORECASTS_URL, header=None)

# extract data into sub DF-s
references_dict = {
    'forecasts' : ("contains", "Forecasts", "down"),
    'risks' : ("equals", "Risks", "down")
}
df_dict_fore = getDFs(df_fore_raw, references_dict)

# FORECASTS
df_fore = df_dict_fore['forecasts'].copy()
df_fore.columns = df_fore.iloc[0]
df_fore = df_fore.iloc[1:].reset_index(drop=True)

# RISKS
risks_scale = {str(i):color for i,color in enumerate(inferno(10)[::-1])}

df_risks = df_dict_fore['risks'].copy()
df_risks.columns = df_risks.iloc[0]
df_risks = df_risks.iloc[1:].reset_index(drop=True)
df_risks['numeric_risk'] = (df_risks
    .filter(regex=(r"[Rr]isk\s[Ll]evel.*"))
    .squeeze()
    .str.extract(r"(\d+)")
    .astype('float64')
)
df_risks = df_risks.sort_values('numeric_risk', ascending=False, ignore_index=True)
df_risks['color'] = df_risks.numeric_risk.apply(riskPallette, scale=get_risk_pallete(inferno))


df_risks


16,Risks,Risk Level in 2023,numeric_risk,color
0,Recession,90%,90.0,#000003
1,Stock Market Crash 40%+,65%,65.0,#781C6D
2,Cyber Pandemic,50%,50.0,#A42C60
3,Housing Crash 20%+,35%,35.0,#ED6825
4,Bank Bail Ins,30%,30.0,#ED6825


# STOCKS

In [4]:
def findRefRowCol(df: pd.DataFrame, pattern: str, method: str='contains') -> tuple[int, int]:
    """Find reference row and column numeric index values.

    Args:
        pattern (str): Character sequence or regular expression.
        method (str, optional): Finding reference via pattern within the text ('contains')
            or equalling the value exactly ('equals'). Defaults to 'contains'.

    Raises:
        ValueError: Raises error if other than ['contains', 'equals'] is specified for the method.

    Returns:
        tuple[int, int]: Tuple of ['row_i', 'col_i']
    """
    
    
    # validate that method is correctly entered
    if method not in ['contains', 'equals']:
        raise ValueError(f"{method} can take only values: 'contains' or 'equals'!")
    
    idx_series = None
    if method == 'contains':
        idx_series = df.apply(lambda x: x.str.contains(pattern)).idxmax()
    if method == 'equals':
        idx_series = (df == pattern).idxmax()
    
    return idx_series.max(), idx_series.idxmax() 
    
def sliceDF(df: pd.DataFrame, 
            row_idx1: int=None, 
            row_idx2:int=None, 
            col_idx1: int=None, 
            col_idx2: int=None,
            col_0: bool=True, 
            direction='down') -> pd.DataFrame:
    """Slice DF till the first occuring empty row in given direction.

    Args:
        col_0 (bool, optional): If col_idx1 is actually first column. Defaults to True.
        direction (str, optional): Slice upwards or downwards from given row index. Defaults to 'down'.

    Raises:
        ValueError: If no row indices are specified.

    Returns:
        pd.DataFrame
    """
    
    # assert that at least one of the row indices is specified
    if row_idx1 is None and row_idx2 is None:
        raise ValueError(f"Both row indices can't equal {None}!") 
    
    # if column index is not the first column then None
    col_idx1 = col_idx1 if col_0 is True else None
    
    # find missing row index
    if direction == 'down':
        nan_mask = (df.iloc[row_idx1:,col_idx1:col_idx2] == 'nan').all(axis='columns')
        row_idx2 = None if nan_mask.sum() == 0 else nan_mask.idxmax()
        
    
    if direction == 'up':
        nan_mask = (df.iloc[:row_idx2,col_idx1:col_idx2] == 'nan').all(axis='columns')
        row_idx1 = None if nan_mask.sum() == 0 else nan_mask[::-1].idxmax() + 1
        row_idx2 += 1
         
    return df.iloc[row_idx1:row_idx2,col_idx1:col_idx2]

def getDFs(df: pd.DataFrame, references_dict: dict[str, tuple[str,str,str]]) -> dict[str, pd.DataFrame]:
    """Get subset DFs from a bigger DF based on reference strings.

    Args:
        df (pd.DataFrame): Raw initial DF.
        references_dict (dict): Dictionary of df_name: tuple('method', 'string' , 'direction', int('col_idx1)).
            method has 2 options ['contains' and 'equals'], 'direction' has 3 options 
            ['one', 'down', 'up'] where one means only one line needs to be parsed, 'up' and 'down'
            respectively correspond to the parsing direction from the reference point.
            
            Returns:
        dict: Dictionary of {'df_name': df}
    """
    
    
    # set all cols str type, NaN -> 'nan'
    df = df.reset_index(drop=True).astype(str)
    
    # find reference indices
    dfs = {}
    for k,v in references_dict.items():
        
        # find ref position
        row_i, col_i = findRefRowCol(df, pattern=v[1], method=v[0])
        
        # shift ref position if specified
        col_i = v[3] if len(v) == 4 else col_i
        
        if v[2] == 'one':
            dfs[k] = df.iloc[row_i,col_i:]
        if v[2] == 'down':
            dfs[k] = sliceDF(df, row_idx1=row_i, col_idx1=col_i, direction=v[2])
        if v[2] == 'up':
            dfs[k] = sliceDF(df, row_idx2=row_i, col_idx1=col_i, direction=v[2])

    # strip all NaN cols and rows
    dfs_clean = {}
    for k,df_ in dfs.items():
        
        if isinstance(df_, pd.Series):
            dfs_clean[k] = (df_
            .replace({'nan': np.nan})
            .dropna(how='any', axis='rows')
            )
        else:
            dfs_clean[k] = (df_
                .replace({'nan': np.nan})
                .dropna(how='all', axis='columns')
                .dropna(how='all', axis='rows')
            )
    
    return dfs_clean

In [5]:
from dashboard.logic.constants import nav_names
from dashboard.logic.io import read_gsheet

STOCKS_URL = nav_names['Stocks']['url']

# read spreadsheet in as DF
df_stocks_raw = read_gsheet(url=STOCKS_URL, header=None)

# extract data into sub DF-s
references_dict = {
    'stocks' : ("equals", "Company", "down"),
    'analysis' : ("contains", "Analysis ratio", "down"),
    'sectors1' : ("contains", "S&P500 Index", "down"),
    'sectors2' : ("contains", "Don't just buy crap! ", "up", 0)
}
df_dict_stocks = getDFs(df_stocks_raw, references_dict)

df_dict_stocks.keys()

dict_keys(['stocks', 'analysis', 'sectors1', 'sectors2'])

In [6]:
def getNonSelectedDF(
    df_raw: pd.DataFrame, 
    df_dict: dict['df_name': pd.DataFrame], 
    start_idx: int=0, 
    stop_idx: int=None) -> pd.DataFrame:
    """Get data into DF that wasn't captured based on negative df_dict. 

    Args:
        df_raw (pd.DataFrame): Original DF where data wasn't cpatured.
        df_dict (_type_): Dict of the form {subdf_name: 'method', 'string' , 'direction', int('col_idx1))}.
        start_idx (int, optional): Starting row index (not positional).
        stop_idx (int, optional): Ending row index (not positional).

    Returns:
        pd.DataFrame: DF with rest of the info captured and NaN rows/cols stripped.
    """
    
    
    # find indicies already capture by "getDFs" function
    indices = np.array([])
    for df_ in df_dict.values():
        indices = np.concatenate((indices, df_.index.values))
    indices = [int(i) for i in indices]
    
    # subset DF
    df_sub = (df_raw
        .loc[~df_stocks_raw.index.isin(indices),:]
        .loc[start_idx:stop_idx,]
        .dropna(axis='columns', how='all')
        .dropna(axis='rows', how='all')
    )
    
    return df_sub
    
    
getNonSelectedDF(df_stocks_raw, df_dict_stocks, stop_idx=18)

Unnamed: 0,0
0,The absolute easiest strategy for stock market...
2,Start looking to move away from (Growth & Tech...
15,"Usually for a super safe play, I would suggest..."
16,I would suggest that since market growth is no...
17,"...reverse course, they will go back to QE & l..."


In [7]:
### STOCKS TABLE ###
df_stocks = df_dict_stocks['stocks'].copy().reset_index(drop=True)

# set column headers
df_stocks.columns = df_stocks.iloc[0]
df_stocks = df_stocks.iloc[1:].reset_index(drop=True)

# check if df is empty and if so is there extra info
all_nan = df_stocks.iloc[:,1:].isna().all(axis=1)
all_nan_idx = all_nan.loc[all_nan].index


df_stocks_info = df_stocks.loc[all_nan].dropna(axis='columns') # info
df_stocks = df_stocks.loc[~df_stocks.index.isin(all_nan_idx),] # df stocks

In [8]:
from dashboard.logic.io import comment_button

df_ana = df_dict_stocks['analysis'].copy()

# title of the analysis section
ana_title = df_ana.iloc[0,0]

# set header
header_idx = findRefRowCol(df_ana.astype('str'), r"Ratio:", 'contains')[0]
df_ana.columns = df_ana.loc[header_idx]
df_ana = df_ana.loc[header_idx+1:].reset_index(drop=True)
df_ana.columns = df_ana.columns.fillna('') # fill NaN headers ""

# Join column values using whitespace for columns with NaN header
df_ana = df_ana.fillna('')
joined_nan_cols = df_ana.loc[:, ""].apply(lambda x: "".join(x.astype(str)), axis=1)

# capture first nan column pos index
nan_col_idx = [i for i,col in enumerate(df_ana.columns) if col == ""][0]

# remove original nan columns
df_ana = df_ana.drop([''], axis='columns')
df_ana.insert(nan_col_idx, "Comment", joined_nan_cols)

# add comment button 
df_ana.Comment[df_ana.Comment != ''] = df_ana.Comment[df_ana.Comment != ''].apply(comment_button)
df_ana

8,Ratio:,Average,Current,Value?,Comment,Oct 22,Sep 22,Jan 23,Feb 23
0,S&P 500 (P/E),15,19.97,Overvalued,"<button type=""button"" class=""btn btn-secondary...",20.7,21.00,19.97,
1,Shiller P/E,16,28.09,Overvalued,"<button type=""button"" class=""btn btn-secondary...",29.4,31.00,28.09,
2,S&P Price to Book,2.9,3.83,Overvalued,"<button type=""button"" class=""btn btn-secondary...",3.98,4.10,3.83,
3,S&P Price to Sales,1.6,2.31,Overvalued,"<button type=""button"" class=""btn btn-secondary...",2.45,2.60,2.31,
4,S&P Dividend Yield,5.0%,5.0%,Fair,,4.9%,4.8%,5.0%,


In [9]:
### Suggested Sectors
df_sec1 = df_dict_stocks['sectors1'].copy().set_index(0)
df_sec2 = df_dict_stocks['sectors2'].copy().set_index(0)

# prepare sectors 1 table
df_sec1 = df_sec1 = df_sec1.apply(lambda x: x.str.strip())
tech_label_1 = df_sec1.index[df_sec1.index.str.contains(r"tech", case=False, regex=True)][0]

df_sec1 = (df_sec1
    .apply(lambda x: x.str.strip())
    .apply(lambda x: x+'.' if x[-1] not in ['.', '!', '?', '%', '>'] else x, axis="rows")
    .fillna("")
    .apply(lambda x: " ".join(x.astype(str)).strip(), axis=1)
)

# add empty index row entry to previous
if df_sec2.index.isna()[-1]:
    df_sec2.iloc[-2,-1] += ' ' + df_sec2.iloc[-1,-1]

# prepare sectors 2 table
df_sec2 = df_sec2.iloc[:-1,] # remove last row
tech_label_2 = df_sec2.index[df_sec2.index.str.contains(r"tech", case=False, regex=True)][0]
df_sec2 = df_sec2.rename(index={tech_label_2: tech_label_1})
df_sec2.columns = ['Proportion', 'Comment']

# join and modify tables
df_sectors = pd.concat([df_sec1, df_sec2], axis='columns')

df_sectors = (df_sectors
    .drop('Proportion', axis='columns')
    .apply(lambda x: x.str.strip())
    .apply(lambda x: x+'.' if x[-1] not in ['.', '!', '?', '%', '>'] else x, axis="rows")
    .fillna("")
    .apply(lambda x: " ".join(x.astype(str)).strip(), axis=1)
)

df_sectors = (pd.concat([df_sectors, df_sec2['Proportion']], axis='columns')
    .rename(columns={0: 'Comment'})
    .reset_index()
    .rename(columns={0: 'Sector'})
    .fillna("")
)


df_sectors.Comment[df_sectors.Comment != ''] = \
    df_sectors.Comment[df_sectors.Comment != ''].apply(comment_button)
    
df_sectors



Unnamed: 0,Sector,Comment,Proportion
0,#1: S&P500 Index,"<button type=""button"" class=""btn btn-secondary...",
1,ENERGY,"<button type=""button"" class=""btn btn-secondary...",30%
2,MINING,"<button type=""button"" class=""btn btn-secondary...",30%
3,AGRICULTURE,"<button type=""button"" class=""btn btn-secondary...",20%
4,CHEAP TECH,"<button type=""button"" class=""btn btn-secondary...",20%


# Plotting

In [10]:
import pandas as pd
import numpy as np

# bokeh
from bokeh.io import show, output_notebook
from bokeh.palettes import Category10
from bokeh.plotting import figure
from bokeh.models import ColumnDataSource, Legend, LabelSet, LegendItem, HoverTool, AnnularWedge
from bokeh.layouts import column

In [11]:
output_notebook()

In [12]:
df_sectors_plot = df_sectors.copy()
df_sectors_plot['Proportion'] = pd.to_numeric(df_sectors['Proportion'].replace(r"%", "", regex=True),
    errors='coerce')

df_sectors_plot

Unnamed: 0,Sector,Comment,Proportion
0,#1: S&P500 Index,"<button type=""button"" class=""btn btn-secondary...",
1,ENERGY,"<button type=""button"" class=""btn btn-secondary...",30.0
2,MINING,"<button type=""button"" class=""btn btn-secondary...",30.0
3,AGRICULTURE,"<button type=""button"" class=""btn btn-secondary...",20.0
4,CHEAP TECH,"<button type=""button"" class=""btn btn-secondary...",20.0


In [61]:
from dashboard.logic.plots import components

def donut_chart(
    df: pd.DataFrame, 
    x: str, 
    y: str,
    x_hover: str=None,
    y_hover: str=None, 
    inner_radius: float=0.4,
    outer_radius: float=0.8,
    label_distance: float=3,
    x_range: tuple[float, float]=(-1, 1.0),
    percentage_decimal: int=1,
    fig_height: int=350,
    background_color: str='#212529',
    pallette: dict=Category10,
    sizing_mode='scale_width',
    hover_tooltip: str='default',
    legend_place: str='center',
    fig_kwargs: dict={},
    wedge_kwargs: dict=dict(line_width=3, alpha=0.7),
    legend_kwargs: dict=dict(location='center', click_policy="hide",
                             label_text_color='white', border_line_width=0,
                             inactive_fill_color='#9fcf2e', inactive_fill_alpha=0.15,
                             background_fill_alpha=0),
    label_kwargs: dict=dict(text_font_size='10pt', text_align='center')
    ):
    
    
    # sort df by "y"
    df = df.sort_values(by=y, ignore_index=True)
    
    # calculate sector start and end angles
    df['angle'] = df[y] / df[y].sum() * 2 * np.pi
    df['cumsum_start'] = df['angle'].cumsum(axis='rows').shift(1).fillna(0)
    df['cumsum_end'] = df['angle'].cumsum(axis='rows')
    
    # calculate y percentages for hover & labels
    df['percentage_number'] = (df[y] / df[y].sum() * 100).round(percentage_decimal)
    df['percentage_hover'] = df['percentage_number'].astype(str)
    df['percentage_label'] = df['percentage_number'].apply(lambda x: "" if x < 5 else f"{x:.{percentage_decimal}f}%")
    
    # project label text coordinates to polar coordinates
    df['label_x_pos'] = np.cos(df['angle'].cumsum() - df['angle'].div(2)) * label_distance * outer_radius/4
    df['label_y_pos'] = np.sin(df['angle'].cumsum() - df['angle'].div(2)) * label_distance * outer_radius/4
    
    # remove assets that are 0
    df = df[df[y] > 0]
    
    # reset dataframe index to start with 0
    df = df.reset_index(drop=True)
    
    # init the figure/canvas for the plot
    p = figure(height=fig_height, 
               toolbar_location=None, 
               x_range=x_range,
               y_range=(-1.0, 1.0),
               sizing_mode=sizing_mode,
               **fig_kwargs)
    
    legend_items = []
    for idx, color in enumerate(pallette[df.shape[0]]):
        
        source = ColumnDataSource(df.iloc[idx,:].to_frame().T)
        
        # create the glyphs renderers
        wedge = p.annular_wedge(x=0, y=0, inner_radius=inner_radius, outer_radius=outer_radius, start_angle="cumsum_start", 
                        end_angle="cumsum_end", source=source, **wedge_kwargs,
                        fill_color=color, hover_fill_color=color,
                        line_color=background_color, hover_line_color=background_color,
                        line_alpha=1, hover_alpha=1, hover_line_alpha=1)
        
        
        label = LabelSet(x='label_x_pos', y='label_y_pos', text='percentage_label',
                         source=source, level='glyph', text_color=background_color, **label_kwargs)
        
        x_hover = x if x_hover is None else x_hover
        y_hover = y if y_hover is None else y_hover
        
        hover_tooltip = hover_tooltip if hover_tooltip != 'default' else \
            f"""
                <div>
                    <p style="margin:0;font-weight:bold;color:grey;">@{x_hover}</p>
                    <p style="padding:0;margin:0;font-weight:bold;">$@{y_hover}{{0,0.00}} (@percentage_hover%)</p>
                </div>
                <style>
                    .bk-tooltip {{
                        background-color: red!important;
                    }}
                </style>
            """
        
        p.add_layout(label)
        p.add_tools(HoverTool(renderers=[wedge],
                              tooltips=hover_tooltip))

        legend_items.append(LegendItem(label=df[x][idx], renderers=[wedge]))

    # legend
    legend = Legend(items=legend_items,  **legend_kwargs)
    
    p.add_layout(legend, place=legend_place)

    
    # figure attributes
    p.toolbar.active_drag = None
    p.axis.axis_label = None
    p.axis.visible = False
    p.grid.grid_line_color = None
    
    
    p.min_border=0
    p.outline_line_alpha=0
    p.outline_line_width=0
    p.outline_line_color = p.background_fill_color = p.border_fill_color = background_color

    return show(p)

h_tooltip = f"""
                <div>
                    <p style="margin:0;font-weight:bold;color:grey;">@Sector</p>
                    <p style="padding:0;margin:0;font-weight:bold;text-align:center;">@percentage_hover{{0}}%</p>
                </div>
                <style>
                    .bk-root .bk-tooltip {{
                        background-color: red;
                    }}
                </style>
            """

donut_chart(
        df=df_sectors_plot.iloc[1:,],
        x='Sector',
        y='Proportion',
        sizing_mode='fixed',
        background_color='#2C2B2B',
        percentage_decimal=0,
        fig_height=500,
        label_distance=3.1,
        label_kwargs=dict(text_font_size='12pt', text_align='center', text_font_style='bold'),
        legend_place='center',
        fig_kwargs={'width':500}
    )
#obj_js, obj_div = components(obj)

In [19]:
['a'] + ['b']

['a', 'b']

# Announcements

In [2]:
from dashboard.route.investments import df_dict, df_hist
import re

df_a = df_dict['announce'].copy()
df_a = df_a.reset_index(drop=True)
df_a = df_a[0].str.split(pat=':', n=1, expand=True)
df_a.columns = ['date', 'text']
no_date_idx = df_a['date'].str.contains(r"property for £980k", case=False).argmax()
df_a.loc[no_date_idx, 'text'] = df_a.loc[no_date_idx, 'date']
df_a.loc[no_date_idx, 'date'] = re.search(r"[A-Z]{1}[a-z]{2}\s202\d{1}", df_a.loc[no_date_idx, 'text'])[0]

df_a


Unnamed: 0,date,text
0,Feb 2023,Rather than save all your CASH in the bank (u...
1,Jan 2023,Property (Castle) Purchase 23rd Jan 2023. Cash...
2,Jan 2023,As central banks tighten (QT + Interest rate ...
3,Jan 2023,I wouldn't be surprised if we see a small ral...
4,Dec 2022,No change to last month. 2022 financial forec...
5,Nov 2022,Asset prices are now beginning to fall in cor...
6,Oct 2022,Cash is king right now. Assets are finally st...
7,Sep 2022,Cash currently held between multiple bank acc...
8,Aug 2022,"As per last month, I am still accumulating ca..."
9,Jul 2022,I'm following my February plan of going to CA...


# Stocks Watchlist

In [7]:
STOCKS_WATCH_URL = "https://docs.google.com/spreadsheets/d/12-GISr1efphjtpuJLCfQzI2akNXxaJ1iabsG24ib71c/edit#gid=845083323"

# Read in summary DF and drop empty rows
df = read_gsheet(
    STOCKS_WATCH_URL, 
    header=None
)

In [8]:
# find df header row index using regex pattern
header_idx = df.apply(lambda x: x.str.contains("Neil's Value", case=False)).any(axis='columns').argmax()

# separate disclaimer and df
df_disclaimer = df.iloc[:header_idx-1, 0]
df_watch = df.iloc[header_idx:,]

# set first row as header & reset row idxs
df_watch.columns = df_watch.iloc[0].values
df_watch = df_watch.iloc[1:].reset_index(drop=True)

# Generate buttons for 'Notes' column
df_watch.Notes[df_watch.Notes.notna()] = df_watch.Notes[df_watch.Notes.notna()].apply(comment_button)
df_watch = df_watch.fillna("")

# Color Ratings based on category
rating_colormap = {'Sig Undervalued':'green', 'Mod Undervalued':'blue', 'Fair Value':'grey', 'Value Trap?':'red'}
#df_watch["rating_color"] = df_watch.Rating.map(rating_colormap)


In [30]:
df1 = df_disclaimer.copy()

# add period end of the sentence if not so
df1 = df1.to_frame().rename(columns={0: 'info'})
df1.loc[:,'info'] = [x + '.' if x[-1] != '.' else x for x in df1['info']]

df_disc = df1.copy()

df_disc['color'] = ['warning', 'success', 'warning', 'success', 'success', 'danger']
df_disc['color'] = pd.Categorical(df_disc['color'],
                                  categories=['success', 'warning', 'danger'],
                                  ordered=True)
icon_dict = {'warning': 'exclamation-triangle-fill', 'success': 'check-lg', 'danger':'exclamation-octagon-fill'}
df_disc['icon_id'] = df_disc['color'].map(icon_dict)
df_disc = df_disc.sort_values('color')



Unnamed: 0,info,color,icon_id
1,Please make your own copy of the stocks sectio...,success,check-lg
3,These stocks were found using the techniques t...,success,check-lg
4,"Neil's Stocks Strategy: 80% into Index funds, ...",success,check-lg
0,These Valuations are for the purpose of Long T...,warning,exclamation-triangle-fill
2,NOTE! Many of the metrics below have to be man...,warning,exclamation-triangle-fill
5,"AT THIS TIME, I AM OUT OF ALL STOCKS. PLEASE D...",danger,exclamation-octagon-fill
