长尾效应

# 1.准备词典

In [1]:
from pyhanlp import *

In [2]:
def load_dictionary():
    """
    加载HanLP中的mini词库
    :return: 一个set形式的词库
    """
    IOUtil = JClass('com.hankcs.hanlp.corpus.io.IOUtil') # 根据Java类得到一个Python类
    path = HanLP.Config.CoreDictionaryPath.replace('.txt', '.mini.txt')
    dic = IOUtil.loadDictionary([path])
    return set(dic.keySet())

In [3]:
dic = load_dictionary()
print(len(dic))
print(list(dic)[0])

85584
贺年卡


# 2.切分算法

## 2.1.完全切分

不是标准意义的切词，找出所有单词

In [4]:
def fully_segment(text, dic):
    word_list = []
    for i in range(len(text)):                  # i 从 0 到text的最后一个字的下标遍历
        for j in range(i + 1, len(text) + 1):   # j 遍历[i + 1, len(text)]区间
            word = text[i:j]                    # 取出连续区间[i, j]对应的字符串
            if word in dic:                     # 如果在词典中，则认为是一个词
                word_list.append(word)
    return word_list

In [5]:
dic = load_dictionary()
print(fully_segment('商品和服务', dic))

['商', '商品', '品', '和', '和服', '服', '服务', '务']


## 2.2.正向最长匹配

优先输出更长的单词（最长匹配算法）

In [6]:
def forward_segment(text, dic):
    word_list = []
    i = 0
    while i < len(text):
        longest_word = text[i]                      # 当前扫描位置的单字
        for j in range(i + 1, len(text) + 1):       # 所有可能的结尾
            word = text[i:j]                        # 从当前位置到结尾的连续字符串
            if word in dic:                         # 在词典中
                if len(word) > len(longest_word):   # 并且更长
                    longest_word = word             # 则更优先输出
        word_list.append(longest_word)              # 输出最长词
        i += len(longest_word)                      # 正向扫描
    return word_list

In [7]:
dic = load_dictionary()
print(forward_segment('就读北京大学', dic))
print(forward_segment('研究生命起源', dic))

['就读', '北京大学']
['研究生', '命', '起源']


## 2.3.逆向最长匹配

In [8]:
def backward_segment(text, dic):
    word_list = []
    i = len(text) - 1
    while i >= 0:                                   # 扫描位置作为终点
        longest_word = text[i]                      # 扫描位置的单字
        for j in range(0, i):                       # 遍历[0, i]区间作为待查询词语的起点
            word = text[j: i + 1]                   # 取出[j, i]区间作为待查询单词
            if word in dic:
                if len(word) > len(longest_word):   # 越长优先级越高
                    longest_word = word
                    break
        word_list.insert(0, longest_word)           # 逆向扫描，所以越先查出的单词在位置上越靠后
        i -= len(longest_word)
    return word_list

In [9]:
dic = load_dictionary()
print(backward_segment('研究生命起源', dic))

['研究', '生命', '起源']


## 2.4.双向最长匹配

正向最长和逆向最长的取长补短

逆向最长的成功次数更多

流程：

- 同时执行正向和逆向，若两者词数不同，返回词数更少的那个

- 否则，返回两者中单字更少的那一个。当单字数也相同时，优先返回逆向最长匹配的结果。

In [10]:
def count_single_char(word_list: list):  # 统计单字成词的个数
    return sum(1 for word in word_list if len(word) == 1)


def bidirectional_segment(text, dic):
    f = forward_segment(text, dic)
    b = backward_segment(text, dic)
    if len(f) < len(b):                                  # 词数更少优先级更高
        return f
    elif len(f) > len(b):
        return b
    else:
        if count_single_char(f) < count_single_char(b):  # 单字更少优先级更高
            return f
        else:
            return b                                     # 都相等时逆向匹配优先级更高

In [11]:
dic = load_dictionary()
print(bidirectional_segment('研究生命起源', dic))

['研究', '生命', '起源']


## 2.5.速度测评

词典分词的核心价值在速度

In [12]:
import time

