In [1]:
import os
import sys
BASE_DIR = os.path.dirname(os.path.dirname(os.getcwd()))
sys.path.insert(0, os.path.join(BASE_DIR))

from offline import SparkSessionBase

# 生成spark Session对象

In [2]:
class OriginArticleData(SparkSessionBase):
    
    SPARK_APP_NAME = "mergeArticle"
    ENABLE_HIVE_SUPPORT = True
    SPARK_EXECUTOR_MEMORY = "4g"
    
    def __init__(self):
        self.spark = self._create_spark_session()

In [3]:
oa = OriginArticleData()

# 读取hive中的文章数据

In [4]:
oa.spark.sql("use fytang")
article_data = oa.spark.sql("select * from article_data limit 5")

# 将文章进行分词

In [5]:
def segmentation(partition):
    import os
    import re
    import jieba
    import jieba.analyse
    import jieba.posseg as pseg
    import codecs
    
    abspath = "/Users/hycao/text"
    
    userDict_path = os.path.join(abspath, "ITKeywords.txt")
    jieba.load_userdict(userDict_path)
    stopwords_path = os.path.join(abspath, "stopwords.txt")
    
    def cut_sentence(sentence):
        seg_list = pseg.lcut(sentence)
        seg_list = [i for i in seg_list if i.flag not in stopwords_path]
        filtered_words_list = []
        for seg in seg_list:
            if len(seg.word) <= 1:
                continue
            elif seg.flag == "eng":
                if len(seg.word) <= 2:
                    continue
                else:
                    filtered_words_list.append(seg.word)
            elif seg.flag.startswith("n"):
                filtered_words_list.append(seg.word)
            elif seg.flag in ["X", "eng"]:
                filtered_words_list.append(seg.word)
        return filtered_words_list
    
    for row in partition:
        sentence = re.sub("<.*?>", "", row.sentence)
        words = cut_sentence(sentence)
        yield row.article_id, row.channel_id, words
        

In [6]:
words_df = article_data.rdd.mapPartitions(segmentation).toDF(["article_id", "channel_id", "words"])
words_df.show()

