In [1]:
import ipywidgets as widgets
from cerebro.backend import SparkBackend
from cerebro.keras import SparkEstimator
import os

In [2]:
# Integer slider over 0..10 (step 1) starting at 7; with continuous_update=False
# the value is only pushed when the user releases the handle.
sli = widgets.IntSlider(
    value=7, min=0, max=10, step=1,
    description='Test:',
    continuous_update=False,
    disabled=False,
    orientation='horizontal',
    readout=True, readout_format='d',
)
sli

IntSlider(value=7, continuous_update=False, description='Test:', max=10)

In [3]:
# Output widget with a thin black border; anything printed inside `with out:` is captured here.
out = widgets.Output(layout=dict(border='1px solid black'))
out

Output(layout=Layout(border='1px solid black'))

In [4]:
# Route this loop's stdout into the Output widget instead of the cell's own output area.
with out:
    for i in range(3):
        print(f'{i} Hello world!')

In [5]:
# Append the slider `sli` as a display item inside the Output widget `out`.
out.append_display_data(sli)

IntSlider(value=10, continuous_update=False, description='Test:', max=10)

In [6]:
# Read back the slider's current value (reflects any interaction since it was created with value=7).
sli.value

10

In [2]:
import cerebro
# Show which installed copy of the cerebro package is actually being imported.
print(os.path.abspath(cerebro.__file__))

/Users/Lee/opt/anaconda3/lib/python3.7/site-packages/cerebro_dl-1.0.0-py3.7.egg/cerebro/__init__.py


In [4]:
# Storage backends for intermediate data and model artifacts.
from cerebro.storage import LocalStore, HDFSStore

# Model selection/AutoML methods.
from cerebro.tune import GridSearch, RandomSearch, TPESearch

# Utility functions for specifying the search space.
from cerebro.tune import hp_choice, hp_uniform, hp_quniform, hp_loguniform, hp_qloguniform

import tensorflow as tf
from pyspark.sql import SparkSession
import numpy as np

In [8]:
# Enable asyncio event-loop integration in the kernel so coroutines scheduled in later
# cells (e.g. ones awaiting widget changes) can keep progressing between cell runs.
%gui asyncio

In [9]:
import asyncio
def wait_for_change(widget, value):
    """Return a future that resolves with the next new value of a widget trait.

    Args:
        widget: object exposing traitlets-style ``observe(handler, names)`` and
            ``unobserve(handler, names)`` (e.g. an ipywidgets widget).
        value: name of the trait to watch (e.g. ``'value'``). The parameter is
            named ``value`` for backward compatibility, but it is a trait *name*.

    Returns:
        asyncio.Future that completes with ``change.new`` from the first change
        notification. The observer is detached after that first notification,
        so later changes are not delivered to this future.
    """
    future = asyncio.Future()

    def getvalue(change):
        # Detach before touching the future so the handler can never stay
        # registered. If the future was cancelled (e.g. the awaiting task was
        # cancelled), set_result would raise InvalidStateError. In the old
        # ordering that skipped unobserve and raised on every later change.
        widget.unobserve(getvalue, value)
        if not future.done():
            # make the new value available
            future.set_result(change.new)

    widget.observe(getvalue, value)
    return future

In [10]:
from ipywidgets import IntSlider
slider = IntSlider()


async def f():
    """Print ten rounds of "work", pausing after each until `slider` changes.

    Uses ``wait_for_change`` (defined in an earlier cell) to await the next
    change of ``slider.value`` and prints the value it resolved with.
    """
    for i in range(10):
        print('did work %s' % i)
        # Suspend this coroutine until the slider's value changes.
        x = await wait_for_change(slider, 'value')
        print('async function continued with value %s' % x)


# Keep a reference to the task: the event loop only holds weak references to
# tasks, so an unreferenced task may be garbage-collected before it finishes.
slider_task = asyncio.ensure_future(f())

slider

IntSlider(value=0)

