In [3]:
import csv
import statistics
import numpy as np
from datetime import datetime
from collections import defaultdict
from scipy.stats import kurtosis, skew, linregress
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import pearsonr
import matplotlib.dates as mdates
import os
import random
import csv

In [None]:
df = pd.read_csv('dataset.csv')
df = df.loc[:, df.columns.intersection(["block_time", "address", "volume", "fee", "pnl", "return", "liquidation"])]
# Only the needed bit of time value
df["block_time"] = df["block_time"].apply(lambda x: x[:10])

In [5]:
# Function to normalize a list of values to the range [0, 1]
def normalize_values(values):
    # Find the minimum and maximum values in the list
    min_value, max_value = min(values), max(values)
    
    # Normalize each value using the min-max scaling formula
    normalized_values = [(value - min_value) / (max_value - min_value) for value in values]
    
    return normalized_values

# Function to generate random weights for a given number of items, ensuring they sum to 1
def generate_weights(n, min_value=0.0, max_value=1.0):
    # Check if the provided range of weights is feasible
    if min_value * n > 1 or max_value * n < 1:
        return "Weights are not feasible."

    remaining_weights = 1  # Initialize the remaining weight to be distributed
    weights = []  # Initialize an empty list to store the generated weights

    # Generate 'n-1' random weights to ensure their sum is less than or equal to 1
    for _ in range(n - 1):
        weight = random.uniform(min_value, min(remaining_weights, max_value))
        weights.append(weight)  # Append the generated weight to the list
        remaining_weights -= weight  # Update the remaining weight

    # Add the last weight to ensure that all weights sum to exactly 1
    weights.append(remaining_weights)

    # Shuffle the weights to randomize their order
    random.shuffle(weights)

    return weights


In [7]:
# Define a class for regression tests
class RegressionTest:
    
    # Constructor to initialize the class with data
    def __init__(self, data):
        self.data = data
    
    # Function to calculate the regression coefficient for a given address
    def calculate_regression_coefficient(self, address):
        # Filter the data for the specified address
        df_filtered = self.data[self.data["address"] == address]
        
        # Create x and y arrays for regression
        x = np.arange(1, len(df_filtered) + 1)
        y = np.cumsum(df_filtered['return'])
        
        # Calculate linear regression statistics
        slope, intercept, r_value, p_value, std_err = linregress(x, y)
        
        # Normalize the slope
        normalized_slope = (np.rad2deg(np.arctan(slope)) + 90) / 180
        
        # Calculate the regression coefficient
        coefficient = (r_value ** 2 * normalized_slope) / (std_err + 1)
        
        # Adjust the coefficient if needed
        coefficient /= (1 - p_value) if 1 - p_value != 0 else 1
        
        return coefficient

    # Function to calculate and normalize regression coefficients for all unique addresses
    def Normalized_Regression_coefficient(self):
        addresses = self.data["address"].unique()
        regression_coefficient_values = [self.calculate_regression_coefficient(address) for address in addresses]
        normalized_regression_coefficient_values = normalize_values(regression_coefficient_values)
        return dict(zip(addresses, normalized_regression_coefficient_values))
    
    # Function to add the normalized regression coefficients to a CSV file
    def addToCSV(self, file):
        data = []
        NRC = self.Normalized_Regression_coefficient()
        
        # Create a list of dictionaries for CSV writing
        for i in range(len(NRC)):
            dic = {}
            dic["Address"] = list(NRC.keys())[i]
            dic["Normalized Regression Coefficient"] = list(NRC.values())[i]
            data.append(dic)
        
        # Write the data to the CSV file
        with open(file, 'w', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)

# Create an instance of the RegressionTest class with the provided DataFrame 'df'
RT = RegressionTest(df)

