In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark import SparkContext, SparkConf
from graphframes import GraphFrame
import time

In [2]:
#初始化并读取所有数据
#conf = SparkConf().setAppName("MySpark").setMaster("local[*]")
#sc = SparkContext(conf=conf)
#lines = sc.textFile('twitter_combined.txt')
spark = SparkSession.builder.appName("MySpark").getOrCreate()
sc = spark.sparkContext
lines = sc.textFile('twitter_sub.txt')

In [3]:
#提取所有实体并去重，然后转换成dataFrame
entity = lines.flatMap(lambda x:x.split(' ')).distinct()
entity_zip = entity.zipWithIndex()
entity_row = entity_zip .map(lambda e: Row(id = e[1], name = e[0]))
entity_table = spark.createDataFrame(entity_row) 
entity_table.show()

+---+---------+
| id|     name|
+---+---------+
|  0| 17116707|
|  1| 28465635|
|  2|380580781|
|  3| 18996905|
|  4|153460275|
|  5|222261763|
|  6| 88323281|
|  7| 19933035|
|  8| 17434613|
|  9|364971269|
| 10|100581193|
| 11|279787626|
| 12| 69592091|
| 13|187773078|
| 14|262802533|
| 15|280935165|
| 16|285312927|
| 17|254839786|
| 18|204317520|
| 19| 21548772|
+---+---------+
only showing top 20 rows



In [4]:
#实体数量
entity.count()

638

In [5]:
#提取所有关系，然后转换成dataFrame（实体id，实体id，关系）
entity_dict = dict(entity_zip .map(lambda e: (e[0], e[1])).collect())
relation_row = lines.map(lambda x:x.split(' ')).map(lambda r: Row(src = entity_dict[r[0]], dst = entity_dict[r[1]],\
                                                                  relationship = 'concerned '))
relation_table = spark.createDataFrame(relation_row) 
relation_table.show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|339|340|  concerned |
|  0|  1|  concerned |
|  2|  3|  concerned |
|341|  4|  concerned |
|342|343|  concerned |
|344|  5|  concerned |
|345|340|  concerned |
|  5|  6|  concerned |
|  7|346|  concerned |
|347|  8|  concerned |
|346|348|  concerned |
|  9|348|  concerned |
| 10| 11|  concerned |
|349| 12|  concerned |
|344| 13|  concerned |
|350| 14|  concerned |
|351|  6|  concerned |
| 15|352|  concerned |
|  5|353|  concerned |
| 16|344|  concerned |
+---+---+------------+
only showing top 20 rows



In [6]:
#关系数量
relation_row.count()

11604

In [7]:
#将提取出来的实体表和关系表写入csv文件夹，以供查看
entity_table.toPandas().to_csv('entity2id.csv',encoding='utf_8_sig',index=False)
relation_table.toPandas().to_csv('relation.csv',encoding='utf_8_sig',index=False)

In [8]:
#构建Graph
g = GraphFrame(entity_table, relation_table)

In [9]:
#入度取前10展示
g.inDegrees.sort("inDegree",ascending=False).show(10)

+---+--------+
| id|inDegree|
+---+--------+
| 35|     174|
| 47|     167|
| 41|     160|
|340|     159|
|  3|     117|
|364|     112|
|353|     111|
|343|     110|
|388|     106|
|359|     101|
+---+--------+
only showing top 10 rows



In [10]:
#使用PageRank算法选出重要用户
tic = time.time()
result = g.pageRank(resetProbability=0.15, maxIter=5)
result.vertices.sort('pagerank',ascending=False).show(10)
toc = time.time()
print("PageRank算法用时:" + str(toc-tic))

+---+---------+------------------+
| id|     name|          pagerank|
+---+---------+------------------+
| 35| 40981798| 9.571762777234309|
| 47| 43003845| 8.536144105795914|
|230|  7861312| 7.479196505734391|
| 41| 22462180| 7.092325211932122|
|340| 34428380|7.0908748071041465|
|353| 27633075| 5.410156272908095|
|514| 25970331| 5.396491631422866|
|364| 31331740| 5.349122654587942|
| 72|133055665| 5.105792570054932|
|359|  8088112|  5.01979464154914|
+---+---------+------------------+
only showing top 10 rows

PageRank算法用时:7.063082933425903


In [16]:
#使用LPA算法来做社区发现（CommunityDetection）
tic = time.time()
results = g.labelPropagation(maxIter=5)
results.orderBy('label').show(10)
toc = time.time()
print("LPA算法用时:" + str(toc-tic))

+---+---------+-----+
| id|     name|label|
+---+---------+-----+
|348|153226312|   23|
|415| 55033682|   23|
|385| 57490887|   23|
| 19| 21548772|   23|
|112|102765423|   23|
|367|430268163|   23|
| 65| 86221475|   23|
|418|186212304|   23|
|442|273149543|   23|
| 54|220068522|   23|
+---+---------+-----+
only showing top 10 rows

LPA算法用时:2.697854518890381


In [18]:
#将结果写入csv文件夹，以供查看
results.toPandas().to_csv('results.csv',encoding='utf_8_sig',index=False)

In [26]:
#查看社区分类情况
results.groupBy('label').count().collect()

[Row(label=487, count=31),
 Row(label=227, count=2),
 Row(label=621, count=1),
 Row(label=246, count=2),
 Row(label=307, count=2),
 Row(label=328, count=2),
 Row(label=185, count=2),
 Row(label=550, count=193),
 Row(label=89, count=1),
 Row(label=254, count=1),
 Row(label=451, count=67),
 Row(label=559, count=1),
 Row(label=478, count=13),
 Row(label=83, count=15),
 Row(label=514, count=78),
 Row(label=286, count=1),
 Row(label=313, count=1),
 Row(label=312, count=1),
 Row(label=23, count=207),
 Row(label=633, count=1),
 Row(label=282, count=1),
 Row(label=469, count=15)]

In [27]:
sc.stop()

In [28]:
try:
    !jupyter nbconvert --to python CommunityDetection
except:
    pass

[NbConvertApp] Converting notebook CommunityDetection.ipynb to python
[NbConvertApp] Writing 2238 bytes to CommunityDetection.py
