In [1]:
import tensorflow as tf
import json
import os
import numpy as np

In [2]:
def fit_node(nodes=1, lbatch=16, tsamples=None):
    samples_per_node = ( tsamples - (tsamples % nodes) ) / nodes
    samples_per_node = samples_per_node - (samples_per_node % lbatch)
    print("samples per node {}".format(samples_per_node))

In [3]:
# Set up input files
path = '/lambda_stor/data/enums-100-test/'
filenames = [
    'ml.ADRP_6W02_A_1_H.Orderable_zinc_db_enaHLL.sorted.4col.descriptors.parquet.xform-smiles.csv.reg.00',
    #'ml.ADRP_6W02_A_1_H.Orderable_zinc_db_enaHLL.sorted.4col.descriptors.parquet.xform-smiles.csv.reg.01',
    #'ml.ADRP_6W02_A_1_H.Orderable_zinc_db_enaHLL.sorted.4col.descriptors.parquet.xform-smiles.csv.reg.02',
    #'ml.ADRP_6W02_A_1_H.Orderable_zinc_db_enaHLL.sorted.4col.descriptors.parquet.xform-smiles.csv.reg.03'
]
for n in range(len(filenames)):
    filenames[n] = path + filenames[n]


In [4]:
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [5]:


# os.environ['TF_CONFIG'] = json.dumps(tf_config)

tf_config = {
    'cluster': {
        'worker': ['rbdgx1.cels.anl.gov:12345', 'rbdgx2.cels.anl.gov:12346']
    },
    'task': {'type': 'worker', 'index': 1}
}

tf_config = {
    'cluster': {
        'worker': ['rbdgx1.cels.anl.gov:12345', 'rbdgx2.cels.anl.gov:12346']
    },
    'task': {'type': 'worker', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)

num_workers = len(tf_config['cluster']['worker'])
print(os.environ['TF_CONFIG'])     

{"cluster": {"worker": ["rbdgx1.cels.anl.gov:12345", "rbdgx2.cels.anl.gov:12346"]}, "task": {"type": "worker", "index": 0}}


In [6]:
mirrored_strategy = tf.distribute.MirroredStrategy()


# Create a tf.data.Dataset object.
dataset = tf.data.experimental.CsvDataset(
    filenames,
    [tf.float32, tf.string]
)

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.FILE
dataset = dataset.with_options(options)
print( type(dataset) )

INFO:tensorflow:Initializing local devices since in-graph multi-worker training with `MirroredStrategy` is not supported in eager mode. TF_CONFIG will be ignored when when initializing `MirroredStrategy`.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3', '/job:localhost/replica:0/task:0/device:GPU:4', '/job:localhost/replica:0/task:0/device:GPU:5', '/job:localhost/replica:0/task:0/device:GPU:6', '/job:localhost/replica:0/task:0/device:GPU:7')
<class 'tensorflow.python.data.ops.dataset_ops._OptionsDataset'>


2022-08-22 19:19:46.417617: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-08-22 19:19:50.221824: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 38420 MB memory:  -> device: 0, name: NVIDIA A100-SXM4-40GB, pci bus id: 0000:07:00.0, compute capability: 8.0
2022-08-22 19:19:50.224749: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:1 with 38420 MB memory:  -> device: 1, name: NVIDIA A100-SXM4-40GB, pci bus id: 0000:0f:00.0, compute capability: 8.0
2022-08-22 19:19:50.227410: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/devi

In [7]:
# iterator = iter(dataset)
# iterator.get_next()
# iterator.get_next()[0]
# iterator.get_next()[1]
# x = iterator.get_next()[1]
# y = iterator.get_next()[0]

In [8]:
import numpy as np
np_dataset = np.array(list(dataset.as_numpy_iterator()))
np_dataset


array([[b'13.835799',
        b'c1cc2c(cc1F)c(c[nH]2)CCNC(=O)[C@@H]3CCC[N@](C3)c4c5c([nH]cn5)ncn4'],
       [b'13.835799',
        b'c1cc2c(cc1F)c(c[nH]2)CCNC(=O)[C@@H]3CCC[N@](C3)c4c5c([nH]cn5)ncn4'],
       [b'13.633498',
        b'c1ccc(cc1)OCC[C@@]2(CCC[N@@](C2)c3c4cc[nH]c4ncn3)CO'],
       ...,
       [b'3.751021',
        b'Cc1cc(n2c(n1)c3c(n2)NC(=O)C[C@@H]3c4ccc5c(c4)nnn5C)C'],
       [b'3.699315',
        b'CCOc1ccc(cc1)c2nc(cs2)CC(=O)Nc3nc(c(s3)SCC(=O)N)C'],
       [b'3.727015', b'c1cnc(nc1)N2CCC[C@H]2c3cc([nH]n3)CCO']],
      dtype='|S150')

In [9]:
x_train = np_dataset[:,1]
y_train = np_dataset[:,0].reshape(-1,1)

In [13]:
x_train.shape[0]

500000

In [14]:
vocab_size = 40000  # 
maxlen = 250  # 

vectorize_layer = tf.keras.layers.TextVectorization(
    output_mode='int',
    standardize=None,
    max_tokens=vocab_size,
    split='character',
    output_sequence_length=maxlen,
    pad_to_max_tokens=True
)
samples=x_train.shape[0]
batch_size=1000
steps=samples/batch_size
vectorize_layer.adapt(x_train, batch_size=batch_size, steps=steps)

In [15]:
fn = lambda x: vectorize_layer(x)
x_train = fn(x_train)

In [16]:
print(x_train)
print(y_train)

tf.Tensor(
[[2 9 2 ... 0 0 0]
 [2 9 2 ... 0 0 0]
 [2 9 2 ... 0 0 0]
 ...
 [3 2 9 ... 0 0 0]
 [3 3 6 ... 0 0 0]
 [2 9 2 ... 0 0 0]], shape=(500000, 250), dtype=int64)
[[b'13.835799']
 [b'13.835799']
 [b'13.633498']
 ...
 [b'3.751021']
 [b'3.699315']
 [b'3.727015']]
