In [1]:
# Directory inserted at the front of sys.path before `import pycantonese`.
PYCANTONESE_PATH = r'/home/lun/csrp/corpuses/pycantonese/'
# Root of the corpus files. Relative names such as 'text/...' and 'pos/...'
# are appended by plain string concatenation, so the trailing slash is required.
CORPUS_PATH = r'/home/lun/csrp/corpuses/hkcancor/'
# Root under which the 'posseg/...' and 'finalseg/...' model files are written.
# Also used with plain string concatenation, so keep the trailing slash.
OUTPUT_PATH = r'/home/lun/csrp/jieba/jieba/'

import sys, re, glob, math, collections
sys.path.insert(0, PYCANTONESE_PATH)
import pycantonese as pc
import pandas as pd
import numpy as np
import pickle
from pprint import pprint
from contextlib import redirect_stdout

### Extract text and POS tags from HKCanCor CHAT files

### Compile HMM data from output in Part 1

In [2]:
# Number of numbered text/pos file pairs to read (indices 0 .. corpus_size - 1).
corpus_size = 58
dataframes = []

for i in range(corpus_size):
    # One sentence per line; words are separated by single spaces.
    # Entries that are empty or whitespace-only are discarded.
    with open(CORPUS_PATH + r'text/hk_cantonese_corpus_%d.txt' % i,
              'r', encoding='utf-8') as ftext:
        text_list = [
            [word for word in line.split(" ") if word.strip()]
            for line in ftext.read().splitlines()
        ]

    # Same layout as the text file, holding one POS tag per word.
    with open(CORPUS_PATH + r'pos/hk_cantonese_corpus_pos_%d.txt' % i,
              'r', encoding='utf-8') as fpos:
        pos_list = [
            [tag for tag in line.split(" ") if tag.strip()]
            for line in fpos.read().splitlines()
        ]

    # One row per line: the list of words next to the list of their tags.
    table = pd.DataFrame({'text': text_list, 'pos': pos_list})
    dataframes.append(table)

# Stack the per-file tables into one frame with a fresh 0..n-1 index.
df = pd.concat(dataframes, axis=0, ignore_index=True)

In [3]:
# Keep only the 'text' and 'pos' columns, in that order, as an independent copy.
df = df[['text', 'pos']].copy()

In [4]:
# Display the combined table: one row per corpus line, with its words and tags.
df

Unnamed: 0,text,pos
0,"[有, 有, 一, 間, 呢, 就, 賣, 開, 誒, 家庭, 用品, 嘅, 就, 即係, ...","[v1, v1, m, q, y1, d, v, u, e, n, n, y, d, c, ..."
1,"[就, 爭取, 到, 六成, 嘅, 減, 租, 啊]","[d, v, u, m, u, v, ng, y]"
2,"[噉樣, 就, 即係, 令到, 個, 商鋪, 呢, 打, 咗, 支, 強心針, 就, 堅持,...","[c, d, c, v, q, n, y1, v, u, q, n, d, v, d, d,..."
3,"[重, 有得, 搞]","[d, vu, v]"
4,[喀],[e]
5,"[但係, 其實, 而家, 已經, 變, 咗, 攻防戰, 𡃉, 嚹]","[c, d, t, d, v, u, n, y, y]"
6,"[即, 係, 你有過牆梯, 唔係, 你有張良計, 我有過牆梯, 𡃉, 嘞]","[c, v, l, v, l, l, y, y]"
7,[你有過牆梯],[l]
8,"[噉, 𠻺, 點, 呢]","[c, d, r, y]"
9,"[我, 就, 出, 多, 自己, 一, 條, 張良計, 噉樣, 啊, 喀]","[r, d, v, a, r, m, q, l, c, y, e]"


In [5]:
def string_is_ascii(string):
    """Return True when every character of ``string`` is an ASCII character.

    A character is ASCII when its code point is below 128. The empty string
    contains no non-ASCII character and therefore yields True.
    """
    return all(ord(char) < 128 for char in string)


def tokenize_word(word):
    """Split a word into the tokens used for BMES tagging.

    Every non-ASCII character becomes a token of its own. A maximal run of
    consecutive ASCII characters is kept together instead: the run is split
    on the space character and its non-empty pieces become tokens. Embedded
    ASCII words therefore survive intact, and spaces never appear in the
    result.

    Examples:
        '你office land過牆梯' -> ['你', 'office', 'land', '過', '牆', '梯']
        'Hello  World'         -> ['Hello', 'World']

    parameter: word - word
    returns: a list of tokens
    """
    tokens = []
    ascii_run = []

    def flush_ascii_run():
        # Emit the pending ASCII run as space-separated pieces, then reset it.
        if ascii_run:
            tokens.extend(
                piece for piece in "".join(ascii_run).split(" ") if piece)
            del ascii_run[:]

    for char in word:
        if string_is_ascii(char):
            ascii_run.append(char)
        else:
            flush_ascii_run()
            tokens.append(char)
    flush_ascii_run()  # the word may end inside an ASCII run

    return tokens

    

