In [1]:
# Import
from classes.base import *
import dolfinx
from dolfinx import mesh, fem, io, nls
from dolfinx.fem import VectorFunctionSpace
from mpi4py import MPI
import numpy as np
import ufl
import matplotlib.pyplot as plt

import jsonpickle as jp
import shutil
import re
import inspect
from tqdm import tqdm

And = Infix(ufl.And)


def check_symmetry(
    func: fem.Function,
    data,
    add_point=0,
):
    from matplotlib.ticker import FormatStrFormatter

    def find_middle(add_point):
        middle_x = (data.light.left + data.light.right) / 2
        middle_point = int((data.mesh.left + middle_x) /
                           (data.mesh.right - data.mesh.left) * base.len)
        middle_point += add_point
        return middle_point

    def plot(func: ArrayFunc):
        fig, ax = plt.subplots(facecolor='White')
        fig.set_size_inches(20, 10)
        ax.grid(True, which='Both')
        ax.set_xlim((min(func.cord[0]), max(func.cord[0])))
        ax.set_ylim((min(func.cord[1]), max(func.cord[1])))
        ax.yaxis.set_major_formatter(FormatStrFormatter(('%.3f')))
        plt.plot([middle, middle], (min(func.cord[1]), max(func.cord[1])),
                 c='red')

        ax.plot(func.cord[0], func.cord[1], label=func.name, color='black')
        ax.fill_between(
            func.cord[0],
            func.cord[1] * 0,
            func.cord[1],
            where=func.cord[1] > func.cord[1] * 0,
            alpha=0.5,
            facecolor='green',
            label=func.name,
        )
        ax.fill_between(
            func.cord[0],
            func.cord[1] * 0,
            func.cord[1],
            where=func.cord[1] < func.cord[1] * 0,
            alpha=0.5,
            facecolor='red',
            label=func.name,
        )

    print('Rule: Right - left')
    base = ArrayFunc(func, 'base')
    dif = ArrayFunc(func, 'dif')
    middle = find_middle(add_point)

    dif.mirror(middle)
    dif.translate(-middle)
    dif.cord[1] = (dif.cord[1] - base.cord[1]) / base.cord[1]
    dif.cord[1, dif.cord[0] >= middle] = 0
    plot(dif)
    plot(base)


def clear_savedir(path_save):
    """Clear directory in VTK folder"""

    try:
        shutil.rmtree(path_save)
    except:
        pass


def view_file1D(
    name,
    dir='/home/VTK/System1D_files/',
    view_dir='/home/VTK/System1D_files/0',
):
    shutil.copytree(dir + name, view_dir, dirs_exist_ok=True)


def repr_str(func, rules: list = {}) -> str:
    # TODO: to module
    # FIXME: (( to (\n(
    rep = str(func)
    rep = re.sub(r'\{ A \| A_\{i_(\d|\{\d+\})\} \=', '', rep)
    rep = re.sub(r'\[i_(\d|{\d+})]', '', rep)
    rep = re.sub(r'dx.+\)', 'dx', rep)
    rep = re.sub(r'ds.+\)', 'ds', rep)
    for key, raw in rules.items():
        rep = rep.replace(str(raw), key)

    base = {
        'v_0[0]': 'v',
        'v_0[1]': 'u',
        '-1 * ': '-',
        '+ -': '-',
        'f[0]': 'N',
        'f[1]': 'P',
        '{': '{\n',
        '} *': '\n} *',
    }
    for key, raw in base.items():
        rep = rep.replace(key, raw)

    beauty = {
        '(grad(P))': 'grad(P)',
        '(grad(N))': 'grad(N)',
        '(grad(u))': 'grad(u)',
        '(grad(v))': 'grad(v)',
        '})': ')',
        '+ (': '\n   +(',
    }
    for key, raw in beauty.items():
        rep = rep.replace(key, raw)
    return rep


In [2]:
# Set up and redefine parametrs
from classes import parametrs

DATA = parametrs.Data('test', 'test desc')
_r = DATA.rates
_r.a, _r.b, _r.e, _r.general = 1, 0.1, 0.5, 0.01

DATA.time.line = np.arange(0, 1,0.001)
DATA.time.check = DATA.time.line[::100]

