In [None]:
from urllib.parse import urlparse
def get_exact_urls_from_list(list_urls : list, validation_list : list) -> list:
	"""
	Keep the URLs whose final path segment contains one of the given keywords.

	The last non-empty path segment of each URL (e.g. ``contact-us`` in
	``https://site.com/en/contact-us/``) is lower-cased and tested for a
	substring match against every keyword.

	Args:
		list_urls: Candidate URLs. The literal string "nan" and any non-string
			value are skipped.
		validation_list: Keywords to look for. They are compared
			case-insensitively; empty keywords are ignored because they would
			match every URL.

	Returns:
		The matching URLs, de-duplicated, in the order they first appear in
		``list_urls``.
	"""
	keywords = [str(keyword).strip().lower() for keyword in validation_list]
	keywords = [keyword for keyword in keywords if keyword]

	# A dict keeps insertion order, giving a de-duplicated and deterministic result
	# (a plain set would return the URLs in arbitrary order).
	matched = {}
	for url in list_urls:
		if not isinstance(url, str) or url == "nan":
			continue
		last_section = urlparse(url).path.strip('/').split('/')[-1].strip().lower()
		if any(keyword in last_section for keyword in keywords):
			matched[url] = None

	return list(matched)

In [None]:
import json
import src.webscrapper.webscrapper as wsc
import src.util.helper as helper
import src.settings.constants as const
import src.util.sqllite_helper as db_manager
import requests
def get_website_child_url(website : str, company_name : str, robot_file,child_url_keywords:list) ->dict:
	"""
	Collect the child URLs of a website home page that match the given keywords.

	A previously stored result for (company_name, root_url) in the
	``child_url_list`` table is returned as-is. Otherwise the home page is
	downloaded (unless robots.txt forbids it), its links are extracted and
	validated through ``wsc``, filtered with ``get_exact_urls_from_list`` and
	stored for later calls.

	Args:
		website: Root URL of the site; normalised with ``helper.format_url``.
		company_name: Company the site belongs to (part of the cache key).
		robot_file: robots.txt data for the site, or ``const.NOT_FOUND`` when
			none is available (scraping is then treated as allowed).
		child_url_keywords: Keywords a child URL's last path segment must contain.

	Returns:
		dict with keys ``is_success`` (bool), ``child_urls`` (list, or None on
		failure) and ``error`` (str, empty on success).
	"""
	rsp = {}
	rsp["is_success"] = True
	rsp["child_urls"] = None
	rsp["error"] = ""

	website = helper.format_url(website)
	str_sql= "SELECT child_url FROM child_url_list WHERE company_name = ? AND root_url=?"
	res = db_manager.select_scaler(str_sql,[company_name,website])
	if res != const.NOT_EXISTS:
		rsp["child_urls"]= json.loads(res)
		return rsp

	if robot_file == const.NOT_FOUND:
		is_url_allowed_to_scrap = True
	else:
		is_url_allowed_to_scrap = wsc.is_scraping_allowed_by_robots_file(website,robot_file)

	if not is_url_allowed_to_scrap:
		#logger.info(f"url {website} is not allowed scrap by robot.txt file.")
		rsp["error"] = "SCRAPPING_NOT_ALLOWED"
		rsp["is_success"] = False
		return rsp

	try:
		headers = {'User-Agent': const.USER_AGENTS[0]}
		# NOTE(review): verify=False disables TLS certificate verification - confirm this is intended.
		with requests.get(website,headers=headers, timeout=const.REQUEST_TIME_OUT,verify=False) as r:
			r.raise_for_status()
			page_urls = wsc.url_from_html(r.text)
			#logger.info(f"Home Page All Urls : {page_urls}")
			# Normalise each URL on its own; joining on "," and splitting again
			# would break any URL that itself contains a comma.
			# NOTE(review): lower-casing the whole URL also lower-cases the path - kept from the original.
			child_urls = [str(url).strip().lower() for url in wsc.validate_page_urls(website,page_urls)]
	except Exception as e:
		# Return the failure right away: nothing was fetched, so there is
		# nothing to filter or to cache.
		rsp["is_success"] = False
		rsp["error"] = str(e)
		#logger.error(f"error occured : {e}")
		return rsp

	exact_urls = get_exact_urls_from_list(child_urls,child_url_keywords)
	rsp["child_urls"] = exact_urls
	str_sql="INSERT INTO child_url_list (company_name,root_url, child_url)VALUES (?, ?, ?)"
	db_manager.execute_sql(str_sql,[company_name, website, json.dumps(exact_urls)])
	return rsp



    


In [None]:
import random
import src.util.helper as helper
def get_page_content(page_url:str, robot_file )->dict :
	"""
	Return the text content of one page of a website.

	The ``company`` table is checked first for a stored ``page_text`` of this
	URL. If nothing is stored, the page is downloaded (unless robots.txt
	forbids it), converted to text and cleaned.

	Returns a dict with ``is_success`` (bool) and ``page_text``; on failure
	``page_text`` is "SCRAPPING_NOT_ALLOWED" or "ERROR".
	"""
	rsp = {"is_success": False}

	stored_text = db_manager.select_scaler("SELECT page_text from company where page_url= ?", [page_url])
	if stored_text != const.NOT_EXISTS:
		rsp["page_text"] = stored_text
		rsp["is_success"] = True
		return rsp

	# robots.txt is only consulted when one was found for the site
	scraping_allowed = robot_file == const.NOT_FOUND or wsc.is_scraping_allowed_by_robots_file(page_url, robot_file)
	if not scraping_allowed:
		rsp["page_text"] = "SCRAPPING_NOT_ALLOWED"
		return rsp

	try:
		request_headers = {'User-Agent': random.choice(const.USER_AGENTS)}
		with requests.get(page_url, headers=request_headers, timeout=const.REQUEST_TIME_OUT) as response:
			response.raise_for_status()
			rsp["page_text"] = helper.clean_scrapped_text(wsc.text_from_html(response.text))
			rsp["is_success"] = True
	except Exception:
		# Best effort: any failure is reported through the response dict
		rsp["page_text"] = "ERROR"
		rsp["is_success"] = False
	return rsp

