In [1]:
import jieba
import numpy as np
import torch
import re
from itertools import permutations

# 加载文件

In [2]:
def loadtext(path, encoding=None):
    """Read a whole text file and return its contents as a single string.

    Parameters
    ----------
    path : str or path-like
        Location of the file to read.
    encoding : str, optional
        Text encoding handed to ``open``. ``None`` (the default) keeps the
        platform default encoding; pass e.g. ``"utf-8"`` to make the read
        independent of the machine's locale.

    Returns
    -------
    str
        The full file contents.
    """
    # The context manager guarantees the handle is closed even if read() fails.
    with open(path, 'r', encoding=encoding) as handle:
        return handle.read()

In [3]:
# Load the raw corpus into memory as one string.
# NOTE(review): this is an absolute, machine-specific path, so the cell only runs
# where this exact directory layout exists; consider a configurable data directory.
text = loadtext('/Users/manmanzhang/Library/Mobile Documents/com~apple~CloudDocs/MyProject/InferenceSystem/src/I5_algorithm/NLP数据集合/豆瓣电影数据集(2019.3)/豆瓣电影简介.txt')

# 只保留中文

In [4]:
def findchinese(text):
    """Strip everything except characters in the range U+4E00..U+9FA5.

    The surviving characters are concatenated in their original order, with
    no separators; an input without any such character yields ``""``.
    """
    kept = (match.group(0) for match in re.finditer('[\u4E00-\u9FA5]', text))
    return "".join(kept)

# 词概率计算

In [5]:
def creat_word_dict(text,outputType = "dict"):
    """Estimate relative token frequencies for the Chinese part of ``text``.

    The text is reduced with ``findchinese``, tokenised with
    ``jieba.cut_for_search``, and each distinct token's count is divided by
    the total number of tokens.

    Parameters
    ----------
    text : str
        Raw input text.
    outputType : {"dict", "array"}
        ``"dict"``  -> ``{token: relative_frequency}``.
        ``"array"`` -> 2-column array ``[token, relative_frequency]``. Because
        ``np.c_`` stacks a string column with a float column, the
        frequencies come back as *strings* in this mode.

    Returns
    -------
    dict or numpy.ndarray

    Raises
    ------
    ValueError
        If ``outputType`` is not one of the supported values.
    """
    # Validate first so a typo fails fast instead of silently returning None
    # after the (expensive) segmentation has already run.
    if outputType not in ("dict", "array"):
        raise ValueError(
            "outputType must be 'dict' or 'array', got {!r}".format(outputType)
        )

    chinese = findchinese(text)

    words = np.array(list(jieba.cut_for_search(chinese)))

    unique, counts = np.unique(words, return_counts=True)

    frequencies = counts / words.size

    if outputType == "dict":
        return dict(zip(unique, frequencies))
    return np.c_[unique, frequencies]

In [6]:
word_dict = creat_word_dict(text)

Building prefix dict from /usr/local/lib/python3.7/site-packages/jieba/dict.txt ...
Loading model from cache /var/folders/sl/q8x6_03132dfk7rktf00yh880000gn/T/jieba.cache
Loading model cost 1.0650320053100586 seconds.
Prefix dict has been built succesfully.


In [7]:
creat_word_dict(text,outputType='array')

array([['一', '0.0008042721298864966'],
       ['一一', '4.4752821217849576e-05'],
       ['一一对', '5.114608139182808e-07'],
       ...,
       ['龟裂', '5.114608139182808e-07'],
       ['龟鹤', '2.557304069591404e-07'],
       ['龢', '2.557304069591404e-07']], dtype='<U34')

## 检查特征字典得到概率

