<a href="https://colab.research.google.com/github/Sphinomni/condHC/blob/main/i232_HW6_Q3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Homework6 Question 3

This notebook is used to answer I232 Homework6 Question 3.

Please change the inputs in the "Inputs" part. Then run all the code in this notebook. 

The final answers (1. a generated Markov chain sample; 2. its conditional Huffman code; 3. the sample recovered by decoding; 4. the rate R per symbol) will be shown at the end of the notebook.


## Inputs

In [1]:
import numpy as np

# number of symbols in the generated Markov chain
n = 8

# size of the source symbol space (alphabet)
m = 4

# initial distribution P(X1) over the m symbols
pX1 = np.array([1/2, 1/4, 1/8, 1/8])

# transition probabilities of the Markov chain
# column j (0-based) of P is the distribution of the next symbol given that
# the previous symbol is symbols[j]
P = np.array([
    [1/2, 1/4, 1/8, 1/8],
    [1/4, 1/8, 1/8, 1/2],
    [1/8, 1/8, 1/2, 1/4],
    [1/8, 1/2, 1/4, 1/8],
])

#### Set this to True to seed the random number generators before sampling
USE_RANDOM_SEED = False

#### The seed value that is used when USE_RANDOM_SEED is True
RSEED = 42

#### Please don't change the following lines
#### The symbol space {1, 2, ..., m}
symbols = list(range(1, m + 1))

## Huffman Tree and Conditional Huffman Code Encoder/Decoder

In [2]:
################################################################################
# Function Name: getHuffmanDicts
# Description: Build a binary Huffman code for the given symbols and their
#              probabilities.
# Input:
#    symbols: list, the symbols to encode, e.g. [1, 2, 3, 4]
#    probs: list, the probability of each symbol (same length as symbols)
# Output:
#    symbol2codeMap: dict, keys are the symbols and values are the Huffman codes
#    code2symbolMap: dict, keys are the Huffman codes and values are the symbols
# Raises:
#    ValueError: if symbols and probs do not have the same length
# Notes:
#    - An empty symbol list yields two empty dicts.
#    - A single symbol is given the one-bit code "0"; an empty codeword could
#      not be told apart from "nothing sent" when decoding.
################################################################################
def getHuffmanDicts(symbols, probs):
  symbols = list(symbols)
  probs = list(probs)
  if len(symbols) != len(probs):
    raise ValueError(
        "symbols and probs must have the same length, got {} and {}".format(
            len(symbols), len(probs)))

  # huffman tree node to store information during the construction
  class HuffmanTreeNode():
    def __init__(self, symbol, prob, leftChild=None, rightChild=None):
      self.leftChild = leftChild
      self.rightChild = rightChild

      self.symbol = symbol
      self.prob = prob

    def __str__(self):
      return "({0}, {1})".format(self.symbol, self.prob)

    def isLeaf(self):
      return self.leftChild is None and self.rightChild is None

  symbol2codeMap, code2symbolMap = dict(), dict()

  # degenerate alphabets: nothing to encode, or a lone symbol that would
  # otherwise end up with the empty string as its codeword
  if not symbols:
    return symbol2codeMap, code2symbolMap
  if len(symbols) == 1:
    symbol2codeMap[symbols[0]] = "0"
    code2symbolMap["0"] = symbols[0]
    return symbol2codeMap, code2symbolMap

  # working list, kept in descending order of probability so that the two
  # least probable nodes are always at the tail
  treeNodes = [HuffmanTreeNode(symbol, prob) for symbol, prob in zip(symbols, probs)]
  treeNodes.sort(key=lambda node: node.prob, reverse=True)

  # keep merging until only the root is left in the working list
  while len(treeNodes) > 1:
    # take the two nodes with the smallest probs
    n1 = treeNodes.pop()
    n2 = treeNodes.pop()

    # merge them under a new internal node (n1 becomes the left child)
    treeNodes.append(HuffmanTreeNode(None, n1.prob + n2.prob, n1, n2))

    # the sort is stable, so a new node is placed after existing nodes of
    # equal probability; this fixes how ties are broken
    treeNodes.sort(key=lambda node: node.prob, reverse=True)

  # walk the tree and record the path to every leaf as its codeword
  def generateHuffmanCodeForNode(node, prefix):
    if node is None:
      return
    # a leaf: the path taken so far is the code for its symbol
    if node.isLeaf():
      symbol2codeMap[node.symbol] = prefix
      code2symbolMap[prefix] = node.symbol
      return
    # an internal node: the edge to the left child is 1, to the right child 0
    generateHuffmanCodeForNode(node.leftChild, prefix + "1")
    generateHuffmanCodeForNode(node.rightChild, prefix + "0")

  generateHuffmanCodeForNode(treeNodes[0], "")
  return symbol2codeMap, code2symbolMap

