In [1]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from numpy import arange
import seaborn as sns
from collections import Counter
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split, cross_validate, GridSearchCV
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression, HuberRegressor, LassoCV, Lasso, Ridge
from xgboost.sklearn import XGBRegressor
from sklearn.metrics import confusion_matrix
from typing import List
# display formatting for floats
pd.options.display.float_format = '{:,.2f}'.format

# TokenIds whose sales are held out of the modelling data and predicted at the end of the notebook
token_id = [9093, 6485, 2578]
# Path of the sales CSV that the loading cell reads
collection = 'boredapeyachtclub.csv'

In [2]:
# Read the sales export; the unnamed first CSV column is renamed to TokenId.
raw = pd.read_csv(collection, parse_dates=['SaleDate']).rename(columns={'Unnamed: 0': 'TokenId'})

# Keep sales with a non-zero USD price and a recorded previous sale price,
# order them by sale date, renumber the rows and replace every remaining NaN with 0.
# NOTE(review): the blanket fill also hits the trait / rarity columns, so the
# later fillna('None') and rarity fills on dfo/dfi find nothing left to fill.
has_price = raw['USDPrice'] != 0
has_last_sale = raw['LastSalePrice'].notna()
df = (
    raw.loc[has_price & has_last_sale]
    .sort_values(['SaleDate'])
    .reset_index(level=0, drop=True)
    .fillna(0)
)

In [3]:
# Mean USD price per sale date.
ts = df.groupby('SaleDate').agg({'USDPrice': 'mean'})
# Drop dates whose mean price is 1,000,000 USD or more (10e5 == 1e6).
below_cap = ts['USDPrice'].lt(10e5)
ts_outrm = ts[below_cap]
# Log of the remaining daily means and its 14-row rolling median.
log_price = ts_outrm['USDPrice'].pipe(np.log)
rolling_median = log_price.rolling(window=14).median()

In [4]:
# Log of the USD sale price.
df['LogUSDPrice'] = np.log(df['USDPrice'])

# Exponentially weighted mean / std of the log price (span=14), in row order.
log_price_ewm = df['LogUSDPrice'].ewm(span=14)
mean, std = log_price_ewm.mean(), log_price_ewm.std()

# Upper band only; a lower band (mean - 2*std) was tried and is not used.
mean_plus_std = mean + 1.7*std  # 1.7 worked well
is_outlier = df['LogUSDPrice'].gt(mean_plus_std)

# Encode the flag as -1 (above the band) / 1 (everything else).
df['Outlier'] = is_outlier.map({True: -1, False: 1})

In [5]:
# Independent copies of the flagged (dfo) and unflagged (dfi) sales.
dfo, dfi = df[is_outlier].copy(), df[~is_outlier].copy()
dfs = dfo, dfi
for data in dfs:
    # Baseline = average of a 7-row rolling median and a span-14 EW mean of the log price.
    rolling_median = data['LogUSDPrice'].rolling(window=7, min_periods=1).median()
    ewm = data['LogUSDPrice'].ewm(span=14).mean()
    data['LogUSDPriceEWM'] = (rolling_median + ewm) / 2
    # Relative distance of the log price from that baseline.
    # Column arithmetic instead of a row-wise apply: no Python call per row,
    # and it still yields a column when one of the subsets is empty.
    data['PctExtensionEWM'] = (data['LogUSDPrice'] - data['LogUSDPriceEWM']) / data['LogUSDPriceEWM']

In [6]:
def rolling_periods(data):
    """Recompute the smoothed log-price baseline on the global ``dfo`` / ``dfi`` frames.

    For each of the two frames this writes, in place:

    * ``LogUSDPriceEWM``  - average of a 7-row rolling median (min_periods=1)
      and a span-14 exponentially weighted mean of ``LogUSDPrice``;
    * ``PctExtensionEWM`` - ``(LogUSDPrice - LogUSDPriceEWM) / LogUSDPriceEWM``.

    Parameters
    ----------
    data
        Ignored. It is kept so existing calls such as ``rolling_periods(df)``
        keep working; the function always operates on the globals.

    Returns
    -------
    tuple
        ``(dfo, dfi)`` - the same global objects, after mutation.
    """
    frames = dfo, dfi
    for frame in frames:
        median_7 = frame['LogUSDPrice'].rolling(window=7, min_periods=1).median()
        ewm_14 = frame['LogUSDPrice'].ewm(span=14).mean()
        frame['LogUSDPriceEWM'] = (median_7 + ewm_14) / 2
        # Column arithmetic rather than a row-wise apply: faster, and it still
        # produces a (zero-length) column when a frame has no rows.
        frame['PctExtensionEWM'] = (frame['LogUSDPrice'] - frame['LogUSDPriceEWM']) / frame['LogUSDPriceEWM']
    return frames

