In [2]:
%load_ext autoreload

In [None]:
%load_ext blackcellmagic

In [3]:
%autoreload 2

In [9]:
from prefect import Flow, Parameter, unmapped
import pandas as pd
from prefect.engine.executors import DaskExecutor
from meta_model import MetaModel

In [10]:
# Load the train and test CSVs of the house-prices dataset.
# The dataset directory is kept in one place so both reads stay in sync.
DATA_DIR = "../data/house-prices-advanced-regression-techniques"
input_df = pd.read_csv(f"{DATA_DIR}/train.csv")
# NOTE(review): `test` is not referenced by any of the visible cells below — confirm it is still needed.
test = pd.read_csv(f"{DATA_DIR}/test.csv")

In [11]:
from ml_flow import *

# Define the "data_cleaning" Prefect flow.
# Each call inside the `with` block registers a task and wires its inputs; the
# work itself only happens when `flow.run(...)` is called in a later cell.
# The task functions are presumably supplied by `from ml_flow import *` above.
with Flow("data_cleaning") as flow:
    # Run-time inputs, filled in by the caller of `flow.run`.
    input_data = Parameter("input_data")
    problem, target, features = (
        Parameter("problem"),
        Parameter("target"),
        Parameter("features"),
    )
    # Handle later passed to `svd_transform` and `MetaModel`.
    tinydb = recreate_tinydb()
    # Feature bookkeeping computed from the raw input.
    nan_features = extract_nan_features(input_data)
    problematic_features = extract_problematic_features(input_data)
    undefined_features = extract_undefined_features(
        input_data, features, target, nan_features, problematic_features
    )
    input_data_with_missing = fit_transform_missing_indicator(
        input_data, undefined_features
    )

    # Train/validation split is taken from the missing-indicator output.
    train_valid_split = extract_train_valid_split(
        input_data=input_data_with_missing, problem=problem, target=target
    )
    train_data = extract_train_data(train_valid_split)
    valid_data = extract_valid_data(train_valid_split)
    # NOTE(review): feature lists are derived from `input_data`, not from
    # `input_data_with_missing` — confirm that anything the missing-indicator
    # step adds is meant to be left out of both lists.
    numeric_features = extract_numeric_features(input_data, undefined_features)
    categorical_features = extract_categorical_features(input_data, undefined_features)

    # Numeric columns: fit the imputer on the training split only, then apply
    # it to both splits.
    numeric_imputer = fit_numeric_imputer(train_data, numeric_features)
    imputed_train_numeric_df = impute_numeric_df(
        numeric_imputer, train_data, numeric_features
    )
    imputed_valid_numeric_df = impute_numeric_df(
        numeric_imputer, valid_data, numeric_features
    )

    # Yeo-Johnson transformer: fitted on the imputed training numerics, applied
    # to both splits.
    yeo_johnson_transformer = fit_yeo_johnson_transformer(imputed_train_numeric_df)
    yeo_johnson_train_transformed = transform_yeo_johnson_transformer(
        imputed_train_numeric_df, yeo_johnson_transformer
    )
    yeo_johnson_valid_transformed = transform_yeo_johnson_transformer(
        imputed_valid_numeric_df, yeo_johnson_transformer
    )

    # Categorical columns: same fit-on-train / apply-to-both pattern.
    categorical_imputer = fit_categorical_imputer(train_data, categorical_features)
    imputed_train_categorical_df = transform_categorical_data(
        train_data, categorical_features, categorical_imputer
    )
    imputed_valid_categorical_df = transform_categorical_data(
        valid_data, categorical_features, categorical_imputer
    )

    # Target: transformer fitted on the training split, applied to both splits.
    target_transformer = fit_target_transformer(problem, target, train_data)
    transformed_train_target = transform_target(
        problem, target, train_data, target_transformer
    )
    transformed_valid_target = transform_target(
        problem, target, valid_data, target_transformer
    )

    # Target encoder: fitted on the imputed training categoricals together with
    # the transformed training target, then applied to both splits.
    target_encoder_transformer = fit_target_encoder(
        imputed_train_categorical_df, transformed_train_target
    )
    target_encoded_train_df = target_encoder_transform(
        target_encoder_transformer, imputed_train_categorical_df
    )
    target_encoded_valid_df = target_encoder_transform(
        target_encoder_transformer, imputed_valid_categorical_df
    )

    # Merge the target-encoded categoricals with the Yeo-Johnson numerics,
    # once per split.
    transformed_train_df = merge_transformed_data(
        target_encoded_train_df, yeo_johnson_train_transformed,
    )
    transformed_valid_df = merge_transformed_data(
        target_encoded_valid_df, yeo_johnson_valid_transformed,
    )

    # Outlierness: HBOS transformer fitted on the merged training frame and
    # applied to both splits.
    hbos_transformer = fit_hbos_transformer(transformed_train_df)
    hbos_transform_train_data = hbos_transform(transformed_train_df, hbos_transformer)
    hbos_transform_valid_data = hbos_transform(transformed_valid_df, hbos_transformer)

    # Merge the HBOS output back in. The names `transformed_train_df` and
    # `transformed_valid_df` are rebound here, so everything below sees the
    # HBOS-merged versions.
    transformed_train_df = merge_hbos_df(
        transformed_train_df, hbos_transform_train_data
    )
    transformed_valid_df = merge_hbos_df(
        transformed_valid_df, hbos_transform_valid_data
    )
    # NOTE(review): the names saved here end in ".df" while the names given to
    # `svd_transform` below end in "_df" — confirm the difference is intended.
    save_data(transformed_train_df, "transformed_train.df",)
    save_data(transformed_valid_df, "transformed_valid.df",)
    
    # Dimensionality reduction: SVD fitted on the training frame, applied to
    # both splits. `svd_train` / `svd_valid` are not consumed by any later task
    # in this flow; the models below are fed `transformed_*_df` instead.
    svd = fit_svd(transformed_train_df)
    svd_train = svd_transform(svd, transformed_train_df, "transformed_train_df",tinydb)
    svd_valid = svd_transform(svd, transformed_valid_df, "transformed_valid_df",tinydb)
    

    # Models. `MetaModel` is constructed while the flow is being defined, so the
    # problem type is the literal "regression" rather than the `problem`
    # Parameter.
    # NOTE(review): `tinydb` here is whatever `recreate_tinydb()` returns at
    # flow-definition time (presumably a Prefect task, not its run-time
    # result) — confirm `MetaModel` copes with that.
    meta = MetaModel(problem="regression", db=tinydb)
    meta.default_models()
    models = meta.models
    # One mapped `fit_model` run per entry in `models`; the `unmapped` inputs
    # are shared by every run.
    fit_models = fit_model.map(
        model=models,
        train_data=unmapped(transformed_train_df),
        target=unmapped(transformed_train_target),
        problem=unmapped(problem),
    )
    # One mapped `predict_model` run per fitted model, all on the same
    # validation frame.
    predict_models = predict_model.map(
        model=fit_models, valid_data=unmapped(transformed_valid_df),
    )

