In [1]:
# import necessary packages
from pygmodels.pgm.pgmodel.lwfchain import LWFChainGraph
from pygmodels.pgm.pgmodel.bayesian import BayesianNetwork
from pygmodels.graph.graphtype.edge import Edge, EdgeType
from pygmodels.factor.factor import Factor
from pygmodels.factor.factorfunc.factorops import FactorOps
# from pygmodels.pgm.pgmtype.randomvariable import NumCatRVariable
from pygmodels.randvar.randvarmodel.categorical import NumCatRandomVariable as NumCatRVariable

In [2]:
# define data and nodes
# Every variable of the network is binary and shares the same outcome set.
idata = {"outcome-values": [True, False]}


def _binary_variable(name):
    """Create a True/False random variable identified by ``name``.

    The marginal distribution passed to the variable returns 0.5 for any
    value it is called with.
    """
    return NumCatRVariable(
        randvar_id=name,
        input_data=idata,
        marginal_distribution=lambda x: 0.5,
    )


Smoking = _binary_variable("Smoking")
Bronchitis = _binary_variable("Bronchitis")
LungCancer = _binary_variable("LungCancer")
EitherTL = _binary_variable("EitherTL")
VisitAsia = _binary_variable("VisitAsia")
Tuberculosis = _binary_variable("Tuberculosis")
Xray = _binary_variable("Xray")
Cough = _binary_variable("Cough")
Dysponea = _binary_variable("Dysponea")

In [3]:
# define edges
#
#  Cowell 2005, p. 110
#
#   VisitAsia --> Tuberculosis --+
#                                |
#                                v
#   Smoking --> LungCancer --> EitherTL --> Xray
#      |                          |
#      v                          v
#   Bronchitis ---------------> Cough
#      |                          |
#      +--------> Dysponea -------+
#
#  All edges are directed except Cough -- Dysponea, which is undirected.
#


def _directed_edge(edge_id, start_node, end_node):
    """Create a directed edge from ``start_node`` to ``end_node``."""
    return Edge(
        edge_id=edge_id,
        start_node=start_node,
        end_node=end_node,
        edge_type=EdgeType.DIRECTED,
    )


# edges leaving Smoking
SmokingBronchitis_c = _directed_edge("SmokingBronchitis", Smoking, Bronchitis)
SmokingLungCancer_c = _directed_edge("SmokingLungCancer", Smoking, LungCancer)

# edges leading into EitherTL
LungCancerEitherTL_c = _directed_edge("LungCancerEitherTL", LungCancer, EitherTL)
VisitAsiaTuberculosis_c = _directed_edge("VisitAsiaF", VisitAsia, Tuberculosis)
TuberculosisEitherTL_c = _directed_edge(
    "TuberculosisEitherTL", Tuberculosis, EitherTL
)

# edges leaving EitherTL
EitherTLXray_c = _directed_edge("EitherTLXray", EitherTL, Xray)
EitherTLCough_c = _directed_edge("EitherTLCough", EitherTL, Cough)

# edges leaving Bronchitis
BronchitisCough_c = _directed_edge("BronchitisCough", Bronchitis, Cough)
BronchitisDysponea_c = _directed_edge("BronchitisI", Bronchitis, Dysponea)

# the only undirected edge of the graph
CoughDysponea_c = Edge(
    edge_id="CoughI",
    start_node=Cough,
    end_node=Dysponea,
    edge_type=EdgeType.UNDIRECTED,
)

In [4]:
# define factor functions

def phi_VisitAsia(scope_product):
    """Prior factor p(VisitAsia).

    Parameters
    ----------
    scope_product : iterable of (str, bool) pairs
        A single ``("VisitAsia", value)`` assignment.

    Returns
    -------
    float
        0.01 when VisitAsia is True, 0.99 when it is False.

    Raises
    ------
    ValueError
        If the assignment is not one of the two listed above.
    """
    table = {
        frozenset([("VisitAsia", True)]): 0.01,
        frozenset([("VisitAsia", False)]): 0.99,
    }
    # frozenset lookup keeps the order-insensitive comparison of the pairs
    value = table.get(frozenset(scope_product))
    if value is None:
        raise ValueError("Unknown scope product")
    return value

