# Executive Summary

This full example is meant to implement how the digital twin abstract class is supposed to be useful and work. It will show what needs to be done on the model end as well as what gets used on the digital twin abstract class end.

The directory for this model is "model2"

Everything here would be what should be going into a dt.py file that would create the digital twin framework.

# 1. Data Pipelines

# Model Specific

## Types

Under types, there is definitions for all types that are to be used in the system. Copious typing should be done, and for all data types with both raw/processed there should be a type.

## Data

While there is data processing functions, these are supposed to be light data functions and any hardcode data processing should be through a data infrastructure. The functions will be of the following categories, where N is a variable number of data pulls, and brackets will show how many of any given type there should be.

1. [1+] Data Connection Method: A method for how to connect to the database. Input: None Output: Connection
2. [N] Raw Data Pulls: Pulls that directly hit the data infrastructure held database tables. Input: Connection Output: Raw Data Type
3. [N] Data Processing: Any light data processing such as doing pivots since holding a pivot table would be space ineffecient on the SQL side. Input: Raw Data Type Output: Processed Data Type
4. [1] Backtest Data Pull: A pull that combines all raw pulls and data processing pulls, plus connects to the database and returns a Backtest Data Type. Input: None Output: Backtest Data Type
5. [1] Input Data Computation: A processing function that takes backtest data and then returns starting state, the historical data, the input data (what goes into the backtest) and the output data (what is being used to validate the backtest). Input: Backtest Data Type Output: Input Data Type
6. [1] Format Inputs: One function which takes the input data and formats the input into data classes for use within cadCAD.

# Digital Twin Specific

1. A DataPipeline class should be made which fills out pull_historical_data, compute_input_data, format_input_data corresponding to the functions defined above.
2. A function of load_data_initial should be made in the DT class which specifies how to save down pulled data. This can be csv, pickle, etc. 
3. A function of load_data_prior should be made in the DT class which specifies how to pull back old data used.

In [1]:
import digital_twin
from model2.data import pull_backtest_data, create_input_data, format_inputs
from model2.types import BacktestData

class ArbitrageDataPipeline(digital_twin.DataPipeline):
    
    def pull_historical_data(self):
        return pull_backtest_data()
    
    def compute_input_data(self, data):
        return create_input_data(data)
    
    def format_input_data(self, data):
        return format_inputs(data)

class ArbitrageDigitalTwin(digital_twin.DigitalTwin):
    
    def load_data_initial(self):
        self.historical_data = self.data_pipeline.pull_historical_data()
        
        self.historical_data.pure_returns.to_csv("pure_returns.csv")
        self.historical_data.prices_data.to_csv("prices_data.csv")
        self.historical_data.trades_data.to_csv("trades_data.csv")
    
    def load_data_prior(self):
        pure_returns = pd.read_csv("pure_returns.csv", index_col = 0)
        prices_data = pd.read_csv("prices_data.csv", index_col = 0)
        trades_data = pd.read_csv("trades_data.csv", index_col = 0)
        
        self.historical_data = BacktestData(pure_returns = pure_returns_data, 
                        prices_data = prices_data,
                        trades_data = trades_data)

In [2]:
TestDataPipeline = ArbitrageDataPipeline()
arb_dt = ArbitrageDigitalTwin(name = "Test",
                    data_pipeline = TestDataPipeline)
arb_dt.load_data_initial()
arb_dt.compute_input_data()

# 2. Backtest Model

# Model Specific

## Partial State Update Blocks

- The file of psub.py holds all the partial state update blocks. There is distinction between the backtesting based blocks and the extrapolation based blocks.

## Run 

This file has a few functionalities to be built.

1. load_config_backtest: This function is meant to load up the configuration for backtesting.
2. run: A function for running the model
3. post_processing: A function for post processing after the run

# Digital Twin Specific

1. A BacktestModel class needs to be made from digital twin model
2. The load config function should map the configuration loading for backtesting
3. The run model function fills in how the model will run
4. Post processing likewise is mapped through there

We add it into the full workflow below as an example.

In [3]:
from model2.run import load_config_backtest, run, post_processing

In [4]:
class BacktestModel(digital_twin.Model):
    def load_config(self, monte_carlo_runs, timesteps,
                    params, initial_state):
        exp = load_config_backtest(monte_carlo_runs = monte_carlo_runs,
            timesteps = timesteps,
            params = params,
            initial_state = initial_state)
        return exp
    
    def run_model(self, exp):
        raw = run(exp)
        return raw
    
    def post_processing(self, backtest_data):
        df = post_processing(backtest_data)
        return df

In [5]:
TestDataPipeline = ArbitrageDataPipeline()
TestBacktestModel = BacktestModel()
arb_dt = ArbitrageDigitalTwin(name = "Test",
                    data_pipeline = TestDataPipeline,
                    backtest_model = TestBacktestModel)
arb_dt.load_data_initial()
arb_dt.compute_input_data()

params_backtest = {}
monte_carlo_runs_backtest = 1

