In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file under the Kaggle input directory so the
# data paths used later in the notebook can be checked.
for walk_root, _subdirs, walk_files in os.walk('/kaggle/input'):
    for walk_file in walk_files:
        print(os.path.join(walk_root, walk_file))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/pubdata/acm_1995_2004.csv
/kaggle/input/pubdata/dblp_1995_2004.csv


In [2]:
!pip install pyspark 

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=76e5ce43386a9fe273179301fc757858c3f1be537a468526d65f52c404bb6e2c
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
!pip install python-levenshtein



In [4]:
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
from pyspark.sql.functions import split
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import col, concat_ws
from pyspark.sql.functions import regexp_replace, regexp_extract, collect_list, explode, udf
from pyspark.sql.types import DoubleType
from Levenshtein import distance, ratio

In [5]:
# Notebook-wide Spark session (reused if one already exists).
spark = (
    SparkSession.builder
    .appName("pubdata")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/29 14:34:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Publication schema: every column is a nullable string; "year" is cast to int
# later, where it is filtered.
pub_schema = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in ("title", "authors", "year", "journal", "index")
    ]
)

In [12]:
def _parse_record(record_text):
    """Turn one citation record into a (title, authors, year, venue, index_id) tuple.

    Each field is taken from the first line that starts with its prefix
    ("#*", "#@", "#t", "#c", "#index"), with the prefix removed. A field whose
    prefix does not appear in the record is None.
    """
    lines = record_text.splitlines()  # split once, not once per field
    return tuple(
        next((line[len(prefix):] for line in lines if line.startswith(prefix)), None)
        for prefix in ("#*", "#@", "#t", "#c", "#index")
    )


def create_dataframe_from_file(file_path, pub_schema, delimiter="\n\n"):
    """Parse a prefixed citation-record text file into a DataFrame.

    Records are separated by ``delimiter`` (a blank line by default). Inside a
    record, ``#*`` marks the title, ``#@`` the authors, ``#t`` the year, ``#c``
    the venue and ``#index`` the record id.

    Args:
        file_path: path readable by Hadoop's TextInputFormat.
        pub_schema: schema with five nullable string columns, in the order above.
        delimiter: record separator passed as ``textinputformat.record.delimiter``.

    Returns:
        DataFrame with one row per record that contains at least one recognised
        field. Blank records, records starting with ``#%`` and records with no
        recognised prefix (e.g. when the input is a CSV file rather than this
        format) are skipped instead of becoming all-NULL rows.

    Uses the notebook-level ``spark`` session.
    """
    rdd = spark.sparkContext.newAPIHadoopFile(
        file_path,
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf={"textinputformat.record.delimiter": delimiter},
    )

    data_rdd = (
        rdd.map(lambda kv: kv[1])
        # Skip empty records (e.g. produced by trailing newlines) and "#%"-led records.
        .filter(lambda text: bool(text.strip()) and not text.lstrip().startswith("#%"))
        .map(_parse_record)
        # A record with none of the expected prefixes would otherwise become an all-NULL row.
        .filter(lambda fields: any(value is not None for value in fields))
    )

    return spark.createDataFrame(data_rdd, schema=pub_schema)

                                                                                

+-----+-------+----+-------+-----+
|title|authors|year|journal|index|
+-----+-------+----+-------+-----+
|NULL |NULL   |NULL|NULL   |NULL |
+-----+-------+----+-------+-----+



In [7]:
def read_csv_with_schema(spark, file_path, schema):
    """
    Load a CSV file that has a header row, using an explicit schema.

    Args:
    - spark: SparkSession used for reading
    - file_path: location of the CSV file
    - schema: schema applied to the columns of the resulting DataFrame

    Returns:
    - DataFrame holding the file's records (the header line is not returned as data)
    """
    return spark.read.csv(file_path, schema=schema, header=True)

file_path_acm, file_path_dblp = (
    "/kaggle/input/pubdata/acm_1995_2004.csv",
    "/kaggle/input/pubdata/dblp_1995_2004.csv",
)

# Load both publication sources with the shared all-string schema (ACM first, then DBLP).
df_acm, df_dblp = (
    read_csv_with_schema(spark, source_path, pub_schema)
    for source_path in (file_path_acm, file_path_dblp)
)

# Preview the first rows of the ACM data.
df_acm.show()

                                                                                

