In [None]:
import csv
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import math


def load_data(file_path):
    """Load an observed trajectory from a CSV file.

    The CSV must provide the columns ``Time``, ``X`` and ``Y``. Each column is
    converted to a ``torch.float64`` column vector of shape ``(n, 1)``.

    Args:
        file_path: Path (or any object accepted by ``pandas.read_csv``) of the
            CSV file to read.

    Returns:
        tuple: ``(t_start, t_end, t_obs, x_obs, y_obs)`` where ``t_start`` and
        ``t_end`` are the first and last time stamps as Python floats and the
        remaining three entries are the ``(n, 1)`` float64 tensors.
    """
    frame = pd.read_csv(file_path)

    def as_column(name):
        # One CSV column -> (n, 1) float64 tensor.
        return torch.tensor(frame[name].values, dtype=torch.float64).view(-1, 1)

    t_obs, x_obs, y_obs = (as_column(name) for name in ('Time', 'X', 'Y'))

    # Original author's note: the time axis of both the training and the test
    # data starts at 0.
    t_start = t_obs[0].item()
    t_end = t_obs[-1].item()

    return t_start, t_end, t_obs, x_obs, y_obs


# Gravitational acceleration constant.
# NOTE(review): presumably in m/s^2 — the units are not stated in this file; confirm.
g = 9.8

# NOTE(review): `nodes` is not referenced anywhere else in the visible source;
# it looks like an intended hidden-layer width — confirm before relying on it.
nodes = 32

In [None]:
class PointData(Dataset):
    """Dataset that pairs each entry of a time sequence with a target entry.

    Both sequences are stored by reference; no copy and no length check is
    performed.
    """

    def __init__(self, t_seq, p_seq):
        self.t_seq, self.p_seq = t_seq, p_seq

    def __len__(self):
        # The sample count is defined by the time sequence alone.
        return len(self.t_seq)

    def __getitem__(self, idx):
        time_item = self.t_seq[idx]
        point_item = self.p_seq[idx]
        return time_item, point_item

In [None]:

# Define the neural network: the input is the time t and the output is the trajectory position (x, y).
# The drag parameter k is not a network output; it is estimated afterwards from the derivatives of the fitted trajectory.
# You can use a different design; this is just one possible approach.

class Net(nn.Module):
    """Small fully connected network with one input and two outputs.

    Layout: Linear(1 -> 32) -> Tanh -> Linear(32 -> 64) -> Tanh -> Linear(64 -> 2),
    with every linear layer created in ``torch.float64``.
    """

    def __init__(self):
        super().__init__()
        widths = (1, 32, 64, 2)
        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out, dtype=torch.float64))
            stack.append(nn.Tanh())
        # The output layer is linear, so drop the activation appended after it.
        stack.pop()
        self.net = nn.Sequential(*stack)

    def forward(self, x):
        """Apply the layer stack to ``x`` and return the result."""
        return self.net(x)

In [None]:
def train(loader: DataLoader, num_epochs: int = 1000, lr: float = 0.001) -> nn.Module:
    """Fit a fresh ``Net`` to the batches produced by ``loader``.

    Each batch is expected to unpack into ``(t, p)``; the network is trained so
    that ``net(t)`` matches ``p`` under a mean-squared-error loss using Adam.

    Args:
        loader: Iterable of ``(t, p)`` batches, re-iterated once per epoch.
        num_epochs: Number of full passes over ``loader`` (default 1000).
        lr: Adam learning rate (default 0.001).

    Returns:
        The trained network.
    """
    net = Net()
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)

    for _ in range(num_epochs):
        for t, p in loader:
            optimizer.zero_grad()
            loss = criterion(net(t), p)
            loss.backward()
            optimizer.step()

    return net


def _trajectory_derivatives(net, times):
    """Evaluate ``net`` at ``times`` with its first and second time derivatives.

    Assumes ``net`` acts on every input row independently (true for a stack of
    Linear/Tanh layers), so differentiating a column sum with respect to the
    input yields the per-sample derivative of that column.

    Args:
        net: Module mapping an ``(n, 1)`` time tensor to an ``(n, m)`` tensor.
        times: ``(n, 1)`` tensor of sample times.

    Returns:
        tuple: ``(pos, vel, acc)`` — the network output and its first and
        second derivatives with respect to time, each a detached ``(n, m)``
        tensor.
    """
    t = times.detach().clone().requires_grad_(True)
    pos = net(t)
    vel_cols, acc_cols = [], []
    for col in range(pos.shape[1]):
        # create_graph keeps the first derivative differentiable; retain_graph
        # keeps the forward graph alive for the remaining output columns.
        d1 = torch.autograd.grad(pos[:, col].sum(), t, create_graph=True)[0]
        d2 = torch.autograd.grad(d1.sum(), t, retain_graph=True)[0]
        vel_cols.append(d1.detach())
        acc_cols.append(d2)
    return pos.detach(), torch.cat(vel_cols, dim=1), torch.cat(acc_cols, dim=1)


