In [10]:
import sys
fileDir = "/home/jovyan/notebooks/"
sys.path.append(fileDir)

from utilities import *

import pyspark.sql.functions as F
import pyspark.sql.types as T

In [11]:
# Build the session shared by every cell below, configured for the Neo4j connector.
# `create_spark_session` and `SparkConnector` are not defined in this notebook;
# presumably they come from the star import of the local `utilities` module — verify there.
session = create_spark_session("Neo4j GitHub", SparkConnector.NEO4J)

Added dependencies: 
 ['neo4j-connector-apache-spark_2.12-5.0.1_for_spark_3.jar']


In [5]:
# Scenario 1: rank contributors by the number of distinct repositories they authored commits in.

options = get_default_options(SparkConnector.NEO4J)
# The query counts distinct repositories per contributor and sorts by that count, descending.
options["query"] =  """
                        MATCH (contrib:GitContributor)-[:AUTHOR]->(commit:GitCommit)-[:BELONGS_TO]->(repo:GitRepository)
                        WITH contrib, COUNT(DISTINCT repo) as repo_count
                        RETURN contrib, repo_count ORDER BY repo_count DESC
                    """
# NOTE(review): despite its name, this DataFrame holds every row the query returns;
# the cut to ten rows only happens in take(10) below.
top10contributors = spark_read(SparkConnector.NEO4J, session, options=options)
# assumes rows reach Spark in the query's ORDER BY order, so the first ten are the top ten — TODO confirm
display(top10contributors.take(10))

Dataframe loaded from neo4j