def phi_TuberculosisVisitAsia(scope_product):
    """Conditional factor p(Tuberculosis | VisitAsia).

    For each value of VisitAsia the two Tuberculosis entries sum to 1.

    Parameters
    ----------
    scope_product : iterable of (str, bool) pairs
        One assignment for "Tuberculosis" and one for "VisitAsia", in any
        order.

    Returns
    -------
    float
        The table entry for the given joint assignment.

    Raises
    ------
    ValueError
        If the assignment does not match any row of the table.
    """
    # (Tuberculosis, VisitAsia, value)
    rows = (
        (True, True, 0.05),
        (False, True, 0.95),
        (True, False, 0.01),
        (False, False, 0.99),
    )
    table = {
        frozenset([("Tuberculosis", tub), ("VisitAsia", asia)]): value
        for tub, asia, value in rows
    }
    key = frozenset(scope_product)
    if key not in table:
        raise ValueError("Unknown scope product")
    return table[key]


def phi_EitherTLXray(scope_product):
    """Conditional factor p(Xray | EitherTL).

    For each value of EitherTL the two Xray entries sum to 1.

    Parameters
    ----------
    scope_product : iterable of (str, bool) pairs
        One assignment for "EitherTL" and one for "Xray", in any order.

    Returns
    -------
    float
        The table entry for the given joint assignment.

    Raises
    ------
    ValueError
        If the assignment does not match any row of the table.
    """
    # (EitherTL, Xray, value)
    rows = (
        (True, True, 0.98),
        (False, True, 0.05),
        (True, False, 0.02),
        (False, False, 0.95),
    )
    table = {}
    for either, xray, value in rows:
        table[frozenset([("EitherTL", either), ("Xray", xray)])] = value

    value = table.get(frozenset(scope_product))
    if value is None:
        raise ValueError("Unknown scope product")
    return value

def phi_Smoking(scope_product):
    """Prior factor p(Smoking).

    Parameters
    ----------
    scope_product : iterable of (str, bool) pairs
        A single ``("Smoking", value)`` assignment.

    Returns
    -------
    float
        0.5 for both Smoking=True and Smoking=False.

    Raises
    ------
    ValueError
        If the assignment is anything other than a single Smoking value.
    """
    assignment = frozenset(scope_product)
    known = (
        frozenset([("Smoking", True)]),
        frozenset([("Smoking", False)]),
    )
    # Both outcomes carry the same value, so only validity needs checking.
    if assignment in known:
        return 0.5
    raise ValueError("Unknown scope product")


def phi_SmokingBronchitis(scope_product):
    """Conditional factor p(Bronchitis | Smoking).

    For each value of Smoking the two Bronchitis entries sum to 1.

    Parameters
    ----------
    scope_product : iterable of (str, bool) pairs
        One assignment for "Smoking" and one for "Bronchitis", in any order.

    Returns
    -------
    float
        The table entry for the given joint assignment.

    Raises
    ------
    ValueError
        If the assignment does not match any row of the table.
    """
    # (Smoking, Bronchitis, value)
    rows = (
        (True, True, 0.6),
        (False, True, 0.3),
        (True, False, 0.4),
        (False, False, 0.7),
    )
    table = {
        frozenset([("Smoking", smoking), ("Bronchitis", bronchitis)]): value
        for smoking, bronchitis, value in rows
    }
    key = frozenset(scope_product)
    if key not in table:
        raise ValueError("Unknown scope product")
    return table[key]


def phi_SmokingLungCancer(scope_product):
    """Conditional factor p(LungCancer | Smoking).

    For each value of Smoking the two LungCancer entries sum to 1.

    Parameters
    ----------
    scope_product : iterable of (str, bool) pairs
        One assignment for "Smoking" and one for "LungCancer", in any order.

    Returns
    -------
    float
        The table entry for the given joint assignment.

    Raises
    ------
    ValueError
        If the assignment does not match any row of the table.
    """
    table = {
        frozenset([("Smoking", True), ("LungCancer", True)]): 0.1,
        frozenset([("Smoking", False), ("LungCancer", True)]): 0.01,
        frozenset([("Smoking", True), ("LungCancer", False)]): 0.9,
        frozenset([("Smoking", False), ("LungCancer", False)]): 0.99,
    }
    value = table.get(frozenset(scope_product))
    if value is None:
        raise ValueError("Unknown scope product")
    return value


