In [1]:
"""
Copyright:
    Copyright (C) 2020 Sauma Capital Inc - All Rights Reserved
    Unauthorized copying of this file, via any medium, is strictly prohibited
    Proprietary and confidential
Product: Fund Clustering Framework
Auther: Kai Fang
Description: Based class and based implementation of factory design pattern for
            clustering method, this would cover default cluster defined by client,
            and also machine learning based clustering method.
            For two layers clustering method, introduce pipeline structure design pattern 
            to do two step fitting at the same time.
"""

'\nCopyright:\n    Copyright (C) 2020 Sauma Capital Inc - All Rights Reserved\n    Unauthorized copying of this file, via any medium, is strictly prohibited\n    Proprietary and confidential\nProduct: Fund Clustering Framework\nAuther: Kai Fang\nDescription: Based class and based implementation of factory design pattern for\n            clustering method, this would cover default cluster defined by client,\n            and also machine learning based clustering method.\n            For two layers clustering method, introduce pipeline structure design pattern \n            to do two step fitting at the same time.\n'

In [2]:
class FundClusterBased:
    """Abstract base class for a fund clustering method.

    A clustering method assigns mutual funds to categories. Concrete
    strategies derive from this class and override every method that raises
    ``NotImplementedError`` below.
    """

    def __init__(self, cluster_method_name):
        """Store the identifier of the clustering method.

        Parameters:
            cluster_method_name: str
                name that identifies the clustering strategy
        """
        self._cluster_method_name = cluster_method_name

    def set_up(self, **kwargs):
        """Set up any private variables that the strategy needs."""
        raise NotImplementedError("Subclasses should implement set_up function!")

    def cluster_method(self):
        """Return the identifier of the clustering strategy passed to the constructor."""
        return self._cluster_method_name

    def machine_learning_based(self):
        """Tell whether the cluster category is machine learning based and
        therefore needs ``fit`` to train model parameters.

        Parameters:
            None

        Return:
            bool
                True if the strategy needs to run fit before it is ready for
                prediction, otherwise False
        """
        raise NotImplementedError("Subclasses should implement machine_learning_based!")

    def load_raw_data(self, source_type, **kwargs):
        """Load raw data from a source; intended to support reading from a
        flat file or from a sql database. Only the flat file variant needs to
        be implemented for now; the sql python package to use for the database
        task will be provided later.

        Parameters:
            source_type: str
                flat file type or sql. For a flat file, the file directory or
                path needs to be passed in as an argument or configured in the
                setup function. For sql, the connection needs to be
                established in the setup function. Please avoid hard coded
                names in the class and define file names as global variables.
        """
        raise NotImplementedError("Subclasses should implement load_raw_data function!")

    def set_hyper_parameter(self, **kwargs):
        """Re-configure the hyper-parameters of the model.

        The parameters should be initialized in the inherited setup function,
        read either from a config file or from arguments; please make sure a
        config file can be used to set these hyper-parameters.
        """
        raise NotImplementedError("Subclasses should implement set_hyper_parameter!")

    def print_hyper_parameter(self, **kwargs):
        """Print all the hyper-parameters that are set for the model."""
        raise NotImplementedError("Subclasses should implement print_hyper_parameter!")

    def fit(self, **kwargs):
        """Train the model, either on the data loaded from file or on data
        passed in as arguments.

        When X, Y are passed in as arguments, the model should be trained on
        that dataset and the data cached in the strategy object should be
        overwritten. If you implement a new machine learning model rather than
        using one from an existing python package, please separate the model
        implementation into another class and create an instance of it in
        your setup function instead of implementing it directly in ``fit``.
        This keeps the business logic apart from the model maintenance logic
        and lets the model be reused elsewhere.
        """
        # The message used to name print_hyper_parameter (copy-paste error),
        # which pointed implementers at the wrong method.
        raise NotImplementedError("Subclasses should implement fit!")

    def predict(self, **kwargs):
        """Run prediction after fitting the model; implementations should
        raise an error when the model has not been fitted yet."""
        raise NotImplementedError("Subclasses should implement predict")

    def model_summary(self):
        """Provide a summary of the model result: prediction accuracy,
        different metrics to measure the model, and its hyper-parameters.

        Parameters:
            None

        Return:
            dict {str: float/dataframe}
                key is the statistical measure name
                value is the statistical measure, either a number, a matrix or
                a dataframe
        """
        raise NotImplementedError("Subclasses should implement model_summary")

    def output_result(self, **kwargs):
        """Output the model and/or the clustering result.

        Pickle could be used to cache the trained object so that it can be
        loaded directly later. This function could also output the optimal
        cluster; please use arguments to configure what is written.

        Parameters:
            output_model: bool
                output model to pickle container
            output_cluster: bool
                output cluster for each fund
        """
        raise NotImplementedError("Subclasses should implement output_result")

