In [21]:
from pyspark.sql.types import ArrayType, StructField, StructType, DoubleType, StringType
from pyspark.sql import Row
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel
from pyspark.ml.clustering import LDA, LDAModel
from pyspark.sql.functions import udf
from pyspark.sql import functions as f
from pyspark.sql import SQLContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.mllib.linalg import Vectors, VectorUDT
from pyspark.mllib.linalg import DenseVector, SparseVector

import numpy as np
import pandas as pd

import json
import findspark
findspark.init()


conf = SparkConf().setAppName('dev').setMaster('local[*]')

spark = SparkSession.builder.config(
    conf=conf).enableHiveSupport().getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)

#### 数据类型注意事项

- 选中DataFrame  
``` 
df.select(colName) 
return: DataFrame[colName]
```

- 选中列  
```
df.colName  
df[colName]
return: Column<b'colName'>
```

## 处理嵌套数据

In [5]:
struct1 = StructType([StructField("distCol", DoubleType(), True), StructField("url", StringType(), True)])
struct2 = StructType([StructField("urlA", StringType(), True), StructField("urlB", ArrayType(struct1), True)])

# Create DataFrame
dft = spark.createDataFrame([
        ['url_a1', [[0.03, 'url1'], [0.02, 'url2'], [0.01, 'url3']]],
        ['url_a2', [[0.05, 'url4'], [0.03, 'url5']]]
    ], struct2)

print(dft)
print(dft.show(truncate=False))
# Define udf
top_N = 5
def rank_url(array):
    ranked_url = sorted(array, key=lambda x: x['distCol'])[0:top_N]
    return ranked_url
url_udf = f.udf(rank_url, ArrayType(struct1))

# Apply udf
df2 = dft.select('urlA', url_udf('urlB'))
df2.show(truncate=False)

DataFrame[urlA: string, urlB: array<struct<distCol:double,url:string>>]
+------+------------------------------------------+
|urlA  |urlB                                      |
+------+------------------------------------------+
|url_a1|[[0.03, url1], [0.02, url2], [0.01, url3]]|
|url_a2|[[0.05, url4], [0.03, url5]]              |
+------+------------------------------------------+

None
+------+------------------------------------------+
|urlA  |rank_url(urlB)                            |
+------+------------------------------------------+
|url_a1|[[0.01, url3], [0.02, url2], [0.03, url1]]|
|url_a2|[[0.03, url5], [0.05, url4]]              |
+------+------------------------------------------+



In [6]:
a = [1,2,3,4,5,6,7,8,9,10]
df = spark.createDataFrame([['a b c d e f g h i j '],], ['col1'])
df = df.withColumn("NewColumn", f.array([f.lit(x) for x in a]))
df.show(truncate=False)

+--------------------+-------------------------------+
|col1                |NewColumn                      |
+--------------------+-------------------------------+
|a b c d e f g h i j |[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]|
+--------------------+-------------------------------+



In [118]:
df = spark.sql("select * from default.JiebaCom_f45c00b4_out1_157")
df.show()

