In [None]:
# pyright: reportMissingImports=false
# pyright: reportMissingModuleSource=false

import uuid
import random
import hashlib
import os
import sys
import time
import logging
import datetime
import json
from datetime import datetime, timedelta
import yaml
import pytest
import importlib
from dotenv import load_dotenv
import pandas as pd
import numpy as np
import requests
import pandas_gbq
from sklearn.model_selection import ParameterGrid, ParameterSampler
from scipy.signal import argrelextrema
from dreams_core.googlecloud import GoogleCloud as dgc
from dreams_core import core as dc
import matplotlib.pyplot as plt
import seaborn as sns
import progressbar


# load environment variables from a .env file (python-dotenv)
load_dotenv()


# import local files if necessary
# pyright: reportMissingImports=false
# Only append once so re-running this cell doesn't keep growing sys.path
if '../src' not in sys.path:
    sys.path.append('../src')
# Each module is reloaded right after import because a plain `import` returns the
# cached module on re-runs; reload re-executes it so source edits are picked up.
import training_data as td
importlib.reload(td)
import feature_engineering as fe
importlib.reload(fe)
import coin_wallet_metrics as cwm
importlib.reload(cwm)
import modeling as m
importlib.reload(m)
import insights as i
importlib.reload(i)
import utils as u
importlib.reload(u)


# configure logger
logger = dc.setup_logger()
logger.setLevel(logging.DEBUG)

# Custom format function for displaying numbers
pd.set_option('display.float_format', lambda x: f'{x:.12g}')
# pd.reset_option('display.float_format')


# Load all configs as global variables (top-level notebook assignments are
# already module globals, so no `global` declaration is needed)
CONFIG = u.load_config('../config/config.yaml')
METRICS_CONFIG = u.load_config('../config/metrics_config.yaml')
MODELING_CONFIG = u.load_config('../config/modeling_config.yaml')
EXPERIMENTS_CONFIG = u.load_config('../config/experiments_config.yaml')
MODELING_FOLDER = MODELING_CONFIG['modeling']['modeling_folder']
modeling_folder = MODELING_FOLDER

In [None]:
# Reload the local project modules (from the ../src path added in the setup cell)
# so source edits take effect without restarting the kernel, then re-read the
# lowercase config globals used by later cells.
importlib.reload(td)
importlib.reload(cwm)
importlib.reload(fe)
importlib.reload(m)
importlib.reload(i)
importlib.reload(u)
config = u.load_config('../config/config.yaml')
metrics_config = u.load_config('../config/metrics_config.yaml')
modeling_config = u.load_config('../config/modeling_config.yaml')
experiments_config = u.load_config('../config/experiments_config.yaml')


## Overall Sequencing

In [None]:
# Reload project modules and configs, then pull and clean the market and profits
# data for the configured training_period_start .. modeling_period_end window.
importlib.reload(td)
importlib.reload(cwm)
importlib.reload(fe)
importlib.reload(m)
importlib.reload(i)
importlib.reload(u)
config = u.load_config('../config/config.yaml')
metrics_config = u.load_config('../config/metrics_config.yaml')
modeling_config = u.load_config('../config/modeling_config.yaml')
experiments_config = u.load_config('../config/experiments_config.yaml')
logger.setLevel(logging.INFO)


start_date = config['training_data']['training_period_start']
end_date = config['training_data']['modeling_period_end']

# Retrieve market data
market_data_df = td.retrieve_market_data()
# split_dataframe_by_coverage returns two frames; only the first is kept
# (presumably the coins with full date coverage — confirm in coin_wallet_metrics)
market_data_df, _ = cwm.split_dataframe_by_coverage(market_data_df, start_date, end_date, id_column='coin_id')
# NOTE: prices_df is taken before the transfers filter below, so it may still
# contain coins later removed from market_data_df
prices_df = market_data_df[['coin_id','date','price']].copy()

# retrieve profits data
profits_df = td.retrieve_profits_data(start_date, end_date)
profits_df, _ = cwm.split_dataframe_by_coverage(profits_df, start_date, end_date, id_column='coin_id')
profits_df, _ = td.clean_profits_df(profits_df, config['data_cleaning'])


