# E-mails unification

In [44]:
import pandas as pd
from tqdm import tqdm
import random
from  datetime import datetime
from pyisemail import is_email

pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

### Functions defenition

In [29]:
email_df = pd.read_csv("generated-emails-32000.csv", header=0, names=['email'])
email_df.shape[0]
print(email_df["email"][9998])

Larryeager@frontiernet.net


In [67]:
def extract_columns_from_ddl(table_ddl):
    column_names = [line.split()[0] for line in table_ddl.strip().split('\n')]
    return  column_names

def generate_dummy_data_email_df(n):
    data = []
    max_index = email_df.shape[0]
    i = 0
    for _ in range(n):
        if i == max_index - 2:
            i = 0
        data.append({
            "EMAIL_PARTY_ID": random.randint(1, 1000),
            "PARTY_ID": random.randint(1, 1000),
            #"COUNTRY_ID": f'Country_{random.randint(1, 100)}',
            "EMAIL_PARTY_TYPE_ID": random.randint(1, 10),
            "EMAIL": email_df["email"][i],
            #"EMAIL_NUMBER_PREFIX": f'+{random.randint(10, 99)}',
            "SRC_ID": f'SRC_{random.randint(1, 100)}',
            "SRC_SYS_ID": f'SYS_{random.randint(1, 100)}',
            "DEL_FLAG": random.randint(0, 1),
            "INSERT_DATETIME": datetime.now().date(),
            "INS_PROCESS_ID": f'Process_{random.randint(1, 100)}',
            "UPDATE_DATETIME":  pd.to_datetime('2024-01-05'),
            "UPD_PROCESS_ID": f'Process_{random.randint(101, 200)}',
            "UPD_EFF_DATE": datetime.now().date()
        })
        i += 1
    return pd.DataFrame(data)

def MTCH_PT_EMAIL_definition(table_ddl):

    columns = extract_columns_from_ddl(table_ddl)
    return pd.DataFrame(columns=columns)

def insert_emails(df : pd.DataFrame):
    transformed_df = pd.DataFrame(
        {
        'EMAIL_PARTY_ID': df['EMAIL_PARTY_ID'],
        'PARTY_ID': df['PARTY_ID'],
        #'COUNTRY_ID': df['COUNTRY_ID'],
        'EMAIL_PARTY_TYPE_ID': df['EMAIL_PARTY_TYPE_ID'],
        'SOURCE_EMAIL': df['EMAIL'],
        
        'SRC_ID': df['SRC_ID'],
        'SRC_SYS_ID': df['SRC_SYS_ID'],
        'DELETE_FLAG': df['DEL_FLAG'],
        'INSERT_DATETIME': df['INSERT_DATETIME'],
        'INSERT_PROCESS_ID': df['INS_PROCESS_ID'],
        'UPDATE_DATETIME': df['UPDATE_DATETIME'],
        'UPDATE_DATETIME_PROCESS_ID': df['UPD_PROCESS_ID'],
        'UPDATE_DATETIME_EFFECTIVE_DATE': df['UPD_EFF_DATE']
        }
        
    )
    return transformed_df


def impute_null_values(df, column, value):
    for index, row in tqdm(df[df[column].isnull()].iterrows(),desc = "Null value impute is running ..."):
        df.loc[index,column] = value
    return df 

def unify_email(df,column_for_unification,group_by_column, phone_key, result_column):
    """
        df :  pd.DataFrame() with party email data
        column_for_unification :  column which contains valid email for unification
        group_by_column :  key, based on which the data will be grouped 
        phone_key : primary key, which identifies the email
        result_column : name of the result column, where the result values will be added
    """

    filtered_df = df[df[column_for_unification].notnull() & ~df[column_for_unification].isna()]
    grouped_phones = filtered_df.groupby(group_by_column)[phone_key].apply(list)
    df[result_column] = df[group_by_column].map(grouped_phones)
    
    return df


