In [1]:
from utils.extraction.extraction import *
from utils.transformation.transformation import *
from utils.load.load import *
import pandas as pd

In [None]:
def extraction():
    """
    Run the extraction step of the pipeline.

    Returns:
        Whatever ``contact_collector()`` returns (the raw contact data);
        the value is passed through unchanged.
    """
    return contact_collector()

In [None]:
def duplicates_management(df):
    """
    Collapse rows that share the same non-null email into a single row.

    For every group of rows with an identical email, the row with the highest
    ``lastmodifieddate`` is kept. Values missing on that row are back-filled
    from the older rows of the group, and its ``industry`` is replaced by a
    ``;``-prefixed, ``;``-separated list of the group's distinct industries
    (for example ``";Tech;Retail"``). Every other row of the group is removed.
    Rows whose email is missing are never treated as duplicates.

    Args:
        df (pandas.DataFrame): Input DataFrame. Must contain the columns
            ``email``, ``lastmodifieddate``, ``industry`` and
            ``hs_object_id``. The input is not modified.

    Returns:
        temp_df (pandas.DataFrame): Copy of ``df`` with duplicates managed
            based on email values.
    """
    temp_df = df.copy()
    email = temp_df['email']
    is_duplicate = email.duplicated(keep=False) & email.notna()
    duplicated = temp_df[is_duplicate]

    # Ids of the superseded rows; removed in one pass after the loop instead
    # of re-filtering the whole frame once per email group.
    ids_to_drop = []

    for _, group in duplicated.groupby('email'):
        # Most recent row first, then pull missing values up from older rows.
        # ``.bfill()`` replaces the deprecated ``fillna(method='bfill')``.
        ordered_group = group.sort_values('lastmodifieddate', ascending=False).bfill()

        # Ignore missing industries so they do not end up in the joined
        # string as the literal text "nan"/"None".
        industries = ordered_group['industry'].dropna().unique()
        if len(industries) > 0:
            joined = ';' + ';'.join(str(industry) for industry in industries)
            ordered_group.loc[ordered_group.index[0], 'industry'] = joined
        # When the whole group has no industry, the value is left missing.

        id_list = list(ordered_group['hs_object_id'])
        value_to_replace = id_list[0]

        # Overwrite the surviving row in the result with its merged version.
        row_to_replace = temp_df.index[temp_df['hs_object_id'] == value_to_replace][0]
        temp_df.loc[row_to_replace] = ordered_group.iloc[0]

        ids_to_drop.extend(id_list[1:])

    if ids_to_drop:
        temp_df = temp_df[~temp_df['hs_object_id'].isin(ids_to_drop)]

    return temp_df

In [None]:
def transformation(contacts_df):
    """
    Clean the extracted contacts and merge duplicated emails.

    Steps, in order:
      1. Drop the ``createdate`` column.
      2. Replace ``country`` and add ``city`` from the result of
         ``country_recognition`` applied to each ``country`` value
         (presumably a (country, city) pair -- confirm against the helper).
      3. Run ``found_emails`` over ``raw_email`` and rename the column to
         ``email``.
      4. Rewrite ``phone`` with ``fix_phone_numbers(phone, country)``, using
         the already-normalised ``country``.
      5. Collapse duplicated emails with ``duplicates_management``.

    Args:
        contacts_df (pandas.DataFrame): Extracted contacts. Must contain the
            columns ``createdate``, ``country``, ``raw_email`` and ``phone``.

    Returns:
        pandas.DataFrame: The transformed, de-duplicated contacts.
    """
    frame = contacts_df.drop('createdate', axis=1)

    # Expand each recognition result into separate columns.
    recognised = frame['country'].apply(country_recognition)
    location_columns = recognised.apply(pd.Series)
    frame[['country','city']] = location_columns

    cleaned_emails = frame['raw_email'].apply(found_emails)
    frame['raw_email'] = cleaned_emails
    frame = frame.rename(columns={'raw_email': 'email'})

    def _fixed_phone(row):
        return fix_phone_numbers(row['phone'], row['country'])

    frame['phone'] = frame.apply(_fixed_phone, axis=1)

    # Duplicates management
    return duplicates_management(frame)

In [None]:
def load(df):
    """
    Save every row of ``df`` as a contact.

    Each row is converted to a dict, passed through
    ``load_record_management`` and the result handed to ``saving_contact``,
    one record at a time in row order.

    Args:
        df (pandas.DataFrame): Transformed contacts to persist.
    """
    for contact in df.to_dict(orient='records'):
        prepared_contact = load_record_management(contact)
        saving_contact(prepared_contact)

In [None]:
def run_pipeline():
    """
    Run the full extract -> transform -> load pipeline.

    Snapshots of the intermediate data are written to
    ``csv_results/extraction.csv`` and ``csv_results/transformed.csv``
    before the transformed contacts are loaded.
    """
    import os

    # DataFrame.to_csv does not create missing parent directories; make sure
    # the output folder exists up front so the run does not fail only after
    # the extraction has already been performed.
    os.makedirs('csv_results', exist_ok=True)

    # Extraction
    contacts_data = extraction()
    contacts_df = pd.DataFrame(contacts_data)
    contacts_df.to_csv('csv_results/extraction.csv', index=False)

    # Transformation
    transformed_df = transformation(contacts_df)
    transformed_df.to_csv('csv_results/transformed.csv', index=False)

    # Load
    load(transformed_df)

In [None]:
# Run the whole pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    
    run_pipeline()