# CIHR Grant Data Merger

This notebook merges data from "Silver" sheets (yearly grant data) into the "Gold" sheet (All Recipients CMZ, PoP, CHRP) to fill in missing information such as:
- Primary Institute (PrimaryInstituteEN_InstitutPrincipalAN)
- Research Categories (AllResearchCategoriesEN_TousCategoriesRechercheAN)
- Application Keywords (ApplicationKeywords_MotsClesDemande)

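Records are matched on researcher name. The name columns of both sheets are lower-cased and stripped of leading/trailing whitespace, then joined on exact equality.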

## 1. Check the Spark Session

In [None]:
# Databricks injects a ready-to-use `spark` session into every notebook, so no
# session is built here; this cell only reports the running Spark version.
print("Spark Version: {}".format(spark.version))

## 2. Define Paths

In [None]:
# Input and output locations (Databricks /Volumes paths).
gold_sheet_path = "/Volumes/cihr/default/grant_datasets_bronze/all_recipients_cmz_pop_chrp.csv"
# NOTE(review): the "silver" input directory is the same grant_datasets_bronze
# volume that holds the Gold CSV; only its .xlsx files are treated as Silver.
silver_dir_path = "/Volumes/cihr/default/grant_datasets_bronze/"
output_path = "/Volumes/cihr/default/grant_datasets_silver/enriched_recipients"

# Non-recursive listing; keep just the Excel workbooks (the yearly Silver sheets).
silver_files = dbutils.fs.ls(silver_dir_path)
excel_files = [entry.path for entry in silver_files if entry.path.endswith('.xlsx')]
print(f"Found {len(excel_files)} Excel files in the silver directory")

## 3. Load Gold Sheet

In [None]:
from pyspark.sql.functions import col, lower, trim, lit, when

def load_gold_sheet(gold_sheet_path):
    """Read the Gold CSV (All Recipients CMZ, PoP, CHRP) and add a join key.

    Args:
        gold_sheet_path: Location of the Gold CSV; its first row is the header.

    Returns:
        The Gold DataFrame with an extra ``clean_name`` column holding the
        lower-cased, whitespace-trimmed value of the ``Name`` column.
    """
    print(f"Loading Gold sheet from: {gold_sheet_path}")

    csv_reader = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
    )
    raw_gold = csv_reader.load(gold_sheet_path)

    # Normalised name used to match against the Silver sheets.
    normalized_name = lower(trim(col("Name")))
    keyed_gold = raw_gold.withColumn("clean_name", normalized_name)

    # count() forces a full scan; it is only used for this progress message.
    print(f"Gold sheet loaded with {keyed_gold.count()} records")
    return keyed_gold

# Read the Gold sheet, then show its schema and a five-row preview.
gold_df = load_gold_sheet(gold_sheet_path)

print("\nGold Sheet Schema:")
gold_df.printSchema()

print("\nGold Sheet Sample:")
display(gold_df.limit(5))

## 4. Load and Process Silver Sheets

In [None]:
def _fiscal_year_from_path(file_path):
    """Derive the fiscal-year label from the last ``_`` token of the file name.

    The token (extension removed) is split after its fourth character and the
    two parts are joined with "-", e.g. ``..._201920.xlsx`` -> ``"2019-20"``.
    The original notes describe the token as ``YYYYMM``; confirm against the
    actual file naming.
    """
    file_name = file_path.split('/')[-1]
    year_str = file_name.split('_')[-1].split('.')[0]
    return year_str[:4] + "-" + year_str[4:6]


def _load_silver_excel(file_path):
    """Read an Excel workbook, trying sheet 'Sheet1' first, then the default sheet.

    Returns the DataFrame, or None when both attempts fail. Failures are
    printed rather than raised so one bad file does not stop the batch.
    """
    try:
        return spark.read.format("com.crealytics.spark.excel") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .option("dataAddress", "'Sheet1'!A1") \
            .load(file_path)
    except Exception as e:
        print(f"Error loading {file_path}: {e}")

    # Retry without an explicit sheet address (the workbook may not have 'Sheet1').
    try:
        return spark.read.format("com.crealytics.spark.excel") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .load(file_path)
    except Exception as e2:
        print(f"Could not load {file_path}: {e2}")
        return None


def _find_name_column(actual_columns):
    """Return the column holding researcher names, or None if none is found."""
    for candidate in ("NomineeNameEN_NomCandidatAN", "NomineeName_NomCandidat"):
        if candidate in actual_columns:
            return candidate

    # Fallback heuristic: first column whose name mentions "name" or "nom".
    # NOTE(review): this can also match non-researcher columns (any bilingual
    # "...Name..._Nom..." header); verify the chosen column for new file layouts.
    for column_name in actual_columns:
        lowered = column_name.lower()
        if "name" in lowered or "nom" in lowered:
            return column_name
    return None


def _map_target_columns(actual_columns, file_path):
    """Map each enrichment column to its case-insensitive match in the file.

    Missing columns are reported with a warning and left out of the mapping.
    """
    target_cols = [
        "PrimaryInstituteEN_InstitutPrincipalAN",
        "AllResearchCategoriesEN_TousCategoriesRechercheAN",
        "ApplicationKeywords_MotsClesDemande"
    ]

    column_mapping = {}
    for target_col in target_cols:
        match = next(
            (actual for actual in actual_columns if actual.lower() == target_col.lower()),
            None,
        )
        if match is None:
            print(f"Warning: Could not find column {target_col} in {file_path}")
        else:
            column_mapping[target_col] = match
    return column_mapping


def process_silver_file(file_path):
    """Load one yearly Silver workbook and reduce it to the columns needed for merging.

    Args:
        file_path: Path to a ``.xlsx`` Silver file whose name ends in
            ``_<digits>.xlsx`` (used to build the ``fiscal_year`` label).

    Returns:
        A DataFrame with ``clean_name`` (lower-cased, trimmed researcher name),
        ``fiscal_year``, and whichever target columns exist in the file, renamed
        to their canonical spelling. Returns None if the file cannot be read or
        has no recognisable name column.
    """
    print(f"Processing: {file_path}")

    fiscal_year = _fiscal_year_from_path(file_path)

    silver_df = _load_silver_excel(file_path)
    if silver_df is None:
        return None

    actual_columns = silver_df.columns

    name_col = _find_name_column(actual_columns)
    if not name_col:
        print(f"Warning: Could not find name column in {file_path}")
        return None

    column_mapping = _map_target_columns(actual_columns, file_path)

    # Same normalisation as the Gold sheet so the two can be joined on equality.
    silver_df = silver_df \
        .withColumn("clean_name", lower(trim(col(name_col)))) \
        .withColumn("fiscal_year", lit(fiscal_year))

    # Keep only the join key, the year label and the canonical target columns.
    select_cols = ["clean_name", "fiscal_year"]
    for target_col, actual_col in column_mapping.items():
        silver_df = silver_df.withColumnRenamed(actual_col, target_col)
        select_cols.append(target_col)

    return silver_df.select(*select_cols)

# Load every yearly workbook; files that could not be processed come back as
# None and are skipped.
all_silver_dfs = [
    loaded
    for loaded in (process_silver_file(path) for path in excel_files)
    if loaded is not None
]

if not all_silver_dfs:
    print("No valid Silver sheets found")
else:
    # Stack the yearly frames by column name; a column missing from some years
    # is filled with nulls for those rows.
    combined_silver_df = all_silver_dfs[0]
    for extra_df in all_silver_dfs[1:]:
        combined_silver_df = combined_silver_df.unionByName(extra_df, allowMissingColumns=True)

    print(f"\nCombined Silver sheets with {combined_silver_df.count()} records")
    print("\nSilver Sheet Schema:")
    combined_silver_df.printSchema()
    print("\nSilver Sheet Sample:")
    display(combined_silver_df.limit(5))

## 5. Merge Data

In [None]:
def merge_data(gold_df, silver_df):
    """Enrich the Gold sheet with Silver columns by matching on ``clean_name``.

    The combined Silver data can hold several rows for the same researcher
    (one per yearly file and/or grant). Joining those rows directly would
    duplicate Gold rows, so Silver is first reduced to one row per
    ``clean_name``: the row with the greatest ``fiscal_year`` (when that column
    exists), with ties broken deterministically by the remaining columns.

    If a Silver column also exists in Gold, Gold's non-null value is kept and
    Silver only fills the nulls. This avoids the ambiguous duplicate column
    that a plain join would create.

    Args:
        gold_df: Gold DataFrame containing a ``clean_name`` column.
        silver_df: Combined Silver DataFrame containing ``clean_name``.

    Returns:
        One row per Gold row. Each target column that is still null after the
        merge is set to "Not Found".
    """
    from pyspark.sql import Window
    from pyspark.sql.functions import coalesce, row_number

    print("Merging data from Silver sheets into Gold sheet")

    # Null names can never match in an equality join, so drop them up front.
    silver_unique = silver_df.filter(col("clean_name").isNotNull())

    # One Silver row per name; otherwise the left join multiplies Gold rows.
    if "fiscal_year" in silver_unique.columns:
        tie_breakers = [
            col(c).asc_nulls_last()
            for c in silver_unique.columns
            if c not in ("clean_name", "fiscal_year")
        ]
        latest_first = Window.partitionBy("clean_name").orderBy(
            col("fiscal_year").desc_nulls_last(), *tie_breakers
        )
        silver_unique = (
            silver_unique
            .withColumn("_silver_rank", row_number().over(latest_first))
            .filter(col("_silver_rank") == 1)
            .drop("_silver_rank")
        )
    else:
        silver_unique = silver_unique.dropDuplicates(["clean_name"])

    # Rename Silver columns that Gold already has, so the join does not create
    # two same-named (ambiguous) columns. Spark resolves names
    # case-insensitively by default, hence the lower-cased comparison.
    gold_names = {c.lower() for c in gold_df.columns}
    overlapping = [
        c for c in silver_unique.columns
        if c != "clean_name" and c.lower() in gold_names
    ]
    for c in overlapping:
        silver_unique = silver_unique.withColumnRenamed(c, "_silver_" + c)

    # Join the dataframes on the clean name and keep only Gold's key column.
    merged_df = gold_df.join(
        silver_unique,
        gold_df["clean_name"] == silver_unique["clean_name"],
        "left"
    ).drop(silver_unique["clean_name"])

    # Fill gaps in Gold's existing columns from the matching Silver values.
    for c in overlapping:
        merged_df = merged_df \
            .withColumn(c, coalesce(col(c), col("_silver_" + c))) \
            .drop("_silver_" + c)

    # Fill missing values with "Not Found"
    for col_name in ["PrimaryInstituteEN_InstitutPrincipalAN",
                     "AllResearchCategoriesEN_TousCategoriesRechercheAN",
                     "ApplicationKeywords_MotsClesDemande"]:
        if col_name in merged_df.columns:
            merged_df = merged_df.withColumn(
                col_name,
                when(col(col_name).isNull(), "Not Found").otherwise(col(col_name))
            )

    print(f"Merged data with {merged_df.count()} records")
    return merged_df

# Merge the data (only when the previous cell produced combined Silver data)
if 'combined_silver_df' in locals():
    merged_df = merge_data(gold_df, combined_silver_df)

    print("\nMerged Data Schema:")
    merged_df.printSchema()
    print("\nMerged Data Sample:")
    display(merged_df.limit(5))

    # Calculate match statistics. A row counts as matched when its Primary
    # Institute value is not the "Not Found" placeholder. That column exists
    # only if some Silver file supplied it, and filtering on a missing column
    # raises, so check for it first.
    match_col = "PrimaryInstituteEN_InstitutPrincipalAN"
    total_records = merged_df.count()
    if match_col in merged_df.columns:
        matched_records = merged_df.filter(col(match_col) != "Not Found").count()
    else:
        print(f"Warning: {match_col} not present in merged data; reporting 0 matches")
        matched_records = 0
    match_percentage = (matched_records / total_records) * 100 if total_records > 0 else 0

    print("\nMatch Statistics:")
    print(f"Total Records: {total_records}")
    print(f"Matched Records: {matched_records}")
    print(f"Match Percentage: {match_percentage:.2f}%")

## 6. Save Results

In [None]:
def save_results(merged_df, output_path):
    """Write the merged DataFrame as CSV and as Delta, overwriting earlier output.

    Args:
        merged_df: DataFrame to persist.
        output_path: Target directory for the CSV output (Spark writes a
            directory of part files). The Delta copy goes to
            ``output_path + "_delta"``.
    """
    print(f"Saving merged data to: {output_path}")

    # CSV copy, with a header row.
    csv_writer = merged_df.write.format("csv").option("header", "true").mode("overwrite")
    csv_writer.save(output_path)

    # Delta copy in a sibling directory.
    delta_path = output_path + "_delta"
    delta_writer = merged_df.write.format("delta").mode("overwrite")
    delta_writer.save(delta_path)

    print("Merged data saved successfully")
    print(f"CSV output: {output_path}")
    print(f"Delta output: {delta_path}")

# Persist the output only if the merge cell actually produced `merged_df`
# (at notebook top level, globals() and locals() are the same namespace).
if "merged_df" in globals():
    save_results(merged_df, output_path)

## 7. Summary and Next Steps

In [None]:
# Print a summary of the process
if 'merged_df' in locals():
    # Nothing in this notebook caches merged_df, so every count() re-runs the
    # whole read/join lineage. Count the merged rows once and reuse the value.
    merged_total = merged_df.count()

    print("\nData Merging Process Summary:")
    print(f"Gold Sheet Records: {gold_df.count()}")
    if 'combined_silver_df' in locals():
        print(f"Combined Silver Sheet Records: {combined_silver_df.count()}")
    print(f"Merged Records: {merged_total}")

    # Per-column match rate: share of rows whose value is not the
    # "Not Found" placeholder written by merge_data.
    for col_name in ["PrimaryInstituteEN_InstitutPrincipalAN",
                     "AllResearchCategoriesEN_TousCategoriesRechercheAN",
                     "ApplicationKeywords_MotsClesDemande"]:
        if col_name in merged_df.columns:
            matched = merged_df.filter(col(col_name) != "Not Found").count()
            # Guard against an empty result (division by zero).
            match_pct = (matched / merged_total) * 100 if merged_total > 0 else 0
            print(f"{col_name} Match Rate: {match_pct:.2f}% ({matched} records)")

    print("\nNext Steps:")
    print("1. Review the merged data for accuracy")
    print("2. Consider additional data sources for missing information")
    print("3. Use the enriched data for further analysis")