In [188]:
# Toy vocabulary used by the lookup / scoring demos that follow.
# NOTE: the last line rebinds `word_dict`, replacing the corpus-derived
# dictionary built earlier in the notebook with this 10-entry table.
words = ["经常","经","有","有意见","意见","分歧","见","意","见分歧","分"]
# Hand-picked probabilities, one per entry of `words` (same order).
prob_list = [0.1,0.05,0.1,0.1,0.2,0.2,0.05,0.05,0.05,0.1]
# The stored values are negative natural-log probabilities, not raw probabilities.
# NOTE(review): cum_log_joint_prob applies -np.log to whatever prob() looks up,
# so with this table the logarithm is taken twice — confirm that is intended.
word_dict = dict(zip(words,-np.log(prob_list)))

In [189]:
def check_dict(word,word_dict=word_dict):
    """Fetch the value stored for ``word`` and wrap it in a NumPy array.

    ``word_dict`` defaults to the module-level ``word_dict`` object that
    exists when this ``def`` is executed; rebinding the global afterwards
    does not change the default. A missing key propagates as ``KeyError``.
    """
    stored_value = word_dict[word]
    return np.array(stored_value)

## 处理查表各种情况

In [9]:
def _lookup_or_zero(word):
    """Return ``check_dict(word)``, or 0 when the word cannot be looked up."""
    try:
        return check_dict(word)
    except (KeyError, TypeError):
        # KeyError: word not in the dictionary; TypeError: unhashable key.
        # Anything else (e.g. NameError because check_dict was never defined
        # in this kernel) is a real bug and is allowed to surface.
        return 0


def prob(words):
    """Look up dictionary values for one word or for a sequence of words.

    Parameters
    ----------
    words : str, tuple, list or numpy.ndarray
        A single word, or a sequence of words.

    Returns
    -------
    For a ``str``: whatever ``check_dict`` returns, or ``0`` if the word is
    unknown. For a tuple/list/ndarray: a NumPy array with one entry per word
    (``0`` for unknown words). Any other input type returns ``None``.
    """
    if isinstance(words,str):
        return _lookup_or_zero(words)
    elif isinstance(words,(tuple,list,np.ndarray)):
        return np.array([_lookup_or_zero(word) for word in words])

In [190]:
prob(['我'])

array([0])

In [191]:
def cum_log_joint_prob(words,display=False):
    """Accumulate a negative-log score over the values ``prob(words)`` returns.

    For every looked-up value ``p``:
      * ``p == 0`` (word not found) adds the constant ``10**(-8)``;
      * otherwise ``-np.log(p)`` is added.

    NOTE(review): an unseen word therefore costs almost nothing compared with
    a known word — confirm this is the intended smoothing.

    Parameters
    ----------
    words : sequence of str
        Words to score; also joined with ``''`` to label the trace.
    display : bool
        When true, a per-word trace is returned alongside the total.

    Returns
    -------
    total, or ``(total, {joined_words: trace})`` when ``display`` is true.
    Each trace entry is a dict with keys ``"p"``, ``"log(p)"``, ``"expr"``.
    """
    running_total = 0
    trace = []
    for p in prob(words):
        if p == 0:
            contribution = 0
            running_total += 10**(-8)
        else:
            contribution = -np.log(p)
            running_total += contribution
        if display:
            trace.append({"p":p,"log(p)":contribution,"expr":running_total})
    if not display:
        return running_total
    return running_total,{''.join(words):trace}

In [192]:
cum_log_joint_prob(['在','北京'],display= True)

(2e-08,
 {'在北京': [{'p': 0, 'log(p)': 0, 'expr': 1e-08},
   {'p': 0, 'log(p)': 0, 'expr': 2e-08}]})

In [193]:
cum_log_joint_prob(['1','路'],display= True)

(2e-08,
 {'1路': [{'p': 0, 'log(p)': 0, 'expr': 1e-08},
   {'p': 0, 'log(p)': 0, 'expr': 2e-08}]})

# 换底公式

In [194]:
def log(a,b=2):
    """Logarithm of ``a`` in base ``b`` (default 2) via change of base.

    Computed as ``ln(a) / ln(b)`` with NumPy's natural logarithm.
    """
    natural_log_of_value = np.log(a)
    natural_log_of_base = np.log(b)
    return natural_log_of_value/natural_log_of_base
