In [6]:
import torch
import numpy as np
import pandas as pd
import scipy.sparse as sparse
import scipy.sparse.linalg as sla
import json
from pathlib import Path
from tqdm import tqdm
from FVM.src.ICD import VolumnCenteredScheme
from FVM.src.Problem import ChipHeatDissipation, NormChipHeatDissipation, VaryDiffusionCof
from FVM.src.utils import readmesh
from utils import *
from MyPlot import *
from SegModel import *


chips = [
    [0.016, 0.012, 4000], [0.012, 0.006, 16000], [0.018, 0.009, 6000], [0.018, 0.012, 8000],
    [0.018, 0.018, 10000], [0.012, 0.012, 14000],[0.018, 0.006, 16000], [0.009, 0.009, 20000],
    [0.006, 0.024, 8000], [0.006, 0.012, 16000], [0.012, 0.024, 10000], [0.024, 0.024, 20000]]

class Tester:
    def __init__(
            self, 
            ckpt_save_path,
            hyper_parameters_save_path, 
            img_save_path, 
            device='cuda',
            dtype=torch.float,
            a=0.1):
        self.ckpt_save_path = ckpt_save_path
        self.hyper_parameters_save_path = hyper_parameters_save_path
        self.img_save_path = img_save_path
        self.device = device
        self.dtype = dtype
        self.a = a

        with open(hyper_parameters_save_path) as file:
            self.kwargs = json.load(file)
        # self.mesh()

    def _get_net(self):
        net_kwargs = self.kwargs['net_kwargs'].copy()
        model_name = net_kwargs.pop('model_name')
        self.net = CNN(model_name, net_kwargs, self.dtype, self.device)
    
    def save_img(self, path, pres, ans, Force):
        for i, (f, pre, u) in enumerate(zip(Force, pres, ans)):
            save_img(path/f'case{i+1}', f, pre, u, self.xx, self.yy, None)

    # def draw_img(self, pres, ans, Force):
    #     for i, (f, pre, u) in enumerate(zip(Force, pres, ans)):
    #         draw_img(f'{self.exp_name}-case{i+1}', f, pre, u, self.xx, self.yy, None)
     
    def load_ckpt(self, exp_name, best_or_last):
        ckpt = torch.load(f"{self.ckpt_save_path}/{exp_name}/{best_or_last}.pt")
        self._get_net()
        self.net.load_state_dict(ckpt)
        self.net.eval()

    def mesh(self):
        (self.left, self.bottom), (self.right, self.top) = self.area
        self.dx = (self.right - self.left) / self.GridSize
        self.dy = (self.top - self.bottom) / self.GridSize

        self.xx, self.yy = np.meshgrid(
            np.arange(self.left + self.h/2, self.right, self.dx),
            np.arange(self.bottom + self.h/2, self.top, self.dy)) 

    def predict(self, tensor):
        with torch.no_grad():
            predictions = self.net(tensor)
            return predictions.cpu().numpy().reshape(-1, self.GridSize, self.GridSize)   

In [5]:
from UniformICD import UniformFVM
from Problems.BlockSourceProblem import BlockSourceProblem
from fit_heat_multibc import C2DS, C5DS
from utils import layout2csv

