### 计算TFIDF
#### 1、先计算分词之后的每篇文章的词频，得到CV模型
#### 2、然后根据词频计算IDF以及词，得到IDF模型

In [1]:
import os 
import sys
BASE_DIR = os.path.dirname(os.path.dirname(os.getcwd()))
sys.path.insert(0, os.path.join(BASE_DIR))
PYSPARK_PYTHON = "/miniconda2/envs/reco_sys/bin/python"

os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

from offline import SparkSessionBase

class KeywordsToTfidf(SparkSessionBase):
    SPARK_APP_NAME = "keywordsByTFIDF"
    SPARK_EXECUTOR_MEMORY = "7g"
    ENABLE_HIVE_SUPPORT = True

    def __init__(self):
        self.spark = self._create_spark_session()

In [2]:
ktt = KeywordsToTfidf()

In [3]:
# 分词
def segmentation(partition):
    import os
    import re

    import jieba
    import jieba.analyse
    import jieba.posseg as pseg
    import codecs

    abspath = "/root/words"

    # 结巴加载用户词典
    userDict_path = os.path.join(abspath, "ITKeywords.txt")
    jieba.load_userdict(userDict_path)

    # 停用词文本
    stopwords_path = os.path.join(abspath, "stopwords.txt")

    def get_stopwords_list():
        """返回stopwords列表"""
        stopwords_list = [i.strip()
                          for i in codecs.open(stopwords_path).readlines()]
        return stopwords_list

    # 所有的停用词列表
    stopwords_list = get_stopwords_list()

    # 分词
    def cut_sentence(sentence):
        """对切割之后的词语进行过滤，去除停用词，保留名词，英文和自定义词库中的词，长度大于2的词"""
        # print(sentence,"*"*100)
        # eg:[pair('今天', 't'), pair('有', 'd'), pair('雾', 'n'), pair('霾', 'g')]
        seg_list = pseg.lcut(sentence)
        seg_list = [i for i in seg_list if i.flag not in stopwords_list]
        filtered_words_list = []
        for seg in seg_list:
            # print(seg)
            if len(seg.word) <= 1:
                continue
            elif seg.flag == "eng":
                if len(seg.word) <= 2:
                    continue
                else:
                    filtered_words_list.append(seg.word)
            elif seg.flag.startswith("n"):
                filtered_words_list.append(seg.word)
            elif seg.flag in ["x", "eng"]:  # 是自定一个词语或者是英文单词
                filtered_words_list.append(seg.word)
        return filtered_words_list

    for row in partition:
        sentence = re.sub("<.*?>", "", row.sentence)    # 替换掉标签数据
        words = cut_sentence(sentence)
        yield row.article_id, row.channel_id, words

In [4]:
ktt.spark.sql("use article")
article_dataframe = ktt.spark.sql("select * from article_data limit 20")
words_df = article_dataframe.rdd.mapPartitions(segmentation).toDF(["article_id", "channel_id", "words"])

In [5]:
type(article_dataframe.rdd)

pyspark.rdd.RDD

In [6]:
article_dataframe.show()

+----------+----------+------------+--------------------+--------------------+--------------------+
|article_id|channel_id|channel_name|               title|             content|            sentence|
+----------+----------+------------+--------------------+--------------------+--------------------+
|         1|        17|          前端|     Vue props用法小结原荐|<p><strong>Vue pr...|前端,Vue props用法小结原...|
|         2|        17|          前端|vue.js响应式原理解析与实现—...|<p>上次我们已经分析了vue.j...|前端,vue.js响应式原理解析与...|
|         3|        17|          前端|JavaScript中浅拷贝和深拷...|<p>要理解 JavaScript...|前端,JavaScript中浅拷贝...|
|         4|        17|          前端|基于vue2.0 +vuex+ e...|<p>效果演示地址,</p><p>...|前端,基于vue2.0 +vuex...|
|         5|        17|          前端|immutability因Reac...|<p><img src="http...|前端,immutability因R...|
|         6|        17|          前端|简单了解 node npm cnp...|<span id="OSC_h1_...|前端,简单了解 node npm ...|
|         7|        17|          前端|       Web工程师以太坊入门原荐|<p>我经常构建使用以太坊的Web...|前端,Web工程师以太坊入门原荐,...|


