### 基于频率统计的N-gram

In [None]:
# %load tokenizer.py
import os,gc,re,sys
from itertools import chain
from pyhanlp import *


# Part-of-speech tags whose words are discarded by seg_sentences() when
# filtering is enabled.
drop_pos_set = {
    'xu', 'xx', 'y', 'yg',
    'wh', 'wky', 'wkz', 'wp', 'ws', 'wyy', 'wyz', 'wb',
    'u', 'ud', 'ude1', 'ude2', 'ude3', 'udeng', 'udh',
}

# Handles to Java classes obtained through JClass (presumably provided by the
# star-import of pyhanlp above -- confirm). The helper functions below call
# the `segment` method on these handles to tokenize a sentence.
Tokenizer = JClass('com.hankcs.hanlp.tokenizer.StandardTokenizer')
HanLP = JClass('com.hankcs.hanlp.HanLP')


def to_string(sentence, return_generator=False):
    """Segment `sentence` with `Tokenizer` and split every term on '/'.

    Each segmented term is rendered with its `toString()` method and split
    on the '/' character.

    Args:
        sentence: Text to segment.
        return_generator: When true, return a generator that yields the full
            list of '/'-separated fields of each term. When false, return a
            list of `(first_field, second_field)` tuples.

    Returns:
        A generator of lists or a list of 2-tuples, depending on
        `return_generator`.

    Raises:
        IndexError: In list mode, if a term's string has no '/' in it.
    """
    terms = Tokenizer.segment(sentence)
    if not return_generator:
        pairs = []
        for term in terms:
            fields = term.toString().split('/')
            pairs.append((fields[0], fields[1]))
        return pairs
    return (term.toString().split('/') for term in terms)


def to_string_hanlp(sentence, return_generator=False):
    """Segment `sentence` with `HanLP.segment` and split every term on '/'.

    Both return modes use `HanLP.segment`. Previously the list mode called
    `Tokenizer.segment`, so the two modes could disagree on the segmentation.

    Args:
        sentence: Text to segment.
        return_generator: When true, return a generator that yields the full
            list of '/'-separated fields of each term. When false, return a
            list of `(first_field, second_field)` tuples.

    Returns:
        A generator of lists or a list of 2-tuples, depending on
        `return_generator`.

    Raises:
        IndexError: In list mode, if a term's string has no '/' in it.
    """
    terms = HanLP.segment(sentence)
    if return_generator:
        return (term.toString().split('/') for term in terms)

    pairs = []
    for term in terms:
        # Split once per term; only the first two fields are kept.
        fields = term.toString().split('/')
        pairs.append((fields[0], fields[1]))
    return pairs


def seg_sentences(sentence, with_filter=True, return_generator=False):
    """Segment `sentence` and return only the words.

    Entries from `to_string` that do not have exactly two fields, or whose
    word is a single space, are always skipped.

    Args:
        sentence: Text to segment.
        with_filter: When true, also skip words whose tag is in
            `drop_pos_set`.
        return_generator: Passed through to `to_string`; when true the result
            is returned as an iterator over the collected words instead of a
            list.

    Returns:
        A list of words, or an iterator over that list when
        `return_generator` is true.
    """
    kept_words = []
    for tagged in to_string(sentence, return_generator=return_generator):
        if len(tagged) != 2 or tagged[0] == ' ':
            continue
        if with_filter and tagged[1] in drop_pos_set:
            continue
        kept_words.append(tagged[0])

    if return_generator:
        return iter(kept_words)
    return kept_words


def cut_hanlp(raw_sentence, return_list=True):
    """Segment `raw_sentence` into (word, tag) pairs via `to_string`.

    Args:
        raw_sentence: Text to segment.
        return_list: When true, return the list produced by `to_string`;
            otherwise return an iterator over that list.

    Returns:
        The tagged pairs, or None when `raw_sentence` is empty or contains
        only whitespace.
    """
    if not raw_sentence.strip():
        return None

    tagged = to_string(raw_sentence)
    if return_list:
        return tagged
    return iter(tagged)


In [1]:
import json,re
import numpy
import itertools
from itertools import chain
from tokenizer import seg_sentences