def phi_LungCancerEitherTLTuberculosis(scope_product):
    """Deterministic factor p(EitherTL | LungCancer, Tuberculosis).

    EitherTL is the logical OR of LungCancer and Tuberculosis: the factor is
    1 when the EitherTL value agrees with ``LungCancer or Tuberculosis`` and
    0 otherwise.

    Parameters
    ----------
    scope_product : iterable of (str, bool) pairs
        One assignment each for "LungCancer", "EitherTL" and "Tuberculosis",
        in any order.

    Returns
    -------
    int
        1 or 0, as described above.

    Raises
    ------
    ValueError
        If the assignment is not a full assignment of the three variables.
    """
    outcomes = (True, False)
    # Enumerate all eight assignments and score each with the OR rule.
    table = {
        frozenset(
            [("LungCancer", lung), ("EitherTL", either), ("Tuberculosis", tub)]
        ): int(either == (lung or tub))
        for lung in outcomes
        for either in outcomes
        for tub in outcomes
    }
    key = frozenset(scope_product)
    if key not in table:
        raise ValueError("Unknown scope product")
    return table[key]


def phi_DysponeaCoughBronchitis(scope_product):
    """Unnormalized potential over (Cough, Dysponea, Bronchitis).

    The entries are positive weights, not probabilities (the largest is 16).

    Parameters
    ----------
    scope_product : iterable of (str, bool) pairs
        One assignment each for "Cough", "Dysponea" and "Bronchitis", in any
        order.

    Returns
    -------
    int
        The potential value for the given joint assignment.

    Raises
    ------
    ValueError
        If the assignment does not match any row of the table.
    """
    # (Cough, Dysponea, Bronchitis, value)
    rows = (
        (True, True, True, 16),
        (True, False, True, 1),
        (False, True, True, 4),
        (False, False, True, 1),
        (True, True, False, 2),
        (True, False, False, 1),
        (False, True, False, 1),
        (False, False, False, 1),
    )
    table = {
        frozenset(
            [("Cough", cough), ("Dysponea", dysponea), ("Bronchitis", bronchitis)]
        ): value
        for cough, dysponea, bronchitis, value in rows
    }
    value = table.get(frozenset(scope_product))
    if value is None:
        raise ValueError("Unknown scope product")
    return value


def phi_CoughBronchitisEitherTL(scope_product):
    """Unnormalized potential over (Cough, EitherTL, Bronchitis).

    The entries are positive weights, not probabilities (the largest is 5).

    Parameters
    ----------
    scope_product : iterable of (str, bool) pairs
        One assignment each for "Cough", "EitherTL" and "Bronchitis", in any
        order.

    Returns
    -------
    int
        The potential value for the given joint assignment.

    Raises
    ------
    ValueError
        If the assignment does not match any row of the table.
    """
    # (Cough, EitherTL, Bronchitis, value)
    rows = (
        (True, True, True, 5),
        (True, False, True, 2),
        (False, True, True, 1),
        (False, False, True, 1),
        (True, True, False, 3),
        (True, False, False, 1),
        (False, True, False, 1),
        (False, False, False, 1),
    )
    table = {}
    for cough, either, bronchitis, value in rows:
        key = frozenset(
            [("Cough", cough), ("EitherTL", either), ("Bronchitis", bronchitis)]
        )
        table[key] = value

    assignment = frozenset(scope_product)
    if assignment not in table:
        raise ValueError("Unknown scope product")
    return table[assignment]


def phi_BronchitisEitherTL(scope_product):
    """Normalizing factor over (Bronchitis, EitherTL).

    Each entry is the reciprocal of the sum, over all Cough and Dysponea
    values, of phi_CoughBronchitisEitherTL * phi_DysponeaCoughBronchitis for
    the given (Bronchitis, EitherTL) pair; those sums are 90, 11, 39 and 5.

    Parameters
    ----------
    scope_product : iterable of (str, bool) pairs
        One assignment for "Bronchitis" and one for "EitherTL", in any order.

    Returns
    -------
    float
        The table entry for the given joint assignment.

    Raises
    ------
    ValueError
        If the assignment does not match any row of the table.
    """
    # (Bronchitis, EitherTL, normalizer)
    normalizers = (
        (True, True, 90),
        (False, True, 11),
        (True, False, 39),
        (False, False, 5),
    )
    table = {
        frozenset([("Bronchitis", bronchitis), ("EitherTL", either)]): 1 / total
        for bronchitis, either, total in normalizers
    }
    value = table.get(frozenset(scope_product))
    if value is None:
        raise ValueError("Unknown scope product")
    return value



