In [1]:
from random import sample
from time import time
import pandas as pd
import pymongo
from sklearn import ensemble
import numpy as np
import os
from sklearn.model_selection import cross_val_score, train_test_split, KFold
from sklearn.metrics import mean_squared_error
from math import sqrt

import random

import matplotlib.pyplot as plt
from sklearn.cluster import KMeans

from sklearn.experimental import enable_halving_search_cv
from sklearn.model_selection import HalvingGridSearchCV

<h3><u>GROUPING CLUSTERS FROM CSV</u></h3>

In [2]:
df_clusters = pd.read_csv("~/ucc-21/clusters.csv")

In [3]:
time1 = time()

df_clusters

gk = df_clusters.groupby('cluster_id')

gk

<pandas.core.groupby.generic.DataFrameGroupBy object at 0x7fd7e4464d60>

In [4]:
parent_maps = {}
child_to_parent = {}

for name, group in gk:
    row = group[group.distance == group.distance.min()]
    row_max = group[group.distance == group.distance.max()]
    
    children = list(group.gis_join)
    distances = list(group.distance)
    
    dist_min = row['distance'].item()
    dist_max = row_max['distance'].item()
    
    pg = str(row['gis_join'].item())
    
    parent_index = children.index(pg)
    children.pop(parent_index)
    distances.pop(parent_index)
    
    inner_dict = {}
    inner_dict['dist_min'] = dist_min
    inner_dict['dist_max'] = dist_max
    inner_dict['children'] = children
    inner_dict['distances'] = distances
    
    parent_maps[pg] = inner_dict
    
    for c in children:
        child_to_parent[c] = pg
    
                           
print(parent_maps)
print(child_to_parent)