In [3]:
class FundClusterVisualizationHelperBased:
    """Abstract base class for visualizing the result of a fund cluster strategy.

    Every query method raises ``NotImplementedError`` here and must be
    overridden by a concrete helper.
    """

    def __init__(self, cluster_method):
        """Link the helper to a fund clustering strategy object.

        The object may have just been created and trained in memory, or may
        have been loaded from pickle.

        Parameters:
            cluster_method: FundClusterBased or derived class obj
                the cluster method that this helper is registered against
        """
        self._cluster_method = cluster_method

    def generate_cluster_label(self):
        """Generate label information for the clusters: print the label name
        of each cluster together with the characteristics of each label."""
        raise NotImplementedError("Subclasses should implement generate_cluster_label!")

    def get_fund_list(self, cluster_name):
        """Get the funds that belong to the given cluster.

        Parameters:
            cluster_name: str
                name of the cluster as defined in the cluster label
        """
        raise NotImplementedError("Subclasses should implement get_fund_list!")

    def get_cluster_charateristics(self, cluster_name):
        """Get the characteristics of the funds in the given cluster.

        Parameters:
            cluster_name: str
                name of the cluster as defined in the cluster label
        """
        raise NotImplementedError("Subclasses should implement get_cluster_charateristics!")

    def get_top_funds_in_cluster(self, cluster_name):
        """Provide the top funds of a cluster based on the fund ranking in the
        database.

        This needs a connection to the alternative data project; returning a
        plain list of funds is acceptable for now.
        """
        raise NotImplementedError("Subclasses should implement get_top_funds_in_cluster!")


In [2]:
# Module-level registries handed to StrategyFactory by FundClusterFactory.
# Both start empty; entries are keyed by the fund cluster method name.
# Maps method name (str) -> class derived from FundClusterBased.
CLUSTER_CLASS_CONFIG = {} #{Fund_cluster_method_name(str): derived class of FundClusterBased(class)}
# Maps method name (str) -> class derived from FundClusterVisualizationHelperBased.
CLUSTER_VISUALIZER_CLASS_CONFIG = {} # {Fund_cluster_method_name(str): derived class of FundClusterVisualizationHelperBased(class)}


class StrategyFactory:
    """Factory that creates a strategy and the visualizer attached to it.

    Please do not touch this class unless you find a bug; do adjustments on
    FundClusterFactory instead.
    """

    def __init__(self, strategy_class_config, visualizer_class_config):
        """Create an empty factory.

        Parameters:
            strategy_class_config: dict {str: class}
                maps a strategy name to the strategy class to instantiate
            visualizer_class_config: dict {str: class}
                maps a strategy name to the visualizer class to instantiate
        """
        self._strategy = {}
        self._visualizers = {}
        self._strategy_class_config = strategy_class_config
        self._visualizer_class_config = visualizer_class_config

    def register_cluster(self, strategy_name):
        """Register a strategy and its visualizer in the factory.

        Available strategies are the keys of the strategy class configuration.
        Later this could be extended to load cached strategy and visualizer
        objects from pickle directly.

        Parameters:
            strategy_name: str
                the strategy that we want to register

        Raises:
            ValueError: if no strategy class or no visualizer class is
                configured for ``strategy_name``
        """
        if strategy_name not in self._strategy_class_config:
            raise ValueError(f'{strategy_name} Strategy Class could not be found in configuration')
        if strategy_name not in self._visualizer_class_config:
            raise ValueError(f'{strategy_name} Visualizer Class could not be found in configuration')

        # Build both objects before storing either one, so a failing
        # constructor cannot leave a strategy registered without a visualizer.
        strategy = self._strategy_class_config[strategy_name](strategy_name)
        visualizer = self._visualizer_class_config[strategy_name](strategy)
        self._strategy[strategy_name] = strategy
        self._visualizers[strategy_name] = visualizer

    def create_strategy(self, strategy_name):
        """Return the registered strategy object.

        Parameters:
            strategy_name: str
                the strategy that we want to load

        Raises:
            ValueError: if ``strategy_name`` has not been registered
        """
        strategy = self._strategy.get(strategy_name)
        if strategy is None:
            raise ValueError(f'{strategy_name} Strategy has not been registered, please register before using it')
        return strategy

    def create_visualizer(self, strategy_name):
        """Return the registered visualizer object.

        Parameters:
            strategy_name: str
                the strategy whose visualizer we want to load

        Raises:
            ValueError: if ``strategy_name`` has not been registered
        """
        visualizer = self._visualizers.get(strategy_name)
        if visualizer is None:
            raise ValueError(f'{strategy_name} Visualizer has not been registered, please register before using it')
        return visualizer

