In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import cv2
import os
import numpy as np
import pandas as pd

In [2]:
def masking(cr, img, x):
    mask = cv2.inRange(img, np.array(cr[0]), np.array(cr[1]))
    cl = mask[:, x]
    try:
        idx = np.where(cl > 0)[0][0]
    except IndexError:
        print(cr)
        return 10000000
    return idx


def smooth(idx, h):
    return max(1 - idx / h, 0)


def get_data(image_label, x):  #此函数用于使用cv2提取在浓度为x时的产物比例. This function is used to extract the product ratio at a concentration of x using cv2.
    # 读取图像，Read image
    img = cv2.imread(f"{image_label}")
    if img is None:
        raise ValueError(f"Cannot read the img: {image_label}.png")

    cropped_img = img[97:711,126:899]

    # 获取裁剪后图像尺寸，Get the size of the cropped image
    height, width = cropped_img.shape[:2]

    # 将x值映射到图像像素坐标，Map the x value to the image pixel coordinates.
    x_min, x_max = 2, 12  # 图像x轴范围，Image x-axis range.
    x_pixel = int((x - x_min) * width / (x_max - x_min))

    # 确保x_pixel在有效范围内，Ensure x_pixel is within the valid range.
    x_pixel = max(0, min(x_pixel, width-1))

    # 定义每种元素对应的颜色范围 (OpenCV读入自动默认BGR格式)，Define the color range corresponding to each element (OpenCV reads in BGR format by default).
    color_ranges = {
        'N2': ([160, 0, 0], [255, 140, 100]),       # 蓝色，Blue
        'NH4_ion': ([0, 110, 250], [120, 170, 255]), # 橘黄色，Orange
        'N2O': ([0, 160, 0], [150, 255, 150]),      # 绿色，Green
        'NO': ([0, 0, 160], [100, 100, 255]),        # 红色，Red
        'NO2': ([180, 100, 140], [200, 120, 160]),    # 紫色，Purple
    }
    #
    smoothed_results = {'N2': smooth(masking(color_ranges['N2'], cropped_img, x_pixel), height),
                        'NH4_ion': smooth(masking(color_ranges['NH4_ion'], cropped_img, x_pixel), height),
                        'N2O': smooth(masking(color_ranges['N2O'], cropped_img, x_pixel), height),
                        'NO': smooth(masking(color_ranges['NO'], cropped_img, x_pixel), height),
                        'NO2': smooth(masking(color_ranges['NO2'], cropped_img, x_pixel), height)}

    return (smoothed_results['N2'], smoothed_results['NH4_ion'], smoothed_results['N2O'],
            smoothed_results['NO'], smoothed_results['NO2'])

In [3]:
datapath_train = "/bohr/train-gvtn/v1/"
# datapath_train = ""
input_csv_path_train = os.path.join(datapath_train + 'input_train.csv')
ref_path = os.path.join(datapath_train + 'ref_result_train.csv')
data_train = pd.read_csv(input_csv_path_train)
ref_train = pd.read_csv(ref_path)


class EnvDataset(torch.utils.data.Dataset):
    def __init__(self, feature, ref):
        super().__init__()
        self.feature = feature
        self.ref = ref

    def __len__(self):
        return len(self.feature)

    def __getitem__(self, idx):
        return self.feature[idx], self.ref[idx]


class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(5, 8),
            nn.Tanh(),
            nn.Linear(8, 32),
            nn.Tanh(),
            nn.Linear(32, 64),
            nn.Tanh(),
            nn.Linear(64, 8),
        )

    def forward(self, x):
        return self.net(x)

In [4]:
features = []
refs = []
for i in range(5):
    features.append(get_data(os.path.join(datapath_train, data_train['File Name'][i]), data_train['c'][i]))
    refs.append([ref_train['p_1'][i], ref_train['p_2'][i], ref_train['p_3'][i], ref_train['p_4'][i],
                 ref_train['p_5'][i], ref_train['p_6'][i], ref_train['p_7'][i], ref_train['p_8'][i]])

features = torch.tensor(features, dtype=torch.float32)
refs = torch.tensor(refs, dtype=torch.float32)

print(features)
print(refs)

([160, 0, 0], [255, 140, 100])
([0, 110, 250], [120, 170, 255])
([0, 110, 250], [120, 170, 255])
tensor([[0.0554, 0.0016, 0.1091, 0.4870, 0.3502],
        [0.0000, 0.0000, 0.0098, 0.3844, 0.6091],
        [0.0293, 0.0000, 0.0749, 0.4300, 0.4674],
        [0.1938, 0.2427, 0.1564, 0.3893, 0.0212],
        [0.1531, 0.0130, 0.1889, 0.4625, 0.1857]])
tensor([[2.7209, 0.7209, 0.2168, 0.3014, 0.0671, 0.0342, 0.0000, 1.3604],
        [3.1030, 1.1030, 0.6645, 0.4187, 0.0099, 0.0000, 0.0000, 1.5515],
        [2.8337, 0.8337, 0.3519, 0.3237, 0.0566, 0.0224, 0.0000, 1.4168],
        [2.5049, 0.4281, 0.0066, 0.1231, 0.0492, 0.0616, 0.0769, 1.0988],
        [2.5763, 0.5714, 0.0795, 0.1969, 0.0799, 0.0652, 0.0048, 1.2785]])


In [5]:
dataset = EnvDataset(features, refs)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=2, shuffle=True)


NUM_EPOCHS = 40000