class HeatTester(Tester):
    def __init__(
            self,
            exp_name, best_or_last, 
            **kwargs):
        super().__init__(**kwargs)
        self.load_ckpt(exp_name, best_or_last)
        
        self.area = self.kwargs["area"]
        self.GridSize = self.kwargs["GridSize"]
        self.boundary_gap = self.kwargs["boundary_gap"]
        self.chip_gap = self.kwargs["chip_gap"]
        self.mesh()
        
    def gen_layouts(self, DataN):
        df = layout2csv(DataN, self.area, self.GridSize, self.boundary_gap, self.chip_gap)
        layouts = []

        for _, data in df.groupby("idx"):
            info = data.values[:, 1:]
            chips_layout = ChipLayout(info)
            layout = chips_layout(self.xx, self.yy)
            layouts.append(torch.from_numpy(layout))
        layouts = torch.stack(layouts, dim=0)
        return layouts

    def gen_linsys(self):
        self.A, self.B = [], []
        for bd_case in [0, 1, 2]:
            problem = BlockSourceProblem(
                    None, 
                    bd_case, 298, 
                    self.kwargs["GridSize"], 
                    self.kwargs["area"]
                    )
            
            solver = UniformFVM(
                    self.kwargs["area"], 
                    self.kwargs["GridSize"], 
                    self.kwargs["GridSize"], 
                    problem
                    )
            
            A, b = solver.get_A()
            
            self.B.append(b)
            self.A.append(A)
    
    def init_testds(self, bd_cases):
        DataN = len(bd_cases)
        layouts = self.gen_layouts(DataN)
        match self.kwargs['net_kwargs']['in_channels']:
            case 2:
                ds = C2DS(
                    self.kwargs["area"], 
                    self.kwargs["GridSize"], 
                    layouts, self.dtype, self.device
                )
            case 5:
                ds = C5DS(
                    self.kwargs["area"], 
                    self.kwargs["GridSize"], 
                    layouts, self.dtype, self.device
                )
        self.ds = ds
    
    def test(self, bd_cases):
        self.init_testds(bd_cases)

        for data, layout, boundary, bd_case in self.ds:
            pre = self.net(data)

            b = layout.reshape(-1) * self.h * self.h + self.B[bd_case]
            A = self.A[bd_case]

            ans = spsolve(A, b).reshape(self.GridSize, self.GridSize)

    




In [None]:
from gen_data import ChipsDataGenerator

# Generate layouts for testing randomly
test_nums = 3
generator = ChipsDataGenerator(test_nums, ((0, 0), (0.1, 0.1)), 0.001, 0.001)
infos_df = generator.layout2csv(f'./test_info/heat_test.csv')
# # initialize tester
# heat_tester = HeatTester(
#     ckpt_save_path='./model_save/heat',
#     img_save_path='./images/heat',
#     a=0.1
# )
# heat_tester.test_all(infos_df, 'best_train', 'save')

In [None]:
class varykTester(Tester):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
    
    def _unpack_exp_name(self, exp_name):
        args = exp_name.split('-')
        self.t = int(args[0][-1])
        self.GridSize = int(args[1])
        self.h = self.a / self.GridSize

        self.net = self._get_net(args[2])
        self._mesh()
        self.b = np.load(f'./DLdata/square{self.t}x{self.t}/GridSize-{self.GridSize}/b.npy')
        
    def solve(self, mus):
        mesh = readmesh(f'./FVM/my_meshes/UniformQuad-VaryK-{self.GridSize}.obj')
        solver = VolumnCenteredScheme(mesh)
        U = []
        for mu in mus:
            problem = VaryDiffusionCof(mu, eps=self.h**2)
            A = solver.get_A(problem).tocsr()
            ref = sparse.linalg.spsolve(A, self.b).reshape(self.GridSize, self.GridSize)
            U.append(ref)
        return np.stack(U, axis=0)
    
    def assemble_tensor(self, mus):
        tensor, cofs = [], []
        for mu in mus:
            cof_func = PieceWiseConst(mu)
            cofs.append(cof_func(self.xx, self.yy))
            tensor.append(np.stack((self.xx, self.yy, cof_func(self.xx, self.yy)), axis=0))
        tensor = torch.from_numpy(np.stack(tensor, axis=0)).float()
        tensor = tensor.cuda()

        cofs = np.stack(cofs, axis=0)
        return tensor, cofs
    
    def test_all(self, mus, t, best_or_last, save_or_draw = 'save'):
        mus = mus.reshape(-1, t, t)
        for exp_name in Path(self.ckpt_save_path).glob(f'*{t}x{t}*'):
            exp_name = str(exp_name).split('/')[-1]

            self._unpack_exp_name(exp_name)

            refs = self.solve(mus)
            tensor, cofs = self.assemble_tensor(mus)

            self.load_ckpt(exp_name, best_or_last)
            pres = self.predict(tensor)

            match save_or_draw:
                case 'save':
                    self.save_img(Path(f"{self.img_save_path}/{exp_name}"), pres, refs, cofs)
                    
                case 'draw':
                    self.draw_img(pres, refs, cofs)


In [None]:
from gen_data import CofsDataGenerator

