In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from graphframes import GraphFrame


sc = SparkContext("local", appName="mysqltest")
sqlContext = SQLContext(sc)

vertices = sqlContext.createDataFrame([
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)], ["id", "name", "age"])

edges = sqlContext.createDataFrame([
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
], ["src", "dst", "relationship"])

print(type(vertices))
print(type(edges))

# 生成图
g = GraphFrame(vertices, edges)
print(g)
# GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])
print(type(g))
# <class 'graphframes.graphframe.GraphFrame'>


<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 1 more field])
<class 'graphframes.graphframe.GraphFrame'>


In [8]:
# g.inDegree()                #计算顶点的入度
g.inDegrees.sort("id").show()
# g.outDegree()               #计算顶点的出度
g.outDegrees.sort("id").show()
g.vertices.show()
g.vertices.groupBy().min("age").show()   #对顶点进行分组聚合
g.vertices.filter("age < 30").show()     #按条件筛选顶点
g.edges.show()
g.edges.filter("relationship = 'follow'").count()  #按条件筛选边

+---+--------+
| id|inDegree|
+---+--------+
|  a|       1|
|  b|       2|
|  c|       2|
|  d|       1|
|  e|       1|
|  f|       1|
+---+--------+

+---+---------+
| id|outDegree|
+---+---------+
|  a|        2|
|  b|        1|
|  c|        1|
|  d|        1|
|  e|        2|
|  f|        1|
+---+---------+

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  a|  Alice| 34|
|  b|    Bob| 36|
|  c|Charlie| 30|
|  d|  David| 29|
|  e| Esther| 32|
|  f|  Fanny| 36|
|  g|  Gabby| 60|
+---+-------+---+

+--------+
|min(age)|
+--------+
|      29|
+--------+

+---+-----+---+
| id| name|age|
+---+-----+---+
|  d|David| 29|
+---+-----+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
+---+---+------------+



4

In [9]:
#查找图中最重要的用户可以用pageRank函数：
results = g.pageRank(resetProbability=0.15, maxIter=10)
display(results.vertices)

DataFrame[id: string, name: string, age: bigint, pagerank: double]

In [12]:
results.vertices.show()

+---+-------+---+-------------------+
| id|   name|age|           pagerank|
+---+-------+---+-------------------+
|  g|  Gabby| 60|0.17073170731707318|
|  b|    Bob| 36| 2.7025217677349773|
|  e| Esther| 32| 0.3613490987992571|
|  a|  Alice| 34| 0.4485115093698443|
|  f|  Fanny| 36|0.32504910549694244|
|  d|  David| 29|0.32504910549694244|
|  c|Charlie| 30| 2.6667877057849627|
+---+-------+---+-------------------+



In [13]:
results.edges.show()

+---+---+------------+------+
|src|dst|relationship|weight|
+---+---+------------+------+
|  a|  b|      friend|   0.5|
|  b|  c|      follow|   1.0|
|  e|  f|      follow|   0.5|
|  e|  d|      friend|   0.5|
|  c|  b|      follow|   1.0|
|  a|  e|      friend|   0.5|
|  f|  c|      follow|   1.0|
|  d|  a|      friend|   1.0|
+---+---+------------+------+



In [2]:
# Motif finding 用 Domain-Specific Language (DSL) 指定structural queries

# Find chains of 4 vertices.
chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
 
from functools import reduce
from pyspark.sql.functions import lit
from pyspark.sql.functions import col
from pyspark.sql.functions import when
# Query on sequence, with state (cnt)
#  (a) Define method for updating state given the next element of the motif.
def cumFriends(cnt, edge):
  relationship = col(edge)["relationship"]
  return when(relationship == "friend", cnt + 1).otherwise(cnt)
 
#  (b) Use sequence operation to apply method to sequence of elements in motif.
#   In this case, the elements are the 3 edges.
edges = ["ab", "bc", "cd"]
numFriends = reduce(cumFriends, edges, lit(0))
    
chainWith2Friends2 = chain4.withColumn("num_friends", numFriends).where(numFriends >= 2)
chainWith2Friends2.show()

