In [6]:
# prompt: install pomegranate version 0.15.0

# !pip install pomegranate==0.15.0
from pomegranate import *


**BAYESIAN NETWORK**

Next, we create a node representing each node in the bayesian network, in this case rain, maintenance, train and appointment. With rain being the parent node, maintenance a child node from rain and a parent node to train, etc.

In [2]:
# Rain node has no parents
rain = Node(DiscreteDistribution({
    "none": 0.7,
    "light": 0.2,
    "heavy": 0.1
}), name="rain")

# Track maintenance node is conditional on rain
maintenance = Node(ConditionalProbabilityTable([
    ["none", "yes", 0.4],
    ["none", "no", 0.6],
    ["light", "yes", 0.2],
    ["light", "no", 0.8],
    ["heavy", "yes", 0.1],
    ["heavy", "no", 0.9]
], [rain.distribution]), name="maintenance")

# Train node is conditional on rain and maintenance
train = Node(ConditionalProbabilityTable([
    ["none", "yes", "on time", 0.8],
    ["none", "yes", "delayed", 0.2],
    ["none", "no", "on time", 0.9],
    ["none", "no", "delayed", 0.1],
    ["light", "yes", "on time", 0.6],
    ["light", "yes", "delayed", 0.4],
    ["light", "no", "on time", 0.7],
    ["light", "no", "delayed", 0.3],
    ["heavy", "yes", "on time", 0.4],
    ["heavy", "yes", "delayed", 0.6],
    ["heavy", "no", "on time", 0.5],
    ["heavy", "no", "delayed", 0.5],
], [rain.distribution, maintenance.distribution]), name="train")

# Appointment node is conditional on train
appointment = Node(ConditionalProbabilityTable([
    ["on time", "attend", 0.9],
    ["on time", "miss", 0.1],
    ["delayed", "attend", 0.6],
    ["delayed", "miss", 0.4]
], [train.distribution]), name="appointment")

In [3]:
# Create a Bayesian Network and add states
model = BayesianNetwork()
model.add_states(rain, maintenance, train, appointment)

In [4]:
# Add edges connecting nodes
model.add_edge(rain, maintenance)
model.add_edge(rain, train)
model.add_edge(maintenance, train)
model.add_edge(train, appointment)

In [5]:
# Finalize model
model.bake()

In [7]:
# calculate the probability for a given observation
probability = model.probability([["none", "no", "on time", "miss"]])
print(probability)

0.037800000000000014


We can draw inferences from our model

In [11]:
# Calculate predictions
predictions = model.predict_proba({
    "train": "delayed",
    "rain" : "heavy"
})

# Print predictions for each node
for node, prediction in zip(model.states, predictions):
    if isinstance(prediction, str):
        print(f"{node.name}: {prediction}")
    else:
        print(f"{node.name}")
        for value, probability in prediction.parameters[0].items():
            print(f"    {value}: {probability:.4f}")

rain: heavy
maintenance
    yes: 0.1176
    no: 0.8824
train: delayed
appointment
    attend: 0.6000
    miss: 0.4000


**MARKOV CHAIN**

In [7]:
# Define starting probabilities/distribution
start = DiscreteDistribution({
    "sun": 0.5, # sunny 50% of the time
    "rain": 0.5 # rainy 50% of the time
})

# Define transition model
transitions = ConditionalProbabilityTable([
    ["sun", "sun", 0.8], # if its sunny today then prob. 0.8 it will be sunny tomorrow
    ["sun", "rain", 0.2], # if its sunny today then prob. 0.2 it will be rainy tomorrow
    ["rain", "sun", 0.3], # if its rainy today then prob. 0.3 it will be sunny tomorrow
    ["rain", "rain", 0.7] # if its rainy today then prob. 0.7 it will be rainy tomorrow
], [start])

In [8]:
# Create Markov chain
model = MarkovChain([start, transitions])

In [9]:
# Sample 50 states from chain, it basically sample 50 instances of the data, (sun and rain)

print(model.sample(50))

['rain', 'rain', 'sun', 'sun', 'rain', 'rain', 'rain', 'rain', 'sun', 'rain', 'rain', 'rain', 'sun', 'rain', 'rain', 'rain', 'rain', 'rain', 'rain', 'rain', 'rain', 'rain', 'rain', 'rain', 'rain', 'sun', 'sun', 'rain', 'rain', 'rain', 'rain', 'rain', 'sun', 'sun', 'sun', 'sun', 'sun', 'rain', 'sun', 'sun', 'sun', 'sun', 'rain', 'rain', 'sun', 'sun', 'sun', 'sun', 'sun', 'sun']


**HIDDEN MARKOV MODEL**

In [10]:
# Observation model for each state
sun = DiscreteDistribution({
    "umbrella": 0.2,
    "no umbrella": 0.8
})

rain = DiscreteDistribution({
    "umbrella": 0.9,
    "no umbrella": 0.1
})

states = [sun, rain]

# Transition model
transitions = numpy.array(
    [[0.8, 0.2], # Tomorrow's predictions if today = sun
     [0.3, 0.7]] # Tomorrow's predictions if today = rain
)

# Starting probabilities
starts = numpy.array([0.5, 0.5])

In [11]:
# Create the model
model = HiddenMarkovModel.from_matrix(
    transitions, states, starts,
    state_names=["sun", "rain"]
)
model.bake()


In [12]:
# Observed data
observations = [
    "umbrella",
    "umbrella",
    "no umbrella",
    "umbrella",
    "umbrella",
    "umbrella",
    "umbrella",
    "no umbrella",
    "no umbrella"
]

# Predict underlying states
predictions = model.predict(observations)
for prediction in predictions:
    print(model.states[prediction].name)

rain
rain
sun
rain
rain
rain
rain
sun
sun
