# Fault Diagnosis Toolbox in Python -- a development notebook

Erik Frisk<br>
<frisk@isy.liu.se><br>
Department of Electrical Engineering<br>
Linköping University<br>
Sweden

## Basic Python imports

In [4]:
%matplotlib notebook
import time
import matplotlib
import matplotlib.pyplot as plt
import sys
import numpy as np
import sympy as sym
import scipy.sparse as sp
from sympy.printing import ccode, octave_code
new_paths = ['Models/', 'Misc/', 'faultdiagnosistoolbox/build/lib.macosx-10.6-x86_64-2.7/']
[sys.path.append(d) for d in new_paths if not d in sys.path];
from misc import *
import faultdiagnosistoolbox as fdt

## Define a diagnosis models -- structural Three Tank model

In [5]:
#from ThreeTank_model import model
from ThreeTank_model_sym import model

In [6]:
print "Model: %s" % model.name
print "  (ne, nx, nf, nz) = (%d,%d,%d,%d)" % (model.ne(), model.nx(), model.nf(), model.nz())
print "  sprank(X) = %d" % model.sprank()
if model.IsDynamic():
    print "  Dynamic model"
else:
        print "  Static model"

Model: Three Tank System
  (ne, nx, nf, nz) = (12,10,6,3)
  sprank(X) = 10
  Dynamic model


In [7]:
print "Searching for MSO sets..."
msos=model.MSO()
print "Found %d mso sets" % len(msos)
print msos

Searching for MSO sets...
Found 6 mso sets
[array([ 7,  6,  4, 10,  3,  8,  9,  1,  2,  5, 11], dtype=int32)
 array([ 7,  6,  4, 10,  3,  8,  9,  0], dtype=int32)
 array([ 7,  6,  4, 10,  1,  2,  5, 11,  0], dtype=int32)
 array([ 7,  6,  3,  8,  9,  1,  2,  5, 11,  0], dtype=int32)
 array([ 7,  4, 10,  3,  8,  9,  1,  2,  5, 11,  0], dtype=int32)
 array([ 6,  4, 10,  3,  8,  9,  1,  2,  5, 11,  0], dtype=int32)]


In [208]:
m=2
mso = msos[m]
rIdx = model.MSOCausalitySweep(mso,causality='int') # Boolean index array to integral causality redundant equation
if np.any(rIdx):
    red = mso[rIdx][0] # Take the first one
    m0 = [e for e in mso if e!=red]
    Gamma = model.Matching(m0)
    print "Found integral causality matching (mso " + str(m) + ", red = " + str(red) + ")"
else:
    print "No integral causality matching exists"

Found integral causality matching (mso 2, red = 7)


In [287]:
def IsDifferentialConstraint( rel ):
    return isinstance(rel,list) and len(rel)==3 and rel[2] is "diff";

def UsedVars(rels, variables):
    uv = set([])
    for expr in rels:
        if not IsDifferentialConstraint(expr):
            uv |= set([str(v) for v in expr.atoms() if str(v) in variables])
    return list(uv)

def ExprToCode( expr, language):
    if language is 'C':
        genCode = ccode(expr)
    elif language is 'Matlab':
        genCode = octave_code(expr)
    elif language is 'Python':
        genCode = str(expr)
    else:
        print "Unknown language"
    return genCode

def CodeEOL(language):
    if language is 'C' or language is 'Matlab':
        return ';'
    elif language is 'Python':
        return ''
    return ''

def CodeComment(language):
    if language is 'C':
        return '//'
    elif language is 'Matlab':
        return '%'
    elif language is 'Python':
        return '#'
    return '#'

def DVar(diffConstraint):
    return diffConstraint[0]
def IVar(diffConstraint):
    return diffConstraint[1]

def SeqResGenCausality( Gamma, e, diffres ):
    if not IsDifferentialConstraint(e):
        return Gamma.matchType
    elif diffres is 'int':
        # Treat differential residual constraint in integral causality
        if Gamma.matchType is 'der':
            return 'mixed'
        elif Gamma.matchType is 'int' or Gamma.matchType is 'mixed':
            return Gamma.type
        elif Gamma.matchType is 'algebraic':
            return 'int'
    else:
        # Treat differential residual constraint in derivative causality
        if Gamma.matchType is 'int':
            return 'mixed'
        elif Gamma.matchType is 'der' or Gamma.matchType is 'mixed':
            return Gamma.matchType
        elif Gamma.matchType is 'algebraic':
            return 'der'