+----------+----------+--------------------+
|article_id|channel_id|               words|
+----------+----------+--------------------+
|         1|        17|[Vue, props, 用法, ...|
|         2|        17|                  []|
|         3|        17|[JavaScript, 区别, ...|
|         4|        17|[vue2, vuex, elem...|
|         5|        17|[immutability, Re...|
+----------+----------+--------------------+



In [17]:
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="words", outputCol="countFeatures", vocabSize=200*10000, minDF=1.0)
cv_model = cv.fit(words_df)
cv_model.write().overwrite().save("hdfs://localhost:9000/fytang/models/CV.model")

In [20]:
from pyspark.ml.feature import CountVectorizerModel
cv_model = CountVectorizerModel.load("hdfs://localhost:9000/fytang/models/CV.model")
cv_result = cv_model.transform(words_df)

In [21]:
from pyspark.ml.feature import IDF

idf = IDF(inputCol = "countFeatures", outputCol = "idfFeature")
idfmodel = idf.fit(cv_result)
idfmodel.write().overwrite().save("hdfs://localhost:9000/fytang/models/IDF2.model")


In [23]:
cv_model.vocabulary

['数据',
 '组件',
 'child',
 'let',
 'obj',
 'div',
 'log',
 'console',
 'props',
 'msg',
 'forChildMsg',
 'obj2',
 'for',
 'mongodb',
 'var',
 'ownChildMsg',
 'childNode',
 '数组',
 'key',
 'immutable',
 'data',
 'name',
 'array',
 'template',
 'return',
 '对象',
 'update',
 '项目',
 'children',
 'JSON',
 'arr',
 'function',
 'value',
 '用法',
 'the',
 '方法',
 'mongod',
 '首席',
 '苏南',
 '原始数据',
 'Object',
 'set',
 'Vue',
 'data2',
 'rentNode',
 'object',
 '拷贝',
 'Array',
 '文章',
 '定义',
 '系统',
 'rent',
 '文件夹',
 'String',
 '数据库',
 'remove',
 'prop',
 '字符串',
 'class',
 'stringify',
 'apply',
 'rawObj',
 'bin',
 'components',
 '总结',
 'add',
 'vue2',
 'splice',
 'amp',
 '属性',
 '函数',
 'tougu',
 'Number',
 'element',
 'this',
 'rse',
 'API',
 'merge',
 'model',
 'input',
 '方式',
 'val',
 '命令',
 'address',
 'deepclone',
 '模板',
 '类型',
 'unset',
 'windows',
 'ssign',
 '警告',
 'unshift',
 'original',
 'type',
 'ShenZhen',
 'Map',
 'hobby',
 'nodejs',
 'new',
 'validator',
 'copyObj',
 'from',
 '传递数据',
 'Set',
 'd

In [24]:
idfmodel.idf.toArray()[:20]

array([0.40546511, 1.09861229, 1.09861229, 0.40546511, 0.69314718,
       1.09861229, 0.18232156, 0.40546511, 1.09861229, 1.09861229,
       1.09861229, 1.09861229, 0.40546511, 1.09861229, 0.40546511,
       1.09861229, 1.09861229, 0.69314718, 0.69314718, 1.09861229])

In [25]:
from pyspark.ml.feature import CountVectorizerModel
cv_model = CountVectorizerModel.load("hdfs://localhost:9000/fytang/models/CV.model")
from pyspark.ml.feature import IDFModel
idf_model = IDFModel.load("hdfs://localhost:9000/fytang/models/IDF2.model")
cv_result = cv_model.transform(words_df)
tfidf_result = idf_model.transform(cv_result)

In [26]:
tfidf_result.show()

+----------+----------+--------------------+--------------------+--------------------+
|article_id|channel_id|               words|       countFeatures|          idfFeature|
+----------+----------+--------------------+--------------------+--------------------+
|         1|        17|[Vue, props, 用法, ...|(529,[0,1,2,3,5,6...|(529,[0,1,2,3,5,6...|
|         2|        17|                  []|         (529,[],[])|         (529,[],[])|
|         3|        17|[JavaScript, 区别, ...|(529,[3,4,6,7,12,...|(529,[3,4,6,7,12,...|
|         4|        17|[vue2, vuex, elem...|(529,[0,6,13,27,3...|(529,[0,6,13,27,3...|
|         5|        17|[immutability, Re...|(529,[0,3,4,6,7,1...|(529,[0,3,4,6,7,1...|
+----------+----------+--------------------+--------------------+--------------------+



In [29]:
def func(partition):
    TOPK = 20
    for row in partition:
        _ = list(zip(row.idfFeature.indices, row.idfFeature.values))
        _ = sorted(_, key=lambda x : x[1], reverse=True)
        result = _[:TOPK]
        for words_index, tfidf in result:
            yield row.article_id, row.channel_id, int(words_index), round(float(tfidf), 4)
tfidf_index_values = tfidf_result.rdd.mapPartitions(func).toDF(["article_id", "channel_id", "index", "tfidf"])
tfidf_index_values.show()

+----------+----------+-----+-------+
|article_id|channel_id|index|  tfidf|
+----------+----------+-----+-------+
|         1|        17|    1|60.4237|
|         1|        17|    2|49.4376|
|         1|        17|    5|43.9445|
|         1|        17|    8| 34.057|
|         1|        17|    9|26.3667|
|         1|        17|   10|25.2681|
|         1|        17|   15| 19.775|
|         1|        17|   16| 19.775|
|         1|        17|   23|17.5778|
|         1|        17|    0|10.1366|
|         1|        17|   42| 9.8875|
|         1|        17|   44| 9.8875|
|         1|        17|   51| 8.7889|
|         1|        17|   53| 8.7889|
|         1|        17|   56| 8.7889|
|         1|        17|   58| 7.6903|
|         1|        17|   12| 7.2984|
|         1|        17|   63| 6.5917|
|         1|        17|   69| 6.5917|
|         1|        17|   78| 5.4931|
+----------+----------+-----+-------+
only showing top 20 rows



In [41]:
def textrank(partition):
    import os
    import jieba
    import jieba.analyse
    import jieba.posseg as pseg
    import codecs
    
    abspath = "/Users/hycao/text"
    userDict_path = os.path.join(abspath, "ITKeywords.txt")
    jieba.load_userdict(userDict_path)
    stopwords_path = os.path.join(abspath, "stopwords.txt")
    
    def get_stopwords_list():
        stopwords_list = [i.strip()
                           for i in codecs.open(stopwords_path).readlines()
                         ]
        return stopwords_list
    
    stopwords_list = get_stopwords_list()
    
    class TextRank(jieba.analyse.TextRank):
        def __init__(self, windows=20, word_min_len=2):
            super(TextRank, self).__init__()
            self.span = windows
            self.word_min_len = word_min_len
            self.pos_filt = frozenset(('n', 'x', 'eng', 'f', 's', 't', 'nr', 'ns', 'nt', 'nw', 'nz', 'PER', 'LOC', "ORG"))
            
        def pairfilter(self, wp):
            if wp.flag == "eng":
                if len(wp.word) <= 2:
                    return False
                if wp.flag in self.pos_filt and len(wp.word.strip()) >= self.word_min_len and wp.word.lower() not in stopwords_list:
                    return True
        
    textrank_model = TextRank(windows=5, word_min_len=2
                             )
    allwPOS = ('n', 'x', 'eng', 'nr', 'ns', 'nt', 'nw', 'nz', 'c')
    for row in partition:
        tags = textrank_model.textrank(row.sentence, topK=20, withWeight=True, allowPOS =allwPOS , withFlag=False)
        for tag in tags:
            yield row.article_id, row.channel_id, tag[0], tag[1]

In [42]:
textrank_keywords_df = article_data.rdd.mapPartitions(textrank).toDF(["article_id", "channel_id", "keyword", "textrank"])

In [43]:
textrank_keywords_df.show()

+----------+----------+-----------+-------------------+
|article_id|channel_id|    keyword|           textrank|
+----------+----------+-----------+-------------------+
|         1|        17|        msg|                1.0|
|         1|        17|       code| 0.6678609841743154|
|         1|        17|      child| 0.6665863972022452|
|         1|        17|      props|  0.390571729326677|
|         1|        17|     String| 0.3571663376160206|
|         1|        17|        pre|  0.347249917686024|
|         1|        17|      model| 0.3112348130693862|
|         1|        17|       defa| 0.2878898408898146|
|         1|        17|     Number| 0.2862310465838811|
|         1|        17|forChildMsg|   0.28233337644711|
|         1|        17|     return|0.27961147459830155|
|         1|        17|      class|0.27464130361555694|
|         1|        17|  childNode| 0.2725460378336616|
|         1|        17|   computed| 0.2385683350414798|
|         1|        17|     strong|0.23129374583

In [45]:
idf_keywords_values = oa.spark.sql("select * from idf_keywords_values")
idf_keywords_values = idf_keywords_values.withColumnRenamed('keyword', 'keyword1')
keywords_weights = textrank_keywords_df.join(idf_keywords_values, textrank_keywords_df.keyword == idf_keywords_values.keyword1)
keywords_weights.show()

+----------+----------+---------+-------------------+---------+------------------+------+
|article_id|channel_id|  keyword|           textrank| keyword1|               idf| index|
+----------+----------+---------+-------------------+---------+------------------+------+
|         1|        17|childNode| 0.2725460378336616|childNode| 7.869848788205214| 20134|
|         3|        17|      amp|0.29969222129374834|      amp|1.5313880611157102|    18|
|         3|        17|      jpg| 0.5269060219822252|      jpg| 3.486643603011888|   727|
|         4|        17|      jpg|                1.0|      jpg| 3.486643603011888|   727|
|         1|        17|   Number| 0.2862310465838811|   Number|3.9439600931418446|  1060|
|         4|        17|  element| 0.5477235243943445|  element|3.5778397599698883|   494|
|         1|        17| computed| 0.2385683350414798| computed| 5.091381154265656|  2754|
|         4|        17|    touzi|0.44956362539337796|    touzi|11.146993521197391|348908|
|         

In [46]:
keywords_weights = keywords_weights.withColumn('weights', keywords_weights.textrank * keywords_weights.idf).select(["article_id","channel_id","keyword","weights"])
keywords_weights.show()

+----------+----------+---------+-------------------+
|article_id|channel_id|  keyword|            weights|
+----------+----------+---------+-------------------+
|         1|        17|childNode| 2.1448961055753744|
|         3|        17|      amp| 0.4589450896984936|
|         3|        17|      jpg| 1.8371335109327669|
|         4|        17|      jpg|  3.486643603011888|
|         1|        17|   Number| 1.1288838251450513|
|         4|        17|  element| 1.9596670030489227|
|         1|        17| computed| 1.2146423250347254|
|         4|        17|    touzi|  5.011282819625995|
|         5|        17|   upload|  1.043572244555899|
|         1|        17|     code| 1.8179344783232367|
|         3|        17|     code| 0.9625267193815098|
|         5|        17|     code| 2.7220252738236703|
|         5|        17|   helper| 0.6881462563434709|
|         1|        17|      log| 0.4236514065804126|
|         4|        17|      log| 0.6173358842768913|
|         5|        17|     

In [51]:
keywords_weights.registerTempTable("kw")
keywords_list = oa.spark.sql("select article_id, max(channel_id) channel_id, collect_list(keyword) keyword, collect_list(weights) weights from kw group by article_id")

In [52]:
keywords_list.show()

+----------+----------+--------------------+--------------------+
|article_id|channel_id|             keyword|             weights|
+----------+----------+--------------------+--------------------+
|         5|        17|[upload, code, he...|[1.04357224455589...|
|         1|        17|[childNode, Numbe...|[2.14489610557537...|
|         3|        17|[amp, jpg, code, ...|[0.45894508969849...|
|         4|        17|[jpg, element, to...|[3.48664360301188...|
+----------+----------+--------------------+--------------------+



In [53]:
def _func(row):
    return row.article_id, row.channel_id, dict(zip(row.keyword,row.weights))
keywords_info = keywords_list.rdd.map(_func).toDF(['article_id', 'channel_id', 'keywords'])
keywords_info.show()

+----------+----------+--------------------+
|article_id|channel_id|            keywords|
+----------+----------+--------------------+
|         5|        17|Map(pre -> 0.6960...|
|         1|        17|Map(pre -> 1.2732...|
|         3|        17|Map(number -> 0.9...|
|         4|        17|Map(msc -> 2.9346...|
+----------+----------+--------------------+



In [81]:
topic_info = oa.spark.sql("select t.article_id as article_id2, collect_list(t.keyword) topic from tfidf_keywords_values t inner join textrank_keywords_values r where t.keyword = r.keyword group by article_id")

In [82]:
topic_info.show()

+-----------+--------------------+
|article_id2|               topic|
+-----------+--------------------+
|        471|             [title]|
|        496|            [Object]|
|       2659|          [jpg, jpg]|
|       2866|      [class, class]|
|       3794|          [function]|
|       4935|        [data, data]|
|       5518|            [Object]|
|       5803|[Object, function...|
|       7754|               [bin]|
|       7833|  [language, String]|
|       7982|            [String]|
|       8389|    [Object, String]|
|       9376|     [amp, key, key]|
|       9852|[log, log, log, c...|
|      10206|        [data, data]|
|      10362|[log, log, log, f...|
|      11141|      [class, class]|
|      12046|[log, log, log, c...|
|      13289|        [data, data]|
|      13623|            [String]|
+-----------+--------------------+
only showing top 20 rows

