In [1]:
from cerebro.backend import SparkBackend
from cerebro.keras import SparkEstimator

# datas storage for intermediate data and model artifacts.
from cerebro.storage import LocalStore, HDFSStore

# Model selection/AutoML methods.
from cerebro.tune import GridSearch, RandomSearch, TPESearch

# Utility functions for specifying the search space.
from cerebro.tune import hp_choice, hp_uniform, hp_quniform, hp_loguniform, hp_qloguniform

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from pyspark.sql import SparkSession
import numpy as np


spark = SparkSession \
    .builder \
    .appName("Cerebro Example") \
    .getOrCreate()

...
work_dir = '/Users/zijian/Desktop/ucsd/cse234/project/cerebro-system/'
backend = SparkBackend(spark_context=spark.sparkContext, num_workers=1)
store = LocalStore(prefix_path=work_dir + 'test/')

df = spark.read.format("libsvm") \
    .option("numFeatures", "784") \
    .load("/Users/zijian/Desktop/ucsd/cse234/project/mnist/mnist.scale")

21/12/01 06:18:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


CEREBRO => Time: 2021-12-01 06:18:52, Running 1 Workers


In [2]:
from pyspark.ml.feature import OneHotEncoderEstimator

encoder = OneHotEncoderEstimator(dropLast=False)
encoder.setInputCols(["label"])
encoder.setOutputCols(["label_OHE"])

encoder_model = encoder.fit(df)
encoded = encoder_model.transform(df)

feature_columns=['features']
label_columns=['label_OHE']
train_df, test_df = encoded.randomSplit([0.8, 0.2], seed=100)

                                                                                

In [3]:
from keras_tuner.engine import hyperparameters
import autokeras as ak
from cerebro.nas.hphpmodel import HyperHyperModel

img_shape = (28, 28, 1)

input_node = ak.ImageInput()
output_node = ak.ConvBlock(
    kernel_size=hyperparameters.Fixed('kernel_size', value=3),
    num_blocks=hyperparameters.Fixed('num_blocks', value=1),
    num_layers=hyperparameters.Fixed('num_layers', value=2),
)(input_node)
output_node = ak.ClassificationHead()(output_node)
am = HyperHyperModel(input_node, output_node, seed=2000)

am.resource_bind(
    backend=backend, 
    store=store,
    feature_columns=feature_columns,
    label_columns=label_columns,
    evaluation_metric='accuracy', 
)

am.tuner_bind(
    tuner="randomsearch", 
    hyperparameters=None, 
    objective="val_accuracy",
    max_trials=20,
    overwrite=True,
)

In [4]:
am.sys_setup(train_df)

CEREBRO => Time: 2021-12-01 06:18:56, Preparing Data
CEREBRO => Time: 2021-12-01 06:18:56, Num Partitions: 12
CEREBRO => Time: 2021-12-01 06:18:56, Writing DataFrames
CEREBRO => Time: 2021-12-01 06:18:56, Train Data Path: file:///Users/zijian/Desktop/ucsd/cse234/project/cerebro-system/test/intermediate_train_data
CEREBRO => Time: 2021-12-01 06:18:56, Val Data Path: file:///Users/zijian/Desktop/ucsd/cse234/project/cerebro-system/test/intermediate_val_data


                                                                                

CEREBRO => Time: 2021-12-01 06:19:04, Train Partitions: 9


                                                                                

CEREBRO => Time: 2021-12-01 06:19:19, Val Partitions: 2


                                                                                

CEREBRO => Time: 2021-12-01 06:19:37, Train Rows: 38401
CEREBRO => Time: 2021-12-01 06:19:37, Val Rows: 9634
CEREBRO => Time: 2021-12-01 06:19:37, Initializing Workers
CEREBRO => Time: 2021-12-01 06:19:37, Initializing Data Loaders


[Stage 9:>                                                          (0 + 1) / 1]

In [5]:
# Check parquet dataset
from cerebro.backend.spark import util

ms = am.model_selection
store = ms.store
dataset_idx = None
label_columns = ms.label_cols
feature_columns = ms.feature_cols
train_rows, val_rows, metadata, avg_row_size = \
        util.get_simple_meta_from_parquet(store,
                                          schema_cols=label_columns + feature_columns,
                                          sample_weight_col=None,
                                          dataset_idx=dataset_idx)