In [12]:
# Run the flow on a Dask executor and keep the resulting state for inspection.
# Each key in `parameters` fills the flow Parameter of the same name.
executor = DaskExecutor()
flow_state = flow.run(
    parameters={
        "input_data": input_df,
        "problem": "regression",
        "target": "SalePrice",
        "features": "infer",
    },
    executor=executor,
)

[2020-04-12 23:04:42,048] INFO - prefect.FlowRunner | Beginning Flow run for 'data_cleaning'
[2020-04-12 23:04:42,052] INFO - prefect.FlowRunner | Starting flow run.
[2020-04-12 23:04:42,149] INFO - prefect.TaskRunner | Task 'recreate_tinydb': Starting task run...
[2020-04-12 23:04:42,205] INFO - prefect.TaskRunner | Task 'target': Starting task run...
[2020-04-12 23:04:42,226] INFO - prefect.TaskRunner | Task 'problem': Starting task run...
[2020-04-12 23:04:42,293] INFO - prefect.TaskRunner | Task 'features': Starting task run...
[2020-04-12 23:04:42,330] INFO - prefect.TaskRunner | Task 'input_data': Starting task run...
[2020-04-12 23:04:42,347] INFO - prefect.TaskRunner | Task 'problem': finished task run for task with final state: 'Success'
[2020-04-12 23:04:42,360] INFO - prefect.TaskRunner | Task 'target': finished task run for task with final state: 'Success'
[2020-04-12 23:04:42,387] INFO - prefect.TaskRunner | Task 'features': finished task run for task with final state: 'Su

