In [49]:
# Import SparkSession
from pyspark.sql import SparkSession
import findspark
import pyspark
import random
import pandas as pd

In [50]:
from pyspark.sql.functions import col

In [51]:
# Create SparkSession 
spark = SparkSession.builder.master("local[1]").getOrCreate() 

In [52]:
sc = spark.sparkContext

# Read Files with sparkSession

In [5]:
gbrDF = spark.read.json('gbr.jsonl')
ofacDF = spark.read.json('ofac.jsonl')

# Checked for if schemas equals


In [6]:
a= ofacDF.printSchema()
b= gbrDF.printSchema()
if( a == b ):
    print("*Schemas equals*")

root
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- postal_code: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- aliases: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- id: long (nullable = true)
 |-- id_numbers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comment: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |-- name: string (nullable = true)
 |-- nationality: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- place_of_birth: string (nullable = true)
 |-- position: string (nullable = true)
 |-- reported_dates_of_birth: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- type: string (nullable = true)

root
 |-- a

# Dropped duplicates for name column, so that if there is a duplicate in itself, it does not appear as if there is a common duplicate

In [7]:
gbrDFclean = gbrDF.dropDuplicates(['name'])
ofacDFclean = ofacDF.dropDuplicates(['name'])

# Using Union()
# Union() methods of the DataFrame are employed to mix two DataFrame’s of an equivalent structure/schema.

In [8]:
df3 = gbrDFclean.union(ofacDFclean)

# Used groupby for finding records that exist in both DataFrames

In [11]:
groupednames = df3.groupBy("name").count().filter(col("count")>1)

In [12]:
ofacDFclean.select('name','id').show()

+--------------------+-----+
|                name|   id|
+--------------------+-----+
|  CUBAN CIGARS TRADE|  593|
|Abid Hamid Mahmud...| 7846|
|Ismail Abdallah S...| 7945|
| CORPORACION CIME...| 8125|
|Victor Julio SUAR...| 8143|
|Rafik Mohamad YOUSEF| 9589|
| TOYS FACTORY, S....|10638|
|      SINALOA CARTEL|11440|
| HODWALKER Y LEAL...|11526|
|Ignacio MEJIA GUT...|12025|
| CLUB DEPORTIVO O...|12194|
| PUNTO FARMACEUTI...|13018|
|Ramon Ignacio PAE...|15255|
|            ATLANTIC|15724|
|    Bassam AL-HASSAN|16235|
| COMPANIA AGRO CO...|16320|
| NAVID COMPOSITE ...|16444|
| FEODOSIYA ENTERP...|16957|
|        DAI HONG DAN|17086|
| MUJAHIDIN SHURA ...|17187|
+--------------------+-----+
only showing top 20 rows



# Loop for grb_id and ofac_id for matched values based on a column

In [13]:
#For iterating over a spark df you need to use .collect() for retrieving all the data from it. 

# Storing in the variable
data_collect = groupednames.select("name").collect()
 
# looping thorough each row of the dataframe for finding desired gbr and ofac id's

gbrid=[] # array for storing gbrid's 
ofacid=[] # array for storing ofacid's 

for row in data_collect:
    
    # For the name in each row, ofac and gbr find the id of the row with the same name
        #get equal row for name same with #our iterating row value   #Filter that row for getting the id and get it as a value.
    a = gbrDFclean.filter(gbrDFclean.name == row["name"]).select(gbrDFclean.id).first()
    b = ofacDFclean.filter(ofacDFclean.name == row["name"]).select(ofacDFclean.id).first()
    
    # Add every element to lists.
    gbrid.append(a)
    ofacid.append(b)   


# Checked if the number of obtained ids is correct.

In [14]:
print(len(gbrid),len(ofacid),groupednames.count())

482 482 482


## Created 2 Pandas DF from lists for adding as columns to output df

In [15]:
df = pd.DataFrame(gbrid)
df.columns=['gbrid']
df['gbrid'] = df['gbrid'].astype('string') #Converted string for adding as a column. It does not let add as integers.

In [16]:
df2 = pd.DataFrame(ofacid)
df2.columns=['ofacid']
df2['ofacid'] = df2['ofacid'].astype('string')

In [17]:
groupednamespd = groupednames.toPandas()

# Adding lists as columns to matched output df 

In [18]:
groupednamespd['gbr_id'] = df['gbrid']

In [19]:
groupednamespd['ofac_id'] = df2['ofacid']

# Matched DF 

In [20]:
groupednamespd

