In [1]:
import pandas as pd
import numpy as np
import json

from typing import List, Dict, Callable

from src.path import DataPaths
from tools.type_check import print_detailed_info


# Load the cleaned product dataset and print a per-column overview
# (row count, column count, detected type and a short value preview per column).
df = pd.read_parquet(DataPaths.file_parquet_clean)
print_detailed_info(df)

Original dataset: 21,946 rows
Number of columns in the DataFrame: 31
Data columns (total 31 columns):
 #   Column                               Actual type     Preview
---  ------------------------------------ ---------------- --------------------------------------------------
 0   unspsc                               str             Sewing and stitchery and weaving equipme...
 1   root_domain                          str             studio-atcoat.com
 2   page_url                             str             https://studio-atcoat.com/1372696759/?id...
 3   product_title                        str             Glimakra Warping Board (8m)
 4   product_summary                      str             The Glimakra Warping Board is designed f...
 5   product_name                         str             Warping Board
 6   product_identifier                   ndarray(0,)     []
 7   brand                                str             CST
 8   intended_industries                  ndarray(1,)     [

In [2]:
# ========== Scalar Column Handling Functions ==========

def merge_unspsc(values):
    """Combine a group's UNSPSC labels into one sorted, '|'-delimited string.

    Entries that already contain several labels joined by '|' are split apart
    first, so re-merging an earlier merge result does not nest separators.
    Nulls, empty strings and the literal string 'nan' are discarded, and every
    label is whitespace-stripped.

    Parameters:
    values (pandas.Series): UNSPSC values of the rows being merged

    Returns:
    str or None: sorted unique labels joined by '|', or None if none remain
    """
    if values.empty:
        return None

    labels = set()
    for raw in values:
        if pd.notna(raw):
            if isinstance(raw, str) and '|' in raw:
                pieces = raw.split('|')
            else:
                pieces = [str(raw)]
            labels.update(piece.strip() for piece in pieces)

    kept = sorted(label for label in labels if label and label != 'nan')
    if not kept:
        return None
    return '|'.join(kept)

def merge_root_domain(values):
    """Collapse a group's root_domain values to the single domain they share.

    Null and empty entries are ignored.

    Parameters:
    values (pandas.Series): root_domain values of the rows being merged

    Returns:
    str or None: the shared domain, or None if every entry is null/empty

    Raises:
    ValueError: if the non-empty values are not all identical
    """
    if values.empty:
        return None

    domains = [domain for domain in values if pd.notna(domain) and domain]
    if not domains:
        return None

    distinct = set(domains)
    if len(distinct) > 1:
        raise ValueError('Different root_domain values')
    return domains[0]

def merge_text_longest(values):
    """Pick the longest non-empty string among a group's values.

    Null, empty and non-string entries are ignored. When several strings share
    the maximum length, the one that appears first wins.

    Parameters:
    values (pandas.Series): text values of the rows being merged

    Returns:
    str or None: the longest string, or None if there is none
    """
    if values.empty:
        return None

    best = None
    for candidate in values:
        if not (pd.notna(candidate) and isinstance(candidate, str) and candidate):
            continue
        # Strict '>' keeps the earliest candidate on ties.
        if best is None or len(candidate) > len(best):
            best = candidate
    return best

def merge_eco_friendly(values):
    """Reduce a group's eco_friendly flags to one boolean.

    Only boolean entries are considered: both Python ``bool`` and NumPy
    ``np.bool_`` (object columns can hold either; identity checks such as
    ``v is True`` would silently miss ``np.bool_``). Nulls and any other
    non-boolean values are ignored.

    Parameters:
    values (pandas.Series): eco_friendly values of the rows being merged

    Returns:
    bool or None: the shared flag as a Python bool, or None when the group has
    no boolean value at all

    Raises:
    ValueError: if the group contains both True and False
    """
    if values.empty:
        return None

    # Normalise every boolean-like value to a Python bool; skipping non-bools
    # also avoids calling pd.isna on list-like cells (ambiguous truth value).
    flags = {bool(v) for v in values if isinstance(v, (bool, np.bool_))}

    if len(flags) > 1:
        raise ValueError('Different eco_friendly values')
    if flags:
        return flags.pop()
    return None

def merge_max_year(values):
    """Return the latest (maximum) non-null year in a group.

    Parameters:
    values (pandas.Series): manufacturing_year values of the rows being merged

    Returns:
    The largest non-null value, or None when the group is empty or all-null
    """
    if values.empty:
        return None
    return max((year for year in values if pd.notna(year)), default=None)

def get_scalar_aggregation_dict() -> Dict[str, Callable]:
    """
    Build the column -> aggregation-function mapping for scalar columns.

    - unspsc: union of labels joined with '|'
    - root_domain: must be identical across the group
    - eco_friendly: shared boolean flag
    - manufacturing_year: maximum year
    - all remaining text columns: longest string

    Returns:
    dict: Dictionary mapping column names to aggregation functions
    """
    # Column numbers refer to positions in the cleaned dataset.
    return {
        'unspsc': merge_unspsc,                     # column 0
        'root_domain': merge_root_domain,           # column 1
        'page_url': merge_text_longest,             # column 2
        'product_title': merge_text_longest,        # column 3
        'product_summary': merge_text_longest,      # column 4
        'product_name': merge_text_longest,         # column 5
        'brand': merge_text_longest,                # column 7
        'eco_friendly': merge_eco_friendly,         # column 10
        'manufacturing_year': merge_max_year,       # column 17
        'description': merge_text_longest,          # column 30
    }

In [3]:
# ========== Array Column Handling Functions ==========

def merge_array_simple(values):
    """
    Merge arrays by concatenating all elements and removing duplicates.

    Duplicates are removed while keeping the order in which elements are first
    seen. This makes the result deterministic across kernel restarts; a plain
    ``set`` orders strings by hash, which Python randomises per process.

    Parameters:
    values (pandas.Series): Series containing arrays (numpy arrays or lists);
        any other cell value (None, NaN, scalars) is skipped

    Returns:
    numpy.ndarray: 1-D object array of the unique elements in first-seen order
        (an empty float array when ``values`` itself is empty)
    """
    if values.empty:
        return np.array([])

    # Collect the elements of every non-empty array/list cell.
    all_elements = []
    for arr in values:
        if isinstance(arr, (np.ndarray, list)) and len(arr) > 0:
            all_elements.extend(arr)

    try:
        # dict keys keep insertion order, unlike set.
        unique_elements = list(dict.fromkeys(all_elements))
    except TypeError:
        # Unhashable elements (e.g. nested lists): fall back to equality checks.
        unique_elements = []
        for item in all_elements:
            if item not in unique_elements:
                unique_elements.append(item)

    # Fill an object array element by element: np.array(unique_elements) would
    # coerce mixed types to one dtype (e.g. [1, 'a'] -> ['1', 'a']) and turn
    # equal-length list elements into a 2-D array.
    merged = np.empty(len(unique_elements), dtype=object)
    for position, item in enumerate(unique_elements):
        merged[position] = item
    return merged

def merge_arrays_dictionary(values: pd.Series) -> np.ndarray:
    """
    Merge arrays containing dictionaries by combining all unique dictionaries.

    This function handles numpy arrays or lists containing dictionaries, and
    combines them while removing duplicates based on dictionary content. The
    first occurrence of each dictionary is kept, in order; non-dict elements
    are skipped.

    Two dictionaries are duplicates when their key-sorted JSON encodings match.
    NumPy values inside a dictionary (e.g. ``np.int64`` scalars or nested
    ``np.ndarray`` lists, which ``json.dumps`` cannot encode by itself) are
    converted with ``.tolist()`` first, so ``{'v': np.int64(8)}`` and
    ``{'v': 8}`` count as the same entry. Any other non-JSON value is compared
    through ``str()``.

    Parameters:
    values (pandas.Series): Series containing arrays of dictionaries
                           (numpy arrays or lists)

    Returns:
    numpy.ndarray: Array with all unique dictionaries merged
    """
    if values.empty:
        return np.array([])

    def _json_default(obj):
        # Fallback encoder for values json cannot serialise natively.
        if isinstance(obj, (np.ndarray, np.generic)):
            return obj.tolist()
        return str(obj)

    # Collect all non-empty arrays of dictionaries
    all_dictionaries = []
    for arr in values:
        if arr is None or (hasattr(arr, '__len__') and len(arr) == 0):
            continue

        if isinstance(arr, np.ndarray):
            all_dictionaries.extend(arr.tolist())
        elif isinstance(arr, list):
            all_dictionaries.extend(arr)

    unique_dicts = []
    seen_json_strings = set()

    for dictionary in all_dictionaries:
        if not isinstance(dictionary, dict):
            continue

        # Canonical, key-sorted encoding used as the deduplication key.
        json_str = json.dumps(dictionary, sort_keys=True, default=_json_default)

        if json_str not in seen_json_strings:
            seen_json_strings.add(json_str)
            unique_dicts.append(dictionary)

    return np.array(unique_dicts)

def get_array_aggregation_dict() -> Dict[str, Callable]:
    """
    Build the column -> aggregation-function mapping for array-valued columns.

    Columns whose cells are arrays of plain values are merged with
    ``merge_array_simple``; columns whose cells are arrays of dictionaries are
    merged with ``merge_arrays_dictionary``.

    Returns:
    dict: column name -> aggregation function; the simple-array columns come
          first, then the dictionary-array columns, each in the listed order
    """
    # Column numbers refer to positions in the cleaned dataset.
    simple_array_columns = (
        'product_identifier',                     # column 6
        'intended_industries',                    # column 8
        'applicability',                          # column 9
        'ethical_and_sustainability_practices',   # column 11
        'materials',                              # column 14
        'ingredients',                            # column 15
        'manufacturing_countries',                # column 16
        'manufacturing_type',                     # column 18
        'customization',                          # column 19
        'packaging_type',                         # column 20
        'form',                                   # column 21
        'quality_standards_and_certifications',   # column 28
        'miscellaneous_features',                 # column 29
    )

    dictionary_array_columns = (
        'production_capacity',                    # column 12
        'price',                                  # column 13
        'size',                                   # column 22
        'color',                                  # column 23
        'purity',                                 # column 24
        'energy_efficiency',                      # column 25
        'pressure_rating',                        # column 26
        'power_rating',                           # column 27
    )

    agg_dict = dict.fromkeys(simple_array_columns, merge_array_simple)
    agg_dict.update(dict.fromkeys(dictionary_array_columns, merge_arrays_dictionary))
    return agg_dict

In [4]:
# def merge_dataframe_rows(df, key_column):
#     """
#     Merge rows in a DataFrame that share the same key value.
#
#     This function uses specialized merging strategies for different column types:
#     - Scalar columns: Uses specific functions for text, domains, etc.
#     - Array columns: Handles both simple arrays and dictionary arrays
#
#     Parameters:
#     df (pd.DataFrame): DataFrame to merge
#     key_column (str): Column to use as the grouping key
#
#     Returns:
#     pd.DataFrame: DataFrame with merged rows
#     """
#     # Check if key_column exists in DataFrame
#     if key_column not in df.columns:
#         raise ValueError(f"Key column '{key_column}' not found in DataFrame")
#
#     # Handle empty DataFrame
#     if df.empty:
#         return df.copy()
#
#     # Get aggregation dictionaries for both scalar and array columns
#     scalar_agg_dict = get_scalar_aggregation_dict()
#     array_agg_dict = get_array_aggregation_dict()
#
#     # Combine both dictionaries
#     agg_dict = {**scalar_agg_dict, **array_agg_dict}
#
#     # Remove the key column from aggregation if it's in any of the dictionaries
#     if key_column in agg_dict:
#         del agg_dict[key_column]
#
#     # For any columns that don't have an aggregation function,
#     # use a simple first() aggregation
#     for col in df.columns:
#         if col != key_column and col not in agg_dict:
#             agg_dict[col] = lambda x: x.iloc[0] if not x.empty else None
#
#     # Group by key_column and apply aggregation
#     result_df = df.groupby(key_column, as_index=False).agg(agg_dict)
#
#     # Handle potential None values in array columns
#     for col in array_agg_dict:
#         if col in result_df.columns:
#             # Replace None with empty arrays
#             result_df[col] = result_df[col].apply(
#                 lambda x: np.array([]) if x is None else x
#             )
#
#     return result_df

In [5]:
# Preview: prepend a constant merge key to the first 10,000 rows (so they would
# all fall into one merge group) and show the first 10 of them.
sample_df = df.iloc[:10000].copy()
sample_df.insert(loc=0, column='merge_key', value=1)
sample_df.iloc[:10]

Unnamed: 0,merge_key,unspsc,root_domain,page_url,product_title,product_summary,product_name,product_identifier,brand,intended_industries,...,form,size,color,purity,energy_efficiency,pressure_rating,power_rating,quality_standards_and_certifications,miscellaneous_features,description
0,1,Sewing and stitchery and weaving equipment and...,studio-atcoat.com,https://studio-atcoat.com/1372696759/?idx=510,Glimakra Warping Board (8m),The Glimakra Warping Board is designed for use...,Warping Board,[],,[Textile],...,[],"[{'dimension': 'Length', 'qualitative': False,...",[],[],[],[],[],[],[],"The ""Warping Board"" is designed for use with f..."
1,1,Electric alternating current AC motors,worm-gears.net,https://worm-gears.net/tag/worm-gear-box/,NMRV Worm Gearbox Motor,The NMRV Worm Gearbox Motor is a high-efficien...,Worm Gearbox Motor,[],,[Industrial],...,[],[],"[{'original': 'Blue', 'simple': 'Blue'}, {'ori...",[],[],[],"[{'qualitative': False, 'type': 'min', 'unit':...",[],"[Omnibearing installation, High radiation effi...","The ""Worm Gearbox Motor"" is a high-efficiency ..."
2,1,Vehicle trim and exterior covering,customcarcoverco.com,https://customcarcoverco.com/collections/vendo...,Nissan R33 GTR Car Cover,A custom car cover designed for the Nissan R33...,Car Cover,[],,[Automotive],...,[],[],[],[],[],[],[],[],"[Personalization with custom brand logos, grap...","The ""Car Cover"" is a custom-designed cover tai..."
3,1,Pipe connectors,plumbmaster.com,https://www.plumbmaster.com/search?q=wolverine...,Flexible Fittings,"Flexible fittings for plumbing applications, a...",Flexible Fittings,[],,[Plumbing],...,[],[],[],[],[],[],[],[],"[allows for movement, flexibility in installat...","""Flexible Fittings"" are designed for plumbing ..."
4,1,Doors,sogno.in,http://www.sogno.in/product-detail-CST-HGD-331...,CST-HGD-33103 Hinged Closet Door,The CST-HGD-33103 Hinged Closet Door is a meti...,Hinged Closet Door,[],CST,"[Home Appliances, Construction]",...,[],[],[],[],[],[],[],[],"[Italian craftsmanship, German engineering, Sm...","The ""Hinged Closet Door"" is a storage solution..."
5,1,Faucets or taps,plumbmaster.com,https://www.plumbmaster.com/search?q=wolverine...,Deep Faucets,"Faucets with a deep design, providing a secure...",Deep Faucets,[],,[Plumbing],...,[],[],[],[],[],[],[],[],"[deep design, secure and stable connection]","""Deep Faucets"" are designed with a deep design..."
6,1,Dispensing tools,advancedpressuresystems.ca,https://advancedpressuresystems.ca/collections...,10K Dry Shut-Off Gun Handle Assembly,The 10K Dry Shut-Off Gun Handle Assembly is a ...,Dry Shut-Off Gun Handle Assembly,[],,[Manufacturing],...,[],[],[],[],[],"[{'qualitative': True, 'type': 'exact', 'unit'...",[],[],[],"The ""Dry Shut-Off Gun Handle Assembly"" is a co..."
7,1,Medical facility materials handling and distri...,armstrongmedical.com,https://www.armstrongmedical.com//cart-systems...,Cranberry Cart Systems,Cranberry Cart Systems are part of the Armstro...,Cranberry Cart Systems,[],Armstrong Medical,[Healthcare],...,[],[],"[{'original': 'Cranberry', 'simple': 'Red'}]",[],[],[],[],[],[vibrant and eye-catching look],"""Cranberry Cart Systems"" from the Armstrong Me..."
8,1,Pneumatic tools,advancedpressuresystems.ca,https://advancedpressuresystems.ca/collections...,10K Air Operated Control Gun,An air operated control gun designed for water...,Air Operated Control Gun,[],,[Manufacturing],...,[],"[{'dimension': None, 'qualitative': True, 'typ...",[],[],[],[],[],[],[],"The ""Air Operated Control Gun"" is designed for..."
9,1,Tshirts,workwonderly.com,https://www.workwonderly.com/tags/Medicine/col...,5 THINGS YOU SHOULD KNOW ABOUT MY NURSE PRACTI...,A long sleeve tee with the message '5 THINGS Y...,Long Sleeve Tee,[],,"[Fashion, Retail]",...,[],[],"[{'original': 'Blue', 'simple': 'Blue'}, {'ori...",[],[],[],[],[],[Long sleeve],"The ""Long Sleeve Tee"" is a long sleeve t-shirt..."


In [17]:
from src.merge import merge_dataframe_rows
from tools.save_data import export_dataframe

# Small fixture: three rows that all share merge_key == 1, so the merge should
# collapse them into one row. reset_index(drop=True) guarantees the labels 0..2
# used below; with any other index, `.loc[2, ...]` would append a new row
# instead of editing an existing one.
sample_df = df.head(3).reset_index(drop=True)
sample_df.insert(0, 'merge_key', 1)

sample_df.loc[2, 'manufacturing_year'] = 2005
# root_domain must agree across the group (the notebook's merge_root_domain raises on a mismatch).
sample_df.loc[[1, 2], 'root_domain'] = 'studio-atcoat.com'
sample_df.loc[0, 'eco_friendly'] = True
# Uncomment to exercise the conflicting-flag path (the notebook's merge_eco_friendly raises ValueError):
# sample_df.loc[1, 'eco_friendly'] = False
sample_df.loc[2, 'eco_friendly'] = None

# Store a list of dicts in each `size` cell. Use `.at`, not `.loc`: for a single
# cell, `.loc[row, col] = [x]` treats the list as the values to assign and stores
# the bare dict `x`, which merge_arrays_dictionary would skip.
size_fixture = {'dimension': 'Length', 'qualitative': False, 'type': 'exact', 'unit': 'm', 'value': '8'}
for row in range(3):
    sample_df.at[row, 'size'] = [dict(size_fixture)]

merged_df = merge_dataframe_rows(sample_df, 'merge_key')
assert len(merged_df) == 1, 'all rows share merge_key, so one merged row is expected'

# Drop the helper key and restore the original column order.
original_columns = [col for col in sample_df.columns if col != 'merge_key']
merged_df = merged_df[original_columns]

export_dataframe(merged_df, DataPaths.test_folder, 'test')
merged_df.head(10)

Exported data to: E:\veridion_deduplication\data\test\test.csv


Unnamed: 0,unspsc,root_domain,page_url,product_title,product_summary,product_name,product_identifier,brand,intended_industries,applicability,...,form,size,color,purity,energy_efficiency,pressure_rating,power_rating,quality_standards_and_certifications,miscellaneous_features,description
0,Electric alternating current AC motors|Sewing ...,studio-atcoat.com,https://worm-gears.net/tag/worm-gear-box/,Glimakra Warping Board (8m),The NMRV Worm Gearbox Motor is a high-efficien...,Worm Gearbox Motor,[],,"[Textile, Automotive, Industrial]","[use with floor looms, industrial applications...",...,[],"[{'dimension': 'Length', 'qualitative': False,...","[{'original': 'Blue', 'simple': 'Blue'}, {'ori...",[],[],[],"[{'qualitative': False, 'type': 'min', 'unit':...",[],"[High radiation efficiency, Good service life,...","The ""Worm Gearbox Motor"" is a high-efficiency ..."
