# *ml-1N-parallel-and-premade-0-NCRH.ipynb*

# Run the model grid in parallel with Dask, using premade train/validation datasets



In [12]:
import numpy as np
import pandas as pd
import os
from timeit import default_timer

from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import LinearSVC
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier

In [13]:
def create_models_bank(architecture):
    """Build the hyperparameter grid for one architecture code.

    Parameters
    ----------
    architecture : str
        Architecture code selecting which grid to build:
        'RF', 'ET', 'RFET', 'ETRF' -> tree-ensemble grid (80 configs);
        'LR', 'PL'                 -> logistic-regression grid (52 configs);
        'LS'                       -> linear-SVM grid (30 configs);
        'SV'                       -> kernel-SVM grid (150 configs);
        'NB'                       -> var_smoothing grid (7 configs);
        'KN', 'PK'                 -> k-nearest-neighbours grid (152 configs).

    Returns
    -------
    dict
        Maps zero-padded sequential ids ('001', '002', ...) to dicts of
        keyword arguments for the matching estimator constructor. An
        unrecognised code yields an empty dict.
    """
    from itertools import product

    models_bank = {}

    def add(params):
        # Ids are sequential across everything added during this call.
        models_bank[str(len(models_bank) + 1).zfill(3)] = params

    # Single codes are compared with ==. A parenthesised string such as
    # ('LS') is just the string 'LS', and `in` on a string is a substring
    # test, which would let codes like 'S' or '' match unrelated grids.
    if architecture in ('RF', 'ET', 'RFET', 'ETRF'):
        # 80 tree-ensemble models
        for n_estimators, max_features, min_samples_split, bootstrap, class_weight in product(
                [200, 500],
                [0.05, 0.1, 0.2, 0.4, 1.0],
                [2, 4],
                [False, True],
                [None, 'balanced']):
            add({
                'n_estimators': n_estimators,
                'max_features': max_features,
                'bootstrap': bootstrap,
                'min_samples_split': min_samples_split,
                'class_weight': class_weight,
                'n_jobs': -1,
            })

    if architecture in ('LR', 'PL'):
        # 50 elastic-net regularized models
        for l1_ratio, C, class_weight in product(
                [0, 0.1, 0.5, 0.9, 1],
                [1, 10**-2, 10**-4, 10**-6, 10**-8],
                [None, 'balanced']):
            add({
                'solver': 'saga',
                'penalty': 'elasticnet',
                'l1_ratio': l1_ratio,
                'C': C,
                'class_weight': class_weight,
                'max_iter': 100000,
                'random_state': 19,
            })
        # 2 nonregularized models
        # NOTE(review): scikit-learn 1.2 deprecated penalty='none' in favour of
        # penalty=None and 1.4 removed it; confirm against the installed version.
        for class_weight in [None, 'balanced']:
            add({
                'solver': 'saga',
                'penalty': 'none',
                'class_weight': class_weight,
                'max_iter': 20000,
                'random_state': 19,
            })

    if architecture == 'LS':
        # 30 linear SVMs; each triple is (dual, penalty, loss)
        for (dual, penalty, loss), C, class_weight in product(
                [(False, 'l1', 'squared_hinge'),
                 (False, 'l2', 'squared_hinge'),
                 (True, 'l2', 'hinge')],
                [1, 10**-2, 10**-4, 10**-6, 10**-8],
                [None, 'balanced']):
            add({
                'dual': dual,
                'penalty': penalty,
                'loss': loss,
                'C': C,
                'class_weight': class_weight,
                'max_iter': 100000,
            })

    if architecture == 'SV':
        # 150 nonlinear SVMs
        for kernel, C, gamma, class_weight in product(
                ['rbf', 'poly', 'sigmoid'],
                [1, 10**-2, 10**-4, 10**-6, 10**-8],
                [100, 10, 1, 10**-1, 10**-2],
                [None, 'balanced']):
            add({
                'kernel': kernel,
                'C': C,
                'gamma': gamma,
                'degree': 3,
                'coef0': 0,
                'class_weight': class_weight,
                'max_iter': -1,
            })

    if architecture == 'NB':
        # 7 variance-smoothing settings
        for var_smoothing in [10**-6, 10**-7, 10**-8,
                              10**-9, 10**-10, 10**-11, 0]:
            add({'var_smoothing': var_smoothing})

    if architecture in ('KN', 'PK'):
        # k = 1, 3, 5, 7, 9, then round(1.4**x) for x in 7..20, with even
        # values bumped up by one so every k is odd.
        step1 = [int(np.round(1.4**x)) for x in range(7, 21)]
        k_list = [1, 3, 5, 7, 9] + [k + 1 if k % 2 == 0 else k for k in step1]
        # 152 models
        for n_neighbors, weights, metric in product(
                k_list,
                ['uniform', 'distance'],
                ['manhattan', 'euclidean', 'chebyshev', 'canberra']):
            add({
                'n_neighbors': n_neighbors,
                'weights': weights,
                'metric': metric,
            })

    return models_bank

