In [2]:
# Shared local-dev Spark setup; its output reports a Databricks Connect session.
# The `spark` object used below is not defined in this notebook, so presumably it comes from here — confirm.
%run ./local_dev/spark_setup.ipynb

Using Databricks Connect
Spark Version: 4.0.0


In [3]:
import random
import re

import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


# Row-selection rules for the corruption step, keyed on each row's unique_id:
# ids divisible by DESTROY_MODULO are destroyed (this check wins), otherwise ids
# divisible by DIRTY_MODULO are dirtied; every other row is left untouched.
DESTROY_MODULO, DIRTY_MODULO = 7, 2

# Light-touch corruptions for "dirty" postcodes. Each entry maps a postcode string to a
# corrupted string and one is picked at random per dirty row. Every entry must accept
# any str (including ""), because a raised exception fails the whole Spark task.
dirty_postcode_mods = [
    # Look-alike character swaps (both directions).
    lambda x: x.replace("9", "E"),
    lambda x: x.replace("E", "9"),
    lambda x: x.replace("0", "o"),
    lambda x: x.replace("o", "0"),
    lambda x: x.replace("1", "l"),
    lambda x: x.replace("l", "1"),
    lambda x: x.replace("I", "1"),
    lambda x: x.replace("B", "8"),
    lambda x: x.replace("S", "5"),
    lambda x: x.replace("G", "6"),
    lambda x: x.replace("Z", "2"),
    lambda x: x.replace("C", "0"),
    lambda x: x.replace("K", "X"),
    # Case and whitespace changes.
    lambda x: x.lower(),
    lambda x: x.upper(),
    lambda x: x.replace(" ", ""),
    # Truncation, stray characters and single-character edits.
    lambda x: x[:len(x) // 2],
    lambda x: x + "X",
    lambda x: x + "'",
    lambda x: x + ".",
    lambda x: x + ",",
    lambda x: "X" + x,
    lambda x: "]" + x,
    lambda x: (x[:-1] + "Z") if len(x) > 1 else x,
    lambda x: x[1:] if len(x) > 1 else x,
    lambda x: x[:-1] if len(x) > 1 else x,
    # Repeat the final character after a space; guarded because x[-1] raises IndexError on "".
    lambda x: (x + " " + x[-1]) if x else x,
    # Spacing changes and full reversal.
    lambda x: (x[:2] + " " + x[2:]) if len(x) > 2 else x,
    lambda x: x.replace(" ", "  "),
    lambda x: x[::-1],
]

# "Destroy" mods ignore the incoming postcode and emit something unusable for matching:
# either nothing at all or an obviously bogus placeholder.
destroy_postcode_mods = [
    lambda _postcode: "",
    lambda _postcode: "NOTAPOSTC",
]

def _replace_ci(text, old, new):
    """Return ``text`` with every occurrence of ``old`` replaced by ``new``, ignoring case.

    The addresses in this notebook's sample output are upper-case
    (e.g. "83A, MARLOW ROAD, ..."), so a plain ``str.replace("Road", ...)`` never
    matches them. Matching case-insensitively keeps these mods effective on
    upper-case as well as mixed-case input.
    """
    # A function replacement inserts ``new`` literally (no backslash/group-reference escapes).
    return re.sub(re.escape(old), lambda _match: new, text, flags=re.IGNORECASE)


# Light-touch corruptions for "dirty" addresses. Each entry maps an address string to a
# corrupted string and one is picked at random per dirty row. List order and length
# are unchanged from earlier versions; word/letter replacements are case-insensitive.
dirty_address_mods = [
    # Abbreviate or drop common street-type words.
    lambda x: _replace_ci(x, "Street", "St"),
    lambda x: _replace_ci(x, "Road", "Rd"),
    lambda x: _replace_ci(x, "Road", ""),
    lambda x: _replace_ci(x, "Street", ""),
    # Look-alike character swaps.
    lambda x: _replace_ci(x, "o", "0"),
    lambda x: x.replace("1", "l"),
    lambda x: _replace_ci(x, "l", "1"),
    # Case changes.
    lambda x: x.upper(),
    lambda x: x.lower(),
    # Extra / missing parts and abbreviations.
    lambda x: x + " Apt 1B",
    lambda x: x[:len(x) // 2],
    lambda x: _replace_ci(x, "Avenue", "Ave"),
    lambda x: _replace_ci(x, "Ave", ""),
    lambda x: _replace_ci(x, "Apartment", "Apt"),
    lambda x: _replace_ci(x, "Building", "Bldg"),
    lambda x: x.replace(",", ""),
    lambda x: x + ", United Kingdom",
    lambda x: x + ", UK",
    lambda x: "Flat " + x,
    lambda x: x.replace(" ", ""),
    # Drop the last / first whitespace-separated token.
    lambda x: " ".join(x.split()[:-1]),
    lambda x: " ".join(x.split()[1:]),
    lambda x: x + " London",
    # Typos and character substitutions.
    lambda x: _replace_ci(x, "London", "Londom"),
    lambda x: _replace_ci(x, "Main", "Mian"),
    lambda x: _replace_ci(x, "o", ""),
    lambda x: _replace_ci(x, "a", "@"),
    lambda x: _replace_ci(x, "e", "3"),
    lambda x: _replace_ci(x, "s", "$"),
    lambda x: x.title(),
    # Space out every character.
    lambda x: " ".join(x),
]

# "Destroy" mods make the address useless for matching: a fictitious address,
# an empty string, or the original text reversed character by character.
destroy_address_mods = [
    lambda _address: "123 Madeup Road, Imaginary Town",
    lambda _address: "",
    lambda address: "".join(reversed(address)),
]

def transform_postcode(unique_id, postcode, seed=0):
    """Corrupt a postcode according to the row's ``unique_id``.

    Rows whose ``unique_id`` is a multiple of ``DESTROY_MODULO`` get a mod from
    ``destroy_postcode_mods`` (this check wins). Other multiples of ``DIRTY_MODULO``
    get a mod from ``dirty_postcode_mods``. Every other row is returned unchanged.

    Args:
        unique_id: Integer row key used to decide destroy / dirty / clean.
        postcode: Postcode string, or ``None`` for a null column value.
        seed: Extra seed mixed into the per-row RNG. Change it to get a different,
            still reproducible, corruption.

    Returns:
        The (possibly) corrupted postcode, or ``None`` when ``postcode`` is ``None``.
    """
    if postcode is None:
        # The dirty mods call str methods, so a null would fail the Spark task;
        # a null postcode is already unusable for matching, so keep it null.
        return None
    if unique_id % DESTROY_MODULO == 0:
        mods = destroy_postcode_mods
    elif unique_id % DIRTY_MODULO == 0:
        mods = dirty_postcode_mods
    else:
        return postcode
    # Spark may evaluate a Python UDF more than once for the same row (each action,
    # task retries). A string-seeded RNG derived from the row makes every evaluation
    # pick the same mod, unlike the shared global `random` state.
    rng = random.Random(f"{seed}:postcode:{unique_id}:{postcode}")
    return rng.choice(mods)(postcode)

def transform_fulladdress(unique_id, fulladdress, seed=0):
    """Corrupt a full address string according to the row's ``unique_id``.

    Rows whose ``unique_id`` is a multiple of ``DESTROY_MODULO`` get a mod from
    ``destroy_address_mods`` (this check wins). Other multiples of ``DIRTY_MODULO``
    get a mod from ``dirty_address_mods``. Every other row is returned unchanged.

    Args:
        unique_id: Integer row key used to decide destroy / dirty / clean.
        fulladdress: Address string, or ``None`` for a null column value.
        seed: Extra seed mixed into the per-row RNG. Change it to get a different,
            still reproducible, corruption.

    Returns:
        The (possibly) corrupted address, or ``None`` when ``fulladdress`` is ``None``.
    """
    if fulladdress is None:
        # The dirty mods and the reversing destroy mod use str methods / slicing,
        # so a null would fail the Spark task; keep it null instead.
        return None
    if unique_id % DESTROY_MODULO == 0:
        mods = destroy_address_mods
    elif unique_id % DIRTY_MODULO == 0:
        mods = dirty_address_mods
    else:
        return fulladdress
    # Spark may evaluate a Python UDF more than once for the same row (each action,
    # task retries). A string-seeded RNG derived from the row makes every evaluation
    # pick the same mod, unlike the shared global `random` state.
    rng = random.Random(f"{seed}:fulladdress:{unique_id}:{fulladdress}")
    return rng.choice(mods)(fulladdress)

def transform_expected_uprn(unique_id, current_uprn):
    """Return the UPRN a matcher is expected to find for this row.

    Rows selected for destruction (``unique_id`` divisible by ``DESTROY_MODULO``)
    should not match anything, so ``None`` is returned for them. Every other row
    keeps ``current_uprn`` unchanged.
    """
    is_destroyed = unique_id % DESTROY_MODULO == 0
    return None if is_destroyed else current_uprn
    
# Wrap the plain-Python transforms as Spark UDFs, each producing a string column.
_string_result_type = StringType()
transform_postcode_udf = udf(f=transform_postcode, returnType=_string_result_type)
transform_fulladdress_udf = udf(f=transform_fulladdress, returnType=_string_result_type)
transform_expected_uprn_udf = udf(f=transform_expected_uprn, returnType=_string_result_type)

In [4]:
# Load the GB address extract. Only part 3 is read for now; add 0, 1 and/or 2 to
# ADDRESS_PARTS to read more of it (part_0.csv … part_2.csv are presumably in the
# same directory — confirm).
ADDRESS_BASE_DIR = "dbfs:/Volumes/catalog-sbx-uks-ctdp-001/schema-sbx-uks-ctdp-001-acquire-clickops/test_ctdp"
ADDRESS_PARTS = [3]

# header=True without inferSchema reads every column (including uprn) as a string,
# which the StringType UDFs downstream rely on.
all_address_sdf = (
    spark.read.option("header", True)
    .csv([f"{ADDRESS_BASE_DIR}/part_{part}.csv" for part in ADDRESS_PARTS])
    # Keep only the fields needed for address matching.
    .select("uprn", "fulladdress", "postcode")
)


In [5]:
number_of_test_rows = 50
# Spark re-executes this whole lineage on every action (count, show, and each later
# write). An unseeded rand() would draw a *different* sample each time, so the
# expected UPRNs would not line up with the rows written for matching. A fixed seed
# makes the draw repeatable, given the same input partitioning.
SAMPLE_SEED = 42

address_samples = (
    all_address_sdf
    # Random sample of addresses to corrupt.
    .orderBy(F.rand(seed=SAMPLE_SEED))
    .limit(number_of_test_rows)
    .withColumnRenamed("uprn", "expected_uprn")
    # Key each test address by 'unique_id' so matcher output can be joined back
    # (UPRNs won't be present in all matcher output). Ids are unique but not
    # guaranteed to be consecutive.
    .withColumn("unique_id", F.monotonically_increasing_id())
    # Corrupt postcode/address and null the expected UPRN based on unique_id.
    .withColumn("postcode", transform_postcode_udf("unique_id", "postcode"))
    .withColumn("fulladdress", transform_fulladdress_udf("unique_id", "fulladdress"))
    .withColumn("expected_uprn", transform_expected_uprn_udf("unique_id", "expected_uprn"))
)

count = address_samples.count()
print(f"Number of test data rows: {count}")
address_samples.show(10, truncate=False)

# Drop the expected UPRN so it cannot interfere with matching.
input_address_samples = address_samples.drop("expected_uprn")



Number of test data rows: 50
+-------------+-----------------------------------------------+---------+---------+
|expected_uprn|fulladdress                                    |postcode |unique_id|
+-------------+-----------------------------------------------+---------+---------+
|NULL         |                                               |         |0        |
|100020439557 |83A, MARLOW ROAD, PENGE, LONDON, SE20 7XR      |SE20 7XR |1        |
|100031672960 |326 TUTBURY ROAD BURTON UPON TRENT DE13 0AJ    |E13 0AJ  |2        |
|32026746     |13, CHESTNUT AVENUE, TIPTON, DY4 9QT           |DY4 9QT  |3        |
|100022274445 |220, ST BARNABAS ROAD, WOODFORD GREEN, IG8 7DR |IG8 7DR  |4        |
|100022296699 |23, PALEWELL PARK, EAST SHEEN, LONDON, SW14 8JQ|SW14 8JQ |5        |
|100022572263 |40, NEVILLE CLOSE, LEYTONSTONE, E11 3QH        |911 3QH  |6        |
|NULL         |                                               |NOTAPOSTC|7        |
|100020262517 |27, THE GROVE, SIDCUP, DA14 5NG 

In [6]:
# Rename columns to the names the MoJ address matcher expects.
moj_input_address_samples = input_address_samples.withColumnRenamed(
    "fulladdress", "address_concat"
)
moj_all_address_subset_sdf = (
    all_address_sdf
    .withColumnRenamed("uprn", "unique_id")
    .withColumnRenamed("fulladdress", "address_concat")
)

# The MoJ algorithm reads its inputs from Parquet. Each write executes the
# DataFrame's full lineage.
moj_input_path = "dbfs:/tmp/address_data/input/"
moj_all_path = "dbfs:/tmp/address_data/all"
moj_input_address_samples.write.mode("overwrite").parquet(moj_input_path)
moj_all_address_subset_sdf.write.mode("overwrite").parquet(moj_all_path)