In [1]:
import sys
fileDir = "/home/jovyan/notebooks/"
sys.path.append(fileDir)

from utilities import *
import pyspark.sql.functions as F
import pyspark.sql.types as T
import time as time

In [2]:
session = create_spark_session("ARANGODB GitHub", SparkConnector.ARANGO)

Added dependencies: 
 ['arangodb-java-driver-shaded-7.1.0.jar', 'arangodb-spark-commons-3.3_2.12-1.5.0.jar', 'arangodb-spark-datasource-3.3_2.12-1.5.0.jar', 'commons-codec-1.11.jar', 'commons-logging-1.2.jar', 'httpclient-4.5.13.jar', 'httpcore-4.4.13.jar', 'jackson-dataformat-velocypack-4.1.0.jar', 'slf4j-api-2.0.7.jar']


In [3]:
HDFS_URL = "hdfs://namenode:9000//data-team"
PREFIX = "sample_" # "sample_" or ""
SUFFIX = "_100" # "_10" or "_100" or ""

### Reading from HDFS

In [4]:
load_start_time = time.time()
repositories_json = session.read.json(f"{HDFS_URL}/{PREFIX}repositories{SUFFIX}.json", encoding='unicode_escape') \
    .withColumnRenamed("repo_name", "repo") \

repositories_csv = session.read.csv(f"{HDFS_URL}/repo_API_data.csv", header=True, inferSchema=True, encoding='unicode_escape')
repositories_csv = repositories_csv.select("repo_name","stargazers_count","topics")

repositories = repositories_json.join(repositories_csv, repositories_json.repo == repositories_csv.repo_name, "left") \
    .select(repositories_json["repo"].alias("repo_name"), 
            repositories_json["watch_count"], repositories_csv["stargazers_count"], 
            repositories_csv["topics"])

languages = session.read.json(f"{HDFS_URL}/{PREFIX}languages{SUFFIX}.json", encoding='unicode_escape')

licences = session.read.json(f"{HDFS_URL}/{PREFIX}licences{SUFFIX}.json", encoding='unicode_escape')

commits = session.read.json(f"{HDFS_URL}/{PREFIX}commits{SUFFIX}.json", encoding='unicode_escape')

load_end_time = time.time()

load_time = (load_end_time - load_start_time)

In [5]:

preprc_start_time = time.time()

def remove_back(text):
    return text.replace("/", "::").replace(r"[^a-zA-Z0-9:]", "")

remove_udf = F.udf(remove_back, T.StringType())
repositories = repositories.withColumn("repo_name", remove_udf("repo_name"))
commits = commits.withColumn("repo", remove_udf("repo"))
licences = licences.withColumn("repo_name", remove_udf("repo_name"))
languages = languages.withColumn("repo_name", remove_udf("repo_name"))
def remove_at_and_blank(text):
    return text.replace("@", "::").replace(r"[^a-zA-Z0-9:]", "")

remove_at_and_blank = F.udf(remove_at_and_blank, T.StringType())
commits = commits.withColumn("author_email", remove_at_and_blank("author.email"))
commits = commits.withColumn("committer_email", remove_at_and_blank("committer.email"))

### Data Processing
git_commits= commits.select("commit", "subject", "message")
newColumns = ["_key", "title", "message"]
git_commits = git_commits.toDF(*newColumns)

git_repositories = repositories.withColumnRenamed("repo_name", "_key")

def remove_c_sharp(text):
    return text.replace("#", "s").replace(" ", "").replace("++", "pp")

remove_c_sharp = F.udf(remove_c_sharp, T.StringType())

git_languages = languages.withColumn("name", F.explode(languages["language.name"]))\
    .dropDuplicates(["name"])\
    .select("name")\
    .withColumnRenamed("name", "_key")

git_languages = git_languages.withColumn("_key", remove_c_sharp(git_languages["_key"]))

git_licenses = licences.select("license").withColumnRenamed(
    "license", "name").dropDuplicates(["name"]).withColumnRenamed("name", "_key")


git_contributor = commits.select("author.name",commits["author_email"].alias("email")) \
    .union(commits.select("committer.name", commits["committer_email"].alias("email"))) \
    .dropDuplicates(["name"]) \
    .select("name", "email")\
    .withColumnRenamed("email", "_key")