In [14]:
def grab_premade_X_y_train_val(training_sample_size,validation_sample_size,tile,
                     val_year,scheme_name,crop_of_interest_id,in_season):
    """Load one premade train/validation split from .npy files.

    Files are read from ``../data/premade_<train size>_<val size>/`` and are
    named ``<tile>_<val_year>_<scheme_name>_<crop_of_interest_id>_<in_season>_<split>.npy``
    where ``<split>`` is X_train, X_val, y_train or y_val.

    Returns
    -------
    tuple
        ``(X_train, X_val, y_train, y_val)`` as loaded by ``np.load``.
    """
    folder = f'../data/premade_{training_sample_size}_{validation_sample_size}'
    name_parts = (tile, val_year, scheme_name, crop_of_interest_id, in_season)
    stem = '_'.join(f'{part}' for part in name_parts)

    # Load in the same order the arrays are returned.
    return tuple(
        np.load(f'{folder}/{stem}_{split}.npy')
        for split in ('X_train', 'X_val', 'y_train', 'y_val')
    )


In [15]:
def _confusion_counts(act, pred):
    """Return [n(0,0), n(0,1), n(1,0), n(1,1)] counts of (actual, predicted) pairs.

    Only labels 0 and 1 are counted; any other label values are ignored.
    """
    act = np.asarray(act)
    pred = np.asarray(pred)
    return [int(np.count_nonzero((act == a) & (pred == p)))
            for a, p in ((0, 0), (0, 1), (1, 0), (1, 1))]


