In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType, ArrayType)
from pyspark.sql.functions import explode, col, from_json, concat_ws, regexp_extract, when
import os
from glob import glob

# Spark setup
os.environ["JAVA_HOME"] = "/local/d1/users/kv21supo/micromamba/envs/jupyter"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]

print("Starting Spark Session...")
spark = SparkSession.builder \
    .appName("Data Exploration") \
    .config("spark.driver.memory", "100g") \
    .config("spark.executor.cores", "72") \
    .config("spark.sql.shuffle.partitions", "360") \
    .config("spark.ui.enabled", "false") \
    .getOrCreate()
print("Spark Session started.")

# Load raw JSON-LD files
folderPath = "../unpacked/"
filePaths = glob(os.path.join(folderPath, "*.jsonld"))
dfRaw = spark.read.text(filePaths)
print("JSON files read successfully.")

# Schema of JSON rows
schema = StructType([
    StructField("subject", StringType(), True),
    StructField("predicate", StringType(), True),
    StructField("objects", StringType(), True)
])

# Parse JSON into struct
parsed_df = dfRaw.withColumn("jsonData", from_json(col("value"), schema))

# JSON-LD @id schema
id_schema = StructType([StructField("@id", StringType(), True)])

object_schema = ArrayType(
    StructType([
        StructField("object", StructType([
            StructField("@value", StringType(), True),
            StructField("@type", StringType(), True)
        ])),
        StructField("source", ArrayType(
            StructType([
                StructField("@id", StringType(), True),
                StructField("iHash", StringType(), True)
            ])
        ))
    ])
)

expanded_df = parsed_df.select(
    from_json(col("jsonData.subject"), id_schema).getField("@id").alias("subject"),
    from_json(col("jsonData.predicate"), id_schema).getField("@id").alias("predicate"),
    from_json(col("jsonData.objects"), object_schema).alias("objects")
)

flat_df = expanded_df \
    .withColumn("object", explode(col("objects"))) \
    .select(
        "subject",
        "predicate",
        col("object.object.@value").alias("value"),
        col("object.object.@type").alias("value_type"),
        col("object.source.@id").alias("source_id"),
        col("object.source.iHash").alias("source_iHash")
    )

flat_df.repartition(1).write.mode("overwrite").parquet("allDataParquet")

Starting Spark Session...
Spark Session started.
JSON files read successfully.


                                                                                

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, explode, regexp_extract, when, concat_ws
import os
import sys
from glob import glob

schema = StructType([
    StructField("subject", StringType(), True),
    StructField("predicate", StringType(), True),
    StructField("object", StringType(), True),
    StructField("jsonrow", StringType(), True)
])
os.environ["JAVA_HOME"] = "/local/d1/users/kv21supo/micromamba/envs/jupyter"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
sys.stdout.flush()

print("Starting Spark Session...")
spark = SparkSession.builder \
    .appName("Data Exploration") \
    .config("spark.driver.memory", "100g") \
    .config("spark.executor.cores", "72") \
    .config("spark.sql.shuffle.partitions", "72") \
    .config("spark.ui.enabled", "false") \
    .getOrCreate()
print("Spark Session started.")
print("Reading Parquet files...")

folderPath = "allDataParquet/"
filePaths = glob(os.path.join(folderPath, "*.parquet"))

dfRaw = spark.read.parquet(*filePaths)

print("Parquet files read successfully.")

flat_df_with_lang = dfRaw \
    .withColumn("source_id", explode(col("source_id"))) \
    .withColumn("lang", regexp_extract(col("source_id"), r"lang=([a-z]+)", 1))

# Step 1: Filter for predicates starting with dbpedia ontology
ontology_predicates_df = flat_df_with_lang.filter(
    (col("predicate").contains("dbpedia.org")) | (col("subject").contains("dbpedia"))
)

# Step 2: Filter for language conditions
filtered_df = ontology_predicates_df.filter(
    (col("lang") == "en") | (col("lang") == "") | (col("lang") == "commons") | (col("lang").isNull()) | (col("lang") == "simple") | (col("lang") == "new") | (col("lang") == "redirected") | (col("lang") == "batsmg")
)