net = Net()
criterion = nn.MSELoss()
optimizer = optim.Adam(net.parameters(), lr=0.0001)

for epoch in range(NUM_EPOCHS):
    for feature, ref in dataloader:
        optimizer.zero_grad()
        pred = net(feature)
        loss = criterion(pred, ref)
        loss.backward()
        optimizer.step()
    if epoch % 100 == 0:
        with torch.no_grad():
            pred = net(features)
            loss = criterion(pred, refs)
            print(f"epoch: {epoch}, loss: {loss}")

epoch: 0, loss: 1.4476737976074219
epoch: 100, loss: 0.1930593103170395
epoch: 200, loss: 0.016677487641572952
epoch: 300, loss: 0.015491415746510029
epoch: 400, loss: 0.014125319197773933
epoch: 500, loss: 0.012694112956523895
epoch: 600, loss: 0.011224610731005669
epoch: 700, loss: 0.009769524447619915
epoch: 800, loss: 0.008401504717767239
epoch: 900, loss: 0.007148941047489643
epoch: 1000, loss: 0.005951192229986191
epoch: 1100, loss: 0.004899120423942804
epoch: 1200, loss: 0.003961133770644665
epoch: 1300, loss: 0.003282382385805249
epoch: 1400, loss: 0.002880636602640152
epoch: 1500, loss: 0.0026600512210279703
epoch: 1600, loss: 0.0024578191805630922
epoch: 1700, loss: 0.0023010889999568462
epoch: 1800, loss: 0.0021683776285499334
epoch: 1900, loss: 0.002063232008367777
epoch: 2000, loss: 0.0019644179847091436
epoch: 2100, loss: 0.0018870480125769973
epoch: 2200, loss: 0.0018661494832485914
epoch: 2300, loss: 0.0017332652350887656
epoch: 2400, loss: 0.0016873767599463463
epoch: 

In [11]:
#-------------读取测试集---------------#“DATA_PATH”是测试集加密后的环境变量，按照如下方式可以在提交后，系统评分时访问测试集，但是选手无法直接下载
#----Read the testing set, “DATA_PATH” is an environment variable for the encrypted test set. After submission, you can access the test set for system scoring in the following manner, but the contestant cannot download it directly.-----#
if os.environ.get('DATA_PATH'):
    DATA_PATH = os.environ.get("DATA_PATH") + "/"
else:
    print("Baseline运行时，因为无法读取测试集，所以会有此条报错，属于正常现象")
    print("When baseline is running, this error message will appear because the test set cannot be read, which is a normal phenomenon.")
    #Baseline运行时，因为无法读取测试集，所以会有此条报错，属于正常现象
    #When baseline is running, this error message will appear because the test set cannot be read, which is a normal phenomenon.

datapath_test = DATA_PATH
# datapath_test = ""
input_csv_path_test = os.path.join(datapath_test + 'input_test.csv')
data_test = pd.read_csv(input_csv_path_test)

#---对测试数据进行计算--Calculate on the test data----#
output_data = [] # 用于存储测试集输出结果的列表，A list used to store the output results of the test set.

# 遍历每一张图和x，Traverse each image and x.
for index, row in data_test.iterrows():
    image_label = os.path.join(datapath_test, row['File Name'])  # 获取文件名（不带扩展名）并与datapath连接
    x_value = row['c']

    # 调用get_data函数处理图像并获取结果，Invoke the get_data function to process the image and obtain the results.
    results = get_data(image_label, x_value)

    # 调用calculate函数计算值，Call the calculate function to compute the value.
    pred = net(torch.tensor(results, dtype=torch.float32))
    calculated_values = pred.detach().numpy()

    # 将结果添加到输出数据中，Add the result to the output data.
    output_data.append({
        'File Name': row['File Name'],
        'Scaled mol X': 1,
        'p_1': calculated_values[0],
        'p_2': calculated_values[1],
        'Scaled mol X+': 1,
        'p_3': calculated_values[2],
        'p_4': calculated_values[3],
        'p_5': calculated_values[4],
        'p_6': calculated_values[5],
        'p_7': calculated_values[6],
        'p_8': calculated_values[7]
    })

# 创建输出DataFrame并保存为CSV文件，Create the output DataFrame and save it as a CSV file
output_df = pd.DataFrame(output_data)
output_csv_path = os.path.join('submission.csv')
print(output_df)
output_df.to_csv(output_csv_path, index=False)

Baseline运行时，因为无法读取测试集，所以会有此条报错，属于正常现象
When baseline is running, this error message will appear because the test set cannot be read, which is a normal phenomenon.
([160, 0, 0], [255, 140, 100])
([0, 110, 250], [120, 170, 255])
([0, 110, 250], [120, 170, 255])
  File Name  Scaled mol X       p_1       p_2  Scaled mol X+       p_3  \
0    00.png             1  2.720835  0.720850              1  0.216815   
1    27.png             1  3.103030  1.103048              1  0.664504   
2    72.png             1  2.833675  0.833654              1  0.351929   
3    77.png             1  2.504925  0.428070              1  0.006615   
4    96.png             1  2.576205  0.571398              1  0.079499   

        p_4       p_5       p_6       p_7       p_8  
0  0.301398  0.067088  0.034252 -0.000021  1.360422  
1  0.418671  0.009935  0.000004 -0.000002  1.551517  
2  0.323679  0.056619  0.022401  0.000003  1.416836  
3  0.123054  0.049171  0.061616  0.076841  1.098757  
4  0.196845  0.079898  0.0