# remove records from market_data_df that don't have transfers if configured to do so
if config['data_cleaning']['exclude_coins_without_transfers']:
    market_data_df = market_data_df[market_data_df['coin_id'].isin(profits_df['coin_id'])]


In [None]:
# Keep pristine "_full" copies of the loaded data so later experiments can reset
# their working frames without re-querying. The snapshot is only taken when the
# copies don't exist yet, so this cell also works in a fresh kernel.
if not all(name in globals() for name in ('profits_df_full', 'market_data_df_full', 'prices_df_full')):
    profits_df_full = profits_df.copy(deep=True)
    market_data_df_full = market_data_df.copy(deep=True)
    prices_df_full = prices_df.copy(deep=True)

# Restore working copies from the snapshots
profits_df = profits_df_full.copy(deep=True)
market_data_df = market_data_df_full.copy(deep=True)
prices_df = prices_df_full.copy(deep=True)

In [None]:
# Pick up edits to training_data (e.g. impute_profits_df_rows) without a kernel restart
importlib.reload(td)

In [None]:

# Single-threaded baseline: reset to the "_full" snapshots and impute rows for
# target_date across the whole profits_df in one call.
profits_df = profits_df_full.copy(deep=True)
prices_df = prices_df_full.copy(deep=True)
target_date = '2024-08-31'

profits_df_filled = td.impute_profits_df_rows(profits_df,prices_df,target_date)



In [None]:
def create_partitions(profits_df, n_partitions, seed=88):
    """
    Partition a DataFrame into multiple subsets based on unique coin_ids.

    Every row for a given coin_id lands in exactly one partition, so partitions
    can be processed independently. Coins are shuffled with a fixed seed (for
    reproducible partitions) and then dealt out so that partition sizes, counted
    in coins, differ by at most one.

    Parameters:
    - profits_df (pd.DataFrame): The input DataFrame to be partitioned. Must contain
        a 'coin_id' column.
    - n_partitions (int): The number of partitions to create. Must be >= 1. If it
        exceeds the number of unique coins, the surplus partitions are empty.
    - seed (int): Seed for shuffling the coin_ids. Defaults to 88.

    Returns:
    - partition_dfs (List[pd.DataFrame]): A list of exactly n_partitions DataFrames,
        each representing a (possibly empty) partition of the original data.

    Raises:
    - ValueError: If n_partitions is less than 1.
    """
    if n_partitions < 1:
        raise ValueError(f"n_partitions must be at least 1, got {n_partitions}.")

    # Get unique coin_ids as a regular list
    unique_coin_ids = profits_df['coin_id'].unique().tolist()

    # Use a private RandomState instead of np.random.seed so the global NumPy RNG
    # is left untouched; it produces the same permutation as seeding globally.
    np.random.RandomState(seed).shuffle(unique_coin_ids)

    # Spread the remainder over the first `extra` partitions rather than piling it
    # all onto the last partition, which could otherwise receive nearly every coin.
    base_size, extra = divmod(len(unique_coin_ids), n_partitions)

    partition_dfs = []
    start_idx = 0
    for partition_idx in range(n_partitions):
        end_idx = start_idx + base_size + (1 if partition_idx < extra else 0)
        partition_coin_ids = unique_coin_ids[start_idx:end_idx]
        partition_dfs.append(profits_df[profits_df['coin_id'].isin(partition_coin_ids)])
        start_idx = end_idx

    return partition_dfs


# Split the current profits_df into 6 coin-disjoint partitions for the threading experiments
n_partitions = 6
partitions = create_partitions(profits_df, n_partitions)

In [None]:
# Inspect the size of the first partition
partitions[0].shape

In [None]:
# Inspect the current numeric logger level (logging.DEBUG == 10, logging.INFO == 20)
logger.level

In [None]:
import threading
import queue
import pandas as pd
from functools import partial