def list_to_string_transformation(df: pd.DataFrame, column: str):
    for index, row in tqdm(df.iterrows(), desc="List transformation is running ..."):
        # Check if the entry is iterable (e.g., a list), if not, skip the iteration.
        if not isinstance(row[column], list):
            #print(f"Non-iterable item at index {index}: {row['UNIFICATION_PARTY_PHONE_ID']}")
            continue
        
        resulted_string = ""
        for val in row[column]:
            # First method
            values_as_string = ["'{}'".format(int(val)) for val in row[column]]
            resulted_string = ', '.join(values_as_string)
            df.loc[index, column]
            
            # Properly concatenate the string with each value.
        df.loc[index, column] = resulted_string
    return df


## Table creation

In [54]:
ddl = """
    SOURCE_EMAIL VARCHAR(255),
    MATCHING_EMAIL VARCHAR(255),
    MASTER_EMAIL_PARTY_ID INTEGER,
    UNIFICATION_EMAIL_PARTY_ID INTEGER,
    SOURCE_IDENTIFIER VARCHAR(255),
    SOURCE_SYSTEM_IDENTIFIER VARCHAR(255),
    DELETE_FLAG INTEGER,
    INSERT_DATETIME DATE,
    INSERT_PROCESS_ID VARCHAR(255),
    UPDATE_DATETIME DATE,
    UPDATE_DATETIME_PROCESS_ID VARCHAR(255),
    UPDATE_DATETIME_EFFECTIVE_DATE DATE
"""
MTCH_PT_EMAIL = MTCH_PT_EMAIL_definition(ddl)
EMAIL_PARTY = generate_dummy_data_email_df(32029)
EMAIL_PARTY = EMAIL_PARTY.drop_duplicates()

#Type correction
MTCH_PT_EMAIL['UPDATE_DATETIME'] = pd.to_datetime(MTCH_PT_EMAIL['UPDATE_DATETIME'], errors='coerce')
MTCH_PT_EMAIL['INSERT_DATETIME'] = pd.to_datetime(MTCH_PT_EMAIL['INSERT_DATETIME'], errors='coerce')

print(EMAIL_PARTY.shape)
print(MTCH_PT_EMAIL.shape)

## MTCH table population
 #- Insert not validated data 

print(MTCH_PT_EMAIL.shape)
last_checked_date = pd.to_datetime('2024-01-10')
filtered_MTCH_PT = EMAIL_PARTY[EMAIL_PARTY['UPDATE_DATETIME']<last_checked_date]
print(filtered_MTCH_PT.shape)
MTCH_PT_EMAIL = pd.concat([MTCH_PT_EMAIL, insert_emails(filtered_MTCH_PT)], ignore_index=True)
for index, row in tqdm(MTCH_PT_EMAIL.iterrows(),desc = "EMAIL union"):
    MTCH_PT_EMAIL.loc[index,'SOURCE_EMAIL'] = row['SOURCE_EMAIL'].lower()
print(MTCH_PT_EMAIL.shape)

(32029, 12)
(0, 12)
(0, 12)
(32029, 12)


EMAIL union: 32029it [00:10, 3193.74it/s]

(32029, 17)





# Validation part
- check whether the domain used in the email is a valid domain and whether or not it has a valid MX record:

In [65]:

address = "murazoor@mail.ru"
bool_result_with_dns = is_email(address)
#detailed_result_with_dns = is_email(address, check_dns=True, diagnose=True)

print(bool_result_with_dns)
#print(detailed_result_with_dns)

True


In [69]:
print("MATCHING_EMAIL null value count ",MTCH_PT_EMAIL.MATCHING_EMAIL.isna().sum(),"\n")

for index, row in tqdm(MTCH_PT_EMAIL.iterrows(), "Email validation method is running ...",mininterval=1):
    # Assuming 'EMAIL_NUMBER' is the actual phone number column
    if is_email(row['SOURCE_EMAIL'], check_dns=False):       
        MTCH_PT_EMAIL.loc[index, 'MATCHING_EMAIL'] = row['SOURCE_EMAIL']
    else:
        MTCH_PT_EMAIL.loc[index, 'MATCHING_EMAIL'] = None

