In [5]:
from pyspark.sql import SparkSession
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
print(os.getenv("JAVA_HOME"))
# Create SparkSession
spark = SparkSession.builder\
    .config("spark.driver.memory", "16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.0.0,org.neo4j:neo4j-connector-apache-spark_2.12:4.1.2_for_spark_3")\
    .getOrCreate()

/usr/lib/jvm/java-11-openjdk-amd64


In [6]:
from pyspark import StorageLevel
articles_abstracts = spark.read.format("org.neo4j.spark.DataSource")\
  .option("url", "bolt://192.168.0.178:7687")\
  .option("authentication.basic.username", os.environ["NEO4J_LOGIN"])\
  .option("authentication.basic.password", os.environ["NEO4J_PASSWORD"]).option("query", "MATCH (n:Article) where n.language =\"en\" WITH n RETURN n.id as id,n.year as year,n.title +'. '+ n.abstract as text")\
  .option("partitions", "4")\
  .load()
articles_abstracts.persist(StorageLevel.DISK_ONLY)

DataFrame[id: bigint, year: bigint, text: string]

In [7]:
from sparknlp.annotator import PerceptronModel,SentenceDetector,Tokenizer,Stemmer,Normalizer,StopWordsCleaner
from sparknlp.base import DocumentAssembler,Pipeline,LightPipeline

document_assembler = DocumentAssembler() \
  .setInputCol("text") \
  .setOutputCol("document")

sentence = SentenceDetector() \
    .setInputCols("document") \
    .setOutputCol("sentence")

tokenizer = Tokenizer() \
    .setInputCols(["sentence"]) \
    .setOutputCol("token")

stemmer = Stemmer() \
  .setInputCols(["token"]) \
  .setOutputCol("stem")

norm = Normalizer()\
  .setInputCols(["token"])\
  .setOutputCol("normalized")\
  .setLowercase(True)

stops = StopWordsCleaner.pretrained()\
  .setInputCols("normalized")\
  .setOutputCol("cleanedStem")


stem_pipeline = Pipeline(stages=[
  document_assembler,
  sentence,
  tokenizer,
  stemmer,
  norm,
  stops,
])

empty_df = spark.createDataFrame([[""]]).toDF('text')
stem_model = stem_pipeline.fit(empty_df)


stopwords_en download started this may take some time.
Approximate size to download 2.9 KB
[ | ]stopwords_en download started this may take some time.
Approximate size to download 2.9 KB
Download done! Loading the resource.
[OK!]


In [8]:
with_stems = stem_model.transform(articles_abstracts).select("id","year","cleanedStem.result")#,F.col("normalized.result"))
with_stems.persist(StorageLevel.DISK_ONLY)

DataFrame[id: bigint, year: bigint, result: array<string>]

### create nodes

In [None]:
distinct_stems = with_stems.select(F.explode("result").alias("result")).distinct()

distinct_stems.write.format("org.neo4j.spark.DataSource")\
.option("url", "bolt://192.168.0.178:7687")\
.option("authentication.basic.username", os.environ["NEO4J_LOGIN"])\
.option("authentication.basic.password", os.environ["NEO4J_PASSWORD"])\
.option("labels", ":Stem")\
.option("node.keys", "result:word")\
.mode("Overwrite")\
.option("transaction.retries", 5)\
.option("transaction.retry.timeout", 5)\
.save()

                                                                                

### create edges to articles

In [None]:
article_stem_relation = with_stems.select("id",F.explode("result").alias("result")).groupBy("id","result").count()

In [None]:
article_stem_relation.write.format("org.neo4j.spark.DataSource")\
.option("url", "bolt://192.168.0.178:7687")\
.option("authentication.basic.username", os.environ["NEO4J_LOGIN"])\
.option("authentication.basic.password", os.environ["NEO4J_PASSWORD"])\
.option("relationship", "CONTAINS")\
.option("relationship.save.strategy", "keys")\
.option("relationship.properties", "count:weight")\
.option("relationship.source.labels", ":Article")\
.option("relationship.source.save.mode", "Match")\
.option("relationship.source.node.keys", "id:id")\
.option("relationship.target.labels", ":Stem")\
.option("relationship.target.node.keys", "result:word")\
.option("relationship.target.save.mode", "Match")\
.option("transaction.retries", 5)\
.option("transaction.retry.timeout", 5)\
.mode("Append")\
.save()

                                                                                

### Create edges between stems

In [21]:


from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType,StructType,StructField,StringType