In [13]:
def evaluate_speed(segment, text, dic):
    start_time = time.time()
    for i in range(pressure):
        segment(text, dic)
    elapsed_time = time.time() - start_time
    print('%.2f 万字/秒' % (len(text) * pressure / 10000 / elapsed_time))

In [14]:
text = "江西鄱阳湖干枯，中国最大淡水湖变成大草原"
pressure = 10000
dic = load_dictionary()
print('由于JPype调用开销巨大，以下速度显著慢于原生Java')
evaluate_speed(forward_segment, text, dic)
evaluate_speed(backward_segment, text, dic)
evaluate_speed(bidirectional_segment, text, dic)

由于JPype调用开销巨大，以下速度显著慢于原生Java
133.49 万字/秒
120.31 万字/秒
57.90 万字/秒


# 3.字典树

匹配算法的瓶颈在于判断集合中是否有字符串。如果用有序集合（TreeMap），复杂度是O(logn)。

使用字典树（trie树、前缀树）存储，每一条路径都可以表示一个字符串。在相同前缀的字符串上有了加速时间和节省空间。

## 3.1.字典树的节点实现

In [15]:
class Node(object):
    def __init__(self, value) -> None:
        self._children = {}
        self._value = value

    def _add_child(self, char, value, overwrite=False):
        child = self._children.get(char)
        if child is None:
            child = Node(value)
            self._children[char] = child
        elif overwrite:
            child._value = value
        return child

## 3.2.增删改查

In [16]:
class Trie(Node):
    def __init__(self) -> None:
        super().__init__(None)

    def __contains__(self, key):
        return self[key] is not None

    def __getitem__(self, key):
        state = self
        for char in key:
            state = state._children.get(char)
            if state is None:
                return None
        return state._value

    def __setitem__(self, key, value):
        state = self
        for i, char in enumerate(key):
            if i < len(key) - 1:
                state = state._add_child(char, None, False) # 还在路上
            else:
                state = state._add_child(char, value, True) # 在该节点取到该值

In [17]:
trie = Trie()
# 增
trie['自然'] = 'nature'
trie['自然人'] = 'human'
trie['自然语言'] = 'language'
trie['自语'] = 'talk	to oneself'
trie['入门'] = 'introduction'
assert '自然' in trie
# 删
trie['自然'] = None
assert '自然' not in trie
# 改
trie['自然语言'] = 'human language'
assert trie['自然语言'] == 'human language'
# 查
assert trie['入门'] == 'introduction'

# 4.双数组字典树

（Double Array Trie，DAT），状态转移复杂度为常数（BinTrie为logn），由base和check两个数组组成。

双数组字典树是DFA（有穷状态机），其状态由base和check中的元素和下标表示。

当状态b接受字符c转移到状态p：

`p = base[b] + c`

`check[p] = base[b]`

## 4.1.状态转移

由于Python默认的散列函数不适合字符。可以借用Java的hashCode()

## 4.2.查询

有了transition，对key的查询就变成至多len(key)+1次状态转移，多出来的一次针对' \0 '。

先查询key的字典序，再去取出value。

具体做法：将字典序作为自然数赋予作为单词结尾的那些节点。约定当状态p满足`base[p] < 0`时，对应单词结尾，且单词的字典序为-base[p]-1

In [18]:
class DoubleArrayTrie(object):
    def __init__(self, dic: dict) -> None:
        m = JClass('java.util.TreeMap')()
        for k, v in dic.items():
            m[k] = v
        DoubleArrayTrie = JClass('com.hankcs.hanlp.collection.trie.DoubleArrayTrie')
        dat = DoubleArrayTrie(m)
        self.base = dat.getBase()
        self.check = dat.getCheck()
        self.value = dat.getValueArray([''])

    @staticmethod
    def char_hash(c) -> int:
        return JClass('java.lang.Character')(c).hashCode()

    def transition(self, c, b) -> int:
        """
        状态转移
        :param c: 字符
        :param b: 初始状态
        :return: 转移后的状态，-1表示失败
        """
        p = self.base[b] + self.char_hash(c) + 1
        if self.base[b] == self.check[p]:
            return p
        else:
            return -1

    def __getitem__(self, key: str):
        b = 0
        for i in range(0, len(key)):  # len(key)次状态转移
            print(key[i])
            p = self.transition(key[i], b)
            print(p)
            if p != -1:
                b = p
            else:
                return None

        p = self.base[b]  # 按字符'\0'进行状态转移
        n = self.base[p]  # 查询base
        print(n)
        if p == self.check[p] and n < 0:  # 状态转移成功且对应词语结尾
            index = -n - 1  # 取得字典序
            return self.value[index]
        return None

