In [1275]:
import json
import pandas as pd
from collections import Counter
import sys
import os
from datetime import datetime
from fuzzywuzzy import fuzz
sys.path.append(os.path.abspath('../../'))


from extraction_package import genericFunctions as generic

# Functions

In [1313]:
def normalize_unit_values(value):
    """Translate an observed unit string into its MinMod unit identifier.

    Non-string and empty inputs yield ``""``. A bare ``"%"`` is mapped
    directly to ``'Q201'`` without consulting the lookup table. Any other
    string is matched, via ``generic.find_best_match``, against the
    ``unit name`` and ``unit aliases`` columns of
    ``../../codes/minmod_units.csv``.

    Args:
        value: Unit text as it appears in a record.

    Returns:
        The ``minmod_id`` of the matched unit, or ``value`` unchanged when the
        matcher returns ``""`` or ``None``.
    """
    if not isinstance(value, str) or value == '':
        return ""
    if value == "%":
        return 'Q201'

    # The CSV is re-read on every call; each row contributes two lookup keys
    # (its name and its alias string) that point at the same identifier.
    unit_ids = {}
    for row in generic.read_csv_to_dict("../../codes/minmod_units.csv"):
        unit_id = row['minmod_id']
        unit_ids[row['unit name']] = unit_id
        unit_ids[row['unit aliases']] = unit_id

    best_match = generic.find_best_match(value, list(unit_ids))
    if best_match == "" or best_match is None:
        # No usable match: hand the observed text back untouched.
        return value
    return unit_ids[best_match]


def normalize_location(value, code_file):
    """Translate an observed location name into its MinMod identifier.

    The lookup table is ``../../codes/<code_file>.csv``; its ``name`` column
    is matched against ``value`` with ``generic.find_best_match``.

    Args:
        value: Location name as it appears in a record.
        code_file: Base name (no extension) of the CSV in ``../../codes``,
            e.g. ``"country"`` or ``"state_or_province"``.

    Returns:
        ``""`` for non-string or empty input, the matched row's identifier
        when a match is found, otherwise ``value`` unchanged.

    Raises:
        KeyError: If a row has neither a ``'\\ufeffminmod_id'`` nor a
            ``'minmod_id'`` column, or no ``'name'`` column.
    """
    if not isinstance(value, str) or value == '':
        return ""

    rows = generic.read_csv_to_dict(f"../../codes/{code_file}.csv")

    name_to_id = {}
    for row in rows:
        # The id header can arrive with a UTF-8 BOM fused onto it when the
        # file is read without BOM handling. Accept that spelling first (the
        # original behaviour) and fall back to the clean header so a BOM-free
        # file no longer raises KeyError.
        id_key = '\ufeffminmod_id' if '\ufeffminmod_id' in row else 'minmod_id'
        name_to_id[row['name']] = row[id_key]

    found_value = generic.find_best_match(value, list(name_to_id.keys()))
    if found_value:
        return name_to_id[found_value]
    # No match: return the observed text untouched.
    return value

def remove_words(text, words_to_remove):
    """Delete every occurrence of each given substring from ``text``.

    Matching is plain substring replacement (not word-boundary aware) and is
    case-sensitive. Leading and trailing whitespace is stripped from the
    result; interior whitespace is left as-is.

    Args:
        text: String to clean.
        words_to_remove: Iterable of substrings to delete, applied in order.

    Returns:
        The cleaned, stripped string.
    """
    cleaned = text
    for unwanted in words_to_remove:
        cleaned = cleaned.replace(unwanted, '')
    return cleaned.strip()

