# NCVPRIPG-2025 - DEHADO Challenge
## Creating the summary statistics from the InternVL results
## Dev version-3
Kernel - ncvpripg_2025_summary

In [None]:
import os
import re
import cv2
import random
import json

import pandas as pd
import numpy as np

from tqdm import tqdm

from jiwer import wer
from jiwer import cer

from PIL import Image
from PIL import ImageStat
import matplotlib.pyplot as plt

from IPython.core.interactiveshell import InteractiveShell
# Echo every bare expression in a cell, not only the last one
InteractiveShell.ast_node_interactivity = 'all'

# Set display options
pd.set_option('display.max_rows', 500)       # Show up to 500 rows
pd.set_option('display.max_columns', 100)    # Show up to 100 columns
pd.set_option('display.width', 1000)         # Set display width for better readability
pd.set_option('display.max_colwidth', None)  # Show full content in each column

In [None]:
# Libraries for post processing
from unidecode import unidecode
from rapidfuzz import process, fuzz

from dateutil import parser
from datetime import datetime

import unicodedata


## Load the Post-Processing Functions

In [None]:
from post_processing import *

In [None]:
# Root directory; every dataset, label and output path below is derived from it
BASE_PATH = '/DATA/gyan/GP/ncvpripg2025/dehado/'

# Dataset directories of the two phases
PATH_PHASE_1 = os.path.join(BASE_PATH,'DEHADO-AI_TRAINING_DATASET')
PATH_PHASE_2 = os.path.join(BASE_PATH,'DEHADO-AI_TRAINING_DATASET_PHASE_II')

# Label (ground-truth JSON) directories, one per phase
LABEL_PATH_1 = os.path.join(PATH_PHASE_1,'LABELS_750')
LABEL_PATH_2 = os.path.join(PATH_PHASE_2,'LABELS_750')


In [None]:
# Tags double as the per-phase output folder names and as the value of the
# 'tag' column on both the ground-truth and the OCR rows
TAG_1 = 'output_phase_1'
TAG_2 = 'output_phase_2'

# Directories holding the model output files, one per phase
OUTPUT_PATH_1  = os.path.join(BASE_PATH,TAG_1)
OUTPUT_PATH_2  = os.path.join(BASE_PATH,TAG_2)

## Load the Label Data for all the images = GROUND TRUTH