In [8]:
# Define a class for risk tests
class RiskTest:
    
    # Constructor to initialize the class with data
    def __init__(self, data):
        self.data = data
    
    # Function to calculate the Coefficient of Variation (CV) for negative returns of a given address
    def calculate_negative_CV_returns(self, address):
        # Extract negative returns for the specified address
        returns = self.data[self.data["address"] == address]["return"].values.tolist()
        negative_returns = [r for r in returns if r < 0]
        
        # Calculate mean and standard deviation of negative returns
        mean_negative_returns = np.mean(negative_returns)
        std_dev_negative_returns = np.std(negative_returns)
        
        # Calculate CV for negative returns
        cv_negative_returns = std_dev_negative_returns / mean_negative_returns
        return cv_negative_returns
        
    # Function to normalize CV values for negative returns of all unique addresses
    def Normalized_CV_negative_returns(self):
        addresses = self.data["address"].unique()
        cv_negative_returns = [self.calculate_negative_CV_returns(address) for address in addresses]
        normalized_cv_negative_returns = normalize_values(cv_negative_returns)
        return dict(zip(addresses, normalized_cv_negative_returns))

    # Function to calculate Value at Risk (VaR) for a given address and confidence level
    def calculate_var(self, address, confidence_level):
        df_filtered = self.data[self.data["address"] == address]
        returns = np.array(df_filtered["return"])
        sorted_returns = np.sort(returns)
        index = int(np.floor(confidence_level * len(sorted_returns)))
        var = sorted_returns[index]
        return var

    # Function to normalize VaR values for all unique addresses at a specified confidence level
    def Normalized_VaR(self, confidence_level=0.05):
        addresses = self.data["address"].unique()
        var_values = [self.calculate_var(address, confidence_level) for address in addresses]
        normalized_var_values = normalize_values(var_values)
        return dict(zip(addresses, normalized_var_values))
    
    # Function to calculate Conditional Value at Risk (cVaR) for a given address and confidence level
    def calculate_cvar(self, address, confidence_level):
        df_filtered = self.data[(self.data["address"] == address) & (self.data["pnl"] < 0)]
        returns = np.array(df_filtered["return"])
        cvar = np.nan if len(returns) == 0 else np.mean(returns)
        return cvar

    # Function to normalize cVaR values for all unique addresses at a specified confidence level
    def Normalized_cVaR(self, confidence_level=0.05):
        addresses = self.data["address"].unique()
        cvar_values = [self.calculate_cvar(address, confidence_level) for address in addresses]
        normalized_cvar_values = normalize_values(cvar_values)
        return dict(zip(addresses, normalized_cvar_values))
    
    # Function to calculate the probability of liquidation for a given address
    def calculate_liquidation_chance(self, address):
        df_filtered = self.data[self.data["address"] == address]
        total_trades = len(df_filtered)
        liquidation_counts = df_filtered["liquidation"].apply(lambda x: 1 if x == "Yes" else 0).sum()
        liquidation_chance = liquidation_counts / total_trades
        return liquidation_chance

    # Function to normalize liquidation chances for all unique addresses
    def Normalized_Liquidation_chance(self):
        addresses = self.data["address"].unique()
        liquidation_chance_values = [self.calculate_liquidation_chance(address) for address in addresses]
        normalized_liquidation_chance_values = normalize_values(liquidation_chance_values)    
        return dict(zip(addresses, normalized_liquidation_chance_values))
    
    # Function to calculate the maximum drawdown for a given address
    def calculate_max_drawdown(self, address):
        filtered_data = self.data[self.data['address'] == address]
        cumulative_returns = np.cumprod(1 + filtered_data['return']) - 1
        peaks = np.maximum.accumulate(cumulative_returns)
        troughs = np.minimum.accumulate(cumulative_returns)
        drawdowns = peaks - troughs
        max_drawdown = np.max(drawdowns)
        return max_drawdown
    
    # Function to normalize maximum drawdown values for all unique addresses
    def Normalized_Max_Drawdown(self):
        values = normalize_values([self.calculate_max_drawdown(address) for address in self.data["address"].unique()])
        risk_df = {}
        for i, address in enumerate(self.data["address"].unique()):
            risk_df[address] = values[i]
        return risk_df
    
    # Function to add risk test results to a CSV file
    def addToCSV(self, file):
        data = []
        NCVNR = self.Normalized_CV_negative_returns()
        NV = self.Normalized_VaR()
        NCV = self.Normalized_cVaR()
        NLC = self.Normalized_Liquidation_chance()
        current = 0
        weights = [0.3, 0.3, 0.3, 0.1]
        dic = {}
        for i in range(len(NV)):
            dic["Address"] = list(NV.keys())[i]
            dic["Normalized VaR"] = list(NV.values())[i]
            dic["Normalized CVaR"] = list(NCV.values())[i]
            dic["Normalized Liquidation Chance"] = list(NLC.values())[i]
            dic["Weighted"] = (
                list(NV.values())[i] * weights[1] +
                list(NCV.values())[i] * weights[2] +
                list(NLC.values())[i] * weights[0]
            )
        with open(file, 'w', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=dic[0].keys())
            writer.writeheader()
            writer.writerows(dic)

    # Function to run multiple simulations with different weights and find the best combination
    def runMultiple(self, simulations=100):
        NCVNR = self.Normalized_CV_negative_returns()
        NV = self.Normalized_VaR()
        NCV = self.Normalized_cVaR()
        NLC = self.Normalized_Liquidation_chance()
        data = []
        for j in range(simulations):
            weights = generate_weights(4, 0.20, 0.45)
            dic = {}
            for key in NCV.keys():
                dic[key] = (
                    NCVNR[key] * weights[0] +
                    NV[key] * weights[1] +
                    NCV[key] * weights[2] +
                    NLC[key] * weights[3]
                )
            data.append(dic)
        drawdowns = self.Normalized_Max_Drawdown()
        drawdowns = {key: drawdowns[key] for key in data[0]}
        curr, currentMinimumMSE = None, float('inf')
        for nvs in data:
            totalSquaredError = sum([(nvs[key] - drawdowns[key]) ** 2 for key, value in nvs.items()])
            meanSquaredError = totalSquaredError / len(nvs) 
            if meanSquaredError < currentMinimumMSE:
                currentMinimumMSE, curr = meanSquaredError, nvs
        print(weights)
        
    # Function to rank the weights based on their mean squared errors
    def rankWeights(self):
        NCVNR = self.Normalized_CV_negative_returns()
        NV = self.Normalized_VaR()
        NCV = self.Normalized_cVaR()
        NLC = self.Normalized_Liquidation_chance()
        drawdowns = self.Normalized_Max_Drawdown()
        drawdowns = {key: drawdowns[key] for key in NV.keys()}
        
        dictionary = {}
        dictionary["NCVNR"] = sum([(NCVNR[key] - drawdowns[key]) ** 2 for key in NCVNR.keys()]) / len(NCVNR)
        dictionary["NV"] = sum([(NV[key] - drawdowns[key]) ** 2 for key in NV.keys()]) / len(NV)
        dictionary["NCV"] = sum([(NCV[key] - drawdowns[key]) ** 2 for key in NCV.keys()]) / len(NCV)
        dictionary["NLC"] = sum([(NLC[key] - drawdowns[key]) ** 2 for key in NLC.keys()]) / len(NLC)
        
        # Sort the weights based on mean squared errors
        dictionary = dict(sorted(dictionary.items(), key=lambda item: item[1]))
        
        for i, item in enumerate(dictionary.items()):
            print(f"{i + 1} - {item[0]}:{item[1]}")