class FundClusterFactory(StrategyFactory):
    """Factory to generate clustering strategy objects and visualizer objects.

    It is a StrategyFactory wired to the module-level CLUSTER_CLASS_CONFIG and
    CLUSTER_VISUALIZER_CLASS_CONFIG registries.
    """

    def __init__(self):
        # Was misspelled `__iniit__`, which raised AttributeError on construction.
        super().__init__(CLUSTER_CLASS_CONFIG, CLUSTER_VISUALIZER_CLASS_CONFIG)

In [5]:
from sklearn.pipeline import Pipeline
from shutil import rmtree
from tempfile import mkdtemp

class MultipleLayerModelBased:
    """Design pattern for a multiple layer algorithm built on a scikit-learn Pipeline.

    This interface only handles the algorithm logic and is not meant to cache
    any data, so as to keep the design light weight. Data processing and
    preparation are handled by the fund cluster base class. For more
    information, review: https://scikit-learn.org/stable/modules/compose.html
    """

    def __init__(self, estimaters, cahced=False):
        """Set up the pipeline for the layers used in the model construction.

        The misspelled parameter names (``estimaters``, ``cahced``) are kept
        so that existing keyword callers keep working.

        Parameters:
            estimaters: list of tuple
                example: [('reduce_dim', pca2), ('clf', svm2)], i.e. a list of
                ('model_name', estimator). The estimator could be a customized
                class inheriting from BaseEstimator; depending on the model it
                could be a transformer or an estimator, and a classifier or a
                regressor.
            cahced: bool
                when true, a temporary directory is created and passed to the
                pipeline as its ``memory`` argument
        """
        self.estimaters = estimaters
        self.cached = cahced
        if self.cached:
            # Only create the temporary directory when it is actually used;
            # remove_cahce() deletes it only when caching is enabled, so an
            # unconditional mkdtemp() would leak a directory per instance.
            self.cachedir = mkdtemp()
            self.pipe = Pipeline(self.estimaters, memory=self.cachedir)
        else:
            self.cachedir = None
            self.pipe = Pipeline(self.estimaters)

    def remove_cahce(self):
        """Delete the temporary cache directory of the pipeline, if caching is enabled."""
        if self.cached:
            rmtree(self.cachedir)

    def fit(self, **kwargs):
        """Fit the pipeline.

        Parameters:
            X: df/np.array/any customized type, but you need to make sure that
                all estimators can handle this data type
                independent variable; possibly use the data helper class you
                define for this purpose
            Y: df/np.array/any customized type, but you need to make sure that
                all estimators can handle this data type
                dependent variable; for an unsupervised problem you may not
                have Y
        """
        raise NotImplementedError("Subclasses should implement fit, expected to be something like could be something like return self.pipe.fit(X, Y)")

    def predict(self, **kwargs):
        """Run prediction after fitting the model; implementations should
        raise an error when the model has not been fitted yet.

        Parameters:
            X: df/np.array/any customized type, but you need to make sure that
                all estimators can handle this data type
                independent variable
            Y: df/np.array/any customized type, but you need to make sure that
                all estimators can handle this data type
                dependent variable; for an unsupervised problem you may not
                have Y
        """
        raise NotImplementedError("Subclasses should implement predict")

    def model_summary(self):
        """Provide a summary of the model result: prediction accuracy,
        different metrics to measure the model, and its hyper-parameters.

        Parameters:
            None

        Return:
            dict {str: float/dataframe}
                key is the statistical measure name
                value is the statistical measure, either a number, a matrix or
                a dataframe
        """
        raise NotImplementedError("Subclasses should implement model_summary")

    def output_result(self, **kwargs):
        """Output the model and/or its result.

        Pickle could be used to cache the trained object so that it can be
        loaded directly later; please use arguments to configure what is
        written.

        NOTE(review): the "portfolio" wording below looks inherited from a
        portfolio framework, while FundClusterBased.output_result documents
        ``output_cluster`` - confirm the intended argument name.

        Parameters:
            output_model: bool
                output model to pickle container
            output_portfolio: bool
                output optimal portfolio generated
        """
        raise NotImplementedError("Subclasses should implement output_result")
    

