In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


**<h1>Technical Design Documentation (Markdown as docstrings)
Project Overview
This project aims to create a system where each worker operates in its own isolated environment, processes its own data, and contributes to the overall machine learning task. The system will use multiprocessing to efficiently manage tasks and ensure data independence. The design will follow best practices for data analytics, machine learning, and isolated environments.</h1>**

-------------------------------------------------------------------------------------------------------------------

<h3>
Components
Isolated Environments
Data Handling
Task Management
Multiprocessing
Machine Learning
Aggregation
</h3>

<h3>1. Isolated Environments
Objective: Ensure each worker operates in its own isolated environment to prevent data leakage and dependency.

Best Practices:
> Use venv or virtualenv to create isolated environments.
> Install only necessary dependencies in each environment.
> Use environment variables to manage configurations.
> Use bottom cell after #7 to run total file with your data loaded-in. Use '/content/sample_data' path for google colab
> Implementation:</h3>

In [None]:
# Install required packages
!pip install pandas numpy scikit-learn

In [None]:
## venv only, not google colab. Full file prog. at bottom.
python -m venv worker_env_{worker_id}
source worker_env_{worker_id}/bin/activate
pip install -r requirements.txt

**<h1>2. Data Handling
Objective: Efficiently load, preprocess, and manage data for each worker.</h1>**

**<h3>Best Practices:
Store data in a structured format (e.g., JSON, CSV).
Use efficient data structures (e.g., Pandas DataFrames) for data manipulation.
Ensure data is cleaned and preprocessed before training.
Implementation:</h3>**

In [None]:
# Data Handling
class DataHandler:
    def __init__(self, file_path):
        self.file_path = file_path

    def load_data(self):
        if not os.path.exists(self.file_path):
            print(f"File {self.file_path} does not exist.")
            return pd.DataFrame()

        data = []
        with open(self.file_path, 'r') as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith("Report Number") and not line.startswith("Business Date") and not line.startswith("Security ID"):
                    # Split the line into columns based on whitespace
                    columns = line.split()
                    data.append(columns)

        if not data:
            print("No valid data found in the file.")
            return pd.DataFrame()

        # Determine the maximum number of columns in the data
        max_columns = max(len(row) for row in data)

        # Create a DataFrame from the parsed data
        df = pd.DataFrame(data, columns=[f"Column_{i}" for i in range(max_columns)])
        print("Loaded data columns:", df.columns)
        print("Loaded data shape:", df.shape)
        return df

    def preprocess_data(self, df):
        # Implement preprocessing steps
        # For example, ensure 'label' column exists
        if 'label' not in df.columns:
            df['label'] = np.random.randint(0, 1000, size=len(df))  # Dummy label for demo only

        # Convert all columns to numeric, coercing errors to NaN
        df = df.apply(pd.to_numeric, errors='coerce')

        # Fill NaN values with 0 or any other appropriate value
        df = df.fillna(0)

        print("Preprocessed data columns:", df.columns)
        print("Preprocessed data shape:", df.shape)
        return df

**<h1>3.Task Management Objective: Efficiently manage and partition tasks across workers.</h1>**

**<h3>Best Practices: Use a task queue to dynamically assign tasks. Ensure tasks are independent and can be processed in parallel.Implementation:</h3>**


In [None]:
# Task Management
class TaskManager:
    def __init__(self):
        self.task_queue = Queue()

    def add_task(self, task):
        self.task_queue.put(task)

    def get_task(self):
        return self.task_queue.get()

**<h1>4. Multiprocessing
Objective: Use multiprocessing to efficiently manage and execute tasks.
Best Practices:
Use multiprocessing.Pool for managing worker processes.
Ensure proper synchronization and communication between processes.
Implementation:</h1>**

In [None]:
import multiprocessing

# Worker Task
def worker_task(worker_id):
    env_path = setup_environment(worker_id)
    data_handler = DataHandler(env_path)
    data = data_handler.load_data()
    if data.empty:
        print(f"No data loaded for worker {worker_id}")
        return None, None
    preprocessed_data = data_handler.preprocess_data(data)
    if preprocessed_data.empty:
        print(f"No data after preprocessing for worker {worker_id}")
        return None, None
    model = MLModel(preprocessed_data)
    monte_carlo = MonteCarloSimulation(model)
    mean_accuracy, std_accuracy = monte_carlo.run_simulation(preprocessed_data)
    return mean_accuracy, std_accuracy

# Setup Environment
def setup_environment(worker_id):
    env_path = f'worker_env_{worker_id}'
    os.makedirs(env_path, exist_ok=True)
    return env_path

