In [1]:
# define true function
import numpy as np

def FMO_formular(Cr, T=673.15, t = 600, DOC = 10):
    """
    Cao B, Yang S, Sun A, Dong Z, Zhang TY. 
    Domain knowledge-guided interpretive machine learning: 
    formula discovery for the oxidation behavior of ferritic-martensitic 
    steels in supercritical water. J Mater Inf 2022;2:4. 
    http://dx.doi.org/10.20517/jmi.2022.04
    
    input:
    Cr : oxidation chromium equivalent concentration (wt.%), 10.38 <= Cr <= 30.319
    Cr(wt.%) = [Cr](wt.%) + 40.3[V](wt.%) + 2.3[Si](wt.%) + 10.7[Ni](wt.%) − 1.5[Mn](wt.%)
    T : Absolute temperature (K), 673.15 <= T <= 923.15
    t : Exposure time (h), 30 <= t <= 2000
    DOC : Dissolved oxygen concentration (ppb), 0 <= DOC <= 8000
    
    output:
    the logarithm of weight gain (mg / dm2)
    """
    # Eq.(6c) in paper
    pre_factor = 0.084*(Cr**3/(T-DOC) - np.sqrt(T+DOC)) + 0.98*(Cr-DOC/T) / np.log(Cr+DOC)+8.543
    
    # Eq.(5a) in paper
    Q = 0.084*(Cr**2-Cr+DOC) / np.exp(DOC/T) + 45.09
    
    # Eq.(5b) in paper
    m = 0.323 - 0.061 * np.exp(DOC/T) / (Cr - np.sqrt(Cr) - DOC)
    
    ln_wg = pre_factor + np.log(DOC+2.17) -  Q * 1000 / 8.314 / T + m*np.log(t)
    
    return ln_wg

# generate data

In [2]:
# 20 source domain data

import random

random.seed(42)  

source = []

while len(source) < 20:
    random_number = random.uniform(16, 18)
    if random_number not in source:
        formatted_number = round(random_number,2)
        source.append(formatted_number)
    source.sort()


In [3]:
# 20 target domain data

random.seed(42) 

target = []

while len(target) < 20:
    random_number = random.uniform(14, 16)
    if random_number not in target:
        formatted_number = round(random_number,2)
        target.append(formatted_number)

        
random.shuffle(target)  

target_test = target[:10]

target_train = target[10:]


In [4]:
target_test

[15.3, 14.44, 15.28, 14.17, 15.09, 14.55, 14.06, 15.62, 15.18, 15.78]

# cal labels

In [5]:
# labels of 20 source domain data
Ys = FMO_formular(np.array(source))
Ys 

array([4.28762677, 4.286294  , 4.286294  , 4.28593767, 4.28143797,
       4.26893795, 4.26638147, 4.26638147, 4.26572593, 4.25882298,
       4.23554565, 4.21990128, 4.21207914, 4.20294762, 4.19240738,
       4.19025115, 4.18479209, 4.17130182, 4.1537004 , 4.13406903])

In [6]:
# labels of 10 target domain training data
Yt_train = FMO_formular(np.array(target_train))
# labels of 10 target domain test data
Yt_test =  FMO_formular(np.array(target_test))

In [7]:
Yt_train

array([4.24272342, 3.36021514, 3.19273477, 3.36021514, 4.03106003,
       4.28734686, 4.2064398 , 4.02360696, 3.99139946, 4.28104791])

In [8]:
Yt_test

array([4.27749602, 4.02360696, 4.27590592, 3.68779915, 4.25514913,
       4.09472442, 3.39613599, 4.29146826, 4.2663516 , 4.29211302])

In [9]:
# baseline without transfer
from sklearn.svm import SVR
from sklearn.metrics import r2_score

reg = SVR()
pre_y = reg.fit(np.array(target_train).reshape(-1,1), np.array(Yt_train).reshape(-1,1)).predict(np.array(target_test).reshape(-1,1))

  y = column_or_1d(y, warn=True)


In [10]:
pre_y

array([4.18576581, 3.94809518, 4.1829754 , 3.49824255, 4.15066715,
       4.06404803, 3.3412569 , 4.14761249, 4.16565155, 4.07096998])

In [12]:
# r2 on test data
r2_score(Yt_test,pre_y)

0.8211324687124604

#  transfer with MK-MMD based TCA 

In [91]:
Xs = np.array(source).reshape(-1,1)
Ys = np.array(Ys).reshape(-1,1)
Xt = np.array(target_train).reshape(-1,1)
Yt = np.array(Yt_train).reshape(-1,1)
Tx = np.array(target_test).reshape(-1,1)
Ty = np.array(Yt_test).reshape(-1,1)