fused_df = filtered_df.withColumn(
    "object",
    when(
        (col("value").isNull()) & (col("value_type").isNull()),
        None
    ).otherwise(concat_ws(":", col("value"), col("value_type")))
).select("subject", "predicate", "object") 

fused_df.repartition(1).write.mode("overwrite").parquet("dbpediaOntologyROWS")


Starting Spark Session...
Spark Session started.
Reading Parquet files...
Parquet files read successfully.


                                                                                

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col, explode, regexp_extract, when, concat_ws
import os
import sys
from glob import glob

schema = StructType([
    StructField("subject", StringType(), True),
    StructField("predicate", StringType(), True),
    StructField("object", StringType(), True),
    StructField("jsonrow", StringType(), True)
])
os.environ["JAVA_HOME"] = "/local/d1/users/kv21supo/micromamba/envs/jupyter"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
sys.stdout.flush()

print("Starting Spark Session...")
spark = SparkSession.builder \
    .appName("Data Exploration") \
    .config("spark.driver.memory", "100g") \
    .config("spark.executor.cores", "72") \
    .config("spark.sql.shuffle.partitions", "72") \
    .config("spark.ui.enabled", "false") \
    .getOrCreate()
print("Spark Session started.")
print("Reading Parquet files...")

folderPath = "dbpediaOntologyROWS/"
filePaths = glob(os.path.join(folderPath, "*.parquet"))

dfRaw = spark.read.parquet(*filePaths)

print("Parquet files read successfully.")

Starting Spark Session...


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/18 20:37:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Session started.
Reading Parquet files...
Parquet files read successfully.


# Presidents Query FAILED #

In [56]:
peopleWithActiveYears = dfRaw.filter(col("predicate") == "http://dbpedia.org/ontology/activeYearsStartDate").select("subject").distinct().collect()
presidents = dfRaw.filter(col("predicate").contains("president")).select("subject").distinct().collect()

print(f"People with activeYearsStartDate: {len(peopleWithActiveYears)}")
print(f"Presidents: {len(presidents)}")

for person in peopleWithActiveYears:
    if "4vVY3" in person["subject"] or "29HeC" in person["subject"] or "y533" in person["subject"] or "B6dR" in person["subject"]:
        print(person["subject"])
        
print("---------------------------------")

for president in presidents:
    if "bill" in president["subject"] or "29HeC" in president["subject"] or "y533" in president["subject"] or "B6dR" in president["subject"]:
        print(president["subject"])

# dfRaw.filter(col("subject") == "https://global.dbpedia.org/id/4vVY3").select("predicate").distinct().show(200, truncate=False)
orderInOffice = dfRaw.filter(col("predicate") == "http://dbpedia.org/ontology/orderInOffice").select("subject").distinct().collect()

for person in orderInOffice:
    if "4vVY3" in person["subject"] or "29HeC" in person["subject"] or "y533" in person["subject"] or "B6dR" in person["subject"] or "2DcKK" in person["subject"] or "3DCUj" in person["subject"]:
        print(person["subject"])

dfRaw.filter((col("subject") == "https://global.dbpedia.org/id/4vVY3") & (col("predicate") == "http://dbpedia.org/ontology/predecessor")).show(200, truncate=False)

                                                                                

People with activeYearsStartDate: 123201
Presidents: 14990
https://global.dbpedia.org/id/129HeC
https://global.dbpedia.org/id/1y533
https://global.dbpedia.org/id/4vVY3
---------------------------------


                                                                                

https://global.dbpedia.org/id/12DcKK
https://global.dbpedia.org/id/3DCUj
https://global.dbpedia.org/id/1y533
https://global.dbpedia.org/id/B6dR
https://global.dbpedia.org/id/4vVY3
https://global.dbpedia.org/id/129HeC




+-------+---------+------+
|subject|predicate|object|
+-------+---------+------+
+-------+---------+------+



                                                                                

In [23]:
dfRaw.filter(col("subject") == "https://global.dbpedia.org/id/4n8e5").show(200, truncate=False)



+-----------------------------------+------------------------------------------------------------+----------------------------------------------------------------------------+
|subject                            |predicate                                                   |object                                                                      |
+-----------------------------------+------------------------------------------------------------+----------------------------------------------------------------------------+
|https://global.dbpedia.org/id/4n8e5|http://www.w3.org/1999/02/22-rdf-syntax-ns#type             |NULL                                                                        |
|https://global.dbpedia.org/id/4n8e5|http://www.w3.org/1999/02/22-rdf-syntax-ns#type             |NULL                                                                        |
|https://global.dbpedia.org/id/4n8e5|http://www.w3.org/1999/02/22-rdf-syntax-ns#type             |NULL                  

                                                                                