In [406]:
def AlgebraicHallComponent(model, g, language):
    fzSub = zip(model.f, np.zeros(len(model.f),dtype=np.int64))
    eqs = map(lambda eq: eq.subs(fzSub), model.syme[g.row])    
    hcVars = list(np.array(model.x)[g.col])
    sol = sym.solve(eqs,hcVars)
    resGen = []
    for varSol in sol:
        if len(hcVars)==1:
            genCode = "%s = %s%s %s %s" %(varSol,ExprToCode(sol[varSol], language), CodeEOL(language),
                                         CodeComment(language),model.e[g.row[0]])
        else:
            genCode = "%s = %s%s" %(varSol,ExprToCode(sol[varSol], language), CodeEOL(language))
        resGen.append(genCode)
    return resGen

def CodeApproxInt(v,dv,enum,language):
    if language is 'C' or language is 'Matlab':
         return 'ApproxInt(%s,state.%s,Ts)%s %s %s' % (dv,v,CodeEOL(language),CodeComment(language),enum)
    else:
        return "ApproxInt(%s,state['%s'],Ts)%s %s %s" % (dv,v,CodeEOL(language),CodeComment(language),enum)    
    
def IntegralHallComponent(model, g, language):
    fzSub = zip(model.f, np.zeros(len(model.f),dtype=np.int64))
    resGen = []
    integ = []
    iState = []
    for e,v,enum in zip(model.syme[g.row],np.array(model.x)[g.col],np.array(model.e)[g.row]):
        if not IsDifferentialConstraint(e):
            sol=sym.solve(e.subs(fzSub),v)
            genCode = "%s = %s%s %s %s" %(v,ExprToCode(sol[0], language), CodeEOL(language),CodeComment(language),enum)
            resGen.append(genCode)
        else:
            dv = DVar(e)
            iv = IVar(e)
            genCode = v + " = " + CodeApproxInt(v,dv,enum,language)
            integ.append(genCode)
            iState.append(v)
    return (resGen,integ,iState)

def MixedHallComponent(model, g, language):
    fzSub = zip(model.f, np.zeros(len(model.f),dtype=np.int64))
    resGen = []
    iState = []
    dState = []
    integ = []
    for e,v,enum in zip(model.syme[g.row],np.array(model.x)[g.col],np.array(model.e)[g.row]):
        if not IsDifferentialConstraint(e):
            sol=sym.solve(e.subs(fzSub),v)
            genCode = "%s = %s%s %s %s" %(v,ExprToCode(sol[0], language), CodeEOL(language),CodeComment(language),enum)
            resGen.append(genCode)
        elif v is IVar(e):
            dv = DVar(e)
            iv = IVar(e)
            genCode = v + " = " + CodeApproxInt(v,dv,enum,language)
            integ.append(genCode)
            iState.append(v)
        else: # v is DVar(e)
            genCode = '%s = ApproxDiff(%s,state.%s,Ts)%s  %s %s' % (DVar(e),IVar(e),IVar(e),CodeEOL(language),
                                                                    CodeComment(language),enum)
            resGenM0.append(genCode)
            dState.append(Ivar(e))

    return (resGen,integ,iState,dState)

In [410]:
def GenerateResidualEquations( model, resEq, diffres, language):
    if language is 'C':
        resvar = 'r[0]'
    else:
        resvar = 'r'
    e = model.syme[resEq]
    if not IsDifferentialConstraint(e):
        resExpr = e.lhs - e.rhs
        genCode = ["%s = %s%s %s %s" % (resvar,ExprToCode(resExpr,language), CodeEOL(language), 
                                       CodeComment(language), np.array(model.e)[resEq])]
        iState = []
        dState = []
        integ = []
    else: # IsDifferentialConstraint(e)
        if diffres is 'der':
            iv = IVar(e)
            dv = DVar(e)
            genCode = ['%s = %s-ApproxDiff(%s, state.%s,Ts)%s %s %s' % (resvar,dv,iv,iv,CodeEOL(language),
                                                                       CodeComment(language), np.array(model.e)[resEq])]
            iState = []
            dState = [dv]
            integ = []
        else: # diffres is 'int'
            iv = IVar(e)
            dv = DVar(e)
            if language is 'Python':
                genCode = ["%s = %s-state['%s']%s %s %s" % (resvar,v,v,CodeEOL(language),
                                                         CodeComment(language), np.array(model.e)[resEq])]
            else:
                genCode = ['%s = %s-state.%s%s %s %s' % (resvar,v,v,CodeEOL(language),
                                                         CodeComment(language), np.array(model.e)[resEq])]
            iState = [iv];
            dState = []
            integ = [iv + " = " + CodeApproxInt(v,dv,np.array(model.e)[resEq],language)]
    return (np.array(genCode), iState, dState, integ)