def worker(partition, prices_df, target_date, result_queue):
    """
    Thread target: impute rows for a single partition and enqueue the output.

    Args:
        partition (pd.DataFrame): Subset of profits_df to impute rows for.
        prices_df (pd.DataFrame): Price data passed through to the imputation.
        target_date (str or datetime): Date for which rows are imputed.
        result_queue (queue.Queue): Queue that receives the imputation result.

    Note: exceptions from the imputation are not caught, so when run as a thread
    target nothing is put on the queue for a partition that fails.
    """
    result_queue.put(td.impute_profits_df_rows(partition, prices_df, target_date))

def multithreaded_impute_profits(partitions, prices_df, target_date, impute_fn=None):
    """
    Impute rows for each partition on its own thread and merge the results.

    One worker thread is started per partition. Results are concatenated in the
    same order as ``partitions``, regardless of which thread finishes first. An
    exception raised for any partition is re-raised here rather than silently
    dropping that partition's rows from the output.

    Args:
        partitions (list): List of DataFrame partitions (e.g. from create_partitions).
        prices_df (pd.DataFrame): DataFrame containing price information.
        target_date (str or datetime): The date for which to impute rows.
        impute_fn (callable, optional): Called as
            ``impute_fn(partition, prices_df, target_date)`` and must return a
            DataFrame. Defaults to ``td.impute_profits_df_rows``.

    Returns:
        pd.DataFrame: Merged result of all processed partitions with a fresh RangeIndex.

    Raises:
        ValueError: If ``partitions`` is empty.
        Exception: Whatever ``impute_fn`` raises for any partition.

    Note: threads share the GIL, so CPU-bound pandas work may see limited speedup.
    """
    # Local import keeps this notebook cell self-contained
    from concurrent.futures import ThreadPoolExecutor

    partitions = list(partitions)
    if not partitions:
        raise ValueError("multithreaded_impute_profits requires at least one partition.")

    if impute_fn is None:
        impute_fn = td.impute_profits_df_rows

    with ThreadPoolExecutor(max_workers=len(partitions)) as executor:
        futures = [
            executor.submit(impute_fn, partition, prices_df, target_date)
            for partition in partitions
        ]
        # Collect in submission order; .result() re-raises a worker's exception
        results = [future.result() for future in futures]

    # Merge results
    return pd.concat(results, ignore_index=True)


# Run the threaded imputation over the partitions created earlier
result = multithreaded_impute_profits(partitions, prices_df, target_date)

# Any post-processing on the result can be done here
print(f"Processed DataFrame shape: {result.shape}")


In [None]:
def test_partition_performance(profits_df, prices_df, target_date, partition_numbers):
    """
    Benchmark multithreaded_impute_profits with different numbers of partitions.

    For each value in partition_numbers, this function:
    - re-partitions profits_df with create_partitions;
    - runs multithreaded_impute_profits;
    - times the two steps together.
    It checks that every run yields the same number of rows, logs each run, and
    plots execution time against the partition count.

    Note: despite the ``test_`` prefix this is a benchmark helper, not a pytest
    test; the name is kept so existing calls keep working.

    Args:
        profits_df (pd.DataFrame): The input DataFrame containing profit information.
        prices_df (pd.DataFrame): The input DataFrame containing price information.
        target_date (str or datetime): The target date for imputation.
        partition_numbers (list of int): A list of partition numbers to test.

    Returns:
        list of tuple: A list of (partition_number, execution_time_seconds) tuples
            in input order. Empty (and no plot is drawn) if partition_numbers is empty.

    Raises:
        ValueError: If the row count of any result DataFrame differs from the first run.

    Side effects:
        - Logs the partition count, result shape and execution time at INFO level.
        - Generates and displays a plot of execution time vs. number of partitions.
    """
    timings = []
    expected_size = None

    for n_partitions in partition_numbers:
        # perf_counter is monotonic and high-resolution, unlike time.time(),
        # which can jump when the system clock is adjusted
        start_time = time.perf_counter()
        partitions = create_partitions(profits_df, n_partitions)
        result = multithreaded_impute_profits(partitions, prices_df, target_date)
        execution_time = time.perf_counter() - start_time

        # Partitioning must not change the output, so row counts must match across runs
        current_size = result.shape[0]
        if expected_size is None:
            expected_size = current_size
        elif current_size != expected_size:
            raise ValueError(f"Inconsistent result size detected. Expected {expected_size} rows, "
                             f"but got {current_size} rows for {n_partitions} partitions.")

        timings.append((n_partitions, execution_time))
        logger.info("Partitions: %s, Result Shape: %s , Time: %.2f seconds",
                    n_partitions, result.shape, execution_time)

    # zip(*[]) cannot be unpacked into two names, so skip plotting when nothing ran
    if not timings:
        return timings

    # Draw on a dedicated figure rather than whatever figure happens to be current
    partition_counts, times = zip(*timings)
    _, ax = plt.subplots()
    ax.plot(partition_counts, times, marker='o')
    ax.set_xlabel('Number of Partitions')
    ax.set_ylabel('Execution Time (seconds)')
    ax.set_title('Performance vs Number of Partitions')
    plt.show()

    return timings

