In [0]:
# Load the authors/publications CSV; its first line supplies the column names.
# No schema inference is requested, so every column is read as a string.
df = spark.read.csv(
    "dbfs:/FileStore/datasets/authors_publications.csv",
    header=True,
)
# Quick preview of the first five rows.
df.show(n=5)

In [0]:
from pyspark.sql import functions as F

# Example: keep publications whose "Year" is >= 2020 and count them per "Authors".
# The CSV is loaded without schema inference, so "Year" is a string column;
# cast it explicitly rather than relying on Spark's implicit string/int coercion.
# NOTE(review): with ANSI mode off, a non-numeric "Year" casts to NULL and the
# row is dropped by this filter; with ANSI mode on, the cast raises instead.
filtered_df = df.filter(F.col("Year").cast("int") >= 2020)
agg_df = filtered_df.groupBy("Authors").agg(F.count("*").alias("publication_count"))
display(agg_df)

In [0]:
from pyspark.sql import functions as F

# ------------------------------
# Data Quality Validation
# ------------------------------

# Validation rules (a row is INVALID if it breaks any of them):
# 1. 'Authors' must not be null, empty, or whitespace-only.
# 2. 'Year' must be a non-null, all-digit integer string whose value is >= 2020.
#
# The CSV is read without schema inference, so 'Year' is a string. We check the
# digits explicitly and only cast values that pass, so a malformed value such as
# "n/a" is flagged as invalid instead of being silently coerced to NULL.
# NOTE(review): rule 2 makes any pre-2020 publication fail the whole cell; if
# older rows are expected in the source, filter them out instead of failing.
MIN_PUBLICATION_YEAR = 2020

year_str = F.trim(F.col("Year"))
year_is_integer = year_str.rlike(r"^[0-9]+$")
# Cast only rows that look like integers; all others yield NULL from F.when.
year_value = F.when(year_is_integer, year_str.cast("int"))

validation_expr = F.coalesce(
    (
        F.col("Authors").isNull()
        | (F.trim(F.col("Authors")) == "")
        | F.col("Year").isNull()
        | ~year_is_integer
        | (year_value < MIN_PUBLICATION_YEAR)
    ),
    # Under SQL three-valued logic the OR can be NULL (e.g. FALSE OR NULL).
    # A NULL predicate is dropped by both filter(expr) and filter(~expr), so
    # such a row would vanish unreported; treat "unknown" as invalid instead.
    F.lit(True),
)

# Capture rows that violate any rule
invalid_df = df.filter(validation_expr)

# Fail fast when ANY row is invalid, including a small sample to aid debugging.
invalid_count = invalid_df.count()
if invalid_count > 0:
    sample_rows = [row.asDict() for row in invalid_df.limit(5).collect()]
    raise ValueError(
        f"Data quality validation failed: {invalid_count} row(s) do not meet the criteria. "
        f"Sample of invalid rows: {sample_rows}"
    )

# ------------------------------
# Continue with clean data processing
# ------------------------------

# Keep only the rows for which the validation expression above is false,
# i.e. rows that passed every data-quality rule.
clean_df = df.where(~validation_expr)

# Example aggregation: number of publications per author.
agg_df = clean_df.groupBy("Authors").agg(
    F.count(F.lit(1)).alias("publication_count")
)

display(agg_df)