def normalize_string_names(gt, predicted, threshold=80):
    """Snap predicted names onto ground-truth names when they fuzzily match.

    Both lists are lower-cased and stripped of the substrings ``'property'``
    and ``'project'`` before comparison. Each predicted name is compared
    against the ground-truth names in order using ``fuzz.partial_ratio``; the
    first ground-truth entry scoring at least ``threshold`` replaces the
    prediction in the output.

    Args:
        gt: Ground-truth names (original spelling is what gets returned).
        predicted: Predicted names.
        threshold: Minimum ``partial_ratio`` score counted as a match.

    Returns:
        A list as long as ``predicted`` holding, per prediction, either the
        matched ground-truth name or the original predicted name.
    """
    noise_words = ['property', 'project']

    def _simplify(name):
        # Comparison key: lower-case with the noise words removed.
        return remove_words(name.lower(), noise_words)

    gt_keys = [_simplify(name) for name in gt]
    predicted_keys = [_simplify(name) for name in predicted]

    result = []
    for predicted_key, raw_prediction in zip(predicted_keys, predicted):
        replacement = raw_prediction
        for position, gt_key in enumerate(gt_keys):
            if fuzz.partial_ratio(predicted_key, gt_key) >= threshold:
                # First sufficiently similar ground-truth name wins.
                replacement = gt[position]
                break
        result.append(replacement)

    return result

In [1315]:
 def clean_value(value):
     """Collapse a placeholder-only list to an empty list.

     A list that compares equal to ``['']`` or ``[0.0]`` is treated as
     "nothing extracted" and replaced by ``[]``. Every other value, list or
     not, is returned unchanged.
     """
     placeholder_lists = ([''], [0.0])
     if isinstance(value, list) and value in placeholder_lists:
         return []
     return value
     
def extract_data(path_to_file):
    """Load the first record of a JSON file and flatten it for comparison.

    The file's top level is indexed as a list and only element ``0`` is read.

    Args:
        path_to_file: Path to the JSON file.

    Returns:
        A ``(loc, ref, min_inv_dict)`` tuple of dicts:

        * ``loc`` - ``mining_site_name`` (one-element list) plus the country
          and state/province names, each passed through
          ``normalize_location``.
        * ``ref`` - ``authors`` (passed through as stored), and ``year`` and
          ``month`` each wrapped in a one-element list.
        * ``min_inv_dict`` - one list per mineral-inventory column, with
          duplicate rows removed.

    Raises:
        KeyError, IndexError: If the record has no ``location_info`` or no
            ``reference[0]['document']`` entry.
    """
    with open(path_to_file, 'r') as file:
        data = json.load(file)

    record = data[0]

    # Location info
    location_info = record['location_info']
    loc = {
        'mining_site_name': [record.get('name', '')],
        'country_observed_name': [
            normalize_location(country['observed_name'], "country")
            for country in location_info.get('country', [])
        ],
        'state_or_province_observed_name': [
            normalize_location(state['observed_name'], "state_or_province")
            for state in location_info.get('state_or_province', [])
        ]
    }

    # Reference info (first reference only)
    document = record['reference'][0]['document']
    ref = {
        'authors': document.get('authors', ""),
        'year': [document.get('year', "")],
        'month': [document.get('month', "")]
    }

    # One list per output column; row i of every list comes from inventory
    # entry i, so all lists stay the same length.
    columns = {
        'commodity_observed_name': [],
        'category_observed_name': [],
        'ore_unit_observed_name': [],
        'ore_value': [],
        'grade_unit_observed_name': [],
        'grade_value': [],
        'cutoff_grade_unit_observed_name': [],
        'cutoff_grade_value': [],
        'contained_metal': [],
        'zone': []
    }

    for entry in record.get('mineral_inventory', []):
        columns['commodity_observed_name'].append(
            entry.get('commodity', {}).get('observed_name', ''))

        # Only the first category of an entry is kept.
        category = entry.get('category', [])
        columns['category_observed_name'].append(
            category[0].get('observed_name', '') if category else '')

        ore = entry.get('ore', {})
        ore_unit = normalize_unit_values(ore.get('unit', {}).get('observed_name', ''))
        ore_amount = ore.get('value', 0.0)
        if ore_unit.lower() == 'q202':
            # Re-express Q202 quantities as Q200 by scaling the value by 1e6
            # (presumably million-tonnes -> tonnes; TODO confirm against
            # minmod_units.csv). The lower-case id is kept as originally
            # emitted; calculate_precision_and_recall lower-cases values
            # before comparing them.
            ore_unit = 'q200'
            ore_amount *= 1_000_000
        columns['ore_unit_observed_name'].append(ore_unit)
        columns['ore_value'].append(ore_amount)

        grade = entry.get('grade', {})
        columns['grade_unit_observed_name'].append(
            normalize_unit_values(grade.get('unit', {}).get('observed_name', '')))
        columns['grade_value'].append(grade.get('value', 0.0))

        cutoff_grade = entry.get('cutoff_grade', {})
        columns['cutoff_grade_unit_observed_name'].append(
            normalize_unit_values(cutoff_grade.get('unit', {}).get('observed_name', '')))
        columns['cutoff_grade_value'].append(cutoff_grade.get('value', 0.0))

        columns['contained_metal'].append(entry.get('contained_metal', 0.0))
        columns['zone'].append(entry.get('zone', ''))

    # clean_value turns a lone placeholder ([''] or [0.0]) into []. Applied
    # column by column, a one-row inventory with only *some* placeholder
    # fields (for example an empty zone) produced columns of unequal length
    # and pd.DataFrame raised ValueError. Keep the cleaned columns only when
    # they still agree in length (nothing collapsed, or everything did);
    # otherwise fall back to the raw, equal-length columns.
    cleaned = {name: clean_value(values) for name, values in columns.items()}
    if len({len(values) for values in cleaned.values()}) == 1:
        mineral_inventory = cleaned
    else:
        mineral_inventory = columns

    # Round-trip through a DataFrame to drop duplicate rows.
    inv_df = pd.DataFrame(mineral_inventory).drop_duplicates()
    min_inv_dict = inv_df.to_dict('list')

    return loc, ref, min_inv_dict


