In [None]:
# Install the notebook's dependencies.
# The %pip magic (rather than !pip) installs into the environment of the running kernel,
# so the packages are importable from the cells below.
%pip install torch
%pip install sentence-transformers
%pip install pandas
%pip install matplotlib
%pip install --upgrade transformers sentence-transformers
%pip install seaborn
%pip install tqdm
%pip install ipywidgets

In [47]:
import pandas as pd
from sentence_transformers import SentenceTransformer, util
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import torch
from tqdm import tqdm
import os
import json


In [48]:
# Load the two CSV files whose rows will be compared: df1 is the LHS set, df2 the RHS set.
# NOTE(review): both paths are absolute and machine-specific; edit them before running elsewhere.
df1 = pd.read_csv(r"C:\Users\Yasvanth.Pamidi\OneDrive - ENCORA\Desktop\DataMap\LHS.csv") #path to file1
df2 = pd.read_csv(r"C:\Users\Yasvanth.Pamidi\OneDrive - ENCORA\Desktop\DataMap\RHS.csv") #path to file2

In [49]:
# Choose the text column to compare in each frame and cast it to str so every value can be encoded.
# Note: astype(str) turns missing values into the literal string "nan", which is then embedded like any other text.

column1 = 'Description' # name of the column to compare in df1
df1[column1] = df1[column1].astype(str)

column2 = 'Description' # name of the column to compare in df2
df2[column2] = df2[column2].astype(str)

In [50]:

# Load the pre-trained sentence-embedding model that is used to encode both description columns.
model = SentenceTransformer('sentence-transformers/paraphrase-mpnet-base-v2')

In [None]:
#embeddings for selected rows

#function to encode sentences in batches
def batch_encode(column, batch_size, model):
    """Encode the values of ``column`` in consecutive slices and join the results.

    Args:
        column: Object exposing ``tolist()`` (the notebook passes a DataFrame column).
        batch_size: Number of values handed to ``model.encode`` per call.
        model: Object whose ``encode`` method accepts a list of values together with
            ``convert_to_tensor`` and ``show_progress_bar`` keyword arguments.

    Returns:
        The per-slice results of ``model.encode`` concatenated along dimension 0.
    """
    texts = column.tolist()
    slice_starts = range(0, len(texts), batch_size)
    # Gradient tracking is disabled for the whole encoding pass.
    with torch.no_grad():
        encoded_slices = [
            model.encode(
                texts[start:start + batch_size],
                convert_to_tensor=True,
                show_progress_bar=True,
            )
            for start in tqdm(slice_starts, desc="Encoding Batches")
        ]
    return torch.cat(encoded_slices, dim=0)


# Encode the comparison column of each frame, 128 values per call to model.encode.
embeddings1 = batch_encode(df1[column1], batch_size=128, model=model)
embeddings2 = batch_encode(df2[column2], batch_size=128, model=model)


# Unbatched alternative (single model.encode call per frame), kept for reference:
#embeddings1 = model.encode(df1[column1].to_list(), show_progress_bar=True,convert_to_tensor=True)
#embeddings2 = model.encode(df2[column2].to_list(), show_progress_bar=True,convert_to_tensor=True)

In [52]:
# Compute the pairwise cosine similarity between every df1 embedding and every df2 embedding.
# Cosine similarity lies in the range -1 to 1. The result is moved to the CPU and converted to a NumPy array.

similarity_matrix = util.cos_sim(embeddings1,embeddings2).cpu().numpy()

In [53]:
# Rescale the cosine similarities from the range [-1, 1] to the range [0, 1].
normalized_similarity_matrix = (similarity_matrix + 1) / 2

In [54]:
# Directory that holds the per-database type-mapping JSON files and compatibilities.json.
# NOTE(review): absolute, machine-specific path; edit it before running elsewhere.
mappings_dir = r"C:\Users\Yasvanth.Pamidi\OneDrive - ENCORA\Desktop\VSC\DataMapper\versions\mappings"

load_mappings : 

-> Initializes an empty dictionary to store database mappings

-> Checks if the specified directory exists. If not, raises a FileNotFoundError

-> Iterates through files in the directory: Selects only .json files, excluding compatibilities.json. Extracts the database name from the filename. Opens the JSON file in read mode, reads its contents using json.load, and stores it in the mappings dictionary under the respective database name.