[2020-04-12 23:04:44,270] INFO - prefect.TaskRunner | Task 'transform_yeo_johnson_transformer': Starting task run...
[2020-04-12 23:04:44,279] INFO - prefect.TaskRunner | Task 'transform_yeo_johnson_transformer': finished task run for task with final state: 'Success'
[2020-04-12 23:04:44,298] INFO - prefect.TaskRunner | Task 'merge_transformed_data': Starting task run...
[2020-04-12 23:04:44,306] INFO - prefect.TaskRunner | Task 'merge_transformed_data': finished task run for task with final state: 'Success'
[2020-04-12 23:04:44,316] INFO - prefect.TaskRunner | Task 'transform_yeo_johnson_transformer': finished task run for task with final state: 'Success'
[2020-04-12 23:04:44,333] INFO - prefect.TaskRunner | Task 'merge_transformed_data': Starting task run...
[2020-04-12 23:04:44,342] INFO - prefect.TaskRunner | Task 'merge_transformed_data': finished task run for task with final state: 'Success'
[2020-04-12 23:04:44,359] INFO - prefect.TaskRunner | Task 'fit_hbos_transformer': Starti

  y = column_or_1d(y, warn=True)
  return self.model.fit(X, y)


[2020-04-12 23:04:45,046] INFO - prefect.TaskRunner | Task 'svd_transform': finished task run for task with final state: 'Success'
[2020-04-12 23:04:45,066] INFO - prefect.TaskRunner | Task 'svd_transform': finished task run for task with final state: 'Success'
[2020-04-12 23:04:45,167] INFO - prefect.TaskRunner | Task 'fit_model[5]': finished task run for task with final state: 'Success'
[2020-04-12 23:04:45,437] INFO - prefect.TaskRunner | Task 'fit_model[6]': finished task run for task with final state: 'Success'
[2020-04-12 23:04:45,452] INFO - prefect.TaskRunner | Task 'fit_model': finished task run for task with final state: 'Mapped'
[2020-04-12 23:04:45,475] INFO - prefect.TaskRunner | Task 'predict_model': Starting task run...
[2020-04-12 23:04:45,651] INFO - prefect.TaskRunner | Task 'predict_model[4]': Starting task run...
[2020-04-12 23:04:45,652] INFO - prefect.TaskRunner | Task 'predict_model[1]': Starting task run...
[2020-04-12 23:04:45,652] INFO - prefect.TaskRunner | T

In [6]:
# Optional: uncomment to draw the flow's task graph using the states from the run above.
# flow.visualize(flow_state=flow_state)

In [86]:
# Open the TinyDB file "db.json" (path relative to the notebook's working
# directory) and display everything returned by `db.all()`.
# NOTE(review): presumably this is the same file the flow's `recreate_tinydb`
# task works on — confirm.
from tinydb import TinyDB, Query
db = TinyDB("db.json")
db.all()

[]

In [None]:
# Search `db` for documents whose `chunk` field equals "svdname".
# The result is stored in `r` and not displayed.
# NOTE(review): "svdname" looks like a placeholder; the flow passes
# "transformed_train_df" / "transformed_valid_df" to `svd_transform` — confirm
# which value is actually stored under `chunk`.
q = Query()
r = db.search(q.chunk == "svdname")

In [75]:
# List the attributes available on the object returned by `q.chunk`.
dir(q.chunk)

['__and__',
 '__call__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__invert__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__or__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_generate_test',
 '_path',
 '_prepare_test',
 '_test',
 'all',
 'any',
 'exists',
 'hashval',
 'matches',
 'one_of',
 'search',
 'test']

In [119]:
# Display the task at position 38 of the flow's sorted task list.
# NOTE(review): `_sorted_tasks` is an underscore-prefixed (private) method and
# 38 is a bare positional index — it will point at a different task if the
# flow definition changes.
flow._sorted_tasks()[38]

typing.Any

In [110]:
# List the attributes of the task at position 38 of the flow's sorted task list.
# NOTE(review): relies on the private `_sorted_tasks` method and a bare
# positional index, so the result changes if the flow definition changes.
dir(flow._sorted_tasks()[38])

['__add__',
 '__and__',
 '__call__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__floordiv__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getitem__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__iter__',
 '__le__',
 '__lt__',
 '__mifflin__',
 '__mod__',
 '__module__',
 '__mul__',
 '__ne__',
 '__new__',
 '__or__',
 '__pow__',
 '__radd__',
 '__rand__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__rfloordiv__',
 '__rmod__',
 '__rmul__',
 '__ror__',
 '__rpow__',
 '__rsub__',
 '__rtruediv__',
 '__setattr__',
 '__sizeof__',
 '__slotnames__',
 '__str__',
 '__sub__',
 '__subclasshook__',
 '__truediv__',
 '__weakref__',
 'auto_generated',
 'bind',
 'cache_for',
 'cache_key',
 'cache_validator',
 'checkpoint',
 'copy',
 'inputs',
 'is_equal',
 'is_not_equal',
 'log_stdout',
 'logger',
 'map',
 'max_retries',
 'name',
 'not_',
 'or_',
 'outputs',
 'result_handler',
 'retry_delay',
 'run',
 'serialize',
 'set_dependencies',
 'set_