################################################################################
# Function Name: getCondHuffmanDicts
# Description: Build the conditional Huffman codebooks of a Markov source:
#              one code for the first symbol and one code per column of the
#              transition matrix.
# Input:
#    m: int, the number of columns of Ptrans to build a code for
#    symbols: list, the symbols to encode, e.g. [1, 2, 3, 4]
#    Pinit: list, the initial probabilities of the symbols, P(X1)
#    Ptrans: matrix, the conditional probabilities P(Xn|Xn-1); it is sliced as
#            Ptrans[:, i], so it must support that kind of indexing
# Output:
#    s2cMaps: list, symbol-to-code maps; s2cMaps[0] is the code based on Pinit
#             and s2cMaps[i + 1] is the code based on Ptrans[:, i]
#    c2sMaps: list, the matching code-to-symbol maps, in the same order
################################################################################
def getCondHuffmanDicts(m, symbols, Pinit, Ptrans):
  # slot 0 holds the pair of maps for the first symbol of a sequence
  dictPairs = [getHuffmanDicts(symbols, Pinit)]

  # slots 1..m hold the pairs built from columns 0..m-1 of Ptrans
  for column in range(m):
    dictPairs.append(getHuffmanDicts(symbols, Ptrans[:, column]))

  # split the (symbol2code, code2symbol) pairs into two parallel lists
  s2cMaps = [pair[0] for pair in dictPairs]
  c2sMaps = [pair[1] for pair in dictPairs]
  return s2cMaps, c2sMaps

################################################################################
# Function Name: encode
# Description: Encode a symbol sequence with the conditional Huffman code.
# Input:
#    seq: list, the plaintext message, e.g. [1, 2, 3, 4, 2, 2, 1]
#    s2cMaps: list, symbol-to-code maps; s2cMaps[0] encodes the first symbol
#             and s2cMaps[s] encodes the symbol that follows symbol s
#    sep: str, a separator placed between consecutive codewords, default none
# Output:
#    code: str, the concatenated codewords, with/without separators
################################################################################
def encode(seq, s2cMaps, sep=""):
  pieces = []
  currentMap = s2cMaps[0]

  for position, symbol in enumerate(seq):
    # the separator goes between codewords only, never in front of the first
    if position > 0:
      pieces.append(sep)
    pieces.append(currentMap[symbol])

    # the next symbol is coded with the codebook selected by this symbol
    currentMap = s2cMaps[symbol]

  return "".join(pieces)

################################################################################
# Function Name: decode
# Description: Decode a conditional Huffman code back into the symbol sequence.
# Input:
#    code: str, encoded sequence of 0 and 1, e.g. "01001110110100" (without sep)
#             or "0 10 0 111 0 110 10 0" (with sep = " ")
#    c2sMaps: list, code-to-symbol maps; c2sMaps[0] decodes the first codeword
#             and c2sMaps[s] decodes the codeword that follows symbol s
#    sep: str, the separator between codewords; "" or None means no separator
# Output:
#    seq: list, the decoded message; [] when code is empty
# Notes:
#    - Without a separator, a tail that matches no codeword prints "Error!"
#      and the symbols decoded up to that point are returned.
#    - With a separator, an unknown codeword fails on the map lookup
#      (KeyError for dict maps).
################################################################################
def decode(code, c2sMaps, sep=""):
  # decode a code without separators between codewords
  def decode_without_sep(code, c2sMaps):
    seq_ = []
    c2sMap = c2sMaps[0]
    start = 0

    while start < len(code):
      # every candidate codeword has at least one bit, so the cursor always
      # moves forward, even if a map happens to contain an empty key
      end = start + 1

      # grow the window until it matches a codeword or the input runs out
      while end < len(code) and code[start:end] not in c2sMap:
        end += 1

      word = code[start:end]
      if word in c2sMap:
        # map code to symbol and switch to the codebook selected by it
        symbol = c2sMap[word]
        seq_.append(symbol)
        c2sMap = c2sMaps[symbol]
      else:
        # the code sequence didn't end correctly
        print("Error!")

      start = end

    return seq_

  # decode a code with separators between codewords
  def decode_with_sep(code, c2sMaps, sep):
    # "".split(sep) gives [""], which is not a codeword; an empty code simply
    # decodes to an empty message, as it does without a separator
    if code == "":
      return []

    c2sMap = c2sMaps[0]
    seq_ = []

    for code4sym in code.split(sep):
      symbol = c2sMap[code4sym]
      seq_.append(symbol)
      c2sMap = c2sMaps[symbol]

    return seq_

  # choose the decode scheme according to the sep
  if sep is None or sep == "":
    return decode_without_sep(code, c2sMaps)
  else:
    return decode_with_sep(code, c2sMaps, sep)

