In [1]:
import os
import math
import pickle
import multiprocessing
import logging
import pickle

import numpy as np
import pandas as pd
import scipy.sparse as sp
import pyltp
import jieba
from sklearn.feature_extraction.text import TfidfVectorizer

In [1]:
def _split_evenly(ary, num_partitions):
    """Split ``ary`` into ``num_partitions`` contiguous chunks whose sizes differ by at most one.

    Same chunk sizes as ``np.array_split`` (the first ``len % n`` chunks get one
    extra element), but slices the input in place instead of coercing it to an
    ndarray. This matters for plain Python lists: a list of strings would become a
    fixed-width ``<U{maxlen}`` array, and a ragged list of token lists is rejected
    by recent NumPy. Pandas objects are sliced positionally via ``.iloc``.
    """
    total = len(ary)
    base, extra = divmod(total, num_partitions)
    sliceable = getattr(ary, "iloc", ary)  # positional slicing for pandas objects
    chunks = []
    start = 0
    for i in range(num_partitions):
        stop = start + base + (1 if i < extra else 0)
        chunks.append(sliceable[start:stop])
        start = stop
    return chunks


def parallelize_map(ary, func, num_worker=None, num_partitions=None, cpu_reserve=1):
    """Multiprocess map (多进程map): split ``ary`` into chunks and apply ``func`` to each in a process pool.

    Parameters
    ----------
    ary : sequence, numpy array or pandas object
        Data to split along its first axis; must support ``len`` and slicing.
    func : callable
        Called once per chunk in a worker process; must be picklable
        (a module-level function, or a ``functools.partial`` of one).
    num_worker : int, optional
        Pool size. Defaults to ``cpu_count() - cpu_reserve`` (at least 1).
    num_partitions : int, optional
        Number of chunks. Defaults to ``num_worker``.
    cpu_reserve : int
        Cores left free when ``num_worker`` is not given.

    Returns
    -------
    list
        ``func(chunk)`` for each chunk, in input order.

    Raises
    ------
    ValueError
        If ``num_partitions`` is negative.
    """
    if not num_worker:
        num_worker = max(1, multiprocessing.cpu_count() - cpu_reserve)
    print("worker num = {}".format(num_worker))
    if not num_partitions:
        num_partitions = num_worker
    if num_partitions < 1:
        raise ValueError("num_partitions must be >= 1, got {}".format(num_partitions))
    chunks = _split_evenly(ary, num_partitions)
    pool = multiprocessing.Pool(num_worker)
    try:
        ret = pool.map(func, chunks)
        pool.close()
    except BaseException:
        # Don't leave worker processes behind if a task (or Ctrl-C) fails.
        pool.terminate()
        raise
    finally:
        pool.join()
    return ret

## 预处理语料

In [4]:
def read_data(path="../res/news_chinese.csv"):
    """Load the news CSV and keep only the rows whose ``content`` is not null.

    Parameters
    ----------
    path : str or path-like
        CSV location. The default is relative to the notebook's working directory.

    Returns
    -------
    pandas.DataFrame
        Rows with non-null ``content``. The original index is kept (not reset).
    """
    raw = pd.read_csv(path)
    return raw[raw["content"].notna()]

In [5]:
# Load the article table once; later cells build the labels from data["source"].
data = read_data()

In [6]:
def process_raw_docs(raw_docs):
    """Tokenize (分词) a chunk of raw news articles with jieba.

    Each document is split on the literal two-character sequence backslash + "n"
    (not a real newline). NOTE(review): confirm the CSV stores line breaks escaped
    this way. Full-width spaces (U+3000) are stripped from each line before it is
    tokenized, and empty lines are skipped.

    Returns one token list per input document, in input order. A document that
    fails to tokenize (e.g. a non-string value) yields an empty list instead of
    being dropped. This keeps the output row-aligned with the input, and so with
    any labels built from the same DataFrame.
    """
    print('process id:', os.getpid())
    docs = []
    for i, raw_doc in enumerate(raw_docs):
        doc_words = []
        try:
            for line in raw_doc.split("\\n"):
                if line:
                    doc_words.extend(jieba.lcut(line.replace("\u3000", "")))
        except Exception as e:
            # Keep a placeholder rather than skipping: dropping the row would shift
            # every later document relative to its label.
            print("failed to tokenize doc #{} of this chunk: {!r}".format(i, e))
            doc_words = []
        docs.append(doc_words)
    return docs