{'G0800650': {'dist_min': 391.0929075096816, 'dist_max': 424.1308766333965, 'children': ['G4000770', 'G0601150', 'G4500190', 'G1301230', 'G3800270', 'G3800590', 'G1801290', 'G1701070', 'G1701170', 'G5101770', 'G4801290', 'G4001390', 'G1900290', 'G3600090', 'G5100030', 'G2701570', 'G3100750', 'G2601090', 'G4600470', 'G4800810', 'G4600910', 'G1700650', 'G1801330', 'G3901690', 'G0600670', 'G0600110', 'G4804710', 'G4805070', 'G2001390', 'G3701790', 'G2900550', 'G4700350', 'G2700290', 'G4600070', 'G4700210', 'G4800130', 'G3901390', 'G4001470', 'G4804130', 'G4500630', 'G4800590', 'G2400450', 'G0501310', 'G1702010', 'G0100710', 'G4804790', 'G2300190', 'G4802790', 'G4000290', 'G4001310', 'G1301390', 'G5500590', 'G4201270', 'G2400170', 'G4001110', 'G5100910', 'G2001650', 'G3600250', 'G3101370', 'G2601230', 'G4200630', 'G0500170', 'G2600290', 'G2800510', 'G4600830', 'G5101250', 'G0100750', 'G4700850', 'G2601350', 'G1302430', 'G4001410', 'G4701830', 'G2201230', 'G2801290', 'G4000490', 'G3901490',

<h3><u>CONSTANTS AND HELPER FUNCTIONS</u></h3>

In [5]:
sample_min = 0.05
sample_max = 0.25

query_collection = "macav2"

mongo_urls = [
    'mongodb://lattice-100:27018/',
    'mongodb://lattice-101:27018/',
    'mongodb://lattice-102:27018/',
    'mongodb://lattice-103:27018/',
    'mongodb://lattice-104:27018/'
]

mongo_db_name = "sustaindb"
query_fild = "gis_join"
train_test = 0.8


training_labels = ["min_surface_downwelling_shortwave_flux_in_air", "max_surface_downwelling_shortwave_flux_in_air",
                   "max_specific_humidity", "min_max_air_temperature", "max_max_air_temperature"]
target_labels = ["max_min_air_temperature"]


# QUERY projection
client_projection = {}
for val in training_labels:
    client_projection[val] = 1
for val in target_labels:
    client_projection[val] = 1
    
    

<h1><u>MODELING</u></h1>

In [6]:
saved_models = {}

# ACTUAL QUERYING
def query_sustaindb(query_gisjoin, sustain_db):
    sustain_collection = sustain_db[query_collection]
    client_query = {query_fild: query_gisjoin}
    query_results = list(sustain_collection.find(client_query, client_projection)) 
    return list(query_results)

# SAMPLE FROM QUERY RESULTS
def data_sampling(query_results, exhaustive, sample_percent=1):
    if exhaustive:
        all_data = query_results
    else:
        data_size = int(len(query_results) * sample_percent)
        all_data = sample(query_results, data_size)

    return pd.DataFrame(all_data)

# GET SAMPLE % BASED ON DISTANCE FROM CENTROID
def get_sample_percent(gis_join):
    parent_gis = child_to_parent[gis_join]
    inner_dict = parent_maps[parent_gis]
    d_max = inner_dict['dist_max']
    d_min = inner_dict['dist_min']
    children = inner_dict['children']
    distances = inner_dict['distances']
    
    my_index = children.index(gis_join)
    my_distance = distances[my_index]
    
    frac = (my_distance - d_min)/(d_max - d_min)
    
    perc = sample_min + (sample_max - sample_min) * frac
    
    perc*=100
    perc = int(perc)
    perc = perc - (perc%5)
    
    perc = perc/100
    return perc


# GET PERCENTAGE DISTANCE FROM CENTROID
def get_distance_percentage(gis_join):
    parent_gis = child_to_parent[gis_join]
    inner_dict = parent_maps[parent_gis]
    d_max = inner_dict['dist_max']
    d_min = inner_dict['dist_min']
    children = inner_dict['children']
    distances = inner_dict['distances']
    
    my_index = children.index(gis_join)
    my_distance = distances[my_index]
    
    frac = (my_distance - d_min)/(d_max - d_min)
    
    return frac * 100

def exhaustive_training(X,Y, gis_join):
    X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2)
    
    param_grid = {'max_depth': [2, 3], 'min_samples_split': [15, 20, 50]}
#     base_est = ensemble.RandomForestRegressor(random_state=0)
    base_est = ensemble.GradientBoostingRegressor(random_state=0)
    sh = HalvingGridSearchCV(base_est, param_grid, cv=5, verbose=1, 
                             factor=2, resource='n_estimators', max_resources=600).fit(X, pd.Series.ravel(Y))
    
    clf_best = sh.best_estimator_
    rmse = sqrt(mean_squared_error(pd.Series.ravel(y_test), clf_best.predict(X_test)))
    
    print("PARENT GISJOIN: ", gis_join, "RMSE:", rmse)
    return clf_best
    

def sampled_training(X, Y, gis_join, saved_models):
    parent_gis = child_to_parent[gis_join]
    clf = saved_models[parent_gis]
    
    X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2)
    
    clf.fit(X_train, pd.Series.ravel(y_train))

    rmse = sqrt(mean_squared_error(pd.Series.ravel(y_test), clf.predict(X_test)))
    
    print("CHILD GISJOIN: ", gis_join, "RMSE:", rmse)
    return clf
    

def train_gisjoin(gis_join, exhaustive=True, saved_models={}):
    mongo_url = mongo_urls[random.randint(0, len(mongo_urls) - 1)]
    sustainclient = pymongo.MongoClient(mongo_url)
    sustain_db = sustainclient[mongo_db_name]

    sample_percent = 1
    if not exhaustive:
        #print("SAMPLED CHILD TRAINING.....")
        sample_percent = get_sample_percent(gis_join)
        
        
    #QUERY
    results = query_sustaindb(gis_join, sustain_db)
    
    df_sampled = data_sampling(results, exhaustive, sample_percent)
    
    Y = df_sampled.loc[:,target_labels]
    X = df_sampled.loc[:, training_labels]
    #print(X.shape, Y.shape)
    
    if exhaustive:
        clf = exhaustive_training(X,Y, gis_join)
    else:
        clf = sampled_training(X,Y, gis_join, saved_models)
    
    #saved_models[gis_join] = clf
    return (gis_join,clf)
    
#'G1303070': 'G0800010'
#train_gisjoin('G0800010', True)
#train_gisjoin('G1303070', False)



In [7]:
sampling_perc_to_children_map = {}
for ck in child_to_parent.keys():
#     perc = get_distance_percentage(ck)
    perc = get_sample_percent(ck) * 100
    sampling_perc_to_children_map[ck] = perc
    
keys_0_to_15 = []
keys_15_to_25 = []
    
for gis_join, perc in sampling_perc_to_children_map.items():
    if perc < 15:
        keys_0_to_15.append(gis_join)
    elif perc > 15:
        keys_15_to_25.append(gis_join)
    
print(f'keys_0_to_15: {len(keys_0_to_15)}')
print(f'keys_15_to_25: {len(keys_15_to_25)}')

keys_0_to_15: 1870
keys_15_to_25: 175


In [8]:
import dask
from dask import delayed
from dask.distributed import Client

client = Client('localhost:9010')


+-------------+---------------+----------------+----------------+
| Package     | client        | scheduler      | workers        |
+-------------+---------------+----------------+----------------+
| blosc       | 1.10.4        | 1.10.2         | 1.10.2         |
| dask        | 2021.07.0     | 2021.08.0      | 2021.08.0      |
| distributed | 2021.07.0     | 2021.08.0      | 2021.08.0      |
| numpy       | 1.21.0        | 1.21.1         | 1.21.1         |
| python      | 3.8.6.final.0 | 3.8.10.final.0 | 3.8.10.final.0 |
+-------------+---------------+----------------+----------------+


In [9]:
import pickle
saved_models = pickle.load(open('parent_models.pkl', 'rb'))

In [13]:
outputs2 = []

# TRAINING CHILDREN NEXT
# for ck in child_to_parent.keys():
time1 = time()
for ck in keys_15_to_25:
    ret = delayed(train_gisjoin)(ck, False, saved_models)
    outputs2.append(ret)
    break

futures2 = dask.persist(*outputs2)  # trigger computation in the background
results2 = dask.compute(*futures2)

for sm in results2:
    (gis_join, model) = sm
    saved_models[gis_join] = model

print(saved_models)
print(f'time taken : {time() - time1} s')

CHILD GISJOIN:  G4805070 RMSE: 1.8752110725512565
{'G0800650': GradientBoostingRegressor(min_samples_split=20, n_estimators=600,
                          random_state=0), 'G0600750': GradientBoostingRegressor(min_samples_split=20, n_estimators=600,
                          random_state=0), 'G4805070': GradientBoostingRegressor(min_samples_split=20, n_estimators=600,
                          random_state=0), 'G4800130': GradientBoostingRegressor(min_samples_split=20, n_estimators=600,
                          random_state=0), 'G4804790': GradientBoostingRegressor(min_samples_split=20, n_estimators=600,
                          random_state=0), 'G2800470': GradientBoostingRegressor(min_samples_split=20, n_estimators=600,
                          random_state=0), 'G1200650': GradientBoostingRegressor(min_samples_split=20, n_estimators=600,
                          random_state=0), 'G2200950': GradientBoostingRegressor(min_samples_split=20, n_estimators=600,
                        

In [11]:
time2 = time()
print(f'Time Taken: {time2 - time1} s')

Time Taken: 38.9023175239563 s


distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
asyncio.exceptions.CancelledError