def get_sliding_window(window_size:int):
    def window(arr:list):
        arrlen = len(arr)
        size = min(arrlen*(arrlen-1)//2,window_size*(window_size-1)//2) + max(0,(arrlen-window_size))*(window_size-1)
        result = [None] * size
        index = 0
        for i in range(size):
            for j in range(i+1,min(i+window_size,arrlen)):
                result[index] = (arr[i],arr[j]) if (arr[i]>arr[j]) else (arr[j],arr[i])
                index+=1
        return result
    return F.udf(window,ArrayType(StructType([StructField("first",StringType(),False),StructField("second",StringType(),False)])))

mySlidingPairs2 = get_sliding_window(7)
coocurence = with_stems.select("year",F.explode(mySlidingPairs2("result")
    ).alias("result")).select("year","result.first","result.second"
    ).filter(F.col("first")!=F.col("second")).groupBy("year","first","second").count(    
    ).select("year","first","second",F.col("count").alias("weight"),(1/F.col("count")).alias("inverseWeight")
    ).filter(F.col("weight")>16)
coocurence.persist(StorageLevel.DISK_ONLY)

DataFrame[year: bigint, first: string, second: string, weight: bigint, inverseWeight: double]

In [22]:
coocurence.head(1)

                                                                                

[Row(year=2015, first='result', second='bound', weight=69, inverseWeight=0.014492753623188406)]

In [23]:
years = [row["year"] for row in articles_abstracts.select("year").distinct().collect()]

In [24]:
import networkx as nx
frames = {year:coocurence.filter(F.col("year")==year).select("first","second","weight","inverseWeight") for year in years}
graphs = {year: nx.from_pandas_edgelist(frame.toPandas(),"first","second",["weight","inverseWeight"]) for year,frame in frames.items()}

                                                                                

In [25]:
betweenneess ={year: nx.betweenness_centrality(graph, normalized=False, weight ='inverseWeight') for year,graph in graphs.items()}


In [30]:
from pyspark.sql.types import StructType,StructField,StringType,DoubleType
dist_type = StructType([StructField("word",StringType(),False),StructField("betweenneess",DoubleType(),False)])
for year,value in betweenneess.items():
    df = spark.createDataFrame(value.items(), dist_type).withColumn('year', F.lit(year))

    df.write.format("org.neo4j.spark.DataSource")\
    .option("url", "bolt://192.168.0.178:7687")\
    .option("authentication.basic.username", os.environ["NEO4J_LOGIN"])\
    .option("authentication.basic.password", os.environ["NEO4J_PASSWORD"])\
    .option("relationship", "BETWEENNESS")\
    .option("relationship.save.strategy", "keys")\
    .option("relationship.properties", "betweenneess:value")\
    .option("relationship.source.labels", ":Stem")\
    .option("relationship.source.save.mode", "Match")\
    .option("relationship.source.node.keys", "word:word")\
    .option("relationship.target.labels", ":Year")\
    .option("relationship.target.node.keys", "year")\
    .option("relationship.target.save.mode", "Match")\
    .option("transaction.retries", 5)\
    .option("transaction.retry.timeout", 5)\
    .mode("Append")\
    .save()


22/06/25 06:49:00 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 2906340 ms exceeds timeout 120000 ms
22/06/25 06:49:00 WARN SparkContext: Killing executors is not supported by current scheduler.


In [27]:
import distinctivenessz.distinctiveness.dc as dd
diversity ={year: [(k,float(v)) for k,v in dd.distinctiveness(graph, normalize = False, alpha = 1,measures=["D2"])["D2"].items()] for year,graph in graphs.items() if len(graph.nodes)>3}

{'D2': {'results': 38.520596740329545, 'algorithm': 64.84753656420158, 'mimo': 10.072901707526299, 'fading': 10.223185702975695, 'proof': 1.644860439410578, 'logic': 14.827649364645882, 'parallel': 24.3100482950842, 'design': 8.887428694138206, 'network': 47.22487123518382, 'channel': 112.96258751644336, 'problem': 83.70767187550216, 'general': 2.267795997267116, 'transmission': 8.44983333547382, 'receiver': 7.18848193702355, 'information': 41.56473402411295, 'model': 23.951104459407926, 'coding': 27.746468517862567, 'systems': 7.851783478657298, 'channels': 25.17704651985793, 'broadcast': 1.9365546808285399, 'proposed': 14.921295221917742, 'number': 36.313643877053245, 'properties': 1.7246444537463632, 'codes': 64.48225697746963, 'users': 5.266678785161003, 'system': 16.18119136530903, 'scheme': 11.81244563821423, 'time': 23.257704604625392, 'study': 7.0646079734686005, 'paper': 79.8147564099423, 'wireless': 10.928541038362898, 'communication': 3.248722116319799, 'nash': 6.51563292883

In [29]:
from pyspark.sql.types import StructType,StructField,StringType,DoubleType
dist_type = StructType([StructField("word",StringType(),False),StructField("distinctiveness",DoubleType(),False)])
for year,value in diversity.items():
    df = spark.createDataFrame(value, dist_type).withColumn('year', F.lit(year))

    df.write.format("org.neo4j.spark.DataSource")\
    .option("url", "bolt://192.168.0.178:7687")\
    .option("authentication.basic.username", os.environ["NEO4J_LOGIN"])\
    .option("authentication.basic.password", os.environ["NEO4J_PASSWORD"])\
    .option("relationship", "DISTINCTIVENESS")\
    .option("relationship.save.strategy", "keys")\
    .option("relationship.properties", "distinctiveness:value")\
    .option("relationship.source.labels", ":Stem")\
    .option("relationship.source.save.mode", "Match")\
    .option("relationship.source.node.keys", "word:word")\
    .option("relationship.target.labels", ":Year")\
    .option("relationship.target.node.keys", "year")\
    .option("relationship.target.save.mode", "Match")\
    .option("transaction.retries", 5)\
    .option("transaction.retry.timeout", 5)\
    .mode("Append")\
    .save()