In [None]:
import numpy as np
import pandas as pd
import os
from utils.timefeatures import time_features
import warnings
import matplotlib.pyplot as plt
from models.FITS import Model as FITS
from models.iTransformer import Model as iTransformer
from models.ComFreformer import Model as ComFreformer
from tqdm import tqdm
from torch import optim
from torch import nn
import torch
import torch.nn.functional as F
import random
# Seed the Python, PyTorch (CPU and current CUDA device) and NumPy generators
# with one shared value so repeated runs draw the same random numbers.
fix_seed = 2025
for _seed_rng in (random.seed, torch.manual_seed, torch.cuda.manual_seed, np.random.seed):
    _seed_rng(fix_seed)
# The cuDNN determinism switches below are left disabled:
# torch.backends.cudnn.deterministic = True
# torch.backends.cudnn.benchmark = False

import os
import time

In [None]:
def sin_function(amp, period, phase_shift, length=3000):
    """Sample a sinusoid on the integer grid 0, 1, ..., length-1.

    Args:
        amp: Amplitude multiplying the sine.
        period: Period of the wave, in samples. Must be non-zero.
        phase_shift: Phase offset in radians added inside the sine.
        length: Number of samples to generate. Defaults to 3000, the
            length this function always produced before the parameter
            was added.

    Returns:
        1-D NumPy array of shape (length,) holding
        amp * sin(2*pi*x/period + phase_shift) for x in range(length).
    """
    x = np.arange(length)
    return amp * np.sin(2 * np.pi * x / period + phase_shift)

In [None]:
# Build the two-channel synthetic series used by every experiment below.
# Each channel is uniform noise in [0, 0.2) plus a sum of 15 sinusoids whose
# periods are int(1000 / (i + 2)**2) samples (250, 111, 62, 40, ...).
# The channels share periods but differ in amplitudes and in phase (0 vs pi/2).
y1 = np.random.rand(3000) * 0.2
y2 = np.random.rand(3000) * 0.2
amps1 = [1, 0.8, 0.3, 0, 0.7, 0, 0, 0.3, 0, 0, 0, 0, 0, 0, 0.1]
amps2 = [1, 0.8, 0.7, 0, 0.3, 0, 0, 0.3, 0, 0, 0, 0, 0, 0, 0.05]
for i, (amp1, amp2) in enumerate(zip(amps1, amps2)):
    # Builtin int() replaces np.int, which was removed in NumPy 1.24;
    # both truncate towards zero, so the periods are unchanged.
    period = int(1000 / (i + 2) ** 2)
    y1 += sin_function(amp1, period, 0)
    y2 += sin_function(amp2, period, np.pi / 2)
# Shape (3000, 2): column 0 is signal X, column 1 is signal Y.
y = np.column_stack([y1, y2])

In [None]:
# Side-by-side preview of the first 1000 samples of each synthetic channel,
# saved to 'part of sequences.pdf'.
fig, (ax_left, ax_right) = plt.subplots(1, 2, figsize=(18, 4))
legend_font = {'weight':'bold','size' : 14}

ax_left.plot(y1[:1000], label='Signal X', color='blue')
ax_left.legend(prop=legend_font)

ax_right.plot(y2[:1000], label='Signal Y', color='orange')
ax_right.legend(prop=legend_font)

