In [1]:
# Automatically reload imported modules before each cell executes, so edits to
# source files on disk are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
import tensorflow as tf
import numpy as np
import healpy as hp
import matplotlib.pyplot as plt

from time import time
from icecream import ic

from msfm.utils import analysis, parameters
from msfm import fiducial_pipeline

import matplotlib.pyplot as plt

### strategy

In [3]:
# Synchronous data-parallel strategy over every GPU visible to TensorFlow.
# Alternatives kept for debugging:
#   tf.distribute.MirroredStrategy(['/gpu:0', '/gpu:1'])   # restrict to two GPUs
#   tf.distribute.get_strategy()                           # default strategy
strategy = tf.distribute.MirroredStrategy()

# number of replicas the strategy keeps in sync; used below as the GPU count
n_gpus = strategy.num_replicas_in_sync

physical_gpus = tf.config.list_physical_devices("GPU")
print(physical_gpus)
print(n_gpus)

2023-03-02 08:58:51.910426: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-03-02 08:58:53.730630: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 38277 MB memory:  -> device: 0, name: NVIDIA A100-SXM4-40GB, pci bus id: 0000:03:00.0, compute capability: 8.0
2023-03-02 08:58:53.733067: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/device:GPU:1 with 38277 MB memory:  -> device: 1, name: NVIDIA A100-SXM4-40GB, pci bus id: 0000:41:00.0, compute capability: 8.0
2023-03-02 08:58:53.734216: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1532] Created device /job:localhost/replica:0/task:0/devi

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'), PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU'), PhysicalDevice(name='/physical_device:GPU:2', device_type='GPU'), PhysicalDevice(name='/physical_device:GPU:3', device_type='GPU')]
4


### constants

In [4]:
# Analysis configuration as loaded by the msfm helper.
conf = analysis.load_config()

# Glob pattern of the fiducial TFRecord files ("?" matches a single character).
tfr_pattern = "/pscratch/sd/a/athomsen/DESY3/v2/fiducial/DESy3_fiducial_???.tfrecord"

# Perturbation labels belonging to the selected parameters.
params = ["Om"]
pert_labels = parameters.get_fiducial_perturbation_labels(params)
n_perts = len(pert_labels)

# The global batch holds one example per replica, so the local batch size is 1.
global_batch_size = n_gpus
local_batch_size = global_batch_size // n_gpus

ic(global_batch_size)
ic(local_batch_size)

# Reader and prefetch settings forwarded to the input pipeline.
n_readers, n_prefetch = 1, 3

23-03-02 08:58:54  analysis.py INF   Loaded the config 


ic| global_batch_size: 4
ic| local_batch_size: 1


# Every example is seen exactly once (in eval mode)

In [5]:
# Total number of examples: n_patches examples for each of the n_runs runs.
n_runs, n_patches = 200, 4
n_examples = n_runs * n_patches

# Number of global batches needed to see every example once.
print(n_examples / global_batch_size)

200.0


## not distributed

In [6]:
# One pass over this dataset should yield n_examples / global_batch_size batches
# (i.e. 800 / global_batch_size for the numbers chosen above).
dset = fiducial_pipeline.get_fiducial_dset(
    tfr_pattern,
    pert_labels,
    global_batch_size,
    conf=None,
    i_noise=0,
    is_eval=True,
    n_readers=n_readers,
    n_prefetch=n_prefetch,
)

