In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import cm
%matplotlib inline


import torch
from torch import nn
from torch import distributions
from torch.nn.parameter import Parameter

import models
from models import Renorm_Dynamic
from EI_calculation import approx_ei
use_cuda = torch.cuda.is_available()
device = torch.device('cuda:0') if use_cuda else torch.device('cpu')

from thop import profile

## Simple Mass-Spring Dynamics

In [None]:
def one_step(x, v, dt=0.1):
    """Advance the unit mass-spring system by one explicit Euler step.

    The continuous dynamics are dx/dt = v and dv/dt = -x. Both updates are
    computed from the *old* values of ``x`` and ``v``.

    Args:
        x: position (Python number or tensor).
        v: velocity, same kind/shape as ``x``.
        dt: integration step; defaults to 0.1, the value that used to be
            hard-coded, so existing two-argument calls behave as before.

    Returns:
        Tuple ``(x_next, v_next)``.
    """
    x_ = x + dt * v
    v_ = v - dt * x
    return x_, v_
def multi_steps(s, steps):
    """Roll the true mass-spring dynamics forward and record the trajectory.

    Args:
        s: tensor with rows of (position, velocity); column 0 and column 1
            are fed to ``one_step`` as x and v.
        steps: number of integration steps to take.

    Returns:
        Tuple ``(s_hist, sn_hist)``:
        - ``s_hist``: the initial state followed by every subsequent clean
          state, concatenated along dim 0.
        - ``sn_hist``: the matching 4-column observations from ``perturb``.
          The initial row uses noise scale 0; later rows use the
          notebook-level ``sigma``.
    """
    clean = [s]
    noisy = [perturb(s, 0)]
    for _ in range(steps):
        x_next, v_next = one_step(s[:, 0], s[:, 1])
        # Stack on the tensors' own device instead of rebuilding a CPU tensor
        # from a tuple of one-element tensors; this also supports batches > 1.
        s_next = torch.stack((x_next, v_next), dim=1)
        clean.append(s_next)
        # NOTE: `sigma` is read from the notebook's global scope.
        noisy.append(perturb(s_next, sigma))
        s = s_next
    # One concatenation at the end rather than re-allocating every step.
    s_hist = torch.cat(clean, 0)
    sn_hist = torch.cat(noisy, 0)
    return s_hist, sn_hist
def perturb(s, sigma):
    """Turn a 2-column state into a 4-column noisy observation.

    A single Gaussian noise draw ``r`` (scaled by ``sigma``) is subtracted
    from and added to ``s``; the two copies are concatenated column-wise, so
    the mean of the two halves equals ``s``.

    Args:
        s: tensor of shape (batch, 2).
        sigma: scale of the Gaussian noise; 0 yields two exact copies of ``s``.

    Returns:
        Tensor of shape (batch, 4) laid out as ``[s - r, s + r]``.
    """
    # Allocate the noise on the input's own device so the function no longer
    # relies on the notebook-global `device` matching the device of `s`.
    rand = torch.randn([s.size(0), 2], device=s.device) * sigma
    return torch.cat((s - rand, s + rand), 1)

def generate_data(batch_size, sigma, L):
    """Sample random spring states and their one-step successors.

    Position and velocity are drawn uniformly from [-L, L). Each clean state
    and its ``one_step`` successor are then passed through ``perturb``.

    Returns:
        ``(s_p, splus_p, state, state_next)``: the noisy 4-column current and
        next observations, followed by the clean 2-column current and next
        states.
    """
    def _uniform_column():
        # Uniform on [-L, L): rescale a [0, 1) draw.
        return 2 * (torch.rand([batch_size, 1], device=device) - 1/2) * L

    # Draw order (x, then v, then the two perturbations) is kept as-is so that
    # seeded runs reproduce the same samples.
    x = _uniform_column()
    v = _uniform_column()
    state = torch.cat((x, v), 1)
    state_next = torch.cat(one_step(x, v), 1)
    s_p = perturb(state, sigma)
    splus_p = perturb(state_next, sigma)
    return s_p, splus_p, state, state_next
def test_model(batch_size,net,sigma,L,scale):
    """Evaluate the latent dynamics of ``net`` on a fresh batch via ``approx_ei``.

    Args:
        batch_size: number of samples drawn with ``generate_data``.
        net: model exposing ``__call__`` (returning a 3-tuple whose last item
            is the predicted next latent), ``encoding`` and ``dynamics``.
        sigma: observation-noise scale passed to ``generate_data``.
        L: sampling half-width passed to ``generate_data``.
            NOTE(review): the ``approx_ei`` call below uses a fixed ``L=100``
            rather than this argument -- confirm that is intended.
        scale: latent dimension, passed to ``approx_ei`` as both input and
            output size.

    Returns:
        ``(ei, sigmas)``: whatever ``approx_ei`` returns (callers in this
        notebook print ``ei[0]`` as dEI and ``ei[1]`` as eff), and the
        per-dimension RMS gap between the encoded true next state and the
        predicted next latent.
    """
    state, state_next, _, _ = generate_data(batch_size,sigma,L)
    # Only the predicted next latent is needed here.
    _, _, latent_p = net(state)

    ssp = net.encoding(state_next)
    # Per-dimension RMS prediction error in latent space.
    sigmas = torch.sqrt(torch.mean((ssp-latent_p)**2, 0))
    sigmas_matrix = torch.diag(sigmas)
    # The map handed to approx_ei is the residual update z -> dynamics(z) + z,
    # applied to a single latent vector (hence the unsqueeze).
    ei = approx_ei(scale, scale, sigmas_matrix.data, lambda x:(net.dynamics(x.unsqueeze(0))+x.unsqueeze(0)), 
                   num_samples = 1000, L=100, easy=True, device=device)
    return ei,sigmas