-> Checks if no mappings were loaded and raises a ValueError.

-> Returns the mappings dictionary containing database-to-data type mappings.

In [55]:
def load_mappings(mappings_dir):
    """Load every per-database type-mapping JSON file found in a directory.

    Each ``*.json`` file other than ``compatibilities.json`` is parsed and stored under
    its file name without the extension. Files are processed in sorted name order so
    that the key order of the result does not depend on the order in which the
    operating system lists the directory.

    Args:
        mappings_dir: Path of the directory containing the mapping files.

    Returns:
        dict: Parsed JSON content keyed by database name (file name without ``.json``).

    Raises:
        FileNotFoundError: If ``mappings_dir`` does not exist.
        ValueError: If the directory contains no mapping files.
    """
    mappings = {}

    print(f"Loading mappings from directory: {mappings_dir}")

    if not os.path.exists(mappings_dir):
        raise FileNotFoundError(f"Mappings directory '{mappings_dir}' does not exist.")

    # os.listdir returns entries in arbitrary order; sort for a reproducible result.
    for filename in sorted(os.listdir(mappings_dir)):
        if filename.endswith(".json") and filename != "compatibilities.json":
            database_name = os.path.splitext(filename)[0]

            print(f"Processing file: {filename} as database: {database_name}")

            # Read as UTF-8 explicitly instead of relying on the platform default encoding.
            with open(os.path.join(mappings_dir, filename), "r", encoding="utf-8") as file:
                mappings[database_name] = json.load(file)

                print(f"Loaded mapping for {database_name}: {mappings[database_name]}")

    if not mappings:
        raise ValueError(f"No mapping files found in '{mappings_dir}'.")

    print("Mappings loaded successfully:", mappings)
    return mappings

load_compatibilities: Reads the compatibility JSON file and loads it into a Python dictionary.

In [56]:
def load_compatibilities(file_path):
    """Read the compatibility JSON file and return its parsed content.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The object produced by ``json.load`` for the file's content.
    """
    # Read as UTF-8 explicitly instead of relying on the platform default encoding.
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)

normalize_data_type:

-> Converts the field's data type to lowercase so that it can be looked up case-insensitively in the mappings.

-> Prepares a list to store databases where a match is found.

-> Iterates through each database in the mappings.

-> Converts all keys in the mapping to lowercase to ensure case-insensitive lookup.

-> If a match for rhs_type_lower is found, it: Updates normalized_type. Adds the database to matching_databases.

-> If matches are found, returns the normalized type and list of matching databases. Otherwise, returns the original type (converted to uppercase) and None.

In [57]:
def normalize_data_type(rhs_type, database, mappings):
    """Look up a data type case-insensitively in every database mapping.

    Args:
        rhs_type: Data type name to look up (a string).
        database: Not used by this function.
        mappings: Dict of ``{database_name: {type_name: normalized_type}}``.

    Returns:
        tuple: ``(normalized_type, matching_databases)`` when at least one mapping has a
        truthy entry for the type; the normalized value comes from the last matching
        database. Otherwise ``(rhs_type.upper(), None)``.
    """
    print(f"\nNormalizing RHS Type: {rhs_type} using mappings")

    rhs_type_lower = rhs_type.lower()
    print(f"Lowercase RHS Type: {rhs_type_lower}")

    normalized_type = None
    matching_databases = []

    for db_name in mappings:
        print(f"Checking database: {db_name}")

        # Rebuild the mapping with lowercase keys so the lookup ignores case.
        lowercase_lookup = {}
        for type_name, target in mappings[db_name].items():
            lowercase_lookup[type_name.lower()] = target

        candidate = lowercase_lookup.get(rhs_type_lower)
        if not candidate:
            continue

        normalized_type = candidate
        matching_databases.append(db_name)
        print(f"Match found in {db_name}: Normalized Type -> {normalized_type}")

    if not matching_databases:
        print(f"No matches found. Returning RHS Type in uppercase: {rhs_type.upper()}")
        return rhs_type.upper(), None

    print(f"Final Normalized Type: {normalized_type}, Matching Databases: {matching_databases}")
    return normalized_type, matching_databases

are_compatible: 