In [None]:
def is_retirive_all(data: dict):
    """Return True when no value in ``data`` equals ``const.NOT_FOUND``."""
    return not any(value == const.NOT_FOUND for value in data.values())

In [None]:
import src.util.api_helper as api_helper
import src.settings.constants as const
def get_comapny_data(website : str, comapny_name : str) ->dict:
    """
    Extract company name, address, email and phone from a company's website.

    The keyword-matching child pages of the site (``const.COMPANY_CONACT_ABOUT``)
    are fetched one by one and their text is sent to the LLM prompt
    ``const.PROMPT_DICT["compnay"]``. Every field the LLM reports as found is
    copied into the response; a later page overwrites a value found earlier.
    Scanning stops as soon as no field is left at ``const.NOT_FOUND``.

    Args:
        website: Company website; normalised with ``helper.format_url``.
        comapny_name: Company name used for the robots.txt and child-URL lookups.

    Returns:
        dict with ``is_success``, ``url`` (pages that contributed a value, in
        visiting order), ``error`` and the ``llm_company_name`` /
        ``llm_address`` / ``llm_email`` / ``llm_phone`` fields. On success the
        result is also inserted into the ``company`` table.
    """
    #logger.info(f"Searchin website : {website} for company name : {comapny_name}")
    rsp ={}
    rsp["is_success"]= False
    rsp["url"]=[]
    rsp["error"]=""
    rsp["llm_company_name"]=const.NOT_FOUND
    rsp["llm_address"]=const.NOT_FOUND
    rsp["llm_email"]=const.NOT_FOUND
    rsp["llm_phone"]=const.NOT_FOUND

    website = helper.format_url(website)
    robot_file = wsc.get_website_robot_file(website, comapny_name)
    rsp_url = get_website_child_url(website,comapny_name,robot_file,const.COMPANY_CONACT_ABOUT)

    if not rsp_url["is_success"]:
        rsp['error']= rsp_url['error']
        return rsp

    # (key in the LLM JSON answer, key in our response), checked in this order
    field_map = (
        ("company_name", "llm_company_name"),
        ("phone", "llm_phone"),
        ("email", "llm_email"),
        ("address", "llm_address"),
    )
    prompt_template = const.PROMPT_DICT["compnay"]
    # dict used as an ordered set: de-duplicates while keeping visiting order
    contributing_urls = {}

    for url in rsp_url['child_urls'] or []:
        rsp_page = get_page_content(url,robot_file)
        if not rsp_page['is_success']:
            continue

        raw_answer = api_helper.get_response_from_openai_json(prompt_template.replace("{context}",rsp_page['page_text']))
        try:
            openAi_rsp = json.loads(raw_answer)
        except (TypeError, ValueError) as e:
            # One unparsable answer must not abort the remaining pages
            rsp["error"] = f"Invalid LLM response for {url}: {e}"
            continue
        if not isinstance(openAi_rsp, dict):
            rsp["error"] = f"Unexpected LLM response type for {url}: {type(openAi_rsp).__name__}"
            continue

        for llm_key, rsp_key in field_map:
            # A key missing from the answer is treated the same as NOT_FOUND
            found_value = openAi_rsp.get(llm_key, const.NOT_FOUND)
            if found_value != const.NOT_FOUND:
                rsp["is_success"]= True
                rsp[rsp_key]= found_value
                contributing_urls[url] = None

        if is_retirive_all(rsp):
            break

    rsp["url"]= list(contributing_urls)

    if rsp["is_success"]:
        str_sql = "Insert into company (name,page_url,url_type,team_info_json) Values(?,?,?,?)"
        db_manager.execute_sql(str_sql,[rsp["llm_company_name"],json.dumps(rsp['url']),"contact",json.dumps(rsp)])

    return rsp

In [None]:
import src.data_enrichment.company_data as cenrich

In [None]:
# Manual check: run the packaged company enrichment for a site given without a URL scheme.
value = cenrich.get_comapny_data("www.a3finance.com","A3 Finance")

In [None]:
# Display the response dict returned for A3 Finance.
value

In [None]:
# Manual check: same enrichment call for a site given as a full https URL.
value =cenrich.get_comapny_data("https://www.unlbric.com","Big Red Investment Club")

In [None]:
# Display the response dict returned for Big Red Investment Club.
value

## Testing

In [None]:
import src.util.log_helper as log_helper
import src.util.helper as helper
import time
import os

# Module-level logger used by the de-duplication functions in this notebook;
# the logger name is "contact_deduplication" and the log file name comes from helper.get_logfile_name().
logger = log_helper.set_get_logger("contact_deduplication",helper.get_logfile_name())

def get_file_name_and_extension(path):
	"""Split ``path`` into (file name without extension, extension including the leading dot)."""
	leaf = os.path.split(path)[1]
	stem, suffix = os.path.splitext(leaf)
	return stem, suffix



In [None]:
def get_col_value(row : any,col_list : list) -> str:
	"""Join the stripped string form of ``row[col]`` for every column in ``col_list`` with single spaces."""
	return " ".join(str(row[col]).strip() for col in col_list)


In [None]:
import src.util.similarity_helper as simi_helper