+--------------------+--------------------+----+-------+--------------------+
|               title|             authors|year|journal|               index|
+--------------------+--------------------+----+-------+--------------------+
|The next database...|            Jim Gray|2004| SIGMOD|5390972920f70186a...|
|The role of crypt...|         Ueli Maurer|2004| SIGMOD|5390972920f70186a...|
|Tree logical clas...|Stelios Paparizos...|2004| SIGMOD|5390972920f70186a...|
|Adaptive stream r...|Ankur Jain, Edwar...|2004| SIGMOD|5390972920f70186a...|
|Holistic UDAFs at...|Graham Cormode, T...|2004| SIGMOD|5390972920f70186a...|
|Online eventdrive...|Huanmei Wu, Betty...|2004| SIGMOD|5390972920f70186a...|
|Using the structu...|Kristina Lerman, ...|2004| SIGMOD|5390972920f70186a...|
|FleXPath flexible...|Sihem AmerYahia, ...|2004| SIGMOD|5390972920f70186a...|
|An interactive cl...|Wensheng Wu, Clem...|2004| SIGMOD|5390972920f70186a...|
|Lazy query evalua...|Serge Abiteboul, ...|2004| SIGMOD|53909729

In [8]:
# Preview the DBLP rows (first 20, long values truncated).
df_dblp.show(n=20, truncate=True)

+--------------------+--------------------+----+-------+--------------------+
|               title|             authors|year|journal|               index|
+--------------------+--------------------+----+-------+--------------------+
|An initial study ...|      Amol Deshpande|2004| SIGMOD|53e9a515b7602d970...|
|Engineering Feder...|Stefan Conrad, Wi...|1999| SIGMOD|53e9b275b7602d970...|
|Information Findi...|Tak W Yan, Hector...|1995| SIGMOD|53e9a5beb7602d970...|
|       Editors Notes|      Jennifer Widom|1995| SIGMOD|53e99800b7602d970...|
|Report on the 5th...|HansJoachim Lenz,...|2003| SIGMOD|53e9a718b7602d970...|
|       Editors Notes|            Ling Liu|2002| SIGMOD|53e99800b7602d970...|
|Report from the N...|Amit P Sheth, Dim...|1996| SIGMOD|53e99e6ab7602d970...|
|TODS Perceptions ...| Richard T Snodgrass|2002| SIGMOD|53e99ae6b7602d970...|
|SQLMED  A Status ...|Jim Melton, JanEi...|2002| SIGMOD|53e9b4d4b7602d970...|
|2003 SIGMOD Innov...| Donald D Chamberlin|2003| SIGMOD|53e9b4a5

In [None]:
def filter_and_clean_df(df, start_year=1995, end_year=2004):
    """Keep SIGMOD/VLDB publications within a year range and normalise the venue.

    Args:
        df: DataFrame with string columns "year" and "journal".
        start_year: first year kept (inclusive); default 1995.
        end_year: last year kept (inclusive); default 2004.

    Returns:
        Rows whose "year" casts to an int in [start_year, end_year] and whose
        "journal" contains "SIGMOD" or "VLDB" (case-insensitive). In those rows
        "journal" is rewritten to exactly "VLDB" or "SIGMOD".
    """
    from pyspark.sql.functions import when

    vldb_pattern = "(?i)VLDB"
    sigmod_pattern = "(?i)SIGMOD"
    journal = col("journal")

    filtered_df = df.filter(
        col("year").cast("int").between(start_year, end_year)
        & (journal.rlike(vldb_pattern) | journal.rlike(sigmod_pattern))
    )

    # Normalise with the SAME patterns the filter used. The previous
    # word-boundary regexes (\bVLDB\b) let venues such as "PVLDB" or "VLDB2003"
    # through the filter but left them un-normalised. VLDB is tested first,
    # which keeps the original precedence when both names appear.
    return filtered_df.withColumn(
        "journal",
        when(journal.rlike(vldb_pattern), "VLDB")
        .when(journal.rlike(sigmod_pattern), "SIGMOD")
        .otherwise(journal),
    )