In [7]:
words_df.show()

+----------+----------+--------------------+
|article_id|channel_id|               words|
+----------+----------+--------------------+
|         1|        17|[Vue, props, 用法, ...|
|         2|        17|[vue, 响应式, 原理, mo...|
|         3|        17|[JavaScript, 浅拷贝,...|
|         4|        17|[vue2, vuex, elem...|
|         5|        17|[immutability, Re...|
|         6|        17|[node, npm, cnpm,...|
|         7|        17|[Web, 工程师, 以太坊, 入...|
|         8|        17|[Web, pa, api, we...|
|         9|        17|[vue, 中用, 数据驱动, 视...|
|        10|        17|[程序, WebSocket, 长...|
|        11|        17|[flux, 架构, flux, ...|
|        12|        17|[合格, TypeScript, ...|
|        13|        17|[专属, 插件, Easy, Sl...|
|        14|        17|[前后端分离, vue, 网站前台...|
|        15|        17|[ajax, 页面, 重复提交, ...|
|        17|        17|[JSsearch, 购物网站, ...|
|        18|        17|[web, pa, react, ...|
|        19|        17|[合格, 事顶, 项目, 自我介绍...|
|        20|        17|[总结, jQuery, 用法, ...|
|        2

### 计算词频

- 只有20行的数据

In [None]:
# 词语与词频统计
from pyspark.ml.feature import CountVectorizer
# 总词汇的大小，文本中必须出现的次数
cv = CountVectorizer(inputCol="words", outputCol="countFeatures", vocabSize=200*10000, minDF=1.0)
# 训练词频统计模型
cv_model = cv.fit(words_df)
cv_model.write().overwrite().save("hdfs://hadoop-master:9000/headlines/models/CV.model")

### 训练idf模型，保存

In [8]:
# 词语与词频统计
from pyspark.ml.feature import CountVectorizerModel
cv_model = CountVectorizerModel.load("hdfs://hadoop-master:9000/headlines/models/CV.model")
# 得出词频向量结果
cv_result = cv_model.transform(words_df)

In [9]:
# 训练IDF模型
from pyspark.ml.feature import IDF
idf = IDF(inputCol="countFeatures", outputCol="idfFeatures")
idfModel = idf.fit(cv_result)
# idfModel.write().overwrite().save("hdfs://hadoop-master:9000/headlines/models/IDF.model")

In [10]:
import pyspark.spark.sparkContext as sc
keywords_list_with_idf = list(zip(cv_model.vocabulary, idf_model.idf.toArray()))
def func(data):
    for index in range(len(data)):
        print(1,data)
        data[index] = list(data[index])
        data[index].append(index)
        data[index][1] = float(data[index][1])
        print(2, data)
        break;
func(keywords_list_with_idf)
# sc = spark.sparkContext
rdd = sparkContext.parallelize(keywords_list_with_idf)
df = rdd.toDF(["keywords", "idf", "index"])

# 保存
# df.write.insertInto('idf_keywords_values')

ModuleNotFoundError: No module named 'pyspark.spark'

### 训练TFIDF模型

In [11]:
from pyspark.ml.feature import CountVectorizerModel
cv_model = CountVectorizerModel.load("hdfs://hadoop-master:9000/headlines/models/countVectorizerOfArticleWords.model")
from pyspark.ml.feature import IDFModel
idf_model = IDFModel.load("hdfs://hadoop-master:9000/headlines/models/IDFOfArticleWords.model")
cv_result = cv_model.transform(words_df)
tfidf_result = idf_model.transform(cv_result)


# tfidf_result的idfFeatures列,其中结构位SparseVector(length, {word_index:tfidf_fvalue})
def func(partition):
    TOPK = 20
    for row in partition:
        # 找到索引与IDF值并进行排序
        _ = list(zip(row.idfFeatures.indices, row.idfFeatures.values))
        _ = sorted(_, key=lambda x: x[1], reverse=True)
        result = _[:TOPK]
        for word_index, tfidf in result:
            yield row.article_id, row.channel_id, int(word_index), round(float(tfidf), 4)

_keywordsByTFIDF = tfidf_result.rdd.mapPartitions(func).toDF(["article_id", "channel_id", "index", "tfidf"])

In [12]:
cv_model.vocabulary

['&#',
 'pa',
 'ul',
 '代码',
 '方法',
 '数据',
 'return',
 '对象',
 'name',
 'this',
 'public',
 'int',
 'new',
 '问题',
 '函数',
 '.a',
 'class',
 '文件',
 'amp',
 'com',
 '元素',
 'function',
 '用户',
 'String',
 '内容',
 '时候',
 'var',
 '参数',
 'for',
 '属性',
 '方式',
 'void',
 'data',
 '__',
 'the',
 '技术',
 'value',
 '结果',
 '信息',
 'log',
 '定义',
 '类型',
 '项目',
 '时间',
 'import',
 'div',
 '线程',
 '系统',
 '情况',
 'print',
 '节点',
 '.h',
 'http',
 'self',
 '功能',
 'key',
 'true',
 'res',
 'type',
 'java',
 '数组',
 '数据库',
 '组件',
 '过程',
 'console',
 'https',
 'index',
 '页面',
 '状态',
 '程序',
 'get',
 'test',
 '服务器',
 'static',
 'from',
 '命令',
 'private',
 'else',
 '字符串',
 'org',
 '模块',
 'out',
 '模型',
 '总结',
 'System',
 '算法',
 '目录',
 'false',
 '消耗',
 'list',
 '部分',
 'Java',
 '实例',
 'www',
 'const',
 'root',
 '任务',
 'size',
 '大家',
 'text',
 'url',
 '版本',
 '浏览器',
 '内存',
 'main',
 'Python',
 '事件',
 '文章',
 'user',
 'and',
 'string',
 '模式',
 '列表',
 '图片',
 '地址',
 '公司',
 '索引',
 'let',
 'html',
 'println',
 '客户端',
 'Object',
 '资源'

In [13]:
idf_model.idf.toArray()

array([ 1.41782959,  0.66513853,  0.80705912, ..., 11.14699352,
       11.14699352, 11.14699352])

In [14]:
_keywordsByTFIDF.show()

+----------+----------+------+--------+
|article_id|channel_id| index|   tfidf|
+----------+----------+------+--------+
|         1|        17| 96675|256.3809|
|         1|        17|115370|200.6459|
|         1|        17|   515|174.1541|
|         1|        17| 20134|141.6573|
|         1|        17|   591|126.9896|
|         1|        17|    62|125.4149|
|         1|        17|    45|101.2453|
|         1|        17|   391| 85.5727|
|         1|        17|   491| 56.6656|
|         1|        17|  5650| 51.6382|
|         1|        17|  1465| 37.7835|
|         1|        17|415962|  33.441|
|         1|        17|426351|  33.441|
|         1|        17|   347| 31.9532|
|         1|        17|151480| 28.0657|
|         1|        17|   314| 26.1522|
|         1|        17|  1353| 25.2814|
|         1|        17|  5808|  23.998|
|         1|        17|     5| 23.4382|
|         1|        17|    28| 22.5731|
+----------+----------+------+--------+
only showing top 20 rows



In [15]:
cv_result.show()

+----------+----------+--------------------+--------------------+
|article_id|channel_id|               words|       countFeatures|
+----------+----------+--------------------+--------------------+
|         1|        17|[Vue, props, 用法, ...|(1234544,[1,2,3,5...|
|         2|        17|[vue, 响应式, 原理, mo...|(1234544,[1,2,3,4...|
|         3|        17|[JavaScript, 浅拷贝,...|(1234544,[0,1,2,4...|
|         4|        17|[vue2, vuex, elem...|(1234544,[1,2,4,5...|
|         5|        17|[immutability, Re...|(1234544,[1,2,4,5...|
|         6|        17|[node, npm, cnpm,...|(1234544,[0,1,2,3...|
|         7|        17|[Web, 工程师, 以太坊, 入...|(1234544,[1,2,3,4...|
|         8|        17|[Web, pa, api, we...|(1234544,[1,2,3,1...|
|         9|        17|[vue, 中用, 数据驱动, 视...|(1234544,[1,3,4,5...|
|        10|        17|[程序, WebSocket, 长...|(1234544,[1,3,7,9...|
|        11|        17|[flux, 架构, flux, ...|(1234544,[1,4,5,6...|
|        12|        17|[合格, TypeScript, ...|(1234544,[1,2,3,4...|
|        1

In [16]:
# 测试tfidf_result的属性
print(type(tfidf_result))
df_tfidf_result = tfidf_result.toPandas()
len(list(df_tfidf_result.idfFeatures[0]))

<class 'pyspark.sql.dataframe.DataFrame'>


1234544

In [17]:
# 利用结果索引与”idf_keywords_values“合并词 
keywordsIndex = ktt.spark.sql("select keyword, index idx from idf_keywords_values")
# 利用结果索引与”idf_keywords_values“合并词
keywordsByTFIDF = _keywordsByTFIDF.join(keywordsIndex, keywordsIndex.idx == _keywordsByTFIDF.index).select(["article_id", "channel_id", "keyword", "tfidf"])
# keywordsByTFIDF.write.insertInto("tfidf_keywords_values")

In [18]:
keywordsByTFIDF.show()

+----------+----------+---------+--------+
|article_id|channel_id|  keyword|   tfidf|
+----------+----------+---------+--------+
|         3|        17|      var| 22.4813|
|         7|        17|      var| 41.2158|
|        13|        17|      var| 74.9377|
|        21|        17|      var| 14.9875|
|        11|        17|  Actions| 98.2025|
|        13|        17|barNumber|  55.735|
|        18|        17|   loader|125.6564|
|         7|        17|     uint| 29.6332|
|         3|        17|      堆内存|  9.8193|
|        17|        17| document| 16.8342|
|         9|        17|  caidan2| 10.7415|
|         3|        17|       &#| 41.1171|
|        12|        17|    Watch| 42.4374|
|        20|        17|  closest| 33.8248|
|         4|        17|      cmd| 15.7818|
|        10|        17|       石头| 44.6937|
|        18|        17|      mod| 42.2456|
|         2|        17|     元素节点|  83.279|
|        20|        17|     提交表单|  27.922|
|         1|        17|      Vue| 31.9532|
+----------

### TextRank

In [19]:
def textrank(partition):
    import os

    import jieba
    import jieba.analyse
    import jieba.posseg as pseg
    import codecs

    abspath = "/root/words"

    # 结巴加载用户词典
    userDict_path = os.path.join(abspath, "ITKeywords.txt")
    jieba.load_userdict(userDict_path)

    # 停用词文本
    stopwords_path = os.path.join(abspath, "stopwords.txt")

    def get_stopwords_list():
        """返回stopwords列表"""
        stopwords_list = [i.strip()
                          for i in codecs.open(stopwords_path).readlines()]
        return stopwords_list

    # 所有的停用词列表
    stopwords_list = get_stopwords_list()

    class TextRank(jieba.analyse.TextRank):
        def __init__(self, window=20, word_min_len=2):
            super(TextRank, self).__init__()
            self.span = window  # 窗口大小
            self.word_min_len = word_min_len  # 单词的最小长度
            # 要保留的词性，根据jieba github ，具体参见https://github.com/baidu/lac
            self.pos_filt = frozenset(
                ('n', 'x', 'eng', 'f', 's', 't', 'nr', 'ns', 'nt', "nw", "nz", "PER", "LOC", "ORG"))

        def pairfilter(self, wp):
            """过滤条件，返回True或者False"""

            if wp.flag == "eng":
                if len(wp.word) <= 2:
                    return False

            if wp.flag in self.pos_filt and len(wp.word.strip()) >= self.word_min_len \
                    and wp.word.lower() not in stopwords_list:
                return True
    # TextRank过滤窗口大小为5，单词最小为2
    textrank_model = TextRank(window=5, word_min_len=2)
    allowPOS = ('n', "x", 'eng', 'nr', 'ns', 'nt', "nw", "nz", "c")

    for row in partition:
        tags = textrank_model.textrank(row.sentence, topK=20, withWeight=True, allowPOS=allowPOS, withFlag=False)
        for tag in tags:
            yield row.article_id, row.channel_id, tag[0], tag[1]

In [20]:
# 计算TextRank
textrank_keywords_df = article_dataframe.rdd.mapPartitions(textrank).toDF(
["article_id", "channel_id", "keyword", "textrank"])

# textrank_keywords_df.write.insertInto("textrank_keywords_values")

In [21]:
textrank_keywords_df.show()

+----------+----------+-----------+-------------------+
|article_id|channel_id|    keyword|           textrank|
+----------+----------+-----------+-------------------+
|         1|        17|         组件|                1.0|
|         1|        17|      props| 0.5154370285370792|
|         1|        17|        msg| 0.4702870805040915|
|         1|        17|         数据|0.45582871346014814|
|         1|        17|      child|0.31296871088663686|
|         1|        17|     strong| 0.3089686862986876|
|         1|        17|       code| 0.3032954542098871|
|         1|        17|        Vue|0.24087919593391022|
|         1|        17|         pa|0.22048638072881815|
|         1|        17|         ul| 0.2018632319447092|
|         1|        17|  childNode|0.19610401758526286|
|         1|        17|     String|0.17134793062324802|
|         1|        17|forChildMsg| 0.1668240799844303|
|         1|        17|       defa| 0.1655549274585362|
|         1|        17|        pre|0.15524657279

### 文章画像结果

In [22]:
idf = ktt.spark.sql("select * from idf_keywords_values")
idf = idf.withColumnRenamed("keyword", "keyword1")
result = textrank_keywords_df.join(idf,textrank_keywords_df.keyword==idf.keyword1)
keywords_res = result.withColumn("weights", result.textrank * result.idf).select(["article_id", "channel_id", "keyword", "weights"])

In [23]:
result.show()

+----------+----------+---------+-------------------+---------+------------------+------+
|article_id|channel_id|  keyword|           textrank| keyword1|               idf| index|
+----------+----------+---------+-------------------+---------+------------------+------+
|         2|        17|    input|  0.298715457651815|    input|2.5936612831652797|   139|
|         1|        17|childNode|0.19610401758526286|childNode| 7.869848788205214| 20134|
|        12|        17|   import| 0.2446905384177029|   import|1.9502451027406746|    44|
|        10|        17|      amp| 0.6220890404292903|      amp|1.5313880611157102|    18|
|        12|        17|      amp| 0.1938349932225792|      amp|1.5313880611157102|    18|
|        13|        17|      amp| 0.2704438727699735|      amp|1.5313880611157102|    18|
|        20|        17| dragsort|0.19898628640236093| dragsort|10.741528413089226|153844|
|         3|        17|      jpg| 0.5186416300247046|      jpg| 3.486643603011888|   727|
|         

In [24]:
keywords_res.show()

+----------+----------+---------+-------------------+
|article_id|channel_id|  keyword|            weights|
+----------+----------+---------+-------------------+
|         2|        17|    input| 0.7747667171945103|
|         1|        17|childNode|  1.543308965155555|
|        12|        17|   import|  0.477206524236104|
|        10|        17|      amp| 0.9526597294643435|
|        12|        17|      amp| 0.2968365944475024|
|        13|        17|      amp|0.41415451796183356|
|        20|        17| dragsort| 2.1374168492060703|
|         3|        17|      jpg| 1.8083185215812947|
|         4|        17|      jpg|  2.223011841251718|
|         6|        17|      jpg| 1.4291555587143352|
|         8|        17|      jpg| 0.9015062380497989|
|         2|        17|  textReg| 3.1675605534945737|
|        11|        17|       事件| 0.8244571284607454|
|        20|        17|       事件|   1.17658632184865|
|        20|        17|       h5|  4.757592132402036|
|         2|        17| frag

In [25]:
keywords_res.registerTempTable("temptable")
merge_keywords = ktt.spark.sql("select article_id, min(channel_id) channel_id, collect_list(keyword) keywords, collect_list(weights) weights from temptable group by article_id")

# 合并关键词权重合并成字典
def _func(row):
    return row.article_id, row.channel_id, dict(zip(row.keywords, row.weights))

keywords_info = merge_keywords.rdd.map(_func).toDF(["article_id", "channel_id", "keywords"])

In [26]:
keywords_info.show()

+----------+----------+--------------------+
|article_id|channel_id|            keywords|
+----------+----------+--------------------+
|        19|        17|Map(pre -> 1.4355...|
|         7|        17|Map(pre -> 1.1325...|
|         6|        17|Map(pre -> 1.2026...|
|         9|        17|Map(pre -> 1.4452...|
|        17|        17|Map(static -> 1.1...|
|         5|        17|Map(pre -> 0.8062...|
|         1|        17|Map(pre -> 0.5692...|
|        10|        17|Map(源码 -> 1.08268...|
|         3|        17|Map(对象 -> 1.04255...|
|        12|        17|Map(pre -> 1.0007...|
|         8|        17|Map(pre -> 1.2047...|
|        11|        17|Map(pre -> 1.4338...|
|         2|        17|Map(test -> 0.711...|
|         4|        17|Map(jpg -> 2.2230...|
|        13|        17|Map(border -> 0.8...|
|        18|        17|Map(pre -> 0.7541...|
|        14|        17|Map(角色 -> 1.48177...|
|        21|        17|Map(modal -> 6.13...|
|        15|        17|Map(window -> 1.3...|
|        2

In [None]:
keywords_info.toPandas()

In [28]:
topic_sql = '''
    select t.article_id aid, collect_set(t.keyword) topics from tfidf_keywords_values  t
    inner join 
    textrank_keywords_values r
    where t.keyword = r.keyword
    group by aid
'''

article_topics = ktt.spark.sql(topic_sql)

In [29]:
article_topics.show()

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/miniconda2/envs/reco_sys/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 3267, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-29-3fb2161d0a43>", line 1, in <module>
    article_topics.show()
  File "/miniconda2/envs/reco_sys/lib/python3.6/site-packages/pyspark/sql/dataframe.py", line 336, in show
    print(self._jdf.showString(n, 20))
  File "/miniconda2/envs/reco_sys/lib/python3.6/site-packages/py4j/java_gateway.py", line 1255, in __call__
    answer = self.gateway_client.send_command(command)
  File "/miniconda2/envs/reco_sys/lib/python3.6/site-packages/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/miniconda2/envs/reco_sys/lib/python3.6/site-packages/py4j/java_gateway.py", line 1152, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/miniconda2/envs/reco_sys/lib/python3.6/socket.py", line 586

KeyboardInterrupt: 