In [66]:
%load_ext blackcellmagic

In [None]:
import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client

t_water = pd.read_csv(
    "https://raw.githubusercontent.com/jdills26/Tanzania-water-table/master/training_set_values.csv"
)
t_water_tgt = pd.read_csv(
    "https://raw.githubusercontent.com/jdills26/Tanzania-water-table/master/training_set_labels.csv"
)

In [3]:
#turning pandas dataframe into dask dataframe
t_water['target']=t_water_tgt['status_group']
wd=dd.from_pandas(t_water, npartitions=3)

In [None]:
region_dict = {
    "Arusha": 2,
    "Dar es Salaam": 7,
    "Dodoma": 1,
    "Iringa": 11,
    "Kagera": 18,
    "Kigoma": 16,
    "Kilimanjaro": 3,
    "Lindi": 80,
    "Manyara": 21,
    "Mara": 20,
    "Mbeya": 12,
    "Morogoro": 5,
    "Mtwara": 90,
    "Mwanza": 19,
    "Pwani": 6,
    "Rukwa": 15,
    "Ruvuma": 10,
    "Shinyanga": 17,
    "Singida": 13,
    "Tabora": 14,
    "Tanga": 4,
}


def clean_region(frame):
    frame["region_code"] = frame["region"].map(region_dict)


clean_region(wd)

In [None]:
# make a dataframe to work out average longitude, latitude, gps_height by region
# wd['my_area_code']=100*wd['region_code']+wd['district_code']
averages = (
    wd[wd["longitude"] != 0]
    .groupby(["region_code"])[["longitude", "latitude"]]
    .mean()
    .compute()
)
longitude_map = averages["longitude"].to_dict()
latitude_map = averages["latitude"].to_dict()
wd["avg_longitude"] = wd["region_code"].map(longitude_map)
wd["avg_latitude"] = wd["region_code"].map(latitude_map)
wd["new_longitude"] = wd["longitude"].where(wd["longitude"] != 0, wd["avg_longitude"])
wd["new_latitude"] = wd["latitude"].where(wd["longitude"] != 0, wd["avg_latitude"])

In [None]:
# dates
wd["date_recorded"] = dd.to_datetime(wd["date_recorded"], format="%Y-%m-%d")
wd["month"] = wd["date_recorded"].map(lambda x: x.month)
wd["year"] = wd["date_recorded"].map(lambda x: x.year)
wd["date_recorded"] = wd["date_recorded"].map(lambda x: x.toordinal())

In [54]:
wd["rot45X"] = .707* wd["new_latitude"] - .707* wd["new_longitude"]
wd["rot30X"]  = (1.732/2)* wd["new_latitude"] - (1./2)* wd["new_longitude"]
wd["rot60X"]  = (1./2)* wd["new_latitude"] - (1.732/2)* wd["new_longitude"]
wd["radial_r"] = np.sqrt( np.power(wd["new_latitude"],2) + np.power(wd["new_longitude"],2) )

In [55]:
wd['radial_r'].isna().sum().compute()

0

In [None]:
features = [
    "basin",
    "scheme_management",
    "extraction_type_group",
    "extraction_type_class",
    "month",
    "payment",
    "quantity",
    "source",
    "waterpoint_type",
    "amount_tsh",
    "gps_height",
    "new_longitude",
    "new_latitude",
    "population",
    "construction_year",
    "district_code",
    "region_code",
    "date_recorded",
    "permit",
    "public_meeting",
    "rot45X",
    "radial_r",
]

In [None]:
X = wd[features]
from sklearn.ensemble import RandomForestClassifier
from dask_ml.preprocessing import (
    RobustScaler,
    Categorizer,
    DummyEncoder,
    OrdinalEncoder,
)
from sklearn.pipeline import make_pipeline

preprocessor = make_pipeline(
    Categorizer(), DummyEncoder(), RobustScaler()
)  # ,SimpleImputer()#ce.OrdinalEncoder(),

X = preprocessor.fit_transform(X)

In [41]:
len(X.columns),(len(X))

(86, 59400)

In [45]:
y_dict={'functional':1,'non functional':0,'functional needs repair':2}
y=wd['target'].map(y_dict)

In [58]:
#just to check it works on dask collection
rfc = RandomForestClassifier()
rfc.fit(X,y)

RandomForestClassifier(bootstrap=True, class_weight=None, criterion='gini',
            max_depth=None, max_features='auto', max_leaf_nodes=None,
            min_impurity_decrease=0.0, min_impurity_split=None,
            min_samples_leaf=1, min_samples_split=2,
            min_weight_fraction_leaf=0.0, n_estimators=10, n_jobs=None,
            oob_score=False, random_state=None, verbose=0,
            warm_start=False)

In [None]:
# i had to use .values here to get this to run.  am not sure why as docs say
# should work straight on the dask dataframe
from dask_ml.model_selection import RandomizedSearchCV
from scipy.stats import randint

param_distributions_f = {
    "n_estimators": randint(100, 140),
    "max_depth": randint(16, 23),
}

search_f = RandomizedSearchCV(
    estimator=RandomForestClassifier(
        criterion="entropy", warm_start=True, oob_score=True, n_jobs=-1, random_state=42
    ),
    param_distributions=param_distributions_f,
    n_iter=10,
    scoring="accuracy",
    n_jobs=-1,
    cv=3,
    return_train_score=True,
)

search_f.fit(X.values, y.values)

In [65]:
pd.DataFrame(search_f.cv_results_).sort_values(by='rank_test_score').head(5)

Unnamed: 0,params,mean_fit_time,std_fit_time,mean_score_time,std_score_time,split0_test_score,split1_test_score,split2_test_score,mean_test_score,std_test_score,rank_test_score,split0_train_score,split1_train_score,split2_train_score,mean_train_score,std_train_score,param_max_depth,param_n_estimators
8,"{'max_depth': 21, 'n_estimators': 133}",45.065397,0.688802,4.182663,1.018419,0.807071,0.80702,0.805808,0.806633,0.000584,1,0.964293,0.964848,0.966566,0.965236,0.000967,21,133
5,"{'max_depth': 18, 'n_estimators': 126}",41.029127,0.779754,4.678832,1.112311,0.805909,0.808434,0.805354,0.806566,0.001341,2,0.919141,0.922727,0.923687,0.921852,0.001956,18,126
1,"{'max_depth': 18, 'n_estimators': 127}",47.489294,5.53775,3.70585,0.465812,0.805707,0.808283,0.805303,0.806431,0.00132,3,0.918889,0.922652,0.923939,0.921827,0.002143,18,127
6,"{'max_depth': 21, 'n_estimators': 101}",34.443672,4.335718,4.559653,0.843609,0.807323,0.807374,0.804293,0.80633,0.001441,4,0.963611,0.964646,0.96697,0.965076,0.001404,21,101
3,"{'max_depth': 19, 'n_estimators': 134}",50.118286,2.508439,4.206169,0.390526,0.806111,0.808333,0.804444,0.806296,0.001593,5,0.93654,0.939823,0.940379,0.938914,0.001694,19,134


In [60]:
type(X),type(y)

(dask.dataframe.core.DataFrame, dask.dataframe.core.Series)

In [62]:
type(y.values)

dask.array.core.Array