# Address Matching with Apache Spark

## Iniciar Spark

In [15]:
from pyspark.sql.session import SparkSession

spark = spark = SparkSession.builder \
    .appName("Adress Matching") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "20g") \
    .config("spark.driver.maxResultSize", "2g") \
    .getOrCreate()

In [16]:
conf = spark.sparkContext.getConf()
for (key, value) in conf.getAll():
  print(f"{key}: {value}")

spark.app.name: Adress Matching
spark.app.submitTime: 1710360749774
spark.executor.id: driver
spark.driver.host: 10.6.130.30
spark.driver.extraJavaOptions: -Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false
spark.sql.warehouse.dir: file:/home/usu

## Leer fichero

El dataset continene exactamente 24 columnas las cuales, el ***, almacena información relevante el trabajo que realizan. En cambio, en este proyecto, se utilizarán las siguientes columnas pues contienen la información más relevante:
- **uuid_idt**: contiene un identificador alfanumérico único para cada dirección.
- **tvia**: contiene el tipo de vía, es decir, si es una calle, avenida, carretera, etc.
- **nvia**: contiene el nombre de la vía.
- **numer**: contiene el número de la vía.
- **codmun**: está compuesto por un código numérico que identifica a un municipio, es decir, el código postal.
- **nommun**: contiene el nombre del municipio.
- **direccion**: contiene la dirección completa."

In [17]:
# Leer el archivo CSV y cargarlo en un DataFrame
file =  "../data/raw_data/TFM_Direcciones.tab"
first_df = spark.read.option("delimiter", "\t").option("encoding", "windows-1252").csv(file, header=True, inferSchema=True)

# Seleccionar solo las columnas deseadas
selected_columns = [
  "uuid_idt",
  "tvia",
  "nvia",
  "numer",
  "codmun",
  "nommun",
  "direccion",
]
first_df = first_df.select(selected_columns)

# Mostrar el DataFrame
first_df.show()
first_df.schema
print(first_df.count())
size_df = first_df.count()

                                                                                

+--------------------+------------+--------------------+-----+------+------+--------------------+
|            uuid_idt|        tvia|                nvia|numer|codmun|nommun|           direccion|
+--------------------+------------+--------------------+-----+------+------+--------------------+
|027C0FF8-B17B-11E...|       CALLE|        FUENTE SANTA|    5| 35001|Agaete|CALLE FUENTE SANT...|
|028F2302-B17B-11E...|       CALLE|       CRUZ CHIQUITA|    2| 35001|Agaete|CALLE CRUZ CHIQUI...|
|02918516-B17B-11E...|URBANIZACION|RESIDENCIAL PALMERAL|    9| 35001|Agaete|URBANIZACION RESI...|
|03A306C0-7525-11E...|       CALLE|SEÑORITA MARIA MA...|    2| 35001|Agaete|CALLE SEÑORITA MA...|
|03A306C0-7525-11E...|       CALLE|SRTA M MANRIQUE LARA|    2| 35001|Agaete|CALLE SRTA M MANR...|
|03A306C0-7525-11E...|       CALLE| MARIA MANRIQUE LARA|    0| 35001|Agaete|CALLE MARIA MANRI...|
|03A306C0-7525-11E...|       CALLE| MARIA MANRIQUE LARA|    2| 35001|Agaete|CALLE MARIA MANRI...|
|03A306C0-7525-11E..

                                                                                

1784217


                                                                                

In [18]:
from pyspark.sql.functions import upper

file =  "../data/raw_data/data-09022024.csv"
second_df = spark.read.option('header', True).csv(file)

selected_columns = [
  "uuid_idt",
  "codmun",
  "nommun",
  "direccion"
]

second_df = second_df.select(selected_columns)
second_df = second_df.select(upper('uuid_idt').alias('uuid_idt'), 'codmun', upper('nommun').alias('nommun'), 'direccion')
second_df.show()
first_df.schema
print(second_df.count())
size_df += second_df.count()

