In [1]:
!pip install pyspark
!pip install python-Levenshtein
!pip install fuzzywuzzy

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 45.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=a3b90a229b8f456deb847aba6964df62ada822038a4d16804de88fed4820f689
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1
Collecting python-Levenshtein
  Downloading python-Levenshtein-0.12.2.tar.gz (50 kB)
[K     |████████████████████████████████| 50 kB 3.0 MB/s 
Building wheels for collected packages: python-Levenshtein
  Building whee

In [2]:
import pyspark.sql.functions as F

from pyspark.sql import SparkSession
from fuzzywuzzy import fuzz
from pyspark.sql.functions import row_number,lit, when, col
from pyspark.sql.window import Window
from pyspark.sql import Window as window

In [34]:
spark = SparkSession.builder \
    .appName("SparkByExamples.com") \
    .getOrCreate()
df = spark.read.options( header=True).csv('./data.csv')

In [35]:
df.show()

+----------+-------+---------+-----------------+--------------+--------------+
|First Name|Surname|Full Name|            Email|Categorisation|Email_Validity|
+----------+-------+---------+-----------------+--------------+--------------+
|       Ali|   NULL|      Ali|             NULL|          Live|             1|
|      Umer|   NULL|     Umer|             NULL|          Live|             1|
|     Adnan|   NULL|    Adnan|  adnan@gmail.com|          Live|             1|
|   Abubakr|   NULL|  Abubakr|             NULL|          Live|             1|
|     Baber|   NULL|    Baber|             NULL|        Lapsed|             1|
|     Adeel|   NULL|    Adeel|  adeel@gmail.com|          Live|             0|
|       Ali|   NULL|      Ali|             NULL|          Live|             1|
|    Farhan|   NULL|   Farhan|             NULL|          Live|             1|
|   Murtaza|   NULL|  Murtaza|             NULL|        Lapsed|             1|
|    Faizan|   NULL|   Faizan|             NULL|    

In [36]:
w = Window().orderBy(lit('row_num'))
df = df.withColumn("row_num", row_number().over(w))

In [37]:
joined = df.crossJoin(df)

In [38]:
def rename_duplicate_columns(dataframe):
    columns = dataframe.columns
    duplicate_column_indices = list(set([columns.index(col) for col in columns if columns.count(col) == 2]))
    for index in duplicate_column_indices:
        columns[index] = columns[index]+'_2'
    dataframe = dataframe.toDF(*columns)
    return dataframe
joined = rename_duplicate_columns(joined)

In [41]:
def calculate_fuzz_ratio(
   First_Name, Surname, Full_Name, Email, Categorisation, Email_Validity, First_Name_2, Surname_2, Full_Name_2, Email_2, Categorisation_2, Email_Validity_2
):
  str1 = [First_Name, Surname, Full_Name, Email, Categorisation, Email_Validity]
  str2 = [First_Name_2, Surname_2, Full_Name_2, Email_2, Categorisation_2, Email_Validity_2]
  return fuzz.ratio(str1, str2)

In [42]:
sch = ['group_no', 'row_no', 'match_%']
joined = joined.rdd.map(lambda x: (x.row_num, x.row_num_2, 
                                calculate_fuzz_ratio(x['First Name'], x.Surname, x['Full Name'], x.Email, x.Categorisation, x.Email_Validity, x['First Name_2'], x.Surname_2, x['Full Name_2'], x.Email_2, x.Categorisation_2, x.Email_Validity_2))).toDF(sch)

In [43]:
joined.show()

+--------+------+-------+
|group_no|row_no|match_%|
+--------+------+-------+
|       1|     1|    100|
|       2|     1|     84|
|       3|     1|     69|
|       4|     1|     83|
|       5|     1|     76|
|       6|     1|     71|
|       7|     1|    100|
|       8|     1|     80|
|       9|     1|     73|
|      10|     1|     85|
|      11|     1|     73|
|      12|     1|     87|
|      13|     1|     87|
|      14|     1|     57|
|      15|     1|     67|
|      16|     1|     69|
|      17|     1|     65|
|      18|     1|     66|
|      19|     1|     96|
|      20|     1|     82|
+--------+------+-------+
only showing top 20 rows



In [44]:
joined = joined.where(joined['match_%'] >= 80)

## picking the best group number for each row of data

In [45]:
w = window.partitionBy('group_no')
joined = joined.select('group_no', 'row_no', 'match_%', F.count('group_no').over(w).alias('count')).sort('group_no', 'match_%')

In [46]:
joined.show()

+--------+------+-------+-----+
|group_no|row_no|match_%|count|
+--------+------+-------+-----+
|       1|     8|     80|   17|
|       1|    33|     80|   17|
|       1|    20|     82|   17|
|       1|     4|     83|   17|
|       1|    25|     83|   17|
|       1|     2|     84|   17|
|       1|    10|     85|   17|
|       1|    31|     85|   17|
|       1|    34|     87|   17|
|       1|    12|     87|   17|
|       1|    13|     87|   17|
|       1|    23|     87|   17|
|       1|    21|     87|   17|
|       1|    19|     96|   17|
|       1|     1|    100|   17|
|       1|     7|    100|   17|
|       1|    28|    100|   17|
|       2|    19|     80|   18|
|       2|    20|     80|   18|
|       2|    21|     80|   18|
+--------+------+-------+-----+
only showing top 20 rows



In [47]:
windowDept = Window.partitionBy("row_no").orderBy(col("count").desc())
joined = joined.withColumn("row",row_number().over(windowDept)) \
  .filter(col("row") == 1).drop("row")

In [48]:
joined.show()

+--------+------+-------+-----+
|group_no|row_no|match_%|count|
+--------+------+-------+-----+
|       8|     1|     80|   19|
|       8|     2|     83|   19|
|       3|     3|    100|   11|
|       8|     4|     82|   19|
|       8|     5|     80|   19|
|       3|     6|     83|   11|
|       8|     7|     80|   19|
|       8|     8|    100|   19|
|      10|     9|     80|   15|
|       8|    10|     92|   19|
|       3|    11|     84|   11|
|       8|    12|     81|   19|
|       8|    13|     85|   19|
|      35|    14|     95|    5|
|       3|    15|     83|   11|
|       3|    16|     81|   11|
|      15|    17|     81|    8|
|       3|    18|     83|   11|
|       8|    19|     81|   19|
|       8|    20|     85|   19|
+--------+------+-------+-----+
only showing top 20 rows



(optional) removed the match percentage of rows which are only matched with their self.

In [49]:
# joined = joined.withColumn('match_%', when(joined['count'] == 1, None).otherwise(joined['match_%']))
# joined.show()

# Final step

In [50]:
joined = joined.orderBy(col('group_no'), col('row_no'))

In [51]:
df = joined.join(df, joined.row_no == df.row_num, "inner")

In [52]:
df.show()

+--------+------+-------+-----+----------+-------+---------+-----------------+--------------+--------------+-------+
|group_no|row_no|match_%|count|First Name|Surname|Full Name|            Email|Categorisation|Email_Validity|row_num|
+--------+------+-------+-----+----------+-------+---------+-----------------+--------------+--------------+-------+
|       8|     1|     80|   19|       Ali|   NULL|      Ali|             NULL|          Live|             1|      1|
|       8|     2|     83|   19|      Umer|   NULL|     Umer|             NULL|          Live|             1|      2|
|       3|     3|    100|   11|     Adnan|   NULL|    Adnan|  adnan@gmail.com|          Live|             1|      3|
|       8|     4|     82|   19|   Abubakr|   NULL|  Abubakr|             NULL|          Live|             1|      4|
|       8|     5|     80|   19|     Baber|   NULL|    Baber|             NULL|        Lapsed|             1|      5|
|       3|     6|     83|   11|     Adeel|   NULL|    Adeel|  ad

In [53]:
df = df.select(F.row_number().over(Window.partitionBy(df['group_no']).orderBy(df['row_no'])).alias("row_num"),"match_%", "First Name", "Surname", "Full Name", "Email", "Categorisation", "Email_Validity")

In [54]:
df.show()

+-------+-------+----------+-------+---------+----------------+--------------+--------------+
|row_num|match_%|First Name|Surname|Full Name|           Email|Categorisation|Email_Validity|
+-------+-------+----------+-------+---------+----------------+--------------+--------------+
|      1|     80|      Noor|   NULL|     Noor|            NULL|        Lapsed|             1|
|      2|     87|     Ahmer|   NULL|    Ahmer|            NULL|        Lapsed|             1|
|      1|    100|     Adnan|   NULL|    Adnan| adnan@gmail.com|          Live|             1|
|      2|     83|     Adeel|   NULL|    Adeel| adeel@gmail.com|          Live|             0|
|      3|     84|     Abdul|   NULL|    Abdul| abdul@gmail.com|          Live|             1|
|      4|     83|      Taha|   NULL|     Taha|  taha@gmail.com|          Live|             1|
|      5|     81|     Saima|   NULL|    Saima| saima@gmail.com|          Live|             1|
|      6|     83|    Adeela|   NULL|   Adeela|adeela@gmail.c