# Create an instance of the RiskTest class with the provided DataFrame 'df'
RT = RiskTest(df)

1 - NLC:0.047993521956239016
2 - NCVNR:0.4523533051742929
3 - NCV:0.7907727419680378
4 - NV:0.8839590500515858


In [12]:
# Define a class for profitability tests
class ProfitabilityTest:
    
    # Constructor to initialize the class with data
    def __init__(self, data):
        self.data = data
        
    # Function to calculate PNL to volume ratio for a given address
    def calculate_pnltovolume(self, address):
        df_filtered = self.data[self.data["address"] == address]
        if len(df_filtered) == 0:
            return None
        total_pnl = sum(df_filtered['pnl'])
        total_volume = sum(df_filtered['volume'])    
        return total_pnl / total_volume

    # Function to normalize PNL to volume ratios for all unique addresses
    def Normalized_Pnltovolume_y(self):
        addresses = self.data['address'].unique()
        pnltovolume_values = [self.calculate_pnltovolume(address) for address in addresses]
        normalized_pnltovolume_values = normalize_values(pnltovolume_values)
        return dict(zip(addresses, normalized_pnltovolume_values))
    
    # Function to calculate PNL to collateral ratio for a given address
    def calculate_pnltocollateral(self, address):
        grouped = self.data.groupby("address").agg({"pnl": "sum", "volume": "sum"})
        grouped["pnltocollateral"] = grouped["pnl"] / grouped["volume"]
        return grouped.loc[address, "pnltocollateral"]

    # Function to normalize PNL to collateral ratios for all unique addresses
    def Normalized_Pnltocollateral(self):
        addresses = self.data["address"].unique()
        pnltocollateral_values = [self.calculate_pnltocollateral(address) for address in addresses]
        normalized_pnltocollateral_values = normalize_values(pnltocollateral_values)
        return dict(zip(addresses, normalized_pnltocollateral_values))
    
    # Function to calculate expected return for a given address
    def calculate_expected_return(self, address):
        expected_return = self.data[self.data["address"] == address]["return"].mean()
        return expected_return

    # Function to normalize expected return values for all unique addresses
    def Normalized_Expected_return_y(self):
        addresses = self.data["address"].unique()
        expected_return_values = [self.calculate_expected_return(address) for address in addresses]
        normalized_expected_return_values = normalize_values(expected_return_values)
        return dict(zip(addresses, normalized_expected_return_values))
    
    # Function to calculate Sharpe ratio for a given address
    def calculate_sharpe_ratio(self, address, risk_free_rate=0.02):
        filtered_data = self.data[self.data['address'] == address]
        excess_returns = filtered_data['return'] - risk_free_rate
        sharpe_ratio = np.mean(excess_returns) / np.std(excess_returns)
        return sharpe_ratio
    
    # Function to normalize Sharpe ratios for all unique addresses
    def Normalized_Sharpe_Ratio(self):
        values = normalize_values([self.calculate_sharpe_ratio(address) for address in self.data["address"].unique()])
        profitability_df = {}
        for i, address in enumerate(self.data["address"].unique()):
            profitability_df[address] = values[i]
        return profitability_df
    
    # Function to add profitability test results to a CSV file
    def addToCSV(self, file):
        data = []
        NPV = self.Normalized_Pnltovolume_y()
        NPC = self.Normalized_Pnltocollateral()
        NER = self.Normalized_Expected_return_y()
        current = 0
        weights = [0.3, 0.3, 0.4]
        for i in range(len(NPV)):
            dic = {}
            dic["Address"] = list(NPV.keys())[i]
            dic["Normalized PNL To Volume"] = list(NPV.values())[i]
            dic["Normalized PNL To Collateral"] = list(NPC.values())[i]
            dic["Normalized Expected Return"] = list(NER.values())[i]
            dic["Weighted Value"] = (
                list(NPV.values())[i] * weights[0] +
                list(NPC.values())[i] * weights[1] +
                list(NER.values())[i] * weights[2]
            )
            data.append(dic)
        with open(file, 'w', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=data[0].keys())
            writer.writeheader()
            writer.writerows(data)
    
    # Function to run multiple simulations with different weights and find the best combination
    def runMultiple(self, simulations=100):
        NPV = self.Normalized_Pnltovolume_y()
        NPC = self.Normalized_Pnltocollateral()
        NER = self.Normalized_Expected_return_y()
        data = []
        for j in range(simulations):
            weights = generate_weights(3, 0.275, 0.425)
            dic = {}
            for key in NPV.keys():
                dic[key] = (
                    NPV[key] * weights[0] +
                    NPC[key] * weights[1] +
                    NER[key] * weights[2]
                )
            data.append(dic)
        sharpeRatios = self.Normalized_Sharpe_Ratio()
        sharpeRatios = {key: sharpeRatios[key] for key in data[0]}
        curr, currentMinimumMSE = None, float('inf')
        for nvs in data:
            totalSquaredError = sum([(nvs[key] - sharpeRatios[key]) ** 2 for key, value in nvs.items()])
            meanSquaredError = totalSquaredError / len(nvs) 
            if meanSquaredError < currentMinimumMSE:
                currentMinimumMSE, curr = meanSquaredError, nvs
        print(weights)
        
    # Function to rank the weights based on their mean squared errors
    def rankWeights(self):
        NPV = self.Normalized_Pnltovolume_y()
        NPC = self.Normalized_Pnltocollateral()
        NER = self.Normalized_Expected_return_y()
        sharpeRatios = self.Normalized_Sharpe_Ratio()
        sharpeRatios = {key: sharpeRatios[key] for key in NPV.keys()}
        
        dictionary = {}
        dictionary["NPV"] = sum([(NPV[key] - sharpeRatios[key]) ** 2 for key in NPV.keys()]) / len(NPV)
        dictionary["NPC"] = sum([(NPC[key] - sharpeRatios[key]) ** 2 for key in NPC.keys()]) / len(NPC)
        dictionary["NER"] = sum([(NER[key] - sharpeRatios[key]) ** 2 for key in NER.keys()]) / len(NER)
        
        # Sort the weights based on mean squared errors
        dictionary = dict(sorted(dictionary.items(), key=lambda item: item[1]))
        
        for i, item in enumerate(dictionary.items()):
            print(f"{i + 1} - {item[0]}:{item[1]}")
            
            
# Create an instance of the ProfitabilityTest class with the provided DataFrame 'df'
PT = ProfitabilityTest(df)

1 - NPV:0.021273964493066882
2 - NPC:0.021273964493066892
3 - NER:0.028792707927005372