In [6]:
metadata

{'label_OHE': {'spark_data_type': pyspark.sql.types.BinaryType,
  'is_sparse_vector_only': False,
  'shape': None,
  'intermediate_format': 'nochange',
  'max_size': None},
 'features': {'spark_data_type': pyspark.sql.types.BinaryType,
  'is_sparse_vector_only': False,
  'shape': None,
  'intermediate_format': 'nochange',
  'max_size': None}}

In [7]:
train_data_path = store.get_train_data_path(dataset_idx)
train_data = store.get_parquet_dataset(train_data_path)
schema = train_data.schema.to_arrow_schema()
schema.field('features').type

DataType(binary)

In [8]:
# Create trial to generate estimator
x = np.array(train_df.select(feature_columns).head(100))
y = np.array(train_df.select(label_columns).head(100))
x = [x[:,i] for i in range(x.shape[1])]
x = [r.reshape((-1, *img_shape)) for r in x]
y = np.squeeze(y,1)
if len(y.shape) > 2:
    raise ValueError(
        "We do not support multiple labels. Expect the target data for {name} to have shape "
        "(batch_size, num_classes), "
        "but got {shape}.".format(name=self.name, shape=self.shape)
    )
dataset, validation_data = am._convert_to_dataset(
    x=x, y=y, validation_data=None, batch_size=32
)

"""
Analyze data analyse input and output data and config model inputs and heads
"""
am._analyze_data(dataset)

"""
Build preprocessing pipeline with tunable parameters

Since the model is trained from workers which reads data from pre-distributed permanent storage, we will not consider tuning preprocessing currently.
"""
# self._build_hyper_pipeline(dataset)
am.tuner.hyper_pipeline = None
am.tuner.hypermodel.hyper_pipeline = None

# Initial space
tuner = am.tuner
tuner.hypermodel.hypermodel.set_fit_args(0.2, epochs=10)

# Populate initial search space.
hp = tuner.oracle.get_space()
tuner._prepare_model_IO(hp, dataset=dataset)
tuner.hypermodel.build(hp)
tuner.oracle.update_space(hp)

2021-12-01 06:19:55.571168: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-12-01 06:19:55.571582: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
[Stage 9:>                                                          (0 + 1) / 1]

In [9]:
estimators = []
trials = tuner.oracle.create_trials(1, tuner.tuner_id)
estimators = tuner.trials2estimators(trials, dataset)

estimator = estimators[0]
keras_utils = estimator._get_keras_utils()
run_id = estimator.getRunId()
input_shapes, output_shapes = estimator.get_model_shapes()
output_names = estimator.getModel().output_names
sample_weight_col = estimator.getSampleWeightCol()
make_dataset = keras_utils.make_dataset_fn(
        feature_columns, label_columns, sample_weight_col, metadata,
        input_shapes, output_shapes, output_names, 32)

In [10]:
floatx = tf.keras.backend.floatx()
tf.keras.backend.set_floatx(floatx)

from cerebro.backend import constants
def _data_readers_fn(remote_store, shard_count, schema_fields, avg_row_size, cache_size_limit, pool_type, num_readers):
    def _data_readers(index):
        from petastorm import make_reader

        PETASTORM_HDFS_DRIVER = constants.PETASTORM_HDFS_DRIVER

        train_reader = make_reader(remote_store.train_data_path, shuffle_row_groups=False, num_epochs=None,
                                   cur_shard=index,
                                   shard_count=shard_count,
                                   hdfs_driver=PETASTORM_HDFS_DRIVER,
                                   schema_fields=schema_fields,
                                   reader_pool_type=pool_type, workers_count=num_readers,
                                   cache_type='local-disk',
                                   cache_size_limit=cache_size_limit,
                                   cache_row_size_estimate=avg_row_size,
                                   cache_extra_settings={'cleanup': True})

        if remote_store.val_data_path != '' and remote_store.val_data_path is not None:
            val_reader = make_reader(remote_store.val_data_path, shuffle_row_groups=False, num_epochs=None,
                                     cur_shard=index,
                                     shard_count=shard_count,
                                     hdfs_driver=PETASTORM_HDFS_DRIVER,
                                     schema_fields=schema_fields,
                                     reader_pool_type=pool_type, workers_count=num_readers,
                                     cache_type='local-disk',
                                     cache_size_limit=cache_size_limit,
                                     cache_row_size_estimate=avg_row_size,
                                     cache_extra_settings={'cleanup': True})
        else:
            val_reader = None

        return train_reader, val_reader

    return _data_readers