arb_dt.run_backtest(monte_carlo_runs_backtest, params_backtest)


                  ___________    ____
  ________ __ ___/ / ____/   |  / __ \
 / ___/ __` / __  / /   / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/  |_/_____/
by cadCAD

cadCAD Version: 0.4.28
Execution Mode: local_proc
Simulation Dimensions:
Entire Simulation: (Models, Unique Timesteps, Params, Total Runs, Sub-States) = (1, 100, 1, 1, 2)
     Simulation 0: (Timesteps, Params, Runs, Sub-States) = (100, 1, 1, 2)
Execution Method: local_simulations
Execution Mode: single_threaded
Total execution time: 0.05s


Params can be dynamic to allow for different assumptions about what randomness looks like. For example, one set of params might be 

The name is for the name to be given to each series of data. The use_seeds param is whether to use seeds 1...N for setting randomness for reproducibility. The param_values are for the values in parameter setting. 

In [6]:
class TestStochasticFit(digital_twin.StochasticFit):
    
    def fit_index_return(self, param_value, input_data, historical_data):
        if param_value['type'] == 'Expert Model':
            #Already has values put in
            pass
        elif param_value['type'] == 'Normal Fitted':
            #Add mu and std
            data = historical_data.pure_returns["index_return"]
            param_value["mu"] = data.mean()
            param_value["std"] = data.std()
        else:
            raise NotImplementedError
        
    def fit_basket_return(self, param_value, input_data, historical_data):
        if param_value['type'] == 'Expert Model':
            #Already has values put in
            pass
        elif param_value['type'] == 'Normal Fitted':
            #Add mu and std
            data = historical_data.pure_returns["basket_return"]
            param_value["mu"] = data.mean()
            param_value["std"] = data.std()
        else:
            raise NotImplementedError
        
    def fit_param(self, param, input_data, historical_data):
        for pv in param["param_values"]:
            if pv == "index_return":
                self.fit_index_return(param["param_values"][pv], input_data, historical_data)
            elif pv == "basket_return":
                self.fit_basket_return(param["param_values"][pv], input_data, historical_data)
            else:
                raise NotImplementedError
                
                


In [7]:
TestDataPipeline = ArbitrageDataPipeline()
TestBacktestModel = BacktestModel()

params1 = {"name": "Normal Distribution",
           "monte_carlo_runs": 10,
           "use_seeds": True,
    "param_values": {"index_return": {"type": "Normal Fitted"},
         "basket_return": {"type": "Normal Fitted"}}}

params2 = {"name": "Expert Model",
           "monte_carlo_runs": 10,
           "use_seeds": True,
           "param_values": {"index_return": {"type": "Expert Model",
                                             "lambda": .9,
                          "mu": .015,
                          "std": .1},
                            
         "basket_return": {"type": "Expert Model",
                          "mu": .01,
                           "std": .05}}}

params_sf = [params1, params2]
test_stochastic_fit = TestStochasticFit(params_sf)

arb_dt = ArbitrageDigitalTwin(name = "Test",
                    data_pipeline = TestDataPipeline,
                    backtest_model = TestBacktestModel,
                    stochastic_fit = test_stochastic_fit)
arb_dt.load_data_initial()
arb_dt.compute_input_data()

params_backtest = {}
monte_carlo_runs_backtest = 1


arb_dt.run_backtest(monte_carlo_runs_backtest, params_backtest)
arb_dt.fit_stochastic_fit()


                  ___________    ____
  ________ __ ___/ / ____/   |  / __ \
 / ___/ __` / __  / /   / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/  |_/_____/
by cadCAD

cadCAD Version: 0.4.28
Execution Mode: local_proc
Simulation Dimensions:
Entire Simulation: (Models, Unique Timesteps, Params, Total Runs, Sub-States) = (1, 100, 1, 1, 2)
     Simulation 0: (Timesteps, Params, Runs, Sub-States) = (100, 1, 1, 2)
Execution Method: local_simulations
Execution Mode: single_threaded
Total execution time: 0.03s


# Signal Extrapolation

In [19]:
import numpy as np
from model2.types import Returns

class TestSignalExtrapolation(digital_twin.SignalExtrapolation):
    def extrapolate_index_return(self, param, t, n, signals):
        if param['type'] == 'Expert Model':
            signals["index_return"] = param["lambda"] * signals["basket_return"] +\
            (1 - param["lambda"]) * np.random.normal(param["mu"], param["std"], (n, t))
        elif param['type'] == 'Normal Fitted':
            signals["index_return"] = np.random.normal(param["mu"], param["std"], (n, t))
        else:
            raise NotImplementedError
    
    def extrapolate_basket_return(self, param, t, n, signals):
        if param['type'] == 'Expert Model':
            signals["basket_return"] = np.random.normal(param["mu"], param["std"], (n, t))
        elif param['type'] == 'Normal Fitted':
            signals["basket_return"] = np.random.normal(param["mu"], param["std"], (n, t))
        else:
            raise NotImplementedError
    
    def extrapolate_signals(self, stochastic_params, t):
        signals_total = []
        for stochastic_param_i in stochastic_params:
            n = stochastic_param_i["monte_carlo_runs"]
            signals = {}
            if stochastic_param_i["use_seeds"]:
                np.random.seed(1)
            self.extrapolate_basket_return(stochastic_param_i["param_values"]["basket_return"], t, n, signals)
            self.extrapolate_index_return(stochastic_param_i["param_values"]["index_return"], t, n, signals)
            signals_total.append(signals)
        return signals_total
    
    def process_signal(self, param, signal_raw):
        signal = []
        for i in range(param["monte_carlo_runs"]):
            signal_i = []
            br = signal_raw["basket_return"][:,i]
            ir = signal_raw["index_return"][:,i]
            run_n = i
            signal_name = param["name"]
            for br_i, ir_i in zip(br, ir):
                r = Returns(index_return = ir_i,
                           basket_return = br_i)
                signal_i.append({"returns": r,
                                "signal_name": signal_name,
                                "signal_run_number": run_n})
            signal.append(signal_i)
        return signal