+---+--------------------+--------------------+
| id|             comment|         comment_lst|
+---+--------------------+--------------------+
|  0|  感觉不是很适合太小的孩子，不太喜欢，|[感觉, 不是, 很, 适合, 太...|
|  1|房间整洁，空间狭小，椅子和床之间太...|[房间, 整洁, ，, 空间, 狭...|
|  2|我是听西门子的朋友推荐选择这家酒店...|[我, 是, 听, 西门子, 的,...|
|  3|            蒙牛又出来丢人了|  [蒙牛, 又, 出来, 丢人, 了]|
|  4|有点太厚了，键盘比较没手感！屏幕下...|[有点, 太厚, 了, ，, 键盘...|
|  5|买这本书是因为原作者王国维。其实我...|[买, 这, 本书, 是因为, 原...|
|  6|爸妈来北京，看爸爸很喜欢保健，便给...|[爸妈, 来, 北京, ，, 看,...|
|  7|比较适合国内客。一楼餐厅风格上有点...|[比较, 适合, 国内, 客, 。...|
|  8|又发现的缺点：一、将使用数码相机照...|[又, 发现, 的, 缺点, ：,...|
|  9|             "感觉有些过时|     [", 感觉, 有些, 过时]|
| 10|            早就不喝蒙牛了！|[早就, 不, 喝, 蒙牛, 了, ！]|
| 11|这本书的作者自己有多年食品添加剂行...|[这, 本书, 的, 作者, 自己...|
| 12|酒店条件非常差！明明说有网络，就是...|[酒店, 条件, 非常, 差, ！...|
| 13|似乎这是上大学以来第一次半夜起来上...|[似乎, 这, 是, 上, 大学,...|
| 14|这么一个平庸的作品，怎么炒得这么火...|[这么, 一个, 平庸, 的, 作...|
| 15|买这本书的目的是想让宝宝知道什么是...|[买, 这, 本书, 的, 目的,...|
| 16|已经给儿子念完两本了，书是好书，但...|[已经, 给, 儿子, 念完, 两...|
| 17|             坐等蒙牛倒闭。|     [坐等, 蒙牛, 

In [119]:
cvt = CountVectorizer(inputCol="comment_lst", outputCol="cvt_comment_lst")
cvtModel = cvt.fit(df)

In [120]:
df = cvtModel.transform(df)
df.show()

+---+--------------------+--------------------+--------------------+
| id|             comment|         comment_lst|     cvt_comment_lst|
+---+--------------------+--------------------+--------------------+
|  0|  感觉不是很适合太小的孩子，不太喜欢，|[感觉, 不是, 很, 适合, 太...|(46989,[0,1,8,35,...|
|  1|房间整洁，空间狭小，椅子和床之间太...|[房间, 整洁, ，, 空间, 狭...|(46989,[0,2,3,10,...|
|  2|我是听西门子的朋友推荐选择这家酒店...|[我, 是, 听, 西门子, 的,...|(46989,[0,1,2,4,5...|
|  3|            蒙牛又出来丢人了|  [蒙牛, 又, 出来, 丢人, 了]|(46989,[3,41,72,2...|
|  4|有点太厚了，键盘比较没手感！屏幕下...|[有点, 太厚, 了, ，, 键盘...|(46989,[0,3,6,12,...|
|  5|买这本书是因为原作者王国维。其实我...|[买, 这, 本书, 是因为, 原...|(46989,[0,1,2,3,4...|
|  6|爸妈来北京，看爸爸很喜欢保健，便给...|[爸妈, 来, 北京, ，, 看,...|(46989,[0,1,2,3,8...|
|  7|比较适合国内客。一楼餐厅风格上有点...|[比较, 适合, 国内, 客, 。...|(46989,[1,2,47,51...|
|  8|又发现的缺点：一、将使用数码相机照...|[又, 发现, 的, 缺点, ：,...|(46989,[0,1,2,16,...|
|  9|             "感觉有些过时|     [", 感觉, 有些, 过时]|(46989,[31,35,152...|
| 10|            早就不喝蒙牛了！|[早就, 不, 喝, 蒙牛, 了, ！]|(46989,[3,6,11,41...|
| 11|这本书的作者自己有多年食品添加剂行...|[这, 本书, 

In [121]:
lda = LDA(featuresCol="cvt_comment_lst", maxIter=10, k=10, seed=2020, optimizer="em")
ldaModel = lda.fit(df)

In [122]:
ldaModel.describeTopics(3).show()

+-----+-----------+--------------------+
|topic|termIndices|         termWeights|
+-----+-----------+--------------------+
|    0|  [0, 1, 2]|[0.08745377459007...|
|    1|  [0, 1, 2]|[0.08630206789175...|
|    2|  [0, 1, 2]|[0.08796209828596...|
|    3|  [0, 1, 2]|[0.08769912200535...|
|    4|  [0, 1, 2]|[0.08927700774315...|
|    5|  [0, 1, 2]|[0.08752347485694...|
|    6|  [0, 1, 2]|[0.08671938733868...|
|    7|  [0, 1, 2]|[0.08759557917376...|
|    8|  [0, 1, 2]|[0.08686134075597...|
|    9|  [0, 1, 2]|[0.08881329549220...|
+-----+-----------+--------------------+



In [124]:
ldaModel.transform(df).show()

+---+--------------------+--------------------+--------------------+--------------------+
| id|             comment|         comment_lst|     cvt_comment_lst|   topicDistribution|
+---+--------------------+--------------------+--------------------+--------------------+
|  0|  感觉不是很适合太小的孩子，不太喜欢，|[感觉, 不是, 很, 适合, 太...|(46989,[0,1,8,35,...|[0.09993617633755...|
|  1|房间整洁，空间狭小，椅子和床之间太...|[房间, 整洁, ，, 空间, 狭...|(46989,[0,2,3,10,...|[0.09887284582834...|
|  2|我是听西门子的朋友推荐选择这家酒店...|[我, 是, 听, 西门子, 的,...|(46989,[0,1,2,4,5...|[0.10499174459807...|
|  3|            蒙牛又出来丢人了|  [蒙牛, 又, 出来, 丢人, 了]|(46989,[3,41,72,2...|[0.10012136925165...|
|  4|有点太厚了，键盘比较没手感！屏幕下...|[有点, 太厚, 了, ，, 键盘...|(46989,[0,3,6,12,...|[0.10034992500070...|
|  5|买这本书是因为原作者王国维。其实我...|[买, 这, 本书, 是因为, 原...|(46989,[0,1,2,3,4...|[0.08239620486390...|
|  6|爸妈来北京，看爸爸很喜欢保健，便给...|[爸妈, 来, 北京, ，, 看,...|(46989,[0,1,2,3,8...|[0.11197208844595...|
|  7|比较适合国内客。一楼餐厅风格上有点...|[比较, 适合, 国内, 客, 。...|(46989,[1,2,47,51...|[0.10056862674848...|
|  8|又发现的缺

In [66]:
df1 = spark.sql("select * from default.CountVectorizerCom_13722993_out1_157")

In [14]:
pdf = df1.select("cvt_comment_lst").toPandas()

In [19]:
res = df1.select("cvt_comment_lst").rdd.map(lambda row: tuple(
    (row[0].type, row[0].size, row[0].indices, row[0].values))).map(VectorUDT().deserialize)
res.collect()[1]

SparseVector(46989, {0: 7.0, 2: 1.0, 3: 1.0, 10: 1.0, 11: 1.0, 17: 1.0, 21: 1.0, 33: 1.0, 80: 1.0, 92: 1.0, 101: 1.0, 135: 1.0, 224: 1.0, 226: 1.0, 310: 1.0, 638: 1.0, 689: 1.0, 770: 1.0, 854: 1.0, 1050: 1.0, 1488: 1.0, 1659: 1.0, 1737: 1.0, 1920: 1.0, 2228: 1.0, 2688: 1.0, 4637: 1.0, 5091: 1.0, 5630: 1.0, 6165: 1.0, 12258: 1.0, 25340: 1.0, 34748: 1.0, 39602: 1.0})

## 转换hive sparseVector为sparseVector

In [136]:
df1 = spark.sql("select * from default.CountVectorizerCom_13722993_out1_157")

In [137]:
df1.dtypes

[('id', 'int'),
 ('comment', 'string'),
 ('comment_lst', 'array<string>'),
 ('cvt_comment_lst',
  'struct<type:tinyint,size:int,indices:array<int>,values:array<double>>')]

In [138]:
df1["cvt_comment_lst"]

Column<b'cvt_comment_lst'>

In [139]:
df1.cvt_comment_lst

Column<b'cvt_comment_lst'>

In [140]:
f = udf(lambda row: Row(VectorUDT().deserialize((row.type, row.size, row.indices, row.values)))[0], VectorUDT())
df1 = df1.withColumn("cvt_comment_lst", f(df1["cvt_comment_lst"]))
df1.show()

+---+--------------------+--------------------+--------------------+
| id|             comment|         comment_lst|     cvt_comment_lst|
+---+--------------------+--------------------+--------------------+
|  0|  感觉不是很适合太小的孩子，不太喜欢，|[感觉, 不是, 很, 适合, 太...|(46989,[0,1,8,35,...|
|  1|房间整洁，空间狭小，椅子和床之间太...|[房间, 整洁, ，, 空间, 狭...|(46989,[0,2,3,10,...|
|  2|我是听西门子的朋友推荐选择这家酒店...|[我, 是, 听, 西门子, 的,...|(46989,[0,1,2,4,5...|
|  3|            蒙牛又出来丢人了|  [蒙牛, 又, 出来, 丢人, 了]|(46989,[3,41,72,2...|
|  4|有点太厚了，键盘比较没手感！屏幕下...|[有点, 太厚, 了, ，, 键盘...|(46989,[0,3,6,12,...|
|  5|买这本书是因为原作者王国维。其实我...|[买, 这, 本书, 是因为, 原...|(46989,[0,1,2,3,4...|
|  6|爸妈来北京，看爸爸很喜欢保健，便给...|[爸妈, 来, 北京, ，, 看,...|(46989,[0,1,2,3,8...|
|  7|比较适合国内客。一楼餐厅风格上有点...|[比较, 适合, 国内, 客, 。...|(46989,[1,2,47,51...|
|  8|又发现的缺点：一、将使用数码相机照...|[又, 发现, 的, 缺点, ：,...|(46989,[0,1,2,16,...|
|  9|             "感觉有些过时|     [", 感觉, 有些, 过时]|(46989,[31,35,152...|
| 10|            早就不喝蒙牛了！|[早就, 不, 喝, 蒙牛, 了, ！]|(46989,[3,6,11,41...|
| 11|这本书的作者自己有多年食品添加剂行...|[这, 本书, 

In [184]:
df1.cvt_comment_lst

Column<b'cvt_comment_lst'>

In [93]:
# res = df1.select("cvt_comment_lst").rdd.map(lambda row: tuple(
#     (row[0].type, row[0].size, row[0].indices, row[0].values))).map(lambda x: row(VectorUDT().deserialize(x))).toDF()

def func(row):
    indices = tuple((row.type, row.size, row.indices, row.values))
    res = Row(VectorUDT().deserialize(indices))
    return res[0]

f = udf(func, VectorUDT())

df1 = df1.withColumn("cvt_comment_lst", f(df1.cvt_comment_lst))
df1.show()

# row = Row("features")
# df1.select("*", df1.select("cvt_comment_lst").rdd.map(lambda row: tuple(
#     (row[0].type, row[0].size, row[0].indices, row[0].values))).map(lambda x: row(VectorUDT().deserialize(x))).toDF())

+---+--------------------+--------------------+--------------------+
| id|             comment|         comment_lst|     cvt_comment_lst|
+---+--------------------+--------------------+--------------------+
|  0|  感觉不是很适合太小的孩子，不太喜欢，|[感觉, 不是, 很, 适合, 太...|(46989,[0,1,8,35,...|
|  1|房间整洁，空间狭小，椅子和床之间太...|[房间, 整洁, ，, 空间, 狭...|(46989,[0,2,3,10,...|
|  2|我是听西门子的朋友推荐选择这家酒店...|[我, 是, 听, 西门子, 的,...|(46989,[0,1,2,4,5...|
|  3|            蒙牛又出来丢人了|  [蒙牛, 又, 出来, 丢人, 了]|(46989,[3,41,72,2...|
|  4|有点太厚了，键盘比较没手感！屏幕下...|[有点, 太厚, 了, ，, 键盘...|(46989,[0,3,6,12,...|
|  5|买这本书是因为原作者王国维。其实我...|[买, 这, 本书, 是因为, 原...|(46989,[0,1,2,3,4...|
|  6|爸妈来北京，看爸爸很喜欢保健，便给...|[爸妈, 来, 北京, ，, 看,...|(46989,[0,1,2,3,8...|
|  7|比较适合国内客。一楼餐厅风格上有点...|[比较, 适合, 国内, 客, 。...|(46989,[1,2,47,51...|
|  8|又发现的缺点：一、将使用数码相机照...|[又, 发现, 的, 缺点, ：,...|(46989,[0,1,2,16,...|
|  9|             "感觉有些过时|     [", 感觉, 有些, 过时]|(46989,[31,35,152...|
| 10|            早就不喝蒙牛了！|[早就, 不, 喝, 蒙牛, 了, ！]|(46989,[3,6,11,41...|
| 11|这本书的作者自己有多年食品添加剂行...|[这, 本书, 

##  转换hive denseVector为densevector

In [149]:
df2 = spark.sql("select * from default.TransferW2VCom_6e4e1734_out1_157")

In [150]:
df2.dtypes

[('comment', 'string'),
 ('comment_lst', 'array<string>'),
 ('id', 'bigint'),
 ('w2v_res', 'array<double>')]

In [151]:
df2.show()

+--------------------+--------------------+---+--------------------+
|             comment|         comment_lst| id|             w2v_res|
+--------------------+--------------------+---+--------------------+
|  感觉不是很适合太小的孩子，不太喜欢，|[感觉, 不是, 很, 适合, 太...|  0|[-0.1150548502125...|
|房间整洁，空间狭小，椅子和床之间太...|[房间, 整洁, ，, 空间, 狭...|  1|[-0.0416673735249...|
|我是听西门子的朋友推荐选择这家酒店...|[我, 是, 听, 西门子, 的,...|  2|[-0.1165728564546...|
|            蒙牛又出来丢人了|  [蒙牛, 又, 出来, 丢人, 了]|  3|[0.07461750898510...|
|有点太厚了，键盘比较没手感！屏幕下...|[有点, 太厚, 了, ，, 键盘...|  4|[-0.2101077857342...|
|买这本书是因为原作者王国维。其实我...|[买, 这, 本书, 是因为, 原...|  5|[0.05922474306779...|
|爸妈来北京，看爸爸很喜欢保健，便给...|[爸妈, 来, 北京, ，, 看,...|  6|[0.01157951669703...|
|比较适合国内客。一楼餐厅风格上有点...|[比较, 适合, 国内, 客, 。...|  7|[-0.0927039539584...|
|又发现的缺点：一、将使用数码相机照...|[又, 发现, 的, 缺点, ：,...|  8|[-0.0081130786178...|
|             "感觉有些过时|     [", 感觉, 有些, 过时]|  9|[-0.1039343569427...|
|            早就不喝蒙牛了！|[早就, 不, 喝, 蒙牛, 了, ！]| 10|[-0.0993022664139...|
|这本书的作者自己有多年食品添加剂行...|[这, 本书, 的, 作

In [153]:
f = udf(lambda x: Vectors.dense(x), VectorUDT())
df2 = df2.withColumn("w2v_res", f(df2["w2v_res"]))

In [154]:
df2.show()

+--------------------+--------------------+---+--------------------+
|             comment|         comment_lst| id|             w2v_res|
+--------------------+--------------------+---+--------------------+
|  感觉不是很适合太小的孩子，不太喜欢，|[感觉, 不是, 很, 适合, 太...|  0|[-0.1150548502125...|
|房间整洁，空间狭小，椅子和床之间太...|[房间, 整洁, ，, 空间, 狭...|  1|[-0.0416673735249...|
|我是听西门子的朋友推荐选择这家酒店...|[我, 是, 听, 西门子, 的,...|  2|[-0.1165728564546...|
|            蒙牛又出来丢人了|  [蒙牛, 又, 出来, 丢人, 了]|  3|[0.07461750898510...|
|有点太厚了，键盘比较没手感！屏幕下...|[有点, 太厚, 了, ，, 键盘...|  4|[-0.2101077857342...|
|买这本书是因为原作者王国维。其实我...|[买, 这, 本书, 是因为, 原...|  5|[0.05922474306779...|
|爸妈来北京，看爸爸很喜欢保健，便给...|[爸妈, 来, 北京, ，, 看,...|  6|[0.01157951669703...|
|比较适合国内客。一楼餐厅风格上有点...|[比较, 适合, 国内, 客, 。...|  7|[-0.0927039539584...|
|又发现的缺点：一、将使用数码相机照...|[又, 发现, 的, 缺点, ：,...|  8|[-0.0081130786178...|
|             "感觉有些过时|     [", 感觉, 有些, 过时]|  9|[-0.1039343569427...|
|            早就不喝蒙牛了！|[早就, 不, 喝, 蒙牛, 了, ！]| 10|[-0.0993022664139...|
|这本书的作者自己有多年食品添加剂行...|[这, 本书, 的, 作