def calculate_precision_and_recall(predictions, ground_truth):
    """Score predictions against ground truth as multisets of strings.

    Every item is converted with ``str``, stripped and lower-cased, then
    counted. True positives are the multiset intersection of the two counts;
    false positives are the predictions left after subtracting the truth
    counts. Diagnostic counts are printed along the way.

    Args:
        predictions: Iterable of predicted values.
        ground_truth: Iterable of expected values.

    Returns:
        ``(precision, recall, num_true_positives, num_false_positives,
        num_truth)``. Precision and recall are ``0`` when their denominator
        is zero.
    """
    def _tally(items):
        # Multiset of case- and whitespace-normalised string forms.
        return Counter([str(item).strip().lower() for item in items])

    def _ratio(numerator, denominator):
        if denominator:
            return numerator / denominator
        return 0

    truth_counter = _tally(ground_truth)
    prediction_counter = _tally(predictions)
    print("----------------------------------------------------")
    print(f"Truth_count = {truth_counter}\nprediction counter: {prediction_counter}\n")

    # Items present in both, counted with multiplicity.
    matched = truth_counter & prediction_counter
    num_true_positives = sum(matched.values())
    print(f"Number of tp: {num_true_positives}\n\n ")

    # Predictions with no remaining ground-truth counterpart.
    unmatched_predictions = prediction_counter - truth_counter
    num_false_positives = sum(unmatched_predictions.values())
    print(f"Number of FP: {num_false_positives}\n\n ")
    print("----------------------------------------------------")

    num_predictions = sum(prediction_counter.values())
    num_truth = sum(truth_counter.values())
    precision = _ratio(num_true_positives, num_predictions)
    recall = _ratio(num_true_positives, num_truth)

    return precision, recall, num_true_positives, num_false_positives, num_truth

