Set up Spark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lit, size, udf
from pyspark.sql.types import ArrayType, IntegerType, StringType

# Obtain the Spark session for this notebook under a descriptive app name.
spark = (
    SparkSession.builder
    .appName("ShinglingDocuments")
    .getOrCreate()
)

Load documents and adjust content

In [None]:
def load_documents(file_paths):
    """Read each file as UTF-8 text and return the texts as a Spark DataFrame.

    Args:
        file_paths: Iterable of paths to text files. A file's position in
            this iterable becomes its ``doc_id``.

    Returns:
        A DataFrame with the columns ``doc_id`` and ``content`` (the complete
        file text), one row per file.
    """
    rows = []
    for doc_id, path in enumerate(file_paths):
        with open(path, 'r', encoding='utf-8') as handle:
            rows.append((doc_id, handle.read()))
    return spark.createDataFrame(rows, ["doc_id", "content"])

# Input corpus: currently a single text file (the Grundgesetz).
file_paths = ["brd_grundgesetz_63_2019-04-03.txt"]

# One row per file; doc_id is the file's index in file_paths.
documents_df = load_documents(file_paths)

def preprocess_text(text):
    """Flatten line structure and re-join words hyphenated across line breaks.

    Newlines, tabs and carriage returns are each replaced by a single space.
    Afterwards every "- " (hyphen followed by a space) is removed, so a word
    split as "Grund-" + newline + "gesetz" becomes "Grundgesetz". Note that
    this also removes a free-standing dash that is followed by a space
    ("a - b" becomes "a b").

    Args:
        text: Raw document text, or None (a NULL column value reaches a
            Python UDF as None).

    Returns:
        The cleaned text, or None if ``text`` is None.
    """
    if text is None:
        # Keep NULL rows NULL instead of raising AttributeError inside the UDF.
        return None

    # Each control character maps to exactly one space, so a CR+LF pair
    # yields two spaces (same as the chained replace calls it substitutes).
    flattened = text.translate({ord("\n"): " ", ord("\t"): " ", ord("\r"): " "})
    return flattened.replace("- ", "")



Preprocess the text and define the shingle UDF

In [None]:
# Expose preprocess_text to Spark as a UDF that produces a string column.
preprocess_udf = udf(preprocess_text, returnType=StringType())

def generate_shingles(text, k):
    """Return the distinct character k-shingles of ``text``.

    A k-shingle is any substring of ``k`` consecutive characters.

    Args:
        text: The string to shingle, or None (a NULL column value reaches a
            Python UDF as None).
        k: Shingle length in characters; must be at least 1.

    Returns:
        A list of the distinct shingles in arbitrary order. The list is empty
        when ``text`` is None or shorter than ``k``.

    Raises:
        ValueError: If ``k`` is smaller than 1.
    """
    if k < 1:
        # k == 0 would yield only the empty string and negative k arbitrary
        # slices; neither is a meaningful shingle set.
        raise ValueError(f"shingle length k must be >= 1, got {k}")
    if text is None:
        return []
    # The set comprehension deduplicates while iterating, so no intermediate
    # list of all (possibly repeated) shingles is materialised.
    return list({text[i:i + k] for i in range(len(text) - k + 1)})

# generate_shingles already has the (text, k) signature the UDF is called
# with, so it is registered directly instead of through a pass-through lambda.
shingle_udf = udf(generate_shingles, ArrayType(StringType()))

# Add the cleaned text as a new column; the raw "content" column is kept.
documents_df = documents_df.withColumn("cleaned_content", preprocess_udf(col("content")))

Generate shingles

In [None]:
# Shingle lengths (in characters) to evaluate.
k_values = [5, 9]
results = []

for k in k_values:
    shingle_df = documents_df.withColumn("shingles", shingle_udf(col("cleaned_content"), lit(k)))
    # The UDF returns an already deduplicated list, so the array length is the
    # number of unique shingles. size() reports 0 for an empty array, whereas
    # explode() emits no rows for it, which made documents shorter than k
    # vanish from the grouped result and from the join below.
    # The cast keeps the long column type that count() used to produce.
    distinct_shingles_df = shingle_df.select(
        "doc_id",
        size(col("shingles")).cast("long").alias(f"unique_shingles_k{k}"),
    )
    results.append(distinct_shingles_df)

# Combine the per-k counts into a single row per document.
final_df = results[0]
for result_df in results[1:]:
    final_df = final_df.join(result_df, on="doc_id")

Show Grundgesetz shingle counts

In [None]:
# The Grundgesetz is the first (index 0) entry of file_paths, hence doc_id 0.
grundgesetz_doc_id = 0

# Flag the matching row, then print the table without truncating cell values.
final_df = final_df.withColumn(
    "is_grundgesetz",
    col("doc_id") == lit(grundgesetz_doc_id),
)
final_df.show(truncate=False)