# Code for Lecture-2 of Short Course of Temporal Point Processes

In [5]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

In [6]:
# fix random seed 
np.random.seed(12345)
torch.random.manual_seed(12345)

<torch._C.Generator at 0x11cffcaf0>

## Neural Hawkes Process

### Continuous-time LSTM cell

The LSTM cell $c(t)$ drifts from $c_{\text{start}}$ towards $c_{\text{target}}$. 

In [40]:
class CTLSTMCell(nn.Module): 
    
    def __init__(self, hdim): 
        super(CTLSTMCell, self).__init__()
        """
        hdim : # of hidden neurons
        """
        self.hdim = hdim 
        self.lin = nn.Linear(hdim*2, hdim*6, bias=True)
        return 
    
    def forward(self, x, h, c, ct): 
        """
        x : input embedding
        h : hidden state right before time t 
        c : LSTM cell right before time t 
        ct : LSTM target cell given current history
        """
        x = torch.cat((x, h), dim=0)
        y = self.lin(x)
        
        gi, gf, z, git, gft, gd = y.chunk(6, 0)
        
        gi = torch.sigmoid(gi)
        gf = torch.sigmoid(gf)
        z = torch.tanh(z)
        git = torch.sigmoid(git)
        gft = torch.sigmoid(gft)
        gd = F.softplus(gd)
        
        cs = gf * c + gi * z 
        ct = gft * ct + git * z
        
        return cs, ct, gd
    
    def decay(self, cs, ct, gd, dt): 
        """
        cs : LSTM start cell
        ct : LSTM target cell 
        gd : decay gate
        dt : elapsed time 
        """
        c = ct + (cs - ct) * torch.exp(-gd * dt)
        h = torch.tanh(c)
        
        return c, h

### Neural Hawkes process

The intensity is defined as $\lambda_k(t) = \text{Softplus}(\text{Linear}(h(t)))$.  

In [47]:
class NHP(nn.Module): 
    
    def __init__(self, kdim, hdim): 
        super(NHP, self).__init__()
        """
        kdim : # of event types 
        hdim : # of hidden neurons
        """
        self.eps = np.finfo(float).eps 
        self.max = np.finfo(float).max 
        self.kdim = kdim 
        self.hdim = hdim 
        self.BOS = kdim 
        
        self.emb_in = nn.Embedding(kdim+1, hdim)
        self.ctlstm = CTLSTMCell(hdim)
        self.emb_out = nn.Linear(hdim, kdim)
        
        self.cs = torch.zeros(size=[hdim], dtype=torch.float32)
        self.ct = torch.zeros(size=[hdim], dtype=torch.float32)
        self.gd = torch.zeros(size=[hdim], dtype=torch.float32)
        
        return 
    
    def start(self): 
        self.cs = torch.zeros(size=[hdim], dtype=torch.float32)
        self.ct = torch.zeros(size=[hdim], dtype=torch.float32)
        self.gd = torch.zeros(size=[hdim], dtype=torch.float32)
        self.update(self.BOS, 0.0)
        return 
    
    def update(self, k, dt): 
        """
        k : event type 
        dt : elapsed time since last event
        """
        c, h = self.ctlstm.decay(self.cs, self.ct, self.gd, dt)
        x = self.emb_in(torch.LongTensor([k]))[0]
        self.cs, self.ct, self.gd = self.ctlstm(x, h, c, self.ct)
        
        return 
    
    def forward(self, k, dt): 
        self.update(k, dt)
        return 
    
    def compute_intensities(self, dt): 
        c, h = self.ctlstm.decay(self.cs, self.ct, self.gd, dt)
        return F.softplus(self.emb_out(h))
    
    def compute_total_intensity(self, dt): 
        intensities = self.compute_intensities(dt)
        return torch.sum(intensities)

### Draw a sequence of events by thinning algorithm

For the code to be easy to understand, I only have non-vectorized implementation. Please check the repos for my published papers for highly vectorized and optimized implementation. 

In [66]:
def thinning(model): 
    dt = 0.0
    bound = 100.0 
    # manualy chosen for simplicity
    # in principle, it can be found using the method in Appendix B.3 of Mei & Eisner 2017
    while True: 
        u = np.random.uniform(0.0, 1.0)
        dt += -np.log(1-u) / bound
        intens = model.compute_intensities(dt)
        total_inten = torch.sum(intens)
        accept_prob = total_inten / bound
        u = np.random.uniform(0.0, 1.0)
        if u <= accept_prob: 
            break 
    
    k = torch.multinomial(intens, 1)
    
    return k, dt
    

In [67]:
kdim = 4 
hdim = 8
nhp = NHP(kdim, hdim)
# init by BOS 
nhp.start()

T = 100.0
t = 0
seq = []

while True:
    # draw dt using thinning algorithm
    
    k, dt = thinning(nhp)
    t += dt
    if t <= T: 
        seq += [(dt, k)] # track dt, not t, easy to use
        # update model 
        nhp.forward(k, dt)
    else: 
        break

print(f"over time interval [0, {T}]")
print(f"# of events : {len(seq)}")

over time interval [0, 100.0]
# of events : 290


### Predict next event time and type by sampling (approx. MBR)

In [68]:
def predict_time(model): 
    dts, ks = [], []
    n = 10 
    k, dt = thinning(model)
    dts += [float(dt)]
    dt_pred = np.mean(dts)
    return dt_pred

def predict_type(model, dt): 
    intens = model.compute_intensities(dt)
    k_pred = torch.argmax(intens)
    return k_pred

In [69]:
se = 0.0
nerr = 0

nhp.start() # restart
n = 100

for i, s in enumerate(seq[:n]): 
    # predict
    dt_pred = predict_time(nhp)
    # time
    dt = seq[i][0]
    se += (dt_pred - dt) ** 2
    # type 
    k_pred = predict_type(nhp, dt)
    k = seq[i][1]
    if k_pred != k: 
        nerr += 1

print(f"check time prediction accuracy")
#print(f"RMSE using estimated intensity : {rmse_mle:.4f}")
print(f"RMSE using true model : {np.sqrt(se/n):.4f}")

print(f"\ncheck type prediction error rate")
#print(f"Error Rate using estimated intensities : {100.0*nerr_mle/len(seq):.2f}%")
print(f"Error Rate using true model : {100.0*nerr/n:.2f}%")


check time prediction accuracy
RMSE using true model : 0.4117

check type prediction error rate
Error Rate using true model : 73.00%
