In [1]:
import pyspark
from pyspark.sql import (
    SparkSession,
    functions as F,
    types as T
)
from pyspark.storagelevel import StorageLevel

In [2]:
spark = (
    SparkSession.builder
    .remote('sc://localhost:15002')
    .appName("testing_app")
    .getOrCreate()
)

In [13]:
(
    spark.read
    .parquet("s3://cpinney/p1/output/part-00004-b5bc2265-a632-4d9d-853f-93b4744a939d-c000.snappy.parquet")
    .createOrReplaceTempView("output1")
)

(
    spark.read
    .parquet("s3://cpinney/p1/output/part-00003-b5bc2265-a632-4d9d-853f-93b4744a939d-c000.snappy.parquet")
    .createOrReplaceTempView("output2")
)

(
    spark.read
    .parquet("s3://cpinney/p1/output/part-00002-b5bc2265-a632-4d9d-853f-93b4744a939d-c000.snappy.parquet")
    .createOrReplaceTempView("output3")
)

(
    spark.read
    .parquet("s3://cpinney/p1/output/part-00001-b5bc2265-a632-4d9d-853f-93b4744a939d-c000.snappy.parquet")
    .createOrReplaceTempView("output4")
)

(
    spark.read
    .parquet("s3://cpinney/p1/output/part-00000-b5bc2265-a632-4d9d-853f-93b4744a939d-c000.snappy.parquet")
    .createOrReplaceTempView("output5")
)

In [14]:
spark.table("output1").union(spark.table("output2")).createOrReplaceTempView("o")
spark.table("output3").union(spark.table("output4")).createOrReplaceTempView("u")
spark.table("o").union(spark.table("u")).createOrReplaceTempView("t")
spark.table("t").union(spark.table("output5")).createOrReplaceTempView("yay")

In [15]:
spark.table("yay").count()

192093553

In [4]:
spark.table("output").count()

38418708

In [3]:
(
    spark.read
    .parquet("s3://bsu-c535-fall2024-commons/arjun-workspace/linktarget/")
    .createOrReplaceTempView("linktarget")
)

(
    spark.read
    .parquet("s3://bsu-c535-fall2024-commons/arjun-workspace/page/")
    .createOrReplaceTempView("page")
)

(
    spark.read
    .parquet("s3://bsu-c535-fall2024-commons/arjun-workspace/pagelinks/")
    .createOrReplaceTempView("pagelinks")
)

(
    spark.read
    .parquet("s3://bsu-c535-fall2024-commons/arjun-workspace/redirect/")
    .createOrReplaceTempView("redirect")
)

In [10]:
# connect pages and linktarget ids
# (page_id | page_title | linktarget_id | redirect? | namespace)
(
    spark.table("page")
    .join(
        spark.table("linktarget"),
        F.expr("lt_title = page_title and lt_namespace = page_namespace"),
        "inner"
    )
    .selectExpr("page_id", "page_title", "lt_id", "page_is_redirect as redirect", "page_namespace")
    .createOrReplaceTempView("page_with_link_ids")
)

# connect redirect source pages with corresponding destination pages 
# (rd_src_page_id | rd_dst_dst_page_id | namespace)
(
    spark.table("page_with_link_ids")
    .filter("redirect = false") # don't want redirects that link to redirects
    .join(
        spark.table("redirect"),
        F.expr("rd_title = page_title and rd_namespace = page_namespace"),
        "inner"
    )
    .selectExpr("rd_from", "page_id as rd_dst", "page_namespace as rd_namespace")
    .createOrReplaceTempView("redirect_pages")
)

# connect the destination page of a redirect to the linktarget id of the redirect source,
# this means pages that link to redirect linktarget ids in the pagelinks table will be linked
# with the page id of the redirect destination rather than the source
# (rd_dst_page_id | rd_src_linktarget_id | namespace)
(
    spark.table("page_with_link_ids")
    .filter("redirect = true") # get all redirects
    .join(
        spark.table("redirect_pages"),
        F.expr("page_id = rd_from and page_namespace = rd_namespace"),
        "inner"
    )
    .selectExpr("rd_dst as page_id", "lt_id", "rd_namespace as page_namespace")
    .createOrReplaceTempView("redirect_with_link_ids")
)

# union together non-redirects and redirects
# (page_id | linktarget_id | namespace)
# (rd_dst_page_id | rd_src_linktarget_id | namespace)
(
    spark.table("page_with_link_ids")
    .filter("redirect = false") # non-redirects
    .select("page_id", "lt_id", "page_namespace")
    .union(spark.table("redirect_with_link_ids")) # redirects
    .select("page_id", "lt_id", "page_namespace")
    .createOrReplaceTempView("all_pages_with_links")
)