git_contributor = git_contributor.filter(git_contributor["_key"] != "")

edges_df = commits.select("commit", "repo")\
                .withColumnRenamed("commit", "_from")\
                .withColumnRenamed("repo", "_to")\
                .withColumn("_to", remove_udf("_to"))
edges_pd_df = edges_df.to_pandas_on_spark()
edges_pd_df["_from"] = "GitCommit/" + edges_pd_df["_from"]
edges_pd_df["_to"] = "GitRepository/" + edges_pd_df["_to"]
belongs_to_df = edges_pd_df.to_spark()
belongs_to_df = set_df_columns_nullable(session, belongs_to_df, ["_from", "_to"], False)

edges_df = commits.select("commit", "parent") \
    .withColumn("parent", F.explode(commits["parent"])) \
    .withColumnRenamed("commit", "_from")\
    .withColumnRenamed("parent", "_to")\
    .dropDuplicates(["_from", "_to"])

edges_pd_df = edges_df.to_pandas_on_spark()
edges_pd_df["_from"] = "GitCommit/" + edges_pd_df["_from"]
edges_pd_df["_to"] = "GitCommit/" + edges_pd_df["_to"]

parent_df = edges_pd_df.to_spark()
parent_df = set_df_columns_nullable(session, parent_df, ["_from", "_to"], False)

edges_df = licences.select("repo_name", "license") \
    .dropDuplicates(["repo_name", "license"]) \
    .withColumnRenamed("repo_name", "_from")\
    .withColumnRenamed("license", "_to")\
    .dropDuplicates(["_from", "_to"])
    
edges_pd_df = edges_df.to_pandas_on_spark()
edges_pd_df["_from"] = "GitRepository/" + edges_pd_df["_from"]
edges_pd_df["_to"] = "GitLicense/" + edges_pd_df["_to"]


has_df = edges_pd_df.to_spark()
has_df = set_df_columns_nullable(session, has_df, ["_from", "_to"], False)

edges_df = commits.select(commits["author_email"].alias("email"), "commit", "author.date.seconds") \
    .withColumnRenamed("seconds", "ts")
edges_df = edges_df \
    .filter(edges_df["email"] != "") \
    .withColumn("ts", edges_df["ts"].cast(T.IntegerType())) \
    .withColumnRenamed("email", "_from")\
    .withColumnRenamed("commit", "_to")\
    .dropDuplicates(["_from", "_to"])

edges_pd_df = edges_df.to_pandas_on_spark()
edges_pd_df["_from"] = "GitContributor/" + edges_pd_df["_from"]
edges_pd_df["_to"] = "GitCommit/" + edges_pd_df["_to"]


author_df = edges_pd_df.to_spark()
author_df = set_df_columns_nullable(session, author_df, ["_from", "_to"], False)

edges_df = commits.select(commits["committer_email"].alias("email"), "commit", "committer.date.seconds") \
    .withColumnRenamed("seconds", "ts")
edges_df = edges_df \
    .filter(edges_df["email"] != "") \
    .withColumn("ts", edges_df["ts"].cast(T.IntegerType())) \
    .withColumnRenamed("email", "_from")\
    .withColumnRenamed("commit", "_to")\
    .dropDuplicates(["_from", "_to"])

edges_pd_df = edges_df.to_pandas_on_spark()
edges_pd_df["_from"] = "GitContributor/" + edges_pd_df["_from"]
edges_pd_df["_to"] = "GitCommit/" + edges_pd_df["_to"]


committed_df = edges_pd_df.to_spark()
committed_df = set_df_columns_nullable(session, committed_df, ["_from", "_to"], False)

edges_df = languages.withColumn("lang", F.explode(languages["language"]))
edges_df = edges_df \
    .withColumn("language", edges_df["lang.name"]) \
    .withColumn("bytes", edges_df["lang.bytes"].cast(T.IntegerType())) \
    .select("repo_name", "language", "bytes") \
    .withColumnRenamed("repo_name", "_from")\
    .withColumnRenamed("language", "_to")\
    .dropDuplicates(["_from", "_to"])

edges_df = edges_df.withColumn("_to", remove_c_sharp(edges_df["_to"]))