+--------------------+------+--------------------+--------------------+
|            uuid_idt|codmun|              nommun|           direccion|
+--------------------+------+--------------------+--------------------+
|C0CF3B94-0AD1-11E...| 35016|LAS PALMAS DE GRA...|IGUAZU 42 0 LAS P...|
|73B57C1B-3251-11E...| 35016|LAS PALMAS DE GRA...|PARQUE CENTRAL BL...|
|C70E3EC6-3EDA-11E...| 35016|LAS PALMAS DE GRA...|CONCEJAL GARCIA F...|
|687BA81E-3251-11E...| 35006|              ARUCAS|PEDRO MORALES DEN...|
|74921989-3251-11E...| 35019|SAN BARTOLOMÉ DE ...|ISLA LOBOS 19 0 S...|
|6C0D19FF-3251-11E...| 35016|LAS PALMAS DE GRA...|CALLE LEON Y CAST...|
|6360FDCD-3251-11E...| 35016|LAS PALMAS DE GRA...|VIGEN PILAR 45 4 ...|
|758B3855-3251-11E...| 35016|LAS PALMAS DE GRA...|CALLE PALMERA CAN...|
|5D36841A-7A6B-11E...| 35016|LAS PALMAS DE GRA...|RUPERTO CHAPI 0 L...|
|6C00DBF4-3251-11E...| 35021|       SANTA BRÍGIDA|MANUEL HERNANDEZ ...|
|733A49C5-3251-11E...| 35027|               TEROR| HERRERIA 36 0

In [19]:
print(size_df)

2414514


In [20]:
from pyspark.sql.functions import count

def uuid_frecuency(dataframe):
    values_under_10 = dataframe.groupBy(dataframe.uuid_idt).count() \
        .filter('count < 9') \
        .groupBy('count') \
        .agg(count('*') \
        .alias('Frecuencia')) \
        .orderBy('count')
    values_under_10 = values_under_10.withColumnRenamed('count', 'Número de direcciones asociadas')

    values_over_10 = dataframe.groupBy(dataframe.uuid_idt).count() \
        .filter('count > 9')
    values_over_10 = spark.createDataFrame(
        [['10 o más', values_over_10.count()]],
        ['Número de direcciones asociadas', 'Frecuencia']
        )

    values = values_under_10.union(values_over_10)
    values.show()

uuid_frecuency(first_df)
uuid_frecuency(second_df)

                                                                                

+-------------------------------+----------+
|Número de direcciones asociadas|Frecuencia|
+-------------------------------+----------+
|                              1|    267275|
|                              2|    115343|
|                              3|     60757|
|                              4|     35631|
|                              5|     22207|
|                              6|     14810|
|                              7|     10142|
|                              8|      7411|
|                       10 o más|     25539|
+-------------------------------+----------+



                                                                                

+-------------------------------+----------+
|Número de direcciones asociadas|Frecuencia|
+-------------------------------+----------+
|                              1|    250632|
|                              2|     51645|
|                              3|     19692|
|                              4|      9055|
|                              5|      4702|
|                              6|      2876|
|                              7|      1772|
|                              8|      1161|
|                       10 o más|      4083|
+-------------------------------+----------+



## Unión de los dataframes

In [21]:
# Seleccionar los UUID únicos del primer DataFrame
unique_uuid_first_df = first_df.select("uuid_idt").distinct()

# Seleccionar los UUID únicos del segundo DataFrame
unique_uuid_second_df = second_df.select("uuid_idt").distinct()

# Encontrar los UUID comunes
uuid_comunes = unique_uuid_first_df.join(unique_uuid_second_df, "uuid_idt", "inner")
# Da el mismo resultado: uuid_primero.intersect(uuid_segundo)

# Juntar los UUID comunes con el primer DataFrame
adresses_df = first_df.unionByName(second_df.join(uuid_comunes, "uuid_idt", "inner"), allowMissingColumns=True)

# Mostrar el resultado
adresses_df.show()
print("Tamaño del dataframe ampliado: ", adresses_df.count())

# Si se quiere comprobar que la operación es correcta
print("Número de uuid_idt nuevos del segundo dataframe: ", unique_uuid_second_df.subtract(uuid_comunes).count())

                                                                                

