In [220]:
from typing import List, Tuple

import pnlp
from pnlp import Text, num_norm, cut_zhchar

from dataclasses import dataclass, field
from collections import Counter
from itertools import chain
import numpy as np
import pandas as pd

import ahocorasick
from Levenshtein import jaro
from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from LAC import LAC

ROOT = Path.cwd()

## 数据集

In [180]:
@dataclass
class Dataset:
    
    file_path: Path
    test_size: float = 0.2
    
    def __post_init__(self):
        self.df = pd.read_csv(self.file_path, sep="\t")
        self.train, self.test = self.split()
        
    def split(self):
        return train_test_split(self.df, test_size=self.test_size, random_state=42)

## 文本预处理

In [222]:
@dataclass
class PreProcessor:
    
    rules: List[str] = field(
        default_factory=lambda: ['pic', 'lnk'])
    
    def __post_init__(self):
        self.clean_rule = Text(self.rules)
    
    def clean(self, text: str) -> str:
        return self.clean_rule.clean(text)
    
    def normalize(self, text: str) -> str:
        return text
    
    def __call__(self, text: str) -> str:
        return self.normalize(self.clean(text))

## Tokenize

In [210]:
@dataclass
class Tokenzier:
    
    dict_path: str = None
    type: str = "word"
    
    def __post_init__(self):
        self.word_segmentor = LAC(mode="seg")
    
    def tokenize2word(self, text: str) -> List[str]:
        return self.word_segmentor.run(text)
    
    def tokenize2char(self, text: str) -> List[str]:
        return cut_zhchar(text)
    
    def tokenize(self, text: str) -> List[str]:
        return getattr(self, "tokenize2" + self.type)(text)
    
    def __call__(self, text: str) -> List[int]:
        pass

## 特征表征

In [223]:
@dataclass
class DataLoader:
    
    file_path: Path
    test_size: float = 0.2
    rules = ['pic', 'lnk']
    dict_path = None
    token_type: str = "word"
    
    def __post_init__(self):
        self.ds = Dataset(self.file_path, self.test_size)
        self.pp = PreProcessor(self.rules)
        self.tk = Tokenzier(self.dict_path, self.token_type)
    
    def dm_data(self, type: str = "train"):
        data = getattr(self.ds , type)
        for item in self.ds.train.itertuples(index=False):
            tokens = self.tk.tokenize(self.pp(item.text_a))
            yield tokens, item.label

In [224]:
dl = DataLoader(ROOT / "NLPCC14-SC/train.tsv")

### 词典/规则

In [245]:
@dataclass
class DictModel:
    
    top_n: int = 3000
        
    def __post_init__(self):
        self.pos = []
        self.neg = []
    
    def build_aho(self, pos: List[str], neg: List[str]):
        aho = ahocorasick.Automaton()
        for idx, key in enumerate(pos):
            aho.add_word(key, (1, key))
        for idx, key in enumerate(neg):
            aho.add_word(key, (-1, key))
        return aho
    
    def search(self, text: str) -> int:
        i = 0
        for end_index, (val, original_value) in self.aho.iter(text):
            i += val
        return i
    
    def _match(self, dct: List[str], text: str) -> float:
        res = 0.0
        sample = np.random.choice(dct, size=100, replace=False)
        for v in sample:
            res += jaro(text, v)
        return res / len(dct)
    
    def match(self, text: str) -> int:
        return int(self._match(self.pos, text) > self._match(self.neg, text))
    
    def train(self, data: List[Tuple[List[str], int]]):
        for tokens,label in data:
            if label == 1:
                self.pos.extend(tokens)
            else:
                self.neg.extend(tokens)
        pos_count = [v for v,f in Counter(self.pos).most_common(self.top_n)]
        neg_count = [v for v,f in Counter(self.neg).most_common(self.top_n)]
        self.aho = self.build_aho(pos_count, neg_count)
        self.aho.make_automaton()
        
    
    def predict(self, data: List[str]) -> int:
        num = self.search(" ".join(data))
        return self.match("".join(data)) if num == 0 else int(num > 0)
    
    def evaluate(self, data: List[Tuple[List[str], int]]) -> float:
        error = 0
        i = 0
        for tokens, label in data:
            pred = self.predict(tokens)
            error += (pred != label)
            i += 1
        return error / i

In [246]:
dm = DictModel()

In [247]:
dm.train(dl.dm_data("train"))

In [248]:
dm.evaluate(dl.dm_data("test"))

0.4985

In [249]:
lac.run("百度是一家科技公司。")

['百度', '是', '一家', '科技', '公司', '。']

In [254]:
import tensorflow.keras as tfk
import tensorflow as tf
from hnlp import gen_hidden

In [1]:
from paddlenlp import Taskflow
seg = Taskflow("word_segmentation")
seg("百度是一家科技公司。")

100%|██████████| 30975/30975 [00:09<00:00, 3214.41it/s]


In [2]:
seg("百度是一家科技公司。")

['百度', '是', '一家', '科技', '公司', '。']

In [261]:
conv = tfk.layers.Conv2D(
            filters=1,
            kernel_size=(2, 300),
            strides=(1, 1),
            padding="valid",
            data_format="channels_last",
            activation="relu",
            kernel_initializer="glorot_normal",
            bias_initializer=tfk.initializers.constant(0.1)
        )

In [264]:
emd = gen_hidden(1, 30, 300, 1)

In [265]:
conv(emd)

<tf.Tensor: shape=(1, 29, 1, 1), dtype=float32, numpy=
array([[[[0.        ]],

        [[0.00795222]],

        [[0.17629689]],

        [[0.1786648 ]],

        [[0.09023976]],

        [[0.        ]],

        [[0.        ]],

        [[0.07314899]],

        [[0.        ]],

        [[0.0391703 ]],

        [[0.20579764]],

        [[0.        ]],

        [[0.21271077]],

        [[0.13188985]],

        [[0.        ]],

        [[0.        ]],

        [[0.        ]],

        [[0.3476765 ]],

        [[0.2666411 ]],

        [[0.        ]],

        [[0.        ]],

        [[0.500645  ]],

        [[0.        ]],

        [[0.        ]],

        [[0.        ]],

        [[0.        ]],

        [[0.        ]],

        [[0.        ]],

        [[0.23655248]]]], dtype=float32)>