In [28]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import count
from pyspark.sql.functions import split

# Assuming dfRaw is already loaded and SparkSession is initialized

# Get cities
cities_df = dfRaw.filter(col("predicate") == "http://dbpedia.org/ontology/city") \
                 .select("subject") \
                 .distinct()

# Get population info
population_df = dfRaw.filter(col("predicate") == "http://dbpedia.org/ontology/populationTotal") \
                     .select("subject", "object") \
                     .withColumnRenamed("object", "population")

# Get population density info (replace with your correct predicate)
density_df = dfRaw.filter(col("predicate") == "http://dbpedia.org/ontology/PopulatedPlace/populationDensity") \
                  .select("subject", "object") \
                  .withColumnRenamed("object", "density")

# Join cities with population and density
result_df = cities_df.join(population_df, on="subject", how="left") \
                     .join(density_df, on="subject", how="left")

# Ensure unique population and density per subject (taking the first occurrence)
window_spec = Window.partitionBy("subject").orderBy("population", "density")
result_df_unique = result_df.withColumn("row_num", row_number().over(window_spec)) \
                            .filter(col("row_num") == 1) \
                            .drop("row_num")

# Get all labels for each subject
labels_df = dfRaw.filter(col("predicate") == "http://www.w3.org/2000/01/rdf-schema#label") \
                 .select("subject", "object") \
                 .withColumnRenamed("object", "label")

# Count occurrences of each label per subject
label_counts = labels_df.groupBy("subject", "label").agg(count("*").alias("label_count"))

# Get the most common label per subject
label_window = Window.partitionBy("subject").orderBy(col("label_count").desc())
most_common_labels = label_counts.withColumn("rn", row_number().over(label_window)) \
                                .filter(col("rn") == 1) \
                                .drop("label_count", "rn")

# Join with result_df_unique and clean population and density
resultWithLabel = result_df_unique.join(most_common_labels, on="subject", how="left") \
                                 .select(
                                     "subject",
                                     col("label"),
                                     regexp_extract(col("population"), r"(\d+)", 1).alias("total_population"),
                                     regexp_extract(col("density"), r"(\d+)", 1).alias("population_density")
                                 ).collect()

# resultWithLabel is the final DataFrame

                                                                                

In [None]:
i = 1
for row in resultWithLabel:
    if row["total_population"] is None or row["population_density"] is None:
        continue
    if int(row["total_population"]) > 5000000 and int(row["population_density"]) > 2000:
        print(row)
        i += 1
print(f"Total cities with population > 5,000,000: {i-1}")

