In [14]:
# Install pyspark to begin with
!pip install pyspark



In [15]:
from pyspark.sql import SparkSession

# Create SparkSession

spark = SparkSession.builder\
                    .appName("Restaurants")\
                    .getOrCreate()

In [16]:
# Create dataframe for the first file
df1 = spark.read\
           .format("csv")\
           .option("header", True)\
           .option("inferSchema", True)\
           .load("/kaggle/input/restaurants/file1.csv")

df1.show()
df1.printSchema()



+--------------------+--------------------+----------------+-----+
|               name1|            address1|           city1| zip1|
+--------------------+--------------------+----------------+-----+
|BENTONS ABSOLUTE ...|    23820 ROCKINGHAM|      SOUTHFIELD|48034|
|BENTONS ABSOLUTE ...|    23820 ROCKINGHAM|      SOUTHFIELD|48034|
|    PRO WINDMILL INC|   178 SPRING STREET|         YANKTON|57078|
|OLSONS PEST TECHN...|   178 SPRING STREET|         YANKTON|57078|
|INTERIM HEALTH CA...|1901 NORTH UNION ...|COLORADO SPRINGS|80909|
| INTERIM HEALTH CARE|1901 NORTH UNION ...|COLORADO SPRINGS|80909|
|   COUNTY OF PASSAIC|     40 VALLEY ZROAD|         HALEDON| 7508|
|PREAKNESS HEALTH ...|     40 VALLEY ZROAD|         HALEDON| 7508|
|RESIDENT MANAGEME...|1390 QUAIL LAKE LOOP|COLORADO SPRINGS|80906|
|RESIDENT MANAGEME...|1390 QUAIL LAKE LOOP|COLORADO SPRINGS|80906|
|FENIMORE DRYWALL INC|    5609 NEWLAND WAY|          ARVADA|80002|
|SAM KEDEM NURSERY...|12414 191ST STREE...|        HASTINGS|55

                                                                                

In [17]:
# Create dataframe for the second file
df2 = spark.read\
           .format("csv")\
           .option("header", True)\
           .option("inferSchema", True)\
           .load("/kaggle/input/restaurants/file2.csv")

df2.show()
df2.printSchema()

+--------------------+--------------------+-----------------+-----+
|               name2|            address2|            city2| zip2|
+--------------------+--------------------+-----------------+-----+
|CARIBE TELENO RES...|      320 WATERS AVE|            TAMPA|33604|
|LATIN MIX RESTAUR...|     11368 SW 184 ST|            MIAMI|33157|
|FRIENDLY & AMIGO ...|12275 COLLIER BLVD 3|           NAPLES|34116|
|       MARKER 85 LLC|    127 BAYSHORE WAY|         GOODLAND|34140|
|         T MAX GROUP|      3743 PAULA AVE|         KEY WEST|33040|
|PEI WEI FRESH KIT...|9982 GLADES RD ST...|       BOCA RATON|33434|
|JULIAS PIZZA REST...|   5075 EDGEWATER DR|          ORLANDO|32810|
|      PITAS REPUBLIC|10454 ROOSEVELT B...|   ST. PETERSBURG|33716|
|THE HABIT BURGER ...|1801 S FEDERAL HW...|     DELRAY BEACH|33483|
|      CRYSTAL BUFFET|3160 W NEW HAVEN AVE|      W MELBOURNE|32904|
| MARITZA TRETO PEREZ|         368 W 14 ST|          HIALEAH|33010|
|        SOUP TO NUTS|      1323 S MAIN ST|     

In [18]:
# Harmonize column names between datasets
df1 = df1.withColumnsRenamed({'name1' : 'name', 'address1' : 'address', 'city1' : 'city', 'zip1' : 'zip'})
df2 = df2.withColumnsRenamed({'name2' : 'name', 'address2' : 'address', 'city2' : 'city', 'zip2' : 'zip'})

In [19]:
# Add origin file name to each dataset
from pyspark.sql.functions import lit

df1 = df1.withColumn("origin", lit("file1"))
df2 = df2.withColumn("origin", lit("file2"))

In [20]:
# Data needs to be standardized:
# - Uniformize the characters (so that we don't get differences in uppercased/lowercased characters)
# - Remove any non-alphanumeric character by:
# --- Removing any non-word character (a-zA-Z0-9 or \w)
# --- Removing any non-whitespace character (\n\t + the rest for compatibility purposes?) (also works with \s)
from pyspark.sql.functions import upper, regexp_replace

def clean_data(column):
        return upper(regexp_replace(column, r'[^\w\s]', ''))

In [21]:
# Apply the previous cell's function to each column
from pyspark.sql import functions as F
# Kaggle importing pyspark doesn't allow (?) for col to be called directly

df1_cleaned = df1.withColumn('name', clean_data(F.col('name')))\
                 .withColumn('address', clean_data(F.col('address')))\
                 .withColumn('city', clean_data(F.col('city')))\
                 .withColumn('zip', clean_data(F.col('zip')))

df2_cleaned = df2.withColumn('name', clean_data(F.col('name')))\
                 .withColumn('address', clean_data(F.col('address')))\
                 .withColumn('city', clean_data(F.col('city')))\
                 .withColumn('zip', clean_data(F.col('zip')))

In [22]:
# Data now has origin file and hashed fields
# We can safely merge the two files so we can work on the output

merged_df = df1_cleaned.unionAll(df2_cleaned)