fig.savefig('part of sequences.pdf', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
class Arg:
    """Plain attribute bag holding the hyper-parameters handed to the models.

    An instance is passed as the single constructor argument to the
    iTransformer, ComFreformer and FITS models in this notebook. Every
    constructor parameter is stored unchanged under an attribute of the same
    name; a further set of attributes is fixed to constant values and cannot
    be overridden through the constructor.
    """

    def __init__(
        self,
        batch_size=32,
        seq_len=100,
        pred_len=100,
        enc_in=2,
        d_deep=100,
        cut_freq=30,
        individual=False,
        attention_type='complex',
        d_model=64,
        d_ff=64,
        n_heads=2,
        e_layers=1,
        dropout=0.1,
        activation='gelu',
        learning_rate=1e-4,
    ):
        # Caller-tunable settings, stored verbatim.
        self.batch_size, self.seq_len, self.pred_len = batch_size, seq_len, pred_len
        self.d_deep, self.attention_type = d_deep, attention_type
        self.d_model, self.d_ff = d_model, d_ff
        self.n_heads, self.e_layers = n_heads, e_layers
        self.dropout, self.activation = dropout, activation
        self.individual, self.enc_in, self.cut_freq = individual, enc_in, cut_freq
        self.learning_rate = learning_rate

        # Settings fixed for this notebook (not exposed as parameters).
        self.output_attention = False
        self.use_layernorm = False
        self.use_norm = True
        self.revin = True
        self.embed = 'timeF'
        self.freq = 't'
        self.class_strategy = 'projection'
        self.factor = 1
        self.use_modrelu = True
        self.modrelub = 0.0


# iTransformer

In [None]:
# Train iTransformer on sliding windows of the synthetic series.
# Each sample is a 200-step window: the first 100 steps are the model input
# and the last 100 are the forecasting target. Window starts are drawn from
# 0..1999, so training only reads y[:2199].
config = Arg()
itrans = iTransformer(config).to('cuda:0')
model_optim = optim.Adam(itrans.parameters(), lr=1e-4)
itf_trainloss = []  # mean training loss of each epoch, plotted later
criterion = nn.MSELoss()
n_epochs = 1000
for epoch in range(n_epochs):
    # Fresh shuffle of the 2000 window starts; 25 batches x 80 windows cover all of them.
    order = np.random.permutation(2000)
    train_loss = []
    for batch_idx in range(25):
        batch_start = order[batch_idx * 80:(batch_idx + 1) * 80]
        # Stack the 80 windows into one (80, 200, 2) array.
        batch_xy = np.stack([y[i:i + 200, :] for i in batch_start])
        batch_xy = torch.from_numpy(batch_xy).float().to('cuda:0')
        batch_x = batch_xy[:, :100, :]
        batch_y = batch_xy[:, 100:, :]
        model_optim.zero_grad()
        output = itrans(batch_x, x_mark_enc=None, x_dec=None, x_mark_dec=None)
        output = output[:, -100:, :]  # keep the last 100 predicted steps
        loss = criterion(output, batch_y)
        train_loss.append(loss.item())
        loss.backward()
        model_optim.step()
    epoch_loss = np.mean(train_loss)
    itf_trainloss.append(epoch_loss)
    # Report every 40 epochs and on the final epoch (previously hard-coded
    # as 799, which is not the last epoch of a 1000-epoch run).
    if epoch % 40 == 0 or epoch == n_epochs - 1:
        print('epoch: ', epoch, ' loss: ', epoch_loss)

In [None]:
# Average MSE of iTransformer over the 600 windows starting at 2200..2799,
# evaluated one window at a time with gradients disabled.
itrans.eval()
losses = []
with torch.no_grad():
    for start in range(2200, 2800):
        window = torch.from_numpy(y[start:start + 200, :]).float().to('cuda:0').unsqueeze(0)
        history, target = window[:, :100, :], window[:, -100:, :]
        forecast = itrans(history, x_mark_enc=None, x_dec=None, x_mark_dec=None)[:, -100:, :]
        losses.append(criterion(forecast, target).item())
print('average loss: ', np.mean(losses))

In [None]:
# Forecast a single window starting at t with iTransformer and keep the
# window and the prediction as NumPy arrays for plotting.
t = 2700
itrans.eval()
batch_xy = torch.from_numpy(y[t:t + 200, :]).float().to('cuda:0').unsqueeze(0)
batch_x, batch_y = batch_xy[:, :100, :], batch_xy[:, 100:, :]
with torch.no_grad():
    output = itrans(batch_x, x_mark_enc=None, x_dec=None, x_mark_dec=None)[:, -100:, :]
    loss = criterion(output, batch_y)
# Move results to host memory; `output` and `batch_xy` are read by the plot cell.
output = output.cpu().numpy()
batch_xy = batch_xy.cpu().numpy()
print(loss.item())

In [None]:
# Plot the full 200-step ground truth of both channels with the 100-step
# iTransformer forecast overlaid on steps 100..199.
y1_true, y2_true = batch_xy[0, :, 0], batch_xy[0, :, 1]
y1_pred, y2_pred = output[0, -100:, 0], output[0, -100:, 1]

fig, (ax_x, ax_y) = plt.subplots(2, 1, figsize=(6, 6))
panels = [
    (ax_x, y1_true, y1_pred, 'GroundTruth of X', 'Prediction of X', 'blue', 'red'),
    (ax_y, y2_true, y2_pred, 'GroundTruth of Y', 'Prediction of Y', 'orange', 'green'),
]
for ax, truth, pred, truth_label, pred_label, truth_color, pred_color in panels:
    ax.plot(truth, label=truth_label, color=truth_color)
    ax.plot(range(100, 200), pred, label=pred_label, color=pred_color, linestyle='--')
    ax.set_ylim(-4, 4)
    ax.legend(loc='upper left', prop={'weight':'bold','size':10})
    ax.tick_params(left=False, labelleft=False)  # hide y ticks and their labels
    ax.grid()

fig.subplots_adjust(hspace=0)  # stack the two panels without a gap
# NOTE(review): the file name says 2400 while the window above starts at t=2700 - confirm.
fig.savefig('prediction of itrans 2400.pdf', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Plot the first sinusoidal component (index i) of each channel over the
# same 200-step window [t, t+200) used for the prediction plots.
# `t` is taken from the single-window evaluation cell above.
i = 0
# Builtin int() replaces np.int, which was removed in NumPy 1.24.
period = int(1000 / (i + 2) ** 2)
a = sin_function(amps1[i], period, 0)
b = sin_function(amps2[i], period, np.pi / 2)

fig, axes = plt.subplots(2, 1, figsize=(6, 6))
for ax, component in zip(axes, (a, b)):
    ax.plot(component[t:t + 200])
    ax.set_ylim(-2, 2)
    ax.tick_params(left=False, labelleft=False)  # hide y ticks and their labels
    ax.grid()

fig.subplots_adjust(hspace=0)  # stack the two panels without a gap
# NOTE(review): the file name says 2400 while t is set to 2700 earlier - confirm.
fig.savefig('waveform 2400.pdf', dpi=300, bbox_inches='tight')
plt.show()

# ComFreformer

In [None]:
# Train ComFreformer on sliding windows of the synthetic series.
# Each sample is a 200-step window: the first 100 steps are the model input
# and the last 100 are the forecasting target. Window starts are drawn from
# 0..1999, so training only reads y[:2199].
config = Arg()
cff = ComFreformer(config).to('cuda:0')
model_optim = optim.Adam(cff.parameters(), lr=1e-4)
cff_trainloss = []  # mean training loss of each epoch, plotted later
criterion = nn.MSELoss()
n_epochs = 1000
for epoch in range(n_epochs):
    # Fresh shuffle of the 2000 window starts; 25 batches x 80 windows cover all of them.
    order = np.random.permutation(2000)
    train_loss = []
    for batch_idx in range(25):
        batch_start = order[batch_idx * 80:(batch_idx + 1) * 80]
        # Stack the 80 windows into one (80, 200, 2) array.
        batch_xy = np.stack([y[i:i + 200, :] for i in batch_start])
        batch_xy = torch.from_numpy(batch_xy).float().to('cuda:0')
        batch_x = batch_xy[:, :100, :]
        batch_y = batch_xy[:, 100:, :]
        model_optim.zero_grad()
        output = cff(batch_x)
        output = output[:, -100:, :]  # keep the last 100 predicted steps
        loss = criterion(output, batch_y)
        train_loss.append(loss.item())
        loss.backward()
        model_optim.step()
    epoch_loss = np.mean(train_loss)
    cff_trainloss.append(epoch_loss)
    # Report every 40 epochs and on the final epoch (previously hard-coded
    # as 799, which is not the last epoch of a 1000-epoch run).
    if epoch % 40 == 0 or epoch == n_epochs - 1:
        print('epoch: ', epoch, ' loss: ', epoch_loss)

In [None]:
# Compare the per-epoch training loss of the three models.
# NOTE: `fits_trainloss` is only created by the FITS training cell further
# down, so on a fresh kernel run top-to-bottom it does not exist yet when
# this cell executes. Curves that are not available are skipped with a
# warning instead of raising NameError; re-run this cell after all three
# models are trained (or move it below the FITS section) for the full figure.
loss_curves = [
    ('cff_trainloss', 'ComFreformer', 'blue'),
    ('itf_trainloss', 'iTransformer', 'orange'),
    ('fits_trainloss', 'FITS', 'green'),
]
fig, ax = plt.subplots(figsize=(12, 5))
for var_name, curve_label, curve_color in loss_curves:
    curve = globals().get(var_name)
    if curve is None:
        warnings.warn(f"{var_name} is not defined yet; skipping the {curve_label} curve")
        continue
    ax.plot(curve, label=curve_label, color=curve_color)
ax.legend(prop={'weight':'bold','size':14})
ax.set_xlabel('Epoch', fontweight='bold', fontsize=15)
ax.set_ylabel('Train Loss', fontweight='bold', fontsize=15)
fig.savefig('train loss.pdf', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Average MSE of ComFreformer over the 600 windows starting at 2200..2799,
# evaluated one window at a time with gradients disabled.
cff.eval()
losses = []
with torch.no_grad():
    for start in range(2200, 2800):
        window = torch.from_numpy(y[start:start + 200, :]).float().to('cuda:0').unsqueeze(0)
        history, target = window[:, :100, :], window[:, -100:, :]
        forecast = cff(history)[:, -100:, :]
        losses.append(criterion(forecast, target).item())
print('average loss: ', np.mean(losses))

In [None]:
# Forecast a single window starting at t with ComFreformer and keep the
# window and the prediction as NumPy arrays for plotting.
t = 2700
cff.eval()
batch_xy = torch.from_numpy(y[t:t + 200, :]).float().to('cuda:0').unsqueeze(0)
batch_x, batch_y = batch_xy[:, :100, :], batch_xy[:, 100:, :]
with torch.no_grad():
    output = cff(batch_x)[:, -100:, :]
    loss = criterion(output, batch_y)
# Move results to host memory; `output` and `batch_xy` are read by the plot cell.
output = output.cpu().numpy()
batch_xy = batch_xy.cpu().numpy()
print(loss.item())

In [None]:
# Plot the full 200-step ground truth of both channels with the 100-step
# ComFreformer forecast overlaid on steps 100..199.
y1_true, y2_true = batch_xy[0, :, 0], batch_xy[0, :, 1]
y1_pred, y2_pred = output[0, -100:, 0], output[0, -100:, 1]

fig, (ax_x, ax_y) = plt.subplots(2, 1, figsize=(6, 6))
panels = [
    (ax_x, y1_true, y1_pred, 'GroundTruth of X', 'Prediction of X', 'blue', 'red'),
    (ax_y, y2_true, y2_pred, 'GroundTruth of Y', 'Prediction of Y', 'orange', 'green'),
]
for ax, truth, pred, truth_label, pred_label, truth_color, pred_color in panels:
    ax.plot(truth, label=truth_label, color=truth_color)
    ax.plot(range(100, 200), pred, label=pred_label, color=pred_color, linestyle='--')
    ax.set_ylim(-4, 4)
    ax.legend(loc='upper left', prop={'weight':'bold','size':10})
    ax.tick_params(left=False, labelleft=False)  # hide y ticks and their labels
    ax.grid()

fig.subplots_adjust(hspace=0)  # stack the two panels without a gap
# NOTE(review): the file name says 2400 while the window above starts at t=2700 - confirm.
fig.savefig('prediction of cff 2400.pdf', dpi=300, bbox_inches='tight')
plt.show()

# FITS

In [None]:
# Train FITS on sliding windows of the synthetic series.
# Each sample is a 200-step window: the first 100 steps are the model input
# and the last 100 are the forecasting target. Window starts are drawn from
# 0..1999, so training only reads y[:2199].
# Note the learning rate here is 1e-3, unlike the 1e-4 used for the other two models.
config = Arg()
fits = FITS(config).to('cuda:0')
model_optim = optim.Adam(fits.parameters(), lr=1e-3)
fits_trainloss = []  # mean training loss of each epoch, plotted in the loss-comparison cell
criterion = nn.MSELoss()
n_epochs = 1000
for epoch in range(n_epochs):
    # Fresh shuffle of the 2000 window starts; 25 batches x 80 windows cover all of them.
    order = np.random.permutation(2000)
    train_loss = []
    # `batch_idx` replaces the old loop variable `batch`, which was rebound
    # to the list of windows inside the loop body.
    for batch_idx in range(25):
        batch_start = order[batch_idx * 80:(batch_idx + 1) * 80]
        # Stack the 80 windows into one (80, 200, 2) array.
        batch_xy = np.stack([y[i:i + 200, :] for i in batch_start])
        batch_xy = torch.from_numpy(batch_xy).float().to('cuda:0')
        batch_x = batch_xy[:, :100, :]
        batch_y = batch_xy[:, 100:, :]
        model_optim.zero_grad()
        output = fits(batch_x)
        output = output[:, -100:, :]  # keep the last 100 predicted steps
        loss = criterion(output, batch_y)
        train_loss.append(loss.item())
        loss.backward()
        model_optim.step()
    epoch_loss = np.mean(train_loss)
    fits_trainloss.append(epoch_loss)
    # Report every 40 epochs and on the final epoch.
    if epoch % 40 == 0 or epoch == n_epochs - 1:
        print('epoch: ', epoch, ' loss: ', epoch_loss)

In [None]:
# Average MSE of FITS over the 600 windows starting at 2200..2799,
# evaluated one window at a time with gradients disabled.
fits.eval()
losses = []
with torch.no_grad():
    for start in range(2200, 2800):
        window = torch.from_numpy(y[start:start + 200, :]).float().to('cuda:0').unsqueeze(0)
        history, target = window[:, :100, :], window[:, -100:, :]
        forecast = fits(history)[:, -100:, :]
        losses.append(criterion(forecast, target).item())
print('average loss: ', np.mean(losses))

In [None]:
# Forecast a single window starting at t with FITS and keep the window and
# the prediction as NumPy arrays for plotting.
t = 2700
fits.eval()
batch_xy = torch.from_numpy(y[t:t + 200, :]).float().to('cuda:0').unsqueeze(0)
batch_x, batch_y = batch_xy[:, :100, :], batch_xy[:, 100:, :]
with torch.no_grad():
    output = fits(batch_x)[:, -100:, :]
    loss = criterion(output, batch_y)
# Move results to host memory; `output` and `batch_xy` are read by the plot cell.
output = output.cpu().numpy()
batch_xy = batch_xy.cpu().numpy()
print('loss: ', loss.item())

In [None]:
# Plot the full 200-step ground truth of both channels with the 100-step
# FITS forecast overlaid on steps 100..199.
y1_true, y2_true = batch_xy[0, :, 0], batch_xy[0, :, 1]
y1_pred, y2_pred = output[0, -100:, 0], output[0, -100:, 1]

fig, (ax_x, ax_y) = plt.subplots(2, 1, figsize=(6, 6))
panels = [
    (ax_x, y1_true, y1_pred, 'GroundTruth of X', 'Prediction of X', 'blue', 'red'),
    (ax_y, y2_true, y2_pred, 'GroundTruth of Y', 'Prediction of Y', 'orange', 'green'),
]
for ax, truth, pred, truth_label, pred_label, truth_color, pred_color in panels:
    ax.plot(truth, label=truth_label, color=truth_color)
    ax.plot(range(100, 200), pred, label=pred_label, color=pred_color, linestyle='--')
    ax.set_ylim(-4, 4)
    ax.legend(loc='upper left', prop={'weight':'bold','size':10})
    ax.tick_params(left=False, labelleft=False)  # hide y ticks and their labels
    ax.grid()

fig.subplots_adjust(hspace=0)  # stack the two panels without a gap
# NOTE(review): the file name says 2400 while the window above starts at t=2700 - confirm.
fig.savefig('prediction of fits 2400.pdf', dpi=300, bbox_inches='tight')
plt.show()