[Row(contrib=Row(<id>=11587, <labels>=['GitContributor'], name='Ikko Ashimine', email='d41f8067726d843438db002d5555099b4901d7c1@gmail.com'), repo_count=4),
 Row(contrib=Row(<id>=1402, <labels>=['GitContributor'], name='dependabot[bot]', email='1c358da00a777d4e9898c1280ab801e2df165188@users.noreply.github.com'), repo_count=3),
 Row(contrib=Row(<id>=3485, <labels>=['GitContributor'], name='Prayag Verma', email='35a46e17bc00e93336a001ea5a30f33595fd0d03@gmail.com'), repo_count=3),
 Row(contrib=Row(<id>=5545, <labels>=['GitContributor'], name='Michaël De Boey', email='59bd0a3ff43b32849b319e645d4798d8a5d1e889@michaeldeboey.be'), repo_count=3),
 Row(contrib=Row(<id>=2824, <labels>=['GitContributor'], name='James George', email='3e5d4505bd1e679d62cbd9e85b63ce0b6e249349@gmail.com'), repo_count=3),
 Row(contrib=Row(<id>=1275, <labels>=['GitContributor'], name='Ronald Eddy Jr', email='10bf4b03df8e0eeff31b9303c48728d238ba68d1@yahoo.com'), repo_count=3),
 Row(contrib=Row(<id>=3904, <labels>=['GitCo

In [6]:
# Scenario 2: repositories in which LANGUAGE accounts for more than PERCENTAGE of all bytes.

LANGUAGE = "C++"
PERCENTAGE = 0.5  # a fraction in [0, 1], not a percent value
options = get_default_options(SparkConnector.NEO4J)
# Per repository: sum the bytes over all languages, then compute each language's share.
# The threshold is applied to the *unrounded* share and rounding happens only in RETURN;
# filtering on the rounded value would wrongly drop repositories whose share is just
# above the threshold (e.g. 0.503 rounds to 0.5, which is not > 0.5).
# Doubled braces ({{ }}) are literal braces of the Cypher map inside the f-string.
options["query"] = f"""
                    MATCH (r:GitRepository)-[w:WRITTED_IN]->(l:GitLanguage)
                    WITH r, SUM(w.bytes) AS totalBytesForRepo, collect({{language_name:l.name,bytes: w.bytes}}) AS bytesForLanguages
                    UNWIND bytesForLanguages AS bytesForLanguage
                    WITH r.name AS repo_name, bytesForLanguage.language_name AS lang, bytesForLanguage.bytes*1.0/totalBytesForRepo AS fracOfBytes
                        WHERE lang = "{LANGUAGE}" AND fracOfBytes > {PERCENTAGE}
                    RETURN repo_name, lang, round(fracOfBytes, 2) AS percOfBytes
                  """
bytesPercentageInRepos = spark_read(SparkConnector.NEO4J, session, options=options)
display(bytesPercentageInRepos.take(10))

Dataframe loaded from neo4j


[Row(repo_name='tensorflow/tensorflow', lang='C++', percOfBytes=0.63)]

In [7]:
# Scenario 3: count the merge commits of a single repository.
REPO_NAME = "tensorflow/tensorflow"
options = get_default_options(SparkConnector.NEO4J)
# A commit is counted as a merge when it has more than one outgoing PARENT path.
# Doubled braces ({{ }}) are literal braces of the Cypher property map inside the f-string.
options["query"] =  f"""
                        MATCH (repository:GitRepository {{name: "{REPO_NAME}"}})<-[:BELONGS_TO]-(commit:GitCommit), 
                            r = (commit)-[:PARENT]->()
                        WITH commit, collect(r) AS parents
                        WHERE size(parents) > 1
                        RETURN count(commit) AS mergeCount
                    """
# Stored under its own name: the result is a merge count, and reusing
# `bytesPercentageInRepos` here would silently overwrite the Scenario 2 DataFrame.
mergeCountInRepo = spark_read(SparkConnector.NEO4J, session, options=options)
mergeCountInRepo.show(10)

Dataframe loaded from neo4j
+----------+
|mergeCount|
+----------+
|     12127|
+----------+



In [18]:
# Scenario 4 (prerequisite): project the commit graph into the GDS graph catalog.
# Nodes are GitCommit ids; relationships are the PARENT links between them.
# The graph name must be 'allData', because the Louvain and WCC cells in this
# notebook both stream from a projected graph with exactly that name.
query = """
             CALL gds.graph.project.cypher(
            'allData',
            'MATCH (n:GitCommit) RETURN ID(n) AS id',
            'MATCH (n)-[:PARENT]->(m) RETURN ID(n) AS source, ID(m) AS target')
        """
# Deliberate stop: this cell does not send the projection through Spark. Run the text of
# `query` by hand in the Neo4j GUI, then continue with the cells below.
raise Exception("Put query text in the neo4j GUI")

Exception: Put query text in the neo4j GUI

In [6]:
# Scenario 4: stream Louvain community assignments from the projected graph 'allData'
# and export them (node id, community id) as CSV.
options = get_default_options(SparkConnector.NEO4J)
options["query"] = """
                        CALL gds.louvain.stream('allData')
                        YIELD nodeId, communityId, intermediateCommunityIds
                        RETURN nodeId as ID, communityId 
                    """
louvain = spark_read(SparkConnector.NEO4J, session, options=options)

# Replace any earlier export at this location; the CSV is written with a header row.
(
    louvain.write
    .mode("overwrite")
    .option("header", True)
    .csv("hdfs://namenode:9000//data-team/louvain")
)

Dataframe loaded from neo4j


In [19]:
# Scenario 4: stream weakly-connected-component ids from the projected graph 'allData'
# and export them (node id, component id) as CSV.
options = get_default_options(SparkConnector.NEO4J)
options["query"] = """
                        CALL gds.wcc.stream('allData') YIELD nodeId, componentId
                        RETURN nodeId AS ID, componentId
                    """
wcc = spark_read(SparkConnector.NEO4J, session, options=options)

# Replace any earlier export at this location; the CSV is written with a header row.
(
    wcc.write
    .mode("overwrite")
    .option("header", True)
    .csv("hdfs://namenode:9000//data-team/wcc")
)

Dataframe loaded from neo4j


In [7]:
# Shut the session down. SparkSession.stop() also stops the underlying SparkContext,
# so a separate session.sparkContext.stop() call beforehand is redundant.
session.stop()