Xtrain = np.concatenate((Xs, Xt))
Ytrain = np.concatenate((Ys, Yt))

# TCA
new_Xtrain, new_Tx = MKMMD_TCA(Xtrain, Tx, dim=2)

# reg 
pre = reg.fit(new_Xtrain,Ytrain).predict(new_Tx)

# r2 score
r2_score(Ty,pre)

h matrix is calculated
matrix Q : [[0.17911529 0.23672828 0.28177309 0.23437824 0.08256993]
 [0.23672828 0.3182647  0.39136413 0.34225986 0.12444452]
 [0.28177309 0.39136413 0.51298548 0.4997374  0.19647392]
 [0.23437824 0.34225986 0.4997374  0.61544772 0.30917875]
 [0.08256993 0.12444452 0.19647392 0.30917875 0.34279387]]
MMD of each kernel :  [0.35359279800077625, 0.44236970571682915, 0.48815609731407994, 0.4420962848559135, 0.2921090477979017]
η_k is calculated
the optimal weights are found
weights of kernels : [[8.15537436e-01]
 [1.65467106e-06]
 [3.05824108e-07]
 [2.69861795e-07]
 [1.84460333e-01]]


  y = column_or_1d(y, warn=True)


0.9330316382066635

---

In [88]:
import numpy as np
import scipy.linalg
from sklearn.gaussian_process.kernels import  RBF 

"""
when trad-off parameter = 0, we can use the weighted summation of each kernel to estimate the MK-MMD based TCA
"""
def MKMMD_TCA(TCA_Xs, TCA_Xt,dim = 2):
    ns, nt = len(TCA_Xs), len(TCA_Xt)
    if dim > (ns + nt):
        raise DimensionError('The maximum number of dimensions should be smaller than', (ns + nt))
    else:pass
    
    # call MK-MMD
    _, weights,kernel_list = MKMMD().predict(TCA_Xs, TCA_Xt)
    print('weights of kernels :', weights)
    Xs_new_list = []
    Xt_new_list = []
    for k in range(len(kernel_list)):
        kernel = kernel_list[k]
        # formular in paper Domain Adaptation via Transfer Component Analysis
        # Eq.(2) 
        X = np.vstack((TCA_Xs, TCA_Xt))
        K = kernel(X)
        # cal matrix L 
        e = np.vstack((1 / ns * np.ones((ns, 1)), -1 / nt * np.ones((nt, 1))))
        L = e * e.T
        # cal centering matrix H page 202 the last pargraph at left side
        n, _ = X.shape
        H = np.eye(n) - 1 / n * np.ones((n, n))
        # page 202 the last pargraph at right side
        matrix = (K @ L @ K + 0.1 * np.eye(n)) @ K @ H @ K.T
        # cal eigenvalues : w, eigenvectors :V
        w, V = scipy.linalg.eig(matrix)
        w, V = w.real, V.real
        # peak out the first self.dim components
        ind = np.argsort(abs(w))[::-1]
        A = V[:, ind[:dim]]
        # output the mapped data
        Z = K @ A
        Xs_new, Xt_new = Z[:ns, :], Z[ns:, :]
        Xs_new_list.append(Xs_new)
        Xt_new_list.append(Xt_new)
    # calculate the weighted summation
    res_Xs_new = Xs_new_list[0] * weights[0]
    res_Xt_new = Xt_new_list[0] * weights[0]
    
    for num in range(len(weights)-1):
        index = num+1
        res_Xs_new += Xs_new_list[index]* weights[index]
        res_Xt_new += Xt_new_list[index]* weights[index]
    
    
    return res_Xs_new, res_Xt_new


class DimensionError(Exception):
    pass

In [90]:
"""
MKMMD implement
"""


# multi-kernel maximum mean discrepancy
# cao bin, HKUST, China, binjacobcao@gmail.com
# free to charge for academic communication

import numpy as np
from cvxopt import solvers, matrix 
from sklearn.gaussian_process.kernels import  RBF 