def calculate(data_path: str, n_drag_samples: int = 5000, n_eval_samples: int = 100000):
    """Fit the trajectory in ``data_path`` and derive drag, peak height and range.

    Steps:
      1. Load ``(t, x, y)`` observations and train a network ``t -> (x, y)``.
      2. Estimate ``k`` as the negated mean of ``x'' / (x' * |v|)`` over
         ``n_drag_samples`` evenly spaced times, i.e. the ``k`` for which
         ``x'' = -k * |v| * x'``.
      3. Take the largest predicted ``y`` over ``n_eval_samples`` evenly spaced
         times as the maximum height.
      4. Extend the tangent of the predicted trajectory at the last observed
         time to ``y = 0`` and report that ``x`` as the range.

    Args:
        data_path: CSV file understood by ``load_data``.
        n_drag_samples: Number of time samples used for the drag estimate.
        n_eval_samples: Number of time samples scanned for the maximum height.

    Returns:
        tuple: ``(k, max_height, range)`` as Python floats.
    """
    # Build the training set; x and y are stacked column-wise into (n, 2) targets.
    t_s, t_e, t_seq, x_seq, y_seq = load_data(data_path)
    p_seq = torch.cat([x_seq, y_seq], dim=1)
    loader = DataLoader(PointData(t_seq, p_seq), batch_size=4, shuffle=True)

    net = train(loader)

    # Drag coefficient from the horizontal equation of motion, evaluated in one batch.
    drag_times = torch.from_numpy(np.linspace(t_s, t_e, n_drag_samples)).view(-1, 1)
    _, vel, acc = _trajectory_derivatives(net, drag_times)
    speed = torch.sqrt((vel ** 2).sum(dim=1))
    k = -(acc[:, 0] / (vel[:, 0] * speed)).mean().item()

    # Maximum height: a true maximum over the samples (no artificial lower bound).
    eval_times = torch.from_numpy(np.linspace(t_s, t_e, n_eval_samples)).view(-1, 1)
    with torch.no_grad():
        max_height = net(eval_times)[:, 1].max().item()

    # Range: follow the tangent line at the final time down to y = 0.
    end_time = torch.tensor([[t_e]], dtype=torch.float64)
    pos, vel, _ = _trajectory_derivatives(net, end_time)
    x_end, y_end = pos[0, 0], pos[0, 1]
    slope = vel[0, 1] / vel[0, 0]
    offset = y_end - slope * x_end
    s = -1 * offset / slope

    return k, max_height, s.item()

In [None]:
def calculate_parameters(data_path, submission_path):
    """Run ``calculate`` on ``data_path`` and write the result as a CSV file.

    The output file has a header row ``k,height_max,range_max`` followed by a
    single row holding the three computed values.

    Args:
        data_path: Input CSV passed through to ``calculate``.
        submission_path: Destination of the CSV file (overwritten if present).
    """
    # Compute first so a failure does not leave a truncated submission behind.
    k_pred, height_max, range_max = calculate(data_path)
    rows = [
        ['k', 'height_max', 'range_max'],
        [k_pred, height_max, range_max],
    ]
    with open(submission_path, 'w', newline='') as out_file:
        csv.writer(out_file).writerows(rows)

In [None]:
#---------Submission example-------#
import zipfile
import os

# Load the training data and solve for its parameters; the result is written
# to submission_train.csv in the working directory.

TRAIN_PATH = "/bohr/train-g6oc/v1/"
calculate_parameters(data_path=TRAIN_PATH + "projectile_train.csv", submission_path="submission_train.csv")  # solve the training-set equations

In [None]:
# Load the test data.
# "DATA_PATH" is an environment variable pointing at the encrypted test set. The scoring
# system can read it after submission, but contestants cannot download the data directly.

if os.environ.get('DATA_PATH'):
    DATA_PATH = os.environ.get("DATA_PATH") + "/"
    # Solve the test sets only when they are reachable, so DATA_PATH is never
    # referenced while undefined.
    calculate_parameters(data_path=DATA_PATH + "projectile_testA.csv",
                         submission_path="submission_testA.csv")  # solve the public test set
    calculate_parameters(data_path=DATA_PATH + "projectile_testB.csv",
                         submission_path="submission_testB.csv")  # solve the private test set
else:
    # Outside the scoring environment the test set cannot be read; report it
    # and skip the test-set computation.
    print(
        "During the execution of the Baseline, since it is unable to read the test set, this error message will appear, which is a normal phenomenon.")

In [None]:
# Files to bundle and the name of the archive to create
files_to_zip = ['submission_testA.csv', 'submission_testB.csv']
zip_filename = 'submission.zip'

# Create the zip archive
with zipfile.ZipFile(zip_filename, 'w') as zipf:
    for file in files_to_zip:
        # Add the file to the archive under its base name (directory part stripped)
        zipf.write(file, os.path.basename(file))

# Runtime message is in Chinese: "<zip_filename> created successfully!"
print(f'{zip_filename} 创建成功!')