# Assuming profits_df, prices_df, and target_date are already defined.
# test_partition_performance already draws the timing plot, so no second plot is
# drawn here. This also keeps the notebook-global `partitions` (the DataFrame
# partitions used by the threading cell) from being rebound to partition counts.
partition_numbers = [11, 12, 13, 16, 22, 24]
results = test_partition_performance(profits_df, prices_df, target_date, partition_numbers)

## Junkyard

In [None]:
# Create efficient columns: store coin_id as compact integer codes and keep a
# code -> coin_id mapping so the original ids can be recovered.
profits_df['coin_id'] = profits_df['coin_id'].astype('category')
coin_id_mapping = dict(enumerate(profits_df['coin_id'].cat.categories))
coin_codes = profits_df['coin_id'].cat.codes
# astype('int16') would silently wrap codes above 32767, so fail loudly instead
if coin_codes.max() > np.iinfo(np.int16).max:
    raise ValueError(f"Too many coin_ids ({len(coin_id_mapping)}) to encode as int16.")
profits_df['coin_id'] = coin_codes.astype('int16')

# Convert date column to store the difference in days relative to target_date.
# target_date may be a plain string (e.g. '2024-08-31'), so coerce it to a
# Timestamp; a datetime Series minus a str raises TypeError.
profits_df['date'] = (pd.to_datetime(profits_df['date']) - pd.Timestamp(target_date)).dt.days.astype('int16')

In [None]:
# # vars
# target_date = '2024-08-31'
# # new_rows_df = generate_new_row(profits_df, prices_df, target_date)

# target_date = pd.to_datetime(target_date)

# # # Create efficient indexes
# # profits_df = profits_df.set_index(['coin_id', 'wallet_address', 'date']).copy(deep=True)
# # prices_df = prices_df.set_index(['coin_id', 'date']).copy(deep=True)

# # # Identify pairs needing new rows
# # logger.debug('Identifying pairs that need a row for %s...', target_date)
# # all_pairs = profits_df.index.droplevel('date').unique()
# # existing_pairs = profits_df.loc(axis=0)[:, :, target_date].index.droplevel('date')
# # pairs_needing_rows = all_pairs.difference(existing_pairs)
# # logger.debug('Identified %s pairs that will need rows imputed.', len(pairs_needing_rows))


# new_rows = []

# logger.debug('Imputing new rows...')
# for coin_id, wallet_address in pairs_needing_rows:
#     # Get most recent record
#     recent_record = profits_df.loc[coin_id, wallet_address].loc[:target_date].iloc[-1]

#     # Get prices
#     price_previous = prices_df.loc[(coin_id, recent_record.name), 'price']
#     price_current = prices_df.loc[(coin_id, target_date), 'price']

#     # Calculate new values
#     price_ratio = price_current / price_previous
#     new_usd_balance = recent_record['usd_balance'] * price_ratio
#     profits_change = new_usd_balance - recent_record['usd_balance']
#     profits_cumulative = recent_record['profits_cumulative'] + profits_change