+--------------------+------------+--------------------+-----+------+------+--------------------+
|            uuid_idt|        tvia|                nvia|numer|codmun|nommun|           direccion|
+--------------------+------------+--------------------+-----+------+------+--------------------+
|027C0FF8-B17B-11E...|       CALLE|        FUENTE SANTA|    5| 35001|Agaete|CALLE FUENTE SANT...|
|028F2302-B17B-11E...|       CALLE|       CRUZ CHIQUITA|    2| 35001|Agaete|CALLE CRUZ CHIQUI...|
|02918516-B17B-11E...|URBANIZACION|RESIDENCIAL PALMERAL|    9| 35001|Agaete|URBANIZACION RESI...|
|03A306C0-7525-11E...|       CALLE|SEÑORITA MARIA MA...|    2| 35001|Agaete|CALLE SEÑORITA MA...|
|03A306C0-7525-11E...|       CALLE|SRTA M MANRIQUE LARA|    2| 35001|Agaete|CALLE SRTA M MANR...|
|03A306C0-7525-11E...|       CALLE| MARIA MANRIQUE LARA|    0| 35001|Agaete|CALLE MARIA MANRI...|
|03A306C0-7525-11E...|       CALLE| MARIA MANRIQUE LARA|    2| 35001|Agaete|CALLE MARIA MANRI...|
|03A306C0-7525-11E..

                                                                                

Tamaño del dataframe ampliado:  2234214


[Stage 322:==>(7 + 1) / 8][Stage 323:==>(6 + 2) / 8][Stage 325:=> (3 + 5) / 8]8]

Número de uuid_idt nuevos del segundo dataframe:  148073


                                                                                

In [22]:
uuid_frecuency(adresses_df)

                                                                                

+-------------------------------+----------+
|Número de direcciones asociadas|Frecuencia|
+-------------------------------+----------+
|                              1|    211768|
|                              2|    119917|
|                              3|     70050|
|                              4|     44000|
|                              5|     28925|
|                              6|     19650|
|                              7|     14050|
|                              8|     10331|
|                       10 o más|     38060|
+-------------------------------+----------+



## Partición del dataset

In [23]:
adresses_df = adresses_df.repartition(32)

## Limpieza del dataset

Tras visualizar el dataset, se observa que hay columnas que tienen valores desconocidos, representados con '_U', y camos vacíos.

In [24]:
print(f"Initial dataset size: {adresses_df.count()}")



Initial dataset size: 2234214


                                                                                

### Valores nulos

In [25]:
from pyspark.sql.functions import isnan, when, count, col

adresses_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in adresses_df.columns]).show()



+--------+------+------+------+------+------+---------+
|uuid_idt|  tvia|  nvia| numer|codmun|nommun|direccion|
+--------+------+------+------+------+------+---------+
|       0|580357|460216|449997|     0|     0|        0|
+--------+------+------+------+------+------+---------+



                                                                                

In [26]:
adresses_df = adresses_df.na.fill('_U')

adresses_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in adresses_df.columns]).show()



+--------+----+----+------+------+------+---------+
|uuid_idt|tvia|nvia| numer|codmun|nommun|direccion|
+--------+----+----+------+------+------+---------+
|       0|   0|   0|449997|     0|     0|        0|
+--------+----+----+------+------+------+---------+



                                                                                

### Extracción del prefijo para eleminar valores '_U'

Tras analizar el dataset, se observa que para una dirección, el tipo de vía (tvia) puede aparecer vacío, en cambio, en la columna 'direccion' aparece este valor. Por lo que se procederá a extraer el prefijo de la columna 'direccion' para rellenar los valores vacíos de la columna 'tvia' evitando así la perdida de información.

In [27]:
print(f"Unknown tvia before prefix extraction: {adresses_df.filter(adresses_df.tvia == '_U').count()}")



Unknown tvia before prefix extraction: 580357


                                                                                

In [28]:
tvia_types = adresses_df.select("tvia").distinct().collect()
tvia_types = {t.tvia for t in tvia_types if t.tvia != "_U" and not t.tvia.isnumeric()}
print(tvia_types)