In [4]:
from sklearn.base import BaseEstimator, ClassifierMixin, TransformerMixin

class TransformerBased(BaseEstimator, TransformerMixin):
    """Base class for a transformer, to be defined for each unsupervised
    learning processing step.

    It lists the methods a model must define in order to be passed into the
    pipeline.
    docs: 1) https://towardsdatascience.com/pipelines-custom-transformers-in-scikit-learn-the-step-by-step-guide-with-python-code-4a7d9b068156
          2) https://towardsdatascience.com/custom-transformers-and-ml-data-pipelines-with-python-20ea2a7adb65
    related reading: estimator
    docs: http://danielhnyk.cz/creating-your-own-estimator-scikit-learn/
    https://gist.github.com/amberjrivera/8c5c145516f5a2e894681e16a8095b5c
    """

    def __init__(self, **kwargs):
        """Initialize the transformer based on arguments; the base class stores nothing."""
        pass

    def fit(self, **kwargs):
        """Implement the model here; please factor the setup into private
        methods if the model is complicated.

        NOTE(review): this base signature is keyword-only; a subclass used
        inside a scikit-learn Pipeline will presumably need to override it as
        ``fit(self, X, y=None)`` - confirm against the scikit-learn docs.
        """
        raise NotImplementedError("Subclasses should implement fit")

    def transform(self, **kwargs):
        """Apply the fitted transformation; must be overridden by subclasses."""
        # The message used to say "predict" (copy-paste error); this method is transform.
        raise NotImplementedError("Subclasses should implement transform")

class EstimatorBased(BaseEstimator):
    """Base class for an estimator, to be defined for each clustering layer.

    It lists the methods a model must define in order to be passed into the
    pipeline. docs: https://scikit-learn.org/stable/developers/develop.html
    """

    def __init__(self, **kwargs):
        """Initialize the estimator based on arguments; the base class stores nothing."""

    def fit(self, **kwargs):
        """Implement the model here; please factor the setup into private
        methods if the model is complicated."""
        raise NotImplementedError("Subclasses should implement fit")

    def predict(self, **kwargs):
        """Produce predictions from the fitted model; must be overridden by subclasses."""
        raise NotImplementedError("Subclasses should implement predict")

# For the use case we have here, we only need unsupervised learning methods, so we could directly implement transformers.
# But if we are handling the case of an estimator, we need to implement an estimator.
# For the pipeline implementation, the last step passed in should be of estimator type rather than transformer type, and there can
# be only one estimator in the pipeline.

# Please implement the multiple step clustering method using the estimator framework
class FirstLayerCluster(EstimatorBased):
    """Placeholder for the first clustering layer; it adds nothing to EstimatorBased yet."""
    pass

class SecondLayerCluster(EstimatorBased):
    """Placeholder for the second clustering layer; it adds nothing to EstimatorBased yet."""
    pass

# Please implement the MultipleLayerClustering based on FundClusterBased and FirstLayerCluster, SecondLayerCluster, MultipleLayerModelBased