edges_pd_df = edges_df.to_pandas_on_spark()
edges_pd_df["_from"] = "GitRepository/" + edges_pd_df["_from"]
edges_pd_df["_to"] = "GitLanguage/" + edges_pd_df["_to"]

writted_in_df = edges_pd_df.to_spark()
writted_in_df = set_df_columns_nullable(session, writted_in_df, ["_from", "_to"], False)


preproc_end_time = time.time()
preproc_time = (preproc_end_time - preprc_start_time)



PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/ipykernel_5118/919889244.py", line 13, in remove_at_and_blank
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x94 in position 50: invalid start byte


In [None]:

writing_start_time = time.time()
options = get_default_options(SparkConnector.ARANGO)
options["table.type"] = "document"
options["table"] = "GitCommit"
options["createCollection"] = "true"

spark_write(SparkConnector.ARANGO, git_commits, "Overwrite", options=options)
options = get_default_options(SparkConnector.ARANGO)
options["table.type"] = "document"
options["table"] = "GitRepository"
options["createCollection"] = "true"

spark_write(SparkConnector.ARANGO, git_repositories, "Overwrite", options=options)
options = get_default_options(SparkConnector.ARANGO)
options["table.type"] = "document"
options["table"] = "GitContributor"
options["createCollection"] = "true"

spark_write(SparkConnector.ARANGO, git_contributor, "Overwrite", options=options)
options = get_default_options(SparkConnector.ARANGO)
options["table.type"] = "document"
options["table"] = "GitLanguage"
options["createCollection"] = "true"

spark_write(SparkConnector.ARANGO, git_languages, "Overwrite", options=options)
options = get_default_options(SparkConnector.ARANGO)
options["table.type"] = "document"
options["table"] = "GitLicense"
options["createCollection"] = "true"

spark_write(SparkConnector.ARANGO, git_licenses, "Overwrite", options=options)
options = get_default_options(SparkConnector.ARANGO)
options["table.type"] = "edge"
options["table"] = "BELONGS_TO"
options["createCollection"] = "true"

spark_write(SparkConnector.ARANGO, belongs_to_df, "Overwrite", options=options)
options = get_default_options(SparkConnector.ARANGO)
options["table.type"] = "edge"
options["table"] = "PARENT"
options["createCollection"] = "true"

spark_write(SparkConnector.ARANGO, parent_df, "Overwrite", options=options)
options = get_default_options(SparkConnector.ARANGO)
options["table.type"] = "edge"
options["table"] = "HAS"
options["createCollection"] = "true"

spark_write(SparkConnector.ARANGO, has_df, "Overwrite", options=options)
options = get_default_options(SparkConnector.ARANGO)
options["table.type"] = "edge"
options["table"] = "AUTHOR"
options["createCollection"] = "true"

spark_write(SparkConnector.ARANGO, author_df, "Append", options=options)
options = get_default_options(SparkConnector.ARANGO)
options["table.type"] = "edge"
options["table"] = "COMMITTED"
options["createCollection"] = "true"

spark_write(SparkConnector.ARANGO, committed_df, "Overwrite", options=options)
options = get_default_options(SparkConnector.ARANGO)
options["table.type"] = "edge"
options["table"] = "WRITTEN_IN"
options["createCollection"] = "true"

spark_write(SparkConnector.ARANGO, writted_in_df, "Overwrite", options=options)

writing_end_time = time.time()
writing_time = (writing_end_time - writing_start_time)

In [None]:
print(f"Load time: {load_time} sec")
print(f"Preprocessing time: {preproc_time} sec")
print(f"Writing time: {writing_time} sec")

In [None]:
# Scenario 1
N = 10
start_time = time.time()
options = get_default_options(SparkConnector.ARANGO)
options["query"] = f"LET distinctValues = (\
                        FOR c IN GitContributor\
                            FOR commit IN OUTBOUND c AUTHOR\
                                FOR r IN OUTBOUND commit BELONGS_TO\
                                    RETURN DISTINCT{{c, r}})\
                    FOR d in distinctValues\
                        COLLECT contrib = d.c.name WITH COUNT INTO repo_count\
                        SORT repo_count DESC\
                        FILTER repo_count > 1\
                        LIMIT {N}\
                        RETURN {{contrib, repo_count}}"