Row(subject='https://global.dbpedia.org/id/1nhqh', label='Dhaka', total_population='12043977', population_density='8200')
Row(subject='https://global.dbpedia.org/id/1o4jN', label='Ho Chi Minh City', total_population='10380000', population_density='4097')
Row(subject='https://global.dbpedia.org/id/4ewfZ', label='Union County', total_population='536499563892', population_density='2086')
Row(subject='https://global.dbpedia.org/id/4z941', label='London', total_population='8787892', population_density='5590')
Row(subject='https://global.dbpedia.org/id/Gwvb', label='Osaka', total_population='8819416', population_density='4629')
Row(subject='https://global.dbpedia.org/id/3MisF', label='Jakarta', total_population='10075310', population_density='14464')
Row(subject='https://global.dbpedia.org/id/4qZeD', label='Санкт-Петербург', total_population='5351935', population_density='3699')
Row(subject='https://global.dbpedia.org/id/3LZos', label='Teheran', total_population='8846782', population_density

In [None]:
from pyspark.sql.functions import countDistinct

# Assuming dfRaw is already loaded and SparkSession is initialized

# Step 1: Get distinct cities
cities_df = dfRaw.filter(col("predicate") == "http://dbpedia.org/ontology/city") \
                 .select("subject") \
                 .distinct()

# Step 2: Count distinct predicates per subject
predicate_counts = dfRaw.groupBy("subject") \
                       .agg(countDistinct("predicate").alias("predicate_count"))

# Step 3: Filter for cities with more than 35 unique predicates
city_predicate_counts = cities_df.join(predicate_counts, on="subject", how="inner") \
                                 .filter(col("predicate_count") > 35) \
                                 .select("subject", "predicate_count") \
                                 .distinct()

# Step 4: Get predicates for these cities
city_predicates = dfRaw.join(city_predicate_counts, on="subject", how="inner") \
                       .select("subject", "predicate") \
                       .distinct()

# Step 5: Count how many cities each predicate appears in
predicate_city_counts = city_predicates.groupBy("predicate") \
                                      .agg(countDistinct("subject").alias("city_count"))

# Step 6: Get the total number of cities with >35 predicates
total_cities = city_predicate_counts.count()

# Step 7: Filter for predicates that appear in all such cities
common_predicates = predicate_city_counts.filter(col("city_count") == total_cities) \
                                        .select("predicate")

# Step 8: Get labels for the cities
labels_df = dfRaw.filter(col("predicate") == "http://www.w3.org/2000/01/rdf-schema#label") \
                 .select("subject", "object") \
                 .withColumnRenamed("object", "label")

# Count occurrences of each label per subject
label_counts = labels_df.groupBy("subject", "label").agg(count("*").alias("label_count"))

# Get the most common label per subject
label_window = Window.partitionBy("subject").orderBy(col("label_count").desc())
most_common_labels = label_counts.withColumn("rn", row_number().over(label_window)) \
                                .filter(col("rn") == 1) \
                                .select("subject", "label")

# Step 9: Join cities with their predicate counts and labels
result_df = city_predicate_counts.join(most_common_labels, on="subject", how="left") \
                                .select(
                                    col("subject"),
                                    col("label"),
                                    col("predicate_count")
                                ) \
                                .distinct() \
                                .orderBy(col("predicate_count").desc())

# Step 10: Collect common predicates as a list for reference
common_predicates_list = [row["predicate"] for row in common_predicates.collect()]

# Output the results
print("Cities with more than 35 unique predicates:")
result_df.show(300, truncate=False)
print("Common predicates across these cities:", common_predicates_list)

                                                                                

Cities with more than 35 unique predicates:




+------------------------------------+-------------+---------------+
|subject                             |label        |predicate_count|
+------------------------------------+-------------+---------------+
|https://global.dbpedia.org/id/KZb4  |Chicago      |51             |
|https://global.dbpedia.org/id/38Nvt |Montreal     |47             |
|https://global.dbpedia.org/id/S6yW  |Allentown    |46             |
|https://global.dbpedia.org/id/6T27  |Boston       |46             |
|https://global.dbpedia.org/id/Q5rP  |Sherbrooke   |46             |
|https://global.dbpedia.org/id/4obdh |San Francisco|46             |
|https://global.dbpedia.org/id/2dVXG |Lincoln      |45             |
|https://global.dbpedia.org/id/MaJu  |Pittsburgh   |44             |
|https://global.dbpedia.org/id/12Dp4j|Atlanta      |44             |
|https://global.dbpedia.org/id/e7ew  |San Jose     |43             |
|https://global.dbpedia.org/id/hYK6  |São Paulo    |43             |
|https://global.dbpedia.org/id/4iE

                                                                                

In [None]:
subjects_df = dfRaw.filter(col("predicate").contains("office")) \
                   .filter(col("object").contains("President of the United States")) \
                   .filter(col("object").contains("United States")) \
                   .select("subject") \
                   .distinct()

# Step 2: Get all labels for these subjects
labels_df = dfRaw.filter(col("predicate") == "http://www.w3.org/2000/01/rdf-schema#label") \
                 .select("subject", "object") \
                 .withColumnRenamed("object", "label")

# Step 3: Count occurrences of each label per subject
label_counts = labels_df.join(subjects_df, on="subject", how="inner") \
                       .groupBy("subject", "label") \
                       .agg(count("*").alias("label_count"))

# Step 4: Get the most common label per subject
label_window = Window.partitionBy("subject").orderBy(col("label_count").desc())
most_common_labels = label_counts.withColumn("rn", row_number().over(label_window)) \
                                .filter(col("rn") == 1) \
                                .select("subject", "label") \
                                .distinct()

# Step 5: Final result
result_df = most_common_labels.orderBy("subject") \
                             .select("subject", "label")

# Show up to 200 rows without truncation
result_df.show(200, truncate=False)





+----------------------------------------------------+-----------------------------------------------------+
|subject                                             |label                                                |
+----------------------------------------------------+-----------------------------------------------------+
|http://dbpedia.org/resource/Rafael_Nú༞z_(politician)|Rafael Nú༞z (politician)                             |
|https://global.dbpedia.org/id/125e9x                |Elbridge Gerry                                       |
|https://global.dbpedia.org/id/127Ror                |Daniel D. Tompkins                                   |
|https://global.dbpedia.org/id/1299go                |Gustavus Town Kirby                                  |
|https://global.dbpedia.org/id/12AtKZ                |George Washington                                    |
|https://global.dbpedia.org/id/12DcKK                |George H. W. Bush                                    |
|https://global.dbp

                                                                                

In [97]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Assuming dfRaw is already loaded and SparkSession is initialized

# Define filter conditions
is_office = F.lower(F.col("predicate")) == "http://dbpedia.org/ontology/office"
has_potus = F.lower(F.col("object")).rlike(r"president[\s-]of[\s-]the[\s-]united[\s-]states:")
not_vice = ~F.lower(F.col("object")).rlike(r"vice[\s-]*president")

# Step 1: Filter for presidents
presidents = (
    dfRaw
    .filter(is_office & has_potus & not_vice)
    .select("subject", "predicate", "object")
    .distinct()
    .filter(col("subject").contains("global.dbpedia.org/id/"))
)

# Step 2: Get distinct subjects
president_subjects = presidents.select("subject").distinct()

# Step 3: Get labels for these subjects
labels_df = dfRaw.filter(F.col("predicate") == "http://www.w3.org/2000/01/rdf-schema#label") \
                 .select("subject", "object") \
                 .withColumnRenamed("object", "label")

# Count occurrences of each label
label_counts = labels_df.join(president_subjects, on="subject", how="inner") \
                       .groupBy("subject", "label") \
                       .agg(F.count("*").alias("label_count"))

# Get the most common label per subject
label_window = Window.partitionBy("subject").orderBy(F.col("label_count").desc())
most_common_labels = label_counts.withColumn("rn", F.row_number().over(label_window)) \
                                .filter(F.col("rn") == 1) \
                                .select("subject", "label") \
                                .distinct()

# Step 4: Get termStart for these subjects
# Assuming termStart is under predicate "http://dbpedia.org/ontology/activeYearsStartDate"
term_start_df = dfRaw.filter(F.col("predicate").contains("years")) \
                     .select(
                         "subject",
                         F.regexp_extract(F.col("object"), r"(\d{4}-\d{2}-\d{2})", 1).alias("termStart")
                     ) \
                     .withColumn(
                         "termStart",
                         F.when(
                             F.col("termStart").cast("date").isNotNull(),
                             F.to_date(F.col("termStart"), "yyyy-MM-dd")
                         ).otherwise(None)  # Return NULL for invalid dates
                     )

# Step 5: Join labels and termStart
result_df = most_common_labels.join(term_start_df, on="subject", how="left") \
                             .select(
                                 F.col("subject"),
                                 F.col("label"),
                                 F.col("termStart")
                             ) \
                             .distinct() \
                             .orderBy(F.col("termStart").asc_nulls_last())  # Sort by termStart, nulls last

# Show the results
print("Presidents with their most common labels and term start dates:")
result_df.show(2000, truncate=False)

# Optionally, show the original presidents DataFrame for reference
print("Presidents with office predicate:")
presidents.show(2000, truncate=False)

Presidents with their most common labels and term start dates:


                                                                                

+------------------------------------+-----------------------------------------------------+---------+
|subject                             |label                                                |termStart|
+------------------------------------+-----------------------------------------------------+---------+
|https://global.dbpedia.org/id/4wdaw |Ulysses S. Grant presidential administration scandals|NULL     |
|https://global.dbpedia.org/id/GqBK  |Franklin Pierce                                      |NULL     |
|https://global.dbpedia.org/id/3Hyz1 |Rutherford B. Hayes                                  |NULL     |
|https://global.dbpedia.org/id/Chc7  |Harry S. Truman                                      |NULL     |
|https://global.dbpedia.org/id/E6QS  |James Madison                                        |NULL     |
|https://global.dbpedia.org/id/51mUj |Andrew Johnson                                       |NULL     |
|https://global.dbpedia.org/id/3FKYg |Herbert Hoover                     



+------------------------------------+----------------------------------+-----------------------------------------------------------------------------------------------+
|subject                             |predicate                         |object                                                                                         |
+------------------------------------+----------------------------------+-----------------------------------------------------------------------------------------------+
|https://global.dbpedia.org/id/4wTNT |http://dbpedia.org/ontology/office|President of the United States:http://www.w3.org/2001/XMLSchema#string                         |
|https://global.dbpedia.org/id/4j76J |http://dbpedia.org/ontology/office|3rdPresident of the United States:http://www.w3.org/2001/XMLSchema#string                      |
|https://global.dbpedia.org/id/4oPJd |http://dbpedia.org/ontology/office|3rdPresident of the United States:http://www.w3.org/2001/XMLSchema#string    

                                                                                

In [88]:
# Assuming dfRaw is already loaded and SparkSession is initialized

# List of president IDs
presidentIds = [
    "https://global.dbpedia.org/id/4vVY3",
    "https://global.dbpedia.org/id/1y533",
    "https://global.dbpedia.org/id/B6dR",
    "https://global.dbpedia.org/id/3DCUj",
    "https://global.dbpedia.org/id/56odt",
    "https://global.dbpedia.org/id/4x8DR",
]

# Step 1: Filter dfRaw for the specified president IDs
president_predicates = dfRaw.filter(col("subject").isin(presidentIds)) \
                           .select("subject", "predicate") \
                           .distinct()

# Step 2: Count how many presidents each predicate appears for
predicate_counts = president_predicates.groupBy("predicate") \
                                      .agg(countDistinct("subject").alias("subject_count"))

# Step 3: Filter for predicates common to all presidents
total_presidents = len(presidentIds)
common_predicates = predicate_counts.filter(col("subject_count") == total_presidents) \
                                   .select("predicate")

# Step 4: Get labels for the presidents
labels_df = dfRaw.filter(col("predicate") == "http://www.w3.org/2000/01/rdf-schema#label") \
                 .filter(col("subject").isin(presidentIds)) \
                 .select("subject", "object") \
                 .withColumnRenamed("object", "label")

# Count occurrences of each label per subject
label_counts = labels_df.groupBy("subject", "label").agg(count("*").alias("label_count"))

# Get the most common label per subject
label_window = Window.partitionBy("subject").orderBy(col("label_count").desc())
most_common_labels = label_counts.withColumn("rn", row_number().over(label_window)) \
                                .filter(col("rn") == 1) \
                                .select("subject", "label")

# Step 5: Join common predicates with president labels for context
result_df = most_common_labels.orderBy("subject") \
                             .select("subject", "label")

# Step 6: Collect common predicates as a list
common_predicates_list = [row["predicate"] for row in common_predicates.collect()]

# Output the results
print("Presidents and their most common labels:")
result_df.show(len(presidentIds), truncate=False)
print("Common predicates across all specified presidents:", common_predicates_list)

                                                                                

Presidents and their most common labels:




+-----------------------------------+-------------------------+
|subject                            |label                    |
+-----------------------------------+-------------------------+
|https://global.dbpedia.org/id/1y533|George W. Bush           |
|https://global.dbpedia.org/id/3DCUj|Ulysses S. Grant         |
|https://global.dbpedia.org/id/4vVY3|Barack Obama             |
|https://global.dbpedia.org/id/4x8DR|Franklin Delano Roosevelt|
|https://global.dbpedia.org/id/56odt|John F. Kennedy          |
|https://global.dbpedia.org/id/B6dR |Bill Clinton             |
+-----------------------------------+-------------------------+

Common predicates across all specified presidents: ['http://dbpedia.org/ontology/birthName', 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type', 'http://dbpedia.org/ontology/birthDate', 'http://dbpedia.org/ontology/award', 'http://dbpedia.org/ontology/signature', 'http://dbpedia.org/ontology/orderInOffice', 'http://dbpedia.org/ontology/vicePresident', 'http:

                                                                                

In [102]:
# List of predicates, with one entry as a list for OR condition
required_predicates = [
    'http://dbpedia.org/ontology/birthName',
    'http://www.w3.org/1999/02/22-rdf-syntax-ns#type',
    'http://dbpedia.org/ontology/birthDate',
    'http://dbpedia.org/ontology/signature',
    ['http://dbpedia.org/ontology/orderInOffice', 'http://dbpedia.org/ontology/order'],
    ['http://dbpedia.org/ontology/vicePresident', 'http://dbpedia.org/ontology/vicepresident'],
    'http://dbpedia.org/ontology/spouse',
    'http://dbpedia.org/ontology/occupation',
    'http://www.w3.org/2000/01/rdf-schema#label',
    'http://dbpedia.org/ontology/almaMater',
    'http://www.w3.org/2000/01/rdf-schema#seeAlso',
    'http://dbpedia.org/ontology/successor',
    'http://dbpedia.org/ontology/child',
    'http://dbpedia.org/ontology/birthPlace',
    'http://dbpedia.org/ontology/party',
    'http://dbpedia.org/ontology/office'
]

# Step 1: Filter subjects with all required predicates, handling OR condition
# Split predicates into single predicates and OR groups
single_predicates = [p for p in required_predicates if not isinstance(p, list)]
or_predicates = [p for p in required_predicates if isinstance(p, list)]

# Initialize predicate_subjects with subjects having single predicates
predicate_subjects = dfRaw.filter(F.col("predicate").isin(single_predicates)) \
                         .groupBy("subject") \
                         .agg(F.countDistinct("predicate").alias("single_predicate_count")) \
                         .filter(F.col("single_predicate_count") == len(single_predicates))

# Handle OR predicates (e.g., at least one of orderInOffice or order)
for or_predicate_group in or_predicates:
    predicate_subjects = predicate_subjects.join(
        dfRaw.filter(F.col("predicate").isin(or_predicate_group))
             .select("subject")
             .distinct(),
        on="subject",
        how="inner"
    )

# Step 2: Get labels for these subjects
labels_df = dfRaw.filter(F.col("predicate") == "http://www.w3.org/2000/01/rdf-schema#label") \
                 .select("subject", "object") \
                 .withColumnRenamed("object", "label")

# Step 3: Join with subjects and count occurrences of each label
label_counts = labels_df.join(predicate_subjects, on="subject", how="inner") \
                       .groupBy("subject", "label") \
                       .agg(F.count("*").alias("label_count"))

# Step 4: Get the most common label per subject
label_window = Window.partitionBy("subject").orderBy(F.col("label_count").desc())
most_common_labels = label_counts.withColumn("rn", F.row_number().over(label_window)) \
                                .filter(F.col("rn") == 1) \
                                .select("subject", "label") \
                                .distinct()

# Step 5: Final result
result_df = most_common_labels.orderBy("subject") \
                             .select("subject", "label")

# Show the results
print("Subjects with all required predicates and their most common labels:")
result_df.show(200, truncate=False)

Subjects with all required predicates and their most common labels:




+------------------------------------+-------------------------+
|subject                             |label                    |
+------------------------------------+-------------------------+
|https://global.dbpedia.org/id/123ePs|Fidel V. Ramos           |
|https://global.dbpedia.org/id/12DcKK|George H. W. Bush        |
|https://global.dbpedia.org/id/12EpJE|Jimmy Carter             |
|https://global.dbpedia.org/id/1y533 |George W. Bush           |
|https://global.dbpedia.org/id/3AMic |Woodrow Wilson           |
|https://global.dbpedia.org/id/3BASN |Boris Yeltsin            |
|https://global.dbpedia.org/id/3C6Qj |James A. Garfield        |
|https://global.dbpedia.org/id/3DCUj |Ulysses S. Grant         |
|https://global.dbpedia.org/id/3F36c |Grover Cleveland         |
|https://global.dbpedia.org/id/3FJhb |Warren G. Harding        |
|https://global.dbpedia.org/id/3FKYg |Herbert Hoover           |
|https://global.dbpedia.org/id/3Hyz1 |Rutherford B. Hayes      |
|https://global.dbpedia.o

                                                                                