did work 0
async function continued with value 26
did work 1
async function continued with value 42
did work 2
async function continued with value 75
did work 3
async function continued with value 42
did work 4
async function continued with value 23
did work 5
async function continued with value 65
did work 6
async function continued with value 51
did work 7
async function continued with value 35
did work 8
async function continued with value 15
did work 9
async function continued with value 44


In [6]:
spark = (
    SparkSession.builder
    .appName("Cerebro Example")
    .getOrCreate()
)

...

# Cerebro execution backend (single worker on the local Spark context) and the
# store for intermediate data and model artifacts.
backend = SparkBackend(spark_context=spark.sparkContext, num_workers=1)
# NOTE(review): absolute, user-specific path — consider making it configurable.
store = LocalStore(prefix_path='/Users/Lee/Documents/Research/MS_Thesis/Trial/')


# Initialize input DataFrames.
# You can download the sample dataset from https://apache.googlesource.com/spark/+/master/data/mllib/sample_libsvm_data.txt
df = spark.read.format("libsvm").load("sample_libsvm_data.txt").repartition(8)
# Seed the split so the same input yields the same train/test rows on every run;
# without it, model-selection results are not reproducible across runs.
SPLIT_SEED = 42
train_df, test_df = df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# Define estimator generating function.
def estimator_gen_fn(params):
    """Build a SparkEstimator for one sampled point of the search space.

    Args:
        params: dict of hyperparameter values. This function reads
            ``params['lr']`` (Adam learning rate) and ``params['batch_size']``.

    Returns:
        SparkEstimator wrapping a Dense(100) -> Dense(1) -> sigmoid Keras model,
        trained with binary cross-entropy and reporting accuracy.
    """
    model = tf.keras.models.Sequential()
    # Expected input width: 692 features — TODO confirm against the libsvm data.
    # NOTE(review): no activation on this hidden layer, so the two Dense layers
    # compose to a single linear map before the sigmoid — confirm intended.
    model.add(tf.keras.layers.Dense(100, input_dim=692))
    # Only the first layer needs an input size; later layers infer theirs.
    model.add(tf.keras.layers.Dense(1))
    model.add(tf.keras.layers.Activation('sigmoid'))

    # `learning_rate` is the documented argument. `lr` is a deprecated alias
    # that newer Keras releases warn about or reject.
    optimizer = tf.keras.optimizers.Adam(learning_rate=params['lr'])
    loss = 'binary_crossentropy'

    estimator = SparkEstimator(
        model=model,
        optimizer=optimizer,
        loss=loss,
        metrics=['acc'],
        batch_size=params['batch_size'])

    return estimator

# Search space: a single fixed learning rate and two candidate batch sizes.
search_space = dict(
    lr=hp_choice([0.001]),
    batch_size=hp_choice([96, 128]),  # alternative: hp_quniform(96, 128, 16)
)

# TPE-based model selection over the space above, scored on validation loss.
model_selection = TPESearch(
    backend=backend,
    store=store,
    estimator_gen_fn=estimator_gen_fn,
    search_space=search_space,
    num_models=30,
    num_epochs=10,
    validation=0.25,
    evaluation_metric='loss',
    feature_columns=['features'],
    label_columns=['label'],
)
# Alternative selector (disabled):
# model_selection = HILGridSearch(backend=backend, store=store, estimator_gen_fn=estimator_gen_fn,
#                                 search_space=search_space, num_epochs=10, validation=0.25,
#                                 evaluation_metric='loss', feature_columns=['features'],
#                                 label_columns=['label'])

# Run the search; the returned object is the best model found.
model = model_selection.fit(train_df)

# Training history of the best model.
model_history = model.get_history()

# Batch inference on the held-out Spark DataFrame.
output_df = model.set_output_columns(['label_predicted']).transform(test_df)
output_df.select('label', 'label_predicted').show(n=10)

# Every model trained during the search, together with their histories.
all_models = model.get_all_models()
all_model_training_history = model.get_all_model_history()