#     new_row = {
#         'coin_id': coin_id,
#         'wallet_address': wallet_address,
#         'date': target_date,
#         'profits_change': profits_change,
#         'profits_cumulative': profits_cumulative,
#         'usd_balance': new_usd_balance,
#         'usd_net_transfers': 0,
#         'usd_inflows': 0,
#         'usd_inflows_cumulative': recent_record['usd_inflows_cumulative'],
#         'total_return': profits_cumulative / max(recent_record['usd_inflows_cumulative'], 0.01)
#     }

#     new_rows.append(new_row)

# new_rows_df = pd.DataFrame(new_rows)

# logger.debug('Generated new_rows_df with shape %s.', new_rows_df.shape)


In [None]:
# Get the most recent data for pairs needing rows.
# NOTE(review): pairs_needing_rows is only defined in the commented-out cell above, and
# this assumes profits_df/prices_df carry the MultiIndexes set there — confirm before use.
# Flatten the index first, then keep the last full row per pair after sorting by date.
# GroupBy.last() takes the last non-null value of each column independently, which can
# mix values from different rows. If 'date' is an index level, the level-based groupby
# would also drop it, even though the merges below need it.
most_recent_data = profits_df.loc[pairs_needing_rows].reset_index()

# Ensure the date column is properly formatted
most_recent_data['date'] = pd.to_datetime(most_recent_data['date'])
most_recent_data = (
    most_recent_data
    .sort_values('date')
    .groupby(['coin_id', 'wallet_address'], sort=False, observed=True)
    .tail(1)
    .reset_index(drop=True)
)

# Reset index of prices_df for the merge operation; merge_asof needs the `on`
# key to have the same datetime dtype on both sides
prices_df_reset = prices_df.reset_index()
prices_df_reset['date'] = pd.to_datetime(prices_df_reset['date'])

# Perform asof merge to get the most recent price before or on the date of each record
merged_data = pd.merge_asof(most_recent_data.sort_values('date'),
                            prices_df_reset.sort_values('date'),
                            on='date',
                            by='coin_id',
                            direction='backward')

# Now get the price at the target date
target_prices = prices_df.loc(axis=0)[:, target_date].reset_index()
target_prices = target_prices.rename(columns={'price': 'target_price'})

# Merge the target prices
merged_data = pd.merge(merged_data, target_prices[['coin_id', 'target_price']], on='coin_id', how='left')

# Calculate price ratio
merged_data['price_ratio'] = merged_data['target_price'] / merged_data['price']

logger.debug('Merged data shape: %s', merged_data.shape)
logger.debug('Merged data columns: %s', merged_data.columns.tolist())

In [None]:
# Get all rows belonging to the (coin_id, wallet_address) pairs that need a new row.
# MultiIndex.isin accepts only a single `level`, not a list, so drop the 'date'
# level and match the remaining (coin_id, wallet_address) tuples directly.
# NOTE(review): assumes profits_df is indexed by (coin_id, wallet_address, date) and
# that pairs_needing_rows exists (it is only set in commented-out code above).
most_recent_data = profits_df.loc[profits_df.index.droplevel('date').isin(pairs_needing_rows)]
# most_recent_data = most_recent_data.groupby(level=['coin_id', 'wallet_address']).last().reset_index()

# # Ensure the date column is properly formatted
# most_recent_data['date'] = pd.to_datetime(most_recent_data['date'])
# prices_df['date'] = pd.to_datetime(prices_df['date'])

# # Perform asof merge to get the most recent price before or on the date of each record
# merged_data = pd.merge_asof(most_recent_data.sort_values('date'),
#                             prices_df[['date', 'coin_id', 'price']].sort_values('date'),
#                             on='date',
#                             by='coin_id',
#                             direction='backward')

# # Now get the price at the target date
# target_prices = prices_df[prices_df['date'] == target_date][['coin_id', 'price']]
# target_prices = target_prices.rename(columns={'price': 'target_price'})

# # Merge the target prices
# merged_data = pd.merge(merged_data, target_prices, on='coin_id', how='left')

# # Calculate price ratio
# merged_data['price_ratio'] = merged_data['target_price'] / merged_data['price']


## tests failing