## At bottom
# # Main Function
# def main():
#     num_workers = 4
#     with multiprocessing.Pool(num_workers) as pool:
#         results = pool.map(worker_task, range(num_workers))
#     print(f'Aggregated Results: {results}')

# if __name__ == '__main__':
#     main()

# parsed_trade = parse_trade(trade_example)
# print(parsed_trade)

**<h1>5. Machine Learning Objective: Train and test machine learning models efficiently.
Best Practices:
Use appropriate algorithms and hyperparameters.
Ensure proper train-test split to avoid overfitting.
Use cross-validation for model evaluation.
Implementation:</h1>**

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

# Machine Learning Model
class MLModel:
    def __init__(self, data):
        self.data = data

    def train_model(self, data):
        X = self.data.drop('label', axis=1)
        y = self.data['label']
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
        model = RandomForestClassifier()
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        return accuracy_score(y_test, y_pred)

**<h1>6.Monte Carlo Simulation
Objective: Perform Monte Carlo simulations to evaluate the robustness of trading strategies.
Best Practices:
Use vectorized operations for efficiency.
Ensure reproducibility by setting random seeds.
Use a large number of simulations to ensure statistical significance.
Implementation:
Implement a Monte Carlo simulation class that runs multiple simulations.
Use vectorized operations with NumPy for efficiency.</h1>**

In [None]:
import numpy as np

# Monte Carlo Simulation
class MonteCarloSimulation:
    def __init__(self, model, num_simulations=1000):
        self.model = model
        self.num_simulations = num_simulations

    def run_simulation(self, data):
        results = []
        for _ in range(self.num_simulations):
            simulated_data = self.simulate_data(data)
            accuracy = self.model.train_model(simulated_data)
            results.append(accuracy)
        return np.mean(results), np.std(results)

    def simulate_data(self, data):
        # Implement data simulation logic
        return data

**<h1>7.Aggregation
Objective: Aggregate results from each worker to form the final output.
Best Practices:
Use statistical methods to aggregate results.
Ensure robustness and reliability of aggregated results.
Implementation:</h1>**

**<h1>Citation Machine: </h1>**

**<h1>[1]Ccxt, CCXT/Python/CCXT at master Â· Author: Igor Kroitor 2024. Code Owners: *    @kroitor @frosty00 CCXT/CCXT, GitHub. (n.d.). https://github.com/ccxt/ccxt/tree/master/python/ccxt https://github.com/ccxt/ccxt/tree/master (accessed August 17, 2024).</h1>**

**<h1>Data sample attribution to HitBtc API attribution. Citation for data and example test code to run: [1]About HITBTC API, API Documentation. (n.d.). https://api.hitbtc.com/ (accessed August 17, 2024). </h1>**

In [None]:
from datetime import datetime

def parse_trade(trade: dict, market: dict = None) -> dict:
    """
    Example data from hitbtc3.py
    """
    # Helper functions to safely extract values
    def safe_string(d, key, default=None):
        return str(d[key]) if key in d else default

    def safe_value(d, key, default=None):
        return d[key] if key in d else default

    def parse8601(timestamp):
        return datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')

    # Extract values from the trade dictionary
    timestamp = parse8601(safe_string(trade, 'timestamp'))
    marketId = safe_string(trade, 'symbol')
    market = market if market else {'symbol': marketId}
    symbol = market['symbol']
    fee = None
    feeCostString = safe_string(trade, 'fee')
    taker = safe_value(trade, 'taker')
    takerOrMaker = 'taker' if taker else 'maker'

    # Construct the parsed trade dictionary
    parsed_trade = {
        'timestamp': timestamp,
        'datetime': timestamp.isoformat(),
        'symbol': symbol,
        'id': safe_string(trade, 'id'),
        'order': safe_string(trade, 'order_id'),
        'type': None,
        'side': safe_string(trade, 'side'),
        'price': float(safe_string(trade, 'price')),
        'amount': float(safe_string(trade, 'quantity')),
        'cost': float(safe_string(trade, 'price')) * float(safe_string(trade, 'quantity')),
        'fee': {
            'cost': float(feeCostString),
            'currency': marketId.split('_')[1] if '_' in marketId else None,
        } if feeCostString else None,
        'takerOrMaker': takerOrMaker,
        'info': trade,
    }

    return parsed_trade

# Example data from hitbtc3.py
trade_example = {
    "id": 4718564,
    "order_id": 58730811958,
    "client_order_id": "475c47d97f867f09726186eb22b4c3d4",
    "symbol": "BTCUSDT_PERP",
    "side": "sell",
    "quantity": "0.0001",
    "price": "41118.51",
    "fee": "0.002055925500",
    "timestamp": "2022-03-17T05:23:17.795Z",
    "taker": True,
    "position_id": 2350122,
    "pnl": "0.002255000000",
    "liquidation": False
}