In [1317]:
def run(report_name, minimal_or_complete, append_date, commodity_folder):
    """Score one extracted report against its ground truth and save the result.

    The ground-truth file is
    ``../data/gt/<commodity_folder>/<folder>/<minimal_or_complete>.json``,
    where ``<folder>`` is the part of ``report_name`` before its first
    underscore. The extracted file is
    ``../data/extracted/<commodity_folder>/<report_name><append_date>.json``.

    Args:
        report_name: Extracted report name without date suffix or extension.
        minimal_or_complete: Base name of the ground-truth JSON to use.
        append_date: Suffix appended to ``report_name`` in the extracted
            file name.
        commodity_folder: Sub-folder shared by both data trees.

    Returns:
        A DataFrame with one row per compared field and the columns
        ``Precision``, ``Recall``, ``TP count``, ``FP count``, ``GT count``;
        it is also written to ``<minimal_or_complete>_report.csv`` beside the
        ground truth. ``None`` when either side has no mineral inventory
        rows, in which case nothing is written.
    """
    folder_name = report_name.split("_")[0]
    gt_dir = f'../data/gt/{commodity_folder}/{folder_name}/'
    gt_path = f'{gt_dir}{minimal_or_complete}.json'
    ex_path = f'../data/extracted/{commodity_folder}/{report_name}{append_date}.json'

    gt_loc, gt_ref, gt_inv_dict = extract_data(gt_path)
    ex_loc, ex_ref, ex_inv_dict = extract_data(ex_path)

    print(f"Locations: \nGround Truth: \n{gt_loc}\nExtracted:\n{ex_loc}\n\n")
    print(f"Reference: \nGround Truth: \n{gt_ref}\nExtracted:\n{ex_ref}\n\n")

    gt_rows = len(gt_inv_dict['commodity_observed_name'])
    ex_rows = len(ex_inv_dict['commodity_observed_name'])
    print(f"Mineral Inventory: \nGround Truth:{gt_rows}\nExtracted:{ex_rows}\n\n")

    # Nothing to score unless both sides have at least one inventory row.
    if not (gt_rows > 0 and ex_rows > 0):
        print("No Mineral Inventory")
        return None

    gt_merged_dict = {**gt_loc, **gt_ref, **gt_inv_dict}
    ex_merged_dict = {**ex_loc, **ex_ref, **ex_inv_dict}

    metric_names = ('Precision', 'Recall', 'TP count', 'FP count', 'GT count')
    res = {}
    for field, gt_values in gt_merged_dict.items():
        ex_values = ex_merged_dict[field]
        if "authors" in field or "mining_site_name" in field:
            # Free-text names: snap predictions onto ground truth fuzzily.
            actual = list(gt_values)
            predicted = normalize_string_names(gt_values, ex_values)
        elif "value" in field or "contained_metal" in field:
            # Numeric fields are compared after rounding to one decimal.
            actual = [round(number, 1) for number in gt_values]
            predicted = [round(number, 1) for number in ex_values]
        else:
            actual = list(gt_values)
            predicted = list(ex_values)
        print(f"Key: {field}")

        metrics = calculate_precision_and_recall(predicted, actual)
        res[field] = dict(zip(metric_names, metrics))

    report = pd.DataFrame(res).T
    report_path = f'{gt_dir}{minimal_or_complete}_report.csv'
    report.to_csv(report_path, index=True)
    return report