In [3]:
# #Test case 1

# # Kai
# m_test = 4
# symbols_test = [_ for _ in range(1,m_test+1)]

# # Pinit
# pX1_test = np.array([1/2, 1/4, 1/8, 1/8])

# # Ptrans
# P_test = np.array([
#      [1, 0, 0, 0],
#      [0, 0.5, 0.5, 0],
#      [0, 0.25, 0.25, 0.5],
#      [0, 0.25, 0.25, 0.5]
# ])

# # test input
# seq_test = [3,2,1,1,1,2,4,1]
# print("text:\t\t",seq_test)

# # test
# s2cMaps, c2sMaps = getCondHuffmanDicts(m_test, symbols_test, pX1_test, P_test)

# code_test = encode(seq_test, s2cMaps)
# print("code:\t", code_test)
# seq_test_ = decode(code_test,c2sMaps)
# print("recover:\t",seq_test_)

text:		 [3, 2, 1, 1, 1, 2, 4, 1]
code:	 11001110010110110
recover:	 [3, 2, 1, 1, 1, 2, 4, 1]


## Experiment

In [4]:
# Generate a Markov chain sample of length n.
#   symbols: list, the symbol space
#   pX1: the initial probabilities, used to draw the first symbol
#   P: the transition matrix; column j (0-based) is the distribution of the
#      next symbol given that the current symbol is symbols[j]
#   n: int, the number of symbols to draw
# Returns the list of drawn symbols.
def getSample(symbols, pX1, P, n):
  from numpy.random import choice

  seq = []
  # the first draw uses the initial distribution
  probs = pX1
  for _ in range(n):

    # draw an index into symbols according to the current distribution
    ind = choice(range(len(symbols)), p=probs)
    seq.append(symbols[ind])

    # the next draw is conditioned on the symbol that was just drawn
    probs = P[:, ind]

  return seq

# Compare a reference sequence x with a recovered sequence x_.
#   1. the lengths must be equal (otherwise report it and stop)
#   2. every position must hold the same symbol (each mismatch is reported)
# Returns True only when both checks pass.
def checkAns(x, x_):
  expectedLen, actualLen = len(x), len(x_)
  if expectedLen != actualLen:
    print("wrong length! should be {}, got {}".format(expectedLen, actualLen))
    return False

  # gather every differing position first, then report them in order
  mismatches = [
      (position, expected, actual)
      for position, (expected, actual) in enumerate(zip(x, x_))
      if expected != actual
  ]
  for position, expected, actual in mismatches:
    print("different symbol at {}, should be {}, got {}".format(position, expected, actual))

  return not mismatches



In [5]:
import numpy as np
import random

# Optionally fix the random state so that a run can be repeated.
# getSample draws with numpy.random.choice, so the numpy seed is the one that
# fixes the generated chain; the stdlib generator is seeded alongside it.
if USE_RANDOM_SEED:
  random.seed(RSEED)
  np.random.seed(RSEED)

# step1: draw a markov chain sample of length n from pX1 and P
x = getSample(symbols, pX1, P, n)
print("markov chain({}):\t\t".format(n), x)

# step2: build the conditional huffman codebooks and encode x,
#        once without separators and once with a space between codewords
s2cMaps, c2sMaps = getCondHuffmanDicts(m,symbols, pX1, P)
# print(s2cMaps)
code = encode(x, s2cMaps)
code_sep = encode(x, s2cMaps, sep=" ")
print("code:\t\t\t",code)
print("code(with sep):\t\t", code_sep)
# rate R = number of code bits (separator-free code) divided by n symbols
print("rate per symbol:\t\t%.03f"% (1.0*len(code)/n))

# step3: decode the separator-free code and compare it with the original sample
x_ = decode(code, c2sMaps)
print("decoded:\t\t\t", x_)
print("checked:\t\t\t",checkAns(x, x_))


markov chain(8):		 [4, 2, 2, 2, 4, 4, 3, 3]
code:			 11101101100111100
code(with sep):		 111 0 110 110 0 111 10 0
rate per symbol:		2.125
decoded:			 [4, 2, 2, 2, 4, 4, 3, 3]
checked:			 True