In [2]:
# Matches any single character that is neither an ASCII letter (a-z, A-Z)
# nor a Chinese character in the range U+4E00-U+9FA5. The n-gram builders
# use it to reject candidates that contain any such character.
pattern=re.compile(u'[^a-zA-Z\u4E00-\u9FA5]')


class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy values to built-in Python types.

    Handles NumPy booleans, integers, floating-point scalars and arrays.
    Any other object is passed to the base class, which raises TypeError.
    """

    def default(self, obj):
        """Return a JSON-serializable equivalent of `obj`.

        Args:
            obj: An object the standard encoder could not serialize.

        Returns:
            A `bool`, `int`, `float` or (nested) `list` for the supported
            NumPy types.

        Raises:
            TypeError: If `obj` is not one of the supported NumPy types.
        """
        # numpy.bool_ is not a subclass of numpy.integer, so it needs its
        # own branch; without it NumPy booleans are rejected.
        if isinstance(obj, numpy.bool_):
            return bool(obj)
        if isinstance(obj, numpy.integer):
            return int(obj)
        if isinstance(obj, numpy.floating):
            return float(obj)
        if isinstance(obj, numpy.ndarray):
            return obj.tolist()
        return super().default(obj)


def generate_ngram(sentence, n=4, m=2):
    """Build all n-grams of `sentence` with sizes from `m` to `n`.

    Candidates are produced size by size (smallest first) and, within one
    size, from left to right. A candidate is kept only if its joined and
    stripped text is longer than one character and contains no character
    matched by `pattern`.

    Args:
        sentence: A sequence of strings; a plain string is treated as a
            sequence of characters.
        n: Largest n-gram size; capped at `len(sentence)`.
        m: Smallest n-gram size.

    Returns:
        A list of tuples, one per retained n-gram.
    """
    length = len(sentence)
    longest = min(n, length)

    kept = []
    for size in range(m, longest + 1):
        for end in range(size, length + 1):
            gram = tuple(sentence[end - size:end])
            text = ''.join(gram).strip()
            if len(text) <= 1:
                continue
            # Drop candidates containing any character matched by `pattern`.
            if pattern.findall(text):
                continue
            kept.append(gram)
    return kept

In [3]:
if __name__ == "__main__":
    # Read the corpus once and close the file deterministically. Blank lines
    # and lines containing "RESUMEDOCSSTARTFLAG" (the marker that starts a
    # new file inside text.txt) are skipped.
    with open('text.txt', 'r', encoding='utf8') as fin:
        lines = [line.strip() for line in fin]
    lines = [
        line for line in lines
        if line and "RESUMEDOCSSTARTFLAG" not in line
    ]

    # Character-level n-grams: each line is treated as a sequence of characters.
    copus_character = [generate_ngram(line) for line in lines]

    # Word-level n-grams: segment each line first, so that the n-grams are
    # built from whole words instead of single characters.
    copus_word = [
        generate_ngram(seg_sentences(line, with_filter=True))
        for line in lines
    ]
    # Flatten the per-line lists and join the words of each n-gram with '_'.
    copus_word = ['_'.join(item) for item in chain.from_iterable(copus_word)]

    # Count how often each word-level n-gram occurs.
    dic_filter = {}
    for item in copus_word:
        dic_filter[item] = dic_filter.get(item, 0) + 1
    sort_dic = sorted(dic_filter.items(), key=lambda val: val[1], reverse=True)

    # Open the output only once the result is ready, and close it even if
    # the write fails.
    with open("ngram2_4.txt", "w", encoding='utf-8') as fout:
        fout.write(json.dumps(sort_dic, ensure_ascii=False, cls=NumpyEncoder))

### 基于TF-IDF计算N-gram重要度

说明：下面的算法按行读取输入文件，并把每一行当作一篇文档，因此建议先将输入文件预处理为“一行一篇文档”的格式。

一行表示一篇文档。

In [7]:
from sklearn.feature_extraction.text import CountVectorizer  
from sklearn.feature_extraction.text import TfidfTransformer

from tokenizer import seg_sentences

In [8]:
def _replace_c(text):
    """Normalize punctuation in `text`.

    The ASCII characters ',', '?' and '!' are replaced by their full-width
    counterparts, and the characters ')', '(', '+', '_', '-', '.', '>', '<'
    and the space are removed.

    Args:
        text: The string to normalize.

    Returns:
        The normalized string.
    """
    table = str.maketrans(",?!", "，？！", ")(+_-.>< ")
    return text.translate(table)


def tokenize_raw(text):
    """Split a raw document on punctuation and segment each piece.

    The text is first normalized with `_replace_c`, then split on sentence
    punctuation, tabs and newlines. Pieces whose stripped length is 5 or
    less are discarded; the rest are segmented with `seg_sentences`.

    Args:
        text: The raw document text.

    Returns:
        A list with one `seg_sentences` result per retained piece.
    """
    normalized = _replace_c(text)
    segmented = []
    for piece in re.split('。|,|，|：|:|？|！|\t|\n', normalized):
        clause = piece.strip()
        if len(clause) > 5:
            segmented.append(seg_sentences(clause))
    return segmented


def list_2_ngram(sentence, n=4, m=2):
    """Build all n-grams of `sentence` with sizes from `m` to `n`.

    Windows are produced size by size (smallest first) and, within one size,
    from left to right. A window is kept only if its joined and stripped
    text is longer than one character and contains no character matched by
    `pattern`.

    Args:
        sentence: A sequence of strings.
        n: Largest n-gram size; capped at `len(sentence)`.
        m: Smallest n-gram size.

    Returns:
        A list of tuples, one per retained n-gram.
    """
    total = len(sentence)
    upper = total if total < n else n

    windows = (
        tuple(sentence[start:start + width])
        for width in range(m, upper + 1)
        for start in range(total - width + 1)
    )

    result = []
    for window in windows:
        joined = ''.join(window).strip()
        if len(joined) > 1 and not pattern.findall(joined):
            result.append(window)
    return result

In [9]:
if __name__ == "__main__":
    # Each non-blank line that does not contain "RESUMEDOCSSTARTFLAG" is
    # split into clauses and segmented. The file is closed as soon as it
    # has been read.
    with open('text.txt', 'r', encoding='utf8') as fin:
        copus = [
            tokenize_raw(line.strip())
            for line in fin
            if len(line.strip()) > 0 and "RESUMEDOCSSTARTFLAG" not in line
        ]

    # One entry per clause: its n-grams joined with '_' and separated by
    # spaces. A corpus with a single line is processed too; the previous
    # `len(copus) > 1` guard left `doc` empty in that case.
    doc = []
    for list_copus in copus:
        for t in list_copus:
            doc.append(
                ' '.join('_'.join(i) for i in list_2_ngram(t, n=4, m=2))
            )
    # Drop clauses that produced no n-gram at all.
    doc = list(filter(None, doc))

    vectorizer1 = CountVectorizer()   # term counts per entry of `doc`
    transformer = TfidfTransformer()  # tf-idf weights from those counts

    freq1 = vectorizer1.fit_transform(doc)   # shape: (doc, word)
    tfidf = transformer.fit_transform(freq1)

    # Total count of every word over all entries of `doc` (a "word" here is
    # an n-gram).
    word_freq = [freq1.getcol(i).sum() for i in range(freq1.shape[1])]

    # Sum of the tf-idf weights of every word over all entries of `doc`.
    tfidf_sum = [tfidf.getcol(i).sum() for i in range(tfidf.shape[1])]

    # Invert the vocabulary so that a column index maps back to its n-gram.
    tfidf_dic = {
        index: term for term, index in vectorizer1.vocabulary_.items()
    }

    # {n-gram: [count, tf-idf sum]}; the second value is the sum of the
    # n-gram's tf-idf weights, not a single tf-idf score.
    dic_filter = {
        tfidf_dic[i]: [wq, tf]
        for i, (wq, tf) in enumerate(zip(word_freq, tfidf_sum))
    }

    # Lists compare element-wise: order by count, then by tf-idf sum.
    sort_dic = sorted(dic_filter.items(), key=lambda val: val[1],
                      reverse=True)

    # Open the output only once the result is ready, and close it even if
    # the write fails.
    with open("ngram2_4_tfidf.txt", "w", encoding='utf-8') as fout:
        fout.write(json.dumps(sort_dic, ensure_ascii=False, cls=NumpyEncoder))