23-03-02 08:58:54 fiducial_pip INF   Starting to generate the fiducial training set for i_noise = 0 
23-03-02 08:58:54  analysis.py INF   Loaded the config 
23-03-02 08:58:54  analysis.py INF   Loaded the pixel file 
23-03-02 08:58:54  analysis.py INF   Loaded the config 
23-03-02 08:58:54  analysis.py INF   Loaded the pixel file 
[1m[93m23-03-02 08:58:55 tfrecords.py WAR   Tracing parse_inverse_fiducial [0m
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module, class, method, function, traceback, frame, or code object was expected, got cython_function_or_method
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module, class, method, function, traceback, frame, or code object was expected, got cython_function_or_method
23-03-02 08:58:56  analysis.py 

In [7]:
# Make one pass over the dataset and remember which example indices every batch contained.
indices = []
for i, (data_vectors, index) in enumerate(dset):
    # progress report every 10 batches
    if i % 10 == 0:
        print(i)

    # NOTE(review): index[0] is taken to be the per-example index tensor of the batch - confirm against the pipeline
    batch_indices = index[0]
    indices.append(batch_indices)

# join the per-batch index tensors along the batch axis
index_tensor = tf.concat(indices, axis=0)

0
10
20
30
40
50
60
70
80
90
100
110
120
130
140
150
160
170
180
190


### every index is included exactly once

In [8]:
index_array = index_tensor.numpy()

# Count how often each example index was yielded by the dataset.
vals, counts = np.unique(index_array, return_counts=True)

# number of distinct indices, should equal n_examples (800)
print(len(vals))
# distinct occurrence counts, should only contain 1
print(np.unique(counts))

# Enforce the expectations so that a plain re-run of the notebook fails loudly
# instead of relying on someone reading the printed numbers.
assert len(vals) == n_examples, f"expected {n_examples} distinct indices, found {len(vals)}"
assert np.all(counts == 1), "some example indices were yielded more than once"

800
[1]


## distributed with strategy

In [9]:
def dataset_fn(input_context):
    """Build the fiducial evaluation dataset for one input pipeline.

    The distribution strategy calls this function with its input context, which
    is forwarded to the msfm pipeline together with the local (per-replica)
    batch size.

    NOTE(review): the dataset must neither shuffle nor repeat for the checks
    below; is_eval=True is expected to take care of that - confirm in
    fiducial_pipeline.
    """
    return fiducial_pipeline.get_fiducial_dset(
        tfr_pattern,
        pert_labels,
        local_batch_size,
        conf=None,
        i_noise=0,
        is_eval=True,
        input_context=input_context,
        n_readers=n_readers,
        n_prefetch=n_prefetch,
    )

# Let the strategy call dataset_fn to build the input pipeline and hand out its batches to the replicas.
dist_dset = strategy.distribute_datasets_from_function(dataset_fn)

23-03-02 08:59:26 fiducial_pip INF   Starting to generate the fiducial training set for i_noise = 0 
23-03-02 08:59:26  analysis.py INF   Loaded the config 
23-03-02 08:59:26  analysis.py INF   Loaded the pixel file 
23-03-02 08:59:27  analysis.py INF   Loaded the config 
23-03-02 08:59:27  analysis.py INF   Loaded the pixel file 
23-03-02 08:59:27 fiducial_pip INF   Sharding the dataset according to the input_context 
[1m[93m23-03-02 08:59:27 tfrecords.py WAR   Tracing parse_inverse_fiducial [0m
23-03-02 08:59:27  analysis.py INF   Loaded the config 
[1m[93m23-03-02 08:59:27 fiducial_pip WAR   Tracing dset_add_bias [0m
[1m[93m23-03-02 08:59:27 fiducial_pip WAR   Tracing dset_add_noise [0m
23-03-02 08:59:27 fiducial_pip INF   Batching into 1 elements locally 
[1m[93m23-03-02 08:59:27 fiducial_pip WAR   Tracing dset_concat_perts [0m
23-03-02 08:59:27 fiducial_pip INF   Successfully generated the fiducial training set with element_spec (TensorSpec(shape=(3, 463872, 4), dtype=

In [10]:
# Make one pass over the distributed dataset and gather the example indices of all replicas per step.
indices = []
for i, (data_vectors, index) in enumerate(dist_dset):
    # progress report every 10 steps
    if i % 10 == 0:
        print(i)

    # concatenate the per-replica index tensors along the batch axis
    gathered = strategy.gather(index[0], axis=0)
    indices.append(gathered)

# one row per step, holding the gathered indices of that step
index_tensor = tf.stack(indices)

0
INFO:tensorflow:Gather to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Gather to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Gather to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Gather to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Gather to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Gather to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Gather to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Ga

### every index is included exactly once

In [11]:
# Flatten the (steps, gathered batch) layout; only the multiset of indices matters here.
index_array = index_tensor.numpy().ravel()

# Count how often each example index was yielded across all replicas.
vals, counts = np.unique(index_array, return_counts=True)

# number of distinct indices, should equal n_examples (800)
print(len(vals))
# distinct occurrence counts, should only contain 1
print(np.unique(counts))

# Enforce the expectations so that a plain re-run of the notebook fails loudly
# instead of relying on someone reading the printed numbers.
assert len(vals) == n_examples, f"expected {n_examples} distinct indices, found {len(vals)}"
assert np.all(counts == 1), "some example indices were yielded more than once"

800
[1]


### the different workers are disjoint

In [12]:
index_array = index_tensor.numpy()

# Each column is interpreted as the indices seen by one replica. That reading
# is only valid when there is exactly one column per replica, so check it
# explicitly instead of failing later with an opaque indexing error.
assert index_array.ndim == 2 and index_array.shape[1] == n_gpus, (
    f"expected one column per replica ({n_gpus}), got shape {index_array.shape}"
)

for i in range(n_gpus):
    # indices seen by all replicas except the ith one
    other_workers = np.delete(index_array, i, axis=1).ravel()

    # intersection between the ith worker and all of the rest, should be empty
    overlap = np.intersect1d(index_array[:, i], other_workers)
    print(overlap)
    assert overlap.size == 0, f"replica {i} shares {overlap.size} indices with the other replicas"


[]
[]
[]
[]


# simple example

In [13]:
import tensorflow as tf
from icecream import ic

In [14]:
# The real pipeline starts the same way: list the TFRecord files matching the
# pattern, with shuffling of the file names switched off.
tfr_pattern = "/pscratch/sd/a/athomsen/DESY3/v2/fiducial/DESy3_fiducial_???.tfrecord"
dset = tf.data.Dataset.list_files(tfr_pattern, shuffle=False)

# show every matched file name
for tfr_file in dset:
    print(tfr_file)

tf.Tensor(b'/pscratch/sd/a/athomsen/DESY3/v2/fiducial/DESy3_fiducial_000.tfrecord', shape=(), dtype=string)
tf.Tensor(b'/pscratch/sd/a/athomsen/DESY3/v2/fiducial/DESy3_fiducial_001.tfrecord', shape=(), dtype=string)
tf.Tensor(b'/pscratch/sd/a/athomsen/DESY3/v2/fiducial/DESy3_fiducial_002.tfrecord', shape=(), dtype=string)
tf.Tensor(b'/pscratch/sd/a/athomsen/DESY3/v2/fiducial/DESy3_fiducial_003.tfrecord', shape=(), dtype=string)
tf.Tensor(b'/pscratch/sd/a/athomsen/DESY3/v2/fiducial/DESy3_fiducial_004.tfrecord', shape=(), dtype=string)
tf.Tensor(b'/pscratch/sd/a/athomsen/DESY3/v2/fiducial/DESy3_fiducial_005.tfrecord', shape=(), dtype=string)
tf.Tensor(b'/pscratch/sd/a/athomsen/DESY3/v2/fiducial/DESy3_fiducial_006.tfrecord', shape=(), dtype=string)
tf.Tensor(b'/pscratch/sd/a/athomsen/DESY3/v2/fiducial/DESy3_fiducial_007.tfrecord', shape=(), dtype=string)
tf.Tensor(b'/pscratch/sd/a/athomsen/DESY3/v2/fiducial/DESy3_fiducial_008.tfrecord', shape=(), dtype=string)
tf.Tensor(b'/pscratch/sd/a/a

In [15]:
# Stand-alone setup for the toy example: this cell does not need the cells of
# the sections above (apart from the imports) to have been run.
mirrored_strategy = tf.distribute.MirroredStrategy()
# number of replicas the strategy keeps in sync
n_replicas = mirrored_strategy.num_replicas_in_sync

# the global batch is split evenly across the replicas (integer division)
global_batch_size = 8
local_batch_size = global_batch_size//n_replicas

# the trailing semicolon suppresses the cell's Out[] display of ic's return value
ic(n_replicas)
ic(global_batch_size)
ic(local_batch_size);

INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')


ic| n_replicas: 4
ic| global_batch_size: 8
ic| local_batch_size: 2


### `take` counts per-replica batches, so multiply the number of steps to take by the number of replicas

In [16]:
# number of distributed steps the toy dataset should provide
n_steps = 5


def dataset_fn(input_context):
    """Toy input pipeline limited to n_steps distributed steps.

    Shards range(64) by input pipeline, batches it with the local batch size
    and keeps only the first n_replicas * n_steps of those batches.
    """
    return (
        tf.data.Dataset.range(64)
        .shard(input_context.num_input_pipelines, input_context.input_pipeline_id)
        .batch(local_batch_size)
        .take(n_replicas * n_steps)
    )

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)

# Print the step number followed by the per-replica values of that step.
for i, x in enumerate(dist_dataset):
    print(i)
    print(x, "\n")

0
PerReplica:{
  0: tf.Tensor([0 1], shape=(2,), dtype=int64),
  1: tf.Tensor([2 3], shape=(2,), dtype=int64),
  2: tf.Tensor([4 5], shape=(2,), dtype=int64),
  3: tf.Tensor([6 7], shape=(2,), dtype=int64)
} 

1
PerReplica:{
  0: tf.Tensor([8 9], shape=(2,), dtype=int64),
  1: tf.Tensor([10 11], shape=(2,), dtype=int64),
  2: tf.Tensor([12 13], shape=(2,), dtype=int64),
  3: tf.Tensor([14 15], shape=(2,), dtype=int64)
} 

2
PerReplica:{
  0: tf.Tensor([16 17], shape=(2,), dtype=int64),
  1: tf.Tensor([18 19], shape=(2,), dtype=int64),
  2: tf.Tensor([20 21], shape=(2,), dtype=int64),
  3: tf.Tensor([22 23], shape=(2,), dtype=int64)
} 

3
PerReplica:{
  0: tf.Tensor([24 25], shape=(2,), dtype=int64),
  1: tf.Tensor([26 27], shape=(2,), dtype=int64),
  2: tf.Tensor([28 29], shape=(2,), dtype=int64),
  3: tf.Tensor([30 31], shape=(2,), dtype=int64)
} 

4
PerReplica:{
  0: tf.Tensor([32 33], shape=(2,), dtype=int64),
  1: tf.Tensor([34 35], shape=(2,), dtype=int64),
  2: tf.Tensor([36 37],

### use input_context.num_input_pipelines instead of input_context.num_replicas_in_sync

In [17]:
def dataset_fn(input_context):
    """Toy input pipeline without a step limit.

    Shards range(64) by input pipeline (num_input_pipelines / input_pipeline_id
    of the input context) and batches it with the local batch size.
    """
    return (
        tf.data.Dataset.range(64)
        .shard(input_context.num_input_pipelines, input_context.input_pipeline_id)
        .batch(local_batch_size)
    )

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)

# Print the step number followed by the per-replica values of that step.
for i, x in enumerate(dist_dataset):
    print(i)
    print(x, "\n")

0
PerReplica:{
  0: tf.Tensor([0 1], shape=(2,), dtype=int64),
  1: tf.Tensor([2 3], shape=(2,), dtype=int64),
  2: tf.Tensor([4 5], shape=(2,), dtype=int64),
  3: tf.Tensor([6 7], shape=(2,), dtype=int64)
} 

1
PerReplica:{
  0: tf.Tensor([8 9], shape=(2,), dtype=int64),
  1: tf.Tensor([10 11], shape=(2,), dtype=int64),
  2: tf.Tensor([12 13], shape=(2,), dtype=int64),
  3: tf.Tensor([14 15], shape=(2,), dtype=int64)
} 

2
PerReplica:{
  0: tf.Tensor([16 17], shape=(2,), dtype=int64),
  1: tf.Tensor([18 19], shape=(2,), dtype=int64),
  2: tf.Tensor([20 21], shape=(2,), dtype=int64),
  3: tf.Tensor([22 23], shape=(2,), dtype=int64)
} 

3
PerReplica:{
  0: tf.Tensor([24 25], shape=(2,), dtype=int64),
  1: tf.Tensor([26 27], shape=(2,), dtype=int64),
  2: tf.Tensor([28 29], shape=(2,), dtype=int64),
  3: tf.Tensor([30 31], shape=(2,), dtype=int64)
} 

4
PerReplica:{
  0: tf.Tensor([32 33], shape=(2,), dtype=int64),
  1: tf.Tensor([34 35], shape=(2,), dtype=int64),
  2: tf.Tensor([36 37],