Purpose: Checks if two data types (lhs_type and rhs_type) are compatible.

-> Converts both types to uppercase for consistency.

-> Returns True if either check passes: (1) the two types are identical; (2) rhs_type appears in the compatibility list of lhs_type.

In [58]:
def are_compatible(lhs_type, rhs_type, compatibilities):
    """Tell whether two data types are compatible, ignoring the case of the inputs.

    Args:
        lhs_type: Left-hand data type name.
        rhs_type: Right-hand data type name.
        compatibilities: Dict mapping an uppercase type name to the collection of
            type names it is compatible with.

    Returns:
        bool: True when the uppercased names are equal, or when the uppercased
        right-hand name is contained in the entry for the uppercased left-hand name.
    """
    left, right = lhs_type.upper(), rhs_type.upper()
    if left == right:
        return True
    return right in compatibilities.get(left, [])

normalize_length: 

Purpose: Converts a length to an integer.

Logic: If the length is invalid (e.g., non-numeric or None), it returns None.

In [59]:
def normalize_length(length):
    """Convert a length to ``int``, returning ``None`` when that is not possible.

    Args:
        length: Value to convert (number, numeric string, ``None``, ...).

    Returns:
        int | None: ``int(length)``, or ``None`` if the conversion fails.
    """
    try:
        return int(length)
    # OverflowError is raised by int() for infinite floats; treat it as invalid too.
    except (ValueError, TypeError, OverflowError):
        return None

classify_length:

Purpose: Classifies a length into a bin based on predefined ranges.

Logic:

-> Iterates through bins (ranges of lengths).

-> Returns the index of the first bin that contains the length, or None if no bin contains it.

In [60]:
def classify_length(length, bins):
    """Return the index of the first bin whose inclusive range contains ``length``.

    Args:
        length: Value to classify.
        bins: Sequence of ``(lower, upper)`` pairs.

    Returns:
        int | None: Index of the first matching bin, or ``None`` if none matches.
    """
    matching_indices = (
        position
        for position, (low, high) in enumerate(bins)
        if low <= length <= high
    )
    return next(matching_indices, None)

create_length_bins:

Purpose: Divides lengths into num_bins equal ranges (bins).

Steps:

-> Finds the minimum and maximum lengths.

-> Calculates the size of each bin.

-> Constructs bins as tuples of (lower bound, upper bound).

In [61]:
def create_length_bins(lengths, num_bins=3):
    """Split the range spanned by ``lengths`` into ``num_bins`` equal-width bins.

    Args:
        lengths: Non-empty collection of numeric lengths.
        num_bins: Number of bins to create (at least 1).

    Returns:
        list[tuple]: ``(lower, upper)`` pairs in ascending order. Adjacent bins share
        an edge, and the last upper bound is exactly ``max(lengths)``.

    Raises:
        ValueError: If ``lengths`` is empty or ``num_bins`` is less than 1.
    """
    lengths = list(lengths)
    if not lengths:
        raise ValueError("Cannot create length bins from an empty collection of lengths.")
    if num_bins < 1:
        raise ValueError("num_bins must be at least 1.")

    min_length = min(lengths)
    max_length = max(lengths)
    bin_size = (max_length - min_length) / num_bins

    # Use the true maximum as the final edge: min_length + num_bins * bin_size can round
    # to just below max_length, which would leave the largest length outside every bin.
    edges = [min_length + i * bin_size for i in range(num_bins)] + [max_length]
    return list(zip(edges[:-1], edges[1:]))

is_length_compatible:

Purpose: Determines if two lengths are compatible.

Logic:

-> If check_length is False, the length check is skipped and the function returns True.

-> Normalizes lengths to ensure valid numerical values.

-> Handles missing values by returning False.

-> Classifies lengths into bins and checks if they fall into the same bin.

