In [None]:
#import
from rda_package import rda 
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from CosinorPy import file_parser, cosinor, cosinor1
import random
import os
from sklearn.metrics import confusion_matrix

N_TEST=1000
REPLICATES=1

In [None]:
def synt_rhythmic_data(filename,half_rnd=False,n_test=1,n_components=1,noise=0.5,replicates=1):
    """
    Generate a synthetic dataset with CosinorPy and save it as a wide CSV table.

    The long-format frame returned by ``file_parser.generate_test_data_group``
    (columns ``test``, ``x``, ``y``) is laid out with one row per test and one
    column per integer time point, written to ``Out/<name>/<name>.csv``
    (``<name>`` is ``filename`` without its last four characters) and returned.

    Parameters
    ----------
    filename : str
        Name of the output file; its last four characters are treated as the
        extension (e.g. ``".csv"``).
    half_rnd : bool
        If true, the first ``n_test // 2`` rows get their time-point columns
        permuted at random (one permutation shared by all of those rows) and
        are appended after the remaining, untouched rows. The row index is
        reset in that case.
    n_test : int
        Number of tests (rows) requested from the generator.
    n_components : int
        Number of components passed to the cosinor data generator.
    noise : float
        Noise level forwarded to the generator.
    replicates : int
        Number of replicates requested from the generator; the time-point
        labels are repeated once per replicate.

    Returns
    -------
    pandas.DataFrame
        The table that was written to disk.

    Raises
    ------
    ValueError
        If the generator returned fewer values than rows x columns.

    Notes
    -----
    NOTE(review): with ``replicates > 1`` the column labels repeat; confirm that
    the concatenation in the ``half_rnd`` branch still aligns as intended.
    """
    stem = filename[:-4]
    out_dir = f'Out/{stem}'
    os.makedirs(out_dir, exist_ok=True)

    if n_components == 1:
        df_rhd = file_parser.generate_test_data_group(
            N=n_test, n_components=n_components, noise=noise, replicates=replicates)
    else:
        # Amplitudes and phases are drawn once here and handed to the generator as-is.
        df_rhd = file_parser.generate_test_data_group(
            N=n_test, n_components=n_components, noise=noise, replicates=replicates,
            amplitudes=[1, np.random.random(), np.random.random()],
            phase=[2*np.pi*np.random.random(), 2*np.pi*np.random.random(), 2*np.pi*np.random.random()])

    # Column labels: the distinct integer time points, repeated once per replicate.
    time_points = df_rhd["x"].astype(int).unique()
    columns = np.tile(time_points, replicates)
    tests = df_rhd["test"].unique()
    n_rows, n_cols = len(tests), len(columns)

    # Fill the table row by row from the generator output, in the order it was
    # produced. A single reshape replaces the former element-wise chained
    # assignment, which is slow and writes to a temporary copy when pandas
    # Copy-on-Write is active.
    y_values = df_rhd["y"].to_numpy(dtype=float)
    n_needed = n_rows * n_cols
    if y_values.size < n_needed:
        raise ValueError(
            f"generator returned {y_values.size} values, "
            f"but {n_rows} x {n_cols} = {n_needed} are needed")
    df_rhd_res = pd.DataFrame(
        y_values[:n_needed].reshape(n_rows, n_cols), index=tests, columns=columns)

    df_results = df_rhd_res
    if half_rnd:
        half = n_test // 2
        # Copy so relabelling the columns never touches the rhythmic rows.
        df_rnd_res = df_rhd_res.iloc[:half].copy()
        df_rhythmic = df_rhd_res.iloc[half:]
        # Relabel the columns in random order, then sort them back: this permutes
        # the time points of the first half of the rows.
        shuffled_cols = list(columns)
        random.shuffle(shuffled_cols)
        df_rnd_res.columns = shuffled_cols
        df_rnd_res = df_rnd_res.sort_index(axis=1)
        df_results = pd.concat([df_rhythmic, df_rnd_res], ignore_index=True)

    df_results.to_csv(f"{out_dir}/{stem}.csv")
    return df_results

In [None]:
# Build each synthetic dataset, then run rda.file_rda on the CSV that was just written.
# NOTE(review): only n_components=3 with noise 0.3 and 0.6 are generated here, while
# later cells read outputs for other stack_c*_n0* combinations; confirm those files
# already exist on disk.
for i in [3]:
    for j in [1,2]:
        N_COMPONENTS = int(i)
        NOISE = float(0.3*j)
        dataset_name = f"stack_c{i}_n0{3*j}"
        FILENAME = dataset_name + ".csv"
        folder_in = 'Out/' + dataset_name + '/'
        synt_rhythmic_data(
            FILENAME,
            half_rnd=True,
            n_test=N_TEST,
            n_components=N_COMPONENTS,
            noise=NOISE,
            replicates=REPLICATES,
        )
        rda.file_rda(
            folder_in + FILENAME,
            metrics=True,
            half_rnd=True,
            n_components=N_COMPONENTS,
        )

In [None]:
# Load the q-value table of one dataset, attach a 0/1 label column and shuffle the rows.
i, j = 3, 3
filename = f"stack_c{i}_n0{3*j}.csv"
qv = pd.read_csv(f"Out/{filename[:-4]}/qv_{filename[:-4]}.csv")

# First half of the rows is labelled 1, second half 0.
# NOTE(review): presumably 1 = rhythmic, following the row order written by
# synt_rhythmic_data(half_rnd=True); verify that the qv file keeps that order.
half = len(qv)//2
y = pd.DataFrame({'y': [1] * half + [0] * half})
qv['y'] = y

# Shuffle the rows (unseeded, so the order differs between runs) and drop the identifier column.
qv = qv.sample(frac=1).reset_index(drop=True).drop('CycID', axis=1)
qv