def compare_rows(src_row, trg_row):
	"""
	Compare two rows on every column set in ``const.COMPARE_COLUMNS`` and return the best match.

	For each column set the fuzzy, Levenshtein and Jaro-Winkler similarities of
	the concatenated column values are computed and combined into an error rate

		error = (1 - (0.15 * fuzzy + 0.15 * levenshtein + 0.70 * jaro_winkler)) * 100

	so a higher similarity gives a lower error. (The similarity helpers are
	assumed to return values in 0..1 - TODO confirm.)

	Returns:
		(index, result) of the column set with the lowest error rate. ``result``
		holds "name" plus the keys "f<index>", "l<index>", "j<index>" and
		"e<index>". On ties the first column set wins.

	Raises:
		IndexError: if ``const.COMPARE_COLUMNS`` is empty.
	"""
	comparesion_list = []
	min_error = None
	min_error_id = -1
	for idx, comp_columns in enumerate(const.COMPARE_COLUMNS):
		key = str(idx)
		str_src = get_col_value(src_row, comp_columns['src_cols'])
		str_trg = get_col_value(trg_row, comp_columns['trg_cols'])
		res = {}
		res["name"] = comp_columns["name"]
		res["f" + key] = simi_helper.get_fuzzy_similarity(str_src, str_trg)
		res["l" + key] = simi_helper.get_levenshtein_similarity(str_src, str_trg)
		res["j" + key] = simi_helper.get_jaro_winkler_similarity(str_src, str_trg)
		# logger.info(f"src val : {str_src}, trg val : {str_trg}, fuzz : {res['f' + key] }, levst : {res['l' + key]}, jero : {res['j' + key]}")
		# The weighted similarity must be parenthesised as a whole before it is
		# subtracted from 1; otherwise two of the three terms are added to the error.
		weighted_similarity = float(res["f" + key])*0.15 + float(res["l" + key])*0.15 + float(res["j" + key])*0.70
		res["e" + key] = (1.0 - weighted_similarity) * 100
		# The first column set is always accepted, so a valid index is returned
		# even when every error rate is 100.
		if min_error is None or float(res["e" + key]) < min_error:
			min_error = res["e" + key]
			min_error_id = idx
		comparesion_list.append(res)
	# logger.info(f"min error rate : {min_error}, min error id : {min_error_id}")
	if min_error_id < 0:
		raise IndexError("const.COMPARE_COLUMNS is empty; nothing to compare")
	return min_error_id, comparesion_list[min_error_id]

In [None]:
def find_dup_row_by_sequence(df, sort_col):
	"""
	Compare every pair of rows in ``df`` and return the pairs that look like duplicates.

	Args:
		df: Rows to compare with each other (every unordered pair is compared once).
		sort_col: Column the rows are sorted by before pairing.

	Returns:
		List of dicts, one per pair whose error rate is at most
		``const.MAX_ERROR_RATE``, holding the source/target ids, the fuzzy,
		Levenshtein and Jaro similarities and the error rate.
	"""
	df = df.sort_values(by=[sort_col])
	list_dup_rows = []
	row_count = len(df)
	for src_idx in range(row_count):
		src_row = df.iloc[src_idx]
		# Only rows after the source row: each pair is compared exactly once
		for trg_idx in range(src_idx + 1, row_count):
			trg_row = df.iloc[trg_idx]
			# logger.info(f"source index {src_idx} ,target index : {trg_idx}")
			best_id, rsp = compare_rows(src_row, trg_row)
			# logger.info(best_id, rsp)
			key = str(best_id)
			dup_grp = {}
			dup_grp[const.COL_SOURCE_ID] = src_row[const.COL_ID]
			dup_grp[const.COL_TARGET_ID] = trg_row[const.COL_ID]
			dup_grp[const.COL_FUZZ_SIMILARITY] = float(rsp["f" + key])
			# Stored on the pair record (not on df), where the group assignment step reads it
			dup_grp[const.COL_LEVENSHTEIN_SIMILARITY] = float(rsp["l" + key])
			dup_grp[const.COL_JARO_SIMILARITY] = float(rsp["j" + key])
			dup_grp[const.COL_ERROR_RATE] = float(rsp["e" + key])
			if dup_grp[const.COL_ERROR_RATE] <= const.MAX_ERROR_RATE:
				list_dup_rows.append(dup_grp)
	return list_dup_rows

In [None]:
import pandas as pd

def find_dup_row_by_fname_lname(df, col_first_name="FirstName"):
	"""
	Find candidate duplicate pairs by bucketing rows on ``fname`` and then ``lname``.

	Only buckets holding more than one row are compared; the pairwise comparison
	is delegated to ``find_dup_row_by_sequence``.

	Returns:
		DataFrame built from the list of duplicate-pair dicts (empty when none are found).
	"""
	def _repeated_keys(frame, key_col):
		# Values of key_col that occur more than once, in groupby order
		counts = frame.groupby([key_col], as_index=False).agg(count=(key_col, 'count'))
		return list(counts.loc[counts["count"] > 1, key_col])

	list_dup_rows = []
	for initial in _repeated_keys(df, "fname"):
		same_initial = df[df['fname'] == initial].sort_values(by=[col_first_name])

		logger.info(f"Started processing records start with {initial}.")

		for prefix in _repeated_keys(same_initial, "lname"):
			bucket = same_initial[same_initial['lname'] == prefix].sort_values(by=["LastName"])
			list_dup_rows.extend(find_dup_row_by_sequence(df=bucket, sort_col="LastName"))

	logger.info("Completed processing all records.")
	return pd.DataFrame.from_dict(list_dup_rows)

In [None]:
import pandas as pd