print("\n")
print("MATCHING_EMAIL null value count ",MTCH_PT_EMAIL.MATCHING_EMAIL.isna().sum())
print("\n")
print("UNIFICATION_EMAIL_PARTY_ID null value count ",MTCH_PT_EMAIL['UNIFICATION_EMAIL_PARTY_ID'].isna().sum())
print("\n")
MTCH_PT_EMAIL = unify_email(MTCH_PT_EMAIL,'MATCHING_EMAIL','PARTY_ID', 'EMAIL_PARTY_ID', 'UNIFICATION_EMAIL_PARTY_ID')
print("UNIFICATION_EMAIL_PARTY_ID null value count ",MTCH_PT_EMAIL['UNIFICATION_EMAIL_PARTY_ID'].isna().sum())
print("\n")

MTCH_PT_EMAIL = impute_null_values(MTCH_PT_EMAIL,'MATCHING_EMAIL',None)
print("MATCHING_EMAIL null value count ",MTCH_PT_EMAIL.MATCHING_EMAIL.isna().sum())

MATCHING_EMAIL null value count  0 



Email validation method is running ...: 32029it [00:20, 1595.90it/s]




MATCHING_EMAIL null value count  0


UNIFICATION_PARTY_EMAIL_ID null value count  0


UNIFICATION_PARTY_EMAIL_ID null value count  0




Null value impute is running ...: 0it [00:00, ?it/s]

MATCHING_EMAIL null value count  0





## List transform to store data in KBC

In [None]:
MTCH_PT_EMAIL = list_to_string_transformation(MTCH_PT_EMAIL,'UNIFICATION_EMAIL_PARTY_ID' )

## Add column to matching party table 

In [73]:
MTCH_PT = pd.read_csv('mtch_pt.csv')
MTCH_PT['UNIFICATION_EMAIL_PARTY_ID'] = None
#MTCH_PT = MTCH_PT.rename(columns={"MATCHING_MFO": "UNIFICATION_PARTY_PHONE_ID"})
#merged_df = pd.merge(MTCH_PT, MTCH_PT_PHONE[['PARTY_ID', 'UNIFICATION_PARTY_PHONE_ID']], on='PARTY_ID', how='left')

merged_df = pd.merge(MTCH_PT, 
                     MTCH_PT_EMAIL[['PARTY_ID', 'UNIFICATION_EMAIL_PARTY_ID']], 
                     on='PARTY_ID', 
                     how='left',
                     suffixes=('', '_from_email'))

# Update UNIFICATION_PARTY_PHONE_ID in MTCH_PT with the values from MTCH_PT_PHONE
MTCH_PT['UNIFICATION_EMAIL_PARTY_ID'] = merged_df['UNIFICATION_EMAIL_PARTY_ID_from_email']

In [74]:
MTCH_PT.head(10)['UNIFICATION_EMAIL_PARTY_ID']

0    [639.0, 949.0, 60.0, 722.0, 561.0, 419.0, 238....
1    [639.0, 949.0, 60.0, 722.0, 561.0, 419.0, 238....
2    [639.0, 949.0, 60.0, 722.0, 561.0, 419.0, 238....
3    [639.0, 949.0, 60.0, 722.0, 561.0, 419.0, 238....
4    [639.0, 949.0, 60.0, 722.0, 561.0, 419.0, 238....
5    [639.0, 949.0, 60.0, 722.0, 561.0, 419.0, 238....
6    [639.0, 949.0, 60.0, 722.0, 561.0, 419.0, 238....
7    [639.0, 949.0, 60.0, 722.0, 561.0, 419.0, 238....
8    [639.0, 949.0, 60.0, 722.0, 561.0, 419.0, 238....
9    [639.0, 949.0, 60.0, 722.0, 561.0, 419.0, 238....
Name: UNIFICATION_PARTY_EMAIL_ID, dtype: object