In [62]:
def is_length_compatible(lhs_length, rhs_length, bins, check_length=True):
    """Tell whether two lengths fall into the same length bin.

    Args:
        lhs_length: Left-hand length (any value accepted by ``normalize_length``).
        rhs_length: Right-hand length (any value accepted by ``normalize_length``).
        bins: Sequence of ``(lower, upper)`` pairs passed to ``classify_length``.
        check_length: When False the check is skipped and True is returned.

    Returns:
        bool: True when the check is disabled or both lengths are valid and share a
        bin; False when either length is invalid, lies outside every bin, or the two
        lengths fall into different bins.
    """
    if not check_length:
        return True

    lhs_length = normalize_length(lhs_length)
    rhs_length = normalize_length(rhs_length)

    print(f"LHS Length: {lhs_length}, RHS Length: {rhs_length}")

    if lhs_length is None or rhs_length is None:

        print("One of the lengths is invalid. Returning False.")

        return False

    lhs_category = classify_length(lhs_length, bins)
    rhs_category = classify_length(rhs_length, bins)

    print(f"LHS Category: {lhs_category}, RHS Category: {rhs_category}")

    # A length outside every bin is classified as None; two such lengths do not share
    # a bin, so they must not compare as compatible via None == None.
    if lhs_category is None or rhs_category is None:
        return False

    return lhs_category == rhs_category

In [63]:
# Function to convert numpy types to native Python types
def convert_to_native_type(value):
    """Convert NumPy scalars and arrays to native Python objects.

    Args:
        value: Any value.

    Returns:
        ``bool`` for ``np.bool_``, ``int`` for NumPy integers, ``float`` for NumPy
        floats, a (nested) list for ``np.ndarray``; any other value is returned
        unchanged.
    """
    # np.bool_ is neither a Python bool nor a NumPy integer, and json cannot encode it.
    if isinstance(value, np.bool_):
        return bool(value)
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, np.floating):
        return float(value)
    if isinstance(value, np.ndarray):
        return value.tolist()
    return value

