In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

cf = SparkConf()
cf.set("spark.submit.deployMode","client")
sc = SparkContext.getOrCreate(cf)

spark = SparkSession \
    .builder \
        .appName("misspellings_of_valid_values") \
            .config("spark.some.config.option", "some-value") \
                .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/08 14:32:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/08 14:32:56 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
%pip install jellyfish

Note: you may need to restart the kernel to use updated packages.


In [3]:
from pyspark.sql.functions import trim, col, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

# Load the data stored in the file
df = spark.read.csv("dataset/Construction_Demolition_Registrants_20240413.csv", header=True, inferSchema=True)

# Trim the leading and trailing whitespaces
for column in df.columns:
    df = df.withColumn(column, trim(col(column)))

# Add a unique index to each row
df_with_index = df.withColumn('INDEX', row_number().over(Window.orderBy(monotonically_increasing_id())) - 1)

df_with_index.show()

24/05/08 14:33:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/08 14:33:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/08 14:33:01 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/08 14:33:01 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+----------+----------+--------------------+----------+--------------------+----------------+-----+----------+--------------+-----+----------------+----------------+--------------+---------------+-------+-----------+---------+----------+---------------+----------------+------------+---------+----------+--------------------+----+-----+
|   CREATED|BIC NUMBER|        ACCOUNT NAME|TRADE NAME|             ADDRESS|            CITY|STATE|  POSTCODE|         PHONE|EMAIL|APPLICATION TYPE|DISPOSITION DATE|EFFECTIVE DATE|EXPIRATION DATE|RENEWAL|EXPORT DATE| LATITUDE| LONGITUDE|COMMUNITY BOARD|COUNCIL DISTRICT|CENSUS TRACT|      BIN|       BBL|                 NTA|BORO|INDEX|
+----------+----------+--------------------+----------+--------------------+----------------+-----+----------+--------------+-----+----------------+----------------+--------------+---------------+-------+-----------+---------+----------+---------------+----------------+------------+---------+----------+--------------------+-

                                                                                

In [4]:
from pyspark.sql.functions import lit, collect_list, when
import jellyfish

def misspelling_detection(df, partition_columns, target_column, window_size=4, threshold=5):

    # Create a new column to flag the misspelling
    df_misspelling = df.withColumn('MISSPELLING', lit(''))

    # Remove the rows with null values in the partition columns and the target column
    df_misspelling = df_misspelling.na.drop(subset=partition_columns+[target_column])

    # Sort the dataframe by the target column
    df_misspelling = df_misspelling.sort(target_column)

    # Group the target column by the partition columns
    grouped = df_misspelling.groupBy(partition_columns).agg(collect_list(target_column).alias(target_column), collect_list('INDEX').alias('INDEX'))

    # Detect the misspelling by using the Sorted-Neighborhood algorithm
    for row in grouped.collect():
        partition = row.asDict()
        target_list = partition[target_column]
        index_list = partition['INDEX']
        for i in range(0, len(target_list) - window_size + 1, window_size - 1):
            flag = False
            temp = []
            temp.append(index_list[i])
            for j in range(i + 1, i + window_size):
                temp.append(index_list[j])
                if (
                    flag == False and
                    jellyfish.levenshtein_distance(target_list[i], target_list[j]) in range(1, threshold + 1)
                ):
                    flag = True
            if flag == True:
                temp_str = ' '.join([str(x) for x in temp])
                df_misspelling = df_misspelling.withColumn('MISSPELLING', when(col('INDEX').isin(temp), temp_str).otherwise(col('MISSPELLING')))

    # Keep only the rows that contain misspelling flags
    df_misspelling = df_misspelling.filter(col('MISSPELLING') != '').sort('MISSPELLING')

    return df_misspelling

In [5]:
# Detect misspelling
latitude_column = [column for column in df.columns if 'latitude' in column.lower()][0]
longitude_column = [column for column in df.columns if 'longitude' in column.lower()][0]
partition_columns = [latitude_column, longitude_column]
target_column = [column for column in df.columns if 'address' in column.lower() or 'street' in column.lower()][0]
df_misspelling = misspelling_detection(df_with_index, partition_columns, target_column)


df_misspelling.select(partition_columns+[target_column]+["INDEX"]).show()

24/05/08 14:33:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/08 14:33:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/08 14:33:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/08 14:33:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/08 14:33:08 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/08 14:33:08 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/08 1