# get pagelinks from source to destination and create pairs
(
    spark.table("all_pages_with_links")
    .join(
        spark.table("pagelinks"),
        F.expr("pl_target_id = lt_id and pl_from_namespace = page_namespace"),
        "inner"
    )
    .selectExpr("pl_from as page_a", "page_id as page_b")
    .filter(F.expr("page_a != page_b")) # don't want pages that link to themselves
    .withColumn("pair", F.sort_array(F.array(F.col("page_a"), F.col("page_b")))) # get pairs
    .repartition("pair") # repartition to speed aggregation up
    .groupBy("pair") 
    .agg(F.count("*").alias("count"))
    .filter(F.expr("count > 1")) # get rows that are duplicated, those are the pairs we want
    .withColumn("page_a", F.col("pair")[0]) # get page_a and page_b cols from pairs
    .withColumn("page_b", F.col("pair")[1])
    .drop("pair") 
    .select("page_a", "page_b")
    .repartition(5) # repartition into 5 parquet files instead of a bunch of small ones
#     .write.mode("OVERWRITE").parquet(os.environ['PAGE_PAIRS_OUTPUT'])
    .createOrReplaceTempView("final")
)

In [11]:
spark.table("final").count()

192093553

## check with synthetic dataset

In [3]:
(
    spark.read
    .csv("s3://cpinney/synthetic/page.txt",header=False)
    .createOrReplaceTempView("p")
)

(
    spark.read
    .csv("s3://cpinney/synthetic/linktarget.txt",header=False)
    .createOrReplaceTempView("lt")
)

(
    spark.read
    .csv("s3://cpinney/synthetic/redirect.txt",header=False)
    .createOrReplaceTempView("rd")
)

(
    spark.read
    .csv("s3://cpinney/synthetic/pagelinks.txt",header=False)
    .createOrReplaceTempView("pl")
)

In [6]:
(
    spark.table("p")
    .withColumnRenamed("_c0", "page_id")
    .withColumnRenamed("_c1", "page_title")
    .withColumnRenamed("_c2", "page_namespace")
    .withColumnRenamed("_c3", "redirect")
    .createOrReplaceTempView("p")
)

(
    spark.table("lt")
    .withColumnRenamed("_c0", "lt_id")
    .withColumnRenamed("_c1", "lt_title")
    .withColumnRenamed("_c2", "lt_namespace")
    .createOrReplaceTempView("lt")
)

(
    spark.table("rd")
    .withColumnRenamed("_c0", "rd_from")
    .withColumnRenamed("_c1", "rd_title")
    .withColumnRenamed("_c2", "rd_namespace")
    .createOrReplaceTempView("rd")
)

(
    spark.table("pl")
    .withColumnRenamed("_c0", "pl_from")
    .withColumnRenamed("_c1", "pl_target_id")
    .createOrReplaceTempView("pl")
)

In [7]:
# connect pages and linktarget ids
(
    spark.table("p")
    .join(
        spark.table("lt"),
        F.expr("lt_title = page_title and lt_namespace = page_namespace"),
        "inner"
    )
    .select("page_id", "page_title", "lt_id", "redirect", "page_namespace")
    .createOrReplaceTempView("new_p")
)

# connect redirect src page_id with redirect dst page_id
(
    spark.table("new_p")
    .filter("redirect = false") # don't want redirects that link to redirects
    .join(
        spark.table("rd"),
        F.expr("rd_title = page_title and rd_namespace = page_namespace"),
        "inner"
    )
    .selectExpr("rd_from", "page_id as rd_dst")
#     .filter("page_id != pid")
#     .drop("pid")
    .createOrReplaceTempView("new_rd")
)

# connect page_id of redirect dst page_id to the linktarget id of the redirect src
(
    spark.table("new_p")
    .filter("redirect = true")
    .join(
        spark.table("new_rd"),
        F.expr("page_id = rd_from"),
        "inner"
    )
    .selectExpr("rd_dst as page_id", "lt_id")
    .createOrReplaceTempView("prd")
)

# union together all src page_ids to corresponding linktarget ids
(
    spark.table("new_p")
    .filter("redirect = false")
    .select("page_id", "lt_id")
    .union(spark.table("prd"))
    .select("page_id", "lt_id")
    .createOrReplaceTempView("p_w_l")
)

# get pagelinks from src to dst and create pairs
(
    spark.table("p_w_l")
    .join(
        spark.table("pl"),
        F.expr("pl_target_id = lt_id"),
        "inner"
    )
    .selectExpr("pl_from as page_a", "page_id as page_b")
    .filter(F.expr("page_a != page_b"))
    .withColumn("pair", F.sort_array(F.array(F.col("page_a"), F.col("page_b"))))
    .groupBy("pair")
    .agg(F.count("*").alias("count"))
    .filter(F.expr("count > 1"))
    .withColumn("page_a", F.col("pair")[0])
    .withColumn("page_b", F.col("pair")[1])
    .drop("pair")
    .select("page_a", "page_b")
    .createOrReplaceTempView("final_s")
)

In [8]:
spark.table("final_s").show()

+------+------+
|page_a|page_b|
+------+------+
|     1|     2|
|     2|     7|
|     8|     9|
+------+------+