def rarity_columns(data):
    """List the rarity columns of ``data`` and fill their gaps in the split frames.

    A rarity column is any column of ``data`` whose name contains 'Rarity'.
    Each frame returned by ``rolling_periods(data)`` is updated in place:
    missing values in a rarity column are replaced by 1 minus the sum of the
    distinct non-missing values of that column.

    Returns the list of rarity column names.
    """
    rarity_cols = [name for name in data.columns if 'Rarity' in name]  # all rarity related cols

    for frame in rolling_periods(data):
        frame[rarity_cols] = frame[rarity_cols].apply(
            lambda col: col.fillna(1 - col.dropna().unique().sum())
        )
    return rarity_cols
#cols = rarity_processing(data)
def traits(data):
    """Return the trait column names and fill missing trait values with 'None'.

    A trait name is the part of a rarity column name before 'Rarity'
    (rarity columns come from ``rarity_columns(data)``). The fill is applied
    in place to every frame in the global ``dfs``.
    """
    trait_names = [rarity_col.split("Rarity")[0] for rarity_col in rarity_columns(data)]
    for frame in dfs:
        frame[trait_names] = frame[trait_names].fillna('None')
    return trait_names
# Pass `df` explicitly. The previous argument `data` was only defined as the
# leftover loop variable of an earlier cell; `df` has the same rarity columns.
rarity_cols = rarity_columns(df)
# NOTE(review): this rebinds the name `traits` from the function to the list it
# returns, so this cell cannot be run twice without re-defining the function.
traits = traits(df)

In [7]:
def traits(data):
    """Derive trait column names from the rarity columns and fill missing traits.

    Same definition as in the previous cell; it is needed again here because
    that cell rebinds the name ``traits`` to a list. The trait name is the
    text before 'Rarity' in each name returned by ``rarity_columns(data)``.
    Missing values in those columns are set to 'None' in place on every frame
    of the global ``dfs``.
    """
    names = [rarity_name.split("Rarity")[0] for rarity_name in rarity_columns(data)]
    for frame in dfs:
        frame[names] = frame[names].fillna('None')
    return names
def top_rarity(df, n=2):
    """Return the names of the ``n`` rarity columns with the largest summed inverse rarity.

    Every column whose name contains 'Rarity' is inverted (1 / value) and
    summed over all rows; the result is the index of the ``n`` largest sums,
    largest first.
    """
    rarity_cols = [name for name in df.columns if 'Rarity' in name]
    inverse_totals = (1 / df[rarity_cols]).sum()
    return inverse_totals.nlargest(n).index
def rare_traits(df):
    """Return the least frequent value of each of the two top-rarity trait columns.

    The two rarity columns come from ``top_rarity(df, n=2)``; the matching
    trait column is the text before 'Rarity'. Frequencies are counted over the
    rows of ``df``; on a tie the value seen first wins.

    Returns a tuple ``(rare_trait_1, rare_trait_2)``.
    """
    top_cols = top_rarity(df, n=2)
    rarest = []
    for rarity_col in (top_cols[0], top_cols[1]):
        trait_col = rarity_col.split("Rarity")[0]
        tally = Counter(df[trait_col])
        rarest.append(min(tally, key=tally.get))
    return rarest[0], rarest[1]
# The two rarity columns with the largest summed inverse rarity ...
top_rarity_cols = top_rarity(df)
rarity_1, rarity_2 = top_rarity_cols[0], top_rarity_cols[1]
# ... the trait columns they describe ...
trait_1, trait_2 = rarity_1.split("Rarity")[0], rarity_2.split("Rarity")[0]
# ... and the least frequent value in each of those trait columns.
rare_trait_pair = rare_traits(df)
rare_trait_1, rare_trait_2 = rare_trait_pair[0], rare_trait_pair[1]
rarity_1, rarity_2, trait_1, trait_2, rare_trait_1, rare_trait_2


('Eye ColorRarity',
 'ClothingRarity',
 'Eye Color',
 'Clothing',
 'CLSD GRN Breezy',
 'BLCK-RD VARSITY JCKT')

In [8]:
def has_two_less_1pct(row, cols):
    """Return 1 if at least two of the values ``row[cols]`` are below 0.01, else 0."""
    rarity = row[cols].values
    rare_count = np.count_nonzero(rarity < 0.01)
    return 1 if rare_count > 1 else 0