log(10)

3.3219280948873626

## 枚举词语组合可能性

In [195]:
def perm(iter):
    """Return every ordering of the items in ``iter`` as a list of tuples.

    The list has n! entries for n items, so it grows very quickly.
    """
    return [*permutations(iter)]

## Ngram

In [16]:
def Ngram(word_list,n):
    """Build, for every position, the window of up to ``n`` trailing words.

    Window ``i`` (1-based) is ``word_list[:i][-n:]``: the current word
    preceded by at most ``n - 1`` earlier words. Early windows are shorter
    than ``n`` because there is not enough history yet.

    Parameters
    ----------
    word_list : list, tuple or numpy.ndarray of str
        The sentence as a sequence of words.
    n : int
        Maximum window length.

    Returns
    -------
    numpy.ndarray
        A 2-D array when every window has the same length (e.g. ``n == 1``);
        otherwise a 1-D ``dtype=object`` array whose elements are the
        individual 1-D window arrays. Iterating the result yields one window
        array per position in both cases.
    """
    m = np.asarray(word_list).size
    # Slice instead of rebuilding each prefix element by element.
    windows = [np.array(word_list[:i][-n:]) for i in range(1, m + 1)]

    if len({window.shape for window in windows}) <= 1:
        return np.array(windows)

    # Windows of different lengths: np.array() on a ragged list raises
    # ValueError on recent NumPy releases, so fill an object array explicitly.
    ragged = np.empty(len(windows), dtype=object)
    for position, window in enumerate(windows):
        ragged[position] = window
    return ragged

# Ngram语言模型

In [196]:
def to_numpy(data):
    """Return ``data`` as a NumPy array.

    An existing ``ndarray`` is handed back as-is (same object, no copy);
    anything else is converted with ``np.array``.
    """
    return data if isinstance(data,np.ndarray) else np.array(data)
to_numpy(["他","是","一","个","人"])

array(['他', '是', '一', '个', '人'], dtype='<U1')

In [206]:

def score(textList,n=2,display= False):
    """Score a word sequence with the n-gram windows produced by ``Ngram``.

    For each position the window is scored with ``cum_log_joint_prob``:
      * a window of size 1 adds that score directly;
      * a larger window adds ``score / (-np.log(prob(current_word)))``.

    NOTE(review): the second case divides one negative-log quantity by
    another; a conditional probability in log space would normally be a
    subtraction — confirm the intended formula.

    Parameters
    ----------
    textList : sequence of str
        The words, in order.
    n : int
        Window length; 1/2/3 are labelled Unigram/Bigram/Trigram, anything
        else "N-gram".
    display : bool
        When true, the per-window traces from ``cum_log_joint_prob`` are
        returned as a second value.

    Returns
    -------
    ``{label: total}``, or ``({label: total}, traces)`` when ``display`` is
    true, where ``label`` is ``"<model type>-><model name>"``.
    """
    for order, label in ((1, 'Unigram'), (2, 'Bigram'), (3, 'Trigram')):
        if n == order:
            model_name = label
            break
    else:
        model_name = 'N-gram'

    if n == 1:
        model_type = 'Bayesian Model'
    else:
        model_type = "Markov Model"

    traces = []
    total = 0

    for window, current_word in zip(Ngram(textList,n),textList):
        outcome = cum_log_joint_prob(window,display=display)

        # With display=True the helper returns (value, trace) instead of a float.
        if isinstance(outcome,float):
            joint = outcome
        else:
            joint, trace = outcome
            traces.append(trace)

        width = window.size
        if width == 1:
            total += joint
        elif width > 1:
            total += joint/(-np.log(prob(current_word)))

    summary = {"{}{}{}".format(model_type,"->",model_name):total}
    if display:
        return summary,traces
    return summary

In [198]:
score("它 是 一 个 人".split(),3,display=False)