**<h1>def main():</h1>**

In [None]:
def aggregate_results(results):
    # Implement aggregation logic (e.g., averaging accuracies)
    return sum(results) / len(results)

# Main Function
def main():
    num_workers = 4
    with multiprocessing.Pool(num_workers) as pool:
        results = pool.map(worker_task, range(num_workers))
    print(f'Aggregated Results: {results}')

if __name__ == '__main__':
    main()

parsed_trade = parse_trade(trade_example)
print(parsed_trade)

**<h1>Full Simplest Toy Ex. Result</h1>**

**<h1>Load your data: json, txt, csv,... Make sure there's handling data for your specific case. Mainly handling for .json, then .txt</h1>**

In [2]:
import os

##### Verify code exists #####
file_path = '/content/sample_data/report550_2023-12-04.txt'
if os.path.exists(file_path):
    print(f"File {file_path} exists.")
else:
    print(f"File {file_path} does not exist.")

File /content/sample_data/report550_2023-12-04.txt exists.


**<h1>Need to upload your own data file (only within google colab)! Promise its worth it &#x1F600; (#hexadecimal) ! </h1>**

In [None]:
from google.colab import files
uploaded = files.upload()

In [10]:
![17-facts-about-hexadecimal-reboot-1691980921](./17-facts-about-hexadecimal-reboot-1691980921.jpg)

/bin/bash: -c: line 1: syntax error near unexpected token `./17-facts-about-hexadecimal-reboot-1691980921.jpg'
/bin/bash: -c: line 1: `[17-facts-about-hexadecimal-reboot-1691980921](./17-facts-about-hexadecimal-reboot-1691980921.jpg)'


In [None]:
import os
import pandas as pd
import numpy as np
import multiprocessing
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from queue import Queue

# Data Handling
class DataHandler:
    def __init__(self, file_path):
        self.file_path = file_path

    def load_data(self):
        if not os.path.exists(self.file_path):
            print(f"File {self.file_path} does not exist.")
            return pd.DataFrame()

        data = []
        with open(self.file_path, 'r') as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith("Report Number") and not line.startswith("Business Date") and not line.startswith("Security ID"):
                    # Split the line into columns based on whitespace
                    columns = line.split()
                    data.append(columns)

        if not data:
            print("No valid data found in the file.")
            return pd.DataFrame()

        # Determine the maximum number of columns in the data
        max_columns = max(len(row) for row in data)

        # Create a DataFrame from the parsed data
        df = pd.DataFrame(data, columns=[f"Column_{i}" for i in range(max_columns)])
        print("Loaded data columns:", df.columns)
        print("Loaded data shape:", df.shape)
        return df

    def preprocess_data(self, df):
        # Implement preprocessing steps
        # For example, ensure 'label' column exists
        if 'label' not in df.columns:
            df['label'] = np.random.randint(0, 1000, size=len(df))  # Dummy label for demo only

        # Convert all columns to numeric, coercing errors to NaN
        df = df.apply(pd.to_numeric, errors='coerce')

        # Fill NaN values with 0 or any other appropriate value
        df = df.fillna(0)

        print("Preprocessed data columns:", df.columns)
        print("Preprocessed data shape:", df.shape)
        return df

# Machine Learning Model
class MLModel:
    def __init__(self, data):
        self.data = data

    def train_model(self, data):
        X = self.data.drop('label', axis=1)
        y = self.data['label']
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
        model = RandomForestClassifier()
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        return accuracy_score(y_test, y_pred)

# Monte Carlo Simulation
class MonteCarloSimulation:
    def __init__(self, model, num_simulations=1000):
        self.model = model
        self.num_simulations = num_simulations

    def run_simulation(self, data):
        results = []
        for _ in range(self.num_simulations):
            simulated_data = self.simulate_data(data)
            accuracy = self.model.train_model(simulated_data)
            results.append(accuracy)
        mean_accuracy = np.mean(results)
        std_accuracy = np.std(results)
        normal_dist = np.random.normal(mean_accuracy, std_accuracy, self.num_simulations)
        return mean_accuracy, std_accuracy, normal_dist

    def simulate_data(self, data):
        # Implement data simulation logic
        return data

# Task Management
class TaskManager:
    def __init__(self):
        self.task_queue = Queue()

    def add_task(self, task):
        self.task_queue.put(task)

    def get_task(self):
        return self.task_queue.get()