In [None]:
def retrieve_top_similar_sentences_json(selected_index, top_n, normalized_similarity_matrix, df1, df2, column1, column2, type_mapping, compatibilities, filter_compatible=True, check_length=None):
    """Build a JSON-serialisable report of the best RHS matches for one LHS row.

    Every row of ``df2`` is compared with row ``selected_index`` of ``df1``. A row is
    kept when its normalized data type is compatible with the LHS data type and its
    length is compatible with the LHS length. The kept rows are sorted by similarity
    score (highest first), truncated to ``top_n`` and ranked starting at 1.

    Rows are addressed by position in both frames, matching the positional layout of
    the similarity matrix.

    Args:
        selected_index: Position of the LHS row in ``df1`` (and of its row in the matrix).
        top_n: Maximum number of matches to return.
        normalized_similarity_matrix: Array of shape ``(len(df1), len(df2))``.
        df1: LHS frame; must contain ``'Field Name'``, ``'Data_Type'``, ``'Length'``
            and ``column1``.
        df2: RHS frame; must contain ``'Attribute'``, ``'Data_Type'``, ``'Length'``
            and ``column2``.
        column1: Name of the description column in ``df1``.
        column2: Name of the description column in ``df2``.
        type_mapping: Mappings passed to ``normalize_data_type``.
        compatibilities: Compatibility table passed to ``are_compatible``.
        filter_compatible: Accepted for interface compatibility; it currently has no
            effect (incompatible rows are always dropped).
        check_length: Whether lengths must be compatible. When left as ``None`` the
            module-level ``check_length`` variable is used if it exists, else True.

    Returns:
        dict: ``{"results": [result]}`` where ``result`` describes the LHS field and
        holds the list of matches (or a single ``{"message": ...}`` entry when no row
        qualifies).

    Raises:
        ValueError: If the matrix shape does not match the two frames.
    """
    if normalized_similarity_matrix.shape != (len(df1), len(df2)):

        print(f"Error: Similarity matrix shape {normalized_similarity_matrix.shape} does not match dataframes: LHS {len(df1)}, RHS {len(df2)}")

        raise ValueError("The similarity matrix dimensions must match the LHS and RHS datasets.")

    print("Input validation passed. Proceeding...")

    if check_length is None:
        # Earlier callers set a module-level ``check_length`` flag instead of passing it.
        check_length = globals().get("check_length", True)

    # Build the bins from the same integer values that is_length_compatible classifies,
    # skipping entries that cannot be converted.
    raw_lengths = pd.concat([df1['Length'], df2['Length']]).dropna().tolist()
    all_lengths = [value for value in map(normalize_length, raw_lengths) if value is not None]

    print(f"All lengths combined: {all_lengths}")

    # Without any valid length there is nothing to bin; every length check then fails
    # on the invalid lengths themselves.
    bins = create_length_bins(all_lengths, num_bins=3) if all_lengths else []

    print(f"Length bins created: {bins}")

    # The LHS row does not change inside the loop, so read and normalize it once. This
    # also keeps normalized_lhs_type defined when df2 has no rows.
    lhs_field = df1['Field Name'].iloc[selected_index]
    lhs_desc = df1[column1].iloc[selected_index]
    lhs_type = df1['Data_Type'].iloc[selected_index]
    lhs_length = df1['Length'].iloc[selected_index]
    lhs_type_upper = lhs_type.upper()
    normalized_lhs_type, _ = normalize_data_type(lhs_type_upper, None, type_mapping)

    print(f"  LHS Type: {lhs_type_upper}, Normalized LHS Type: {normalized_lhs_type}")

    rhs_fields = df2['Attribute']
    rhs_descs = df2[column2]
    rhs_types = df2['Data_Type']
    rhs_lengths = df2['Length']

    similarities = normalized_similarity_matrix[selected_index]
    matches = []
    for idx in range(len(similarities)):
        rhs_field = rhs_fields.iloc[idx]
        rhs_desc = rhs_descs.iloc[idx]
        rhs_type = rhs_types.iloc[idx]
        rhs_length = rhs_lengths.iloc[idx]

        print(f"\nProcessing RHS Index {idx}:")
        print(f"  Field: {rhs_field}, Description: {rhs_desc}, Type: {rhs_type}, Length: {rhs_length}")

        normalized_rhs_type, database_context = normalize_data_type(rhs_type, None, type_mapping)

        print(f"  Normalized RHS Type: {normalized_rhs_type}, Database Context: {database_context}")

        normalized_rhs_type = normalized_rhs_type.upper()
        # NOTE(review): the raw (uppercased) LHS type is compared with the *normalized*
        # RHS type; confirm whether normalized_lhs_type was intended here.
        is_compatible = are_compatible(lhs_type_upper, normalized_rhs_type, compatibilities)
        length_compatible = is_length_compatible(lhs_length, rhs_length, bins, check_length=check_length)
        if is_compatible and length_compatible:
            matches.append({
                "rank": None,  # filled in after sorting
                "similarity_score": float(similarities[idx]),
                "rhs_index": int(df2.index[idx]),
                "rhs_field_name": rhs_field,
                "rhs_field_desc": rhs_desc,
                "rhs_data_type": rhs_type,
                "normalized_rhs_type": normalized_rhs_type,
                "database_name": database_context if database_context else "Unknown",
                "compatibility": "Compatible",
                "rhs_length": convert_to_native_type(rhs_length)
            })
    matches = sorted(matches, key=lambda x: x["similarity_score"], reverse=True)

    print("\nSorted Matches by Similarity Score:")

    matches = matches[:top_n]
    for rank, match in enumerate(matches, start=1):
        match["rank"] = rank
    if not matches:
        matches.append({"message": "No compatible matches found."})

    result = {
        "lhs_field_index": selected_index,
        "lhs_field_name": lhs_field,
        "lhs_field_description": lhs_desc,
        "lhs_field_data_type": lhs_type,
        "normalized_lhs_type": normalized_lhs_type,
        "lhs_field_length": convert_to_native_type(lhs_length),
        "matches": matches
    }
    return {"results": [result]}

# Load the type mappings and the compatibility table from the mappings directory.
type_mapping = load_mappings(mappings_dir)
compatibilities_file = os.path.join(mappings_dir, 'compatibilities.json')
compatibilities = load_compatibilities(compatibilities_file)

# Ask for the LHS row to process and how many matches to return (an empty answer means 3).
enter_index = int(input("Enter the index from LHS to process: "))
top_n = int(input("Enter the number of top similar sentences to retrieve (N): ") or 3)

# Module-level flag read by retrieve_top_similar_sentences_json; it must be set before the call below.
check_length = True  # enable the length-compatibility check

output_json = retrieve_top_similar_sentences_json(
    enter_index, top_n, normalized_similarity_matrix, df1, df2, "Description", "Description", type_mapping, compatibilities, filter_compatible=True
)
print(json.dumps(output_json, indent=2))