Unnamed: 0,name,count,gbr_id,ofac_id
0,Abid Hamid Mahmud AL-TIKRITI,2,7619,7846
1,Salah BADI,2,13719,26094
2,Yahya HAQQANI,2,13144,16518
3,James Koang CHUOL,2,13265,16910
4,Moe Myint TUN,2,14062,31171
...,...,...,...,...
477,Barzan Ibrahim Hassan AL-TIKRITI,2,7608,7880
478,Delcy Eloina RODRIGUEZ GOMEZ,2,13687,25077
479,Germain KATANGA,2,8735,11975
480,Mokhtar BELMOKHTAR,2,7881,8084


# Some sample validations for if process working right.

In [22]:
gbrDF.filter(gbrDF.name == 'Abid Hamid Mahmud AL-TIKRITI').show()

+----------------+--------------------+----+----------+--------------------+-----------+--------------------+--------+-----------------------+----------+
|       addresses|             aliases|  id|id_numbers|                name|nationality|      place_of_birth|position|reported_dates_of_birth|      type|
+----------------+--------------------+----+----------+--------------------+-----------+--------------------+--------+-----------------------+----------+
|[{null, null, }]|[{AKA, Abed Mahmo...|7619|        []|Abid Hamid Mahmud...|     [Iraq]|al-Awja, near Tik...|    null|           [00/00/1957]|Individual|
+----------------+--------------------+----+----------+--------------------+-----------+--------------------+--------+-----------------------+----------+



In [24]:
ofacDF.filter(ofacDF.name == 'Abid Hamid Mahmud AL-TIKRITI').show()

+---------+--------------------+----+----------+--------------------+-----------+--------------------+--------+-----------------------+----------+
|addresses|             aliases|  id|id_numbers|                name|nationality|      place_of_birth|position|reported_dates_of_birth|      type|
+---------+--------------------+----+----------+--------------------+-----------+--------------------+--------+-----------------------+----------+
|     null|[{strong a.k.a., ...|7846|      null|Abid Hamid Mahmud...|     [Iraq]|al-Awja, near Tik...|    null|           [circa 1957]|Individual|
+---------+--------------------+----+----------+--------------------+-----------+--------------------+--------+-----------------------+----------+



# Cleaner way to see only ids.

In [27]:
gbrDF.select('name','id').filter(gbrDF.name == 'Sergei Vladimirovich ZHELEZNYAK').show()

+--------------------+-----+
|                name|   id|
+--------------------+-----+
|Sergei Vladimirov...|12920|
+--------------------+-----+



In [28]:
ofacDF.select('name','id').filter(ofacDF.name == 'Sergei Vladimirovich ZHELEZNYAK').show()

+--------------------+-----+
|                name|   id|
+--------------------+-----+
|Sergei Vladimirov...|16668|
+--------------------+-----+



In [30]:
gbrDF.select('name','id').filter(gbrDF.name == 'Barzan Ibrahim Hassan AL-TIKRITI').show()

+--------------------+----+
|                name|  id|
+--------------------+----+
|Barzan Ibrahim Ha...|7608|
+--------------------+----+



In [29]:
ofacDF.select('name','id').filter(ofacDF.name == 'Barzan Ibrahim Hassan AL-TIKRITI').show()

+--------------------+----+
|                name|  id|
+--------------------+----+
|Barzan Ibrahim Ha...|7880|
+--------------------+----+



In [32]:
gbrDF.select('name','id').filter(gbrDF.name == 'James Koang CHUOL').show()

+-----------------+-----+
|             name|   id|
+-----------------+-----+
|James Koang CHUOL|13265|
+-----------------+-----+



In [31]:
ofacDF.select('name','id').filter(ofacDF.name == 'James Koang CHUOL').show()

+-----------------+-----+
|             name|   id|
+-----------------+-----+
|James Koang CHUOL|16910|
+-----------------+-----+



# Add Matched DF to a Spark DF 

In [35]:
MatchedNamesSparkDF=spark.createDataFrame(groupednamespd)

In [37]:
MatchedNamesSparkDF.printSchema()

root
 |-- name: string (nullable = true)
 |-- count: long (nullable = true)
 |-- gbr_id: string (nullable = true)
 |-- ofac_id: string (nullable = true)



In [44]:
MatchedNamesSparkDF.createOrReplaceTempView('sparkdf')

In [45]:
q = spark.sql('SELECT * FROM sparkdf')

In [46]:
q.printSchema()

root
 |-- name: string (nullable = true)
 |-- count: long (nullable = true)
 |-- gbr_id: string (nullable = true)
 |-- ofac_id: string (nullable = true)