def read_file_prep_dataframe(file_path, file_ext, map_file_path):
	"""
	Load the contact file and add the working columns used by the de-duplication steps.

	Args:
		file_path: Input file; read as cp1252 CSV when ``file_ext`` is ".csv",
			otherwise with ``pd.read_excel``.
		file_ext: Extension of the input file, including the dot.
		map_file_path: Excel mapping file with columns "SRC_COL" and "TRG_COL".
			For every mapping row the input column TRG_COL is copied to a
			column named SRC_COL, whose string values are then cleaned with
			``helper.clean_contact_data``.

	Returns:
		The prepared DataFrame, or None (after logging an error) when the
		mapping file or the input columns are not usable.
	"""
	if file_ext.lower() == ".csv":
		df = pd.read_csv(file_path, encoding='cp1252')
	else:
		df = pd.read_excel(file_path)

	# Reading Mapping file prep dictionary
	df_map = pd.read_excel(map_file_path)
	if "SRC_COL" not in df_map.columns or "TRG_COL" not in df_map.columns:
		logger.error("Mapping file does not have 'SRC_COL' or 'TRG_COL'. Please correct file and re run.")
		return

	for _, row in df_map.iterrows():
		src_col = str(row["SRC_COL"]).strip()
		trg_col = str(row["TRG_COL"]).strip()
		# Validate with the same stripped name that is used for the lookups below
		if trg_col not in df.columns:
			logger.error(f"Target column {trg_col} does not exists into input file {file_path}. Please correct mapping file and re run.")
			return
		# Add Column
		if trg_col.lower() == src_col.lower():
			# Same name on both sides: keep the untouched original under "<name>_org"
			df.rename(columns={trg_col: trg_col + "_org"}, inplace=True)
			df[src_col] = df[trg_col + "_org"]
		else:
			df[src_col] = df[trg_col]
		# Clean only SRC Columns
		df[src_col] = df[src_col].map(lambda x: helper.clean_contact_data(x) if isinstance(x, str) else x)

	# The bucketing columns below are derived from these three columns
	for required_col in ("FirstName", "LastName", "Email"):
		if required_col not in df.columns:
			logger.error(f"Required column {required_col} does not exists after applying mapping file {map_file_path}. Please correct mapping file and re run.")
			return

	df = df.fillna("")  # Fill Empty value as all columns are strings only

	# Clean text of all dataframe text data one time
	# This is not required now. Need to do during export the file.
	# df = df.map(lambda x: helper.clean_contact_data(x) if isinstance(x, str) else x)

	## Add New Columns to Add into original Dataframe
	df.insert(0, const.COL_DUP_GROUP_ID, 999999)  # 999999 = not assigned to a duplicate group yet
	if const.COL_ID in df.columns:
		# Keep a pre-existing id column under "<id>_org"; a fresh row number replaces it
		df.rename(columns={const.COL_ID: const.COL_ID + "_org"}, inplace=True)
	# Add RowNum to DataFrame
	df.insert(1, const.COL_ID, range(0 + 1, len(df) + 1))

	df.insert(2, const.COL_ERROR_RATE, 100.00)
	df[const.COL_FUZZ_SIMILARITY] = 0.0
	df[const.COL_LEVENSHTEIN_SIMILARITY] = 0.0
	df[const.COL_JARO_SIMILARITY] = 0.0
	df[const.COL_DUP_ROW_GROUP] = ""

	# Bucketing keys: first letter of the first name, first two letters of the last name.
	# Non-string cells (e.g. numbers) give an empty key instead of raising.
	df['fname'] = df["FirstName"].apply(lambda x: x[0] if isinstance(x, str) and len(x) > 0 else '')
	#df['lname'] = df["LastName"].apply(lambda x: x[0] if len(x) > 0 else '')
	df['lname'] = df["LastName"].apply(lambda x: x[:2] if isinstance(x, str) else '')

	# Local part of the email address (text before the first '@')
	df['part_email'] = df["Email"].apply(lambda x: x.split('@')[0] if isinstance(x, str) else '')
	# Add new columns required
	df["dup_group_type"] = ""
	df["data_source"] = ""
	df["action"] = ""
	df["source"] = ""
	df["new_title"] = ""
	df["new_phone"] = ""
	df["new_email"] = ""
	df["prv_org"] = ""
	df["prv_title"] = ""

	return df

In [None]:

def load_file(file_path : str, ext:str):
	"""Read ``file_path`` into a DataFrame: cp1252-encoded CSV when ``ext`` is ".csv" (any case), Excel otherwise."""
	if ext.lower() != ".csv":
		return pd.read_excel(file_path)
	return pd.read_csv(file_path, encoding='cp1252')

In [None]:
def find_group(row, row_to_group, group_id):
	"""Return the group already recorded for ``row``; if there is none, record ``group_id`` for it and return that."""
	return row_to_group.setdefault(row, group_id)

def union_groups(row1, row2, row_to_group, group_id):
	"""Merge the group of ``row2`` into the group of ``row1`` by relabelling every member of the former."""
	kept_group = find_group(row1, row_to_group, group_id)
	merged_group = find_group(row2, row_to_group, group_id)
	if kept_group == merged_group:
		return
	# Only values are rewritten, so iterating the dict while updating it is safe
	for member, current_group in row_to_group.items():
		if current_group == merged_group:
			row_to_group[member] = kept_group

In [None]:
def assign_group_ids(pairs):
	"""
	Turn (source row, target row) duplicate pairs into a row -> group id mapping.

	Rows connected through any chain of pairs end up in the same group. Group
	ids are renumbered at the end so they run 1, 2, 3, ... in order of first
	appearance.
	"""
	membership = {}
	next_id = 1

	for src_row, trg_row in pairs:
		src_known = src_row in membership
		trg_known = trg_row in membership
		if src_known and trg_known:
			# Both rows already grouped: merge the two groups
			union_groups(src_row, trg_row, membership, next_id)
		elif src_known:
			membership[trg_row] = membership[src_row]
		elif trg_known:
			membership[src_row] = membership[trg_row]
		else:
			membership[src_row] = next_id
			membership[trg_row] = next_id
			next_id += 1

	# Merging leaves gaps in the numbering; compact the ids to 1..k
	compact_ids = {}
	for row, old_id in membership.items():
		membership[row] = compact_ids.setdefault(old_id, len(compact_ids) + 1)

	return membership

In [None]:
import src.settings.constants as const