def tagWord_BMES(word):
    """Tag the tokens of a word with the BMES scheme.

    The word is tokenised with ``tokenize_word``. A single token is tagged
    "S" (single). Otherwise the first token is "B" (begin), the last is "E"
    (end) and every token in between is "M" (middle). If tokenisation yields
    no tokens, the result is an empty list.

    precondition: string must not be empty (checked with an assertion)
    returns: list of BMES tags, one per token of the word
    """
    assert len(word)

    token_count = len(tokenize_word(word))
    if token_count <= 1:
        # 0 tokens -> [], 1 token -> ["S"]
        return ["S"] * token_count
    return ["B"] + ["M"] * (token_count - 2) + ["E"]

In [6]:
# # test cases
# print(tokenize_word('你office land過牆梯'))
# print(tokenize_word('office'))
# print(tokenize_word('你office'))
# print(tokenize_word('hello過牆梯world'))
# print(tokenize_word('hello過牆梯world過牆'))
# print(tokenize_word('你過牆梯'))
# print(tokenize_word('Hello  World'))

# print(tagWord_BMES('你有過牆梯')) # BMMME
# print(tagWord_BMES('office')) # S
# print(tagWord_BMES('你office')) # BE
# print(tagWord_BMES('office牆')) # BE
# print(tagWord_BMES('Hello World')) # BE
# print(tagWord_BMES('有有')) # BE
# print(tagWord_BMES('有')) # S

In [7]:
def countTotalStartInstances(start_dict):
    """Return the sum of all counts stored as values of ``start_dict``.

    An empty dictionary gives 0.
    """
    return sum(start_dict.values())


def trainingHMM_BMESTagging(text_lists):
    """Estimate the segmentation HMM tables (BMES states) from a corpus.

    Every word is tokenised with ``tokenize_word`` and tagged with
    ``tagWord_BMES``. The function counts tag-to-tag transitions, tag-to-token
    emissions and the tag of each sentence's first token, then converts the
    counts to base-2 logarithms of relative frequencies.

    parameter: text_lists - sentences, each a list of non-empty word strings
    returns: a tuple of prob_trans, prob_emit, and prob_start
        prob_trans[prev_tag][tag]  log2(count(prev_tag -> tag) / count(prev_tag))
        prob_emit[tag][token]      log2(count(tag emits token) / count(tag))
        prob_start[tag]            log2 of the share of sentences starting
                                   with tag; "M" and "E" are always -3.14e100

    count(tag) is the total number of tokens carrying that tag, including
    sentence-final ones. Transitions from the sentence start marker and into
    the sentence end marker are counted but not written to prob_trans.
    """
    min_float = -3.14e100  # minimum float value defined in jieba (MIN_FLOAT)

    emission = collections.Counter()    # (tag, token)    -> count
    transition = collections.Counter()  # (previous, tag) -> count
    context = collections.Counter()     # tag             -> count
    start = collections.Counter()       # first tag of a sentence -> count

    # this is for the training part
    for line_list in text_lists:
        previous = '<s>'
        context[previous] += 1

        for j, word in enumerate(line_list):
            tokens = tokenize_word(word)
            bmes_tags = tagWord_BMES(word)

            if j == 0:
                start[bmes_tags[0]] += 1

            # Pair every tag with the token it was produced for. Indexing the
            # raw word by tag position is wrong as soon as an ASCII run folds
            # several characters into one token (e.g. 'office' is one "S"
            # token, not the character 'o').
            for token, bmes_tag in zip(tokens, bmes_tags):
                transition[(previous, bmes_tag)] += 1
                context[bmes_tag] += 1
                emission[(bmes_tag, token)] += 1
                previous = bmes_tag

        transition[(previous, '</s>')] += 1

    # for prob_*.* files in jieba
    prob_trans = collections.defaultdict(dict)
    for (previous_tag, current_tag), count in transition.items():
        if previous_tag != '<s>' and current_tag != '</s>':
            prob_trans[previous_tag][current_tag] = math.log2(
                float(count) / context[previous_tag])

    prob_emit = collections.defaultdict(dict)
    for (tag, token), count in emission.items():
        prob_emit[tag][token] = math.log2(float(count) / context[tag])

    # The denominator is the same for every start tag, so compute it once.
    total_starts = sum(start.values())
    prob_start = {
        tag: math.log2(float(count) / total_starts)
        for tag, count in start.items()
    }
    # A sentence can never begin in the middle or at the end of a word.
    prob_start["M"] = min_float
    prob_start["E"] = min_float

    return dict(prob_trans), dict(prob_emit), prob_start