_l = DATA.light_confs
_l.left, _l.right, _l.type = 0.1, 0.3, 'sigmoid'
DATA.print_info__()

  save_confs  solution_name: test
  save_confs  description: test desc
  save_confs  file_name: solve
  save_confs  dir: /home/Solves/
  solver_confs  solving: {'convergence': 'incremental', 'tolerance': 1e-06}
  solver_confs  petsc: {'ksp_type': 'preonly', 'pc_type': 'lu', 'pc_factor_mat_solver_type': 'mumps'}
  solver_confs  form: {}
  solver_confs  jit: {}
  mesh_confs  left: 0
  mesh_confs  right: 1
  mesh_confs  intervals: 100
  mesh_confs  degree: 1
  mesh_confs  family: CG
  bcs: {'type': 'close'}
  time  line: [0.0, 0.001 .. 0.998, 0.999]; len = 1000
  time  check: [0.0, 0.1 .. 0.8, 0.9]; len = 10
  rates  a: 1
  rates  b: 0.1
  rates  e: 0.5
  rates  general: 0.01
  rates  P_step: 0.13
  rates  gamma: 4
  light_confs  left: 0.1
  light_confs  right: 0.3
  light_confs  type: sigmoid
  light_confs  smoothing: 100
  initial  N: 0.2
  initial  P: 0.001


In [3]:
# Set up space
# TODO: Change to another study and mesh accurancy
variables_n = 2
variables = ['N', 'P']
element_families = {'N': 'CG', 'P': 'CG'}

# TODO: Change to another geometry and dimension
DOMAIN = mesh.create_interval(
    nx=DATA.mesh_confs.intervals,
    comm=MPI.COMM_WORLD,
    points=[DATA.mesh_confs.left, DATA.mesh_confs.right],
)

element = {
    i:
    ufl.FiniteElement(
        family=element_families[i],
        cell=DOMAIN.ufl_cell(),
        degree=DATA.mesh_confs.degree,
    )
    for i in element_families
}
SPACE = FunctionSpace(
    mesh=DOMAIN,
    element=ufl.MixedElement(*[element[i] for i in element]),
)
SUBSPACE = {
    variables[i]: SPACE.sub(0).collapse()[0]
    for i in range(len(variables))
}


In [4]:
# Set up functions and subfunctions
x, y, z = (list(SpatialCoordinate(SPACE)) + [0] * 2)[:3]

# TODO: Change to another time approximation
FUNC, FUNC0 = Function(SPACE), Function(SPACE)
func_indicator,func_indicator0 = {
    variables[i]: split(FUNC)[0]
    for i in range(len(variables))
}, {
    variables[i]: split(FUNC0)[0]
    for i in range(len(variables))
}
n, p = func_indicator['N'], func_indicator['P']
n0, p0 = func_indicator0['N'], func_indicator0['P']

SUB_FUNC = {variables[i]: FUNC.sub(0) for i in range(len(variables))}
TIME = Constant(SUBSPACE['N'], 0)

In [5]:
# Set up equation
from classes import distributions

## Light
# TODO: Change to another dimension  
LIHGT = distributions.Simple1D(        
    x=x,
    x0=DATA.light_confs.left,
    smoothing=DATA.light_confs.smoothing,
).create(kind=DATA.light_confs.type)
LIHGT *= distributions.Simple1D(
    x=x,
    x0=DATA.light_confs.right,
    smoothing=-DATA.light_confs.smoothing,
).create(kind=DATA.light_confs.type)

## Constants
_r = DATA.rates
gen_rate = _r.general
P_step = _r.P_step
a_nm = gen_rate * _r.a
b_pm = gen_rate * _r.b * exp(-p / P_step)
e_np = gen_rate * _r.e * exp(-p / P_step)

g = _r.gamma
m = 1 - p - n
_under_ln = m / (1-n)
_power = (g-1) / g
reaction = g * m * (-ln(_under_ln))**_power

CONST = {
    'A_NM': a_nm,
    'B_PM': b_pm,
    'E_NP': e_np,
    'REACTION': reaction,
}


