# Forecast transit time

In this notebook we train a model to forecast the transit time of a Coronal Mass Ejection (CME) from the Sun to Earth. This is a regression task: each CME event is labeled with its transit time, calculated as the difference between the arrival time at Earth and the take-off time at the Sun.

We will use a dataset already prepared by [Alobaid et al. 2022](https://www.frontiersin.org/articles/10.3389/fspas.2022.1013345/full) and available at the following [link](https://github.com/deepsuncode/CMETNet/blob/main/CMETNet_Package/data/ICME_list.csv).
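
As a minimal sketch of how such a target is built, the transit time in hours is the arrival timestamp minus the take-off timestamp. The column names `takeoff` and `arrival` and the timestamps below are hypothetical, chosen only for illustration; they are not taken from the dataset, which the cells below read with a ready-made `transit_time` column.

```python
import pandas as pd

# Hypothetical events (illustrative timestamps, not from the dataset):
events = pd.DataFrame({
    "takeoff": pd.to_datetime(["2020-01-01 00:00", "2020-02-10 06:30"]),
    "arrival": pd.to_datetime(["2020-01-03 12:00", "2020-02-12 00:30"]),
})

# Transit time = arrival time minus take-off time, converted to hours:
events["transit_time_hours"] = (
    (events["arrival"] - events["takeoff"]).dt.total_seconds() / 3600.0
)
print(events)
```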


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


In [None]:
# Download the dataset (set download = True on the first run):
link = 'https://raw.githubusercontent.com/deepsuncode/CMETNet/main/CMETNet_Package/data/ICME_list.csv'

download = False
if download:
    import urllib.request
    import os
    os.makedirs('transitTime', exist_ok=True)
    urllib.request.urlretrieve(link, 'transitTime/ICME_list.csv')

In [None]:
# Read the dataset with pandas:
df = pd.read_csv('transitTime/ICME_list.csv')

In [None]:
df

## Data exploration:

In [None]:
# Number of CMEs per year:
df['year'] = pd.to_datetime(df['disturbance']).dt.year
df['year'].value_counts().sort_index().plot(kind='bar')
plt.xlabel('Year')
plt.ylabel('Number of CMEs')
plt.title('Number of CMEs per year')
# delete the year column
df = df.drop(columns=['year'])

In [None]:
# CME transit time distribution:
df['transit_time'].plot(kind='hist', bins=20)
plt.xlabel('Transit time [hrs]')
plt.ylabel('Number of CMEs')
plt.title('CME transit time distribution')


In [None]:
# Drop events with a transit time above 120 hours:
df = df[df['transit_time'] <= 120]

In [None]:
# Regression analysis:
from sklearn.linear_model import LinearRegression

# ydata, the target variable:
ydata = df['transit_time'].values

# xdata, the rest of the features:
xdata = df.drop(columns=['disturbance', 'transit_time']).values
feature_names = df.drop(columns=['disturbance', 'transit_time']).columns


## Regression task:

We compare the performance of two models: a linear regressor and an artificial neural network.

In [None]:
# Standardization of the features:
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()

xdata_scaled = scaler.fit_transform(xdata)



# Linear regression:
model = LinearRegression()
model.fit(xdata_scaled, ydata)

# Coefficients, sorted in ascending order:
coefficients = pd.Series(model.coef_, index=feature_names).sort_values()
coefficients.plot(kind='bar', color='skyblue')
plt.ylabel('Coefficient')
plt.title('Linear regression coefficients')
# plt.xticks(rotation=45)


It looks like one feature is very important for the transit time of CMEs. Let's see which one it is:


In [None]:
largestcoef = np.argmax(np.abs(model.coef_))
print(f'The most important feature is {feature_names[largestcoef]} with a coefficient of {model.coef_[largestcoef]:.2f}')

In [None]:
# Prediction vs real transit time:
y_pred = model.predict(xdata_scaled)
plt.scatter(ydata, y_pred, marker='.')   # scatter plot
plt.xlabel('Real transit time [hrs]')
plt.ylabel('Predicted transit time [hrs]')
plt.title('Prediction vs real transit time')
plt.plot([ydata.min(), ydata.max()], [ydata.min(), ydata.max()], color='k', linestyle='--')   # diagonal line

print('R2 score:', model.score(xdata_scaled, ydata))
print('MSE:', np.mean((ydata - y_pred)**2))
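
The R2 score and MSE above are computed on the same samples used to fit the model, so they may be optimistic. A minimal sketch of a held-out evaluation follows. It uses synthetic data (an assumption made so that the example is self-contained) and fits the scaler on the training split only, via a scikit-learn pipeline.

```python
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic stand-in for (xdata, ydata); this is not the CME dataset:
rng = np.random.default_rng(0)
X_demo = rng.normal(size=(200, 4))
y_demo = 60 + 10 * X_demo[:, 0] - 5 * X_demo[:, 1] + rng.normal(scale=2, size=200)

X_tr, X_te, y_tr, y_te = train_test_split(X_demo, y_demo, test_size=0.2, random_state=42)

# The pipeline fits the scaler on the training split only:
linreg = make_pipeline(StandardScaler(), LinearRegression())
linreg.fit(X_tr, y_tr)

# Metrics on samples the model has not seen during fitting:
y_te_pred = linreg.predict(X_te)
r2_test = r2_score(y_te, y_te_pred)
mse_test = mean_squared_error(y_te, y_te_pred)
print('Held-out R2 score:', r2_test)
print('Held-out MSE:', mse_test)
```

With the real data, the same pattern would apply to the unscaled `xdata` and `ydata`.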

In [None]:
# Artificial neural network with pytorch:
import torch
import torch.nn as nn
import torch.optim as optim

# Neural network:
class Net(nn.Module):
    def __init__(self, input_size):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(input_size, 32)
        self.fc2 = nn.Linear(32, 1)
        self.activation = nn.ELU()

    def forward(self, x):
        x = self.fc1(x)
        x = self.activation(x)
        x = self.fc2(x)
        return x
    
    def fit(self, xdata, ydata, epochs=1000, lr=0.01, verbose=True, xtest=None, ytest=None):
        criterion = nn.MSELoss()
        optimizer = optim.Adam(self.parameters(), lr=lr)
        
        xdata_tensor = torch.tensor(xdata, dtype=torch.float32)
        ydata_tensor = torch.tensor(ydata, dtype=torch.float32).reshape(-1, 1)
        
        if xtest is not None and ytest is not None:
            xtest_tensor = torch.tensor(xtest, dtype=torch.float32)
            ytest_tensor = torch.tensor(ytest, dtype=torch.float32).reshape(-1, 1)
        
        loss_train = []
        loss_test = []
        
        for epoch in range(epochs):
            optimizer.zero_grad()
            output = self(xdata_tensor)
            loss = criterion(output, ydata_tensor)
            loss.backward()
            optimizer.step()
            loss_train.append(loss.item())
            
            if epoch % 100 == 0 and verbose:
                print(f'Epoch {epoch}, loss: {loss.item()}', flush=True)
            
            if xtest is not None and ytest is not None:
                # Evaluate on the test set without tracking gradients:
                with torch.no_grad():
                    loss_test.append(criterion(self(xtest_tensor), ytest_tensor).item())
        
        return loss_train, loss_test
                
                
    def predict(self, xdata):
        xdata_tensor = torch.tensor(xdata, dtype=torch.float32)
        return self(xdata_tensor).detach().numpy().ravel()
    
    def score(self, xdata, ydata):
        y_pred = self.predict(xdata)
        return 1 - np.mean((ydata - y_pred)**2) / np.mean((ydata - ydata.mean())**2)

model = Net(input_size=xdata.shape[1])

# Split the data into training and testing:
from sklearn.model_selection import train_test_split
xtrain, xtest, ytrain, ytest = train_test_split(xdata_scaled, ydata, test_size=0.2, random_state=42)


loss_train, loss_test = model.fit(xtrain, ytrain, epochs=3000, lr=0.001, verbose=False, xtest=xtest, ytest=ytest)

plt.plot(loss_train, label='Training loss')
plt.plot(loss_test, label='Testing loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Min test loss: {:.2f} at epoch {}'.format(min(loss_test), np.argmin(loss_test)))
plt.yscale('log')
plt.legend()



# Prediction vs real transit time:
plt.figure()
y_predtest = model.predict(xtest)
plt.scatter(ytest, y_predtest, color='C1', marker='.')   # scatter plot
y_predtrain = model.predict(xtrain)
plt.scatter(ytrain, y_predtrain, color='C0', marker='.')   # scatter plot

plt.xlabel('Real transit time [hrs]')
plt.ylabel('Predicted transit time [hrs]')
plt.title('Prediction vs real transit time')
plt.plot([ydata.min(), ydata.max()], [ydata.min(), ydata.max()], color='k', linestyle='--')   # diagonal line

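# Metrics on the full dataset (training and test samples together):
y_pred = model.predict(xdata_scaled)
print('R2 score:', model.score(xdata_scaled, ydata))
print('MSE:', np.mean((ydata - y_pred)**2))
# R2 score on the held-out test set only:
print('R2 score (test):', model.score(xtest, ytest))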
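
For reference, the two metrics printed above can be written as follows, where $y_i$ is the real transit time of event $i$, $\hat{y}_i$ its prediction, $\bar{y}$ the mean of the real values, and $N$ the number of events. This mirrors the `score` method of `Net` and the MSE expression used in the code:

$$
\mathrm{MSE} = \frac{1}{N}\sum_{i=1}^{N} (y_i - \hat{y}_i)^2, \qquad
R^2 = 1 - \frac{\sum_{i=1}^{N} (y_i - \hat{y}_i)^2}{\sum_{i=1}^{N} (y_i - \bar{y})^2}
$$

An $R^2$ of 1 corresponds to perfect predictions, while an $R^2$ of 0 corresponds to always predicting the mean $\bar{y}$.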

## Symbolic regression using PySR:

In the previous section we used a linear regressor and an artificial neural network. The linear regressor is highly interpretable but typically less accurate, while the artificial neural network is typically more accurate but hard to interpret. Here we use symbolic regression, aiming for a model with good average performance and good interpretability.

Symbolic regression searches for mathematical expressions that relate the input features to the target variable. The result is a set of candidate expressions that can each be used to predict the target variable. These expressions are ordered in terms of complexity and performance, so we can choose the one that best fits our needs.
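
To make the trade-off between complexity and performance concrete, the sketch below ranks a hypothetical set of candidate expressions. The `complexity` and `loss` values are invented for illustration. The score is computed as the drop in log-loss per unit of added complexity, which follows the description of the `score` column in the PySR documentation; the finite-difference form used here is an assumption, not PySR's own code.

```python
import numpy as np
import pandas as pd

# Hypothetical candidates (invented values, not PySR output):
candidates = pd.DataFrame({
    "equation": ["c0", "c0 + c1*x0", "c0 + c1*x0 + c2*x1", "c0 + c1*x0 + c2*x1^2"],
    "complexity": [1, 5, 9, 11],
    "loss": [100.0, 40.0, 38.0, 37.5],
})

# Score: decrease in log-loss per unit of added complexity.
# The simplest candidate has no predecessor, so its score is set to 0.
dlog_loss = np.log(candidates["loss"]).diff()
dcomplexity = candidates["complexity"].diff()
candidates["score"] = (-dlog_loss / dcomplexity).fillna(0.0)

# Select the candidate with the largest score:
best = candidates.loc[candidates["score"].idxmax()]
print(candidates)
print("Selected equation:", best["equation"])
```

In this toy table the second expression is selected: it gives the largest reduction in loss for the complexity it adds, while the later expressions improve the loss only marginally.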

In [None]:
try:
    import pysr
except ImportError:
    # Install PySR if it is not available, then import it:
    %pip install pysr
    import pysr
    

In [None]:
from pysr import PySRRegressor

model = PySRRegressor(
    niterations=200,
    binary_operators=["*", "+", "-", "/", "^"],
    elementwise_loss="loss(predictions, target) = abs(predictions - target)",
    warm_start=True,
    constraints={'^': (-1, 3)},
)
model.fit(xdata_scaled, ydata)


In [None]:
# Equation with the largest score:
# idxmax returns an index label, so select the row with .loc:
model.equations_.loc[model.equations_['score'].idxmax()]

In [None]:
# Prediction vs real transit time:
plt.figure()
y_pred = model.predict(xdata_scaled)
plt.scatter(ydata, y_pred, marker='.')   # scatter plot
plt.xlabel('Real transit time [hrs]')
plt.ylabel('Predicted transit time [hrs]')
plt.title('Prediction vs real transit time')
plt.plot([ydata.min(), ydata.max()], [ydata.min(), ydata.max()], color='k', linestyle='--')   # diagonal line

print('R2 score:', model.score(xdata_scaled, ydata))
print('MSE:', np.mean((ydata - y_pred)**2))

# Annotate the plot with the equation that has the largest score
# (idxmax returns an index label, so select the row with .loc):
eq_name = model.equations_.loc[model.equations_['score'].idxmax()].equation
plt.text(0.1, 0.9, eq_name, fontsize=12, transform=plt.gca().transAxes)