def rarity_1_or_rarity_2(row):
    """Return 1 if the row's value in either global rarity column is below 0.01, else 0.

    The two columns are named by the globals ``rarity_1`` and ``rarity_2``.
    """
    is_rare = any(row[col] < 0.01 for col in (rarity_1, rarity_2))
    return 1 if is_rare else 0

def trait_1_or_trait_2(row):
    """Return 1 if the row carries either of the two globally selected rare trait values.

    Checks ``row[trait_1] == rare_trait_1`` or ``row[trait_2] == rare_trait_2``
    (all four names are notebook globals); returns 0 otherwise.
    """
    wanted = ((trait_1, rare_trait_1), (trait_2, rare_trait_2))
    return int(any(row[col] == value for col, value in wanted))

def find_matches(row, categories):
    """Return 1 if any word occurs in more than one place across the row's traits, else 0.

    Parameters
    ----------
    row
        A row (pandas Series) holding the trait values.
    categories
        Names of the trait columns to inspect.

    Trait values are split on whitespace into words. Missing traits are
    ignored: every 'None' placeholder word is dropped (not just the first
    one), and non-string values such as a numeric fill are skipped. A row
    with no usable words returns 0.
    """
    keywords = []
    for trait in row[categories].values:
        if not isinstance(trait, str):
            # Numeric fill values / NaN carry no words to compare.
            continue
        # split() without an argument also discards empty tokens from repeated spaces.
        keywords.extend(word for word in trait.split() if word != 'None')
    if not keywords:
        return 0
    top_count = Counter(keywords).most_common(1)[0][1]
    return 1 if top_count > 1 else 0


In [None]:
# Refresh the smoothed-price columns on dfo / dfi and fetch the two frames.
dfs = rolling_periods(df)
# Rarity column names; this call also fills missing rarity values in dfo / dfi
# (and runs rolling_periods again internally).
rarity_cols = rarity_columns(df)
# Trait column names; this call also fills missing traits with 'None'.
# NOTE(review): this rebinds `traits` from the function to the returned list, so
# re-running this cell fails unless the function definition is re-run first.
traits = traits(df)
for data in dfs:
        # Row-wise 0/1 rule features, written in place on each frame.
        data['HasTwoLess1Pct'] = data.apply(lambda x: has_two_less_1pct(x, rarity_cols), axis=1)
        # NOTE(review): the column name says 'Rarity1OrRarity1' but holds the result of
        # rarity_1_or_rarity_2; later cells use the same spelling, so rename everywhere or not at all.
        data['Rarity1OrRarity1'] = data.apply(rarity_1_or_rarity_2, axis=1)
        data['Trait1OrTrait2'] = data.apply(trait_1_or_trait_2, axis=1)
        data['HasMatches'] = data.apply(lambda x: find_matches(x, traits), axis=1)

In [None]:
# Rule-based outlier flag: -1 when any of the three rule features fired, 1 otherwise.
# (HasMatches is not part of the rule.)
for data in dfs:
    rule_hit = (
        data['HasTwoLess1Pct'].eq(1)
        | data['Rarity1OrRarity1'].eq(1)
        | data['Trait1OrTrait2'].eq(1)
    )
    data.loc[rule_hit, 'OutlierRule'] = -1
    data['OutlierRule'] = data['OutlierRule'].fillna(1)

In [None]:
def outlier_encoding(dfo):
    """One-hot encode the trait columns of ``dfo`` and return the widened frame.

    The trait columns are those listed in the global ``traits``. A new
    ``OneHotEncoder`` is fitted on ``dfo`` alone, so the indicator columns
    reflect only the categories present in this frame. The result has a fresh
    0..n-1 index, the indicator columns appended, and the original trait
    columns dropped.
    """
    trait_cols = traits
    encoder = OneHotEncoder(handle_unknown='ignore')
    dense = encoder.fit_transform(dfo[trait_cols]).toarray()
    indicators = pd.DataFrame(dense, columns=encoder.get_feature_names_out())
    widened = dfo.reset_index(drop=True).merge(indicators, left_index=True, right_index=True)
    return widened.drop(columns=trait_cols)


In [None]:
def inlier_encoding(dfi):
    """One-hot encode the trait columns of ``dfi`` and return the widened frame.

    Mirrors ``outlier_encoding``: the columns named in the global ``traits``
    are encoded with an encoder fitted on ``dfi`` only, the indicator columns
    are joined on a fresh 0..n-1 index, and the trait columns are removed.
    """
    trait_cols = traits
    encoder = OneHotEncoder(handle_unknown='ignore')
    dense = encoder.fit_transform(dfi[trait_cols]).toarray()
    indicators = pd.DataFrame(dense, columns=encoder.get_feature_names_out())
    widened = dfi.reset_index(drop=True).merge(indicators, left_index=True, right_index=True)
    return widened.drop(columns=trait_cols)


