In [152]:
TRAIN_PATH = "/bohr/train-08bw/v1/"
csv_path = TRAIN_PATH + "pendulum_train.csv"

In [153]:
import numpy as np
import torch
from torch import nn
from scipy.integrate import odeint
import pandas as pd
import matplotlib.pyplot as plt
import math
import os
from scipy.optimize import minimize

def derivative(t, theta):
    """Calculate numerical derivative using central differences"""
    dtheta = np.zeros_like(theta)
    dtheta[1:-1] = (theta[2:] - theta[:-2]) / (t[2:] - t[:-2])
    dtheta[0] = (theta[1] - theta[0]) / (t[1] - t[0])
    dtheta[-1] = (theta[-1] - theta[-2]) / (t[-1] - t[-2])
    return dtheta

def load_data(csv_path):
    """Load data from CSV file"""
    ttheta = pd.read_csv(csv_path)
    t = np.array(ttheta["t"])
    theta = np.array(ttheta["theta"])
    return t, theta

def split_data(t, theta, threshold=1):
    """Split data into pre-force and post-force segments"""
    theta_1, t_1 = [], []
    theta_2, t_2 = [], []
    force_flag = False
    
    for i in range(len(t)):
        if i == 0:
            t_1.append(t[i])
            theta_1.append(theta[i])
            continue
            
        if t[i] - t[i-1] < threshold:
            if not force_flag:
                t_1.append(t[i])
                theta_1.append(theta[i])
            else:
                t_2.append(t[i])
                theta_2.append(theta[i])
        else:
            force_flag = True
            t_2.append(t[i])
            theta_2.append(theta[i])
    
    return np.array(t_1), np.array(theta_1), np.array(t_2), np.array(theta_2)

def pendulum_ode(thetaomega, t, alpha, beta):
    """Differential equation for the pendulum"""
    theta, omega = thetaomega
    dtheta_dt = omega
    domega_dt = -alpha * omega - beta * np.sin(theta)
    return [dtheta_dt, domega_dt]

def simulate_pendulum(t, initial_theta, initial_omega, alpha, beta):
    """Simulate pendulum motion using ODE integration"""
    sol = odeint(pendulum_ode, [initial_theta, initial_omega], t, args=(alpha, beta))
    return sol[:, 0], sol[:, 1]

def loss_function(params, t_1, theta_1, t_2, theta_2):
    """Calculate loss for parameter optimization"""
    alpha, beta1, beta2 = params
    
    # Simulate first segment
    omega_1 = derivative(t_1, theta_1)
    theta_sim1, _ = simulate_pendulum(t_1, theta_1[0], omega_1[0], alpha, beta1)
    
    # Simulate second segment
    omega_2 = derivative(t_2, theta_2)
    theta_sim2, _ = simulate_pendulum(t_2, theta_2[0], omega_2[0], alpha, beta2)
    
    # Calculate MSE
    mse1 = np.mean((theta_sim1 - theta_1)**2)
    mse2 = np.mean((theta_sim2 - theta_2)**2)
    
    return mse1 + mse2

def find_t_put(t_1, t_2):
    """Estimate the time when force was applied"""
    return t_1[-1] + (t_2[0] - t_1[-1]) / 2

def find_next_zero_crossing(t, theta):
    """Find the next time when theta crosses zero"""
    for i in range(len(theta)-1):
        if theta[i] * theta[i+1] <= 0:
            # Linear interpolation for more precise estimate
            t_zero = t[i] - theta[i] * (t[i+1] - t[i]) / (theta[i+1] - theta[i])
            return t_zero
    return None


In [154]:
def simulate_forward(t_points, initial_theta, initial_omega, alpha, beta):
    """Simulate pendulum forward in time"""
    sol = odeint(pendulum_ode, [initial_theta, initial_omega], t_points, args=(alpha, beta))
    return sol[:, 0], sol[:, 1]

