# Single node analysis

In [1]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql import SQLContext

from pyspark.sql import functions as f
from pyspark.sql import types as T
from pyspark.sql import Window

import os

In [2]:
# Create (or reuse) the Spark session, requesting the GraphFrames package
# through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL")
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12")
    .getOrCreate()
)
# SQLContext handle bound to the session's SparkContext.
sqlContext = SQLContext(spark.sparkContext)

:: loading settings :: url = jar:file:/usr/local/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/tcn/.ivy2/cache
The jars for the packages stored in: /Users/tcn/.ivy2/jars
graphframes#graphframes added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-656df167-ac7e-4b07-98f2-ac950c76cc14;1.0
	confs: [default]
	found graphframes#graphframes;0.8.2-spark3.2-s_2.12 in spark-packages
	found org.slf4j#slf4j-api;1.7.16 in central
:: resolution report :: resolve 117ms :: artifacts dl 6ms
	:: modules in use:
	graphframes#graphframes;0.8.2-spark3.2-s_2.12 from spark-packages in [default]
	org.slf4j#slf4j-api;1.7.16 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	-------------------------------------

In [3]:
# Import only the name the notebook uses instead of a wildcard, so the origin
# of GraphFrame is explicit and nothing else leaks into the namespace.
# NOTE(review): this import sits after the session cell, presumably because
# the package is fetched via spark.jars.packages — confirm before moving it.
from graphframes import GraphFrame

## Prepare data

In [4]:
# Load the vertex and edge tables of the graph from their parquet files.
parquet_reader = spark.read
vertices = parquet_reader.parquet('../data/vertices/*.parquet')
edges = parquet_reader.parquet('../data/edges/*.parquet')

                                                                                

In [5]:
# Preview the first five vertex rows.
vertices.show(n=5)

+------------------+
|                id|
+------------------+
|      Abbas Afsar |
|       Abed S. H. |
|Abu-Shammala Wael |
|   Abuhlail J. Y. |
|    Assaf Michael |
+------------------+
only showing top 5 rows



In [6]:
# Preview the first five edge rows.
edges.show(n=5)