# Should we drop the NAN/nulls? If so, 
merged_df = merged_df.dropna() 

In [23]:
# We need to:
# - Output all restaurants and addresses without duplicates
# - Output the hashed field
# - Output the total number of occurrences of each entry (group the entries)
# - Output two boolean flags that are true/false each depending on the values of "origin"
from pyspark.sql.functions import count, when

########
# Group the dataframe by the relevant columns to filter out duplicates
# By doing so, we will also aggregate the grouped results so as to add new columns.
########
# First count will just count the # of occurrences of "same values".
# Whereas the second and third counts will look into the existence of "file1" or "file2" on the "origin" column,
# and passing a truthful value to a new column depending on whether it exists or not
########

final_df = merged_df.groupBy('name', 'address', 'city', 'zip')\
                    .agg(
                        count("*").alias('number_of_occurrences'),
                        (count(when(F.col('origin') == 'file1', True)) > 0).alias('exists_file1'),
                        (count(when(F.col('origin') == 'file2', True)) > 0).alias('exists_file2')
                    )

In [24]:
# Add unique identifier for each cleaned file
# First concat all the relevant fields
# Then hash over the concatenated field
from pyspark.sql.functions import concat_ws, sha2

# Concat all relevant columns with a '_'
final_df = final_df.withColumn('uuid', concat_ws("_", F.col('name'), F.col('address'), F.col('city'), F.col('zip')))

# Apply sha256 over the concatenated string
final_df = final_df.withColumn('uuid', sha2(F.col('uuid'), 256))

In [25]:
# Make the ordering a bit nicer...

final_df = final_df.select('uuid', 'name', 'address', 'city', 'zip', 'exists_file1', 'exists_file2', 'number_of_occurrences').sort('number_of_occurrences', ascending=False)

final_df.show(10)
final_df.printSchema()

[Stage 22:>                                                         (0 + 4) / 4]

+--------------------+--------------------+--------------------+-------------+-----+------------+------------+---------------------+
|                uuid|                name|             address|         city|  zip|exists_file1|exists_file2|number_of_occurrences|
+--------------------+--------------------+--------------------+-------------+-----+------------+------------+---------------------+
|66e4647885fec9266...|   US POSTAL SERVICE|     160 DURYEA ROAD|     MELVILLE|11747|        true|       false|                   49|
|ad429e3e35a455879...|X PRESS SWEEPING INC|        6 CRUDALE DR| WEST WARWICK| 2893|        true|       false|                   46|
|2ce0a86068e5fa5bf...|USPROTECT CORPORA...|801 ROEDER ROAD S...|SILVER SPRING|20910|        true|       false|                   43|
|fbdd5fda1155bed4d...|USPS MID ISLAND P...|     160 DURYEA ROAD|     MELVILLE|11747|        true|       false|                   28|
|af91c0b850e98204d...|                USPS|     160 DURYEA ROAD|     

                                                                                

In [26]:
# Finally, write the output to a csv file

final_df.write.csv("output.csv", header=True, mode="overwrite")

                                                                                

# How it is done
In order to both merge and harmonize the restaurant data fetched from the given .csv files, this notebook/script performs the following steps:
1. Data Cleaning, Merging and Harmonization:
	- Every column is standardized by converting text to uppercase, removing any special characters (non-alphanumeric) and whitespace characters;
    - These columns are also uniformized (removing the suffix number) in order to simplify the eventual data union;
    - A new string column is added on each file ("origin"), with its contents being "file1" or "file2", depending on its original file. This is to simplify the output column generated in the next step, in order to identify whether that row comes from a specific file or from another.
	- Data is then merged entirely;
	- Rows with empty or NaN values are also discarded after unifying all data (though ideally we could follow-up later with what is described in the "future considerations" section, as we are losing data with this step).


2. Output columns:
	- The requested output columns are generated with the entirety of the data merged, grouping the dataset by its fields;
	- With the grouped data, we can work on aggregating the rows to:
		- Count the # of occurrences (to fetch the duplicates);
            - To do so, we aggregate the grouped data, now normalized, and count its duplicates
		- Check for existence of the origin file's flag (and apply a flag to indicate its presence on such).
            - This is done by averiguating whether the field actually exists via ```when(col("output") == "file1", True)```, then counting the no. of its occurrences. When it does exist at least once, it will output `True`
	- With the dataset finalized, we can now generate a UUID/hash for the first four uniformized fields, which is done via a sha256 hash.


3. Final Output:
	- The merged data is then written to a CSV file.

# Issues, Future Considerations and "What I would do if given more time"
### Data given is very "repetitive" with its duplicates, some fields are very similar despite having different names
    An alternative to deal with this issue could be via a fuzzy matching algorithm to account for less "false positives" when it comes to name comparisons
### Some addresses are partly written, with the same address being written in different ways (some shortened, some in full, for instance)
	To tackle this issue, there could be some better string normalization libraries to normalize the addresses in its entirety
	We could also geocode addresses (with APIs like Google Maps or others as such) to minimize duplicates
### One column is generated per file in order to identify its origin, on the final dataset
    This could be problematic in the way that, if we have 10000 input files, we'd have 10000 additional columns (making it cumbersome to manage the final dataset)
        
    Alternatively, we could either have an array, appending each occurrence to itself
        __or__
    We could still keep a string field and just concatenate each file to itself (making it "File1.csv + File2.csv + ... + File10000.csv" if it was present on every file)