In [8]:
# Train the BMES segmentation HMM on the word lists of the whole corpus.
prob_trans1, prob_emit1, prob_start1 = trainingHMM_BMESTagging(df.text.tolist())

In [9]:
def trainingHMM_POSTagging(text_lists, posTags_lists):
    """Estimate the joint (BMES, POS) HMM tables from a tagged corpus.

    Each state is a pair ``(bmes_tag, pos_tag)``. Every word is tokenised with
    ``tokenize_word`` and tagged with ``tagWord_BMES``, and each of its tokens
    gets the word's POS tag. Counts of state-to-state transitions,
    state-to-token emissions and sentence-initial states are converted to
    base-2 logarithms of relative frequencies.

    parameters:
        text_lists    - sentences, each a list of non-empty word strings
        posTags_lists - per sentence, the POS tag of each word; sentences and
                        words are paired positionally with ``zip``, so any
                        surplus on either side is ignored
    returns: a tuple of prob_trans, prob_emit, prob_start and char_state
        prob_trans[prev_state][state]  log2(count(prev -> state) / count(prev))
        prob_emit[state][token]        log2(count(state emits token) / count(state))
        prob_start[state]              log2 of the share of sentences starting
                                       in state, or -3.14e100 if never seen
        char_state[token]              sorted tuple of the distinct states
                                       observed for that token

    prob_trans, prob_emit and prob_start contain a key for every combination
    of a BMES tag with a POS tag that is either seen in ``posTags_lists`` or
    listed in the built-in tagset. Unseen states map to an empty dict (or to
    -3.14e100 in prob_start).
    """
    min_float = -3.14e100  # minimum float value defined in jieba (MIN_FLOAT)

    emission = collections.Counter()    # (state, token)    -> count
    transition = collections.Counter()  # (previous, state) -> count
    context = collections.Counter()     # state             -> count
    start = collections.Counter()       # first state of a sentence -> count
    token_states = collections.defaultdict(set)  # token -> distinct states

    observed_pos_tags = {
        pos_tag for posTag_list in posTags_lists for pos_tag in posTag_list}
    official_pos_tags = {
        'ag', 'a', 'ad', 'an', 'bg', 'b', 'c', 'dg', 'd',
        'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'mg', 'm', 'ng', 'n',
        'nr', 'ns', 'nt', 'nx', 'nz', 'o', 'p', 'qg', 'q', 'rg', 'r',
        's', 'tg', 't', 'ug', 'u', 'vg', 'v', 'vd', 'vn', 'w', 'x',
        'yg', 'y', 'z'}  # official HKCanCor tagset
    # Iterate in sorted order so the padded keys are inserted in the same
    # order on every run.
    all_states = [
        (bmes_tag, pos_tag)
        for bmes_tag in ('B', 'E', 'M', 'S')
        for pos_tag in sorted(observed_pos_tags | official_pos_tags)
    ]

    # this is for the training part
    for line_list, linePosTags_list in zip(text_lists, posTags_lists):
        previous = '<s>'  # start sentence tag
        context[previous] += 1

        for j, (word, pos_tag) in enumerate(zip(line_list, linePosTags_list)):
            word_bmes_tags = tagWord_BMES(word)
            word_tokens = tokenize_word(word)

            # build up start dictionary
            if j == 0:
                start[(word_bmes_tags[0], pos_tag)] += 1

            # build up transition, emission dictionaries
            for token, bmes_tag in zip(word_tokens, word_bmes_tags):
                tag_pair = (bmes_tag, pos_tag)
                transition[(previous, tag_pair)] += 1
                context[tag_pair] += 1
                emission[(tag_pair, token)] += 1
                # A set keeps one entry per distinct state instead of one
                # per occurrence of the token.
                token_states[token].add(tag_pair)
                previous = tag_pair

        transition[(previous, '</s>')] += 1

    # output transition, emission and start probabilities
    prob_trans = collections.defaultdict(dict)
    for (previous_tag_pair, current_tag_pair), count in transition.items():
        if previous_tag_pair != '<s>' and current_tag_pair != '</s>':
            prob_trans[previous_tag_pair][current_tag_pair] = math.log2(
                float(count) / context[previous_tag_pair])

    prob_emit = collections.defaultdict(dict)
    for (tag_pair, token), count in emission.items():
        prob_emit[tag_pair][token] = math.log2(float(count) / context[tag_pair])

    # The denominator is the same for every start state, so compute it once.
    total_starts = sum(start.values())
    prob_start = {
        tag_pair: math.log2(float(count) / total_starts)
        for tag_pair, count in start.items()
    }

    # do this for empty tag pairs
    for tag_pair in all_states:
        prob_trans.setdefault(tag_pair, {})
        prob_emit.setdefault(tag_pair, {})
        prob_start.setdefault(tag_pair, min_float)

    # only keep unique tag sets; sorting makes the tuple order reproducible
    # instead of depending on set iteration order
    char_state = {
        token: tuple(sorted(tag_pairs))
        for token, tag_pairs in token_states.items()
    }

    return dict(prob_trans), dict(prob_emit), prob_start, char_state