+---------+----------+--------------------+-------+
| LATITUDE| LONGITUDE|             ADDRESS|  INDEX|
+---------+----------+--------------------+-------+
|40.755247|-73.900527|32-37 62ND STREET...| 100625|
|40.755247|-73.900527|32-37 62ND STREET...| 101767|
|40.755247|-73.900527|32-37 62ND STREET...| 102655|
|40.755247|-73.900527|32-37 62ND STREET...| 103838|
|40.790356|-73.795417|       16804 12Th Rd|    954|
|40.790356|-73.795417|       16804 12Th Rd|   2098|
|40.790356|-73.795417|    168-04 12TH ROAD|1244633|
|40.790356|-73.795417|    168-04 12TH ROAD|1245877|
|40.723357|-73.751186|     89-23 212 PLACE|1250364|
|40.723357|-73.751186|     89-23 212 PLACE|1251586|
|40.723357|-73.751186|   89-23 212TH PLACE|1747575|
|40.723357|-73.751186|   89-23 212TH PLACE|1748610|
|40.669153|-73.799978|   137-28 133 AVENUE|1276837|
|40.669153|-73.799978|   137-28 133 AVENUE|1274621|
|40.669153|-73.799978|   137-28 133 AVENUE|1275675|
|40.669153|-73.799978| 137-28 133RD AVENUE|1880248|
|40.791512|-

                                                                                

In [6]:
from pyspark.sql.functions import udf, concat
from pyspark.sql.types import BooleanType

def jaro_winkler_similarity(value, vocab):

    if value is None:
        return False
    if value not in vocab:
        for v in vocab:
            if jellyfish.jaro_winkler_similarity(value, v) < 0.9:
                return True
    return False

def vocabulary_misspelling_detection(df):

    # Create a new column to flag the misspelling compared to the vocabulary
    df_vocabulary_misspelling = df.withColumn('VOCAB', lit(''))

    # Define the list of columns that need to be checked
    nta_column = [column for column in df.columns if 'nta' in column.lower()][0]
    boro_column = [column for column in df.columns if 'boro' in column.lower()][0]

    # Define the list of vocabularies
    vocab_boro = ['MANHATTAN', 'BRONX', 'BROOKLYN', 'QUEENS', 'STATEN IS']
    with open('reference/NTAName.txt', 'r') as file:
        vocab_nta = file.read().splitlines()

    # Define the user-defined functions
    NTA_similarity_udf = udf(lambda x: jaro_winkler_similarity(x, vocab_nta), BooleanType())
    BORO_similarity_udf = udf(lambda x: jaro_winkler_similarity(x, vocab_boro), BooleanType())

    # Detect the vocabulary violations
    df_vocabulary_misspelling = df_vocabulary_misspelling.withColumn('VOCAB',
                                      when(col(nta_column).isNotNull() & NTA_similarity_udf(trim(col(nta_column))), 
                                           concat(col('VOCAB'), lit(','), lit(nta_column))).otherwise(col('VOCAB')))
    df_vocabulary_misspelling = df_vocabulary_misspelling.withColumn('VOCAB',
                                      when(col(boro_column).isNotNull() & BORO_similarity_udf(trim(col(boro_column))), 
                                           concat(col('VOCAB'), lit(','), lit(boro_column))).otherwise(col('VOCAB')))
    
    # Keep only the rows that contain vocabulary misspelling flags
    df_vocabulary_misspelling = df_vocabulary_misspelling.filter(col('VOCAB') != '')

    return df_vocabulary_misspelling

In [7]:
# Detect vocabulary misspelling

df_vocabulary_misspelling = vocabulary_misspelling_detection(df_with_index)

df_vocabulary_misspelling.show()

24/05/08 14:33:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/08 14:33:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/08 14:33:12 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/08 14:33:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/05/08 14:33:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 12:>                                                         (0 + 1) / 1]

+----------+----------+--------------------+----------------+--------------------+----------------+-----+----------+--------------+-----+----------------+----------------+--------------+---------------+-------+-----------+--------+---------+---------------+----------------+------------+----------+---+---+-----+------+---------+
|   CREATED|BIC NUMBER|        ACCOUNT NAME|      TRADE NAME|             ADDRESS|            CITY|STATE|  POSTCODE|         PHONE|EMAIL|APPLICATION TYPE|DISPOSITION DATE|EFFECTIVE DATE|EXPIRATION DATE|RENEWAL|EXPORT DATE|LATITUDE|LONGITUDE|COMMUNITY BOARD|COUNCIL DISTRICT|CENSUS TRACT|       BIN|BBL|NTA| BORO| INDEX|    VOCAB|
+----------+----------+--------------------+----------------+--------------------+----------------+-----+----------+--------------+-----+----------------+----------------+--------------+---------------+-------+-----------+--------+---------+---------------+----------------+------------+----------+---+---+-----+------+---------+
|06/18/201

                                                                                