In [20]:
TestDataPipeline = ArbitrageDataPipeline()
TestBacktestModel = BacktestModel()

params1 = {"name": "Normal Distribution",
           "monte_carlo_runs": 10,
           "use_seeds": True,
    "param_values": {"index_return": {"type": "Normal Fitted"},
         "basket_return": {"type": "Normal Fitted"}}}

params2 = {"name": "Expert Model",
           "monte_carlo_runs": 10,
           "use_seeds": True,
           "param_values": {"index_return": {"type": "Expert Model",
                                             "lambda": .9,
                          "mu": .015,
                          "std": .1},
                            
         "basket_return": {"type": "Expert Model",
                          "mu": .01,
                           "std": .05}}}

extrapolation_epochs = 25

params_sf = [params1, params2]
test_stochastic_fit = TestStochasticFit(params_sf)
test_signal_extrapolation = TestSignalExtrapolation()

arb_dt = ArbitrageDigitalTwin(name = "Test",
                    data_pipeline = TestDataPipeline,
                    backtest_model = TestBacktestModel,
                    stochastic_fit = test_stochastic_fit,
                    signal_extrapolation = test_signal_extrapolation,
                    extrapolation_epochs = extrapolation_epochs)
arb_dt.load_data_initial()
arb_dt.compute_input_data()

params_backtest = {}
monte_carlo_runs_backtest = 1


arb_dt.run_backtest(monte_carlo_runs_backtest, params_backtest)
arb_dt.fit_stochastic_fit()
arb_dt.extrapolate_signals()


                  ___________    ____
  ________ __ ___/ / ____/   |  / __ \
 / ___/ __` / __  / /   / /| | / / / /
/ /__/ /_/ / /_/ / /___/ ___ |/ /_/ /
\___/\__,_/\__,_/\____/_/  |_/_____/
by cadCAD

cadCAD Version: 0.4.28
Execution Mode: local_proc
Simulation Dimensions:
Entire Simulation: (Models, Unique Timesteps, Params, Total Runs, Sub-States) = (1, 100, 1, 1, 2)
     Simulation 0: (Timesteps, Params, Runs, Sub-States) = (100, 1, 1, 2)
Execution Method: local_simulations
Execution Mode: single_threaded
Total execution time: 0.01s


In [21]:
arb_dt.signals

[[{'returns': Returns(index_return=-0.0025042968388038107, basket_return=0.09526022601615816),
   'signal_name': 'Normal Distribution',
   'signal_run_number': 0},
  {'returns': Returns(index_return=-0.02753262695582072, basket_return=-0.02163903994685972),
   'signal_name': 'Normal Distribution',
   'signal_run_number': 0},
  {'returns': Returns(index_return=0.11290801992241294, basket_return=0.028193423678909618),
   'signal_name': 'Normal Distribution',
   'signal_run_number': 0},
  {'returns': Returns(index_return=0.05978449932148622, basket_return=-0.08943002307305407),
   'signal_name': 'Normal Distribution',
   'signal_run_number': 0},
  {'returns': Returns(index_return=0.062369936924288126, basket_return=-0.009655761614224817),
   'signal_name': 'Normal Distribution',
   'signal_run_number': 0},
  {'returns': Returns(index_return=0.041030541814467254, basket_return=0.07559962232999975),
   'signal_name': 'Normal Distribution',
   'signal_run_number': 0},
  {'returns': Returns(i

In [10]:
# For extrapolation, convert to the input data, but also add in the name
# And an integer for the set

In [11]:
#Convert to inputs as well?

In [12]:
arb_dt.input_data.input_data

Unnamed: 0_level_0,returns
t,Unnamed: 1_level_1
0,"Returns(index_return=0.10871386253910743, bask..."
1,"Returns(index_return=0.015029483765100592, bas..."
2,"Returns(index_return=0.04183835929990093, bask..."
3,"Returns(index_return=0.12103416104564571, bask..."
4,"Returns(index_return=0.08280887550560694, bask..."
...,...
95,"Returns(index_return=0.040580329256415186, bas..."
96,"Returns(index_return=0.018690406444573593, bas..."
97,"Returns(index_return=0.0990992137653999, baske..."
98,"Returns(index_return=0.037843403664469796, bas..."
