# Distributed FWI with Devito using Tensorflow and Dask

## Inversion Computational Setup

### Accessing Google Cloud Storage

In [1]:
import cloudpickle as pickle
from google.cloud import storage

saved_true_model_file = "true_model"
saved_smooth_model_file = "smooth_model"
saved_shot_data_file_prefix = "shot"
bucket_name = "fwi-data"


def create_bucket(bucket_name=bucket_name):
    """Creates a new bucket."""
    storage_client = storage.Client()
    bucket = storage_client.lookup_bucket(bucket_name)
    if not bucket: 
        bucket = storage_client.create_bucket(bucket_name)
    print('Bucket {} created'.format(bucket.name))
    
def delete_bucket(bucket_name=bucket_name):
    """Deletes a bucket. The bucket must be empty."""
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    if bucket: 
        bucket.delete()
    print('Bucket {} deleted'.format(bucket.name))

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)

    print('File {} uploaded to {}.'.format(
        source_file_name,
        destination_blob_name))
    
def download_blob(bucket_name, source_blob_name, destination_file_name):
    """Downloads a blob from the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(source_blob_name)

    blob.download_to_filename(destination_file_name)

    print('Blob {} downloaded to {}.'.format(
        source_blob_name,
        destination_file_name))

def dump_model(model, file_name):
    pickle.dump(model, open(file_name, 'wb'))


def get_model(file_name):
    return pickle.load(open(file_name, "rb"))
        

def dump_shot_data(file_name, shot_id, src, true_data):
    pickle.dump({'src': src, 'rec': true_data}, open(file_name, 'wb'))


def get_shot_data(file_name, shot_id, dt):
    shot_data = pickle.load(open(file_name, 'rb'))
    shot_data['src'] = shot_data['src'].resample(dt)
    shot_data['rec'] = shot_data['rec'].resample(dt)
    return shot_data


def upload_model_to_gcloud(model, model_file):
    dump_model(model, model_file)
    upload_blob(bucket_name, model_file, model_file)

    
def download_model_from_gcloud(model_file):
    download_blob(bucket_name, model_file, model_file)
    return get_model(model_file)


def upload_shot_to_gcloud(shot_id, src, true_data):
    file_name = '{}_{}'.format(saved_shot_data_file_prefix, shot_id)
    dump_shot_data(file_name, shot_id, src, true_data)
    upload_blob(bucket_name, file_name, file_name)

    
def download_shot_from_gcloud(shot_id, dt):
    file_name = '{}_{}'.format(saved_shot_data_file_prefix, shot_id)
    download_blob(bucket_name, file_name, file_name)
    return get_shot_data(file_name, shot_id, dt)

create_bucket()

Bucket fwi-data created


### Setting up (synthetic) data

In [2]:
from examples.seismic import TimeAxis, RickerSource, Receiver
from examples.seismic.acoustic import AcousticWaveSolver

def generate_shot_data(t0, tn, f0, shots, receivers, client):
    params = [t0, tn, f0, shots, receivers]
    work = [params + [shot_id] for shot_id in range(shots)]

    fgi = [client.submit(generate_shot_data_i, *job) for job in work]

    wait(fgi)


def generate_shot_data_i(t0, tn, f0, shots, receivers, shot_id):
    true_model = download_model_from_gcloud(saved_true_model_file)

    # Time step from model grid spacing
    dt = true_model.critical_dt

    # Acquisitional Geometry
    time_range = TimeAxis(start=t0, stop=tn, step=dt)
    src = RickerSource(name="src", grid=true_model.grid, f0=f0,
                       time_range=time_range)
    
    src.coordinates.data[0, :] = [30, shot_id*1000./(shots-1)]
    
    rec = Receiver(name="rec", grid=true_model.grid, time_range=time_range,
                   npoint=receivers)
    rec.coordinates.data[:, 1] = np.linspace(0, true_model.domain_size[0],
                                             num=receivers)
    rec.coordinates.data[:, 0] = 980.  # 20m from the right end

    # set up solver
    solver = AcousticWaveSolver(true_model, src, rec, space_order=4)

    # generate synthetic receiver data from true model
    true_data, _, _ = solver.forward(src=src, m=true_model.m)
    
    upload_shot_to_gcloud(shot_id, src, true_data)
    
    

### Computational Consideration

In [3]:
shape = (101, 101)
spacing = (10., 10.)
origin = (0., 0.)

shots = 9
receivers = 101
epochs = 5

t0 = 0.
tn = 1000.
f0 = 0.01

### True and Smooth Models

In [4]:
from examples.seismic import demo_model, plot_velocity, plot_perturbation
from devito import configuration
from distributed import Client, LocalCluster, wait

configuration['log_level'] = 'WARNING'

true_model = demo_model('circle-isotropic', vp=3.0, vp_background=2.5,
                        origin=origin, shape=shape, spacing=spacing, nbpml=40)

smooth_model = demo_model('circle-isotropic', vp=2.5, vp_background=2.5,
                          origin=origin, shape=shape, spacing=spacing, nbpml=40)

## Dask Specifics

In [5]:
class fg_pair:
    def __init__(self, f, g):
        self.f = f
        self.g = g

    def __add__(self, other):
        f = self.f + other.f
        g = self.g + other.g
        return fg_pair(f, g)

    def __radd__(self, other):
        if other == 0:
            return self
        else:
            return self.__add__(other)

## Operators for gradient based inversion

In [6]:
from devito import Function

def fwi_gradient_i(shot_id):
    from devito import clear_cache
    
    clear_cache()

    smooth_model = download_model_from_gcloud(saved_smooth_model_file)

    params = download_shot_from_gcloud(shot_id, smooth_model.critical_dt)
    src = params['src']
    rec = params['rec']
    
    solver = AcousticWaveSolver(smooth_model, src, rec, space_order=4)

    smooth_data, u0, _ = solver.forward(src=src, m=smooth_model.m, save=True)

    residual = Receiver(name='rec', grid=smooth_model.grid,
                        time_range=rec.time_range,
                        coordinates=rec.coordinates.data)

    residual.data[:] = smooth_data.data[:] - rec.data[:]

    f = .5*np.linalg.norm(residual.data.flatten())**2

    grad = Function(name="grad", grid=smooth_model.grid)
    solver.gradient(rec=residual, u=u0, m=smooth_model.m, grad=grad)

    g = np.array(grad.data[:])

    return fg_pair(f, g)

In [7]:
def fwi_gradient(client, smooth_model, shots):
    upload_model_to_gcloud(smooth_model, saved_smooth_model_file)

    # Distribute job to workers - equivalent to the use of client.map(..)
    fgi = [client.submit(fwi_gradient_i, shot_id) for shot_id in range(shots)]

    # Distribute worklist to workers
    total = client.submit(sum, fgi)
    fg = total.result()

    return fg.f, fg.g

## FWI with Tensorflow

In [10]:
import socket
import tensorflow as tf
import numpy as np
import functools as ft
from dask.distributed import Client, LocalCluster, wait
from dask_kubernetes import KubeCluster, make_pod_spec

class TF_Devito_Fwi:

    def __init__(self, nshots, nreceivers, true_model, smooth_model,
                 t0, tn, f0, optimizer_func, hparams, workers=2):
        # True and Smooth velocity models
        upload_model_to_gcloud(true_model, saved_true_model_file)
        self.smooth_model = smooth_model
        true_data = true_model.m.data
        
        # Cluster setup
        scheduler_ip = socket.gethostbyname(socket.gethostname())
        scheduler_port = 30000
        scheduler_address = str(scheduler_ip) + ":" + str(scheduler_port)
        cluster = KubeCluster.from_yaml('config/worker.yaml',
                                        port=scheduler_port,
                                        env={'DASK_SCHEDULER_ADDRESS': scheduler_address})
        cluster.scale(nshots)
        client = Client(cluster)
        self.client = client
        """
        cluster = LocalCluster(n_workers=workers, death_timeout=600)
        client = Client(cluster)
        """
        fwi_gradient_call = ft.partial(fwi_gradient, client, smooth_model, nshots)

        generate_shot_data(t0, tn, f0, nshots, nreceivers, client)

        # Create the tf graph
        self.smooth_data = tf.Variable(self.smooth_model.m.data)
        f, g = tf.py_func(fwi_gradient_call, [], [tf.float64, tf.float32])
        if 'learning_rate' in list(hparams.keys()):
            alpha = hparams['learning_rate']
        else: 
            alpha = 0.005 / tf.reduce_max(g)
        optimizer = optimizer_func(alpha, **hparams)
        
        self.train_op = optimizer.apply_gradients([(g, self.smooth_data)])
        
        self.relative_error = tf.norm((self.smooth_data-true_data)/true_data)
        
        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())    
        
    def train(self, epochs):
        for i in range(0, epochs):
            _, re = self.sess.run([self.train_op, self.relative_error])
            yield re
        self.smooth_model.m.data[:] = self.smooth_data.eval(session=self.sess)

### Initialise FWI process

In [11]:
optimizer = tf.train.MomentumOptimizer

hparam = {'momentum': 0.4, 'use_nesterov': True}

fwi = TF_Devito_Fwi(shots, receivers, true_model, smooth_model,
                    t0, tn, f0, optimizer, hparam, 5)

relative_losses = np.zeros((epochs, 1))

for i, rl in enumerate(fwi.train(epochs)):
    print("Epoch: {}, Relative Losses: {}".format(i, rl))
    relative_losses[i] = rl

File true_model uploaded to true_model.


ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'bab67c18-3230-44cf-8858-723aa4269c66', 'Content-Type': 'application/json', 'Date': 'Mon, 08 Oct 2018 17:02:37 GMT', 'Content-Length': '1335'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod in version \"v1\" cannot be handled as a Pod: v1.Pod: Spec: v1.PodSpec: Containers: []v1.Container: v1.Container: Resources: v1.ResourceRequirements: Limits: unmarshalerDecoder: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$', parsing 665 ...IMIT_CPU)\"... at {\"kind\": \"Pod\", \"metadata\": {\"generateName\": \"dask-root-05be6f26-4\", \"labels\": {\"role\": \"worker\", \"dask.pydata.org/cluster-name\": \"dask-root-05be6f26-4\", \"user\": \"root\", \"app\": \"dask\", \"component\": \"dask-worker\"}, \"namespace\": \"default\"}, \"spec\": {\"containers\": [{\"args\": [\"dask-worker\", \"$(DASK_SCHEDULER_ADDRESS)\", \"--nthreads\", \"1\", \"--no-bokeh\", \"--memory-limit\", \"1GB\", \"--death-timeout\", \"60\"], \"env\": [{\"name\": \"DASK_SCHEDULER_ADDRESS\", \"value\": \"tcp://10.8.1.5:30000\"}, {\"name\": \"DASK_SCHEDULER_ADDRESS\", \"value\": \"10.8.1.5:30000\"}], \"image\": \"$(WORKER_IMAGE)\", \"imagePullPolicy\": \"Always\", \"name\": \"fwi-shot\", \"resources\": {\"limits\": {\"cpu\": \"$(LIMIT_CPU)\", \"memory\": \"$(LIMIT_MEMORY)\"}, \"requests\": {\"cpu\": \"$(REQUEST_CPU)\", \"memory\": \"$(REQUEST_MEMORY)\"}}}], \"restartPolicy\": \"Never\"}}","reason":"BadRequest","code":400}



In [None]:
nbpml = true_model.nbpml
smooth_model.vp = np.sqrt(1. / smooth_model.m.data[nbpml:-nbpml, nbpml:-nbpml])

plot_velocity(smooth_model)

In [None]:
#NBVAL_SKIP
import matplotlib.pyplot as plt

# Plot objective function decrease
plt.figure()
plt.loglog(relative_losses)
plt.xlabel('Iteration number')
plt.ylabel('True relative error')
plt.title('Convergence')
plt.show()