# Import Modules

In [29]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import xml.etree.ElementTree as ET

from torch import Tensor
from numpy import ndarray
from typing import Type

# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Import Data and Data Processing

In [30]:
# Parse the CWA gridded XML file and pull out the bounding box and grid values.
tree = ET.parse("./data/O-A0038-003.xml")
root = tree.getroot()
ns = {"ns": "urn:cwa:gov:tw:cwacommon:0.1"}

# Sentinel value: cells equal to it are labelled 0 for classification and
# dropped from the regression set below.
MISSING_VALUE = -999

# long_min, long_max, lat_min, lat_max
llrange = [
    float(root.find("./ns:dataset/ns:GeoInfo/ns:{0}".format(tag), ns).text)
    for tag in [
        "BottomLeftLongitude",
        "TopRightLongitude",
        "BottomLeftLatitude",
        "TopRightLatitude"
    ]
]

# Grid values (temperature, judging by the loop variable name): one text line
# per grid row, comma-separated within a row.
data = np.array([
    [float(temp) for temp in row.split(',')]
    for row in root.find("./ns:dataset/ns:Resource/ns:Content", ns).text.split('\n')
])

# long & lat grid, both shaped exactly like `data`.
# NOTE(review): per the CWA dataset description the values are stored row by
# row starting at the bottom-left corner, i.e. rows run south -> north
# (latitude) and columns run west -> east (longitude). The previous code gave
# longitude the row axis and ran it from max to min, so coordinates were paired
# with the wrong cells. Confirm against the dataset documentation.
n_rows, n_cols = data.shape
lat_grid, long_grid = np.mgrid[
    llrange[2]:llrange[3]:complex(0, n_rows),
    llrange[0]:llrange[1]:complex(0, n_cols),
]

# True where the cell holds a real value.
valid_mask = data != MISSING_VALUE

# Classification dataset (long, lat, label); label is 0 for missing cells, 1 otherwise.
# Rows are emitted in row-major (i, j) order.
classData = np.column_stack([
    long_grid.ravel(),
    lat_grid.ravel(),
    valid_mask.ravel().astype(float),
])

# Regression dataset (long, lat, value); missing cells are excluded.
# Boolean indexing keeps the same row-major order.
regData = np.column_stack([
    long_grid[valid_mask],
    lat_grid[valid_mask],
    data[valid_mask],
])

# DataSet

In [89]:
def tvt_split(arr: ndarray, seed: "int | None" = None) -> tuple:
    """Randomly split the rows of ``arr`` into train/validation/test subsets.

    Validation and test each receive ``int(0.1 * len(arr))`` rows; all
    remaining rows form the training subset. With fewer than 10 rows the
    validation and test subsets are therefore empty.

    Args:
        arr: Array whose first axis indexes the samples.
        seed: Optional seed for a dedicated random generator, making the
            split reproducible. When ``None`` (the default) the global NumPy
            random state is used, as before.

    Returns:
        A ``(train, valid, test)`` tuple of arrays.
    """
    if seed is None:
        idx = np.random.permutation(len(arr))
    else:
        idx = np.random.default_rng(seed).permutation(len(arr))

    n     = int(0.1*len(arr))
    # The first n shuffled indices go to validation, the next n to test,
    # everything after that to training.
    train = arr[idx[2*n:]]
    valid = arr[idx[:n]]
    test  = arr[idx[n:2*n]]
    return train, valid, test

def toTensor(arr: ndarray) -> Tensor:
    """Copy ``arr`` into a new ``float32`` torch tensor."""
    as_float32 = torch.tensor(data=arr, dtype=torch.float32)
    return as_float32