def calc_ei_loss(latent_p, state_next, net, scale):
    """Compute ``approx_ei`` for ``net``'s latent dynamics from given tensors.

    Args:
        latent_p: predicted next latent state.
        state_next: observed next state, encoded here with ``net.encoding``.
        net: model exposing ``encoding`` and ``dynamics``.
        scale: latent dimension, used as both sizes in ``approx_ei``.

    Returns:
        ``(ei, sigmas)``: the ``approx_ei`` result and the per-dimension RMS
        gap between encoded and predicted next latents. ``sigmas`` itself is
        not detached; only the matrix handed to ``approx_ei`` is.
    """
    encoded_next = net.encoding(state_next)
    gap = encoded_next - latent_p
    sigmas = torch.sqrt(torch.mean(gap**2, 0))
    # The covariance proxy is detached, so approx_ei sees it as a constant.
    sigmas_matrix = torch.diag(sigmas).detach()

    def _residual_step(x):
        # Single-vector residual update: z -> dynamics(z) + z.
        row = x.unsqueeze(0)
        return net.dynamics(row) + row

    ei = approx_ei(scale, scale, sigmas_matrix.data, _residual_step,
                   num_samples=1000, L=100, easy=True, device=device)
    return ei, sigmas


In [None]:
def RGB_to_Hex(rgb):
    """Convert an RGB triple to an upper-case ``#RRGGBB`` hex string.

    Args:
        rgb: iterable of channel values (e.g. a tuple or list). Each value is
            truncated with ``int`` and clamped to 0..255, so out-of-range
            input can no longer produce a malformed or misleading colour.

    Returns:
        color: str such as ``'#FF0010'``.
    """
    return '#' + ''.join(
        '{:02X}'.format(min(max(int(channel), 0), 255)) for channel in rgb
    )
    
def generate_colors(N=12,colormap='hsv'):
    """Pick ``N`` evenly spaced colours from a matplotlib colormap.

    The colormap is indexed with integers ``0, step, 2*step, ...`` (capped at
    255), where ``step = max(int(255/N), 1)``.

    Returns:
        ``(rgb_list, hex_list)``: a list of ``(r, g, b)`` integer tuples in
        0..255 and the matching list of hex strings from ``RGB_to_Hex``.
    """
    stride = max(int(255/N), 1)
    palette = plt.get_cmap(colormap)
    # cmap(int) -> (r, g, b, a) with channels in 0~1; alpha is dropped.
    rgb_list = [
        tuple(int(channel*255) for channel in palette(min(stride*k, 255))[:3])
        for k in range(N)
    ]
    hex_list = [RGB_to_Hex(list(rgb)) for rgb in rgb_list]
    return rgb_list, hex_list

In [None]:
# Four-colour palette from the 'ocean' colormap; hex_list is used by the plots below.
rgb_list, hex_list = generate_colors(N=4, colormap='ocean')
for palette in (rgb_list, hex_list):
    print(palette)

### Experiments

#### Micro-state

In [None]:
# ---- Configuration -------------------------------------------------------
torch.manual_seed(10)
L = 10                  # states are sampled uniformly from [-L, L)
sigma = 1               # observation-noise scale used by perturb
hidden_units = 64
batch_size = 100
sz = 4                  # size of the noisy observation
scale = 4               # latent size; equal to sz in this cell
MAE = torch.nn.L1Loss()

# ---- Model and optimiser -------------------------------------------------
net = Renorm_Dynamic(sym_size=sz, latent_size=scale, effect_size=sz,
                     hidden_units=hidden_units, normalized_state=False, device=device)
if use_cuda:
    net = net.cuda()
optimizer = torch.optim.Adam([p for p in net.parameters() if p.requires_grad], lr=1e-4)

# ---- Training ------------------------------------------------------------
maes = []
ei_micro = []           # test_model's ei[0], recorded every 500 iterations
for t in range(10001):
    s, sp, rs, rsp = generate_data(batch_size, sigma, L)
    predict, latent, latent_p = net(s)
    # The training loss is the one-step MAE between prediction and noisy target.
    loss = mae = MAE(sp, predict)
    maes.append(mae.item())

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if t % 500 != 0:
        continue
    ei, sigmas = test_model(batch_size, net, sigma, L, scale)
    ei_micro.append(ei[0])
    print('iter %s:' % t, 'loss = %.3f' % loss, ', MAE=%.3f' % mae.item(),
          ',dEI= %.3f' % ei[0], ', eff= %.3f' % ei[1], ', std = %.3f' % sigmas.mean().item())

#### Macro-state

In [None]:
# Train the scale=2 model and compare its dEI, checkpoint by checkpoint, with
# the values stored in `ei_micro` by the scale=4 cell (which must be run first).
L = 10
hidden_units = 64
sigma=1
torch.manual_seed(10)
scale =2
sz = 4
batch_size =100
MAE = torch.nn.L1Loss()

net = Renorm_Dynamic(sym_size = sz, latent_size = scale, effect_size = sz, 
                     hidden_units = hidden_units, normalized_state=False, device = device)