In [11]:
# Autokeras model

trial = trials[0]
tuner._prepare_model_IO(trial.hyperparameters, dataset=dataset)
model = tuner.hypermodel.build(trial.hyperparameters)
tuner.adapt(model, dataset)
optimizer_real = tf.keras.optimizers.Adam(lr=0.001)
loss = 'categorical_crossentropy'
model.compile(optimizer=optimizer_real, loss=loss, metrics=['accuracy'])
model.save('debug_cpkt_ak.h5')

In [12]:
model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 28, 28, 1)]       0         
_________________________________________________________________
cast_to_float32 (CastToFloat (None, 28, 28, 1)         0         
_________________________________________________________________
separable_conv2d (SeparableC (None, 26, 26, 256)       521       
_________________________________________________________________
separable_conv2d_1 (Separabl (None, 24, 24, 32)        10528     
_________________________________________________________________
max_pooling2d (MaxPooling2D) (None, 12, 12, 32)        0         
_________________________________________________________________
flatten (Flatten)            (None, 4608)              0         
_________________________________________________________________
dropout (Dropout)            (None, 4608)              0     

[Stage 9:>                                                          (0 + 1) / 1]

In [13]:
tf.compat.v1.disable_eager_execution()

backend = ms.backend
remote_store = store.to_remote(backend.spark_job_group, None)
shard_count = backend._num_workers()
schema_fields = ms.feature_cols + ms.label_cols
_, _, _, avg_row_size = util.get_simple_meta_from_parquet(store, schema_fields, None, dataset_idx)
data_readers_fn = _data_readers_fn(remote_store, shard_count, schema_fields, avg_row_size,
                                               backend.settings.disk_cache_size_bytes,
                                               backend.settings.data_readers_pool_type, backend.settings.num_data_readers)
train_reader, val_reader = data_readers_fn(0)           
user_shuffle_buffer_size = estimator.getShufflingBufferSize()
if not user_shuffle_buffer_size:
    shuffle_buffer_size = 1024 * 3
else:
    shuffle_buffer_size = user_shuffle_buffer_size
train_data = make_dataset(train_reader, shuffle_buffer_size, shuffle=False)

Instructions for updating:
Use output_signature instead
Instructions for updating:
Use output_signature instead


[Stage 9:>                                                          (0 + 1) / 1]

In [16]:
# training 
import io
import h5py
import math

def _deserialize_keras_model_fn():
    def deserialize_keras_model(model_bytes, load_model_fn):
        """Deserialize model from byte array encoded in base 64."""
        # model_bytes = codec.loads_base64(model_bytes)
        bio = io.BytesIO(model_bytes)
        with h5py.File(bio, 'r') as f:
            return load_model_fn(f)

    return deserialize_keras_model

deserialize_keras_model = _deserialize_keras_model_fn()
custom_objects = estimator.getCustomObjects()
with tf.keras.utils.custom_object_scope(custom_objects):
        model = deserialize_keras_model(
        store.read('debug_cpkt_ak.h5'), lambda x: tf.keras.models.load_model(x))
steps_per_epoch = int(math.ceil(train_rows / 32 / ms.backend._num_workers()))
result = model.fit(train_data, epochs=1, steps_per_epoch=steps_per_epoch).history

Train on 1201 steps
 237/1201 [====>.........................] - ETA: 44s - batch: 118.0000 - size: 1.0000 - loss: 2.0814 - accuracy: 0.2744

[Stage 9:>                                                          (0 + 1) / 1]



[Stage 9:>                                                          (0 + 1) / 1]

In [None]:
# Pure keras model training using parquet

In [17]:
model = keras.Sequential(
    [
        keras.Input(shape=img_shape),
        layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Flatten(),
        layers.Dropout(0.5),
        layers.Dense(10, activation="softmax"),
    ]
)

optimizer = tf.keras.optimizers.Adam(lr=0.001)
loss = 'categorical_crossentropy'
model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
model.save('debug_cpkt_normal.h5')
# fit_sub_epoch_fn = keras_utils.fit_sub_epoch_fn()

In [18]:
# training 
import io
import h5py
import math

def _deserialize_keras_model_fn():
    def deserialize_keras_model(model_bytes, load_model_fn):
        """Deserialize model from byte array encoded in base 64."""
        # model_bytes = codec.loads_base64(model_bytes)
        bio = io.BytesIO(model_bytes)
        with h5py.File(bio, 'r') as f:
            return load_model_fn(f)

    return deserialize_keras_model

deserialize_keras_model = _deserialize_keras_model_fn()
custom_objects = estimator.getCustomObjects()
with tf.keras.utils.custom_object_scope(custom_objects):
        model = deserialize_keras_model(
        store.read('debug_cpkt_normal.h5'), lambda x: tf.keras.models.load_model(x))
steps_per_epoch = int(math.ceil(train_rows / 32 / ms.backend._num_workers()))
result = model.fit(train_data, epochs=1, steps_per_epoch=steps_per_epoch).history

Train on 1201 steps

KeyboardInterrupt: 

In [57]:
from petastorm.tf_utils import make_petastorm_dataset
from petastorm import make_reader
shard_count = backend._num_workers()
PETASTORM_HDFS_DRIVER = constants.PETASTORM_HDFS_DRIVER
pool_type = backend.settings.data_readers_pool_type
num_readers = backend.settings.num_data_readers
cache_size_limit = backend.settings.disk_cache_size_bytes
train_reader = make_reader(remote_store.train_data_path, 
                           shuffle_row_groups=False, num_epochs=None,
                           cur_shard=0,
                           shard_count=shard_count,
                           hdfs_driver=PETASTORM_HDFS_DRIVER,
                           schema_fields=schema_fields,
                           reader_pool_type=pool_type, workers_count=num_readers,
                           cache_type='local-disk',
                           cache_size_limit=cache_size_limit,
                           cache_row_size_estimate=avg_row_size,
                           cache_extra_settings={'cleanup': True})

In [58]:
dataset = make_petastorm_dataset(train_reader)

Instructions for updating:
Use output_signature instead
Instructions for updating:
Use output_signature instead


In [59]:
dataset

<DatasetV1Adapter shapes: petastorm_schema_view_view(features=(784,), label_OHE=(10,)), types: petastorm_schema_view_view(features=tf.float64, label_OHE=tf.float64)>

[Stage 9:>                                                          (0 + 1) / 1]

In [60]:
from cerebro.keras.spark.util import _prep_data_fn
has_sparse_col = any(metadata[col]['is_sparse_vector_only']
                             for col in label_columns + feature_columns)

prep_data_tf_keras = _prep_data_fn(
            has_sparse_col, sample_weight_col, feature_columns,
            label_columns, input_shapes, output_shapes, output_names)

bdataset = dataset.batch(32) \
        .map(prep_data_tf_keras, num_parallel_calls=tf.data.experimental.AUTOTUNE)
bdataset

<DatasetV1Adapter shapes: (((None, 28, 28, 1),), ((None, 10),)), types: ((tf.float64,), (tf.float64,))>

In [61]:
for d in bdataset:
    print(d)

RuntimeError: __iter__() is only supported inside of tf.function or when eager execution is enabled.

In [62]:
tf.compat.v1.disable_eager_execution()

model = keras.Sequential(
    [
        keras.Input(shape=img_shape),
        layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),
        layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Flatten(),
        layers.Dropout(0.5),
        layers.Dense(10, activation="softmax"),
    ]
)

optimizer = tf.keras.optimizers.Adam(lr=0.001)
loss = 'categorical_crossentropy'
model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])

In [63]:
model.fit(bdataset)



2021-12-01 05:17:03.443549: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:196] None of the MLIR optimization passes are enabled (registered 0 passes)


Train on None steps
    668/Unknown - 21s 31ms/step - batch: 333.5000 - size: 1.0000 - loss: 0.8228 - accuracy: 0.7258

[Stage 9:>                                                          (0 + 1) / 1]

   1244/Unknown - 38s 30ms/step - batch: 621.5000 - size: 1.0000 - loss: 0.5248 - accuracy: 0.8285 36s 30ms/step - batch: 602.5000 - size: 1.

KeyboardInterrupt: 