### Horovod on Spark

Source: https://horovod.readthedocs.io/en/stable/spark.html

**PURPOSE:**  
Demo of distributed model training using Horovod with Spark.

The Estimator API abstracts the data processing (converting Spark DataFrames into deep learning datasets), so a model can be trained directly on a Spark DataFrame.

In [2]:
! pip install horovod[spark]

Defaulting to user installation because normal site-packages is not writeable


Load libraries

In [1]:
import os

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import VectorAssembler

import horovod.spark.keras as hvd

Import data and preprocess

In [2]:
DATA_DIR = '/sfs/gpfs/tardis/home/apt4c/distributed_computing/04_mllib_intro_and_supervised_learning/'
DATA_FILENAME = 'wisc_breast_cancer_w_fields.csv'
DATA_FILEPATH = os.path.join(DATA_DIR, DATA_FILENAME)

In [3]:
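# Horovod starts num_proc=2 training tasks that must run at the same time,
# so the local master needs at least two task slots.
spark = SparkSession.builder \
        .master("local[2]") \
        .appName("mllib_classifier") \
        .getOrCreate()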

/opt/conda/lib/python3.7/site-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/10/24 18:42:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
df = spark.read.csv(DATA_FILEPATH, header=True, inferSchema=True)

In [5]:
# Binary label: 1 for malignant ('M') diagnoses, 0 otherwise
diag_ind = when(col("diagnosis") == 'M', 1).otherwise(0)
df = df.withColumn("y", diag_ind)
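
The labelling rule in the cell above can be read as an ordinary conditional. The following is a minimal plain-Python sketch of the same rule; `encode_diagnosis` is a hypothetical helper introduced only for illustration and is not part of PySpark.

```python
# Plain-Python equivalent of when(col("diagnosis") == 'M', 1).otherwise(0).
# `encode_diagnosis` is a hypothetical name used for illustration only.
def encode_diagnosis(diagnosis):
    """Return 1 when the diagnosis is exactly 'M', otherwise 0."""
    return 1 if diagnosis == "M" else 0


# Made-up sample values, not rows from the dataset.
labels = [encode_diagnosis(d) for d in ["M", "B", "B", "M"]]
print(labels)
```

As in the Spark expression, any value other than the exact string 'M' (including a missing value) falls through to 0.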

In [6]:
train_df, test_df = df.randomSplit([0.6, 0.4], seed=314)
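
The weights passed to `randomSplit` are target proportions, not exact sizes: rows are assigned at random, so the resulting split is only approximately 60/40. The sketch below illustrates that idea in plain Python under a simplifying assumption (one independent uniform draw per row); `random_split` is a hypothetical function, and it will not reproduce the exact rows Spark selects.

```python
import random
from itertools import accumulate


def random_split(rows, weights, seed):
    """Assign each row to one split using an independent uniform draw."""
    total = sum(weights)
    # Cumulative upper bounds of each split's interval within [0, 1].
    bounds = list(accumulate(w / total for w in weights))
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point shortfall.
            if x < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(1000), [0.6, 0.4], seed=314)
print(len(train), len(test))
```

Fixing the seed makes the assignment repeatable, which is the role `seed=314` plays in the cell above.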

In [7]:
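assembler = VectorAssembler(inputCols=["f1", "f2"], outputCol="features")
train_df = assembler.transform(train_df)
# The test set needs the same "features" column for prediction later
test_df = assembler.transform(test_df)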
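
Conceptually, the assembler packs the listed input columns into a single vector per row. A rough plain-Python sketch of that step follows; `assemble` is a hypothetical helper, and the sample values are taken from the first row of the output shown in this notebook.

```python
# Hypothetical stand-in for what VectorAssembler does to one row:
# pick the input columns, in order, and pack them into one vector.
def assemble(row, input_cols):
    return [float(row[c]) for c in input_cols]


row = {"f1": 15.46, "f2": 19.48, "y": 1}
features = assemble(row, ["f1", "f2"])
print(features)
```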

In [8]:
train_df.select('y', 'features').show(10)

24/10/24 18:43:07 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+---+-------------+
|  y|     features|
+---+-------------+
|  1|[15.46,19.48]|
|  0|[12.89,13.12]|
|  0| [14.96,19.1]|
|  1|[13.17,18.66]|
|  0|[12.18,17.84]|
|  1|[22.27,19.67]|
|  1|[18.66,17.12]|
|  0|[11.15,13.08]|
|  0|  [10.8,9.71]|
|  1|[13.43,19.63]|
+---+-------------+
only showing top 10 rows

Set up neural network

In [9]:
# Two input features -> 8 tanh hidden units -> 1 sigmoid output
model = Sequential()
model.add(Dense(8, activation='tanh', input_dim=2))
model.add(Dense(1, activation='sigmoid'))

Instructions for updating:
If using Keras pass *_constraint arguments to layers.
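
To make the shape of this network concrete, the sketch below runs one forward pass through a 2-8-1 architecture in plain Python and counts its parameters. The weights here are random placeholders (an assumption for illustration; Keras uses its own initialisers), and `dense` and `sigmoid` are hypothetical helpers, not Keras functions.

```python
import math
import random


def dense(inputs, weights, biases, activation):
    """One fully connected layer: activation(w . x + b) for each unit."""
    return [
        activation(sum(x * w for x, w in zip(inputs, unit_weights)) + b)
        for unit_weights, b in zip(weights, biases)
    ]


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


rng = random.Random(0)
# Placeholder weights, one list of input weights per unit.
w1 = [[rng.uniform(-1, 1) for _ in range(2)] for _ in range(8)]
b1 = [0.0] * 8
w2 = [[rng.uniform(-1, 1) for _ in range(8)]]
b2 = [0.0]

hidden = dense([15.46, 19.48], w1, b1, math.tanh)  # 8 values in (-1, 1)
output = dense(hidden, w2, b2, sigmoid)            # 1 value in (0, 1)

# (2*8 weights + 8 biases) + (8*1 weights + 1 bias)
n_params = sum(len(u) for u in w1) + len(b1) + sum(len(u) for u in w2) + len(b2)
print(output, n_params)
```

The single sigmoid output can be read as the predicted probability that `y` is 1.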


In [10]:
# NOTE: unscaled learning rate
optimizer = tf.keras.optimizers.SGD(learning_rate=0.1)
loss = 'binary_crossentropy'
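
For reference, the loss and the update rule chosen above can be written out by hand. This is a minimal sketch, not the Keras implementation: `binary_crossentropy` and `sgd_step` are hypothetical helpers, the clipping constant `eps` is an assumption added to avoid `log(0)`, and the gradient value is made up.

```python
import math


def binary_crossentropy(y_true, y_pred, eps=1e-7):
    """-[y*log(p) + (1-y)*log(1-p)] for a single example."""
    p = min(max(y_pred, eps), 1.0 - eps)  # assumed clipping to avoid log(0)
    return -(y_true * math.log(p) + (1 - y_true) * math.log(1.0 - p))


def sgd_step(weight, gradient, learning_rate=0.1):
    """Plain SGD update: move against the gradient."""
    return weight - learning_rate * gradient


loss_confident = binary_crossentropy(1, 0.9)  # correct and confident
loss_wrong = binary_crossentropy(1, 0.1)      # confident but wrong
new_weight = sgd_step(0.5, 2.0)               # made-up weight and gradient
print(loss_confident, loss_wrong, new_weight)
```

A confident wrong prediction is penalised far more heavily than a confident correct one, which is what drives the sigmoid output toward the true label.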

Set up intermediate storage

In [11]:
from horovod.spark.common.store import Store

store = Store.create('/tmp/horovod/experiment')

In [12]:
keras_estimator = hvd.KerasEstimator(
    num_proc=2,
    model=model,
    store=store,
    optimizer=optimizer,
    loss=loss,
    feature_cols=['features'],
    label_cols=['y'],
    batch_size=32,
    epochs=1)
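
With `num_proc=2`, two workers each compute gradients on their own batches, and Horovod combines them with an allreduce so that every worker applies the same averaged update. The sketch below simulates only that averaging step in plain Python; `allreduce_average` is a hypothetical function and the gradient values are made up.

```python
def allreduce_average(worker_gradients):
    """Average gradients element-wise across all workers."""
    n_workers = len(worker_gradients)
    return [sum(values) / n_workers for values in zip(*worker_gradients)]


# Made-up gradients for three parameters from two workers.
grads_worker_0 = [0.25, -0.5, 1.0]
grads_worker_1 = [0.75, 0.0, -1.0]

averaged = allreduce_average([grads_worker_0, grads_worker_1])
print(averaged)
```

Because every worker ends up with the same averaged gradient, the model replicas stay in sync after each step.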

In [None]:
keras_model = keras_estimator.fit(train_df) \
    .setOutputCols(['predict'])

num_partitions=20
writing dataframes
train_data_path=file:///tmp/horovod/experiment/intermediate_train_data.0
val_data_path=file:///tmp/horovod/experiment/intermediate_val_data.0
train_partitions=20
train_rows=354
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where
2024-10-24 14:43:32.119995: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 FMA
2024-10-24 14:43:32.192507: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 3000010000 Hz
2024-10-24 14:43:32.192829: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x557d21200bd0 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2024-10-24 14:43:32.193485: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
[Stage 6:>                                                          (0 + 1) / 2]

In [None]:
predict_df = keras_model.transform(test_df)
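
One way to judge the predictions is to threshold them and compare against the labels. The sketch below assumes the `predict` column holds the sigmoid output as a probability (an assumption about the output format); `accuracy` is a hypothetical helper and the values are made up, not results from this notebook.

```python
def accuracy(labels, probabilities, threshold=0.5):
    """Fraction of examples whose thresholded prediction matches the label."""
    predicted = [1 if p >= threshold else 0 for p in probabilities]
    correct = sum(int(a == b) for a, b in zip(labels, predicted))
    return correct / len(labels)


# Made-up labels and predicted probabilities for illustration.
sample_labels = [1, 0, 0, 1]
sample_probs = [0.8, 0.3, 0.6, 0.9]
acc = accuracy(sample_labels, sample_probs)
print(acc)
```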