In [19]:
# 测试
dic = {'自然':'nature','自然人':'human','自然语言':'language','自语':'talk to oneself','入门':'introduction'}
dat = DoubleArrayTrie(dic)
assert dat['自然'] == 'nature'
assert dat['自然语言'] == 'language'
assert dat['不自然'] is None

自
33260
然
38380
-2
自
33260
然
38380
语
74203
言
38383
-4
不
-1


## 4.3.AC自动机

In [20]:
def classic_demo():
    words = ["hers", "his", "she", "he"]
    Trie = JClass('com.hankcs.hanlp.algorithm.ahocorasick.trie.Trie')
    trie = Trie()
    for w in words:
        trie.addKeyword(w)

    for emit in trie.parseText("ushers"):
        print("[%d:%d]=%s" % (emit.getStart(), emit.getEnd(), emit.getKeyword()))

In [21]:
classic_demo()

[2:3]=he
[1:3]=she
[2:5]=hers


## 4.4.基于双数组字典树的AC自动机

In [22]:
def classic_demo():
    words = ["hers", "his", "she", "he"]
    map = JClass('java.util.TreeMap')()     # 创建TreeMap实例
    for word in words:
        map[word] = word.upper()            # 存放键值对
    trie = JClass('com.hankcs.hanlp.collection.AhoCorasick.AhoCorasickDoubleArrayTrie')(map)
    for hit in trie.parseText("ushers"):    # 遍历查询结果
        print("[%d:%d]=%s" % (hit.begin, hit.end, hit.value))

In [23]:
classic_demo()

[1:4]=SHE
[2:4]=HE
[2:6]=HERS


In [26]:
HanLP.Config.ShowTermNature = False
segment = JClass('com.hankcs.hanlp.seg.Other.AhoCorasickDoubleArrayTrieSegment')(HanLP.Config.CoreDictionaryPath)
print(segment.seg("江西鄱阳湖干枯，中国最大淡水湖变成大草原"))

[江西, 鄱阳湖, 干枯, ，, 中国, 最大, 淡水湖, 变成, 大草原]


In [27]:
from pyhanlp.static import HANLP_DATA_PATH

HanLP.Config.ShowTermNature = False
dict1 = HANLP_DATA_PATH + "/dictionary/CoreNatureDictionary.mini.txt"
dict2 = HANLP_DATA_PATH + "/dictionary/custom/上海地名.txt ns"

segment = DoubleArrayTrieSegment(dict1)
print(segment.seg('江西鄱阳湖干枯，中国最大淡水湖变成大草原'))

segment = DoubleArrayTrieSegment([dict1, dict2])
print(segment.seg('上海市虹口区大连西路550号SISU'))

segment.enablePartOfSpeechTagging(True)
HanLP.Config.ShowTermNature = True
print(segment.seg('上海市虹口区大连西路550号SISU'))

for term in segment.seg('上海市虹口区大连西路550号SISU'):
    print("单词:%s 词性:%s" % (term.word, term.nature))


[江西, 鄱阳湖, 干枯, ，, 中国, 最, 大, 淡水湖, 变成, 大, 草原]
[上海市, 虹口区, 大连西路, 5, 5, 0, 号, S, I, S, U]
[上海市/ns, 虹口区/ns, 大连西路/ns, 550/m, 号/q, SISU/nx]
单词:上海市 词性:ns
单词:虹口区 词性:ns
单词:大连西路 词性:ns
单词:550 词性:m
单词:号 词性:q
单词:SISU 词性:nx


In [30]:
import re
from pyhanlp import *
from tests.test_utility import ensure_data

ModuleNotFoundError: No module named 'tests'