In [None]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import types
import pyspark.sql.functions  as F

In [None]:
# Path to the GCP service-account key file. Prefer the standard
# GOOGLE_APPLICATION_CREDENTIALS environment variable so the notebook is not
# tied to one machine's home directory; fall back to the original location
# when the variable is unset or empty.
credentials_location = (
    os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    or "/home/de-zoom/agile-polymer-376104-a02a7ed99393.json"
)

# Local-mode Spark configuration that loads the GCS connector jar and enables
# service-account authentication with the key file above.
conf = (
    SparkConf()
    .setMaster("local[*]")
    .setAppName("test")
    .set("spark.jars", "./lib/gcs-connector-hadoop3-latest.jar")
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
)

In [None]:
# Reuse the running SparkContext when this cell is executed again: calling the
# SparkContext constructor a second time in the same kernel raises an error,
# whereas getOrCreate returns the existing context.
sc = SparkContext.getOrCreate(conf=conf)

# Register the GCS filesystem implementations and the service-account
# credentials on the context's Hadoop configuration so gs:// paths resolve.
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

In [None]:
# Build (or fetch) the SparkSession from the SparkContext's configuration.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

In [None]:
# Load the three processed Parquet tables from the data lake bucket, in the
# order: datasets, paper/code links, papers with abstracts.
datasets_df, links_papers_code_df, papers_df = (
    spark.read.parquet(parquet_path)
    for parquet_path in (
        "gs://pwc-de-datalake/data/processed_data/datasets.parquet",
        "gs://pwc-de-datalake/data/processed_data/links_between_papers_and_code.parquet",
        "gs://pwc-de-datalake/data/processed_data/papers_with_abstracts.parquet",
    )
)


In [None]:
# Sanity check: display the Python type of the object returned by the parquet read.
type(papers_df)

In [None]:
# Inspect the column names and types of the datasets table as loaded.
datasets_df.printSchema()

In [None]:
# Replace num_papers with an integer-typed version of the same column.
datasets_df = datasets_df.withColumn(
    "num_papers",
    F.col("num_papers").cast(types.IntegerType()),
)

In [None]:
# Preview the first 10 rows of the datasets table.
datasets_df.show(10)

In [None]:
# Inspect the column names and types of the paper/code links table.
links_papers_code_df.printSchema()

In [None]:
# Preview the first 10 rows of the paper/code links table.
links_papers_code_df.show(10)

In [None]:
# Inspect the column names and types of the papers table.
papers_df.printSchema()

In [None]:
# Preview the first 10 rows of the papers table.
papers_df.show(10)

In [None]:
# Inner-join papers to the links table on paper_url, keep only rows whose
# framework is present and is not the literal string "none", then project the
# four columns kept for the framework table.
papers_framework_df = (
    papers_df
    .join(
        links_papers_code_df,
        on=papers_df.paper_url == links_papers_code_df.paper_url,
        how="inner",
    )
    .where(F.col("framework").isNotNull())
    .where(F.col("framework") != "none")
    .select("title", "date", "framework", "is_official")
)


In [None]:
# Re-check the datasets schema (now including the num_papers cast) before
# exploding its languages column in the next cell.
datasets_df.printSchema()

In [None]:
# One output row per (dataset, language): explode the languages array and
# keep the dataset name and introduction date alongside each element.
languages_datasets_df = datasets_df.select(
    F.col("name"),
    F.col("introduced_date"),
    F.explode(F.col("languages")).alias("language"),
)

In [None]:
# Re-check the papers schema before cleaning its proceeding column in the next cell.
papers_df.printSchema()

In [None]:
def clean_proceeding_name(proceeding_name: str):
    """Strip trailing non-alphabetic characters from a proceeding name.

    Everything after the last alphabetic character (digits, spaces,
    punctuation) is removed, e.g. ``"NeurIPS 2019 12"`` -> ``"NeurIPS"``.

    Args:
        proceeding_name: Raw proceeding string; may be ``None`` or empty.

    Returns:
        The trimmed name, or ``None`` when the input is ``None``/empty or
        contains no alphabetic character at all. Returning ``None`` instead of
        an empty string lets a null filter discard such rows rather than
        keeping a blank proceeding.
    """
    if not proceeding_name:
        return None
    end = len(proceeding_name)
    # Walk backwards until the character just before `end` is a letter.
    while end > 0 and not proceeding_name[end - 1].isalpha():
        end -= 1
    # An empty slice means no letter was found; report that as missing.
    return proceeding_name[:end] or None

# Expose the Python cleaner as a Spark UDF that yields a string column.
proceeding_udf = F.udf(clean_proceeding_name, types.StringType())

# Overwrite "proceeding" with its cleaned value, drop rows where the result is
# null, and keep only the title, proceeding and date columns.
proceedings_papers_df = (
    papers_df
    .withColumn("proceeding", proceeding_udf(F.col("proceeding")))
    .where(F.col("proceeding").isNotNull())
    .select("title", "proceeding", "date")
)

In [None]:
# Re-check both schemas before exploding their tasks columns in the next cell.
papers_df.printSchema()
datasets_df.printSchema()

In [None]:
# One row per (paper date, task): skip papers with no tasks value, then
# explode the tasks array.
papers_task_df = (
    papers_df
    .where(F.col("tasks").isNotNull())
    .select("date", F.explode(F.col("tasks")).alias("task"))
)

# Same shape for datasets, keeping the dataset name and introduction date.
datasets_task_df = (
    datasets_df
    .where(F.col("tasks").isNotNull())
    .select("name", "introduced_date", F.explode(F.col("tasks")).alias("task"))
)

In [None]:
# Summarise every derived DataFrame. A notebook cell renders only the value of
# its last expression, so bare names on separate lines would show just the
# final one; print each explicitly so all five appear.
print(
    papers_framework_df,
    languages_datasets_df,
    proceedings_papers_df,
    papers_task_df,
    datasets_task_df,
    sep="\n",
)

In [None]:
# Preview the first 10 rows of the paper/framework table.
papers_framework_df.show(10)

In [None]:
# Preview the first 10 rows of the dataset/language table.
languages_datasets_df.show(10)

In [None]:
# Preview the first 10 rows of the cleaned proceedings table (this triggers the Python UDF).
proceedings_papers_df.show(10)

In [None]:
# Preview the first 10 rows of the paper/task table.
papers_task_df.show(10)

In [None]:
# Preview the first 10 rows of the dataset/task table.
datasets_task_df.show(10)