In [None]:
def remove_special_chars(df, columns=("title", "authors")):
    """Strip punctuation and symbols from free-text columns.

    Word characters, commas (the author separator) and whitespace are kept.
    The ``(?U)`` flag makes Java's ``\\w``/``\\s`` Unicode-aware. Without it,
    ``\\w`` is ASCII-only and accented letters in names (e.g. "Schäfer") would
    be deleted.

    Args:
        df: input DataFrame.
        columns: names of the string columns to clean (default: title, authors).

    Returns:
        DataFrame with the listed columns cleaned; other columns untouched.
    """
    pattern = r"(?U)[^\w,\s]"
    for column_name in columns:
        df = df.withColumn(column_name, regexp_replace(column_name, pattern, ""))
    return df

In [9]:
def write_to_csv(dataframe, path):
    """Write ``dataframe`` as CSV from a single partition into the output directory ``path``.

    A header row is written, fields are quoted with '"', and '"' is also the
    escape character. Any existing output at ``path`` is overwritten.
    """
    single_partition = dataframe.repartition(1)
    writer = (
        single_partition.write.format('csv')
        .options(header="true", quote="\"", escape="\"")
        .mode('overwrite')
    )
    writer.save(path)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, count
from pyspark.sql.window import Window

def find_duplicates(df, key_columns=("year", "index")):
    """Split ``df`` into its duplicated rows and a de-duplicated copy.

    Rows are grouped by case-insensitive "title" and "authors" plus the exact
    values of ``key_columns`` (default "year" and "index", as before).

    Returns:
        (duplicates_df, df_unique):
        - duplicates_df: every row that belongs to a group of size > 1, with an
          extra "count" column, ordered by descending group size.
        - df_unique: exactly one row per group (original columns only). A
          duplicated record is kept once, not dropped entirely.
    """
    from pyspark.sql.functions import row_number

    original_columns = df.columns

    # Lower-cased helper columns make the title/author comparison case-insensitive.
    keyed_df = df.withColumn("lower_title", lower(col("title"))) \
                 .withColumn("lower_authors", lower(col("authors")))

    group_columns = ["lower_title", "lower_authors", *key_columns]
    # Unordered window -> size of the whole group. It must stay separate from the
    # ordered window: count() over an ordered window would be a running count.
    group_window = Window.partitionBy(*group_columns)
    # Ordered window -> deterministic choice of one representative per group.
    ranked_window = Window.partitionBy(*group_columns).orderBy(*original_columns)

    with_stats = keyed_df.withColumn("count", count("*").over(group_window)) \
                         .withColumn("_row_number", row_number().over(ranked_window))

    duplicates_df = with_stats.filter(col("count") > 1) \
                              .orderBy(col("count").desc()) \
                              .drop("lower_title", "lower_authors", "_row_number")

    df_unique = with_stats.filter(col("_row_number") == 1) \
                          .drop("count", "lower_title", "lower_authors", "_row_number")

    return duplicates_df, df_unique

duplicates, unique_records = find_duplicates(df_acm)

# Preview duplicated ACM rows without truncating cell values.
duplicates.show(n=20, truncate=False)

In [33]:
def levenshtein_ratio(s1, s2):
    """Normalised Levenshtein similarity of two strings (``Levenshtein.ratio``).

    Returns 0.0 when either argument is None (a null column value). This
    avoids a TypeError inside the Spark UDF and matches how
    jaccard_similarity_case_insensitive treats None.
    """
    if s1 is None or s2 is None:
        return 0.0
    # float() guarantees a Python float for the DoubleType UDF.
    return float(ratio(s1, s2))

# Expose levenshtein_ratio to Spark SQL as a double-valued UDF.
levenshtein_ratio_udf = udf(levenshtein_ratio, returnType=DoubleType())

def find_duplicates_for_ground_truth_lv(df1, df2, threshold=0.5):
    """Pair every record of ``df1`` with every record of ``df2`` and keep likely matches.

    Similarity is the Levenshtein ratio (``levenshtein_ratio_udf``), computed
    separately on "title" and "authors". A pair is kept only when both scores
    are >= ``threshold``.

    Returns a DataFrame with columns Title1, Title2, Authors1, Authors2,
    Title_Similarity, Authors_Similarity.
    """
    left = df1.select(col("title").alias("title_1"), col("authors").alias("authors_1"))
    right = df2.select(col("title").alias("title_2"), col("authors").alias("authors_2"))

    # Cartesian product: the number of compared pairs is len(df1) * len(df2).
    scored = (
        left.crossJoin(right)
        .withColumn("Title_Similarity", levenshtein_ratio_udf(col("title_1"), col("title_2")))
        .withColumn("Authors_Similarity", levenshtein_ratio_udf(col("authors_1"), col("authors_2")))
    )

    matches = scored.where(
        (col("Title_Similarity") >= threshold) & (col("Authors_Similarity") >= threshold)
    )

    return matches.select(
        col("title_1").alias("Title1"),
        col("title_2").alias("Title2"),
        col("authors_1").alias("Authors1"),
        col("authors_2").alias("Authors2"),
        col("Title_Similarity"),
        col("Authors_Similarity"),
    )