In [6]:
def preprocess_data(csv_path="res/news_chinese.csv", out_path="corpus"):
    """Preprocess the corpus (预处理语料): tokenize every article in parallel and pickle the result.

    Parameters
    ----------
    csv_path : str or path-like
        News CSV. Rows with null ``content`` are dropped.
    out_path : str or path-like
        Destination of the pickled corpus.

    Returns
    -------
    list of list of str
        One token list per kept row, in file order (the same object that is pickled).

    NOTE(review): the defaults differ from ``read_data`` ("../res/...") and from
    the file ``extracting_feature`` loads ("res/news_chinese_corpus.dat").
    Presumably the output is renamed or moved between steps — confirm.
    """
    news = pd.read_csv(csv_path)[lambda x: x["content"].notna()]
    chunk_results = parallelize_map(news["content"], process_raw_docs)
    # pool.map preserves chunk order, so flattening keeps the CSV row order.
    docs = [doc for chunk in chunk_results for doc in chunk]
    with open(out_path, "wb") as f:
        pickle.dump(docs, f)
    return docs

## 训练模型

In [7]:
def transform(x, model):
    """Apply ``model.transform`` to a single chunk ``x`` and return its result.

    Used as the per-chunk worker in ``parallelize_map`` (bound to a model with
    ``functools.partial``). The PID is printed first, so the output shows how
    chunks are spread across worker processes.
    """
    worker_pid = os.getpid()
    print('process id:', worker_pid)
    transformed = model.transform(x)
    return transformed

def extracting_feature(num_worker=4):
    """Train the model (训练模型): fit TF-IDF on the preprocessed corpus and vectorize it.

    Steps:
      1. Load the pickled corpus from ``res/news_chinese_corpus.dat``.
      2. Fit a ``TfidfVectorizer`` using the stop words in ``res/stopwords.txt``.
         The token pattern ``(?u)\\b\\w+\\b`` also keeps single-character tokens.
      3. Pickle the fitted vectorizer to ``res/news_chinese_corpus_tfidf_fitted.model``.
      4. Transform the corpus in ``num_worker`` processes and pickle the stacked
         CSR matrix to ``res/X.dat``.
      5. Build labels from the notebook-level ``data`` frame:
         1 where ``source == "新华社"``, else 0.

    Parameters
    ----------
    num_worker : int
        Number of processes used for the transform step.

    Returns
    -------
    X : scipy.sparse.csr_matrix
        One row per corpus document.
    y : pandas.Series of int
        One label per row of ``data``.

    Raises
    ------
    ValueError
        If ``X`` and ``y`` have different lengths, i.e. features and labels are misaligned.

    NOTE(review): depends on the global ``data`` defined elsewhere in the notebook.
    The pickle files are assumed to be trusted local artifacts; never load untrusted ones.
    """
    import functools  # used for functools.partial below; missing from the notebook's import cell

    with open("res/news_chinese_corpus.dat", "rb") as fh:
        corpus_dat = pickle.load(fh)
    # Context manager so the stopwords file handle is closed.
    with open("res/stopwords.txt", "r", encoding="utf8") as fh:
        stopwords = [line.strip() for line in fh]
    vectorizer = TfidfVectorizer(stop_words=stopwords, token_pattern=r"(?u)\b\w+\b")
    vectorizer.fit(corpus_dat)
    with open("res/news_chinese_corpus_tfidf_fitted.model", "wb") as fh:
        pickle.dump(vectorizer, fh)

    # Bind the fitted model so the pool can call transform(chunk) with a single argument.
    transform_chunk = functools.partial(transform, model=vectorizer)
    chunks = parallelize_map(corpus_dat, transform_chunk, num_worker=num_worker)
    X = sp.vstack(chunks, format='csr')
    with open("res/X.dat", "wb") as fh:
        pickle.dump(X, fh)

    y = (data["source"] == "新华社").astype(int)
    if X.shape[0] != len(y):
        raise ValueError(
            "corpus has {} documents but data has {} rows; features and labels are misaligned".format(
                X.shape[0], len(y)
            )
        )
    return X, y

In [8]:
# Load the cached feature matrix X from disk.
# NOTE(review): extracting_feature() writes "res/X.dat" while this cell reads
# "../res/X.dat". Presumably it is the same file seen from a different working
# directory — confirm. pickle.load can execute arbitrary code, so only load trusted files.
with open("../res/X.dat", "rb") as f:
    X= pickle.load(f)

In [11]:
# Binary label: 1 when the article's source is 新华社 (Xinhua News Agency), else 0.
y = data["source"].eq("新华社").astype(int)

In [12]:
# Matrix dimensions, presumably (documents, vocabulary size).
# NOTE(review): a saved output further down shows an 87054x256250 matrix, so the
# stored outputs come from different runs. Re-run top to bottom to refresh them.
X.shape

(87054, 246678)

In [13]:
# Number of labels; should equal X.shape[0] (one label per document).
y.shape

(87054,)

In [18]:
# Compare the JSON-like "feature" metadata (its "site" field) with the "source"
# column used for the labels.
# NOTE(review): consider .head() rather than rendering the whole frame.
data[['feature', "source"]]