In [None]:
# One-hot encode each subset separately.
# NOTE(review): not re-runnable as written; the encoders drop the trait columns they need.
dfo = outlier_encoding(dfo)
dfi = inlier_encoding(dfi)
# Both encoded frames are indexed 0..n-1, so a plain concat would repeat index
# labels and the index-based merge in predict_mapping would pair predictions
# with rows of other tokens. ignore_index gives every row a unique label.
df = pd.concat([dfi, dfo], ignore_index=True)
# Indicator columns that exist in only one subset are NaN in the other; treat as 0.
df = df.fillna(0)
df1 = df.copy()


In [None]:
def token_train_test(df, token_id):
    """Split ``df`` into modelling data and the held-out tokens' rows.

    Feature columns are every column whose name contains '_', plus
    'HasMatches' and 'NumberOfSales'; 'OutlierRule' and 'Outlier' are appended
    to both feature frames. The target is 'PctExtensionEWM'.

    Returns ``(X, y, x_token, y_token)`` where the ``*_token`` pair holds the
    rows whose 'TokenId' is in ``token_id`` and ``X`` / ``y`` hold the rest.
    """
    held_out_mask = df['TokenId'].isin(token_id)
    feature_cols = [name for name in df.columns if '_' in name] + ['HasMatches', 'NumberOfSales']
    model_cols = feature_cols + ['OutlierRule', 'Outlier']

    held_out = df.loc[held_out_mask]
    remaining = df.loc[~held_out_mask]
    return (
        remaining.loc[:, model_cols],
        remaining['PctExtensionEWM'],
        held_out.loc[:, model_cols],
        held_out['PctExtensionEWM'],
    )

# Rows of the tokens listed in `token_id` become x_token / y_token; all other rows become X / y.
X, y, x_token, y_token  = token_train_test(df, token_id)

In [None]:
def train_test(X, y):
    """Split into train / test sets and carve the training set by the 'Outlier' flag.

    The split uses ``test_size=0.33`` and ``random_state=42``. The training
    rows are then divided into inliers (``Outlier == 1``) and outliers
    (``Outlier == -1``); the 'OutlierRule' and 'Outlier' columns are removed
    from those two feature frames but kept in ``X_train`` / ``X_test``.

    Returns ``(X_train, X_test, y_train, y_test, X_train_in, y_train_in,
    X_train_out, y_train_out)``.
    """
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)
    flag_cols = ['OutlierRule', 'Outlier']
    inlier_rows = X_train['Outlier'] == 1
    outlier_rows = X_train['Outlier'] == -1

    X_train_in = X_train.loc[inlier_rows].drop(columns=flag_cols)
    y_train_in = y_train.loc[inlier_rows]
    X_train_out = X_train.loc[outlier_rows].drop(columns=flag_cols)
    y_train_out = y_train.loc[outlier_rows]
    return X_train, X_test, y_train, y_test, X_train_in, y_train_in,  X_train_out,  y_train_out
# Train / test split of the modelling data, plus the inlier and outlier training subsets.
X_train, X_test, y_train, y_test, X_train_in, y_train_in,  X_train_out,  y_train_out = train_test(X, y)

In [None]:
def model_selection(models: List[dict], scores: str, X_train, y_train) -> dict:
    """Grid-search each candidate and report the one with the best cross-validated score.

    Parameters
    ----------
    models
        Candidates, each a dict with keys 'name', 'constructor' (an estimator
        instance) and 'param_grid'.
    scores
        A single scorer name passed to ``GridSearchCV(scoring=...)``
        (higher is better, e.g. 'neg_root_mean_squared_error').
    X_train, y_train
        Training data handed to ``GridSearchCV.fit``.

    Returns
    -------
    dict
        Candidate name -> best mean cross-validated score over its grid.
        Empty when ``models`` is empty.
    """
    results = {}
    for candidate in models:
        search = GridSearchCV(
            candidate['constructor'], param_grid=candidate['param_grid'], scoring=scores
        )
        search.fit(X_train, y_train)
        # Rank each candidate by its best grid point. Averaging over the whole
        # grid would penalise a model merely for having weak settings listed.
        results[candidate['name']] = search.best_score_
    if results:
        print("The winner is: {}".format(
            max(results, key=results.get)
        ))
    return results