+---------------+--------------+---------------+--------------+---------------+--------------+----------------+-----------+
|              a|            ab|              b|            bc|              c|            cd|               d|num_friends|
+---------------+--------------+---------------+--------------+---------------+--------------+----------------+-----------+
| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, e, friend]|[e, Esther, 32]|[e, f, follow]|  [f, Fanny, 36]|          2|
|[e, Esther, 32]|[e, d, friend]| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|          3|
| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, e, friend]|[e, Esther, 32]|[e, d, friend]|  [d, David, 29]|          3|
| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, b, friend]|   [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|          2|
|[e, Esther, 32]|[e, d, friend]| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, b, friend]|    [b, Bob, 36]|          3|
| [a, Al

In [49]:
from pyspark.sql import functions as F
chain4.groupBy("b").agg(F.count(chain4.b).alias("计数")).sort("b").show()

+----------------+----+
|               b|计数|
+----------------+----+
|  [a, Alice, 34]|   3|
|    [b, Bob, 36]|   2|
|[c, Charlie, 30]|   2|
|  [d, David, 29]|   2|
| [e, Esther, 32]|   2|
|  [f, Fanny, 36]|   1|
+----------------+----+



In [7]:
chain4.filter("ab.relationship = 'friend'").sort("b").show()

+---------------+--------------+---------------+--------------+----------------+--------------+----------------+
|              a|            ab|              b|            bc|               c|            cd|               d|
+---------------+--------------+---------------+--------------+----------------+--------------+----------------+
| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, b, friend]|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|
| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|[e, d, friend]|  [d, David, 29]|
| [d, David, 29]|[d, a, friend]| [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|[e, f, follow]|  [f, Fanny, 36]|
| [a, Alice, 34]|[a, b, friend]|   [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|
|[e, Esther, 32]|[e, d, friend]| [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|[a, b, friend]|    [b, Bob, 36]|
|[e, Esther, 32]|[e, d, friend]| [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|[a, e, friend]| 

In [3]:
chain4.select("ab.src", "ab.dst", "ab.relationship").show()

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  d|  a|      friend|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
|  f|  c|      follow|
|  b|  c|      follow|
|  d|  a|      friend|
|  e|  f|      follow|
|  c|  b|      follow|
|  a|  b|      friend|
|  e|  d|      friend|
|  a|  e|      friend|
+---+---+------------+



In [4]:
# 使用顶点和边的集合构造子图
e2 = chain4.select("ab.src", "ab.dst", "ab.relationship")
g2 = GraphFrame(g.vertices, e2)
g2.inDegrees.sort("id").show()
g2.outDegrees.sort("id").show()
g.edges.show()

+---+--------+
| id|inDegree|
+---+--------+
|  a|       3|
|  b|       2|
|  c|       2|
|  d|       2|
|  e|       2|
|  f|       1|
+---+--------+

+---+---------+
| id|outDegree|
+---+---------+
|  a|        3|
|  b|        1|
|  c|        1|
|  d|        3|
|  e|        3|
|  f|        1|
+---+---------+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  b|      friend|
|  b|  c|      follow|
|  c|  b|      follow|
|  f|  c|      follow|
|  e|  f|      follow|
|  e|  d|      friend|
|  d|  a|      friend|
|  a|  e|      friend|
+---+---+------------+



In [5]:
chain_i = g2.find("(A)-[e]->(B)")
chain_i.sort("B").show()

+----------------+--------------+----------------+
|               A|             e|               B|
+----------------+--------------+----------------+
|  [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|
|  [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|
|  [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|
|[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|
|  [a, Alice, 34]|[a, b, friend]|    [b, Bob, 36]|
|  [f, Fanny, 36]|[f, c, follow]|[c, Charlie, 30]|
|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|
| [e, Esther, 32]|[e, d, friend]|  [d, David, 29]|
| [e, Esther, 32]|[e, d, friend]|  [d, David, 29]|
|  [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|
|  [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|
| [e, Esther, 32]|[e, f, follow]|  [f, Fanny, 36]|
+----------------+--------------+----------------+



In [6]:
g2.inDegrees.filter("inDegree>2").show()

+---+--------+
| id|inDegree|
+---+--------+
|  a|       3|
+---+--------+



In [9]:
chain4.select("a.id","a.name","ab.src", "ab.dst", "ab.relationship","b").show()

+---+-------+---+---+------------+----------------+
| id|   name|src|dst|relationship|               b|
+---+-------+---+---+------------+----------------+
|  d|  David|  d|  a|      friend|  [a, Alice, 34]|
|  e| Esther|  e|  d|      friend|  [d, David, 29]|
|  d|  David|  d|  a|      friend|  [a, Alice, 34]|
|  a|  Alice|  a|  e|      friend| [e, Esther, 32]|
|  f|  Fanny|  f|  c|      follow|[c, Charlie, 30]|
|  b|    Bob|  b|  c|      follow|[c, Charlie, 30]|
|  d|  David|  d|  a|      friend|  [a, Alice, 34]|
|  e| Esther|  e|  f|      follow|  [f, Fanny, 36]|
|  c|Charlie|  c|  b|      follow|    [b, Bob, 36]|
|  a|  Alice|  a|  b|      friend|    [b, Bob, 36]|
|  e| Esther|  e|  d|      friend|  [d, David, 29]|
|  a|  Alice|  a|  e|      friend| [e, Esther, 32]|
+---+-------+---+---+------------+----------------+



TypeError: Can not infer schema for type: <class 'str'>

In [26]:
from pyspark.sql import Row
mylist = [
  {"id":"a","name":"Alice"},
  {"id":"f","name":"Funny"}
]
df2 = sqlContext.createDataFrame(Row(**x) for x in mylist)
df2.show(truncate=False)

df1 = chain4.select("a.id","a.name","ab.src", "ab.dst", "ab.relationship","b")

df1.join(df2,[df1.id==df2.id], "inner").show()

+---+-----+
|id |name |
+---+-----+
|a  |Alice|
|f  |Funny|
+---+-----+

+---+-----+---+---+------------+----------------+---+-----+
| id| name|src|dst|relationship|               b| id| name|
+---+-----+---+---+------------+----------------+---+-----+
|  f|Fanny|  f|  c|      follow|[c, Charlie, 30]|  f|Funny|
|  a|Alice|  a|  e|      friend| [e, Esther, 32]|  a|Alice|
|  a|Alice|  a|  b|      friend|    [b, Bob, 36]|  a|Alice|
|  a|Alice|  a|  e|      friend| [e, Esther, 32]|  a|Alice|
+---+-----+---+---+------------+----------------+---+-----+



In [15]:
sqlContext.createDataFrame(mylist).show(truncate=False)

'''
字典直接createDataFrame会报错
C:\Users\zqy\Anaconda3\lib\pyspark\sql\session.py:346: UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead
  warnings.warn("inferring schema from dict is deprecated,"
  
  (Row(**x) for x in mylist) 构造的是一个生成器对象<class 'generator'>, 不同于套[]创建list对象[Row(id='a', name='Alice'), Row(id='f', name='Funny')]，generator保存的只是算法从⽽节省⼤量的空间
'''




+---+-----+
|id |name |
+---+-----+
|a  |Alice|
|f  |Funny|
+---+-----+



In [27]:
chain4.select("*","a.id").show()

+----------------+--------------+----------------+--------------+----------------+--------------+----------------+---+
|               a|            ab|               b|            bc|               c|            cd|               d| id|
+----------------+--------------+----------------+--------------+----------------+--------------+----------------+---+
|  [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|[e, f, follow]|  [f, Fanny, 36]|  d|
| [e, Esther, 32]|[e, d, friend]|  [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|  e|
|  [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|[e, d, friend]|  [d, David, 29]|  d|
|  [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|[e, f, follow]|  [f, Fanny, 36]|[f, c, follow]|[c, Charlie, 30]|  a|
|  [f, Fanny, 36]|[f, c, follow]|[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|  f|
|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30

In [25]:
# Subgraphs 子图
g2 = g.filterEdges("relationship = 'friend'").filterVertices("age > 30").dropIsolatedVertices()
g2.vertices.show()
g2.edges.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  e|Esther| 32|
|  b|   Bob| 36|
|  a| Alice| 34|
+---+------+---+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|  e|      friend|
|  a|  b|      friend|
+---+---+------------+



In [26]:
# Standard graph algorithms 标准图算法
# Breadth-first search 广度优先搜索BFS 
paths = g.bfs("name = 'Esther'", "age < 32")
paths.show()

+---------------+--------------+--------------+
|           from|            e0|            to|
+---------------+--------------+--------------+
|[e, Esther, 32]|[e, d, friend]|[d, David, 29]|
+---------------+--------------+--------------+



In [27]:
filteredPaths = g.bfs(         \
  fromExpr = "name = 'Esther'", 
  toExpr = "age < 32",
  edgeFilter = "relationship != 'friend'",
  maxPathLength = 3)
filteredPaths.show()

+---------------+--------------+--------------+--------------+----------------+
|           from|            e0|            v1|            e1|              to|
+---------------+--------------+--------------+--------------+----------------+
|[e, Esther, 32]|[e, f, follow]|[f, Fanny, 36]|[f, c, follow]|[c, Charlie, 30]|
+---------------+--------------+--------------+--------------+----------------+



In [29]:
print(type(paths))

<class 'pyspark.sql.dataframe.DataFrame'>


In [6]:
filteredPaths_60 = g.bfs(         \
  fromExpr = "name = 'Esther'", 
  toExpr = "age < 60 and name != 'Esther'",
  maxPathLength = 3)
filteredPaths_60.show()

+---------------+--------------+--------------+
|           from|            e0|            to|
+---------------+--------------+--------------+
|[e, Esther, 32]|[e, f, follow]|[f, Fanny, 36]|
|[e, Esther, 32]|[e, d, friend]|[d, David, 29]|
+---------------+--------------+--------------+



In [7]:
result_60 = g.shortestPaths(["a", "b", "c","d", "e", "f"])
result_60.sort(["id"]).select("id", "distances").show(truncate=False)

+---+------------------------------------------------+
|id |distances                                       |
+---+------------------------------------------------+
|a  |[e -> 1, f -> 2, a -> 0, b -> 1, c -> 2, d -> 2]|
|b  |[b -> 0, c -> 1]                                |
|c  |[c -> 0, b -> 1]                                |
|d  |[e -> 2, f -> 3, a -> 1, b -> 2, c -> 3, d -> 0]|
|e  |[e -> 0, f -> 1, a -> 2, b -> 3, c -> 2, d -> 1]|
|f  |[f -> 0, c -> 1, b -> 2]                        |
|g  |[]                                              |
+---+------------------------------------------------+



In [19]:
from pyspark.sql.functions import explode
result_60.sort(["id"]).select("id", explode("distances")).show()

+---+---+-----+
| id|key|value|
+---+---+-----+
|  a|  e|    1|
|  a|  f|    2|
|  a|  a|    0|
|  a|  b|    1|
|  a|  c|    2|
|  a|  d|    2|
|  b|  b|    0|
|  b|  c|    1|
|  c|  c|    0|
|  c|  b|    1|
|  d|  e|    2|
|  d|  f|    3|
|  d|  a|    1|
|  d|  b|    2|
|  d|  c|    3|
|  d|  d|    0|
|  e|  e|    0|
|  e|  f|    1|
|  e|  a|    2|
|  e|  b|    3|
+---+---+-----+
only showing top 20 rows



In [24]:
from pyspark.sql.functions import col
set_指定本方 = ["a", "b", "c"]
df_path = result_60.sort(["id"]).select("id", explode("distances"))
df_指定source = df_path.filter(col("id").isin(set_指定本方 ))
df_指定深度 = df_指定source.filter("value>0 and value<3")
df_指定深度.show()

+---+---+-----+
| id|key|value|
+---+---+-----+
|  a|  e|    1|
|  a|  f|    2|
|  a|  b|    1|
|  a|  c|    2|
|  a|  d|    2|
|  b|  c|    1|
|  c|  b|    1|
+---+---+-----+



In [14]:
print(type(result_60.select("distances")))
result_60

<class 'pyspark.sql.dataframe.DataFrame'>


DataFrame[id: string, name: string, age: bigint, distances: map<string,int>]

In [33]:
result_60.sort(["id"]).select("id","name", explode("distances")).filter(length("name")<5).show()

NameError: name 'length' is not defined

In [10]:
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

DataFrame[a: struct<id:string,name:string,age:bigint>, e: struct<src:string,dst:string,relationship:string>, b: struct<id:string,name:string,age:bigint>, e2: struct<src:string,dst:string,relationship:string>]

In [4]:
chain = g.find("(a)-[ab]->(b);(b)-[bc]->(c);(c)-[cd]->(d)") 
print(type(chain))
print(chain.head(3))
'''
# 定义更新状态条件，关系为friends则+1
sumChain = lambda cnt, relation: when(\
    relationship == 'friends', cnt + 1) \
  .otherwise(cnt)
# 应用到chain，计算好友数量
condition = reduce(lambda cnt,sumChain(cnt, col(e).relationship),["ab", "bc", "cd"],lit(0))

# 计算好友圈数
chainWithFriends = chain.where(condition >= 2)
chainWithFriends.show()
'''

<class 'pyspark.sql.dataframe.DataFrame'>
[Row(a=Row(id='d', name='David', age=29), ab=Row(src='d', dst='a', relationship='friend'), b=Row(id='a', name='Alice', age=34), bc=Row(src='a', dst='e', relationship='friend'), c=Row(id='e', name='Esther', age=32), cd=Row(src='e', dst='f', relationship='follow'), d=Row(id='f', name='Fanny', age=36)), Row(a=Row(id='e', name='Esther', age=32), ab=Row(src='e', dst='d', relationship='friend'), b=Row(id='d', name='David', age=29), bc=Row(src='d', dst='a', relationship='friend'), c=Row(id='a', name='Alice', age=34), cd=Row(src='a', dst='e', relationship='friend'), d=Row(id='e', name='Esther', age=32)), Row(a=Row(id='d', name='David', age=29), ab=Row(src='d', dst='a', relationship='friend'), b=Row(id='a', name='Alice', age=34), bc=Row(src='a', dst='e', relationship='friend'), c=Row(id='e', name='Esther', age=32), cd=Row(src='e', dst='d', relationship='friend'), d=Row(id='d', name='David', age=29))]


In [4]:
#
# import pandas as pd
# from pyspark.sql import SparkSession
#
# spark = SparkSession \
#     .builder \
#     .appName('pyspark') \
#     .getOrCreate()
# # 原始数据
# test = spark.createDataFrame([('001','1',100,87,67,83,98), ('002','2',87,81,90,83,83), ('003','3',86,91,83,89,63),
#                             ('004','2',65,87,94,73,88), ('005','1',76,62,89,81,98), ('006','3',84,82,85,73,99),
#                             ('007','3',56,76,63,72,87), ('008','1',55,62,46,78,71), ('009','2',63,72,87,98,64)],
# 							['number','class','language','math','english','physic','chemical'])
# test.show()

# import pyspark
# from pyspark import SparkContext, SparkConf
# conf = SparkConf().setAppName("test").setMaster("local[4]")
# sc = SparkContext(conf=conf)
#
# #只需要5行代码就可以完成WordCount词频统计。
# file_path = 'D:/spark/eat_pyspark_in_10_days/data/hello.txt'
# rdd_line = sc.textFile(file_path)
# rdd_word = rdd_line.flatMap(lambda x:x.split(" "))
# rdd_one = rdd_word.map(lambda t:(t,1))
# rdd_count = rdd_one.reduceByKey(lambda x,y:x+y)
# print(rdd_count.collect())



from pyspark.sql import SparkSession
from graphframes import GraphFrame

spark = SparkSession \
    .builder \
    .appName('pyspark') \
    .getOrCreate()

vertices = spark.createDataFrame([
    ("a", "Alice", 34),
    ("b", "Bob", 36),
    ("c", "Charlie", 30),
    ("d", "David", 29),
    ("e", "Esther", 32),
    ("f", "Fanny", 36),
    ("g", "Gabby", 60)], ["id", "name", "age"])

edges = spark.createDataFrame([
    ("a", "b", "friend"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
    ("f", "c", "follow"),
    ("e", "f", "follow"),
    ("e", "d", "friend"),
    ("d", "a", "friend"),
    ("a", "e", "friend")
], ["src", "dst", "relationship"])

# 生成图
graph = GraphFrame(vertices, edges)

# print("顶点表视图：")
# graph.vertices.show() # graph.vertices 就是原始的vertices
# 顶点表视图：
# +---+-------+---+
# | id|   name|age|
# +---+-------+---+
# |  a|  Alice| 34|
# |  b|    Bob| 36|
# |  c|Charlie| 30|
# |  d|  David| 29|
# |  e| Esther| 32|
# |  f|  Fanny| 36|
# |  g|  Gabby| 60|
# +---+-------+---+

# print("边表视图：")
# graph.edges.show() # graph.edges 就是原始的 edges
# 边表视图：
# +---+---+------------+
# |src|dst|relationship|
# +---+---+------------+
# |  a|  b|      friend|
# |  b|  c|      follow|
# |  c|  b|      follow|
# |  f|  c|      follow|
# |  e|  f|      follow|
# |  e|  d|      friend|
# |  d|  a|      friend|
# |  a|  e|      friend|
# +---+---+------------+

# print("三元组视图：")
# graph.triplets.show()
# 三元组视图：
# +----------------+--------------+----------------+
# |             src|          edge|             dst|
# +----------------+--------------+----------------+
# | [e, Esther, 32]|[e, f, follow]|  [f, Fanny, 36]|
# |  [a, Alice, 34]|[a, e, friend]| [e, Esther, 32]|
# | [e, Esther, 32]|[e, d, friend]|  [d, David, 29]|
# |  [f, Fanny, 36]|[f, c, follow]|[c, Charlie, 30]|
# |    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|
# |[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|
# |  [a, Alice, 34]|[a, b, friend]|    [b, Bob, 36]|
# |  [d, David, 29]|[d, a, friend]|  [a, Alice, 34]|
# +----------------+--------------+----------------+

# 顶点的度
# graph.degrees.show()
# +---+------+
# | id|degree|
# +---+------+
# |  f|     2|
# |  e|     3|
# |  d|     2|
# |  c|     3|
# |  b|     3|
# |  a|     3|
# +---+------+

# 顶点的入度
# graph.inDegrees.show()
# +---+--------+
# | id|inDegree|
# +---+--------+
# |  f|       1|
# |  e|       1|
# |  d|       1|
# |  c|       2|
# |  b|       2|
# |  a|       1|
# +---+--------+

# 顶点的出度
# graph.outDegrees.show()
# +---+---------+
# | id|outDegree|
# +---+---------+
# |  f|        1|
# |  e|        2|
# |  d|        1|
# |  c|        1|
# |  b|        1|
# |  a|        2|
# +---+---------+

# 多个路径条件
motif = graph.find("(a)-[e]->(b); (b)-[e2]->(a)")
motif.show()

# 在搜索的结果上进行过滤
motif.filter("b.age > 30")
motif.show()

# 不需要返回路径中的元素时，可以使用匿名顶点和边
motif = graph.find("(start)-[]->()")
motif.show()

# 设置路径不存在的条件
motif = graph.find("(a)-[]->(b); !(b)-[]->(a)")
motif.show()

# Motif: A->B->C but not A->C
results = graph.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)")
# 排除自己
results = results.filter("A.id != C.id")
# 选择需要的列
results = results.select(results.A.id.alias("A"), results.C.id.alias("C"))
results.show()

+----------------+--------------+----------------+--------------+
|               a|             e|               b|            e2|
+----------------+--------------+----------------+--------------+
|[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|[b, c, follow]|
|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|[c, b, follow]|
+----------------+--------------+----------------+--------------+

+----------------+--------------+----------------+--------------+
|               a|             e|               b|            e2|
+----------------+--------------+----------------+--------------+
|[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|[b, c, follow]|
|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|[c, b, follow]|
+----------------+--------------+----------------+--------------+

+----------------+
|           start|
+----------------+
|  [f, Fanny, 36]|
| [e, Esther, 32]|
| [e, Esther, 32]|
|  [d, David, 29]|
|[c, Charlie, 30]|
|    [b, Bob, 36]|
|  [a, Alice, 34]|
|  [a, Alice, 34

In [18]:
import pyspark
print("spark version:",pyspark.__version__)
rdd = sc.parallelize(["hello","spark"])
print(rdd.reduce(lambda x,y:x+' '+y))

spark version: 2.4.6
hello spark