def GenerateExactlyDetermined( model, Gamma, language):
    resGenM0 = np.array([])
    integ = np.array([])
    iState = np.array([])
    dState = np.array([])
    
    for g in Gamma.matching:
        sys.stdout.write('.')
        sys.stdout.flush()
        if g.matchType is 'algebraic':
            codeGen = AlgebraicHallComponent(model, g, language)
            resGenM0 = np.concatenate((resGenM0, codeGen))
        elif g.matchType is 'int':
            codeGen,gInteg,giState = IntegralHallComponent(model, g, language)
            iState = np.concatenate((iState, giState))
            resGenM0 = np.concatenate((resGenM0, codeGen))
            integ = np.concatenate((integ, gInteg))
        elif g.matchType is 'der':
            dc = model.syme[g.row[0]];
            codeGen = '%s = ApproxDiff(%s,state.%s,Ts)%s  %s %s' % (DVar(dc),IVar(dc),IVar(dc),CodeEOL(language),
                                                                    CodeComment(language),model.e[g.row[0]])
            resGenM0 = np.concatenate((resGenM0, [codeGen]))
            dState = np.concatenate((dState, [IVar(e)]))
        elif g.matchType is 'mixed':
            codeGen,gInteg,giState,gdState = IntegralHallComponent(model, g, language)
            resGenM0 = np.concatenate((resGenM0, codeGen))    
            iState = np.concatenate((iState, giState))
            dState = np.concatenate((dState, gdState))
            integ = np.concatenate((integ, gInteg))
    sys.stdout.write('\n')
    return (resGenM0, iState, dState, integ)

language = 'Python'
print GenerateExactlyDetermined(model, Gamma, language)
print GenerateResidualEquations(model, red, 'int', language)

..
(array(['p1 = y1 # e7', 'q3 = p3/Rv3 # e3', 'q2 = (p2 - p3)/Rv2 # e2',
       'dp3 = (q2 - q3)/CT3 # e6', 'q1 = (p1 - p2)/Rv1 # e1',
       'dp2 = (q1 - q2)/CT2 # e5'], 
      dtype='|S32'), array(['p3', 'p2'], 
      dtype='|S32'), array([], dtype=float64), array(["p3 = ApproxInt(dp3,state['p3'],Ts) # e12",
       "p2 = ApproxInt(dp2,state['p2'],Ts) # e11"], 
      dtype='|S40'))
(array(['r = q2 - y2 # e8'], 
      dtype='|S16'), [], [], [])


In [400]:
def WriteApproxIntFunction(f,language):
    if language is 'C':
        pass
    elif language is 'Python':
        f.write('    def ApproxInt(dx,x0,Ts):\n')
        f.write('        return x0 + Ts*dx\n')
    elif language is 'Matlab':
        pass
    
    