Unnamed: 0,feature,source
0,"{""type"":""体育"",""site"":""新华社"",""url"":""http://home.x...",新华社
1,"{""type"":""其它"",""site"":""新华社"",""url"":""http://home.x...",新华社
2,"{""type"":""其它"",""site"":""新华社"",""url"":""http://home.x...",新华社
3,"{""type"":""宏观经济"",""site"":""新华社"",""url"":""http://home...",新华社
4,"{""type"":""冰球"",""site"":""新华社"",""url"":""http://home.x...",新华社
...,...,...
89606,"{""type"":""新闻"",""site"":""网易热门"",""commentNum"":""978"",...",深圳大件事
89607,"{""type"":""国际新闻"",""site"":""环球"",""commentNum"":""0"",""j...",新华社
89608,"{""type"":""科技"",""site"":""cnbeta"",""commentNum"":""18""...",快科技@http://www.kkj.cn/
89609,"{""type"":""科技"",""site"":""cnbeta"",""commentNum"":""15""...",快科技@http://www.kkj.cn/


<87054x256250 sparse matrix of type '<class 'numpy.float64'>'
	with 7540031 stored elements in Compressed Sparse Row format>

0        1
1        1
2        1
3        1
4        1
        ..
89606    0
89607    1
89608    0
89609    0
89610    0
Name: source, Length: 87054, dtype: int32

In [80]:
# Fresh, unfitted vectorizer with scikit-learn defaults, used by the timing cells below.
# NOTE(review): unlike extracting_feature(), this uses no stop words and the default
# token_pattern, which ignores single-character tokens.
vectorizer = TfidfVectorizer()

In [81]:
# Removed a leftover interactive help lookup (`?vectorizer`). It is IPython-only
# syntax and nothing in the notebook uses its result; see the scikit-learn
# TfidfVectorizer documentation instead.

In [204]:
# Scratch pool sized to this machine's core count, rather than a hard-coded 32.
# It is shut down by the pool.terminate() cell.
pool = multiprocessing.Pool(multiprocessing.cpu_count())

In [206]:
# Stop the scratch pool's worker processes immediately, without completing outstanding work.
pool.terminate()

In [2]:
 x = np.arange(9.0)  # demo input for np.array_split below; NOTE(review): the stray leading space only works because IPython strips cell indentation

In [5]:
# 9 elements into 4 parts: the leftover element goes to the first part (sizes 3, 2, 2, 2).
np.array_split(x, 4)

[array([0., 1., 2.]), array([3., 4.]), array([5., 6.]), array([7., 8.])]

In [221]:
num_cores = multiprocessing.cpu_count()
# Leave two cores free for other processes, but never drop below one partition:
# np.array_split rejects a section count of 0 or less (which 1-2 core machines would hit).
num_partitions = max(1, num_cores - 2)
print(num_partitions)

def parallelize_dataframe(df, func, partitions=None, workers=None):
    """Split ``df`` into row chunks, apply ``func`` to each in a process pool and stack the results.

    Parameters
    ----------
    df : array-like
        Data split along its first axis with ``np.array_split``.
    func : callable
        Picklable; must return a scipy sparse matrix for each chunk.
    partitions : int, optional
        Number of chunks (clamped to at least 1). Defaults to the notebook-level
        ``num_partitions``.
    workers : int, optional
        Pool size. Defaults to the notebook-level ``num_cores``.

    Returns
    -------
    scipy.sparse.csr_matrix
        The per-chunk results stacked vertically, in chunk order.
    """
    if partitions is None:
        partitions = num_partitions
    if workers is None:
        workers = num_cores
    chunks = np.array_split(df, max(1, partitions))
    pool = multiprocessing.Pool(workers)
    try:
        results = pool.map(func, chunks)
        pool.close()
    except BaseException:
        # Don't leave worker processes behind if a task (or Ctrl-C) fails.
        pool.terminate()
        raise
    finally:
        pool.join()
    return sp.vstack(results, format='csr')
 
def test_func(data):
    #print("Process working on: ",data)
#     tfidf_matrix = tfidf_vectorizer.transform(data["text"])
    #return pd.DataFrame(tfidf_matrix.toarray())
#     return tfidf_matrix
    return vectorizer.fit_transform(data)

30


In [None]:
# Serial baseline: fit the global vectorizer on the whole corpus and transform it in one call.
# NOTE(review): in the visible notebook corpus_dat is only bound inside
# extracting_feature(), so this cell relies on hidden kernel state — confirm.
XX = vectorizer.fit_transform(corpus_dat)

In [None]:
# Timed parallel run: parallelize_dataframe applies test_func to each chunk in a
# process pool and stacks the results.
%time XX = parallelize_dataframe(corpus_dat, test_func)

In [None]:
# NOTE(review): identical to the previous cell except that the result is bound to XXX.
# Looks like a repeat timing run; consider %timeit, or remove it.
%time XXX = parallelize_dataframe(corpus_dat, test_func)

In [213]:
# Sum of all TF-IDF weights in X; presumably a quick sanity value to compare runs.
X.sum()

543589.5708239416