net = net.cuda() if use_cuda else net
optimizer = torch.optim.Adam([p for p in net.parameters() if p.requires_grad==True], lr=1e-4)
maes = []
ei_macro=[]
for t in range(10001):    
    s,sp,rs,rsp = generate_data(batch_size, sigma, L)
    predict, latent, latent_p = net(s)
    mae = MAE(sp, predict)
    maes.append(mae.item())
    loss = mae
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    if t % 500 == 0:
        ei,sigmas=test_model(batch_size, net,sigma,L,scale)
        ei_macro.append(ei[0])
        # Checkpoints are taken every 500 iterations, so the matching micro
        # entry is t // 500. (t % 500 is always 0 inside this branch and would
        # compare every checkpoint against the untrained micro model.)
        ce=ei[0]-ei_micro[t // 500]
        print('iter %s:' % t, 'loss = %.3f' % loss, ', MAE=%.3f' % mae.item(), 
              ',dEI_macro-dEI_micro= %.3f' % ce,', eff= %.3f' % ei[1], ', std = %.3f' % sigmas.mean().item())

#### The Real and Predicted Macro-state for NIS

In [None]:
# Evaluate the trained NIS model on a fresh batch ten times the training size.
batch_size1 = batch_size*10
s,sp,sr,srp = generate_data(batch_size1, sigma, L)
predict, latent, latent_p = net(s) 

# Figure 1: predicted next latent (x-axis) against the encoding of the
# observed next state (y-axis), with a y=x reference line.
xy = latent_p
xyp = net.encoding(sp)
# Compute the common value range once; .item() gives plain floats so linspace
# does not depend on tensor-valued endpoints.
value_range = torch.cat((xy, xyp), 1).detach()
xplot = torch.linspace(value_range.min().item(), value_range.max().item(), 100)
if use_cuda:
    xy = xy.cpu()
    xyp = xyp.cpu()
plt.plot(xy[:,1].data, xyp[:,1].data,'o',markersize='2',label='Velocity',color='crimson')
plt.plot(xy[:,0].data, xyp[:,0].data,'s',markersize='2', label='Position',color=hex_list[3])
plt.plot(xplot.data, xplot.data, '-',label='y=x',color=hex_list[2])
plt.title('The Real and Predicted Macro-state for NIS',fontsize=14) 
plt.xlabel('Predicted Latent State',fontsize=13)
plt.ylabel('Decoded Real Latent State',fontsize=13)
plt.legend()
plt.savefig('spring_1.svg', dpi=600, format='svg')
plt.show()
plt.close()

# Figure 2: learned one-step latent increment against the true spring
# increments 0.1*v and -0.1*z (0.1 is the step used by one_step).
xx = latent
yy = latent_p-latent
if use_cuda:
    xx = xx.cpu()
    yy = yy.cpu()
plt.figure()
plt.plot(xx[:,1].data, yy[:,0].data,'o',markersize='2', label = 'predicted $(v, dz / dt)$',color='crimson')
plt.plot(xx[:,1].data, 0.1*xx[:,1].data, label = 'real $dz / dt = v$',color=hex_list[0])
plt.plot(xx[:,0].data, yy[:,1].data,'s',markersize='2',label ='predicted $(z, dv / dt)$',color=hex_list[3])
plt.plot(xx[:,0].data, -0.1*xx[:,0].data, label = 'real $dv / dt = - z$',color=hex_list[2])
plt.title('The Real and Predicted Dynamics for NIS',fontsize=14) 
plt.xlabel('Learned Latent State',fontsize=13)
# Raw string: '\D' is not a valid escape sequence in a normal string literal.
plt.ylabel(r'$\Delta$ Learned Latent State/$\Delta$ t',fontsize=13)
plt.legend(fontsize=8)
plt.savefig('spring_3.svg', dpi=600, format='svg')
plt.show()
plt.close()

# Mean absolute deviation of the learned increments from the true ones.
err1 = torch.abs(yy[:,0] - 0.1 * xx[:,1])
err2 = torch.abs(yy[:,1] + 0.1 * xx[:,0])
err = torch.mean(torch.cat((err1, err2), 0))
err

#### Position and Velocity

In [None]:
torch.manual_seed(100)
steps = 400

# Random clean start state z = (position, velocity) and its noisy observation.
z = torch.randn([1, 2], device=device)*L/2
s = perturb(z, sigma)

# Model rollout first, ground-truth rollout second (same order of random
# draws as before, so seeded results are unchanged).
s_hist, z_hist = net.multi_step_prediction(s, steps)
rs_hist, rsn_hist = multi_steps(z, steps)
if use_cuda:
    s_hist, z_hist = s_hist.cpu(), z_hist.cpu()
    rs_hist, rsn_hist = rs_hist.cpu(), rsn_hist.cpu()

# Phase portrait: predicted latent trajectory against the real trajectory.
plt.plot(z_hist[:, 0].data, z_hist[:, 1].data, '.', label='Predicted', color=hex_list[0])
plt.plot(rs_hist[:, 0].data, rs_hist[:, 1].data, label='Real', color=hex_list[1])
plt.xlabel('Position (z)', fontsize=14)
plt.ylabel('Velocity (v)', fontsize=14)
plt.legend()
plt.show()
plt.close()

# Per-step mean absolute error in observation space, and its running average.
plt.figure()
means = torch.mean(torch.abs(rsn_hist - s_hist), 1)
cums = torch.cumsum(means, 0)
plt.semilogy(means.data, color=hex_list[0])
plt.semilogy(cums.data/np.linspace(1, steps+1, steps+1), color=hex_list[1])
plt.xlabel('Time')
plt.ylabel('Errors')
plt.show()
plt.close()

### Compare with NN

In [None]:
# Baseline: a plain MLP that predicts the one-step increment of the 4-D noisy
# observation directly, without the encoder/decoder structure of NIS.
MAE = torch.nn.L1Loss()
hidden_size = 64
block = lambda: nn.Sequential(nn.Linear(4, hidden_size), nn.LeakyReLU(), nn.Linear(hidden_size, hidden_size), nn.LeakyReLU(), nn.Linear(hidden_size, 4))
dynamics_direct = nn.ModuleList([block() for _ in range(1)])
dynamics_direct = dynamics_direct.cuda() if use_cuda else dynamics_direct
batch_size =100
optimizer = torch.optim.Adam(list(dynamics_direct.parameters()), lr=1e-4)
for t in range(10001):    
    # NOTE(review): this samples with sigma=1 and L=100, whereas the NIS cells
    # use the notebook-level sigma and L (=10) -- confirm the wider range is
    # intended for the baseline.
    s0,sp,_,_ = generate_data(batch_size, 1, 100)
    s=s0
    for i in range(len(dynamics_direct)):
        s = dynamics_direct[i](s)
    # Residual prediction: the network output is added to the input state.
    sh = s + s0
    loss = MAE(sh, sp)
    
    optimizer.zero_grad()
    # A fresh graph is built every iteration, so there is nothing to retain;
    # retain_graph=True only kept the old graph's buffers alive.
    loss.backward()
    optimizer.step()
    
    if t % 500 == 0:
        print('iter %s:' % t, 'loss = %.3f' % loss)

In [None]:
# Evaluate the direct-NN baseline on a fresh batch.
batch_size1 = batch_size*10
s,sp,sr,srp = generate_data(batch_size1, sigma, L)
s0 = s
for i in range(len(dynamics_direct)):
    s = dynamics_direct[i](s)
prediction = s + s0
# Predicted increment of the noisy observation vs. true increment of the clean state.
xy = prediction - s0
xyp = srp - sr

if use_cuda:
    xy = xy.cpu()
    xyp = xyp.cpu()
# Build the y=x reference line from this cell's own plotted data instead of
# reusing `xplot` left over from the NIS plotting cell.
plotted = torch.cat((xy[:, :2], xyp[:, :2]), 1).detach()
xplot = torch.linspace(plotted.min().item(), plotted.max().item(), 100)
plt.plot(xy[:,1].data, xyp[:,1].data,'o',markersize='2',label='Velocity',color='crimson')
plt.plot(xy[:,0].data, xyp[:,0].data,'s',markersize='2', label='Position',color=hex_list[3])
plt.plot(xplot.data, xplot.data, '-',label='y=x',color=hex_list[2])
plt.title('The Real and Predicted Macro-state for NN',fontsize=14) 
plt.xlabel('Predicted Latent State',fontsize=13)
plt.ylabel('Decoded Real Latent State',fontsize=13)
plt.legend()
plt.savefig('spring_2.svg', dpi=600, format='svg')
plt.show()
plt.close()

# Predicted increments against the true spring increments 0.1*v and -0.1*z.
xx = s0
yy = prediction - s0
if use_cuda:
    xx = xx.cpu()
    yy = yy.cpu()
plt.figure()
plt.plot(xx[:,1].data, yy[:,0].data,'o',markersize='2', label = 'predicted $(v, dz / dt)$',color='crimson')
plt.plot(xx[:,1].data, 0.1*xx[:,1].data, label = 'real $dz / dt = v$',color=hex_list[0])
plt.plot(xx[:,0].data, yy[:,1].data,'s',markersize='2',label ='predicted $(z, dv / dt)$',color=hex_list[3])
plt.plot(xx[:,0].data, -0.1*xx[:,0].data, label = 'real $dv / dt = - z$',color=hex_list[2])
plt.title('The Real and Predicted Dynamics for NN',fontsize=14) 
plt.xlabel('Learned Latent State',fontsize=13)
# Raw string: '\D' is not a valid escape sequence in a normal string literal.
plt.ylabel(r'$\Delta$ Learned Latent State/$\Delta$ t',fontsize=13)
plt.legend(fontsize=8)
plt.savefig('spring_4.svg', dpi=600, format='svg')
plt.show()
plt.close()

#### Multi-scale Searching

In [None]:
# Shared settings for the multi-scale search cells below.
L = 10                    # sampling half-width passed to generate_data
sigma = 1                 # observation-noise scale passed to generate_data / perturb
hidden_units = 64
batch_size = 100
epochs = 10001            # training iterations per model
experiments = 10          # independent repetitions per scale
MAE = torch.nn.L1Loss()

In [None]:
# EI of freshly initialised scale=4 models, one per experiment.
# NOTE(review): the optimiser is created but no training step is run in this
# cell, so these values describe untrained networks -- confirm that is intended.
scale = 4
ei_micro = []
for experiment in range(experiments):
    net = Renorm_Dynamic(sym_size=4, latent_size=scale, effect_size=4,
                         hidden_units=hidden_units, normalized_state=False, device=device)
    if use_cuda:
        net = net.cuda()
    optimizer = torch.optim.Adam([p for p in net.parameters() if p.requires_grad], lr=1e-4)

    # One large evaluation batch.
    batch_size1 = batch_size*10
    s, sp, sr, srp = generate_data(batch_size1, sigma, L)
    predict, latent, latent_p = net(s)
    mae = MAE(net.eff_predict(predict), sp)
    avg_mae = mae.mean()

    # Per-dimension RMS gap between encoded true next state and predicted latent.
    ssp = net.encoding(sp)
    sigmas = torch.sqrt(torch.mean((ssp - latent_p)**2, 0))
    sigmas_matrix = torch.diag(sigmas)
    ei = approx_ei(scale, scale, sigmas_matrix.data,
                   lambda x: (net.dynamics(x.unsqueeze(0)) + x.unsqueeze(0)),
                   num_samples=1000, L=100, easy=True, device=device)
    print(ei)
    ei_micro.append(ei[0])

In [None]:
# For every latent scale q in {4,3,2,1} and every repetition: train a q=4
# ("micro") model and a q=scale ("macro") model, evaluate both on the same
# held-out batch, and record the macro model's error, its EI, the difference
# ei[0] - ei0[0], and its multi-step rollout error.
err_scale = []
multi_err_scale = []
ei_scale=[]
ce_scale=[]
for scale in [4,3,2,1]:
    print('*********************',scale,'**************************')
    err_experiment=[]
    multi_err_experiment=[]
    ei_experiment=[]
    ce_experiment=[]
    for experiment in range(experiments):
        print('----------',experiment+1,'----------')
        #micro
        print('micro')
        net0 = Renorm_Dynamic(sym_size=4, latent_size = 4, effect_size = 4, hidden_units = hidden_units,
                        normalized_state = False, device=device)
        net0 = net0.cuda() if use_cuda else net0
        optimizer0 = torch.optim.Adam([p for p in net0.parameters() if p.requires_grad==True], lr=1e-4)
        for t in range(epochs):    
            s,sp,_,_ = generate_data(batch_size, sigma, L)
            predict0, latent0, latent_p0 = net0(s)
            loss0 = MAE(sp, predict0)

            optimizer0.zero_grad()
            # The graph is rebuilt every iteration; retain_graph is not needed.
            loss0.backward()
            optimizer0.step()

            if t % 500 == 0:
                ei0,sigmas0=test_model(batch_size, net0,sigma,L,4)
                print('iter %s:' % t, 'loss = %.3f' % loss0, ', dEI= %.3f' % ei0[0],
                     ', eff= %.3f' % ei0[1], ', std = %.3f' % sigmas0.mean().item())
        # Held-out batch shared by the micro and macro evaluations. It has its
        # own names so the macro training loop below cannot overwrite it.
        batch_size1 = batch_size*10
        s_test,sp_test,sr,srp = generate_data(batch_size1, sigma, L)
        predict0, latent0, latent_p0 = net0(s_test) 
        mae0 = MAE(net0.eff_predict(predict0), sp_test)
        avg_mae0 = mae0.mean()
        
        # EI of the micro model on the held-out batch.
        ssp0 = net0.encoding(sp_test)
        sigmas0 = torch.sqrt(torch.mean((ssp0 - latent_p0)**2, 0))
        sigmas_matrix0 = torch.diag(sigmas0)
        ei0 = approx_ei(4, 4, sigmas_matrix0.data, lambda x:(net0.dynamics(x.unsqueeze(0))+x.unsqueeze(0)), 
                        num_samples = 1000, L=100, easy=True, device=device)
        print('effective mutual information of micro:',ei0)

        #macro
        print('macro')
        net = Renorm_Dynamic(sym_size=4, latent_size = scale, effect_size = 4, hidden_units = hidden_units,
                            normalized_state = False, device=device)
        net = net.cuda() if use_cuda else net
        optimizer = torch.optim.Adam([p for p in net.parameters() if p.requires_grad==True], lr=1e-4)
        for t in range(epochs):    
            s,sp,_,_ = generate_data(batch_size, sigma, L)
            predict, latent, latent_p = net(s)
            loss = MAE(sp, predict)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if t % 500 == 0:
                ei,sigmas=test_model(batch_size, net,sigma,L,scale)
                print('iter %s:' % t, 'loss = %.3f' % loss, ', dEI= %.3f' % ei[0],
                     ', eff= %.3f' % ei[1], ', std = %.3f' % sigmas.mean().item())
        #test
        # Evaluate on the held-out batch (previously this silently used the
        # last 100-sample training batch left in s/sp by the loop above).
        predict, latent, latent_p = net(s_test) 
        mae = MAE(net.eff_predict(predict), sp_test)
        avg_mae = mae.mean()
        err_experiment.append(avg_mae.item())
        
        # EI of the macro model on the held-out batch.
        ssp = net.encoding(sp_test)
        sigmas = torch.sqrt(torch.mean((ssp - latent_p)**2, 0))
        sigmas_matrix = torch.diag(sigmas)
        ei = approx_ei(scale, scale, sigmas_matrix.data, lambda x:(net.dynamics(x.unsqueeze(0))+x.unsqueeze(0)), 
                       num_samples = 1000, L=100, easy=True, device=device)
        print('effective mutual information:',ei)
        ei_experiment.append(ei[0])

        ce=ei[0]-ei0[0]
        print('casual emergence:',ce)
        ce_experiment.append(ce)
        
        #multi-step test
        steps = 500
        z = torch.randn([1, 2], device=device)*L/2 
        s = perturb(z, sigma)
        s_hist, z_hist = net.multi_step_prediction(s, steps)
        rs_hist, rsn_hist = multi_steps(z, steps)
        # Per-step mean absolute error between the noisy ground truth and the rollout.
        means=torch.mean(torch.abs(rsn_hist-s_hist),1)
        means=means.cpu() if use_cuda else means
        cums=torch.cumsum(means, 0)
        multi_err_experiment.append(means.data)
    err_scale.append(err_experiment)
    multi_err_scale.append(multi_err_experiment)
    ei_scale.append(ei_experiment)
    ce_scale.append(ce_experiment)

In [None]:
# Mean and spread of (dEI_macro - dEI_micro) over the repetitions, per scale.
scales = torch.Tensor([4, 3, 2, 1])
means = [np.mean(err) for err in ce_scale]
stds = [np.std(err) for err in ce_scale]

plt.plot(scales, means, 'o')
plt.errorbar(scales, means, stds)
plt.xlabel('Scale ($q$)')
plt.ylabel('dEI_macro-dEI_micro')
plt.show()

#### Causal Emergence

In [None]:
# Two-colour palette for this figure.
rgb_list, hex_list = generate_colors(N=2, colormap='cool')
for palette in (rgb_list, hex_list):
    print(palette)

# Spring data: one list of ce values per scale, from the multi-scale search.
ces = ce_scale
scales = torch.Tensor([4, 3, 2, 1])
means = [np.mean(ce) for ce in ces]
stds = [np.std(ce) for ce in ces]

# Bars show the mean per scale; error bars show the standard deviation.
plt.plot(scales, means, 'o')
plt.errorbar(scales, means, stds, color=hex_list[0])
plt.bar(scales, means, width=0.3, facecolor=hex_list[1], edgecolor='white')
plt.title('Relationship between dCE and Scale(q)', fontsize=14)
plt.xlabel('Scale ($q$)', fontsize=14)
plt.ylabel('Causal Emergence (dCE)', fontsize=14)
plt.axhline(0, color='black', linestyle='--')
plt.savefig('Spring_casual_emergence.svg', dpi=600, format='svg')
plt.show()

### Mutual Information

In [None]:
import entropy_estimators as ee

#### Verification for Information Bottleneck

In [None]:
# ---- Configuration -------------------------------------------------------
torch.manual_seed(10)
L = 10
sigma = 1
hidden_units = 64
batch_size = 1000
sz = 4
scale = 2
MAE = torch.nn.L1Loss()

# ---- Model and optimiser -------------------------------------------------
net = Renorm_Dynamic(sym_size=sz, latent_size=scale, effect_size=sz,
                     hidden_units=hidden_units, normalized_state=False, device=device)
if use_cuda:
    net = net.cuda()
optimizer = torch.optim.Adam([p for p in net.parameters() if p.requires_grad], lr=1e-4)
maes = []
ei_micro = []

# Mutual-information histories, sampled every 50 iterations:
#   Islt    <- ee.mi(predict, latent)      Islt1  <- ee.mi(predict, latent_p)
#   Isshat1 <- ee.mi(s, latent)            Isshat <- ee.mi(s, latent_p)
Islt, Islt1 = [], []
Isshat1, Isshat = [], []
Ts = []

for t in range(20001):
    s, sp, rs, rsp = generate_data(batch_size, sigma, L)
    predict, latent, latent_p = net(s)

    loss = mae = MAE(sp, predict)
    maes.append(mae.item())
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if t % 50 != 0:
        continue
    I_yt_xth = ee.mi(predict.cpu().detach(), latent.cpu().detach())
    I_yt1_xth = ee.mi(predict.cpu().detach(), latent_p.cpu().detach())
    I_xt_yt = ee.mi(s.cpu(), latent.cpu().detach())
    I_xt_yt1 = ee.mi(s.cpu(), latent_p.cpu().detach())
    Isshat1.append(I_xt_yt)
    Islt.append(I_yt_xth)
    Islt1.append(I_yt1_xth)
    Isshat.append(I_xt_yt1)
    Ts.append(t)

    print(t, I_yt_xth, I_yt1_xth, I_xt_yt, I_xt_yt1)

# Quick-look plot of the four curves.
for series, name in ((Islt, 'I(hatxt+1,yt)'), (Islt1, 'I(hatxt+1,yt_1)'),
                     (Isshat1, 'I(xt,yt)'), (Isshat, 'I(xt,yt1)')):
    plt.plot(Ts, series, label=name)
plt.legend(loc='best')
plt.xlabel('Iter')
plt.ylabel('Value')
plt.show()

In [None]:
# Publication version of the information-bottleneck curves recorded above.
# The labels are raw strings because '\h' is not a valid escape sequence in a
# normal string literal; the rendered text is unchanged.
plt.plot(Ts,Islt,label=r'$I(\hat{x}_{t+1},y_t)$',color='crimson')
plt.plot(Ts,Islt1,label=r'$I(\hat{x}_{t+1},y(t+1))$',color='dodgerblue',linestyle='-.')
plt.plot(Ts,Isshat1,label='$I(x_t,y_t)$',color='forestgreen')
plt.plot(Ts,Isshat,label='$I(x_t,y(t+1))$',color='darkorange',linestyle='-.')
plt.legend(loc='best')
plt.title('Verification for Information Bottleneck on NIS',fontsize=14) 
plt.xlabel('Iter',fontsize=13)
plt.ylabel('Mutual Information',fontsize=13)
plt.ylim(5.5,8)
plt.savefig('Spring_Scale_and_I_yt+1.svg', dpi=600, format='svg')
plt.show()

#### Verification for Theorem 6 with Scale and Mutual Information 

In [None]:
# Theorem 6 sweep, latent size q = 4: record ee.mi(s, latent) in Irrt4 and
# ee.mi(s, predict) in Isshat4 every 500 iterations.
torch.manual_seed(10)
L = 10
sigma = 1
hidden_units = 64
batch_size = 1000
sz = 4
scale = 4
MAE = torch.nn.L1Loss()

net = Renorm_Dynamic(sym_size=sz, latent_size=scale, effect_size=sz,
                     hidden_units=hidden_units, normalized_state=False, device=device)
if use_cuda:
    net = net.cuda()
optimizer = torch.optim.Adam([p for p in net.parameters() if p.requires_grad], lr=1e-4)

# ei_micro, Isst, Illt and errs are reset here but not filled by this cell.
maes, ei_micro = [], []
Isst, Illt, errs = [], [], []
Isshat4, Irrt4 = [], []
Ts = []

for t in range(20001):
    s, sp, rs, rsp = generate_data(batch_size, sigma, L)
    predict, latent, latent_p = net(s)

    loss = mae = MAE(sp, predict)
    maes.append(mae.item())
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if t % 500 != 0:
        continue
    I_yt_xt = ee.mi(s.cpu(), latent.cpu().detach())
    I_xt_xt1hat = ee.mi(s.cpu(), predict.cpu().detach())
    Isshat4.append(I_xt_xt1hat)
    Irrt4.append(I_yt_xt)
    Ts.append(t)

    print(t, I_yt_xt, I_xt_xt1hat)

In [None]:
# Theorem 6 sweep, latent size q = 3: record ee.mi(s, latent) in Irrt3 and
# ee.mi(s, predict) in Isshat3 every 500 iterations.
torch.manual_seed(10)
L = 10
sigma = 1
hidden_units = 64
batch_size = 1000
sz = 4
scale = 3
MAE = torch.nn.L1Loss()

net = Renorm_Dynamic(sym_size=sz, latent_size=scale, effect_size=sz,
                     hidden_units=hidden_units, normalized_state=False, device=device)
if use_cuda:
    net = net.cuda()
optimizer = torch.optim.Adam([p for p in net.parameters() if p.requires_grad], lr=1e-4)

# ei_micro, Isst, Illt and errs are reset here but not filled by this cell.
maes, ei_micro = [], []
Isst, Illt, errs = [], [], []
Isshat3, Irrt3 = [], []
Ts = []

for t in range(20001):
    s, sp, rs, rsp = generate_data(batch_size, sigma, L)
    predict, latent, latent_p = net(s)

    loss = mae = MAE(sp, predict)
    maes.append(mae.item())
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if t % 500 != 0:
        continue
    I_yt_xt = ee.mi(s.cpu(), latent.cpu().detach())
    I_xt_xt1hat = ee.mi(s.cpu(), predict.cpu().detach())
    Isshat3.append(I_xt_xt1hat)
    Irrt3.append(I_yt_xt)
    Ts.append(t)

    print(t, I_yt_xt, I_xt_xt1hat)

In [None]:
# Theorem 6 sweep, latent size q = 2: record ee.mi(s, latent) in Irrt2 and
# ee.mi(s, predict) in Isshat2 every 500 iterations.
torch.manual_seed(10)
L = 10
sigma = 1
hidden_units = 64
batch_size = 1000
sz = 4
scale = 2
MAE = torch.nn.L1Loss()

net = Renorm_Dynamic(sym_size=sz, latent_size=scale, effect_size=sz,
                     hidden_units=hidden_units, normalized_state=False, device=device)
if use_cuda:
    net = net.cuda()
optimizer = torch.optim.Adam([p for p in net.parameters() if p.requires_grad], lr=1e-4)

# ei_micro, Isst, Illt and errs are reset here but not filled by this cell.
maes, ei_micro = [], []
Isst, Illt, errs = [], [], []
Isshat2, Irrt2 = [], []
Ts = []

for t in range(20001):
    s, sp, rs, rsp = generate_data(batch_size, sigma, L)
    predict, latent, latent_p = net(s)

    loss = mae = MAE(sp, predict)
    maes.append(mae.item())
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if t % 500 != 0:
        continue
    I_yt_xt = ee.mi(s.cpu(), latent.cpu().detach())
    I_xt_xt1hat = ee.mi(s.cpu(), predict.cpu().detach())
    Isshat2.append(I_xt_xt1hat)
    Irrt2.append(I_yt_xt)
    Ts.append(t)

    print(t, I_yt_xt, I_xt_xt1hat)

In [None]:
# Compare ee.mi(s, latent) across q = 4, 3, 2 together with ee.mi(s, predict)
# for q = 2. `Ts` is the iteration axis left by the most recent sweep cell;
# the first checkpoint (t = 0) is skipped.
rgb_list,hex_list = generate_colors(4,'cool')
print(rgb_list)
print(hex_list)

plt.plot(Ts[1:41],Irrt4[1:41],label='$I(x_t,y_t^{q=4})$',linewidth=2,color=hex_list[0],linestyle='-.')
plt.plot(Ts[1:41],Irrt3[1:41],label='$I(x_t,y_t^{q=3})$',linewidth=2,color=hex_list[1],linestyle='--')
plt.plot(Ts[1:41],Irrt2[1:41],label='$I(x_t,y_t^{q=2})$',linewidth=2,color=hex_list[2],linestyle='-.')
# Raw string: '\h' is not a valid escape sequence in a normal string literal.
plt.plot(Ts[1:41],Isshat2[1:41],label=r'$I(x_t,\hat{x}_{t+1})$',linewidth=2,color=hex_list[3])
plt.legend(loc='best')
plt.title('Verification for Theorem 6',fontsize=14) 
plt.xlabel('Iter',fontsize=13)
plt.ylabel('Mutual Information',fontsize=13)
plt.ylim(4,9.5)
plt.savefig('Spring_Scale_and_I.svg', dpi=600, format='svg')
plt.show()

#### Verification for Theorem 2 when Scale(q)=3

In [None]:
# Theorem 2 check, latent size q = 3. Every 500 iterations record
#   Isst   <- ee.mi(s, sp)
#   Irrt   <- ee.mi(latent, latent_p, k=3)
#   Isshat <- ee.mi(s, predict)
#   errs   <- standard deviation of the three values above
torch.manual_seed(10)
L = 10
sigma = 1
hidden_units = 64
batch_size = 1000
sz = 4
scale = 3
MAE = torch.nn.L1Loss()

net = Renorm_Dynamic(sym_size=sz, latent_size=scale, effect_size=sz,
                     hidden_units=hidden_units, normalized_state=False, device=device)
if use_cuda:
    net = net.cuda()
optimizer = torch.optim.Adam([p for p in net.parameters() if p.requires_grad], lr=1e-4)

maes, ei_micro = [], []
Isst, Illt = [], []
Isshat, Irrt = [], []
errs, Ts = [], []

for t in range(50001):
    s, sp, rs, rsp = generate_data(batch_size, sigma, L)
    predict, latent, latent_p = net(s)

    loss = mae = MAE(sp, predict)
    maes.append(mae.item())
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if t % 500 != 0:
        continue
    I_xt_xt1 = ee.mi(s.cpu(), sp.cpu())
    I_yt_yt1 = ee.mi(latent.cpu().detach(), latent_p.cpu().detach(), k=3)
    I_xt_xt1hat = ee.mi(s.cpu(), predict.cpu().detach())
    Isst.append(I_xt_xt1)
    Isshat.append(I_xt_xt1hat)
    Irrt.append(I_yt_yt1)
    errs.append(np.std([I_yt_yt1, I_xt_xt1hat, I_xt_xt1]))
    Ts.append(t)

    print(t, I_xt_xt1, I_yt_yt1, I_xt_xt1hat)

print(Isst, Isshat, Irrt, errs)
# Quick-look plot of the recorded curves.
for series, name in ((Isst, 'I(xt,xt+1)'), (Irrt, 'I(yt,yt+1)'),
                     (Isshat, 'I(xt,xthat)'), (errs, 'error')):
    plt.plot(Ts, series, label=name)
plt.ylim(0, 9)
plt.legend(loc='best')
plt.xlabel('Iter')
plt.ylabel('Value, Scale(q)=3')
plt.savefig('Mutual Information when Scale(q)=3.svg', dpi=600, format='svg')
plt.show()

In [None]:
# Publication version of the q = 3 mutual-information curves recorded above.
rgb_list,hex_list = generate_colors(4,'viridis')
print(rgb_list)
print(hex_list)

plt.plot(Ts,Isst,label='$I(x_t,x_{t+1})$',color='forestgreen')
plt.plot(Ts,Irrt,label='$I(y_t,y(t+1))$',color='crimson',linestyle='-.')
# Raw string: '\h' is not a valid escape sequence in a normal string literal.
plt.plot(Ts,Isshat,label=r'$I(x_t,\hat{x}_{t+1})$',color='dodgerblue',linestyle='--')
plt.ylim(0,9)

plt.title('Verification for Theorem 2 when Scale(q)=3',fontsize=14)
plt.legend(loc='best')
plt.xlabel('Iter',fontsize=13)
plt.ylabel('Mutual Information, Scale(q)=3',fontsize=13)
plt.savefig('Spring_Mutual_Information_Scale3.svg', dpi=600, format='svg')
plt.show()

#### Verification for Theorem 2 when Scale(q)=2

In [None]:
# Theorem 2 check, latent size q = 2. Every 500 iterations record
#   Isst   <- ee.mi(s, sp)
#   Irrt   <- ee.mi(latent, latent_p, k=18)
#   Isshat <- ee.mi(s, predict)
#   errs   <- standard deviation of the three values above
torch.manual_seed(10)
L = 10
sigma = 1
hidden_units = 64
batch_size = 1000
sz = 4
scale = 2
MAE = torch.nn.L1Loss()

net = Renorm_Dynamic(sym_size=sz, latent_size=scale, effect_size=sz,
                     hidden_units=hidden_units, normalized_state=False, device=device)
if use_cuda:
    net = net.cuda()
optimizer = torch.optim.Adam([p for p in net.parameters() if p.requires_grad], lr=1e-4)

maes, ei_micro = [], []
Isst, Illt = [], []
Isshat, Irrt = [], []
errs, Ts = [], []

for t in range(50001):
    s, sp, rs, rsp = generate_data(batch_size, sigma, L)
    predict, latent, latent_p = net(s)

    loss = mae = MAE(sp, predict)
    maes.append(mae.item())
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if t % 500 != 0:
        continue
    I_xt_xt1 = ee.mi(s.cpu(), sp.cpu())
    I_yt_yt1 = ee.mi(latent.cpu().detach(), latent_p.cpu().detach(), k=18)
    I_xt_xt1hat = ee.mi(s.cpu(), predict.cpu().detach())
    Isst.append(I_xt_xt1)
    Isshat.append(I_xt_xt1hat)
    Irrt.append(I_yt_yt1)
    errs.append(np.std([I_yt_yt1, I_xt_xt1hat, I_xt_xt1]))
    Ts.append(t)

    print(t, I_xt_xt1, I_yt_yt1, I_xt_xt1hat)

print(Isst, Isshat, Irrt, errs)
# Quick-look plot of the recorded curves.
for series, name in ((Isst, 'I(xt,xt+1)'), (Irrt, 'I(yt,yt+1)'),
                     (Isshat, 'I(xt,xthat)'), (errs, 'error')):
    plt.plot(Ts, series, label=name)
plt.ylim(0, 9)
plt.legend(loc='best')
plt.xlabel('Iter')
plt.ylabel('Value, Scale(q)=2')
plt.savefig('Mutual Information when Scale(q)=2.svg', dpi=600, format='svg')
plt.show()

In [None]:
# Publication version of the q = 2 mutual-information curves recorded above.
rgb_list,hex_list = generate_colors(4,'viridis')
print(rgb_list)
print(hex_list)

plt.plot(Ts,Isst,label='$I(x_t,x_{t+1})$',color='forestgreen')
plt.plot(Ts,Irrt,label='$I(y_t,y(t+1))$',color='crimson',linestyle='-.')
# Raw string: '\h' is not a valid escape sequence in a normal string literal.
plt.plot(Ts,Isshat,label=r'$I(x_t,\hat{x}_{t+1})$',color='dodgerblue',linestyle='--')
plt.ylim(0,9)
plt.title('Verification for Theorem 2 when Scale(q)=2',fontsize=14) 
plt.legend(loc='best')
plt.xlabel('Iter',fontsize=13)
plt.ylabel('Mutual Information, Scale(q)=2',fontsize=13)
plt.savefig('Spring_Mutual_Information_Scale2.svg', dpi=600, format='svg')
plt.show()