In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lit
import json
from pyspark.sql.functions import first

# Create the SparkSession for the "Graph" application, or reuse one that is
# already active (getOrCreate). It connects to the master at
# spark://spark-master:7077, gives each executor 512m of memory, and turns on
# event logging into /opt/workspace/events on the local filesystem.
spark = (
    SparkSession.builder
    .appName("Graph")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "512m")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "file:///opt/workspace/events")
    .getOrCreate()
)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/06/22 09:57:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Read the input datasets. Each tabular dataset is loaded from every *.csv
# file in its directory, using the first row as the header. No schema is
# supplied and inferSchema is not enabled, so the CSV columns are strings.

# Publication dates; "month" is renamed to "publish_month" right away.
paper_dates_df = (
    spark.read
    .option("header", True)
    .csv("file:///opt/workspace/tabular/paper_dates/*.csv")
    .withColumnRenamed("month", "publish_month")
)

# NOTE: the paper_location dataset (tabular/paper_location) is not loaded;
# nothing in the export cell below reads it.

# Paper themes.
paper_theme_df = (
    spark.read
    .option("header", True)
    .csv("file:///opt/workspace/tabular/paper_theme/*.csv")
)

# Paper publishers.
paper_publishers_df = (
    spark.read
    .option("header", True)
    .csv("file:///opt/workspace/tabular/paper_publishers/*.csv")
)

# Raw paper records from JSON; the export cell explodes their "authors" array.
# NOTE(review): unlike the CSV inputs this is a relative path with no scheme,
# so where it resolves depends on the Spark/Hadoop configuration -- confirm.
authors_df = spark.read.json("all_papers.json")


[Stage 0:>                                                          (0 + 1) / 1]

                                                                                

[Stage 1:>                                                          (0 + 1) / 1]

                                                                                

[Stage 2:>                                                          (0 + 1) / 1]

                                                                                

In [3]:

# Build the graph import files under dataout/: three relationship CSVs
# (posted_paper, posted_with, published_with) and three node CSVs
# (papers, authors, publishers). Every output is coalesced to one partition
# before writing, includes a header row, and overwrites any previous run.

# Join paper_dates, paper_theme, and paper_publishers on 'id'.
# Left joins keep every paper that has a date row even when it has no theme or
# publisher row; a paper with several theme/publisher rows is repeated once
# per matching combination.
papers_df = (
    paper_dates_df
    .join(paper_theme_df, "id", "left")
    .join(paper_publishers_df, "id", "left")
)

# Explode the authors array to create one (paper id, author name) row per author.
# The result gets its own name instead of overwriting authors_df: after the
# explode/select the "authors" column no longer exists, so reassigning
# authors_df made this cell fail when it was run a second time.
paper_authors_df = (
    authors_df
    .withColumn("author", explode(col("authors")))
    .select("id", col("author.name").alias("name"))
)

# ---------------------------------------------------------------------------
# Relationship files
# ---------------------------------------------------------------------------

# Relationships between paper and author (posted_paper)

# Aggregate papers_df to select only one row per "id".
# ignorenulls=True makes first() return a non-null value whenever the group
# has one, instead of possibly picking a null from an arbitrary row.
papers_agg_df = papers_df.groupBy("id").agg(
    first("publish_month", ignorenulls=True).alias("publish_month"),
    first("day_of_week", ignorenulls=True).alias("day_of_week"),
)

# Join the aggregated papers with the per-author rows; the inner join drops
# papers without authors and author rows whose paper id has no date row.
papers_authors_relationships = (
    papers_agg_df
    .join(paper_authors_df, "id", "inner")
    .select("id", col("name").alias("author"), "publish_month", "day_of_week")
)

papers_authors_relationships.coalesce(1).write.csv("dataout/posted_paper", header=True, mode="overwrite")

# Relationships between author and publisher (posted_with).
# One row per (author, publisher row) sharing a paper id, so the same
# author/publisher pair can appear once per paper.
authors_publishers_relationships = (
    paper_authors_df
    .join(paper_publishers_df, "id", "inner")
    .select("name", "publisher", "description")
    .withColumnRenamed("name", "author")
)

authors_publishers_relationships.coalesce(1).write.csv("dataout/posted_with", header=True, mode="overwrite")

# Relationships between author and author (published_with).
# Self-join on paper id pairs up co-authors; keeping only a1.name < a2.name
# removes self-pairs and keeps each unordered pair once per shared paper.
published_with_relationships = (
    paper_authors_df.alias("a1")
    .join(paper_authors_df.alias("a2"), "id", "inner")
    .filter(col("a1.name") < col("a2.name"))
    .select(col("a1.name").alias("author_1"), col("a2.name").alias("author_2"))
)

published_with_relationships.coalesce(1).write.csv("dataout/published_with", header=True, mode="overwrite")

# ---------------------------------------------------------------------------
# Node files
# ---------------------------------------------------------------------------

# Paper nodes. papers_df repeats a paper for every theme/publisher match, so
# identical (id, theme) rows are removed before writing.
papers_df.select("id", "theme").distinct().coalesce(1).write.csv("dataout/papers", header=True, mode="overwrite")

# Author nodes: one row per distinct author name.
paper_authors_df.select("name").distinct().withColumnRenamed("name", "author").coalesce(1).write.csv("dataout/authors", header=True, mode="overwrite")

# Publisher nodes. paper_publishers_df has one row per paper/publisher link,
# so identical (publisher, description) rows are removed, matching how the
# author nodes are deduplicated.
paper_publishers_df.select("publisher", "description").distinct().coalesce(1).write.csv("dataout/publishers", header=True, mode="overwrite")



[Stage 4:>                                                          (0 + 1) / 1]

                                                                                

[Stage 7:>                                                          (0 + 1) / 1]

                                                                                

[Stage 9:>                                                          (0 + 1) / 1]

                                                                                

[Stage 10:>                                                         (0 + 1) / 1]

                                                                                

[Stage 14:>                                                         (0 + 1) / 1]                                                                                

[Stage 15:>                                                         (0 + 1) / 1]

                                                                                

In [4]:
# Stop the Spark session (and its underlying SparkContext) now that the
# notebook's work is done. Any later use of Spark needs a new session first.
spark.stop()