# Worker Task
def worker_task(worker_id, file_path):
    data_handler = DataHandler(file_path)
    data = data_handler.load_data()
    if data.empty:
        print(f"No data loaded for worker {worker_id}")
        return None, None, None
    preprocessed_data = data_handler.preprocess_data(data)
    if preprocessed_data.empty:
        print(f"No data after preprocessing for worker {worker_id}")
        return None, None, None
    model = MLModel(preprocessed_data)
    monte_carlo = MonteCarloSimulation(model)
    mean_accuracy, std_accuracy, normal_dist = monte_carlo.run_simulation(preprocessed_data)
    return mean_accuracy, std_accuracy, normal_dist

from datetime import datetime

def parse_trade(trade: dict, market: dict = None) -> dict:
    """
    Example data from hitbtc3.py
    """
    # Helper functions to safely extract values
    def safe_string(d, key, default=None):
        return str(d[key]) if key in d else default

    def safe_value(d, key, default=None):
        return d[key] if key in d else default

    def parse8601(timestamp):
        return datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')

    # Extract values from the trade dictionary
    timestamp = parse8601(safe_string(trade, 'timestamp'))
    marketId = safe_string(trade, 'symbol')
    market = market if market else {'symbol': marketId}
    symbol = market['symbol']
    fee = None
    feeCostString = safe_string(trade, 'fee')
    taker = safe_value(trade, 'taker')
    takerOrMaker = 'taker' if taker else 'maker'

    # Construct the parsed trade dictionary
    parsed_trade = {
        'timestamp': timestamp,
        'datetime': timestamp.isoformat(),
        'symbol': symbol,
        'id': safe_string(trade, 'id'),
        'order': safe_string(trade, 'order_id'),
        'type': None,
        'side': safe_string(trade, 'side'),
        'price': float(safe_string(trade, 'price')),
        'amount': float(safe_string(trade, 'quantity')),
        'cost': float(safe_string(trade, 'price')) * float(safe_string(trade, 'quantity')),
        'fee': {
            'cost': float(feeCostString),
            'currency': marketId.split('_')[1] if '_' in marketId else None,
        } if feeCostString else None,
        'takerOrMaker': takerOrMaker,
        'info': trade,
    }

    return parsed_trade

# Example data from hitbtc3.py
trade_example = {
    "id": 4718564,
    "order_id": 58730811958,
    "client_order_id": "475c47d97f867f09726186eb22b4c3d4",
    "symbol": "BTCUSDT_PERP",
    "side": "sell",
    "quantity": "0.0001",
    "price": "41118.51",
    "fee": "0.002055925500",
    "timestamp": "2022-03-17T05:23:17.795Z",
    "taker": True,
    "position_id": 2350122,
    "pnl": "0.002255000000",
    "liquidation": False
}

# Main Function
def main():
    file_path = '/content/sample_data/report550_2023-12-04.txt'
    if not os.path.exists(file_path):
        print(f"File {file_path} does not exist.")
        return

    num_workers = 4
    with multiprocessing.Pool(num_workers) as pool:
        results = pool.starmap(worker_task, [(i, file_path) for i in range(num_workers)])
    print(f'Aggregated Results: {results}')

if __name__ == '__main__':
    main()

parsed_trade = parse_trade(trade_example)
print(parsed_trade)

Loaded data columns: Index(['Column_0', 'Column_1', 'Column_2', 'Column_3', 'Column_4', 'Column_5',
       'Column_6', 'Column_7', 'Column_8', 'Column_9', 'Column_10',
       'Column_11', 'Column_12', 'Column_13', 'Column_14', 'Column_15',
       'Column_16', 'Column_17'],
      dtype='object')Loaded data columns:Loaded data columns:
 Loaded data columns:Loaded data shape: Index(['Column_0', 'Column_1', 'Column_2', 'Column_3', 'Column_4', 'Column_5',
       'Column_6', 'Column_7', 'Column_8', 'Column_9', 'Column_10',
       'Column_11', 'Column_12', 'Column_13', 'Column_14', 'Column_15',
       'Column_16', 'Column_17'],
      dtype='object') (19803, 18)
Index(['Column_0', 'Column_1', 'Column_2', 'Column_3', 'Column_4', 'Column_5',
       'Column_6', 'Column_7', 'Column_8', 'Column_9', 'Column_10',
       'Column_11', 'Column_12', 'Column_13', 'Column_14', 'Column_15',
       'Column_16', 'Column_17'],
      dtype='object') Loaded data shape:

Index(['Column_0', 'Column_1', 'Column_2',

**<h1>Conclusion: This design ensures that each worker operates independently, processes its own data, and contributes to the overall machine learning task without data dependency issues. By following best practices for isolated environments, data handling, task management, multiprocessing, machine learning, and aggregation, we can achieve efficient and reliable data analytics.</h1>**