In [None]:
import pandas as pd

def load_data(
    idx: int,
    base_dir: str = "C:/Users/ljw10/Desktop/LeeJunWoo/gshs/contest/creative25",
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Read the feature CSV and the label CSV that belong to one sample.

    Args:
        idx: Sample number. The files read are ``<base_dir>/data/<idx>.csv``
            and ``<base_dir>/label/<idx>.csv``.
        base_dir: Directory that contains the ``data`` and ``label`` folders.
            Defaults to the location that used to be hard-coded, so existing
            ``load_data(idx)`` calls read exactly the same files as before.

    Returns:
        A ``(data, label)`` pair of DataFrames. Missing values in ``data``
        are replaced with 0; ``label`` is returned exactly as read.

    Raises:
        FileNotFoundError: If either CSV file does not exist.
    """
    data = pd.read_csv(f"{base_dir}/data/{idx}.csv")
    label = pd.read_csv(f"{base_dir}/label/{idx}.csv")

    # Non-mutating fillna: returns a new frame instead of editing in place.
    return (data.fillna(0), label)

In [None]:
import torch

def data_to_tensor(data: pd.DataFrame) -> torch.Tensor:
    """Lay the per-row feature columns of ``data`` out on a 2-D grid.

    The grid size is derived from the difference between the last and the
    first row's ``lon`` / ``lat`` values (doubled, plus one). Row ``k`` of
    the frame is written to grid position ``[k // d_lat, k % d_lat]``, so the
    frame is presumably sorted by ``lon`` and then ``lat`` and covers the
    whole grid (this is not checked here).

    Args:
        data: Frame with ``lon``, ``lat``, ``pixels``, ``vx``, ``vy`` and
            ``hum`` columns.

    Returns:
        A float32 tensor of shape ``(4, d_lon, d_lat)`` whose channels are,
        in order, ``pixels``, ``vx``, ``vy`` and ``hum``.
    """
    extent = (data.iloc[-1] - data.iloc[0])[["lon", "lat"]] * 2 + 1
    d_lon, d_lat = (int(size) for size in extent)

    grid = torch.zeros(4, d_lon, d_lat, dtype=torch.float32)
    channels = [
        torch.tensor(data[column].values, dtype=torch.float32)
        for column in ("pixels", "vx", "vy", "hum")
    ]

    # Flat row number -> (row, col) position in row-major order.
    flat_index = torch.arange(d_lon * d_lat)
    rows = flat_index // d_lat
    cols = torch.arange(d_lon * d_lat) % d_lat
    for channel, values in enumerate(channels):
        grid[channel, rows, cols] = values
    return grid

In [None]:
def label_to_tensor(label: pd.DataFrame) -> torch.Tensor:
    """Scatter the ``pixels`` values of ``label`` onto a 2-D grid.

    Each row is written to the cell whose indices are the row's ``lon`` and
    ``lat`` offsets from the column minima, doubled and truncated to int.
    Cells that no row maps to stay 0; if several rows map to the same cell
    the last one wins.

    Args:
        label: Frame with ``lon``, ``lat`` and ``pixels`` columns.

    Returns:
        A float32 tensor of shape ``(d_lon, d_lat)`` where each size is
        ``int((max - min) * 2 + 1)`` of the respective column.
    """
    lon, lat = label["lon"], label["lat"]
    min_lon, min_lat = lon.min(), lat.min()
    d_lon = int((lon.max() - lon.min()) * 2 + 1)
    d_lat = int((lat.max() - lat.min()) * 2 + 1)

    grid = torch.zeros(d_lon, d_lat, dtype=torch.float32)
    for cell_lon, cell_lat, value in zip(lon, lat, label["pixels"]):
        lon_idx = int((cell_lon - min_lon) * 2)
        lat_idx = int((cell_lat - min_lat) * 2)
        grid[lon_idx, lat_idx] = value

    return grid

In [None]:
def data_compose(idx: int) -> tuple[torch.Tensor, torch.Tensor, int, int] | None:
    """Load sample ``idx`` and place its features and label on a shared 25x25 grid.

    Coordinates are doubled before use. The data extent is taken from the
    first and last rows of the data frame, the label extent from the
    ``lon`` / ``lat`` column minima and maxima. Both tensors are then copied
    into zero-filled 25x25 canvases at their offset from the common minimum
    corner.

    Args:
        idx: Sample number passed to ``load_data``.

    Returns:
        ``None`` if the combined extent spans more than 24 doubled-coordinate
        steps along either axis. Otherwise a tuple of
        ``(data canvas of shape (4, 25, 25), label canvas of shape (25, 25),
        occupied size along lon, occupied size along lat)``.
    """
    data, label = load_data(idx)

    # Corner coordinates of both inputs, in doubled units.
    data_lo_lon, data_lo_lat = data.iloc[0][["lon", "lat"]] * 2
    data_hi_lon, data_hi_lat = data.iloc[-1][["lon", "lat"]] * 2
    label_lo_lon = label["lon"].min() * 2
    label_lo_lat = label["lat"].min() * 2
    label_hi_lon = label["lon"].max() * 2
    label_hi_lat = label["lat"].max() * 2

    # Bounding box that contains both the data and the label.
    min_lon = min(data_lo_lon, label_lo_lon)
    min_lat = min(data_lo_lat, label_lo_lat)
    max_lon = max(data_hi_lon, label_hi_lon)
    max_lat = max(data_hi_lat, label_hi_lat)

    lon_span = max_lon - min_lon
    lat_span = max_lat - min_lat
    if lon_span > 24 or lat_span > 24:
        # Would not fit on the 25-cell canvas.
        return None

    tensor_data = data_to_tensor(data)
    tensor_label = label_to_tensor(label)

    data_lon = slice(int(data_lo_lon - min_lon), int(data_hi_lon - min_lon + 1))
    data_lat = slice(int(data_lo_lat - min_lat), int(data_hi_lat - min_lat + 1))
    canvas_data = torch.zeros(4, 25, 25, dtype=torch.float32)
    canvas_data[:, data_lon, data_lat] = tensor_data

    label_lon = slice(int(label_lo_lon - min_lon), int(label_hi_lon - min_lon + 1))
    label_lat = slice(int(label_lo_lat - min_lat), int(label_hi_lat - min_lat + 1))
    canvas_label = torch.zeros(25, 25, dtype=torch.float32)
    canvas_label[label_lon, label_lat] = tensor_label

    return (canvas_data, canvas_label, int(lon_span + 1), int(lat_span + 1))

In [None]:
import random

def augment(data: torch.Tensor, label: torch.Tensor, lon_size: int, lat_size: int) -> list[torch.Tensor]:
    """Shift the occupied corner of ``data`` and ``label`` by a random offset.

    The top-left ``lon_size`` x ``lat_size`` region of both inputs is copied
    into fresh zero-filled 25-wide canvases at the same randomly chosen
    offset, picked so that the region still fits on the canvas.

    Args:
        data: Tensor indexed as ``[channel, lon, lat]``; its first four
            channels' ``[:lon_size, :lat_size]`` region is copied.
        label: Tensor indexed as ``[lon, lat]``.
        lon_size: Extent of the occupied region along the lon axis.
        lat_size: Extent of the occupied region along the lat axis.

    Returns:
        ``[shifted_data, shifted_label]`` with shapes ``(4, 25, 25)`` and
        ``(25, 25)``, both float32.
    """
    # Draw the lon offset first, then the lat offset.
    lon_shift = random.randint(0, 25 - lon_size)
    lat_shift = random.randint(0, 25 - lat_size)
    lon_dst = slice(lon_shift, lon_shift + lon_size)
    lat_dst = slice(lat_shift, lat_shift + lat_size)

    shifted_data = torch.zeros(4, 25, 25, dtype=torch.float32)
    shifted_data[:, lon_dst, lat_dst] = data[:, :lon_size, :lat_size]

    shifted_label = torch.zeros(25, 25, dtype=torch.float32)
    shifted_label[lon_dst, lat_dst] = label[:lon_size, :lat_size]

    return [shifted_data, shifted_label]

In [None]:
# Quick check of the preprocessing pipeline on sample 1.
# data_compose may return None (sample too large for the 25x25 canvas), in
# which case this unpacking raises TypeError; the type checker is silenced.
t1, t2, s1, s2 = data_compose(1)  # type: ignore
d, l = augment(t1, t2, s1, s2)
# Flatten to one row each: 4 * 25 * 25 = 2500 inputs, 25 * 25 = 625 targets.
d2, l2 = d.reshape(1, 2500), l.reshape(1, 625)
print(l2.shape)

In [None]:
from torch import nn

class Model(nn.Module):
    """Fully connected network mapping 2500 input features to 625 outputs.

    The stack is Linear(2500, 100) -> ReLU -> Linear(100, 50) -> ReLU ->
    Linear(50, 625) -> ReLU, so every output is non-negative.
    """

    def __init__(self):
        super().__init__()
        layers = [
            nn.Linear(2500, 100),
            nn.ReLU(),
            nn.Linear(100, 50),
            nn.ReLU(),
            nn.Linear(50, 625),
            nn.ReLU(),
        ]
        # Kept under the attribute name `linear` so state_dict keys are unchanged.
        self.linear = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` (last dimension of size 2500) through the layer stack."""
        return self.linear(x)

In [None]:
def data_complete(batch_size: int, learn: bool) -> tuple[torch.Tensor, torch.Tensor]:
    """Build a batch of randomly chosen, randomly shifted, flattened samples.

    Sample numbers are drawn (with replacement) from 0..130 when ``learn`` is
    true and from 131..179 otherwise. Samples for which ``data_compose``
    returns ``None`` are skipped and another number is drawn, so the loop
    only ends once ``batch_size`` usable samples have been collected.

    Args:
        batch_size: Number of samples to collect.
        learn: Selects the training index range (True) or the test range.

    Returns:
        ``(inputs, targets)`` with shapes ``(batch_size, 2500)`` and
        ``(batch_size, 625)``.
    """
    inputs = []
    targets = []

    while len(inputs) < batch_size:
        if learn:
            idx = random.randint(0, 130)
        else:
            idx = random.randint(131, 179)

        composed = data_compose(idx)
        if not composed:
            # Sample does not fit on the canvas; draw again.
            continue

        grid_data, grid_label, lon_size, lat_size = composed
        shifted_data, shifted_label = augment(grid_data, grid_label, lon_size, lat_size)
        inputs.append(shifted_data.reshape(1, 2500))
        targets.append(shifted_label.reshape(1, 625))

    return (torch.cat(inputs), torch.cat(targets))

In [None]:
from torch import optim
from torch.utils.data import DataLoader, TensorDataset

LR = 0.005
EPOCHS = 15  # number of passes over the training set

model = Model()
# NOTE(review): CrossEntropyLoss treats floating-point targets of the same
# shape as the input as class probabilities, and expects raw logits as input.
# The targets here are flattened `pixels` grids and the model ends in a ReLU —
# confirm this is the intended loss (a regression loss may be what is wanted).
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LR)
history_learn = []  # per-epoch training loss, summed over batches (floats)
history_test = []   # per-epoch test loss, summed over batches (floats)

# Fixed pools of augmented samples, drawn once before training starts.
x_train, y_train = data_complete(2048, True)
x_test, y_test = data_complete(512, False)
train_dataset = TensorDataset(x_train, y_train)
test_dataset = TensorDataset(x_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

for i in range(EPOCHS):
    model.train()
    loss_sum1 = 0.0
    for x_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outs = model(x_batch)
        loss = criterion(outs, y_batch)
        loss.backward()
        optimizer.step()
        # .item() keeps the running total a plain float instead of a tensor.
        loss_sum1 += loss.item()

    model.eval()
    loss_sum2 = 0.0
    with torch.no_grad():
        for x_batch, y_batch in test_loader:
            outs = model(x_batch)
            loss_sum2 += criterion(outs, y_batch).item()

    # NOTE(review): these are sums over batches; the training set yields
    # 2048 / 64 = 32 batches and the test set 512 / 64 = 8, so the two
    # histories are not on the same scale.
    history_learn.append(loss_sum1)
    history_test.append(loss_sum2)
    print(i, end=' ')
    if i % 20 == 19:
        # Line break after every 20 printed epoch numbers.
        print()

In [None]:
import matplotlib.pyplot as plt

# Loss curves per epoch. The x-range follows the history length rather than a
# hard-coded epoch count, and float() accepts both plain numbers and 0-d tensors.
plt.plot(range(len(history_learn)), [float(v) for v in history_learn], label='train')
plt.plot(range(len(history_test)), [float(v) for v in history_test], label='test')
plt.xlabel('epoch')
plt.ylabel('loss (summed over batches)')
plt.legend()
plt.show()

In [None]:
# Show the first input channel (`pixels`) of sample 170 on the 25x25 canvas.
# data_compose may return None, in which case this unpacking raises TypeError.
t1, t2, s1, s2 = data_compose(170)  # type: ignore
tensor = t1[0]
plt.figure(figsize=(24, 6))
plt.subplot(1, 3, 1)
plt.imshow(tensor.numpy(), cmap='Reds', aspect='auto')
plt.colorbar()
plt.title('Before')
# imshow draws dim 0 vertically and dim 1 horizontally; the canvas is indexed
# [lon, lat], so longitude runs along the y axis and latitude along the x axis.
plt.xlabel('latitude')
plt.ylabel('longitude')
plt.xticks([], [])
plt.yticks([], [])

In [None]:
# Run the trained model on the flattened sample and show its (floored) output.
data = t1.reshape(1, 2500)
with torch.no_grad():
    # Inference only: no autograd graph is needed for plotting.
    tensor = model(data).reshape(25, 25).floor()

plt.figure(figsize=(24, 6))
plt.subplot(1, 3, 1)
plt.imshow(tensor.numpy(), cmap='Reds', aspect='auto')
plt.colorbar()
plt.title('After')
# imshow draws dim 0 vertically and dim 1 horizontally; the output is reshaped
# to the same [lon, lat] layout as the label canvas, so longitude is the y axis.
plt.xlabel('latitude')
plt.ylabel('longitude')
plt.xticks([], [])
plt.yticks([], [])

In [None]:
# Show the label canvas `t2` for comparison with the model output.
plt.figure(figsize=(24, 6))
plt.subplot(1, 3, 1)
plt.imshow(t2.numpy(), cmap='Reds', aspect='auto')
plt.colorbar()
# This figure shows the label tensor, not the input, so it gets its own title.
plt.title('Label')
# imshow draws dim 0 vertically and dim 1 horizontally; the canvas is indexed
# [lon, lat], so longitude runs along the y axis and latitude along the x axis.
plt.xlabel('latitude')
plt.ylabel('longitude')
plt.xticks([], [])
plt.yticks([], [])