{'CG', 'CUESTA', 'LU', 'VEREDA', 'RAMBL', 'PROL', 'LUGAR', 'MUELLE', 'COSTANILLA', 'CALLEJUELA', 'MONTE', 'AVENIDA', 'ESCALINATA', 'MERCADO', 'CSRIO', 'BLOQUES', 'AVENI', 'BARRA', 'BCO', 'PARTICULAR', 'SECTOR', 'EDIF', 'HOTEL', 'SOLAR', 'PARAJE', 'PUENTE', 'ACEQUIA', 'CAMINO', 'OTROS', 'AGRUP', 'CORRAL', 'CONJUNTO', 'RINCONADA', 'EDIFICIO', 'SENDERO', 'PLAZA', 'MANZANA', 'RU', 'VILLAS', 'PASADIZO', 'RESIDENCIA', 'CMINO', 'PAGO', 'CALLEJON', 'PLACETA', 'COOPERATIVA', 'CRA', 'RESIDENCIAL', 'COMPLEJO', 'CARRERA', 'VIA', 'AUTOVIA', 'POBLADO', 'PARQUE', 'MUNICIPIO', 'RAMAL', 'VIVIENDAS', 'TORRENTE', 'APARTAMENTOS', 'BULEVAR', 'TRANS', 'CALLE', 'PASO', 'PLAZUELA', 'PATIO', 'CENTRO COMERCIAL', 'BARRIO', 'LADERA', 'POLIG', 'PASAJE', 'POLIGONO', 'AUTOPISTA', 'CARRETERA', 'PASEO BAJO', 'ENTRADA', 'RAMBLA', 'PASILLO', 'PASEO', 'AMPLIACION', 'MONTAÑA', 'TRASERA', 'CALE', 'ALAMEDA', 'PASSEIG', 'PARQ', 'TORRE', 'SUBIDA', 'GRUPO', 'TRANSVERSAL', 'SENDA', 'DISEMINADO', 'TVERA', 'ANGOSTA', 'CALLEJA', '

                                                                                

In [29]:
from pyspark.sql.functions import regexp_extract, when

condition = (adresses_df.tvia == '_U') & (regexp_extract('direccion', r'^(\\S+)', 1).isin(tvia_types))
adresses_df = adresses_df.withColumn("tvia", when(condition, regexp_extract('direccion', r'^(\\S+)', 1)).otherwise(adresses_df.tvia))

In [30]:
print(f"Unknown tvia before prefix extraction: {adresses_df.filter(adresses_df.tvia == '_U').count()}")



Unknown tvia before prefix extraction: 580357


                                                                                

Se puede observar que los tipos de vías que siguen siendo desconocidos es debido a que desde la columna de dirección no se ha podido extraer un prefijo que aparezca en la lista de los tipos de vías que tenemos en el dataset.

### Limpieza de entradas con valor '_U'

Tras realizar la limpieza del dataset y extraer el tipo de via de la columna 'direccion', se procederá a eliminar las entradas restantes que contengan valores '_U' en las columnas 'tvia' pues no aportan información relevante.

In [31]:
print(f"Unknown tvia before duplicated null cleaning: {adresses_df.filter(adresses_df.tvia == '_U').count()}")



Unknown tvia before duplicated null cleaning: 580357


                                                                                

In [32]:
""" # Marca todos los duplicados incluyendo la primera fila encontrada
duplicated_rows = dataset.duplicated(subset = ["direccion"], keep = False)
# Marca como duplicada todas las filas menos la primera encontrada
# duplicated_rows = dataset.duplicated(subset = ["direccion"], keep = 'first')
# print(duplicated_rows)

duplicated_rows = [index for index, value in enumerate(duplicated_rows.to_numpy()) if value]
print(len(duplicated_rows))
print(dataset.iloc[0])

to_be_removed = list()

for index in reversed(duplicated_rows[:50]): # Tiempo: 9min limpiando 50 filas
    if dataset.at[index, 'tvia'] == "_U": # Usando at en vez de iloc se reduce el tiempo de ejecución 1/3 a 4min
        to_be_removed.append(index)

print(len(to_be_removed))
print(dataset.iloc[to_be_removed])
# dataset.drop(index = to_be_removed, inplace = True) """


adresses_df = adresses_df.filter(adresses_df.tvia != '_U')
""" print(adresses_df.filter(adresses_df.tvia != '_U').count())
print(adresses_df.filter(adresses_df.tvia == '_U').count()) """

""" print(f"Unknown tvia after duplicated null cleaning: {dataset.tvia.value_counts()['_U']}") """
""" adresses_df.filter(adresses_df.tvia == '_U').groupBy(adresses_df.direccion).count().sort(desc('count')).show() """

" dataset_spark.filter(dataset_spark.tvia == '_U').groupBy(dataset_spark.direccion).count().sort(desc('count')).show() "

In [33]:
print(f"Unknown tvia after duplicated null cleaning: {adresses_df.filter(adresses_df.tvia == '_U').count()}")

Unknown tvia after duplicated null cleaning: 0


In [34]:
print(f"Final Dataset size: {adresses_df.count()}")



Final Dataset size: 1653857


                                                                                

## Aumento de datos

### Frecuencia de uuid_idt antes

In [35]:
uuid_frecuency(adresses_df)

                                                                                

+-------------------------------+----------+
|Número de direcciones asociadas|Frecuencia|
+-------------------------------+----------+
|                              1|    278002|
|                              2|    109890|
|                              3|     57031|
|                              4|     32912|
|                              5|     20151|
|                              6|     13280|
|                              7|      9074|
|                              8|      6577|
|                       10 o más|     22563|
+-------------------------------+----------+



### Aumento de datos

- Cambiar el orden de las palabras
- Añadir errores ortográficos
- Añadir sinónimos

In [36]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import random

# Define la función UDF
def switch_letters(direcction):
    """Intercambia letras adyacentes en una palabra"""
    words = direcction.split()
    word_candidates = [word for word in words if word.isalpha() and len(word) >= 2]
    if not word_candidates:
        return direcction
    word = random.choice(word_candidates)
    pos = random.randint(0, len(word) - 2)
    return " ".join(
        w if w != word else w[:pos] + w[pos + 1] + w[pos] + w[pos + 2:]
        for w in words
    )

def update_direction_via(tvia, direction):
    """Actualiza el tipo de vía de la dirección"""
    old_tvia = direction.split(" ")[0]
    if tvia.upper() in tvia_types:   
        direction = direction.replace(old_tvia, tvia)
    return direction

def select_via_type(tvia):
    """Selecciona tipos de vía alternativos sin duplicados"""
    return random.choice([t for t in tvia_types if t.title() != tvia and t != tvia.lower()])


unique_filtered_uuids = adresses_df.groupBy("uuid_idt").count().filter("count < 10").select("uuid_idt").sample(0.5)

to_extend = adresses_df.join(unique_filtered_uuids, "uuid_idt")

# Registra la función UDF con Spark
spark.udf.register("switch_letters", switch_letters, StringType())
spark.udf.register("select_via_type", select_via_type, StringType())
spark.udf.register("update_direction_via", update_direction_via, StringType())

# Aplica la función UDF a la columna "direccion" usando withColumn
extended_ds = (to_extend
    .withColumn("tvia", udf(select_via_type, StringType())("tvia"))
    .withColumn("direccion", udf(update_direction_via, StringType())("tvia", "direccion"))
    .withColumn("direccion", udf(switch_letters, StringType())("direccion"))
)

extended_ds.show(truncate=False)
print(extended_ds.count(), adresses_df.count())
adresses_df = adresses_df.union(extended_ds)
print(adresses_df.count())

                                                                                

+------------------------------------+------------+----------------------------+-----+------+--------------------------+------------------------------------------------------------------+
|uuid_idt                            |tvia        |nvia                        |numer|codmun|nommun                    |direccion                                                         |
+------------------------------------+------------+----------------------------+-----+------+--------------------------+------------------------------------------------------------------+
|0016535F-03E9-429E-A5F2-3B6B9AA90BC9|ENTRADA     |SIROCO CUEVAS BLANCAS       |21   |38038 |Santa Cruz de Tenerife    |ENTRADA SIROCO UCEVAS BLANCAS 21 SANTA CRUZ DE TENERIFE           |
|00760F34-E924-4184-A8B3-29965FC5B864|HOTEL       |BRISAMAR III CL AYOZE ACORAN|8    |38038 |Santa Cruz de Tenerife    |HOTEL BRISAAMR III CL AYOZE ACORAN 8 SANTA CRUZ DE TENERIFE       |
|008B0887-AC4D-4713-B9E5-07E6CED0604F|LUGAR       |POLO 106A

                                                                                

571732 1653857




2225589


                                                                                

### Frecuencia de uuid_idt después

In [37]:
uuid_frecuency(adresses_df)

                                                                                

+-------------------------------+----------+
|Número de direcciones asociadas|Frecuencia|
+-------------------------------+----------+
|                              1|    138541|
|                              2|    194307|
|                              3|     28533|
|                              4|     71500|
|                              5|      9978|
|                              6|     35079|
|                              7|      4531|
|                              8|     19792|
|                       10 o más|     49672|
+-------------------------------+----------+

