In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from fuzzywuzzy import fuzz
from pyspark.sql.types import IntegerType

In [27]:
spark = SparkSession.builder.appName("Sayari_Spark_Assessment").getOrCreate()

In [28]:
ofac_original_df = spark.read.format("json").load("/content/ofac.jsonl")

In [29]:
ofac_df = ofac_original_df.select(
                        col("addresses").alias("ofac_addresses"),
                        col("id").alias("ofac_id"),
                        col("id_numbers").alias("ofac_id_numbers"),
                        col("name").alias("registered_ofac_name"),
                        col("nationality").alias("ofac_nationality"),
                        col("place_of_birth").alias("ofac_place_of_birth"),
                        col("position").alias("ofac_position"),
                        col("reported_dates_of_birth").alias("ofac_reported_dates_of_birth"),
                        col("type").alias("ofac_type")
                    )

In [30]:
ofac_df.show()

+--------------------+-------+---------------+--------------------+----------------+-------------------+--------------------+----------------------------+----------+
|      ofac_addresses|ofac_id|ofac_id_numbers|registered_ofac_name|ofac_nationality|ofac_place_of_birth|       ofac_position|ofac_reported_dates_of_birth| ofac_type|
+--------------------+-------+---------------+--------------------+----------------+-------------------+--------------------+----------------------------+----------+
|[{Cuba, null, , ,...|     36|           null| AEROCARIBBEAN AI...|              []|               null|                null|                        null|    Entity|
|[{United Kingdom,...|    173|           null| ANGLO-CARIBBEAN ...|              []|               null|                null|                        null|    Entity|
|[{Switzerland, CH...|    306|           null| BANCO NACIONAL D...|              []|               null|                null|                        null|    Entity|
|[{P

In [31]:
grb_original_df = spark.read.format("json").load("gbr.jsonl")

In [32]:
grb_df = grb_original_df.select(
                        col("addresses").alias("grb_addresses"),
                        col("id").alias("grb_id"),
                        col("id_numbers").alias("grb_id_numbers"),
                        col("name").alias("registered_grb_name"),
                        col("nationality").alias("gbr_nationality"),
                        col("place_of_birth").alias("gbr_place_of_birth"),
                        col("position").alias("gbr_position"),
                        col("reported_dates_of_birth").alias("gbr_reported_dates_of_birth"),
                        col("type").alias("gbr_type")
                    )

In [33]:
grb_df.show()

+--------------------+------+--------------------+--------------------+---------------+--------------------+--------------------+---------------------------+----------+
|       grb_addresses|grb_id|      grb_id_numbers| registered_grb_name|gbr_nationality|  gbr_place_of_birth|        gbr_position|gbr_reported_dates_of_birth|  gbr_type|
+--------------------+------+--------------------+--------------------+---------------+--------------------+--------------------+---------------------------+----------+
|[{Democratic Peop...| 12847|                  []|KOREAN COMMITTEE ...|         [null]|                    |                null|                     [null]|    Entity|
| [{Lebanon, null, }]| 13442|[{Passport: JX446...|      Hassan EL-HAJJ|     [Canadian]|Zaghdraiya, Sidon...|                null|               [22/03/1988]|Individual|
|    [{null, null, }]| 13192|[{, 01010744444},...|  Ali Abdullah SALEH|        [Yemen]|(1) Bayt al-Ahmar...|President of Yeme...|       [21/03/1945, 21/0..

In [34]:
df_crossjoin = ofac_df.crossJoin(grb_df)

In [35]:
df_crossjoin.show()

+--------------------+-------+---------------+--------------------+----------------+-------------------+-------------+----------------------------+---------+--------------------+------+--------------------+--------------------+---------------+--------------------+--------------------+---------------------------+----------+
|      ofac_addresses|ofac_id|ofac_id_numbers|registered_ofac_name|ofac_nationality|ofac_place_of_birth|ofac_position|ofac_reported_dates_of_birth|ofac_type|       grb_addresses|grb_id|      grb_id_numbers| registered_grb_name|gbr_nationality|  gbr_place_of_birth|        gbr_position|gbr_reported_dates_of_birth|  gbr_type|
+--------------------+-------+---------------+--------------------+----------------+-------------------+-------------+----------------------------+---------+--------------------+------+--------------------+--------------------+---------------+--------------------+--------------------+---------------------------+----------+
|[{Cuba, null, , ,...|   

In [36]:
def fuzzy_string_match(string1, string2):
    return fuzz.ratio(string1, string2)

fuzzy_string_match_udf = udf(fuzzy_string_match, IntegerType())

df_match_percentage = df_crossjoin.withColumn("fuzzy_match_percentage", fuzzy_string_match_udf("registered_ofac_name", "registered_grb_name"))

In [37]:
df_match_percentage.show()

+--------------------+-------+---------------+--------------------+----------------+-------------------+-------------+----------------------------+---------+--------------------+------+--------------------+--------------------+---------------+--------------------+--------------------+---------------------------+----------+----------------------+
|      ofac_addresses|ofac_id|ofac_id_numbers|registered_ofac_name|ofac_nationality|ofac_place_of_birth|ofac_position|ofac_reported_dates_of_birth|ofac_type|       grb_addresses|grb_id|      grb_id_numbers| registered_grb_name|gbr_nationality|  gbr_place_of_birth|        gbr_position|gbr_reported_dates_of_birth|  gbr_type|fuzzy_match_percentage|
+--------------------+-------+---------------+--------------------+----------------+-------------------+-------------+----------------------------+---------+--------------------+------+--------------------+--------------------+---------------+--------------------+--------------------+-------------------

In [38]:
percentage_filtered_df = df_match_percentage.filter(col("fuzzy_match_percentage") > 80)


In [39]:

percentage_filtered_df.show()

+--------------------+-------+--------------------+--------------------+----------------+--------------------+--------------------+----------------------------+----------+--------------------+------+--------------+--------------------+---------------+------------------+--------------------+---------------------------+----------+----------------------+
|      ofac_addresses|ofac_id|     ofac_id_numbers|registered_ofac_name|ofac_nationality| ofac_place_of_birth|       ofac_position|ofac_reported_dates_of_birth| ofac_type|       grb_addresses|grb_id|grb_id_numbers| registered_grb_name|gbr_nationality|gbr_place_of_birth|        gbr_position|gbr_reported_dates_of_birth|  gbr_type|fuzzy_match_percentage|
+--------------------+-------+--------------------+--------------------+----------------+--------------------+--------------------+----------------------------+----------+--------------------+------+--------------+--------------------+---------------+------------------+--------------------+-

In [40]:
percentage_filtered_df.write.json("final_output.json")