In [15]:
def jaccard_similarity_case_insensitive(str1, str2):
    """Jaccard similarity of the lower-cased, whitespace-separated token sets.

    Returns a float in [0.0, 1.0]. The result is 0.0 when either argument is
    None or when neither string contains any token. The result is always a
    float because the function is registered as a DoubleType UDF, and PySpark
    yields null for return values that do not match the declared type (e.g.
    int 0).
    """
    if str1 is None or str2 is None:
        return 0.0
    tokens1 = set(str1.lower().split())
    tokens2 = set(str2.lower().split())
    union = tokens1 | tokens2
    if not union:
        return 0.0
    return len(tokens1 & tokens2) / len(union)

In [16]:
# Expose the Jaccard helper to Spark SQL as a double-valued UDF.
jaccard_similarity_udf = udf(jaccard_similarity_case_insensitive, returnType=DoubleType())

def find_duplicates_for_ground_truth_js(df1, df2, threshold=0.5):
    """Pair every record of ``df1`` with every record of ``df2`` and keep likely matches.

    Similarity is the case-insensitive token Jaccard score
    (``jaccard_similarity_udf``), computed separately on "title" and "authors".
    A pair is kept only when both scores are >= ``threshold``.

    Returns a DataFrame with columns Title1, Title2, Authors1, Authors2,
    Title_Similarity, Authors_Similarity.
    """
    left = df1.select(col("title").alias("title_1"), col("authors").alias("authors_1"))
    right = df2.select(col("title").alias("title_2"), col("authors").alias("authors_2"))

    # Cartesian product: the number of compared pairs is len(df1) * len(df2).
    scored = (
        left.crossJoin(right)
        .withColumn("Title_Similarity", jaccard_similarity_udf(col("title_1"), col("title_2")))
        .withColumn("Authors_Similarity", jaccard_similarity_udf(col("authors_1"), col("authors_2")))
    )

    matches = scored.where(
        (col("Title_Similarity") >= threshold) & (col("Authors_Similarity") >= threshold)
    )

    return matches.select(
        col("title_1").alias("Title1"),
        col("title_2").alias("Title2"),
        col("authors_1").alias("Authors1"),
        col("authors_2").alias("Authors2"),
        col("Title_Similarity"),
        col("Authors_Similarity"),
    )

In [None]:
# The inputs are comma-separated files with a header row (see the CSV preview above).
# They are not in the "#*"/"#@" record format that create_dataframe_from_file expects;
# parsing them with it produces all-NULL rows, so load them as CSV.
df_dblp = read_csv_with_schema(spark, file_path_dblp, pub_schema)
df_acm = read_csv_with_schema(spark, file_path_acm, pub_schema)

In [None]:
# Apply the year/venue filter first, then strip special characters from title and authors.
df_dblp_clean = df_dblp.transform(filter_and_clean_df).transform(remove_special_chars)
df_acm_clean = df_acm.transform(filter_and_clean_df).transform(remove_special_chars)

In [17]:
# Compare the cleaned frames (year/venue filtered, special characters stripped) rather than the raw inputs.
duplicates_find_naive = find_duplicates_for_ground_truth_js(df_acm_clean, df_dblp_clean)

In [13]:
# Sanity check: list the top-level Kaggle directories (output shows input, lib, working).
!ls /kaggle/

input  lib  working


In [18]:
# NOTE(review): "dublicates" looks like a typo; kept as-is because renaming changes the output location.
write_to_csv(dataframe=duplicates_find_naive, path="dublicates_js_1995_2004.csv")

24/01/29 14:38:15 WARN ExtractPythonUDFFromJoinCondition: The join condition:((jaccard_similarity_case_insensitive(title_1#125, title_2#129)#137 >= 0.5) AND (jaccard_similarity_case_insensitive(authors_1#126, authors_2#130)#144 >= 0.5)) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.
                                                                                