In [5]:
# instantiate factors with factor functions and implied random variables in scope

# prior and conditional factors of the directed part of the graph
VisitAsia_cf = Factor(
    gid="VisitAsia_cf",
    scope_vars={VisitAsia},
    factor_fn=phi_VisitAsia,
)
VisitAsiaTuberculosis_cf = Factor(
    gid="VisitAsiaTuberculosis_cf",
    scope_vars={VisitAsia, Tuberculosis},
    factor_fn=phi_TuberculosisVisitAsia,
)
EitherTLXray_cf = Factor(
    gid="EitherTLXray_cf",
    scope_vars={EitherTL, Xray},
    factor_fn=phi_EitherTLXray,
)
Smoking_cf = Factor(
    gid="Smoking_cf",
    scope_vars={Smoking},
    factor_fn=phi_Smoking,
)
SmokingBronchitis_cf = Factor(
    gid="SmokingBronchitis_cf",
    scope_vars={Smoking, Bronchitis},
    factor_fn=phi_SmokingBronchitis,
)
SmokingLungCancer_cf = Factor(
    gid="SmokingLungCancer_cf",
    scope_vars={Smoking, LungCancer},
    factor_fn=phi_SmokingLungCancer,
)
LungCancerEitherTLTuberculosis_cf = Factor(
    gid="LungCancerEitherTLTuberculosis_cf",
    scope_vars={EitherTL, LungCancer, Tuberculosis},
    factor_fn=phi_LungCancerEitherTLTuberculosis,
)

# factors involving Cough and Dysponea, the two nodes joined by the undirected edge
DysponeaCoughBronchitis_cf = Factor(
    gid="IHBronchitis_cf",
    scope_vars={Cough, Dysponea, Bronchitis},
    factor_fn=phi_DysponeaCoughBronchitis,
)
CoughBronchitisEitherTL_cf = Factor(
    gid="CoughBronchitisEitherTL_cf",
    scope_vars={Cough, EitherTL, Bronchitis},
    factor_fn=phi_CoughBronchitisEitherTL,
)
BronchitisEitherTL_cf = Factor(
    gid="BronchitisEitherTL_cf",
    scope_vars={EitherTL, Bronchitis},
    factor_fn=phi_BronchitisEitherTL,
)

NameError: name 'pdb' is not defined

In [None]:
# instantiate lwf chain graph and make a query
cowell = LWFChainGraph(
    gid="cowell",
    nodes={
        Smoking,
        Bronchitis,
        LungCancer,
        EitherTL,
        VisitAsia,
        Tuberculosis,
        Xray,
        Cough,
        Dysponea,
    },
    edges={
        SmokingBronchitis_c,
        SmokingLungCancer_c,
        LungCancerEitherTL_c,
        VisitAsiaTuberculosis_c,
        TuberculosisEitherTL_c,
        EitherTLXray_c,
        EitherTLCough_c,
        BronchitisCough_c,
        BronchitisDysponea_c,
        CoughDysponea_c,
    },
    factors={
        VisitAsia_cf,
        VisitAsiaTuberculosis_cf,
        EitherTLXray_cf,
        Smoking_cf,
        SmokingBronchitis_cf,
        SmokingLungCancer_cf,
        LungCancerEitherTLTuberculosis_cf,
        DysponeaCoughBronchitis_cf,
        CoughBronchitisEitherTL_cf,
        BronchitisEitherTL_cf,
    },
)

# observed values used as evidence for the query
evidences = {("VisitAsia", True), ("Smoking", True), ("Xray", False)}

# query variable: Bronchitis, conditioned on the evidence above
final_factor, a = cowell.cond_prod_by_variable_elimination(
    {Bronchitis}, evidences
)

# The query variable is Bronchitis, so the resulting factor is evaluated at a
# Bronchitis assignment (Cough is not a query variable).
round(FactorOps.phi_normal(final_factor, {("Bronchitis", True)}), 4)
# 0.60