In [None]:
# please define the setup_connection in this class
class SQLHandlerMixin:
    """Mixin to include in the inheritance hierarchy in order to migrate
    operations to SQL and speed up calculation.

    The sauma.core package is meant to be integrated here and its connection
    object used. Apart from ``setup_table_template``, every method is
    currently a stub that does nothing and returns None.
    """

    def setup_connection(self, username, password):
        """Initialize the connection object here and use it for any operation.

        Currently a stub: no connection is created.
        """
        # TODO: self.conn = Connection(username, password)
        return None

    def setup_table_template(self):
        """Define the table templates as local variables in this method for
        all derived classes, and use this method to set up tables.

        NOTE(review): DerivedFundClusterStrategySQL defines
        ``setup_table_templates`` (plural), which does not override this
        method - confirm which name is intended.
        """
        raise NotImplementedError("Derived Class need to implement this method")

    def check_table_exist_or_not(self, schemas, table_name):
        """Check whether a table under a certain schema exists or not (stub)."""
        return None

    def look_up_or_create_table(self, template, custom_table_name=None, custom_schemas_name=None):
        """Create a table based on the template if it does not exist, and do
        nothing if it already exists (stub).

        ``check_table_exist_or_not`` may be useful here. If
        ``custom_table_name`` is None, the name should be found in the
        template.
        """
        return None

    def drop_table(self, schemas, table_name):
        """Drop the given table (stub)."""
        return None

    def chunks_update_table(self, schema, table_name, dataframe, **kwargs):
        """Upload a large dataframe to a sql table in smaller chunks (stub).

        Uploading a large dataframe at once may take a long time; dividing it
        into chunks and uploading them piece by piece is intended to be more
        memory efficient and to use less cpu.
        """
        return None

# Redefine the data input and output functions in your derived FundClusterStrategy class using SQL queries to enable a database connection
# to handle data processing. Please follow the provided python package on how to update a table: you need to provide a table template as the
# configuration of the table, and use that configuration to update the sql table. Please set up your own sql db on your local machine and test against it.
# The following shows some functions that you need to define in your derived strategy class

class DerivedFundClusterStrategySQL(DerivedFundClusterStrategy, SQLHandlerMixin):
    """Fund cluster strategy that includes SQL operations through SQLHandlerMixin.

    The table-related methods below are stubs to be filled in.
    """

    def __init__(self, username, password):
        """Initialize the base strategy, then set up the connection, the
        table templates and the tables, in that order.

        NOTE(review): ``super().__init__()`` is called without arguments;
        confirm that DerivedFundClusterStrategy accepts that.
        """
        super().__init__()
        self.setup_connection(username, password)
        self.setup_table_templates()
        self.setup_tables()

    def setup_table_templates(self):
        """Define all the table templates as local variables here (stub).

        The templates should be defined as global variables in a python file
        and imported here for this class to use. Please check the sauma.core
        documentation; the template format should be something like:
        {
            "tableName": "Test",
            "schema": "test_db",
            "primaryKey": ["id"],
            "columns": [{"name": "id",       "type": "INTEGER"},   // case insensitive
                        {"name": "text_col", "type": "STRING", "size": 50},
                        {"name": "int_col",  "type": "INT"}
            ],
            "description": 'sample table to know about the format'
        }

        For example, with a list of templates defined under
        performance_feature/custom.py you could do
            from performance_feature.custom import TEMPLATE_A, TEMPLATE_B, TEMPLATE_C
        then set self.templateA = TEMPLATE_A inside this class and call
        self.look_up_or_create_table(self.templateA) in the setup_tables
        function. As the sauma.core package requires a json object as input,
        you may need to transform the dictionary into a json object:
            import json
            # Data to be written
            dictionary = {
              "id": "04",
              "name": "sunil",
              "depatment": "HR"
            }

            # Serializing json
            json_object = json.dumps(dictionary)
        """
        return None

    def setup_tables(self):
        """Set up all tables based on the templates from setup_table_templates (stub)."""
        return None

    def update_raw_data(self):
        """Load the raw data from the csv file that serves as the data source
        and update the tables that were set up from the templates (stub)."""
        return None
    