def WriteResGenPython( model, resGen, state, integ, name, batch, resGenCausality, resGenEqs ):
    f=open(name + ".py", 'w')
    if not batch:
        f.write('def ' + name + "(z,state,params,Ts):\n")
        f.write("# " + name.upper() + " Sequential residual generator for model '" + model.name + "'\n")
        f.write('# Causality: ' + resGenCausality + '\n')
        f.write('#\n')
        
        fSens = np.array(model.f)[np.any(np.array(model.F[q,:].todense()),axis=0)]
        f.write('# Structurally sensitive to faults: ')
        for fault in fSens[:-1]:
            f.write(fault + ', ')
        f.write(fSens[-1] + '\n')
        f.write('#\n')
        f.write('# Example of basic usage:\n')
        f.write('#   Let z be the observations, then the residual generator can be simulated by:\n')
        f.write('#\n')
        f.write('#   for zk in z:\n')
        f.write('#     rk, state = ResGen_3_8( zk, state, params, 1/fs )\n')
        f.write('#\n')
        f.write('#   where state is a structure with the state of the residual generator.\n')
        f.write('#   The state has fields: ')
        for s in state[:-1]:
            f.write(s + ', ')
        f.write(state[-1] + '\n')

        f.write('\n# File generated ' + time.strftime('%c') + '\n')
        f.write('\n')

        if len(state)>0:
            WriteApproxIntFunction(f,'Python')
        f.write('\n')
        
        # Parameters
        usedParams = UsedVars(model.syme[resGenEqs], model.parameters)
        if len(usedParams)>0:
            f.write('    # Parameters\n')
            for p in usedParams:
                f.write('    ' + p + " = params['" + p + "']\n")
            f.write('\n')
            
        # Known variables
        usedKnownSignals = UsedVars(model.syme[resGenEqs], model.z)
        f.write('    # Known signals\n')
        for zIdx,zv in enumerate(usedKnownSignals):
            f.write('    ' + zv + ' = z[' + str(zIdx) + ']\n')
        f.write('\n') 
            
        # Initialize state variables
        if len(state)>0:
            f.write('    # Initialize state variables\n')
            for sv in state:
                f.write('    ' + sv + " = state['" + sv + "']\n")
            f.write('\n')            
            
        # Residual generator body
        f.write('    # Residual generator body\n')
        for pyExpr in resGen:
            f.write('    ' + pyExpr + '\n')
        f.write('\n')
        
        # Update integrator variables
        if len(integ)>0:
            f.write('    # Update integrator variables\n')
            for iExpr in integ:
                f.write('    ' + iExpr + '\n')
            f.write('\n')            

        # Update state variables
        if len(state)>0:
            f.write('    # Update state variables\n')
            for sv in state:
                f.write("    state['" + sv + "'] = " + sv + '\n')
            f.write('\n')

        # Return value
        f.write('    return r\n')
    else: # batch
        pass
    f.close()
    
SeqResGen(model, Gamma, red, 'ResGen_3_8', language='Python')

Generating residual generator ResGen_3_8 (Python)
  Generating code for the exactly determined part: ..
  Generating code for the residual equations
  Writing residual generator file
File ResGen_3_8.py generated.


In [390]:
def SeqResGen(model, Gamma, resEq, name, diffres='int', language='Python', batch=False, external=[]):
    if not model.type is 'Symbolic':
        print "Code generation only possible for symbolic models"
        return []
    
    print "Generating residual generator " + name + " (" + language + ")"
    sys.stdout.write("  Generating code for the exactly determined part: ")
    m0Code, m0iState, m0dState, m0integ = GenerateExactlyDetermined( model, Gamma, language)

    print "  Generating code for the residual equations"
    resCode, resiState, resdState, resinteg = GenerateResidualEquations( model, resEq, diffres, language)

    # Collect code, integrators, and states
    resGenCode = np.concatenate((m0Code,[' '],resCode))
    resGeniState = np.concatenate((m0iState,resiState))
    resGendState = np.concatenate((m0dState,resdState))
    resGenState  = np.concatenate((resGeniState,resGendState))
    resGenInteg = np.concatenate((m0integ,resinteg))
    
    print "  Writing residual generator file"
    resGenCausality = SeqResGenCausality( Gamma, model.syme[resEq], diffres )
    resGenEqs = np.array([],dtype=np.int64)
    for g in Gamma.matching:
        resGenEqs = np.concatenate((resGenEqs,g.row))    
    
    if language is 'Python':
        WriteResGenPython( model, resGenCode, resGenState, resGenInteg, name, batch, resGenCausality, resGenEqs )
        print 'File ' + name + '.py generated.'
    elif language is 'Matlab':
        pass
    elif language is 'C':
        pass

#SeqResGen(model, Gamma, red, 'ResGen_3_8', language='Python')

In [392]:
class FDTStruct:
    pass

In [393]:
params = FDTStruct()
params.Rv1 = 1
params.Rv2 = 2
params.Rv3 = 3
params.CT2 = 4
params.CT3 = 5

In [397]:
params = {}
params['Rv1'] = 1
params['Rv2'] = 2
params['Rv3'] = 3
params['CT2'] = 4
params['CT3'] = 5

In [398]:
params

{'CT2': 4, 'CT3': 5, 'Rv1': 1, 'Rv2': 2, 'Rv3': 3}