def assign_dup_row_groups(df, dup_df):
	"""
	Write duplicate-group ids and similarity scores from the pair table onto the contact rows.

	Args:
		df: Prepared contact DataFrame; updated in place and returned.
		dup_df: Duplicate pairs with the columns "src_row_no" / "trg_row_no"
			(presumably the same names as const.COL_SOURCE_ID / const.COL_TARGET_ID - TODO confirm)
			plus the similarity and error-rate columns.

	Two passes are made over the pairs: the first stamps one row of each pair
	with its group id, the second copies the group id to the other row.
	"""
	unassigned = 999999  # value of const.COL_DUP_GROUP_ID for rows not stamped yet
	score_cols = [const.COL_FUZZ_SIMILARITY, const.COL_LEVENSHTEIN_SIMILARITY,
				  const.COL_JARO_SIMILARITY, const.COL_ERROR_RATE]
	stamped_cols = score_cols + [const.COL_DUP_GROUP_ID, const.COL_DUP_ROW_GROUP]

	def _current_group(row_id):
		# Group id currently stored on the contact row with this id
		return df[df[const.COL_ID] == row_id][const.COL_DUP_GROUP_ID].values[0]

	def _stamp(row_id, pair, group):
		# Copy the pair's scores, the group id and the "src-trg" label onto one contact row
		pair_label = str(pair[const.COL_SOURCE_ID]) + "-" + str(pair[const.COL_TARGET_ID])
		df.loc[df[const.COL_ID] == row_id, stamped_cols] = [pair[col] for col in score_cols] + [group, pair_label]

	dup_df = dup_df.sort_values(by=["src_row_no"])

	# This will give Group ID assign to each row
	group_ids = assign_group_ids(list(dup_df[['src_row_no', 'trg_row_no']].itertuples(index=False, name=None)))

	# Pass 1: stamp the source row if it is still unassigned, otherwise the target row
	for position in range(len(dup_df)):
		pair = dup_df.iloc[position]
		if _current_group(pair[const.COL_SOURCE_ID]) == unassigned:
			chosen_id = pair[const.COL_SOURCE_ID]
		else:
			chosen_id = pair[const.COL_TARGET_ID]
		_stamp(chosen_id, pair, group_ids[chosen_id])

	# Pass 2: propagate the group id to the row of each pair that pass 1 did not cover
	for position in range(len(dup_df)):
		pair = dup_df.iloc[position]
		source_group = _current_group(pair[const.COL_SOURCE_ID])
		if source_group != unassigned:
			_stamp(pair[const.COL_TARGET_ID], pair, source_group)
		else:
			_stamp(pair[const.COL_SOURCE_ID], pair, _current_group(pair[const.COL_TARGET_ID]))

	return df


In [None]:

def process_contact_deduplication(file_path: str, map_file_path: str):
	"""
	Run the contact de-duplication pipeline for one input file.

	Steps:
	1. Prepare the input file and save it as "<name>_org_nGram<ext>" in const.OUTPUT_PATH.
	2. Find probable and exact duplicate pairs.
	3. Assign duplicate group ids and save the result as "<name>_dup_N_gram<ext>".
	An output file that already exists is loaded instead of being recomputed.

	Args:
		file_path: Contact file (.csv or .xlsx).
		map_file_path: Column mapping file.

	Returns:
		The resulting DataFrame, or None when validation fails or an error
		occurred (the reason is logged).
	"""
	try:
		if not os.path.exists(file_path):
			logger.error(f"File {file_path} does not exist. Please check file path and try again.")
			return None

		if not os.path.exists(map_file_path):
			logger.error(f"Mapping file {map_file_path} does not exist. Please check file path and try again.")
			return None

		name, ext = helper.get_file_name_and_extension(file_path)
		logger.info(f"input file name: {name}{ext}")
		if not (ext.lower() == ".csv" or ext.lower() == ".xlsx"):
			logger.error("System supports csv and xlsx file types only. Please check file extension and try again.")
			return None

		# Step 1
		file_out = const.OUTPUT_PATH + "/" + name + "_org_nGram" + ext
		if not os.path.exists(file_out):
			logger.info(f"Started processing file {file_path}")
			df = read_file_prep_dataframe(file_path, ext, map_file_path)
			if df is None:
				# read_file_prep_dataframe has already logged why the input is unusable
				return None
			logger.info(f"No Of Records {len(df)} found in file {file_path}")
			logger.info(f"Saving original file {file_out}")
			helper.save_file(df, file_out, ext)
		else:
			logger.info(f"original file {file_out} exists, It will use it")
			df = helper.load_file(file_out, ext)

		file_out = const.OUTPUT_PATH + "/" + name + "_dup_N_gram" + ext
		if not os.path.exists(file_out):
			# Step 2
			logger.info("Started finding probable and exact duplicate records")
			df_dup = find_dup_row_by_fname_lname(df)
			if len(df_dup) == 0:
				logger.info("There are no duplicate rows found.")
				return df
			logger.info(f"No Of Duplicate Records found: {len(df_dup)}")

			# Step 3
			logger.info("Assigning duplicate group id to duplicate rows")
			df = assign_dup_row_groups(df, df_dup)
			df = df.fillna("")
			logger.info(f"Saving file with duplicate groups file {file_out}")
			helper.save_file(df, file_out, ext)
		else:
			logger.info(f"duplicate group file {file_out} exists, It will use it.")
			df = helper.load_file(file_out, ext)

		# Return the result on every successful path, not only when no duplicates were found
		return df
	except Exception as e:
		logger.error(f"An error occurred: {e}")
		return None

In [None]:
# Run the pipeline on the March 2021 contact export with its column mapping file.
process_contact_deduplication("./data/Contact_03222021.xlsx","./data/Contact_03222021_Mapping.xlsx")

In [None]:
# Same call as the previous cell; a second run loads any output files that already exist instead of recomputing them.
process_contact_deduplication("./data/Contact_03222021.xlsx","./data/Contact_03222021_Mapping.xlsx")

## New Algorithms