def fit_predict_report(model_name,
                      model,
                      training_sample_size,
                      validation_sample_size,
                      tile,
                      years,
                      scheme_name,
                      crop_of_interest_id,
                      in_season,
                      from_premade=True
                      ):
    """Fit and score `model` once per validation year and write a results CSV.

    For each year in `years` the premade train/validation arrays for that
    year are loaded with ``grab_premade_X_y_train_val``. `model` is then
    fitted on the training split and predicts the validation split. Per-year
    2x2 confusion counts, precision / recall / F1 (label 1 is the positive
    class) and their across-year Mean / StdE rows are written to
    ``../data/results/<csv_name>``, preceded by '#'-prefixed header lines
    listing the parameters.

    Parameters
    ----------
    model_name : str
        Identifier used as the first component of the CSV file name.
    model : object
        Anything with ``fit(X, y)`` and ``predict(X)``; it is refitted for
        every year.
    training_sample_size, validation_sample_size, tile, scheme_name,
    crop_of_interest_id, in_season
        Used to locate the premade arrays and embedded in the CSV name and
        header.
    years : iterable
        Validation years, one fold per year; must be non-empty.
    from_premade : bool, default True
        Only ``True`` is supported in this notebook.

    Returns
    -------
    str
        Status message: the results path, a note that the CSV already exists
        (nothing is recomputed), or a note that only ``from_premade=True``
        is supported.

    Raises
    ------
    ValueError
        If `years` is empty.
    """
    # `years` and the model object are not part of the file name;
    # `model_name` is what identifies the model.
    csv_name = '_'.join(f'{value}' for value in (
        model_name, training_sample_size, validation_sample_size,
        tile, scheme_name, crop_of_interest_id, in_season)) + '.csv'
    csv_path = f'../data/results/{csv_name}'

    # An existing CSV with this name means the run was already done.
    if csv_name in os.listdir('../data/results/'):
        return 'If you see this, the specified model was previously run.'

    print(f'-- Process for {csv_name} --')

    # Reject unsupported / empty inputs before any data is loaded or fitted.
    if from_premade != True:  # compare to True exactly, as the notebook always has
        return 'This function in this notebook is only for from_premade=True'
    years = list(years)  # iterated twice below, so materialise once
    if not years:
        raise ValueError('years must contain at least one validation year')

    conf = []
    for val_year in years:
        print('Starting a fold...')
        print('> Assembling the datasets')
        X_train, X_val, y_train, y_val = grab_premade_X_y_train_val(
            training_sample_size, validation_sample_size, tile, val_year,
            scheme_name, crop_of_interest_id, in_season)

        print('> Fitting the model on the training set')
        model.fit(X_train, y_train)
        print('> Predicting on the validation set')
        pred = model.predict(X_val)

        print('> Recording performance metrics')
        conf.append(_confusion_counts(y_val, pred))
        print('Finished a fold.')

    n_folds = len(conf)
    # Append two placeholder rows for the Mean / StdE summaries; their count
    # columns stay -1 in the written CSV.
    carr = np.vstack([np.array(conf), np.full((2, 4), -1)])

    cdf = pd.DataFrame(data=carr,
                       index=[f'ValYear{yr}' for yr in years] + ['Mean', 'StdE'],
                       columns=['ActPred_00', 'ActPred_01',
                                'ActPred_10', 'ActPred_11'])

    cdf['Precision'] = cdf.ActPred_11 / (cdf.ActPred_01 + cdf.ActPred_11)
    cdf['Recall'] = cdf.ActPred_11 / (cdf.ActPred_10 + cdf.ActPred_11)
    cdf['F1'] = 2 * cdf.Precision * cdf.Recall / (cdf.Precision + cdf.Recall)
    for col in ['Precision', 'Recall', 'F1']:
        # Summaries cover only the fold rows, selected by position so any
        # set of validation years works.
        fold_values = cdf[col].iloc[:n_folds]
        cdf.at['Mean', col] = np.mean(fold_values)
        # NOTE(review): despite the 'StdE' label this is the population
        # standard deviation (ddof=0) across folds, not the standard error
        # of the mean; kept so results stay comparable with earlier CSVs.
        cdf.at['StdE', col] = np.std(fold_values)

    header_items = [('model_name', model_name),
                    ('model', model),
                    ('training_sample_size', training_sample_size),
                    ('validation_sample_size', validation_sample_size),
                    ('tile', tile),
                    ('scheme_name', scheme_name),
                    ('crop_of_interest_id', crop_of_interest_id),
                    ('in_season', in_season)]
    # One '# key: value' line per item. A value whose text spans several
    # lines (a long estimator repr may wrap) gets '# ' on every continuation
    # line so the whole header stays comment-prefixed.
    comment = ''.join(
        '# ' + f'{key}: {value}'.replace('\n', '\n# ') + '\n'
        for key, value in header_items)
    with open(csv_path, 'a') as f:
        f.write(comment)
        cdf.to_csv(f)

    print(f'Find results in {csv_path}')

    return f'Find results in {csv_path}'

## Dask initialization

In [16]:
import dask
from dask.distributed import print
from dask.distributed import Client

In [17]:
# Create a dask.distributed Client with default arguments; with no address
# given it starts a local cluster sized by dask's defaults.
# Overrides tried previously, kept for reference:
#     memory_limit='128GiB', threads_per_worker=1
client = Client()

Perhaps you already have a cluster running?
Hosting the HTTP server on port 45305 instead


In [18]:
# Show the client summary (dashboard link, workers, threads, memory) as cell output.
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:45305/status,

0,1
Dashboard: http://127.0.0.1:45305/status,Workers: 8
Total threads: 56,Total memory: 128.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:41835,Workers: 8
Dashboard: http://127.0.0.1:45305/status,Total threads: 56
Started: Just now,Total memory: 128.00 GiB