In [None]:
def get_gt(PATH):
    """Load every label JSON under ``PATH`` into one long ground-truth frame.

    Each file is parsed with ``json.load`` and converted to a DataFrame; the
    file's base name (extension stripped) is stored in a ``filename`` column.
    Files that cannot be read or converted are counted and skipped
    (best effort), and the number of skipped files is printed.

    Parameters
    ----------
    PATH : str
        Directory that contains the label files.

    Returns
    -------
    pandas.DataFrame
        All readable label files concatenated, with ``Field name`` /
        ``Field value`` renamed to ``field_name`` / ``field_value``, all
        column names lower-cased, the ``coordinate`` column dropped and every
        object-dtype column lower-cased.

    Raises
    ------
    ValueError
        Raised by ``pd.concat`` when no label file could be loaded.
    KeyError
        Raised by ``drop`` when the labels carry no ``coordinate`` column.
    """
    all_labels = os.listdir(PATH)
    print(f'We have a total of {len(all_labels)} Label files.')

    frames = []
    error_fl = []

    for fl in tqdm(all_labels):

        fl_path = os.path.join(PATH, fl)

        try:
            # Load JSON from a file and convert it to a DataFrame
            with open(fl_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            df = pd.DataFrame(data)
        except Exception:
            # Best effort: an unreadable label must not abort the whole run.
            # `Exception` (not a bare except) lets Ctrl-C / SystemExit through.
            error_fl.append(fl)
            continue

        nm, _ = os.path.splitext(fl)
        df['filename'] = nm
        frames.append(df)

    print(f'# Error Files in GT = {len(error_fl)}')

    df_gt = pd.concat(frames, axis=0)
    df_gt.rename({'Field name': 'field_name', 'Field value': 'field_value'}, axis=1, inplace=True)
    df_gt.columns = [col.lower() for col in df_gt.columns]

    # DROP Bounding-Box Columns as of now -> CHANGE
    df_gt.drop(['coordinate'], axis=1, inplace=True)

    # Lower-case every text column (field_name, field_value, filename, ...).
    # NOTE(review): `.str.lower()` turns non-string cells into NaN - confirm
    # that label values are always strings.
    for col in df_gt.select_dtypes(include='object').columns:
        df_gt[col] = df_gt[col].str.lower()

    return df_gt

In [None]:
def ground_truth_data():
    """Return the ground truth of both phases as one sorted frame.

    Every phase is loaded with ``get_gt`` and stamped with its tag in a
    ``tag`` column; the stacked result is ordered by ``filename`` and
    ``field_name`` and given a fresh 0..n-1 index.
    """
    tagged_frames = []

    for label_dir, phase_tag in ((LABEL_PATH_1, TAG_1), (LABEL_PATH_2, TAG_2)):
        phase_frame = get_gt(PATH = label_dir)
        phase_frame['tag'] = phase_tag
        tagged_frames.append(phase_frame)

    combined = pd.concat(tagged_frames, axis=0)
    combined = combined.sort_values(['filename','field_name'])

    return combined.reset_index(drop = True)

In [None]:
# Build the combined ground-truth table for both phases
df_gt = ground_truth_data()

In [None]:
# Sanity checks on the ground truth: sample row, shape,
# share of rows per phase tag (in %) and number of distinct files
df_gt.head(1)
df_gt.shape
df_gt['tag'].value_counts(normalize = True)*100
df_gt['filename'].nunique()

## Create a dataframe of all the OCRed Text

In [None]:
def post_processing(txt):
    """Lower-case a raw model response and strip its Markdown code fence.

    Removes the fence markers (three backticks, optionally followed by the
    ``json`` language tag) and a bare leading ``json`` label. Unlike a
    blanket ``replace('json', '')``, the word "json" inside the payload
    itself (for example inside a field value) is left untouched.

    Parameters
    ----------
    txt : str
        Raw text read from a model output file.

    Returns
    -------
    str
        Lower-cased text without fence markers.
    """
    d = txt.lower()

    # Fence markers, with the optional language tag glued to the opener
    d = re.sub(r'```(?:json)?', '', d)

    # A stand-alone "json" label left at the very start (e.g. "``` json")
    d = re.sub(r'^(\s*)json\b', r'\1', d)

    return d

In [None]:
def parse_json_output(json_obj):
    """Flatten one parsed model response into ``field_name`` / ``ocr_text`` rows.

    Rules, applied per top-level key:

    * ``references_mobile_number`` -> ``referencescmob1`` / ``referencescmob2``
      (first two list items).
    * ``experience`` -> ``experience`` / ``experience1`` (first two list items).
    * For those two keys a scalar (str / int / float) is treated as a
      one-item list instead of being silently dropped; ``None`` and dict
      values still produce no row.
    * dict value -> one row per sub-key, named ``<key>_<sub_key>``.
    * list value -> a single row with the items joined by ``', '``.
    * anything else -> a single row with ``str(value)``.

    Parameters
    ----------
    json_obj : dict
        Parsed JSON object of one document.

    Returns
    -------
    pandas.DataFrame
        Columns ``field_name`` and ``ocr_text`` (present even when empty).
    """
    # Keys whose items map by position onto fixed field names
    positional_fields = {
        'references_mobile_number': ('referencescmob1', 'referencescmob2'),
        'experience': ('experience', 'experience1'),
    }

    records = []

    for key, value in json_obj.items():
        if key in positional_fields:
            if isinstance(value, (str, int, float)):
                # The model sometimes returns a single value instead of a list
                value = [value]
            if isinstance(value, list):
                # zip() stops at the shorter side: at most two rows per key
                for target_name, item in zip(positional_fields[key], value):
                    records.append({'field_name': target_name, 'ocr_text': str(item)})
        elif isinstance(value, dict):
            for sub_key, sub_value in value.items():
                records.append({'field_name': f'{key}_{sub_key}', 'ocr_text': str(sub_value)})
        elif isinstance(value, list):
            records.append({'field_name': key, 'ocr_text': ', '.join(map(str, value))})
        else:
            records.append({'field_name': key, 'ocr_text': str(value)})

    # Explicit columns keep the schema stable when no record was produced
    df_json_flattened = pd.DataFrame(records, columns=['field_name', 'ocr_text'])

    return df_json_flattened

In [None]:
def get_ocr_output(OUTPUT_PATH):
    """Parse every model output file under ``OUTPUT_PATH`` into one long frame.

    Each file is read as UTF-8 text, passed through ``post_processing``,
    parsed with ``json.loads`` and flattened with ``parse_json_output``.
    The file's base name (extension stripped) is stored in ``filename``.
    Files that fail at any of these steps are collected instead of
    aborting the run.

    Parameters
    ----------
    OUTPUT_PATH : str
        Directory that contains the model output files.

    Returns
    -------
    tuple
        ``(failed_cases, df_ocr)``: the names of the files that could not be
        processed, and the concatenated rows with every object-dtype column
        lower-cased.

    Raises
    ------
    ValueError
        Raised by ``pd.concat`` when no file could be parsed.
    """
    all_txt_files = os.listdir(OUTPUT_PATH)

    df_internvl = []
    failed_cases = []

    for tf in tqdm(all_txt_files):

        # Path of the '.txt' file
        tf_pth = os.path.join(OUTPUT_PATH, tf)

        try:
            # Read the '.txt' file
            with open(tf_pth, "r", encoding="utf-8") as f:
                d = f.read()

            d = post_processing(txt = d)
            obj = json.loads(d)
            df = parse_json_output(json_obj = obj)
        except Exception:
            # Best effort: malformed model output is recorded, not fatal.
            # `Exception` (not a bare except) lets Ctrl-C / SystemExit through.
            failed_cases.append(tf)
            continue

        nm, _ = os.path.splitext(tf)
        df['filename'] = nm
        df_internvl.append(df)

    df_ocr = pd.concat(df_internvl, axis = 0)

    # Convert all string values into lowercase
    for col in df_ocr.select_dtypes(include='object').columns:
        df_ocr[col] = df_ocr[col].str.lower()

    return failed_cases,df_ocr

In [None]:
def get_ocr_data():
    """Load the model outputs of both phases and stack them into one frame.

    For each phase the number of output files and the number of files that
    failed to parse are printed. Each phase's rows get the phase tag in a
    ``tag`` column before the two frames are concatenated.
    """
    # ---- phase 1 ----
    n_files_1 = len(os.listdir(OUTPUT_PATH_1))
    print(f'OUTPUT_PATH_1: We have {n_files_1} output files from OCR.')
    failures_1, frame_1 = get_ocr_output(OUTPUT_PATH = OUTPUT_PATH_1)
    print(f'# OUTPUT_PATH_1 - Failure cases: {len(failures_1)}')

    # ---- phase 2 ----
    n_files_2 = len(os.listdir(OUTPUT_PATH_2))
    print(f'OUTPUT_PATH_2: We have {n_files_2} output files from OCR.')
    failures_2, frame_2 = get_ocr_output(OUTPUT_PATH = OUTPUT_PATH_2)
    print(f'# OUTPUT_PATH_2 - Failure cases: {len(failures_2)}')

    # Stamp the phase on every row, then stack
    frame_1['tag'] = TAG_1
    frame_2['tag'] = TAG_2

    return pd.concat([frame_1, frame_2], axis = 0)

In [None]:
# Load the flattened model output of both phases
df_raw_ocr = get_ocr_data()

In [None]:
# Sanity checks on the raw OCR rows: sample, shape and missing values per column
df_raw_ocr.head(2)
df_raw_ocr.shape
df_raw_ocr.isna().sum()

## Handle the Govt-ID Part

In [None]:
# Split the OCR rows into the Govt-ID rows and everything else.
# The membership mask is computed once and used for both halves.
_govid_fields = ['government_id_type','government_id_value']
_is_govid = df_raw_ocr['field_name'].isin(_govid_fields)

# Govt-ID rows only
df_govid_temp = df_raw_ocr[_is_govid].sort_values(['tag','filename']).reset_index(drop = True)

# OCR data without the Govt-ID rows
df_raw_ocr = df_raw_ocr[~_is_govid].sort_values(['tag','filename']).reset_index(drop = True)

In [None]:
# Build the Govt-ID rows in the same layout as the other OCR rows.
# get_govid_subset / clean_government_id come from post_processing;
# presumably the former yields one row per file with a 'government_id_value'
# column - TODO confirm against post_processing.
df_govid = get_govid_subset(df = df_govid_temp)
# NOTE(review): both columns created here are discarded below -
# 'government_id_value_cleaned' is not selected and 'field_name' is
# overwritten - so only the raw 'government_id_value' is carried forward.
df_govid[['field_name', 'government_id_value_cleaned']] = df_govid['government_id_value'].apply(
    lambda x: pd.Series(clean_government_id(x))
)
df_govid.rename({'government_id_value':'ocr_text'}, axis=1, inplace = True)
df_govid = df_govid[['field_name','ocr_text','filename','tag']]

# Emit every Govt-ID value under both candidate field names
df_govid_aadhar = df_govid.copy()
df_govid_aadhar['field_name'] = 'aadhaarcard'

df_govid_pan = df_govid.copy()
df_govid_pan['field_name'] = 'pancard'

df_govid = pd.concat([df_govid_aadhar,df_govid_pan])

In [None]:
# Sanity checks on the Govt-ID rows: sample, shape and column names
df_govid.head(2)
df_govid.shape
df_govid.columns

In [None]:
# Put the Govt-ID rows back next to the remaining OCR rows
df_ocr = pd.concat([df_raw_ocr,df_govid], axis = 0)

In [None]:
# Sanity checks on the combined OCR rows: sample, shape and missing values per column
df_ocr.head(2)
df_ocr.shape
df_ocr.isna().sum()

In [None]:
# Maps the field names used in the model output (keys) to the names they are
# renamed to before being matched against the ground truth (values)
change_field_name_dict = {
    'alternate_number' : 'alternateno',
    'blood_group' : 'bloodgroup',
    'candidate_name' : 'candidatename',
    'contact_number' : 'contactnumber',
    'date_of_birth' : 'dateofbirth',
    'father_or_husband_name' : 'father/husbandname',
    'languages_known' : 'languageknown',
    'marital_status':'maritalstatus',
    'permanent_address':'permanentaddress',
    'present_address':'presentaddress'
    }

In [None]:
# Rename the OCR field names in place using change_field_name_dict;
# names without a mapping keep their original value (via fillna)
df_ocr['field_name'] = df_ocr['field_name'].map(change_field_name_dict).fillna(df_ocr['field_name'])

In [None]:
# Sanity checks after renaming: sample, shape and missing values per column
df_ocr.head(2)
df_ocr.shape
df_ocr.isna().sum()

In [None]:
# Ground truth for side-by-side comparison with the OCR rows above
df_gt.head(2)
df_gt.shape

# Post processing - For all the fields

## Apply the blanket post-processing for the OCR Text

# Write the main for loop

In [None]:
# Make the list of all the fields present in the ground truth

all_field_name = list(df_gt.field_name.unique())
print(f'In Ground Truth we have #field_name: {len(all_field_name)}')
all_field_name

In [None]:
# Fields whose OCR text is passed through without the blanket cleaning
no_base_cleaning = [
    'bloodgroup',
    'qualification',
    'candidatename',
    'father/husbandname',
    'contactnumber',
    'alternateno',
    'referencescmob1',
    'referencescmob2',
    'date',
    'dateofbirth',
]

def general_post_processing(df,fl):
    """Add an ``ocr_text_cleaned`` column to ``df`` and return ``df``.

    For a field listed in ``no_base_cleaning`` the column is the unchanged
    ``ocr_text``; for any other field ``clean_ocr_text`` is applied to every
    value. ``df`` is modified in place.
    """
    raw_text = df['ocr_text']
    skip_cleaning = fl in no_base_cleaning

    df['ocr_text_cleaned'] = raw_text if skip_cleaning else raw_text.apply(clean_ocr_text)

    return df

In [None]:
def field_level_post_processing(df,fl,usage = 'yes'):
    """Add an ``ocr_post_processed_text`` column to ``df`` and return ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain an ``ocr_text_cleaned`` column. Modified in place.
    fl : str
        Field name that selects the field-specific cleaner.
    usage : str
        ``'no'`` copies ``ocr_text_cleaned`` unchanged. ``'yes'`` (the
        default) or ``'clean'`` applies the cleaner registered for ``fl``;
        fields without a dedicated cleaner are copied unchanged.

    Raises
    ------
    ValueError
        If ``usage`` is not one of ``'yes'``, ``'clean'`` or ``'no'``.
    """
    if usage == 'no':
        df['ocr_post_processed_text'] = df['ocr_text_cleaned']
        return df

    # 'clean' stays accepted for callers written against the earlier spelling
    if usage not in ('yes', 'clean'):
        raise ValueError(f'Usage must be one of "yes", "clean" or "no", but got {usage}')

    # One single chain: exactly one cleaner is selected per field, and only
    # the selected cleaner's name is looked up.
    if fl == 'nationality':
        cleaner = clean_nationality
    elif fl == 'gender':
        cleaner = clean_gender
    elif fl == 'maritalstatus':
        cleaner = clean_marital_status
    elif fl == 'bloodgroup':
        cleaner = clean_blood_group
    elif fl == 'qualification':
        cleaner = clean_qualification
    elif fl in ['candidatename','father/husbandname']:
        cleaner = clean_candidate_name
    elif fl in ['contactnumber','alternateno']:
        cleaner = clean_indian_contact_number
    elif fl in ['referencescmob1','referencescmob2']:
        cleaner = clean_reference_text
    elif fl in ['date','dateofbirth']:
        cleaner = clean_and_parse_date
    else:
        cleaner = None

    if cleaner is None:
        df['ocr_post_processed_text'] = df['ocr_text_cleaned']
    else:
        df['ocr_post_processed_text'] = df['ocr_text_cleaned'].apply(cleaner)

    return df


In [None]:
def random_fill_na(df,fl):

    '''
    Replace missing values of ``ocr_post_processed_text`` with a random pick
    from the valid options of the field ``fl``.

    Only nationality, gender, maritalstatus, bloodgroup and qualification
    have an option list; for any other field, or when the list is empty,
    ``df`` is returned untouched. ``df`` is modified in place.
    '''

    # Lambdas defer the name lookup, so only the list of the requested
    # field is resolved.
    option_sources = {
        'nationality': lambda: valid_nationality,
        'gender': lambda: valid_genders,
        'maritalstatus': lambda: valid_statuses,
        'bloodgroup': lambda: valid_blood_groups,
        'qualification': lambda: valid_qualifications,
    }

    resolve = option_sources.get(fl)
    valid_options = resolve() if resolve is not None else list()

    if len(valid_options) == 0:
        return df

    def _fill(value):
        # A fresh random draw for every missing cell
        if pd.isna(value):
            return np.random.choice(valid_options)
        return value

    df['ocr_post_processed_text'] = df['ocr_post_processed_text'].apply(_fill)

    return df

In [None]:
def _pairwise_error_rate(frame, metric, hyp_col):
    """Row-wise ``metric(field_value, <hyp_col>)``; None when either text is empty."""
    return frame.apply(
        lambda row: metric(row['field_value'], row[hyp_col])
        if row['field_value'] and row[hyp_col] else None,
        axis=1
        )


# One result frame per ground-truth field; concatenated after the loop
df_output = list()

for fl in tqdm(all_field_name):

    # Make the subset of GroundTruth and OCR Data
    df_gt_fl = df_gt[df_gt['field_name'] == fl].sort_values(['tag','filename']).reset_index(drop=True)
    df_ocr_fl = df_ocr[df_ocr['field_name'] == fl].sort_values(['tag','filename']).reset_index(drop=True)

    # Apply the Recommended Post-Processing to both the GroundTruth and OCR Text
    df_gt_fl['field_value'] = df_gt_fl['field_value'].apply(recommended_cleaning)
    df_ocr_fl['ocr_text'] = df_ocr_fl['ocr_text'].apply(recommended_cleaning)

    # Apply General Post-Processing to the OCR Text
    df_ocr_fl = general_post_processing(df = df_ocr_fl,
                                        fl = fl)

    # Apply Field Level Post-Processing to the OCR Text
    df_ocr_fl = field_level_post_processing(df = df_ocr_fl,
                                            fl = fl,
                                            usage = 'no')

    # Merge GroundTruth - OCR Text (left join: every ground-truth row is kept)
    df_result_fl = pd.merge(left = df_gt_fl,
                            right = df_ocr_fl,
                            on = ['tag','filename','field_name'],
                            how = 'left')

    df_result_fl = df_result_fl.sort_values(['tag','filename']).reset_index(drop=True)

    # Fill the NA values using the random choices of valid values
    df_result_fl = random_fill_na(df = df_result_fl,
                                  fl = fl)

    # Fill the remaining NaNs with empty strings so every text column is str
    for text_col in ('field_value', 'ocr_text', 'ocr_post_processed_text'):
        df_result_fl[text_col] = df_result_fl[text_col].fillna("").astype(str)

    # Normalise the post-processed text on the frame that is actually scored.
    # This used to be applied to df_ocr_fl after the merge, which had no
    # effect on df_result_fl.
    df_result_fl['ocr_post_processed_text'] = df_result_fl['ocr_post_processed_text'].apply(recommended_cleaning)

    # Calculate WER
    # NOTE(review): rows with an empty reference or hypothesis get None and
    # are therefore left out of later means - confirm this is intended.
    df_result_fl['wer_ocr_text'] = _pairwise_error_rate(df_result_fl, wer, 'ocr_text')
    df_result_fl['wer_ocr_post_processed_text'] = _pairwise_error_rate(df_result_fl, wer, 'ocr_post_processed_text')

    # Calculate CER
    df_result_fl['cer_ocr_text'] = _pairwise_error_rate(df_result_fl, cer, 'ocr_text')
    df_result_fl['cer_ocr_post_processed_text'] = _pairwise_error_rate(df_result_fl, cer, 'ocr_post_processed_text')

    # Calculate the "Text Field Accuracy" (exact string match -> 1, else 0)
    df_result_fl['field_correct_ocr_text'] = np.where(
        df_result_fl['field_value'] == df_result_fl['ocr_text'],
        1, 0
        )

    df_result_fl['field_correct_ocr_post_processed_text'] = np.where(
        df_result_fl['field_value'] == df_result_fl['ocr_post_processed_text'],
        1, 0
        )

    # The "Document-Level Accuracy" is derived later from the per-field flags

    # Save the Output
    df_output.append(df_result_fl)
    

In [None]:
# Collapse the per-field result frames into one table
# (the name df_output is rebound from a list to a DataFrame here)
df_output = pd.concat(df_output,axis=0)

In [None]:
# Sanity checks on the scored rows: sample and shape
df_output.head(2)
df_output.shape

In [None]:
def get_metrices(output_data):
    """Compute the summary metrics from the per-field result table.

    Parameters
    ----------
    output_data : pandas.DataFrame
        Needs the columns ``filename``, ``field_name``, ``wer_ocr_text``,
        ``cer_ocr_text`` and ``field_correct_ocr_text``. When a ``tag``
        column is present, a document is identified by (``tag``,
        ``filename``), so equally named files from different phases are
        scored as separate documents.

    Returns
    -------
    tuple
        ``(wer_val, cer_val, tfa_pct, dla_pct)``, each rounded to 2 decimals:
        mean WER, mean CER, mean per-document share of correct fields (%)
        and the share of documents whose fields are all correct (%).
    """
    wer_val = round(output_data['wer_ocr_text'].mean(),2)
    cer_val = round(output_data['cer_ocr_text'].mean(),2)

    # Calculate Text Field Accuracy (TFA) and Document-Level Accuracy (DLA)

    # Same document key as the GT/OCR merge uses; fall back to filename only
    # for inputs that carry no tag column.
    doc_keys = ['tag', 'filename'] if 'tag' in output_data.columns else ['filename']

    df_metric = output_data.groupby(doc_keys).agg(sum_tfa = ('field_correct_ocr_text','sum'),
                                                  nunq_field_name =('field_name', 'nunique')).reset_index()

    # A document counts as correct only when every one of its fields matches
    df_metric['dla_ind'] = np.where(
        df_metric['sum_tfa'] == df_metric['nunq_field_name'],
        1, 0
        )

    df_metric['tfa_pct'] = (df_metric['sum_tfa'] / df_metric['nunq_field_name']) * 100

    print(df_metric.head(2))

    tfa_pct = round(df_metric['tfa_pct'].mean(),2)
    dla_pct = round((df_metric['dla_ind'].sum() / len(df_metric)) * 100,2)

    return wer_val,cer_val,tfa_pct,dla_pct

In [None]:
# Compute and print the headline metrics for the raw OCR text.
# NOTE(review): tfa_pct is already a rounded scalar; the extra .mean() below
# only works while get_metrices returns a NumPy float - confirm before
# changing the return type.
wer_val,cer_val,tfa_pct,dla_pct = get_metrices(output_data = df_output)
print(f'Word Error Rate (WER): {wer_val:.2f} \n Character Error Rate (CER): {cer_val:.2f} \n Text Field Accuracy (TFA): {tfa_pct.mean():.2f}% \n Document-Level Accuracy (DLA): {dla_pct:.2f}%')