In [None]:
import src.util.similarity_helper as simi_helper
import time
def compare_rows(src_row, trg_row):
	"""
	Compare two rows on every column set in ``const.COMPARE_COLUMNS`` and return the best match.

	For each column set the fuzzy, n-gram, Jaro-Winkler and sound-index
	similarities of the concatenated column values are combined into an error rate

		error = (1 - (0.05 * fuzzy + 0.35 * ngram + 0.30 * jaro_winkler + 0.30 * sound)) * 100

	so a higher similarity gives a lower error. (The similarity helpers are
	assumed to return values in 0..1 - TODO confirm.)

	Returns:
		(index, result) of the column set with the lowest error rate. ``result``
		holds "name" plus the keys "f<index>", "n<index>", "j<index>",
		"s<index>" and "e<index>". On ties the first column set wins.

	Raises:
		IndexError: if ``const.COMPARE_COLUMNS`` is empty.
	"""
	comparesion_list = []
	min_error = None
	min_error_id = -1
	for idx, comp_columns in enumerate(const.COMPARE_COLUMNS):
		key = str(idx)
		str_src = get_col_value(src_row, comp_columns['src_cols'])
		str_trg = get_col_value(trg_row, comp_columns['trg_cols'])
		res = {}
		res["name"] = comp_columns["name"]
		res["f" + key] = simi_helper.get_fuzzy_similarity(str_src, str_trg)
		res["n" + key] = simi_helper.ngram_similarity(str_src, str_trg)
		res["j" + key] = simi_helper.get_jaro_winkler_similarity(str_src, str_trg)
		res["s" + key] = simi_helper.sound_index(str_src,str_trg)
		# logger.info(f"src val : {str_src}, trg val : {str_trg}, fuzz : {res['f' + key] }, ngram : {res['n' + key]}, jero : {res['j' + key]}, sound : {res['s' + key]}")
		res["e" + key] = (1.0 - ((float(res["f" + key])*.05) + (float(res["n" + key])*0.35) + (float(res["j" + key])*0.30)+(float(res["s" + key])*0.30)) )*100
		# The first column set is always accepted. With a fixed starting minimum of
		# 100.0 a pair scoring exactly 100 everywhere left the index at -1, and the
		# caller then failed looking up the key "f-1".
		if min_error is None or float(res["e" + key]) < min_error:
			min_error = res["e" + key]
			min_error_id = idx
		comparesion_list.append(res)
	# logger.info(f"min error rate : {min_error}, min error id : {min_error_id}")
	if min_error_id < 0:
		raise IndexError("const.COMPARE_COLUMNS is empty; nothing to compare")
	return min_error_id, comparesion_list[min_error_id]

In [None]:
def find_dup_row_by_sequence(df, sort_col):
	"""
	Compare every pair of rows in ``df`` (sorted by ``sort_col``) and collect the likely duplicates.

	Returns a list of dicts, one per pair whose error rate does not exceed
	``const.MAX_ERROR_RATE``, holding the source/target ids, the fuzzy, n-gram,
	Jaro and sound-index scores and the error rate.
	"""
	ordered = df.sort_values(by=[sort_col])
	total = len(ordered)
	found_pairs = []
	for src_idx in range(total):
		src_row = ordered.iloc[src_idx]
		# Start after the source row so each unordered pair is visited once
		for trg_idx in range(src_idx + 1, total):
			trg_row = ordered.iloc[trg_idx]
			best_id, scores = compare_rows(src_row, trg_row)
			suffix = str(best_id)
			candidate = {
				const.COL_SOURCE_ID: src_row[const.COL_ID],
				const.COL_TARGET_ID: trg_row[const.COL_ID],
				const.COL_FUZZ_SIMILARITY: float(scores["f" + suffix]),
				"ngram": float(scores["n" + suffix]),
				const.COL_JARO_SIMILARITY: float(scores["j" + suffix]),
				"Sound_index": float(scores["s" + suffix]),
				const.COL_ERROR_RATE: float(scores["e" + suffix]),
			}
			if candidate[const.COL_ERROR_RATE] <= const.MAX_ERROR_RATE:
				found_pairs.append(candidate)
	return found_pairs

In [None]:
import pandas as pd

def find_dup_row_by_fname_lname(df, col_first_name="FirstName"):
	"""
	Find candidate duplicate pairs by bucketing rows on ``fname`` and then ``lname``.

	Only buckets holding more than one row are compared; the pairwise comparison
	is delegated to ``find_dup_row_by_sequence``.

	Returns:
		DataFrame built from the list of duplicate-pair dicts (empty when none are found).
	"""
	def _repeated_keys(frame, key_col):
		# Values of key_col that occur more than once, in groupby order
		counts = frame.groupby([key_col], as_index=False).agg(count=(key_col, 'count'))
		return list(counts.loc[counts["count"] > 1, key_col])

	list_dup_rows = []
	for initial in _repeated_keys(df, "fname"):
		same_initial = df[df['fname'] == initial].sort_values(by=[col_first_name])

		logger.info(f"Started processing records start with {initial}.")

		for prefix in _repeated_keys(same_initial, "lname"):
			bucket = same_initial[same_initial['lname'] == prefix].sort_values(by=["LastName"])
			list_dup_rows.extend(find_dup_row_by_sequence(df=bucket, sort_col="LastName"))

	logger.info("Completed processing all records.")
	return pd.DataFrame.from_dict(list_dup_rows)

In [None]:
import pandas as pd