{'Markov Model->Trigram': -inf}

In [199]:
def Language_Model(words_list,n=2,display=False):
    """Return the ordering of ``words_list`` with the lowest ``score``.

    Every permutation of the input is scored with ``score(group, n)`` and the
    first permutation that attains the minimum is returned. The search is
    exhaustive (n! candidates for n words), so it is only practical for short
    inputs.

    Parameters
    ----------
    words_list : iterable of str
        Words to arrange.
    n : int
        N-gram order forwarded to ``score``.
    display : bool
        When true, print the model label and score of every candidate.

    Returns
    -------
    tuple
        The best-scoring ordering (an empty tuple for empty input).
    """
    best_group = None
    best_score = None
    # Stream the permutations and keep only the running best, rather than
    # holding all n! tuples plus a parallel score list in memory.
    for group in permutations(words_list):
        check = score(group,n,display=False)
        model_name = next(iter(check))
        score_ = check[model_name]
        if display:
            print(model_name,score_)
        # Strict '<' keeps the earliest candidate on ties.
        if best_score is None or score_ < best_score:
            best_group, best_score = group, score_
    return best_group


In [205]:
# `words` holds 10 entries, so this exhaustive search scores 10! = 3,628,800
# orderings; expect this cell to take a long time.
Language_Model(words,2)

('经', '经常', '有', '有意见', '意见', '见', '意', '见分歧', '分', '分歧')

In [204]:
Language_Model(('去','北京','玩','不'),2),Language_Model(["我","要","不"],n=3)

(('去', '北京', '玩', '不'), ('我', '要', '不'))

## 分词

In [105]:
def forwoard_slide(original,index_,max_len = 5):
    """Return the window of at most ``max_len`` items starting at ``index_``.

    The window is simply truncated when it would run past the end.
    """
    window_end = index_ + max_len
    return original[index_:window_end]


def forwoard_test_word(text,dict_):
    """Return the longest prefix of ``text`` contained in ``dict_``.

    Prefixes are tried from the full text down to a single character;
    ``None`` is returned when none of them is in ``dict_``.
    """
    prefix_len = len(text)
    while prefix_len > 0:
        candidate = text[:prefix_len]
        if candidate in dict_:
            return candidate
        prefix_len -= 1
    return None

def forwoard_max_matching(original,max_len,dictionaries):
    """Collect the longest dictionary word starting at each position.

    For every index of ``original`` a window of up to ``max_len`` characters
    is taken and its longest prefix found in ``dictionaries`` is recorded.
    The scan visits *every* position (it does not jump past a matched word),
    so the returned words may overlap.

    Parameters
    ----------
    original : str
        Text to scan.
    max_len : int
        Maximum candidate word length.
    dictionaries : container of str
        Anything supporting ``in`` (a dict or set gives O(1) lookups).

    Returns
    -------
    list of str
        Matches in left-to-right order; positions without a match are skipped.
    """
    result = []
    for start in range(len(original)):
        window = forwoard_slide(original,start,max_len)
        # Query the container directly; no per-call list copy of the dictionary.
        match = forwoard_test_word(window,dictionaries)
        if match is not None:
            result.append(match)
    return result


def backward_slide(original,index_,max_len = 5):
    """Return the window of at most ``max_len`` items ending just before ``index_``.

    The window start is clamped to 0 so it never wraps around to the end.
    """
    window_start = max(index_ - max_len, 0)
    return original[window_start:index_]

def backward_test_word(text,dict_):
    """Return the longest suffix of ``text`` contained in ``dict_``.

    Suffixes are tried from the full text down to the last character;
    ``None`` is returned when none of them is in ``dict_``.
    """
    total = len(text)
    for start in range(total):
        candidate = text[start:start + total]
        if candidate in dict_:
            return candidate
    return None