# Export the best model to Keras and score one all-ones 692-feature row with numpy input.
keras_model = model.keras()
pred = keras_model.predict([np.ones([1, 692], dtype=np.float32)])
# Persist the best model's Keras checkpoint.
ckpt_path = "./ckpt_models/"
keras_model.save(ckpt_path)

# Export every searched model to Keras.
all_models_keras = [trained.keras() for trained in all_models]

CEREBRO => Time: 2021-02-23 15:22:50, Running 1 Workers
CEREBRO => Time: 2021-02-23 15:22:51, Preparing Data
CEREBRO => Time: 2021-02-23 15:22:51, Num Partitions: 8
CEREBRO => Time: 2021-02-23 15:22:51, Writing DataFrames
CEREBRO => Time: 2021-02-23 15:22:51, Train Data Path: file:///Users/Lee/Documents/Research/MS_Thesis/Trial/intermediate_train_data
CEREBRO => Time: 2021-02-23 15:22:51, Val Data Path: file:///Users/Lee/Documents/Research/MS_Thesis/Trial/intermediate_val_data
CEREBRO => Time: 2021-02-23 15:22:54, Train Partitions: 6
CEREBRO => Time: 2021-02-23 15:23:04, Val Partitions: 2
CEREBRO => Time: 2021-02-23 15:23:11, Train Rows: 65
CEREBRO => Time: 2021-02-23 15:23:11, Val Rows: 22
CEREBRO => Time: 2021-02-23 15:23:11, Initializing Workers
CEREBRO => Time: 2021-02-23 15:23:11, Initializing Data Loaders
CEREBRO => Time: 2021-02-23 15:23:12, Launching Model Selection Workload
CEREBRO => Time: 2021-02-23 15:23:12, Model: model_0_1614064992, batch_size: 96, lr: 0.001
CEREBRO => Ti

KeyboardInterrupt: 

In [4]:
import ipywidgets as widgets

In [1]:
import widgets_background

In [2]:
# Call the helper in the local widgets_background module. Judging by its recorded
# output, it displays checkboxes, prompts for input and prints their state — verify in the module.
widgets_background.test_func()

Checkbox(value=False, description='something 0')

Checkbox(value=False, description='something 1')

input something!: a
[Checkbox(value=False, description='something 0'), Checkbox(value=False, description='something 1')]
False
False


In [5]:
# Create, display and keep one checkbox per parameter index so their ticked
# state can be read back in a later cell. `ckb` is intentionally left bound to
# the last checkbox created.
checkboxes = []
for param in range(2):
    name = f'something {param}'
    # Don't pass `sync=True` here: it is a traitlets metadata tag for trait
    # declarations, not a widget constructor argument (traitlets only warns
    # about unrecognized constructor kwargs and says it will raise in future).
    ckb = widgets.Checkbox(
        value=False,
        description=name,
        disabled=False,
    )
    display(ckb)
    checkboxes.append(ckb)

Checkbox(value=False, description='something 0')

Checkbox(value=False, description='something 1')

In [6]:
# Print every checkbox's state and name the ones that are ticked.
for box in checkboxes:
    print(box.value)
    if box.value:
        print("Ticked: " + box.description)

True
Ticked: something 0
False


In [6]:
# Current value of `ckb`, i.e. the last checkbox created in the checkbox loop.
ckb.value

True

In [1]:
# Load the TensorBoard notebook extension and point it at the run logs, which sit
# under the LocalStore prefix used for the Cerebro store (presumably written by Cerebro — confirm).
# NOTE(review): absolute, user-specific path — consider deriving it from a configurable base directory.
%load_ext tensorboard
%tensorboard --logdir /Users/Lee/Documents/Research/MS_Thesis/Trial/runs/logs

Reusing TensorBoard on port 6006 (pid 8860), started 0:05:29 ago. (Use '!kill 8860' to kill it.)