# Candidate regressors and the hyper-parameter grid searched for each.
models = [
    {
        'name': 'Lasso',
        'constructor': Lasso(),
        'param_grid': {'alpha': [0.2, 0.4, 0.6, 0.8, 1.0]}
    },
    {
        'name': 'RandomForest',
        'constructor': RandomForestRegressor(random_state=0),
        'param_grid': {}
    },
    {
        'name': 'Huber',
        'constructor': HuberRegressor(),
        'param_grid': {'epsilon': [10], 'max_iter': [10000]}
    }
]
# Drop the two outlier flag columns first. The final models are fitted without
# them, and 'Outlier' is derived from the sale price itself, so leaving them in
# would let the candidates see information tied to the target.
model_selection(
    models,
    'neg_root_mean_squared_error',
    X_train.drop(columns=['OutlierRule', 'Outlier']),
    y_train,
)

In [None]:
# NOTE(review): these imports sit mid-notebook; consider moving them to the first cell with the others.
from sklearn.ensemble import VotingRegressor
from sklearn import metrics
# Same float display format as the one set in the first cell.
pd.options.display.float_format = '{:,.2f}'.format

In [None]:
# Base learners shared by both ensembles.
estimators = [('randomforest', RandomForestRegressor(random_state=42, n_estimators=200)),
              ('linear', LassoCV(random_state=42, max_iter=10000)),
              ('huber', HuberRegressor(epsilon=2, max_iter=10000))]
# One ensemble for the inlier training rows, one for the outlier training rows.
vreg_in, vreg_out = VotingRegressor(estimators),  VotingRegressor(estimators)
# Keep the 3-fold scores of the inlier ensemble; they were previously computed and thrown away.
cv_results_in = cross_validate(vreg_in, X_train_in, y_train_in, cv=3,
                               scoring='neg_mean_squared_error',
                               return_train_score=True)
vreg_in.fit(X_train_in, y_train_in)
vreg_out.fit(X_train_out, y_train_out)

In [None]:
# Persist both fitted ensembles under the keys that predict() expects.
model = {'o': vreg_out, 'i': vreg_in}
import pickle
# The context manager closes the file even if pickling raises.
with open("model.pkl", "wb") as pickle_out:
    pickle.dump(model, pickle_out)

In [None]:
def predict(X, models: dict):
    """Predict with two regressors, routing each row by its 'OutlierRule' flag.

    Rows with ``OutlierRule == -1`` go to ``models['o']``, all other rows to
    ``models['i']``. The 'OutlierRule' and 'Outlier' columns are removed
    before the features reach either model, and a model is only called when
    it has at least one row. Returns a 1-D array of predictions in the row
    order of ``X``.
    """
    rule_outlier = (X['OutlierRule'] == -1).to_numpy()
    features = X.drop(columns=['OutlierRule', 'Outlier'])
    combined = np.empty(len(X))  # every position is written by one of the two models

    for key, mask in (('o', rule_outlier), ('i', ~rule_outlier)):
        if mask.any():
            combined[mask] = models[key].predict(features.loc[mask])
    return combined

In [None]:
def predict_mapping(y_test):
    """Compare predicted and actual USD prices for the held-out tokens.

    Predictions come from ``predict`` on the global ``x_token`` with the
    global ``vreg_out`` / ``vreg_in`` models; ``y_test`` supplies the actual
    target values and the row index. The predictions are joined to the global
    ``df`` by index, and the predicted price is
    ``exp(LogUSDPriceEWM * (1 + Predicted))``. 'Accuracy' is one minus the
    absolute price gap divided by the mean of predicted and actual price.

    Returns a frame with 'TokenId', 'USDPrice', 'PredictedUSDPrice', 'Accuracy'.
    """
    predictions = predict(x_token, {'o': vreg_out, 'i': vreg_in})
    predict_df = pd.DataFrame({'Actual': y_test, 'Predicted': predictions})

    merged = df.merge(predict_df, how='left', left_index=True, right_index=True)
    scored = merged[merged['Predicted'].notna()]
    report_cols = ['TokenId', 'SaleDate', 'LogUSDPriceEWM', 'Actual', 'Predicted', 'USDPrice', 'PctExtensionEWM']
    base = scored.loc[:, report_cols]

    predicted_price = np.exp(base['LogUSDPriceEWM'] * (1 + base['Predicted']))
    accuracy = 1 - abs(predicted_price - base['USDPrice']) / ((predicted_price + base['USDPrice']) / 2)
    pm = base.assign(PredictedUSDPrice=predicted_price, Accuracy=accuracy)
    return pm[['TokenId', 'USDPrice', 'PredictedUSDPrice', 'Accuracy']]


In [None]:
# Predicted vs. actual USD price for the held-out tokens (displayed as the cell output).
predict_mapping(y_token)