def backward_max_matching(original,max_len,dictionaries):
    """Collect the longest dictionary word ending at each position.

    For every end position of ``original`` (scanning right to left) a window
    of up to ``max_len`` characters ending there is taken and its longest
    suffix found in ``dictionaries`` is recorded. Every position is visited,
    so the returned words may overlap.

    Parameters
    ----------
    original : str
        Text to scan.
    max_len : int
        Maximum candidate word length.
    dictionaries : container of str
        Anything supporting ``in`` (a dict or set gives O(1) lookups).

    Returns
    -------
    list of str
        Matches in left-to-right order; positions without a match are skipped.
    """
    result = []
    # End positions run from len(original) down to 1 so that the final
    # character is part of the first window.
    for end in range(len(original),0,-1):
        # Honour the caller's max_len instead of backward_slide's default.
        window = backward_slide(original,end,max_len)
        # Query the container directly: membership on a list copy would be a
        # linear scan for every candidate.
        match = backward_test_word(window,dictionaries)
        if match is not None:
            result.append(match)
    return result[::-1]


def _drop_covered_chars(tokens):
    """Keep multi-char tokens plus single chars that occur in none of them."""
    multi = {token for token in tokens if len(token) > 1}
    # Build the set of covered characters once instead of per single char.
    covered = set("".join(multi))
    singles = {token for token in tokens if len(token) == 1 and token not in covered}
    return multi | singles


def MaxMatching(original,max_len,word_dict,modeltype="search"):
    """Combine forward and backward matching results into a token set.

    Parameters
    ----------
    original : str
        Text to segment.
    max_len : int
        Maximum candidate word length, passed to both matchers.
    word_dict : container of str
        Dictionary of known words.
    modeltype : {"search", "all", "exact"}
        ``"search"`` -> union of the backward and forward token sets.
        ``"all"``    -> ``{"backward": ..., "forwoard": ...}`` where each set
                        has had single characters removed when they already
                        occur inside one of its multi-character tokens.
        ``"exact"``  -> the same filtering applied to the intersection of
                        the two sets.

    Returns
    -------
    set or dict of sets
        Results are sets, so repeated tokens and token order are not kept.

    Raises
    ------
    ValueError
        If ``modeltype`` is not one of the supported values.
    """
    # Reject unknown modes before doing any matching work.
    if modeltype not in ("search", "all", "exact"):
        raise ValueError(
            "modeltype must be 'search', 'all' or 'exact', got {!r}".format(modeltype)
        )

    set_backward = set(backward_max_matching(original,max_len,word_dict))
    set_forwoard = set(forwoard_max_matching(original,max_len,word_dict))

    if modeltype == "search":
        return set_backward | set_forwoard
    if modeltype == 'all':
        return {"backward":_drop_covered_chars(set_backward),"forwoard":_drop_covered_chars(set_forwoard)}
    return _drop_covered_chars(set_backward & set_forwoard)


In [113]:
# MaxMatching returns a set, so tokens that occur twice in the sentence
# (e.g. "薪资", "多少") appear only once and the original word order is lost.
split_words = MaxMatching("他们总说这个的薪资多少那个薪资多少",2,word_dict,modeltype = "exact")
# Language_Model then searches over orderings of these unique tokens only.
split_words,Language_Model(split_words)

({'他们', '多少', '总说', '的', '薪资', '这个', '那个'},
 ('总说', '这个', '他们', '多少', '薪资', '那个', '的'))

In [115]:
# jieba splits this sentence into 9 tokens (listed in a later cell), so the
# exhaustive search below scores 9! = 362,880 orderings.
Language_Model(jieba.cut("他们总说这个的薪资多少那个薪资多少"))

('总说', '这个', '他们', '多少', '薪资', '那个', '薪资', '多少', '的')

In [119]:
score(list(jieba.cut("他们总说这个的薪资多少那个薪资多少")))

{'Markov Model->Bigram': 81.47624950157825}

In [120]:
list(jieba.cut("他们总说这个的薪资多少那个薪资多少"))

['他们', '总说', '这个', '的', '薪资', '多少', '那个', '薪资', '多少']