In [10]:
# Train the joint (BMES, POS) HMM on the corpus words and their POS tags.
prob_trans2, prob_emit2, prob_start2, char_state2 = trainingHMM_POSTagging(df.text.tolist(), df.pos.tolist())

In [11]:
# char_state2

In [12]:
# s4, s5, s6, s7 = trainingHMM_POSTagging([['重', '有得', '搞']], [['d', 'vu', 'v']])

In [13]:
# s4

### Output prob_trans, prob_emit, prob_start and char_state_tab

In [14]:
def outputDictionary(filename, prob_dict):
    """Write ``prob_dict`` as Python source of the form ``P=<pretty-printed dict>``.

    The file is created (or overwritten) at OUTPUT_PATH + filename, encoded
    as UTF-8.
    """
    with open(OUTPUT_PATH + filename, 'w', encoding='utf-8') as out_file:
        # No newline after the prefix: the pretty-printed value follows directly.
        out_file.write("P=")
        pprint(prob_dict, stream=out_file)

def pickleDictionary(filename, prob_dict):
    """Pickle ``prob_dict`` to OUTPUT_PATH + filename, overwriting the file."""
    with open(OUTPUT_PATH + filename, 'wb') as out_file:
        # jieba uses protocol 0 encoding for its pickle files
        out_file.write(pickle.dumps(prob_dict, protocol=0))
    
def depickleDictionary(filename):
    """Load and return the object pickled at OUTPUT_PATH + filename.

    Unpickling can execute arbitrary code, so use this only on files this
    notebook wrote itself.
    """
    with open(OUTPUT_PATH + filename, 'rb') as in_file:
        return pickle.loads(in_file.read(), encoding='utf-8')
    
# Joint (BMES, POS) model: Python-source tables first, then their pickles.
for target, table in [
    ("posseg/prob_trans.py", prob_trans2),
    ("posseg/prob_emit.py", prob_emit2),
    ("posseg/prob_start.py", prob_start2),
    ("posseg/char_state_tab.py", char_state2),
]:
    outputDictionary(target, table)

for target, table in [
    ("posseg/prob_trans.p", prob_trans2),
    ("posseg/prob_emit.p", prob_emit2),
    ("posseg/prob_start.p", prob_start2),
    ("posseg/char_state_tab.p", char_state2),
]:
    pickleDictionary(target, table)

# Segmentation-only (BMES) model: same two formats.
for target, table in [
    ("finalseg/prob_trans.py", prob_trans1),
    ("finalseg/prob_emit.py", prob_emit1),
    ("finalseg/prob_start.py", prob_start1),
]:
    outputDictionary(target, table)

for target, table in [
    ("finalseg/prob_trans.p", prob_trans1),
    ("finalseg/prob_emit.p", prob_emit1),
    ("finalseg/prob_start.p", prob_start1),
]:
    pickleDictionary(target, table)


# s1 = depickleDictionary("finalseg/prob_trans.p")
# s2 = depickleDictionary("finalseg/prob_emit.p")
# s3 = depickleDictionary("finalseg/prob_start.p")

# s1 = depickleDictionary("posseg/prob_trans.p")
# s2 = depickleDictionary("posseg/prob_emit.p")
# s3 = depickleDictionary("posseg/prob_start.p")
# s4 = depickleDictionary("posseg/char_state_tab.p")

In [15]:
# s4

### Compile Statistical Data from HKCanCor CHAT files