In [6]:
# Surface
class Surface_collection:

    def __init__(self, kind, indic, consts, data_mesh):
        assert kind in self.styles(get=True).keys(), 'Not implemented method'
        self.indic = indic
        self.consts = consts
        self.data_mesh = data_mesh
        self.kind = kind

    def create(self, **parametrs):
        return getattr(self, self.kind)(**parametrs)

    @staticmethod
    def styles(func=None, get=False, l={}):
        if not get:
            args = set(inspect.getfullargspec(func).args)
            args.remove('self')
            l.update({func.__name__: ','.join(args)})
            return func
        else:
            return l

    @classmethod
    def get_kinds(cls, view=True):
        kinds = cls.styles(get=True)
        if not view:
            return kinds
        else:
            [
                print(kind, ':', args.replace(',', ', ')) for kind,
                args in kinds.items()
            ]

    @styles
    def close(self, f: fem.Function):
        return 0

    @styles
    def robin(self, f: fem.Function, const, ext):
        return -const * (ext-f)

    @styles
    def fixed_flux(self, f: fem.Function, value):
        return -value

    @styles
    def like_inside(self, f, N_ext, P_ext):
        h = (self.data_mesh.right -
             self.data_mesh.left) / self.data_mesh.intervals
        n, p = self.indic.N, self.indic.P
        if f == n:
            flux_N = 0
            flux_N += -self.consts['A_NM'] * (N_ext-n)
            flux_N += +self.consts['A_NM'] * p * (N_ext-n)
            flux_N += -self.consts['E_NP'] * p * (N_ext-n)
            flux_N += -self.consts['A_NM'] * n * (P_ext-p)
            flux_N += +self.consts['E_NP'] * n * (P_ext-p)
            return flux_N / h
        elif f == p:
            flux_P = 0
            flux_P += -self.consts['B_PM'] * (P_ext-p)
            flux_P += +self.consts['B_PM'] * n * (P_ext-p)
            flux_P += -self.consts['E_NP'] * n * (P_ext-p)
            flux_P += -self.consts['B_PM'] * p * (N_ext-n)
            flux_P += +self.consts['E_NP'] * p * (N_ext-n)
            return flux_P / h




In [8]:

sur_factory = Surface_collection(kind=DATA.bcs['type'],
                                 indic=func_indicator,
                                 consts=CONST,
                                 data_mesh=DATA.mesh_confs)


SURFACE = {
    'N': sur_factory.create(f=func_indicator['N'], '**DATA.bcs.N'),
    'P': sur_factory.create(f=func_indicator['P'], '**DATA.bcs.P'),
}
# SURFACE = {
#     'N': 0,
#     'P': 0,
# }



SyntaxError: positional argument follows keyword argument (3277693295.py, line 8)

In [None]:

def create_bcs():
    return []


BCS = create_bcs()


# Equation
def create_equation():

    def create_facets():
        set_connectivity(DOMAIN)
        ds = Measure("ds", domain=DOMAIN)
        return ds

    def inside_flux(
        a_NM=CONST['A_NM'],
        b_PM=CONST['B_PM'],
        e_NP=CONST['E_NP'],
    ):
        flux_N = 0
        flux_N += -a_NM * grad(n)
        flux_N += +a_NM * p * grad(n)
        flux_N += -e_NP * p * grad(n)
        flux_N += -a_NM * n * grad(p)
        flux_N += +e_NP * n * grad(p)

        flux_P = 0
        flux_P += -b_PM * grad(p)
        flux_P += +b_PM * n * grad(p)
        flux_P += -e_NP * n * grad(p)
        flux_P += -b_PM * p * grad(n)
        flux_P += +e_NP * p * grad(n)
        return flux_N, flux_P

    dt = DATA.time.line[1]-DATA.time.line[0]
    ds = create_facets()

    u, v = TestFunctions(SPACE)
    # n, p = func_indicator[N], func_indicator[P]
    # n0, p0 = func_indicator0[N], func_indicator0[P]
    qN, qP = inside_flux()

    equationN = (1/dt) * (n-n0) * u * dx
    equationN += -(qN|inner|grad(u)) * dx
    equationN += u * SURFACE['N'] * ds

    equationP = (1/dt) * (p-p0) * v * dx
    equationP += -(qP|dot|grad(v)) * dx
    equationP += -LIHGT * CONST['REACTION'] * v * dx
    equationP += v * SURFACE['P'] * ds

    return {'N': equationN, 'P': equationP}


EQUATION = create_equation()
# FIXME: clear reset KSP solver
PROBLEM = []