df = spark_read(SparkConnector.ARANGO, session, options=options)
end_time = time.time()
print(f"Scenario 1: {end_time - start_time} sec")

In [None]:
# Scenario 2:

LANGUAGE = "C++"
BYTES_PERCENTAGE = 0.5

start_time = time.time()

query = f"""
    FOR repo IN GitRepository
    LET repoTotalBytes = (
    FOR lan IN OUTBOUND repo WRITTEN_IN
        LET byteInfo = (
                FOR info IN WRITTEN_IN
                FILTER info._from == repo._id AND info._to == lan._id
                RETURN info.bytes
            )
        COLLECT repository = repo._key
        AGGREGATE repoTotalBytes = SUM(byteInfo[0])
        RETURN {{repository, repoTotalBytes}}
        )
    FILTER LENGTH(repoTotalBytes) > 0 //for the repos with 0 WRITTEN_IN edges. FIX mini-batch
    
    FOR lan IN OUTBOUND repo WRITTEN_IN
        LET byteInfo = (
          FOR info IN WRITTEN_IN
            FILTER info._from == repo._id AND info._to == lan._id
            RETURN info.bytes
        )
        COLLECT repo_name = repo._key, language = lan._key, percentageOfBytes = (byteInfo[0]/repoTotalBytes[0].repoTotalBytes)
        FILTER language == "{LANGUAGE}" AND percentageOfBytes > {BYTES_PERCENTAGE}
        RETURN {{
          repo_name, 
          language,
          percentageOfBytes
        }}
        """

options = get_default_options(SparkConnector.ARANGO)
options["query"] = query

df = spark_read(SparkConnector.ARANGO, session, options=options)
end_time = time.time()
print(f"Scenario 2: {end_time - start_time} sec")

In [None]:
# Scenario 3:
REPO_NAME = "tensorflow::tensorflow"

start_time = time.time()
query = f"""
        FOR repo IN GitRepository
            FILTER repo._key == "{REPO_NAME}"
            FOR commit IN INBOUND repo BELONGS_TO
                LET parents = ( 
                FOR parent IN OUTBOUND commit PARENT
                    COLLECT comm = commit._key INTO parents
                    RETURN {{lun: length(parents), comm}}
                )
            FILTER parents[0].lun>1 AND parents[0].comm == commit._key
            COLLECT WITH COUNT INTO n_merge
            RETURN {{num_merge: n_merge}}
        """

options = get_default_options(SparkConnector.ARANGO)
options["query"] = query

df = spark_read(SparkConnector.ARANGO, session, options=options)
end_time = time.time()
print(f"Scenario 3: {end_time - start_time} sec")

In [None]:
from arango import ArangoClient

# Initialize the ArangoDB client.
client = ArangoClient(hosts='http://arangodb:8529')

# Connect to "test" database as root user.
db = client.db('_system', username='root', password='')

# Get the Pregel API wrapper.
pregel = db.pregel


# Scenario 4:
start_time = time.time()

# Start a new Pregel job.
job_id = db.pregel.create_job(
    graph='github_graph',
    algorithm='labelpropagation',
    store=False,
    max_gss=250,
    thread_count=1,
    async_mode=False,
    result_field='community'
)

# Retrieve details of a Pregel job by ID.
job = pregel.job(job_id)
query = f"""
        FOR v IN PREGEL_RESULT({job["id"]})
        RETURN {{key: v._key,
                community: v.community}}
        """

options = get_default_options(SparkConnector.ARANGO)
options["query"] = query

df = spark_read(SparkConnector.ARANGO, session, options=options)
end_time = time.time()
print(f"Scenario 4: {end_time - start_time} sec")

In [None]:


# Scenario 5: page rank contributori
start_time = time.time()
query = f"""
        FOR v IN PREGEL_RESULT({job["id"]})
        RETURN {{key: v._key,
                community: v.community}}
        """

options = get_default_options(SparkConnector.ARANGO)
options["query"] = query

df = spark_read(SparkConnector.ARANGO, session, options=options)
end_time = time.time()
print(f"Scenario 5: {end_time - start_time} sec")

In [None]:
session.sparkContext.stop()
session.stop()