0,1
Comm: tcp://127.0.0.1:36398,Total threads: 7
Dashboard: http://127.0.0.1:45321/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:37295,
Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-z0cp2bh0,Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-z0cp2bh0

0,1
Comm: tcp://127.0.0.1:35546,Total threads: 7
Dashboard: http://127.0.0.1:42956/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:40885,
Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-115ykbp_,Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-115ykbp_

0,1
Comm: tcp://127.0.0.1:41411,Total threads: 7
Dashboard: http://127.0.0.1:36953/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:40628,
Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-412jbj_n,Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-412jbj_n

0,1
Comm: tcp://127.0.0.1:41945,Total threads: 7
Dashboard: http://127.0.0.1:38915/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:46633,
Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-k7n0u67o,Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-k7n0u67o

0,1
Comm: tcp://127.0.0.1:37938,Total threads: 7
Dashboard: http://127.0.0.1:37310/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:35005,
Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-n2i92lid,Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-n2i92lid

0,1
Comm: tcp://127.0.0.1:42999,Total threads: 7
Dashboard: http://127.0.0.1:38367/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:46716,
Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-3s766_wm,Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-3s766_wm

0,1
Comm: tcp://127.0.0.1:40878,Total threads: 7
Dashboard: http://127.0.0.1:44708/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:33390,
Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-um_c45_v,Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-um_c45_v

0,1
Comm: tcp://127.0.0.1:33668,Total threads: 7
Dashboard: http://127.0.0.1:33462/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:42404,
Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-prclwdb_,Local directory: /work/sds-lab/august/crops/notebooks/dask-worker-space/worker-prclwdb_


In [19]:
#client.close()

In [20]:
# Rebind the name to a dask.delayed wrapper: from here on, calling
# fit_predict_report(...) builds a lazy task instead of running it, and the
# tasks are executed together by dask.compute in the cell below.
fit_predict_report = dask.delayed(fit_predict_report)

## Run logistic regression models on both the in-season `0` and `NCRH` datasets

In [None]:
start = default_timer()

architecture = 'LR'  ## SPECIFY HERE

# Estimator class for each architecture code. The parameter dicts produced by
# create_models_bank(architecture) are constructor kwargs for these classes,
# so choosing the class from `architecture` keeps grid and estimator in sync.
estimator_for_architecture = {
    'LR': LogisticRegression,
    'RF': RandomForestClassifier,
    'ET': ExtraTreesClassifier,
    'LS': LinearSVC,
    'SV': SVC,
    'NB': GaussianNB,
    'KN': KNeighborsClassifier,
}
if architecture not in estimator_for_architecture:
    raise ValueError(f'No estimator registered for architecture {architecture!r}; '
                     'add it to estimator_for_architecture')
estimator_cls = estimator_for_architecture[architecture]

models_bank = create_models_bank(architecture)
training_sample_size = 0.001
validation_sample_size = 0.001
years = [2018, 2019, 2020, 2021, 2022]

# Build one lazy task per (tile, scheme, in-season setting, model config).
r = []
for tile_coiid in [('10SFH',75),('15TVG',1)]:
    for scheme_name in ['14day','5day']:
        for in_season in [0,'NCRH']:
            for three_digit, model_params in models_bank.items():

                p = {

                ## SPECIFY MODEL ##
                'model_name': architecture + three_digit,
                'model': make_pipeline(StandardScaler(),
                                       estimator_cls(**model_params)
                                      ),
                'training_sample_size': training_sample_size,
                'validation_sample_size': validation_sample_size,

                ## SPECIFY TILE AND SCHEME ##
                'tile': tile_coiid[0],
                'years': years,
                'scheme_name': scheme_name,
                'crop_of_interest_id': tile_coiid[1],
                'in_season': in_season
                }

                r.append(fit_predict_report(**p))

# Run every task on the dask cluster; each returns a status string.
results = dask.compute(*r)

duration = default_timer() - start
print(duration)
# `start` is a default_timer() (perf counter) reading, so the number in the
# file name is only a per-run tag, not a wall-clock timestamp.
with open(f'../data/times/time{start}.txt', 'a') as f:
    f.write(str(duration))