class MKMMD():
    def __init__(self, gamma_list=[2,1.5,1,0.5,0.1], kernel_num = 5):
        '''
        Our code is designed for educational purposes, 
        and to make it easier to understand, 
        we have implemented only the RBF (Radial Basis Function) kernel.
        
        This case focuses on solving the weights of kernels. 
        The estimation of length scales is crucial in kernel-based models,
        For further details on the method (length scales), please visit the following link: 
        [https://github.com/MaterialsInformaticsDemo/DAN/blob/main/code/MK_MMD.py].

        :param gamma_list: list of length scales for rbf kernels
        :param kernel_num: number of kernels in MK_MMD
        '''
        if len(gamma_list) != kernel_num: 
            print('please assign specific length scales for each rbf kernel')
        self.kernel_num = kernel_num
        kernel_list = []
        for i in range(kernel_num):
            kernel_list.append(RBF(gamma_list[i],"fixed"))
        self.kernel_list = kernel_list

    def predict(self, Xs, Xt,) :
        '''
        :param Xs: ns * m_feature, source domain data 
        :param Xt: nt * m_feature, target domain data

        return :
        the result of MK_MMD & weights of kernels 
        '''
        # cal weights for each rbf kernel
        # two rows above section 2.2 Empirical estimate of the MMD, asymptotic distribution, and test
        h_matrix = [] # 5 * 5 
        for i in range(self.kernel_num):
            _, h_k_vector = funs(Xs, Xt, self.kernel_list[i], MMD = False, h_k_vector = True)
            h_matrix.append(h_k_vector)
        h_matrix = np.vstack(h_matrix)
        print('h matrix is calculated')

        # cal the covariance matrix of h_matrix
        # Eq.(7)
        Q_k = np.cov(h_matrix)
        print('matrix Q :',Q_k)
        # cal the weights of kernels, Eq.(11)
        # vector η_k, Eq.(2)
        η_k = []
        for k in range(self.kernel_num):
            MMD, _ = funs(Xs, Xt, self.kernel_list[k], MMD = True, h_k_vector = False)
            η_k.append(MMD)
        print('MMD of each kernel : ',η_k)
        print('η_k is calculated')

        # solve the standard quadratic programming problem 
        # see : https://github.com/Bin-Cao/KMMTransferRegressor/blob/main/KMMTR/KMM.py
        P = 2 * matrix(Q_k + 1e-5 * np.eye(self.kernel_num)) # λm = 1e-5 
        # q = - η_k ， maximum η_k * beta in QB
        q = matrix(-np.array(η_k).reshape(-1,1))
        G = matrix(-np.eye(self.kernel_num))
        # the summation of the beta is 1, Eq.(3), let's D = 1
        A = matrix(np.ones((1,self.kernel_num)))
        b=matrix(1.)
        h=matrix(np.zeros((self.kernel_num,1)))
        # P is 5 * 5
        # q is 5 * 1
        # G is 5 * 5
        # A is 1 * 5
        # b = 1, h = 5*1
        solvers.options['show_progress'] = False
        sol = solvers.qp(P,q,G,h,A,b)
        beta = sol['x']
        print('the optimal weights are found')
        MK_MMD = np.array(η_k) @ np.array(beta)
        return MK_MMD, np.array(beta),self.kernel_list
        
def funs(Xs, Xt, kernel, MMD = True, h_k_vector = False):
    if MMD == True:
        # cal MMD for one rbf kernel
        # Eq.(1) in paper
        dim = np.array(Xs).ndim
        Xs = np.array(Xs).reshape(-1,dim)
        Xt = np.array(Xt).reshape(-1,dim)
        EXX_= kernel(Xs,Xs)
        EYY_= kernel(Xt,Xt)
        EYX_= kernel(Xt,Xs)
        EXY_= kernel(Xs,Xt)
        MMD = np.array(EXX_).mean() + np.array(EYY_).mean() - np.array(EYX_).mean() - np.array(EXY_).mean()
    else: 
        MMD = None
        pass

    if h_k_vector == True:
        # cal vector h_k(x,x',y,y'), contains m**2*n**2 terms
        # between Eq.(1) and Eq.(2)
        # k(x, x') is the element of matrix EXX_
        # k(y, y') is the element of matrix EYY_
        # k(x, y') and k(x', y) are the element of matrix EXY_
        ns, nt = len(Xs), len(Xt)
        combin_ns = generate_combinations(ns)
        combin_nt = generate_combinations(nt)
        h_k_vector = []
        for x in range(len(combin_ns)):
            for y in range(len(combin_nt)):
                S_x = np.array(Xs[combin_ns[x][0]]).reshape(-1,1) # x
                S_x_ =  np.array(Xs[combin_ns[x][1]]).reshape(-1,1) # x'
                T_x =  np.array(Xt[combin_nt[y][0]]).reshape(-1,1) # y
                T_x_ =  np.array(Xt[combin_nt[y][1]]).reshape(-1,1) # y'
                h_k = kernel(S_x,S_x_) + kernel(T_x,T_x_) - kernel(S_x,T_x_) - kernel(S_x_,T_x)
                h_k_vector.append(h_k[0][0])
        h_k_vector = np.array(h_k_vector)
    else: 
        h_k_vector = None
        pass
    return MMD, h_k_vector

def generate_combinations(n):
    # Cn^2
    combinations = []
    for i in range(n):
        for j in range(i, n):
            combinations.append((i, j))
    return combinations