+--------------------+--------------------+--------------+------------+--------------------+--------------------+
|                 src|                 dst|articles_count|articles_ids| articles_categories|articles_update_date|
+--------------------+--------------------+--------------+------------+--------------------+--------------------+
|Iliev Bozhidar Z....|Iliev Bozhidar Z....|             1| [0704.0066]|          [[hep-th]]|        [2007-05-23]|
|Schekochihin A. A...|Quataert E.  Berk...|             1| [0704.0044]|[[astro-ph, nlin....|        [2015-05-13]|
|Schekochihin A. A...|Dorland W.  Maryland|             1| [0704.0044]|[[astro-ph, nlin....|        [2015-05-13]|
|Schekochihin A. A...|Hammett G. W.  Pr...|             1| [0704.0044]|[[astro-ph, nlin....|        [2015-05-13]|
|Schekochihin A. A...|Tatsuno T.  Maryland|             1| [0704.0044]|[[astro-ph, nlin....|        [2015-05-13]|
+--------------------+--------------------+--------------+------------+-----------------

In [7]:
# Build the graph from the vertex and edge DataFrames.
g = GraphFrame(v=vertices, e=edges)

## Degree centrality (DC)


In [8]:
# Spot-check the degree of a single author (the id carries a trailing space).
g.degrees.filter(f.col("id") == "Berger E. L. ").show()

+-------------+------+
|           id|degree|
+-------------+------+
|Berger E. L. |     6|
+-------------+------+



In [9]:
# Degree of every vertex, with the id column exposed as 'author'.
degree_centrality = g.degrees.withColumnRenamed('id', 'author')

In [10]:
# Ten authors with the highest degree.
degree_centrality.orderBy(f.col("degree").desc()).show(n=10)

+-------------------+------+
|             author|degree|
+-------------------+------+
|        Bloomer Ed |    26|
|       Clark James |    26|
|    Toher Jennifer |    26|
|    Pitkin Matthew |    26|
|Christensen Nelson |    26|
|  Roever Christian |    26|
|      Meyer Renate |    26|
|Umstaetter Richard |    26|
| Stroeer Alexander |    26|
|   Messenger Chris |    26|
+-------------------+------+
only showing top 10 rows



In [11]:
# Persist the degree table, replacing any previous export.
degree_centrality.write.parquet('../data/single-node-analysis/degree-centrality', mode="overwrite")

                                                                                

## Betweenness centrality (BC)

In [12]:
# Collect five vertex ids to the driver as a plain Python list.
[row['id'] for row in g.vertices.select('id').limit(5).collect()]

                                                                                

['Abbas Afsar ',
 'Abed S. H. ',
 'Abu-Shammala Wael ',
 'Abuhlail J. Y. ',
 'Assaf Michael ']

In [13]:
# Use every vertex id as a landmark. The full id list is collected to the
# driver first and then handed to shortestPaths.
all_vertex_ids = g.vertices.select('id').rdd.flatMap(lambda row: row).collect()
shortest_paths = g.shortestPaths(landmarks=all_vertex_ids)
shortest_paths.show()

                                                                                

+--------------------+--------------------+
|                  id|           distances|
+--------------------+--------------------+
| Fal'ko Vladimir I. |{Goerbig M. O.  -...|
|        Levesley J. |{Brownlee R. A.  ...|
|        Noyes R. W. |{Everett M. E.  -...|
|The BABAR Collabo...|{Aubert B.  -> 1,...|
|       Theran Louis |{Theran Louis  ->...|
|        Choi Dohoon |{Choie YoungJu  -...|
|      Goerbig M. O. |{Goerbig M. O.  -...|
|     Minasyan Ashot |{Minasyan Ashot  ...|
|          Torres G. |{Everett M. E.  -...|
|       Warner Brian |{Dhillon Vikram S...|
|Tatsuno T.  Maryland|{Dorland W.  Mary...|
|        Øbro Mikkel | {Øbro Mikkel  -> 0}|
|         Bloomer Ed |{Messenger Chris ...|
|   Chapman Nicholas |{Myers Philip C. ...|
|  Stroeer Alexander |{Messenger Chris ...|
|        Hague J. P. |{d'Ambrumenil N. ...|
|        Abbas Afsar | {Abbas Afsar  -> 0}|
|Solel Baruch  Tec...|{Solel Baruch  Te...|
|  Sontag Eduardo D. |{Sontag Eduardo D...|
|       Wang Yu-Ming |{Li Xue-Qi

In [14]:
# Persist the shortest-path table, replacing any previous export.
shortest_paths.write.parquet('../data/single-node-analysis/shortest-paths', mode='overwrite')

In [15]:
# Add the keys of the distance map as an array column.
# NOTE(review): these keys are the landmarks present in each vertex's distance
# map (the vertex itself included, at distance 0), not necessarily vertices
# lying on a shortest path — confirm the column name reflects the intent.
shortest_paths = shortest_paths.withColumn(
    'vertices_in_between',
    f.map_keys('distances'),
)

In [16]:
# Inspect the schema after adding the 'vertices_in_between' array column.
shortest_paths.printSchema()

root
 |-- id: string (nullable = true)
 |-- distances: map (nullable = true)
 |    |-- key: string
 |    |-- value: integer (valueContainsNull = false)
 |-- vertices_in_between: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [17]:
# Show the untruncated row for one author (the id carries a trailing space).
shortest_paths.where(f.col('id') == 'Abed S. H. ').show(truncate=False)

+-----------+-------------------------------------------------------------+----------------------------------------------+
|id         |distances                                                    |vertices_in_between                           |
+-----------+-------------------------------------------------------------+----------------------------------------------+
|Abed S. H. |{Abed S. H.  -> 0, Soleiman A.  -> 1, Youssef Nabil L.  -> 1}|[Abed S. H. , Soleiman A. , Youssef Nabil L. ]|
+-----------+-------------------------------------------------------------+----------------------------------------------+



In [18]:
# One row per (vertex, landmark) pair, restricted to the landmark 'Bloomer Ed '.
exploded_paths = shortest_paths.withColumn(
    'vertices_in_between_exploded',
    f.explode('vertices_in_between'),
)
exploded_paths.where(f.col('vertices_in_between_exploded') == 'Bloomer Ed ').show()

+-------------------+--------------------+--------------------+----------------------------+
|                 id|           distances| vertices_in_between|vertices_in_between_exploded|
+-------------------+--------------------+--------------------+----------------------------+
|        Bloomer Ed |{Messenger Chris ...|[Messenger Chris ...|                 Bloomer Ed |
| Stroeer Alexander |{Messenger Chris ...|[Messenger Chris ...|                 Bloomer Ed |
|       Woan Graham |{Messenger Chris ...|[Messenger Chris ...|                 Bloomer Ed |
|   Vecchio Alberto |{Messenger Chris ...|[Messenger Chris ...|                 Bloomer Ed |
|    Toher Jennifer |{Messenger Chris ...|[Messenger Chris ...|                 Bloomer Ed |
|      Meyer Renate |{Messenger Chris ...|[Messenger Chris ...|                 Bloomer Ed |
|    Pitkin Matthew |{Messenger Chris ...|[Messenger Chris ...|                 Bloomer Ed |
|   Messenger Chris |{Messenger Chris ...|[Messenger Chris ...|       

In [19]:
# For every author, count the vertices whose distance map contains that author.
# The result has the columns 'author' and 'degree'.
# NOTE(review): this counts vertices that can reach the author, which is not
# the classical betweenness centrality (the share of shortest paths passing
# through a vertex) — confirm that this proxy is what is intended.
betweenness_centrality = (
    shortest_paths
    .select('id', f.explode('vertices_in_between').alias('author'))
    .groupBy('author')
    .agg(f.count('id').alias('degree'))
)

In [20]:
# Spot-check the computed value for one author.
betweenness_centrality.where(f.col('author') == 'Bloomer Ed ').show()

+-----------+------+
|     author|degree|
+-----------+------+
|Bloomer Ed |    14|
+-----------+------+



In [21]:
# Cross-check: count the rows whose landmark array contains 'Bloomer Ed '.
# This should equal the value reported for that author above.
shortest_paths.where(f.array_contains('vertices_in_between', 'Bloomer Ed ')).count()

14

In [22]:
# Ten authors with the highest value.
betweenness_centrality.orderBy(f.col('degree').desc()).show(n=10)

+-------------------+------+
|             author|degree|
+-------------------+------+
|       Veitch John |    14|
|    Pitkin Matthew |    14|
|     Hendry Martin |    14|
|       Woan Graham |    14|
|Christensen Nelson |    14|
|      Meyer Renate |    14|
|    Toher Jennifer |    14|
|        Bloomer Ed |    14|
|  Roever Christian |    14|
|Umstaetter Richard |    14|
+-------------------+------+
only showing top 10 rows



In [23]:
# Persist the table, replacing any previous export. The directory name is
# spelled 'betweeness' and is kept unchanged.
betweenness_centrality.write.parquet('../data/single-node-analysis/betweeness-centrality', mode="overwrite")

## Timespan

In [24]:
# Pull two edge rows to the driver as a pandas DataFrame for display.
edges.limit(2).toPandas()

Unnamed: 0,src,dst,articles_count,articles_ids,articles_categories,articles_update_date
0,Iliev Bozhidar Z. Institute for Nuclear Resea...,Iliev Bozhidar Z. Institute for Nuclear Resea...,1,[0704.0066],[[hep-th]],[2007-05-23]
1,Schekochihin A. A. Oxford,Quataert E. Berkeley,1,[0704.0044],"[[astro-ph, nlin.CD, physics.plasm-ph, physics...",[2015-05-13]


In [25]:
# Inspect the edge schema before working with the array-valued date column.
edges.printSchema()

root
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- articles_count: long (nullable = true)
 |-- articles_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- articles_categories: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- articles_update_date: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [26]:
# One row per (edge, article update date): flatten the date array.
edges_statistics = edges.withColumn(
    'articles_update_date_exploded',
    f.explode('articles_update_date'),
)
edges_statistics.limit(5).toPandas()

Unnamed: 0,src,dst,articles_count,articles_ids,articles_categories,articles_update_date,articles_update_date_exploded
0,Iliev Bozhidar Z. Institute for Nuclear Resea...,Iliev Bozhidar Z. Institute for Nuclear Resea...,1,[0704.0066],[[hep-th]],[2007-05-23],2007-05-23
1,Schekochihin A. A. Oxford,Quataert E. Berkeley,1,[0704.0044],"[[astro-ph, nlin.CD, physics.plasm-ph, physics...",[2015-05-13],2015-05-13
2,Schekochihin A. A. Oxford,Dorland W. Maryland,1,[0704.0044],"[[astro-ph, nlin.CD, physics.plasm-ph, physics...",[2015-05-13],2015-05-13
3,Schekochihin A. A. Oxford,Hammett G. W. Princeton,1,[0704.0044],"[[astro-ph, nlin.CD, physics.plasm-ph, physics...",[2015-05-13],2015-05-13
4,Schekochihin A. A. Oxford,Tatsuno T. Maryland,1,[0704.0044],"[[astro-ph, nlin.CD, physics.plasm-ph, physics...",[2015-05-13],2015-05-13


In [27]:
# Parse the exploded 'yyyy-MM-dd' date strings into timestamps, in place.
edges_statistics = edges_statistics.withColumn(
    'articles_update_date_exploded',
    f.to_timestamp('articles_update_date_exploded', 'yyyy-MM-dd'),
)
edges_statistics.limit(5).toPandas()

Unnamed: 0,src,dst,articles_count,articles_ids,articles_categories,articles_update_date,articles_update_date_exploded
0,Iliev Bozhidar Z. Institute for Nuclear Resea...,Iliev Bozhidar Z. Institute for Nuclear Resea...,1,[0704.0066],[[hep-th]],[2007-05-23],2007-05-23
1,Schekochihin A. A. Oxford,Quataert E. Berkeley,1,[0704.0044],"[[astro-ph, nlin.CD, physics.plasm-ph, physics...",[2015-05-13],2015-05-13
2,Schekochihin A. A. Oxford,Dorland W. Maryland,1,[0704.0044],"[[astro-ph, nlin.CD, physics.plasm-ph, physics...",[2015-05-13],2015-05-13
3,Schekochihin A. A. Oxford,Hammett G. W. Princeton,1,[0704.0044],"[[astro-ph, nlin.CD, physics.plasm-ph, physics...",[2015-05-13],2015-05-13
4,Schekochihin A. A. Oxford,Tatsuno T. Maryland,1,[0704.0044],"[[astro-ph, nlin.CD, physics.plasm-ph, physics...",[2015-05-13],2015-05-13


In [28]:
# Check the schema of edges_statistics after the to_timestamp conversion.
edges_statistics.printSchema()

root
 |-- src: string (nullable = true)
 |-- dst: string (nullable = true)
 |-- articles_count: long (nullable = true)
 |-- articles_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- articles_categories: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- articles_update_date: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- articles_update_date_exploded: timestamp (nullable = true)



In [29]:
# Per-author window ordered by update date.
# The frame is made explicit. With an ORDER BY and no frame, Spark defaults to
# "unbounded preceding .. current row", so last() would return the current
# row's date rather than the latest date of the partition. Spanning the whole
# partition makes first()/last() the earliest/latest date per 'src'.
window = (
    Window
    .partitionBy('src')
    .orderBy('articles_update_date_exploded')
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

In [30]:
# Publication timespan per author: year of the latest update minus year of
# the earliest update over all edges where the author is 'src'.
# A plain aggregation is used rather than first()/last() over an ordered
# window. Such a window's default frame ends at the current row, so last()
# yields the current row's date and produces several (author, timespan) rows
# per author. groupBy with min/max returns exactly one row per author.
# NOTE(review): only 'src' is considered; authors that appear solely as 'dst'
# get no row — confirm whether the edge list is symmetric.
first_update = f.min('articles_update_date_exploded')
last_update = f.max('articles_update_date_exploded')
timespan_data = (
    edges_statistics
    .groupBy('src')
    .agg((f.year(last_update) - f.year(first_update)).alias('timespan'))
    .withColumnRenamed('src', 'author')
)
timespan_data.sort('timespan', ascending=False).limit(5).toPandas()

Unnamed: 0,author,timespan
0,Chen Xiao-Lin,2
1,Deng Wei-Zhen,2
2,Grabec I.,2
3,Hoekstra Alfons G.,2
4,Kosel T.,2


In [31]:
# Persist the timespan table, replacing any previous export.
timespan_data.write.parquet('../data/single-node-analysis/timespan', mode="overwrite")

## The clustering coefficient

### Global clustering

In [32]:
# Triangles through each vertex; the 'count' column is exposed as
# 'closed-triplets'.
triangle_count = g.triangleCount().withColumnRenamed('count', 'closed-triplets')
triangle_count.show(n=10, truncate=False)

                                                                                

+---------------+-------------------------+
|closed-triplets|id                       |
+---------------+-------------------------+
|0              |Abu-Shammala Wael        |
|3              |Balázs C.                |
|36             |Bakos G. A.              |
|0              |Abbas Afsar              |
|0              |Aubert B.                |
|0              |Abuhlail J. Y.           |
|1              |Abed S. H.               |
|0              |Aujla Jaspal Singh       |
|0              |Barvinsky A. O.          |
|0              |Audenaert Koenraad M. R. |
+---------------+-------------------------+
only showing top 10 rows



In [33]:
# Persist the triangle counts keyed by 'author', replacing any previous export.
triangle_export = triangle_count.withColumnRenamed('id', 'author')
triangle_export.write.parquet('../data/single-node-analysis/triangle-count', mode='overwrite')

                                                                                

In [34]:
# Reload the persisted shortest-path table. This rebinds shortest_paths to the
# stored data, which was written before 'vertices_in_between' was added.
shortest_paths = spark.read.load(
    '../data/single-node-analysis/shortest-paths/*.parquet',
    format='parquet',
)

In [35]:
# Keep only the distance values of each map, preview them, then flatten to one
# row per (vertex, distance).
# 'distances' is overwritten in place, so this cell expects the freshly
# reloaded table in which it is still a map.
shortest_paths = shortest_paths.withColumn('distances', f.map_values('distances'))
shortest_paths.show(n=5)
shortest_paths = shortest_paths.withColumn('distances', f.explode('distances'))

+------------------+------------+
|                id|   distances|
+------------------+------------+
|      Abbas Afsar |         [0]|
|       Abed S. H. |   [0, 1, 1]|
|Abu-Shammala Wael |      [0, 1]|
|   Abuhlail J. Y. |      [0, 1]|
|    Assaf Michael |[1, 1, 0, 1]|
+------------------+------------+
only showing top 5 rows



In [36]:
# Keep only the rows at distance exactly 2.
shortest_paths = shortest_paths.where(f.col('distances') == 2)
shortest_paths.show(n=5)

+-----------+---------+
|         id|distances|
+-----------+---------+
|Chen Chong |        2|
| Liu Xiang |        2|
|  Shu Zhan |        2|
|  Shu Zhan |        2|
|  Shu Zhan |        2|
+-----------+---------+
only showing top 5 rows



In [37]:
# Number of distance-2 rows per vertex, exposed as 'open-closed-triplets'.
shortest_paths = (
    shortest_paths
    .groupBy('id')
    .agg(f.count('*').alias('open-closed-triplets'))
)
shortest_paths.show(n=5)

+------------+--------------------+
|          id|open-closed-triplets|
+------------+--------------------+
|  Liu Xiang |                   1|
| Chen Chong |                   1|
|   Shu Zhan |                   3|
|Zhu Shi-Lin |                   1|
+------------+--------------------+



In [38]:
# Total of the per-vertex 'closed-triplets' counts.
# A DataFrame aggregate replaces the hand-rolled RDD map/reduceByKey sum.
# `or 0` covers an empty table, where sum() yields None; the RDD version
# raised IndexError on collect()[0] in that case.
nb_closed_triplets = triangle_count.agg(f.sum('closed-triplets')).first()[0] or 0

                                                                                

In [39]:
# Total of the per-vertex distance-2 counts plus the closed-triplet total.
# Despite its name, this variable holds open AND closed triplets together.
# A DataFrame aggregate replaces the RDD map/reduceByKey sum. `or 0` covers
# the case where no pair is at distance 2 and the table is empty; the RDD
# version raised IndexError on collect()[0] there.
nb_distance_two = shortest_paths.agg(f.sum('open-closed-triplets')).first()[0] or 0
nb_open_triplets = nb_distance_two + nb_closed_triplets

In [40]:
# Global clustering coefficient: closed triplets divided by the combined
# (open + closed) total held in nb_open_triplets.
# NOTE(review): raises ZeroDivisionError if both totals are zero; the
# denominator is derived from distance-2 rows — confirm this matches the
# intended triplet definition.
clustering_coefficient = nb_closed_triplets / nb_open_triplets
clustering_coefficient

0.9968404423380727

### Local clustering

In [41]:
# Reload the persisted degree table and show the five lowest degrees.
degree_centrality = spark.read.load(
    '../data/single-node-analysis/degree-centrality/*.parquet',
    format='parquet',
)
degree_centrality.orderBy(f.col('degree').asc()).show(n=5)

+--------------------+------+
|              author|degree|
+--------------------+------+
|Iliev Bozhidar Z....|     2|
|        Abbas Afsar |     2|
|  Abu-Shammala Wael |     2|
|Tolstoy V. N.  IN...|     2|
| Torchinsky Alberto |     2|
+--------------------+------+
only showing top 5 rows



In [42]:
# Reload the persisted triangle counts and scale them by 2/3 in place.
# NOTE(review): the reason for the 2/3 factor is not visible here; the usual
# local-clustering numerator is 2 * triangles — confirm.
triangle_count = spark.read.load(
    '../data/single-node-analysis/triangle-count/*.parquet',
    format='parquet',
)
scaled_triplets = (2 * f.col('closed-triplets')) / 3
triangle_count = triangle_count.withColumn('closed-triplets', scaled_triplets)
triangle_count.orderBy(f.col('closed-triplets').desc()).show(n=5)

+---------------+-------------------+
|closed-triplets|             author|
+---------------+-------------------+
|           52.0|        Bloomer Ed |
|           52.0|       Veitch John |
|           52.0|    Toher Jennifer |
|           52.0|      Meyer Renate |
|           52.0|Christensen Nelson |
+---------------+-------------------+
only showing top 5 rows



In [43]:
# Local clustering coefficient per author: scaled triangle count divided by
# degree * (degree - 1), rounded to two decimals.
# NOTE(review): 'degree' comes from g.degrees, which presumably counts both
# in- and out-edges. In the recorded output the values top out at 0.08, even
# for authors with degree 26 and a scaled count of 52 — confirm the formula
# against the textbook 2*T / (k*(k-1)) with k = number of neighbours.
degree_col = f.col('degree')
ordered_pairs = degree_col * (degree_col - 1)
local_clustering_coefficient = (
    degree_centrality
    .join(triangle_count, on='author')
    .withColumn(
        'local_clustering_coefficient',
        f.round(f.col('closed-triplets') / ordered_pairs, 2),
    )
)
local_clustering_coefficient.orderBy(f.col('local_clustering_coefficient').desc()).show(n=5)

+----------------+------+---------------+----------------------------+
|          author|degree|closed-triplets|local_clustering_coefficient|
+----------------+------+---------------+----------------------------+
| Toher Jennifer |    26|           52.0|                        0.08|
|  Everett M. E. |    18|           24.0|                        0.08|
|  Hendry Martin |    26|           52.0|                        0.08|
|Myers Philip C. |    12|           10.0|                        0.08|
|    Bakos G. A. |    18|           24.0|                        0.08|
+----------------+------+---------------+----------------------------+
only showing top 5 rows



In [44]:
# Persist only the author and coefficient columns, replacing any previous export.
coefficient_export = local_clustering_coefficient.select('author', 'local_clustering_coefficient')
coefficient_export.write.parquet('../data/single-node-analysis/local_clustering_coefficient', mode='overwrite')