# GeoNames Cities processing

#### 1. Setup the local Spark instance.

In this first step, we load and instantiate the different global Spark contexts: `SparkContext` and `SQLContext`.

In [None]:
import findspark
findspark.init("/opt/spark")

# Load the global SparkContext for this Spark application.
from pyspark import SparkContext, SparkConf
conf = SparkConf()
conf.set("spark.app.name", "SparkAlgoliaPipeline")
conf.set("spark.master", "local[30]")
conf.set("spark.ui.port", "4040")

SparkContext.setSystemProperty('spark.executor.memory', '16g')
SparkContext.setSystemProperty('spark.driver.memory', '8g')
SparkContext.setSystemProperty('spark.driver.maxResultSize', '8g')

sc = SparkContext(conf=conf)

# Load the global Spark SQLContext for this Spark application.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Load the Spark packages that we will use throughout this notebook.
from pyspark.sql.types import ArrayType, StructType, StructField, DoubleType, IntegerType, StringType, BooleanType
from pyspark.sql.functions import col, udf, collect_list, collect_set, size, levenshtein, abs, length, ceil

# Load the tools we need for the string similarity computation
from collections import Counter
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity

#### 2. Load the different GeoNames datasets into Spark DataFrame instances.

 - The GeoNames dataset (DataFrame variable `df_gn`); the specifications for this dataset are available [here](http://download.geonames.org/export/dump/readme.txt).
 - The Postal Codes dataset (DataFrame variable `df_pc`); the specifications for this dataset are available [here](http://download.geonames.org/export/zip/readme.txt).


In [None]:
# Define the local path of the datasets.
DATA_PATH = "path/to/geonames/"
POSTAL_CODES_FILE = DATA_PATH + "postcodes-allCountries.txt"
GEONAMES_FILE = DATA_PATH + "geonames-allCountries.txt"


# Define the DataFrame schema for each dataset.
schema_postal_codes = StructType([
    StructField("CC", StringType()),
    StructField("PC", StringType()),
    StructField("NAME", StringType()),
    StructField("AN1", StringType()),
    StructField("AC1", StringType()),
    StructField("AN2", StringType()),
    StructField("AC2", StringType()),
    StructField("AN3", StringType()),
    StructField("AC3", StringType()),
    StructField("LAT", DoubleType()),
    StructField("LON", DoubleType()),
    StructField("ACC", IntegerType())
])

schema_geonames = StructType([
    StructField("GNID", StringType()),
    StructField("NAME", StringType()),
    StructField("NAME_ASCII", StringType()),
    StructField("ALT_NAMES", StringType()),
    StructField("LAT", DoubleType()),
    StructField("LON", DoubleType()),
    StructField("F_CLASS", StringType()),
    StructField("F_CODE", StringType()),
    StructField("CC", StringType()),
    StructField("CC2", StringType()),
    StructField("AC1", StringType()),
    StructField("AC2", StringType()),
    StructField("AC3", StringType()),
    StructField("AC4", StringType()),
    StructField("POP", StringType()),
    StructField("ALT", StringType()),
    StructField("DEM", StringType()),
    StructField("TZ", StringType()),
    StructField("DATE", StringType())
])

# Load both datasets in their respective DataFrame instances.
df_pc = sqlContext.read.format("com.databricks.spark.csv").schema(schema_postal_codes).option("header", "false").option("mode", "DROPMALFORMED").option("delimiter", '\t').load(POSTAL_CODES_FILE)
df_gn = sqlContext.read.format("com.databricks.spark.csv").schema(schema_geonames).option("header", "false").option("mode", "DROPMALFORMED").option("delimiter", '\t').load(GEONAMES_FILE)

# Give aliases to the DataFrames we just loaded
df_pc = df_pc.alias("pc")
df_gn = df_gn.alias("gn")

#### 3. Filter the Postal Codes dataset according to the country-specific postal code formats.

The country-specific postal codes format is given by this crowdsourced Wikipedia page: https://en.wikipedia.org/wiki/List_of_postal_codes.  
For this step, we parse the file that we have made from the formats detailed in the Wikipedia page and load each postal code format string as a regex pattern instance.

In [None]:
import re

# Define the local path of the csv file containing the postal code formats.
POSTAL_CODES_FORMAT_FILE = DATA_PATH + "postcodes.csv"

# Instanciate the list of postal code formats as a global variable.
country_postal_codes = {}
with open(POSTAL_CODES_FORMAT_FILE, 'r') as f:
    for line in f:
        fields = line.split('\t')
        cc = fields[0]
        pattern = None
        if fields[1] != '':
            pattern = re.compile(fields[1].strip())
        
        country_postal_codes[cc] = pattern

print("Loaded %s postal code formats." % len(country_postal_codes))

In [None]:
# Define a Python function to check the validity of a postal code given the country code.
def is_valid_postal_code(cc, postal_code):
    """Returns True if the postal code postal_code is valid 
       for the given country code cc, False otherwise."""
    if cc not in country_postal_codes:
        return False
    
    m = country_postal_codes[cc].match(postal_code)
    return m is not None
    
    
# Define a Spark user-defined function to check the validity of a postal code given the country code.
is_valid_postal_code_udf = udf(lambda x,y: is_valid_postal_code(x,y), BooleanType())

# Filter the Postal Codes DataFrame to remove the invalid postal codes.
df_pc_filtered = df_pc.filter(is_valid_postal_code_udf(col("pc.CC"), col("pc.PC")))

# Print to check that it has filtered.
print("Number of postal codes features (not filtered): %s." % df_pc.count())
print("Number of postal codes features (filtered): %s." % df_pc_filtered.count())

#### 4. Filter the cities from the GeoNames dataset.

The GeoNames features are associated to class and subclass codes. The feature class and subclass codes are defined in the document at this link: https://www.geonames.org/export/codes.html. The list of feature class codes is as follows:
 - `A`: country, state, region,...
 - `H`: stream, lake, ...
 - `L`: parks, area, ...
 - `P`: city, village, ...
 - `R`: road, railroad 
 - `S`: spot, building, farm
 - `T`: mountain, hill, rock, ... 
 - `U`: undersea
 - `V`: forest, heath, ...

The feature class that is of interest for this project is the feature class of code `P` that corresponds to populated places such as cities and villages. In the following, we filter the GeoNames dataset to remove the features that are not of class code `P` and only keep the cities.

In [None]:
# Filter the cities (feature class code P) from the GeoNames DataFrame.
df_gn_cities = df_gn.filter(col("gn.F_CLASS")=='P')

print("Number of GeoNames features (not filtered): %s." % df_gn.count())
print("Number of GeoNames features (filtered): %s." % df_gn_cities.count())

#### 5. Join the GeoNames dataset to the Postal Codes dataset.

In this step, we join the GeoNames dataset to the Postal Codes dataset based on the longitudes and latitudes of the features. Each fature in both dataset has a name that corresponds to the name of the city. We use the name of the features to match them together. The problem is that the names for the matching features may differ from one dataset to the other and thus cannot be used directly to join the two datasets. We will use the name features to select the most likely GeoNames candidate feature for a given Postal Code name. 

Features in both datasets are also associated to several hierarchical administrative areas of varying sizes. For instance, Paris 08 (75008) is in the country France of code FR, the first-level administrative area 11 (Île-de-France), the second-level administrative area 75 (Paris), and the third-level administrative area 751 (Paris). Since all features in both datasets are associated to a homogenous country code, we use it to divide the processsing of the features and thus reduce the size of the join. Note that, since administrative codes are not homogenous for some countries between the two datasets (e.g., Mexico), we cannot use them to further reduce the join size.

 
Each feature in both datasets is associated to coordinates that consist of a longitude and a latitude given in the [WGS84 coordinate system](https://en.wikipedia.org/wiki/World_Geodetic_System). The coordinates are more or less precise depending on the feature. The precision of the coordinates is given by the number of decimal places given in the latitude and longitude float. According to this [Wikipedia page](https://en.wikipedia.org/wiki/Decimal_degrees), the phyical length that can be measured depends on the precison of the coordinates as per the number of decimal places. As such, a precision of one decimal place corresponds to a length of 7.871 kilometers. Therefore, comparing coordinates at this precision level would be precise enough to absorb the difference between the coordinates in both datasets, while avoiding overlap on multiple cities.


#### 6. Aggregate the joined dataset.

In this last step, we aggregate the joined DataFrame by grouping the Postal Code name features. The identifiers and names of the GeoNames features are then aggregated into two separate lists for a given matching Postal Codes name feature. We select the most likely GeoNames feature candidate using the cosine distance between the Postal Code feature name and the GeoNames feature name. The [cosine similarity](https://en.wikipedia.org/wiki/Cosine_similarity) performs a loosely match between two strings. In particular, it removes the special characters, tokenizes the string into words, and then compares the words shared betwee between two strings. If the distance between the name in the GeoNames dataset and the one in the Postal Codes dataset is more than 20%, the two names will be matched.

The other features that match the name features will be joined to the aggregated dataframe using the GeoNames identifier. 

In [None]:
import os
import time

# Define some constants for this part of the pipeline.
COORDINATES_PRECISION = 0.01
STRING_MATCHING_THRESHOLD = 0.2
TEMP_CSV_FOLDER = "temp_output"
CSV_FOLDER = "geonames_output"

# Define some functions that we will use in the user-defined function below.
def get_best_match_idx(ref, strs): 
    """Get the index of the best matching string in strs against ref."""
    vectors = [t for t in get_vectors([ref]+strs)]
    sim = cosine_similarity(vectors)
    match_idx = sim[0,1:].argmax(axis=0)
    if sim[0,1:][match_idx] >= STRING_MATCHING_THRESHOLD:
        return int(match_idx)
    else:
        return -1
    
def get_vectors(strs):
    """Vectorize the word tokens normalized in lower case contained in strings of list strs."""
    text = [t.lower() for t in strs if t is not None]
    vectorizer = CountVectorizer(text)
    vectorizer.fit(text)
    return vectorizer.transform(text).toarray()


# Define the join conditions.
cond1 = [col("pc.CC") == col("gn.CC")]
cond2 = [abs(col("pc.LON") - col("gn.LON")) <= COORDINATES_PRECISION, 
         abs(col("pc.LAT") - col("gn.LAT")) <= COORDINATES_PRECISION]

# Define the fields to select after the first join.
sel_join = [col("gn.GNID"), col("gn.CC"), col("pc.AC1"), col("pc.AC2"), col("pc.AC3"), 
            col("pc.AN1"), col("pc.AN2"), col("pc.AN3"), col("gn.NAME").alias("NAME_gn"), 
            col("pc.NAME").alias("NAME_pc"), col('pc.PC'), col('pc.LAT'), col("pc.LON")]

# Define the fields to select after the aggregation.
sel_agg = [col("jn.GNID"), col("jn.NAME_gn"), col("jn.NAME_pc"), col("jn.PC"),
           col("jn.AC1"), col("jn.AC2"), col("jn.AC3"), 
           col("jn.AN1"), col("jn.AN2"), col("jn.AN3")]

# Define the final fiels to select after the final join.
sel_final = [col("gn.GNID"), col("gn.CC"), col("agg.AC1"), col("agg.AC2"), col("gn.LAT"), col("gn.LON"),
             col("agg.AC3"), col("agg.AN1"), col("agg.AN2"), col("agg.AN3"), col("agg.NAME_pc"), 
             col("gn.NAME"), col("agg.PC_set"), col("gn.ALT_NAMES"), col("gn.POP")]

# User-defined function to transform a list of elements into a set.
to_set_udf = udf(lambda x: list(set(x)), ArrayType(StringType()))

# User-defined function to select the most similar GeoNames name with the given postal code feature name.
best_match_idx_udf = udf(lambda x,y: get_best_match_idx(x,y), IntegerType())

# User-defined function to get the value of the element in a list at a given index.
get_list_index_udf = udf(lambda l,i: l[i] if i >= 0 else '', StringType())

# User-defined function to transform an array of strings into a string (eq. str.join).
array_to_string_udf = udf(lambda x: ",".join(x))

print("Computing the join...")
for cc, count in countries.items():            
    print("[.] %s: Processing..." % cc)
    start_time = time.time()
    
    # Filter both dataframes by their country code.
    df_pc_filtered_cc = df_pc_filtered.filter((col("pc.CC")==cc) & (col("pc.NAME").isNotNull()))
    df_gn_cities_cc = df_gn_cities.filter((col("gn.CC")==cc) & (col("gn.NAME").isNotNull()))

    # Perform the inner join operation on the fitered dataframes.
    df_joined_cc = df_pc_filtered_cc.join(df_gn_cities_cc, cond1+cond2, "inner").select(sel_join)
        
    # Aggregate the names features of the Postal Codes dataset in the joined DataFrame.
    # Reduce the GeoName features with the best matching name
    df_joined_cc = df_joined_cc.alias("jn")
    df_agg_cc = df_joined_cc.select(sel_agg)\
    .groupBy("NAME_pc", "AC1", "AC2", "AC3", "AN1", "AN2", "AN3").agg(
        collect_list("GNID").alias("GNID_list"),
        collect_list("NAME_gn").alias("NAME_gn_list"),
        collect_set("PC").alias("PC_set"))\
    .withColumn("match_idx", best_match_idx_udf(col("NAME_pc"), col("NAME_gn_list")))\
    .withColumn("GNID", get_list_index_udf(col("GNID_list"), col("match_idx")))
    
    df_agg_cc = df_agg_cc.alias("agg")
    df_agg_filtered_cc = df_agg_cc.filter(col("agg.GNID").isNotNull())
    
    # Join the aggregated dataframe with the attributes of the GeoNames dataframe.
    df_agg_joined_cc = df_agg_filtered_cc.join(df_gn_cities_cc, col("gn.GNID") == col("agg.GNID"), "inner").select(sel_final)
    
    # Transform the postal code set (array) into a string compatible with csv write.
    try:
        df_agg_joined_cc = df_agg_joined_cc.alias("agg_jn")
        df_agg_joined_cc.withColumn('PCs', array_to_string_udf(col("agg_jn.PC_set")))\
        .drop("PC_set").write.csv("%s" % TEMP_CSV_FOLDER)

        print("[x] %s: Completed in %.2f seconds." % (cc, time.time() - start_time))   
    except Exception as e:
        print("Exception when dumping the file in a csv: %s" % e)
        
    # Put all the csv files dumped in the temporary folder into a single csv file 
    # and remove the temporary folder.
    os.system("cat %s/p* > %s/%s.csv; rm -rf %s" % (TEMP_CSV_FOLDER, CSV_FOLDER, cc, TEMP_CSV_FOLDER))     
    
    # Release the dataframes.
    try:
        del df_agg_joined_cc
        del df_agg_filtered_cc
        del df_agg_cc
        del df_joined_cc
        del df_gn_cities_cc
        del df_pc_filtered_cc
    except KeyError:
            pass
    

#### 7. Prepare and send the data to Algolia
In this last part, we prepare and then send the data to Algolia.

In [None]:
from algoliasearch import algoliasearch

client = algoliasearch.Client("N3CD4LLDJP", '7b36e4dc1ad98f44e8868c1b1aa184d0')

In [None]:
import csv
import glob

# Batch add to the Algolia index.
# Doc: https://www.algolia.com/doc/api-reference/api-methods/batch/
# Search doc: https://www.algolia.com/doc/guides/searching/geo-search/#getting-geo-search-info-with-rankinginfo

INDEX_NAME = "geonames"

cc_data = []
count = 0
for file in glob.glob(CSV_FOLDER+"/*.csv"):
    # Prepare the data.        
    with open(file, 'r') as f:
        csvreader = csv.reader(f, delimiter=',', quotechar='"')
        for row in csvreader:
            d = {
                "action": "addObject",
                "indexName": INDEX_NAME,
                "body": {
                    "geonameid": row[0],
                    "cc": row[1],
                    "name": row[10],
                    "altnames": row[12].split(','),
                    "admin1": row[7],
                    "admin2": row[8],
                    "admin3": row[9],
                    "postalCodes": row[14].split(',')[:100],
                    "population": int(row[13]),
                    "_geoloc": {
                        "lat": float(row[4]),
                        "lng": float(row[5])
                    }
                }
            }
            cc_data.append(d)
            count += 1

    
# Batch-send the data.
print("Sending the data...")
res = client.batch(cc_data)
print("Data sent with %s records." % (count))