# Generate layouts for testing randomly
test_nums = 3
# # initialize tester
varyk_tester = varykTester(
    ckpt_save_path='./model_save/varyk',
    img_save_path='./images/varyk',
    a=1.0
)
for t in [4]:
    # generator = CofsDataGenerator(test_nums,  0.1, 10,)
    # df =  generator.cofs2csv(t, f'./test_info/varyk-{t}.csv')
    # varyk_tester.test_all(df.values, t, 'best_train', 'save')
    data = np.array([ 1, 1, 2, 2, 1, 1, 2, 2, 3, 3, 4, 4, 3, 3, 4, 4])
    varyk_tester.test_all(data, t, 'best_train', 'save')
    

In [None]:
from varyUNet import VaryUNet
class nonlinearTester(Tester):
    def __init__(self, **kwargs):
        super().__init__(kwargs)

    def force(self, x, y, center, delta=0.025):
        px, py = center
        # mask = (x > px-delta) * (x <= px+delta) * (y > py-delta) * (y <= py+delta)
        force = 100 * torch.exp(-50 * ((x - px)**2 + (y - py)**2))
        return force

    def _get_net(self, features, layers, end_padding='valid'):
        net = VaryUNet(3, 1, features, layers, end_padding)
        net = net.to(self.device)
        return net
    
    def assemble_tensor(self, centers):
        Force = np.stack([self.force(self.xx, self.yy, c) for c in centers])
        F = torch.stack([torch.stack([self.xx, self.yy, f]) for f in Force])
        tensor = F.cuda().float()
        return Force, tensor

    def solve(self, Force):
        tenst_nums = len(Force)

    def picard_solve()


In [None]:
from SegModel import *

class Tester:
    def __init__(self, ckpt_path):
        pass

    def unpack_ckpt_name(self, name):
        tag, block_name, planes, layer_num, adaptor_num, \
        norm_method, pool_method, padding, padding_mode, GridSize, epochs,\
        batch_size, lr, method, maxiter = \
        name.split('-')

        match block_name:
            case 'BasicBlock':
                Block=BasicBlock, 
            case 'ResBasicBlock':
                Block=ResBasicBlock,          
            case 'ResBottleNeck':
                Block=ResBottleNeck, 
        net = DBHSegModel(
            Block=Block, 
            planes=int(planes),
            in_channels=3,
            classes=1,
            GridSize=int(GridSize),
            layer_nums=[int(i) for i in layer_num.split('#')],
            adaptor_nums=[int(i) for i in adaptor_num.split('#')],
            norm_method=norm_method,
            pool_method=pool_method,
            padding=padding,
            center=True,
            device='cpu'
        )

        


In [12]:
import json
from SegModel import *
from fit_heat_multibc import *
def load_hyper_params(js_path):
    with open(js_path) as f:
        js = json.load(f)
    return dict(js)

def load(js_path, ckpt_path, device):
    params = load_hyper_params(js_path)
    block_name, planes, layer_num, adaptor_num, factor,\
        norm_method, pool_method, padding_mode = params['net_kwargs'].split('-')

    match block_name:
            case 'BasicBlock':
                Block=BasicBlock
            case 'ResBasicBlock':
                Block=ResBasicBlock          
            case 'ResBottleNeck':
                Block=ResBottleNeck

    net = DBHSegModel(
        Block=Block, 
        planes=int(planes),
        in_channels=2,
        classes=1,
        GridSize=params['GridSize'],
        layer_nums=[int(i) for i in layer_num.split('#')],
        adaptor_nums=[int(i) for i in adaptor_num.split('#')],
        factor=int(factor),
        norm_method=norm_method,
        pool_method=pool_method,
        padding='same',
        padding_mode=padding_mode,
        center=True,
        device=device
    )
    ckpt = torch.load(ckpt_path)
    net = net.load_state_dict(ckpt)

    trainer = MyTrainer(
        
    )
    return trainer

net, data_generator = load(
    'params/test/BFITPAdd-ResBottleNeck-16-2#4#4#8-2#4#4#8-2-layer-avg-reflect_256_500_15_1.00e-03_jac-10.json',
    'model_save/test/BFITPAdd-ResBottleNeck-16-2#4#4#8-2#4#4#8-2-layer-avg-reflect_256_500_15_1.00e-03_jac-10/last.pt',
    'cpu')

dataN = 5
layouts = data_generator.gen_layouts(dataN)
boundaries = data_generator.gen_boundaries(np.random.choice([0, 1, 2], dataN))