In [None]:
#mcc models evaluation qvalue
# One panel per (n_components, noise) dataset, showing the MCC of every model.
ncols = 3
nrows = 3
fig, axes = plt.subplots(ncols = ncols, nrows = nrows, sharey=False)
axes = axes.flatten()
fig.set_size_inches(30, 30)
sns.set_style("white")
flatui = ['#d9d9d9','#bdbdbd','#969696','#737373','#525252','#252525']

# Keep the generating parameters next to each file name, so the axis labels do not
# have to be parsed back out of the name. (Indexing filename[1] always yielded the
# letter "t" of "stack_...".)
datasets = [(i, j, f"stack_c{i}_n0{3*j}.csv") for i in [1,2,3] for j in [1,2,3]]
filenames = [filename for _, _, filename in datasets]

for ax, (i, j, filename) in zip(axes, datasets):
    df_metrics = pd.read_csv(f"Out/{filename[:-4]}/qv_metrics_{filename[:-4]}.csv")
    # ci=68 --> standard error!
    # NOTE(review): recent seaborn releases deprecate `ci` in favour of `errorbar`.
    sns.barplot(data=df_metrics, x='model', y='mcc', ax=ax, ci=68, capsize=.2, palette=flatui)
    ax.set_ylabel(f'n_components = {i}')
    ax.set_xlabel(f'noise = 0.{3*j}')

plt.suptitle(f"Matthew's Correlation Coefficient for models evaluation qv")
fig.subplots_adjust(top=0.95)
plt.savefig(f"Out/models_qv_mcc.png", bbox_inches="tight", facecolor='white')
plt.show()

In [None]:
from sklearn.metrics import confusion_matrix
def plot_result(grid,X_test,y_test):
    """
    Draw an annotated confusion-matrix heatmap for a fitted search object.

    Each cell of the heatmap shows a name, the raw count and the share of all
    samples. The four names are assigned to the flattened matrix in row-major
    order, so the matrix must be 2x2. The test score and the best estimator
    are printed afterwards.

    Parameters
    ----------
    grid : fitted search object
        Must provide ``predict``, ``score`` and ``best_estimator_``.
    X_test : array-like
        Features of the held-out samples.
    y_test : array-like
        True labels of the held-out samples.
    """
    predictions = grid.predict(X_test)
    cf_matrix = confusion_matrix(y_test, predictions)

    flat_counts = cf_matrix.flatten()
    flat_shares = flat_counts / np.sum(cf_matrix)
    cell_names = ['True Neg','False Pos','False Neg','True Pos']

    annotations = []
    for cell_name, count, share in zip(cell_names, flat_counts, flat_shares):
        annotations.append(f'{cell_name}\n{count:0.0f}\n{share:.2%}')
    annotations = np.asarray(annotations).reshape(2,2)

    # fmt='' because the annotations are already formatted strings.
    sns.heatmap(cf_matrix, annot=annotations, fmt='', cmap='Blues')
    print('Score : ',grid.score(X_test,y_test),'\n',grid.best_estimator_)

In [None]:
import xgboost as xgb
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import train_test_split
# Split the labelled q-value table 50/50 into train and test sets (unseeded split).
qv_features = qv.drop(columns='y')
qv_labels = qv['y']
X_train, X_test, y_train, y_test = train_test_split(qv_features, qv_labels, test_size=0.5)
print(X_train,y_train)
X_train

In [None]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.tree import plot_tree
# Grid-search a depth-3 decision tree on the training split, then show its
# confusion matrix on the test split and save a drawing of the best tree.
clf = DecisionTreeClassifier()
params = {
           'max_depth': [3],
           'min_samples_leaf': [1,10,30,70],
           'criterion':['gini','entropy']
    }
grid = GridSearchCV(clf,param_grid=params,cv=3)
grid.fit(X_train, y_train)

#Get the confusion matrix
plot_result(grid,X_test,y_test)

# A fresh, large figure so the tree is not drawn on top of the heatmap.
fig = plt.figure(figsize=(50,50))
# feature_names is passed as a plain list: some scikit-learn releases validate
# this argument strictly and reject a pandas Index.
plot_tree(grid.best_estimator_, feature_names=list(X_test.columns),
                   class_names=['0','1'],
                   filled=True)
# The output file name (including its spelling) is kept unchanged.
fig.savefig("decistion_tree.png", facecolor='white')

In [None]:
import xgboost as xgb

# Grid-search an XGBoost classifier on the training split.
# eval_metric is 'logloss' because the objective is binary ('mlogloss' is the
# multiclass variant of the metric).
clf = xgb.XGBClassifier(objective='binary:logistic',eval_metric='logloss',use_label_encoder=False )
params = {
           'n_estimators':[250,350],
           'max_depth': [10,70,100],
           'learning_rate': [0.1,1]
}

grid = GridSearchCV(clf, param_grid=params,verbose=1,return_train_score=True,scoring='f1_micro',cv=3)
grid.fit(X_train, y_train)

#Get the confusion matrix
plot_result(grid,X_test,y_test)


In [None]:
import pandas as pd
# Gather the per-dataset metric tables into one frame, tagging each row with its source file.
metric_frames = []
for i in [1,2,3]:
    for j in [1,2,3]:
        filename=f"stack_c{i}_n0{3*j}.csv"
        df_tmp = pd.read_csv(f"Out/{filename[:-4]}/qv_metrics_{filename[:-4]}.csv")
        df_tmp['filename']=filename
        metric_frames.append(df_tmp)
# Concatenate once: growing a frame inside the loop copies it on every pass, and
# starting from an empty DataFrame relies on deprecated concat behaviour.
df_metrics = pd.concat(metric_frames)
df_metrics