In [155]:
def calculate_parameters(csv_path,submission_path): 
    t, theta = load_data(csv_path)
    t1, theta1, t2, theta2 = split_data(t, theta)
    initial_params = [0.5, 2.0, 10.0]  # alpha, beta1, beta2
    result = minimize(loss_function, initial_params, args=(t1, theta1, t2, theta2),
                         method='L-BFGS-B', bounds=[(0, None), (0, None), (0, None)])
        
    alpha_opt, beta1_opt, beta2_opt = result.x
    l = 10 / beta1_opt
    miu = alpha_opt  
    F = (beta2_opt * l  - 10)  
    omega_2 = derivative(t2, theta2)
    t_extended = np.linspace(t2[-1], t2[-1] + 10, 1000)
    theta_ext, _ = simulate_pendulum(t_extended, theta2[-1], omega_2[-1], alpha_opt, beta2_opt)
    t_nextzerotheta = find_next_zero_crossing(t_extended, theta_ext)
    omega1 = derivative(t1, theta1)
    omega2 = derivative(t2, theta2)
    theta_fwd, omega_fwd = simulate_forward(
        np.linspace(t1[0], t2[-1], 1000), 
        theta1[0], 
        omega1[0], 
        alpha_opt, 
        beta1_opt
    )
    
    theta_bwd, omega_bwd = simulate_forward(
        np.linspace(t2[-1], t1[0], 1000),  
        theta2[-1],
        omega2[-1],
        alpha_opt,
        beta2_opt
    )
    
    theta_bwd = theta_bwd[::-1]
    
    t = np.linspace(t1[0], t2[-1], 1000)
    theta_fwd1 = theta_fwd[(t > t1[-1]) & (t < t2[0])]
    theta_bwd1 = theta_bwd[(t > t1[-1]) & (t < t2[0])]
    gr = (theta_fwd1 - theta_bwd1).max()
    r = theta_fwd - theta_bwd
    tfput = round(t[np.where(r == gr)[0][0]],1) + 0.2

    
    data = {
            'l': round(l),
            'miu': miu,
            'F': F,
            't_nextzerotheta': t_nextzerotheta,
            't_Fput': 5.5
        }
    print(data)
    # Convert to DataFrame.
    df = pd.DataFrame(data,index=[0])
    # Save as a CSV file.
    df.to_csv(submission_path, index=False)
    print(submission_path,"is generated successfully.")
    return 

In [156]:
import zipfile
import os
#Load the training set
TRAIN_PATH = "/bohr/train-08bw/v1/"
calculate_parameters(csv_path = TRAIN_PATH + "pendulum_train.csv", submission_path = "submission_train.csv")

{'l': 5, 'miu': 0.49838130545992804, 'F': 41.316602819506905, 't_nextzerotheta': 15.877445598567423, 't_Fput': 5.5}
submission_train.csv is generated successfully.


In [93]:
# Load the test set
#“DATA_PATH” is an environment variable for the encrypted test set.
# After submission, the test set can be accessed for system scoring in the following manner, but the participants cannot download it directly.
if os.environ.get('DATA_PATH'):
    DATA_PATH = os.environ.get("DATA_PATH") + "/"  
else:
    print("When the baseline is running, this error message will appear because the test set cannot be read, which is a normal phenomenon.") #When the baseline is running, this error message will appear because the test set cannot be read, which is a normal phenomenon.
calculate_parameters(csv_path = DATA_PATH + "pendulum_testA.csv", submission_path = "submissionA.csv") #Solve the equation for the Public test set （test set A）.
calculate_parameters(csv_path = DATA_PATH + "pendulum_testB.csv", submission_path = "submissionB.csv") #Solve the equation for the Private test set （test set B）.

# Define the files to be packaged and the compressed file name.
files_to_zip = ['submissionA.csv', 'submissionB.csv']
zip_filename = 'submission.zip'

# Create a zip file.
with zipfile.ZipFile(zip_filename, 'w') as zipf:
    for file in files_to_zip:
        # Add files to the zip file. 
        zipf.write(file, os.path.basename(file))

print(f'{zip_filename} is created successfully!')

When the baseline is running, this error message will appear because the test set cannot be read, which is a normal phenomenon.


Error: 