In [1319]:
def write_multiple_reports(Note, report_names, minimal_or_complete, commodity, append_date, current_date):
    """Score several reports with ``run`` and write one combined CSV.

    Reports for which ``run`` returns ``None`` are skipped. For the others,
    ``Precision`` and ``Recall`` are averaged per field across reports and
    ``TP count``, ``GT count`` and ``FP count`` are summed per field. Pooled
    precision and recall are then recomputed from the summed counts.

    Args:
        Note: Free-text note stored in the ``note`` column of every row.
        report_names: Names of the extracted reports to score.
        minimal_or_complete: Ground-truth variant forwarded to ``run``.
        commodity: Commodity folder; also used in the output file name.
        append_date: Date suffix of the extracted files, forwarded to ``run``.
        current_date: Date string embedded in the output file name.

    Returns:
        None. The combined table is written to
        ``../data/gt/<commodity>/<commodity>_overall_report_<current_date>.csv``.
    """
    sum_columns = ['TP count', 'GT count', 'FP count']
    average_columns = ['Precision', 'Recall']

    overall_report = pd.DataFrame()
    included_reports = []
    for report_name in report_names:
        print(f"Looking at: {report_name}\n")
        # The caller's minimal_or_complete is honoured; it used to be
        # silently replaced by 'minimal' inside this function.
        report = run(report_name, minimal_or_complete, append_date, commodity)
        print(f"report: \n {report} \n")
        if report is None:
            continue

        included_reports.append(report_name)
        report_count = len(included_reports)
        if report_count == 1:
            overall_report = report[average_columns + sum_columns].copy()
            continue

        # Incremental mean weighted by the number of reports merged so far.
        # The previous code weighted by len(overall_report), which is the
        # number of rows (fields), not the number of reports.
        for col in average_columns:
            overall_report[col] = (overall_report[col] * (report_count - 1) + report[col]) / report_count
        # Summed once per report. This loop used to sit inside the averaging
        # loop, so every count was added once per averaged column.
        for col in sum_columns:
            overall_report[col] = overall_report[col] + report[col]

    if included_reports:
        pooled_predictions = overall_report['TP count'] + overall_report['FP count']
        overall_report['Calculated Precision from TP/FP'] = overall_report['TP count'] / pooled_predictions
        overall_report['Calculated Recall from TP/FP'] = overall_report['TP count'] / overall_report['GT count']
        overall_report['Added report'] = ", ".join(included_reports)
        overall_report['note'] = Note

    # Label the aggregated columns by how they were combined.
    rename_dict = {col: f"Averaged {col}" for col in average_columns}
    rename_dict.update({col: f"Total {col}" for col in sum_columns})

    overall_report = overall_report.rename(columns=rename_dict)
    overall_report_path = f'../data/gt/{commodity}/{commodity}_overall_report_{current_date}.csv'
    overall_report.to_csv(overall_report_path, index=True)



# Run

In [1322]:
### Run one Report
# report_name = '02771a5d21ae0aca3c5bfe28f1b0c73eebe1790745adcf42cc10105946c31add6e_NI_43-101_Technical_Report_for_the_Nunavik_Project_in_North_America_dated_April_2010_summary'
# minimal_or_complete='minimal'
# report = run(report_name, minimal_or_complete, append_date, commodity)  # run() takes the commodity folder as its 4th argument

In [1362]:
### Run for multiple reports
Note = """Testing new prompts:

    """
commodity = "nickel"
# Defined in this cell so it runs on a fresh kernel; no other active cell
# assigns it, so the call below used to depend on leftover kernel state.
minimal_or_complete = "minimal"
append_date = "_20241028"
folder_path = f"../data/extracted/{commodity}/"

# run() opens "<report_name><append_date>.json", so only files carrying this
# date suffix are usable; strip exactly that suffix to recover the name.
# The set removes duplicates, and sorting fixes the processing order, which
# os.listdir does not guarantee.
file_suffix = f"{append_date}.json"
report_names = sorted({
    file[:-len(file_suffix)]
    for file in os.listdir(folder_path)
    if file.endswith(file_suffix)
})

current_date = datetime.today().strftime('%Y%m%d')
write_multiple_reports(Note, report_names, minimal_or_complete, commodity, append_date, current_date)

Looking at: 0200a1c6d2cfafeb485d815d95966961d4c119e8662b8babec74e05b59ba4759d2_NI_43-101_Technical_Report_for_the_Turnagain_Project_in_North_America_dated_March_2007_summary

Locations: 
Ground Truth: 
{'mining_site_name': ['Turnagain Nickel Project'], 'country_observed_name': ['Q1038'], 'state_or_province_observed_name': ['Q2654']}
Extracted:
{'mining_site_name': ['Turnagain Nickel Project'], 'country_observed_name': ['Q1038'], 'state_or_province_observed_name': ['Q2654']}


Reference: 
Ground Truth: 
{'authors': ['Ronald G. Simpson'], 'year': [2007], 'month': [3]}
Extracted:
{'authors': ['Ronald G. Simpson'], 'year': [2007], 'month': [3]}


Mineral Inventory: 
Ground Truth:28
Extracted:26


Key: mining_site_name
----------------------------------------------------
Truth_count = Counter({'turnagain nickel project': 1})
prediction counter: Counter({'turnagain nickel project': 1})

Number of tp: 1

 
Number of FP: 0

 
----------------------------------------------------
Key: country_ob