# Initial
def set_initial(
    data: Param_initial = DATA.initial,
    confs=DATA.solve_confs,
):
    # TODO: reset bcs
    SUB_FUNC['N'].interpolate(Function(SUBSPACE['N'], data.N0))
    SUB_FUNC['P'].interpolate(Function(SUBSPACE['P'], data.P0))

    FUNC.x.scatter_forward()
    FUNC0.interpolate(FUNC)

    TIME.value = 0
    global PROBLEM
    PROBLEM = NonlinearProblem(
        F=sum(EQUATION.values()),
        bcs=BCS,
        u=FUNC,
        solve_options=confs.solve_opts,
        petsc_options=confs.petsc_opts,
        form_compiler_params=confs.form_opts,
        jit_params=confs.jit_opts,
    )


set_initial()


# Functions
def dump(save=False, DATA=DATA):
    consts = CONST.copy()
    DATA.dump.consts = {key: repr_str(value) for key, value in consts.items()}
    consts.update({'LIGHT': LIHGT})
    DATA.dump.equations = {
        key: repr_str(value, consts)
        for key, value in EQUATION.items()
    }

    if not save:
        print(DATA.dump.EQUATION_N)
        print('*' * 80)
        print(DATA.dump.EQUATION_P)
    else:
        with open(
                DATA.save.dir_save + DATA.save.save_name + DATA.save.file_name +
                '_anotaton.txt',
                'w',
        ) as annotation:
            annotation.write(jp.encode(DATA, numeric_keys=True, indent=4))
    pass


def solve(n_steps=DATA.time.n_steps, reset=True, save=False):

    def _set_next():
        # TODO: interpolate bcs
        FUNC0.interpolate(FUNC)
        TIME.value += DATA.time.dt

    def _solve_default(steps):
        for step in steps:
            _set_next()
            steps.set_description(f'Solving PDE. Time:{TIME.value:.2f}')
            PROBLEM.solve()

    def _solve_with_save(steps, data_save: Param_save = DATA.save):

        def _save_functions(file, time):
            file.write_function(SUB_FUNC['N'], time)
            file.write_function(SUB_FUNC['P'], time)
            # flux_N = Function(SUBSPACE[N], SURFACE[N])
            # flux_N.name = 'flux_N'
            # file.write_function(flux_N, time)

            # flux_P = Function(SUBSPACE.P, SURFACE.P)
            # flux_P.name = 'flux_P'
            # file.write_function(flux_P, time)

        clear_savedir(data_save.dir_save + data_save.save_name)
        save_path = data_save.dir_save + data_save.save_name + data_save.file_name
        check_every = int(DATA.time.n_steps / DATA.time.n_checks)

        SUB_FUNC['N'].name = 'neutral'
        SUB_FUNC['P'].name = 'polimer'
        light = Function(SUBSPACE['N'], LIHGT)
        light.name = 'Light'
        with io.XDMFFile(DOMAIN.comm, save_path + '.xdmf', 'w') as file:
            file.write_mesh(DOMAIN)
            for step in steps:
                if step % check_every == 0:
                    steps.set_description(f'Solving PDE. Time:{TIME.value:.2f}')
                    _save_functions(file, TIME.value)
                _set_next()
                PROBLEM.solve()
            _save_functions(file, TIME.value)
        dump(True)

    if reset: set_initial()
    steps_line = tqdm(
        desc=f'Solving PDE. Time:{TIME.value:.3f}',
        iterable=np.arange(0, n_steps, dtype=int),
    )
    if save: _solve_with_save(steps_line, data_save=DATA.save)
    else: _solve_default(steps_line)


def draw(show_fluxes=False, flux_scale=100, show_points=False):
    col_light = Function(SUBSPACE['N'], LIHGT, 'Light')
    col_funcs = {}
    col_fluxes = {}

    for key in SUB_FUNC.keys():
        col_funcs.update({key: Function(SUBSPACE[key], SUB_FUNC[key], key)})
        col_fluxes.update({key:Function(SUBSPACE[key], flux_scale * SURFACE[key], 'flux_' + key)}) #yapf: disable
    col_M = Function(SUBSPACE['N'], 1 - col_funcs['N'] - col_funcs['P'], 'M')

    collection = [col_M, col_light, *col_funcs.values()]
    if show_fluxes: collection.append(*col_fluxes.values())
    func_plot1D(funcs=collection, show_points=show_points)


In [None]:
solve(
    save=False,
        # n_steps=2000,
    reset=True,
)
draw()

In [None]:
check_symmetry(data=DATA,
               add_point=0,
               func=Function(SUBSPACE['N'], SUB_FUNC['N'], 'neutral'))
