In [1]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import pickle
import seaborn as sns

# BNN Modules
import pyro
import pyro.distributions as dist
from pyro.nn import PyroModule, PyroSample
import torch.nn as nn
from sklearn.model_selection import train_test_split

from pyro.infer import MCMC, NUTS

from pyro.infer.autoguide import AutoDiagonalNormal
from pyro.infer import SVI, Trace_ELBO, Predictive
from tqdm.auto import trange, tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
ground_motion = pd.read_pickle("data/processed_ground_motion_PGA.pkl")
interest_index = (ground_motion['Earthquake Magnitude'] > 6.0) & (ground_motion['Earthquake Magnitude'] < 7.0)
ground_motion2 = ground_motion[interest_index].sample(n=501)

In [3]:
y = np.log(ground_motion['PGA (g)'].values)
X = ground_motion[['EpiD (km)']].values.T[0]
# We added shuffling to introduce stochasticity
#X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.6, shuffle=True)

In [4]:
X_train = torch.tensor(X, dtype=torch.float32)
y_train = torch.tensor(y, dtype=torch.float32).reshape(-1, 1)
#X_test = torch.tensor(X_test, dtype=torch.float32)
#y_test = torch.tensor(y_test, dtype=torch.float32).reshape(-1, 1)

In [5]:
class BNN4GM_V1(PyroModule):
    def __init__(self, in_dim=1, out_dim=1, hid_dim=5, prior_scale=10.):
        super().__init__()

        self.activation = nn.ReLU()  # or nn.ReLU()
        self.layer1 = PyroModule[nn.Linear](in_dim, hid_dim)  # Input to hidden layer
        self.layer2 = PyroModule[nn.Linear](hid_dim, out_dim)  # Hidden to output layer

        # Set layer parameters as random variables
        self.layer1.weight = PyroSample(dist.Normal(0., prior_scale).expand([hid_dim, in_dim]).to_event(2))
        self.layer1.bias = PyroSample(dist.Normal(0., prior_scale).expand([hid_dim]).to_event(1))
        self.layer2.weight = PyroSample(dist.Normal(0., prior_scale).expand([out_dim, hid_dim]).to_event(2))
        self.layer2.bias = PyroSample(dist.Normal(0., prior_scale).expand([out_dim]).to_event(1))

    def forward(self, x, y=None):
        x = x.reshape(-1, 1)
        x = self.activation(self.layer1(x))
        mu = self.layer2(x).squeeze()
        sigma = pyro.sample("sigma", dist.Uniform(0., 1.))  # Infer the response noise

        # Sampling model
        with pyro.plate("data", x.shape[0]):
            obs = pyro.sample("obs", dist.Normal(mu, sigma), obs=y)
        return mu

In [6]:
class Model(PyroModule):
    def __init__(self, h1=20, h2=20):
        super().__init__()
        self.fc1 = PyroModule[nn.Linear](1, h1)
        self.fc1.weight = PyroSample(dist.Normal(0., 1.).expand([h1, 1]).to_event(2))
        self.fc1.bias = PyroSample(dist.Normal(0., 1.).expand([h1]).to_event(1))
        self.fc2 = PyroModule[nn.Linear](h1, h2)
        self.fc2.weight = PyroSample(dist.Normal(0., 1.).expand([h2, h1]).to_event(2))
        self.fc2.bias = PyroSample(dist.Normal(0., 1.).expand([h2]).to_event(1))
        self.fc3 = PyroModule[nn.Linear](h2, 1)
        self.fc3.weight = PyroSample(dist.Normal(0., 1.).expand([1, h2]).to_event(2))
        self.fc3.bias = PyroSample(dist.Normal(0., 1.).expand([1]).to_event(1))
        self.relu = nn.ReLU()

    def forward(self, x, y=None):
        x = x.reshape(-1, 1)
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        mu = self.fc3(x).squeeze()
        sigma = pyro.sample("sigma", dist.Uniform(0., 1.))
        with pyro.plate("data", x.shape[0]):
            obs = pyro.sample("obs", dist.Normal(mu, sigma), obs=y)
        return mu

In [7]:
model = BNN4GM_V1()

# Set Pyro random seed
pyro.set_rng_seed(42)

# Define Hamiltonian Monte Carlo (HMC) kernel
# NUTS = "No-U-Turn Sampler" (https://arxiv.org/abs/1111.4246), gives HMC an adaptive step size
nuts_kernel = NUTS(model, jit_compile=False)  # jit_compile=True is faster but requires PyTorch 1.6+

# Define MCMC sampler, get 50 posterior samples
mcmc = MCMC(nuts_kernel, num_samples=20)

# Run MCMC
mcmc.run(X_train, y_train)
"""
guide = AutoDiagonalNormal(model)
adam = pyro.optim.Adam({"lr": 1e-3})
svi = SVI(model, guide, adam, loss=Trace_ELBO())

pyro.clear_param_store()
bar = trange(20000)
for epoch in bar:
    loss = svi.step(X_train, y_train)
    bar.set_postfix(loss=f'{loss / x.shape[0]:.3f}')"""

Warmup:   0%|                                                                                                           | 0/40 [00:00, ?it/s]

KeyboardInterrupt: 

In [None]:
from pyro.infer import Predictive
predictive = Predictive(model=model, posterior_samples=mcmc.get_samples())
test_x = 10**torch.linspace(0, 2.5, 101)
preds = predictive(test_x)

In [None]:
f, ax = plt.subplots(figsize=(7, 7))
ax.set(xscale="log", yscale="log")
sns.scatterplot(data=ground_motion2, x="EpiD (km)", y="PGA (g)",hue="Vs30 (m/s) selected for analysis", alpha=0.7)
ax.set(xscale="log", yscale="log")
plt.plot(10**np.linspace(0,2.5,101),torch.exp(preds['obs'][0]).detach().cpu().numpy(), label='Vs = 100 m/s')
plt.plot(10**np.linspace(0,2.5,101),torch.exp(preds['obs'][1]).detach().cpu().numpy(), label='Vs = 400 m/s')
plt.plot(10**np.linspace(0,2.5,101),torch.exp(preds['obs'][2]).detach().cpu().numpy(), label='Vs = 700 m/s')
plt.plot(10**np.linspace(0,2.5,101),torch.exp(preds['obs'][3]).detach().cpu().numpy(), label='Vs = 1000 m/s')
plt.plot(10**np.linspace(0,2.5,101),torch.exp(preds['obs'][4]).detach().cpu().numpy(), label='Vs = 1000 m/s')
plt.legend()

In [None]:
y = np.log(ground_motion['PGA (g)'].values)
X = ground_motion[['EpiD (km)']].values.T
# We added shuffling to introduce stochasticity
#X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.6, shuffle=True)

In [None]:
np.array(list(ground_motion[['EpiD (km)']].values.T[0])).shape

In [None]:
X.shape