class BasicDataSet:
    """Wrap a 2-D sample array and expose its first two columns as inputs.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, _data: ndarray) -> None:
        features = _data[:, :2]
        # Full sample array, kept as passed in.
        self.data: ndarray = _data
        # Input features: the first two columns of every row.
        self.X: ndarray = features
        # The same features as a float32 tensor.
        self.X_t: Tensor = toTensor(features)

class NormalDataSet(BasicDataSet):
    """Dataset whose target is every column from index 2 onward, kept 2-D."""

    def __init__(self, _data: ndarray) -> None:
        super().__init__(_data)
        targets = self.data[:, 2:]
        # Targets as a NumPy array (slicing with ``2:`` keeps the column axis).
        self.Y: ndarray = targets
        # Targets as a float32 tensor.
        self.Y_t: Tensor = toTensor(targets)

class OneHotDataSet(BasicDataSet):
    """Dataset whose target is a two-column one-hot encoding of column 2.

    A row whose third column equals 0 is encoded as ``[1, 0]``; every other
    row is encoded as ``[0, 1]``.
    """

    def __init__(self, _data: ndarray) -> None:
        super().__init__(_data)
        # Column vector so it broadcasts against the two-element encodings.
        is_class_zero = (self.data[:, 2] == 0)[:, np.newaxis]
        # Vectorised encoding; unlike a row-wise comprehension it still yields
        # a (0, 2) array when the split is empty.
        self.Y  : ndarray = np.where(is_class_zero, [1, 0], [0, 1])
        self.Y_t: Tensor  = toTensor(self.Y)

class DataSet:
    """Hold a sample array together with its train/validation/test wrappers.

    Args:
        _data: Full sample array; it is split by ``tvt_split``.
        _bds: Dataset class used to wrap each of the three splits.
    """

    def __init__(self, _data: ndarray, _bds: Type[BasicDataSet]) -> None:
        self.data: ndarray = _data
        train_rows, valid_rows, test_rows = tvt_split(_data)
        # Each split is wrapped in an instance of ``_bds``, built in the order
        # train, valid, test.
        self.train: BasicDataSet = _bds(train_rows)
        self.valid: BasicDataSet = _bds(valid_rows)
        self.test: BasicDataSet = _bds(test_rows)

# Split both sample arrays; targets stay in their raw single-column form.
classDS = DataSet(_data=classData, _bds=NormalDataSet)
regDS = DataSet(_data=regData, _bds=NormalDataSet)

# Network

In [94]:
class ClassModel(nn.Module):
    """Binary classifier: 2 inputs -> two hidden ReLU layers of width 10 -> sigmoid output."""

    def __init__(self) -> None:
        super().__init__()
        layers = [
            nn.Linear(2, 10),
            nn.ReLU(),
            nn.Linear(10, 10),
            nn.ReLU(),
            nn.Linear(10, 1),
            nn.Sigmoid(),
        ]
        self.seq = nn.Sequential(*layers)

    def forward(self, X):
        """Return the sigmoid output of the network for input ``X``."""
        out = self.seq(X)
        return out

class RegModel(nn.Module):
    """Regressor: 2 inputs -> two hidden tanh layers of width 10 -> 1 linear output."""

    def __init__(self) -> None:
        super().__init__()
        layers = [
            nn.Linear(2, 10),
            nn.Tanh(),
            nn.Linear(10, 10),
            nn.Tanh(),
            nn.Linear(10, 1),
        ]
        self.seq = nn.Sequential(*layers)

    def forward(self, X):
        """Return the network output for input ``X``."""
        out = self.seq(X)
        return out

# Training Function

In [95]:
def trainingFunc(net, criterion, optimizer, trainEpoch: int, DS: "DataSet", logEvery: int = 100) -> tuple:
    """Train ``net`` full-batch and record train/validation loss every epoch.

    Each epoch does one optimizer step on the whole training split, then
    evaluates the updated network on the validation split without tracking
    gradients.

    Args:
        net: Model to train (an ``nn.Module``).
        criterion: Loss callable taking ``(prediction, target)``.
        optimizer: Optimizer already bound to ``net``'s parameters.
        trainEpoch: Number of epochs to run.
        DS: Object exposing ``train`` and ``valid`` splits, each with ``X_t``
            and ``Y_t`` tensors.
        logEvery: Print a progress row every ``logEvery`` epochs; a falsy
            value disables the per-epoch rows.

    Returns:
        ``(trainlosslist, validlosslist)``: per-epoch losses as Python floats.
    """
    trainlosslist = []
    validlosslist = []

    print(" Epoch | trainLoss | validLoss ")
    print("-------|-----------|-----------")

    # Remember the caller's mode so it can be restored afterwards.
    was_training = net.training
    try:
        for epoch in range(trainEpoch):
            # train data
            net.train()
            optimizer.zero_grad()
            y_train = net(DS.train.X_t)
            trainloss = criterion(y_train, DS.train.Y_t)
            trainloss.backward()
            optimizer.step()

            # valid data: evaluation only, so no autograd graph is built
            net.eval()
            with torch.no_grad():
                y_valid = net(DS.valid.X_t)
                validloss = criterion(y_valid, DS.valid.Y_t)

            # record
            trainlosslist.append(trainloss.item())
            validlosslist.append(validloss.item())

            if logEvery and epoch % logEvery == 0:
                print(" {0:5d} | {1:9.6f} | {2:9.6f} ".format(epoch, trainloss.item(), validloss.item()))
    finally:
        net.train(was_training)

    return trainlosslist, validlosslist

# Training

In [96]:
# Train the binary classifier full-batch with Adam and binary cross-entropy.
# NOTE(review): in the recorded run the loss sits at ~0.686 from epoch 100 on,
# so the network appears not to learn; the inputs are fed in unscaled, which
# may be the cause. Consider standardising them and confirm.
trainEpoch = 1000
net = ClassModel()
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(params=net.parameters(), lr=0.01)

trainlosslist, validlosslist = trainingFunc(
    net=net,
    criterion=criterion,
    optimizer=optimizer,
    trainEpoch=trainEpoch,
    DS=classDS,
)

 Epoch | trainLoss | validLoss 
-------|-----------|-----------
     0 |  7.602461 |  6.650561 
   100 |  0.686394 |  0.671436 
   200 |  0.686363 |  0.670692 
   300 |  0.686356 |  0.670699 
   400 |  0.686349 |  0.670699 
   500 |  0.686340 |  0.670699 
   600 |  0.686331 |  0.670700 
   700 |  0.686323 |  0.670702 
   800 |  0.686314 |  0.670704 
   900 |  0.686306 |  0.670707 


In [97]:
# Classify every grid point in one batched forward pass. Inference needs no
# gradients, and the grid dimensions come from the grid itself rather than
# hard-coded constants.
grid_points = np.column_stack([long_grid.ravel(), lat_grid.ravel()])
with torch.no_grad():
    grid_probs = net(toTensor(grid_points)).reshape(long_grid.shape).tolist()

# One text line per grid row: '000' where the prediction is below 0.5,
# '111' otherwise.
with open("tmp.txt", 'w') as file:
    for prob_row in grid_probs:
        file.write(''.join('000' if p < 0.5 else '111' for p in prob_row))
        file.write('\n')