def read_file_prep_dataframe(file_path, file_ext, map_file_path):
	"""
	Load the contact file and add the working columns used by the de-duplication steps.

	Args:
		file_path: Input file; read as cp1252 CSV when ``file_ext`` is ".csv",
			otherwise with ``pd.read_excel``.
		file_ext: Extension of the input file, including the dot.
		map_file_path: Excel mapping file with columns "SRC_COL" and "TRG_COL".
			For every mapping row the input column TRG_COL is copied to a
			column named SRC_COL, whose string values are then cleaned with
			``helper.clean_contact_data``.

	Returns:
		The prepared DataFrame (including the "ngram" and "Sound_index" score
		columns), or None (after logging an error) when the mapping file or
		the input columns are not usable.
	"""
	if file_ext.lower() == ".csv":
		df = pd.read_csv(file_path, encoding='cp1252')
	else:
		df = pd.read_excel(file_path)

	# Reading Mapping file prep dictionary
	df_map = pd.read_excel(map_file_path)
	if "SRC_COL" not in df_map.columns or "TRG_COL" not in df_map.columns:
		logger.error("Mapping file does not have 'SRC_COL' or 'TRG_COL'. Please correct file and re run.")
		return

	for _, row in df_map.iterrows():
		src_col = str(row["SRC_COL"]).strip()
		trg_col = str(row["TRG_COL"]).strip()
		# Validate with the same stripped name that is used for the lookups below
		if trg_col not in df.columns:
			logger.error(f"Target column {trg_col} does not exists into input file {file_path}. Please correct mapping file and re run.")
			return
		# Add Column
		if trg_col.lower() == src_col.lower():
			# Same name on both sides: keep the untouched original under "<name>_org"
			df.rename(columns={trg_col: trg_col + "_org"}, inplace=True)
			df[src_col] = df[trg_col + "_org"]
		else:
			df[src_col] = df[trg_col]
		# Clean only SRC Columns
		df[src_col] = df[src_col].map(lambda x: helper.clean_contact_data(x) if isinstance(x, str) else x)

	# The bucketing columns below are derived from these three columns
	for required_col in ("FirstName", "LastName", "Email"):
		if required_col not in df.columns:
			logger.error(f"Required column {required_col} does not exists after applying mapping file {map_file_path}. Please correct mapping file and re run.")
			return

	df = df.fillna("")  # Fill Empty value as all columns are strings only

	# Clean text of all dataframe text data one time
	# This is not required now. Need to do during export the file.
	# df = df.map(lambda x: helper.clean_contact_data(x) if isinstance(x, str) else x)

	## Add New Columns to Add into original Dataframe
	df.insert(0, const.COL_DUP_GROUP_ID, 999999)  # 999999 = not assigned to a duplicate group yet
	if const.COL_ID in df.columns:
		# Keep a pre-existing id column under "<id>_org"; a fresh row number replaces it
		df.rename(columns={const.COL_ID: const.COL_ID + "_org"}, inplace=True)
	# Add RowNum to DataFrame
	df.insert(1, const.COL_ID, range(0 + 1, len(df) + 1))

	df.insert(2, const.COL_ERROR_RATE, 100.00)
	df[const.COL_FUZZ_SIMILARITY] = 0.0
	df["ngram"] = 0.0
	df[const.COL_JARO_SIMILARITY] = 0.0
	df["Sound_index"]= 0.0
	df[const.COL_DUP_ROW_GROUP] = ""

	# Bucketing keys: first letter of the first name, first two letters of the last name.
	# Non-string cells (e.g. numbers) give an empty key instead of raising.
	df['fname'] = df["FirstName"].apply(lambda x: x[0] if isinstance(x, str) and len(x) > 0 else '')
	# df['lname'] = df["LastName"].apply(lambda x: x[0] if len(x) > 0 else '')
	df['lname'] = df["LastName"].apply(lambda x: x[:2] if isinstance(x, str) else '')

	# Local part of the email address (text before the first '@')
	df['part_email'] = df["Email"].apply(lambda x: x.split('@')[0] if isinstance(x, str) else '')
	# Add new columns required
	df["dup_group_type"] = ""
	df["data_source"] = ""
	df["action"] = ""
	df["source"] = ""
	df["new_title"] = ""
	df["new_phone"] = ""
	df["new_email"] = ""
	df["prv_org"] = ""
	df["prv_title"] = ""

	return df

In [None]:
import src.settings.constants as const

def assign_dup_row_groups(df, dup_df):
	"""
	Write duplicate-group ids and similarity scores from the pair table onto the contact rows.

	Args:
		df: Prepared contact DataFrame; updated in place and returned.
		dup_df: Duplicate pairs with the columns "src_row_no" / "trg_row_no"
			(presumably the same names as const.COL_SOURCE_ID / const.COL_TARGET_ID - TODO confirm)
			plus the "Sound_index", fuzzy, "ngram", Jaro and error-rate columns.

	Two passes are made over the pairs: the first stamps one row of each pair
	with its group id, the second copies the group id to the other row.
	"""
	unassigned = 999999  # value of const.COL_DUP_GROUP_ID for rows not stamped yet
	score_cols = ["Sound_index", const.COL_FUZZ_SIMILARITY, "ngram",
				  const.COL_JARO_SIMILARITY, const.COL_ERROR_RATE]
	stamped_cols = score_cols + [const.COL_DUP_GROUP_ID, const.COL_DUP_ROW_GROUP]

	def _current_group(row_id):
		# Group id currently stored on the contact row with this id
		return df[df[const.COL_ID] == row_id][const.COL_DUP_GROUP_ID].values[0]

	def _stamp(row_id, pair, group):
		# Copy the pair's scores, the group id and the "src-trg" label onto one contact row
		pair_label = str(pair[const.COL_SOURCE_ID]) + "-" + str(pair[const.COL_TARGET_ID])
		df.loc[df[const.COL_ID] == row_id, stamped_cols] = [pair[col] for col in score_cols] + [group, pair_label]

	dup_df = dup_df.sort_values(by=["src_row_no"])

	# This will give Group ID assign to each row
	group_ids = assign_group_ids(list(dup_df[['src_row_no', 'trg_row_no']].itertuples(index=False, name=None)))

	# Pass 1: stamp the source row if it is still unassigned, otherwise the target row
	for position in range(len(dup_df)):
		pair = dup_df.iloc[position]
		if _current_group(pair[const.COL_SOURCE_ID]) == unassigned:
			chosen_id = pair[const.COL_SOURCE_ID]
		else:
			chosen_id = pair[const.COL_TARGET_ID]
		_stamp(chosen_id, pair, group_ids[chosen_id])

	# Pass 2: propagate the group id to the row of each pair that pass 1 did not cover
	for position in range(len(dup_df)):
		pair = dup_df.iloc[position]
		source_group = _current_group(pair[const.COL_SOURCE_ID])
		if source_group != unassigned:
			_stamp(pair[const.COL_TARGET_ID], pair, source_group)
		else:
			_stamp(pair[const.COL_SOURCE_ID], pair, _current_group(pair[const.COL_TARGET_ID]))

	return df


In [None]:

def process_contact_deduplication(file_path: str, map_file_path: str):
	"""
	Run the contact de-duplication pipeline (new scoring logic) for one input file.

	Steps:
	1. Prepare the input file and save it as "<name>_org_New_logic<ext>" in const.OUTPUT_PATH.
	2. Find probable and exact duplicate pairs.
	3. Assign duplicate group ids and save the result as "<name>_dup_New_logic<ext>".
	An output file that already exists is loaded instead of being recomputed.

	Args:
		file_path: Contact file (.csv or .xlsx).
		map_file_path: Column mapping file.

	Returns:
		The resulting DataFrame, or None when validation fails or an error
		occurred (the reason is logged).
	"""
	try:
		if not os.path.exists(file_path):
			logger.error(f"File {file_path} does not exist. Please check file path and try again.")
			return None

		if not os.path.exists(map_file_path):
			logger.error(f"Mapping file {map_file_path} does not exist. Please check file path and try again.")
			return None

		name, ext = helper.get_file_name_and_extension(file_path)
		logger.info(f"input file name: {name}{ext}")
		if not (ext.lower() == ".csv" or ext.lower() == ".xlsx"):
			logger.error("System supports csv and xlsx file types only. Please check file extension and try again.")
			return None

		# Step 1
		file_out = const.OUTPUT_PATH + "/" + name + "_org_New_logic" + ext
		if not os.path.exists(file_out):
			logger.info(f"Started processing file {file_path}")
			df = read_file_prep_dataframe(file_path, ext, map_file_path)
			if df is None:
				# read_file_prep_dataframe has already logged why the input is unusable
				return None
			logger.info(f"No Of Records {len(df)} found in file {file_path}")
			logger.info(f"Saving original file {file_out}")
			helper.save_file(df, file_out, ext)
		else:
			logger.info(f"original file {file_out} exists, It will use it")
			df = helper.load_file(file_out, ext)

		file_out = const.OUTPUT_PATH + "/" + name + "_dup_New_logic" + ext
		if not os.path.exists(file_out):
			# Step 2
			logger.info("Started finding probable and exact duplicate records")
			df_dup = find_dup_row_by_fname_lname(df)
			if len(df_dup) == 0:
				logger.info("There are no duplicate rows found.")
				return df
			logger.info(f"No Of Duplicate Records found: {len(df_dup)}")

			# Step 3
			logger.info("Assigning duplicate group id to duplicate rows")
			df = assign_dup_row_groups(df, df_dup)
			df = df.fillna("")
			logger.info(f"Saving file with duplicate groups file {file_out}")
			helper.save_file(df, file_out, ext)
		else:
			logger.info(f"duplicate group file {file_out} exists, It will use it.")
			df = helper.load_file(file_out, ext)

		# Return the result on every successful path, not only when no duplicates were found
		return df
	except Exception as e:
		logger.error(f"An error occurred: {e}")
		return None

In [None]:
# Run the pipeline on the 407-record all-scenario test file with its mapping file.
process_contact_deduplication("./data/Contact Test_Data_Jan 15-2025-407 records-allscenario.xlsx","./data/Contact Test_Data_Jan 15-2025-407 records-allscenario_Mapping.xlsx")

In [None]:
import jellyfish as jfish
str_src = "Adam Friedman"
str_trg = "Adam Freda"
def sound_index(str_src, str_trg):
	"""Return 1.0 when both strings have the same Soundex code, otherwise 0.0."""
	codes_match = jfish.soundex(str_src) == jfish.soundex(str_trg)
	return 1.0 if codes_match else 0.0
sound_index(str_src,str_trg)

In [1]:
import Levenshtein

str_src = "ajit"
str_trg = "ankit"
def get_levenshtein_similarity(str_src,str_trg):
	"""
	Return ``Levenshtein.ratio`` of the two strings, or 0.0 when either string is empty.
	"""
	if len(str_src) == 0 or len(str_trg) == 0:
		return 0.0
	return Levenshtein.ratio(str_src, str_trg)

levestin = get_levenshtein_similarity(str_src,str_trg)
levestin

0.6666666666666667

In [26]:
from rapidfuzz import fuzz
str_src = "Geoffrey"
str_trg = "Gregory"
def get_fuzzy_similarity(str_src, str_trg):
	"""Return rapidfuzz's ``token_set_ratio`` divided by 100, or 0.0 when either string is empty."""
	if len(str_src) == 0 or len(str_trg) == 0:
		return 0.0
	return fuzz.token_set_ratio(str_src, str_trg) / 100

fuzzy = get_fuzzy_similarity(str_src,str_trg)
fuzzy

0.6666666666666665

In [27]:
import jellyfish as jfish

str_src = "Geoffrey"
str_trg = "Gregory"
def get_jaro_winkler_similarity(str_src, str_trg):
	"""Return jellyfish's Jaro-Winkler similarity of the two strings, or 0.0 when either string is empty."""
	if len(str_src) == 0 or len(str_trg) == 0:
		return 0.0
	return jfish.jaro_winkler_similarity(str_src, str_trg)

jarao = get_jaro_winkler_similarity(str_src, str_trg)
jarao

0.8017857142857143

In [None]:
# Scratch check: character count of a sample full name (including the space).
len("Robert Walker")

In [None]:
# Trial error rate in percent: 100 minus the weighted similarity (25% fuzzy, 25% Levenshtein, 50% Jaro-Winkler).
# NOTE(review): fuzzy, levestin and jarao come from earlier cells, and levestin was computed
# for a different string pair ("ajit"/"ankit") than the other two ("Geoffrey"/"Gregory").
error = 100-(((float(fuzzy))*0.25 + float(levestin)*0.25 + float(jarao)*0.50) * 100)
error

In [None]:
# Display the weighted error rate computed in the previous cell again.
error

In [None]:
# Alternative error rate in percent: 1 minus the unweighted mean of the three similarities.
(1.0-(float(fuzzy) + float(levestin) + float(jarao) )/3)*100

In [6]:
import unicodedata

def remove_accents(input_str):
    """
    Strip accents from a string.

    The text is decomposed with NFKD normalization, which separates base
    characters from their combining marks, and the combining marks are dropped.
    """
    decomposed = unicodedata.normalize('NFKD', input_str)
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    return ''.join(base_chars)

# Example usage:
# NOTE(review): the literal below looks like a mis-encoded (mojibake) name rather than clean accented text.
french_name = "Ã‡avusoglu"
plain_name = remove_accents(french_name)
print(plain_name)  # Output: "A‡avusoglu" - only the tilde of "Ã" is stripped; "‡" is not a combining mark


A‡avusoglu
