In [None]:
# Disable SettingWithCopyWarning
import warnings

# Silence warnings for the rest of the session.
# NOTE: the motivation is pandas' SettingWithCopyWarning, but this call passes no
# `category` or `module`, so it installs a blanket "ignore" filter that hides
# every Python warning (deprecations, runtime warnings, ...), not just that one.
# Disabling warnings should be done cautiously. Ensure all DataFrame modifications are intentional.
warnings.filterwarnings("ignore")


In [None]:
import pandas as pd

# Ask the user where the EM-DAT workbook lives (blocks until a path is typed)
file_path = input("Enter the path to the Excel file: ")

# Read the Excel file; no sheet is named, so pandas loads its default (first) sheet
df = pd.read_excel(file_path)


In [None]:
# Columns of the raw export that the rest of the notebook works with.
selected_columns = [
    "DisNo.",
    "Classification Key",
    "Disaster Group",
    "Disaster Subgroup",
    "Disaster Type",
    "Disaster Subtype",
    "External IDs",
    "Event Name",
    "ISO",
    "Country",
    "Subregion",
    "Region",
    "Location",
    "AID Contribution ('000 US$)",
    "Latitude",
    "Longitude",
    "River Basin",
    "Start Year",
    "Start Month",
    "Start Day",
    "End Year",
    "End Month",
    "End Day",
    "Total Deaths",
    "No. Injured",
    "No. Affected",
    "No. Homeless",
    "Total Affected",
    "Reconstruction Costs, Adjusted ('000 US$)",
    "Total Damage, Adjusted ('000 US$)",
    "Admin Units",
]

# Filter the DataFrame to keep only the selected columns.
# .copy() makes df_filtered an independent frame, so the column assignments in
# the following cells modify it directly instead of a possible view of `df`
# (the situation pandas reports with SettingWithCopyWarning).
df_filtered = df[selected_columns].copy()


1. Combine the `Start Year/Month/Day` and `End Year/Month/Day` components into one datetime value each (a missing month or day defaults to 1)
2. Ensure the end date is not earlier than the start date (if it is, the end date is set to the start date)
3. Store the results in two new columns, `start_date` and `end_date`


In [None]:
import numpy as np

def _combine_date_parts(frame, prefix):
    """
    Build a datetime Series from the ``<prefix> Year/Month/Day`` columns.

    Args:
    frame (pandas.DataFrame): Frame holding the three component columns.
    prefix (str): Column prefix, "Start" or "End".

    Returns:
    pandas.Series: Parsed dates. A missing month or day is treated as 1;
    combinations that do not form a valid date become NaT.
    """
    year = frame[f"{prefix} Year"].astype(int).astype(str)
    month = (
        frame[f"{prefix} Month"].fillna(1).astype(int).astype(str).str.zfill(2)
    )
    day = frame[f"{prefix} Day"].fillna(1).astype(int).astype(str).str.zfill(2)
    return pd.to_datetime(
        year + "-" + month + "-" + day,
        format="%Y-%m-%d",
        errors="coerce",  # Handle invalid dates gracefully
    )


# Combine the date components into single datetime columns
df_filtered["start_date"] = _combine_date_parts(df_filtered, "Start")
df_filtered["end_date"] = _combine_date_parts(df_filtered, "End")

# Flag the rows that need correcting BEFORE end_date is overwritten; comparing
# the columns afterwards would also count rows whose dates were equal all along.
end_before_start = df_filtered["end_date"] < df_filtered["start_date"]

# Where end_date is earlier than start_date, change end_date to start_date
df_filtered["end_date"] = np.where(
    end_before_start,
    df_filtered["start_date"],
    df_filtered["end_date"],
)

# Display the first few rows to verify the changes
print("\nFirst few rows after date correction:")
display(df_filtered[["start_date", "end_date"]].head())

# Count the number of rows where end_date was actually changed
changed_rows = end_before_start.sum()
print(
    f"\nNumber of rows where end_date was changed to match start_date: {changed_rows}"
)


In [None]:
# Side-by-side view of the raw date components and the derived start_date /
# end_date columns. This is the cell's last expression, so the notebook renders it.
df_filtered[
    [
        "Start Year",
        "Start Month",
        "Start Day",
        "End Year",
        "End Month",
        "End Day",
        "start_date",
        "end_date",
    ]
].head()


- Extract GLIDE number from `External IDs`


In [None]:
# Extract External IDs and create new columns
def extract_external_ids(external_id):
    """
    Parse a pipe-delimited ``TYPE:VALUE`` string into a mapping of ID types.

    Args:
    external_id (str): Raw External IDs value such as "GLIDE:x|USGS:y"; may be NaN/None.

    Returns:
    dict: ID type -> list of its values in order of appearance. Empty when the
    input is missing. Segments without a ":" separator are ignored.
    """
    grouped = {}
    if pd.isna(external_id):
        return grouped

    for segment in external_id.split("|"):
        # Split on the first colon only, so values may themselves contain ":"
        kind, colon, value = segment.partition(":")
        if not colon:
            continue
        grouped.setdefault(kind, []).append(value)
    return grouped


# Parse every External IDs cell into a {type: [values]} dict (temporary column)
df_filtered["ExternalIDs"] = df_filtered["External IDs"].apply(
    extract_external_ids
)

# Collect the ID types that occur anywhere in the data. They are sorted because
# the iteration order of a set of strings can differ between interpreter
# sessions, which would reorder the new columns (and the saved CSV) run to run.
external_id_types = sorted(
    {id_type for id_dict in df_filtered["ExternalIDs"] for id_type in id_dict}
)

# Create one column per External ID type; rows without that type get None
for id_type in external_id_types:
    df_filtered[id_type] = df_filtered["ExternalIDs"].apply(
        lambda x, id_type=id_type: x.get(id_type, None)
    )

# Drop the temporary column
df_filtered = df_filtered.drop(columns=["ExternalIDs"])

# Display the first few rows to verify the changes
print("\nFirst few rows after External ID extraction:")
display(df_filtered[["External IDs"] + external_id_types].head())

# Count the number of non-null values in each new column
print("\nNumber of non-null values in each new External ID column:")
display(df_filtered[external_id_types].count())

# A GLIDE column only exists if at least one row carried a GLIDE identifier
if "GLIDE" in df_filtered.columns:
    display(df_filtered[df_filtered["GLIDE"].notna()].head())
else:
    print("\nNo GLIDE identifiers found in External IDs")


- Extract Admin 1 and 2 level and names from `Admin Units`


In [None]:
# Preview rows that carry Admin Units. display() is required because a bare
# expression is only rendered when it is the last statement of a cell.
display(df_filtered[df_filtered["Admin Units"].notna()].head())

import ast


def extract_admin_levels(admin_units):
    """
    Extract admin level 1 and 2 names from the Admin Units string.

    Args:
    admin_units (str): String representation of a list of dictionaries, each
        expected to carry an "adm1_name" or an "adm2_name" key. Missing values
        (NaN/None) and any other non-string input are treated as "no units".

    Returns:
    tuple: ``(admin_level_1, admin_level_2)`` lists of names in input order.
    Both lists are empty when the input is missing, cannot be parsed, or does
    not parse to a list.
    """
    # Covers NaN/None as well as already-parsed or otherwise unexpected values
    if not isinstance(admin_units, str):
        return [], []

    # Convert string representation of list to actual list of dictionaries.
    # Only the errors literal_eval documents for malformed input are caught,
    # so KeyboardInterrupt and genuine programming errors still surface.
    try:
        admin_list = ast.literal_eval(admin_units)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        return [], []

    # A string may parse to a scalar or dict; only a sequence of entries is usable
    if not isinstance(admin_list, (list, tuple)):
        return [], []

    admin_level_1 = []
    admin_level_2 = []

    for admin in admin_list:
        if not isinstance(admin, dict):
            continue
        # An entry is classified by the first matching key: adm1 wins over adm2
        if "adm1_name" in admin:
            admin_level_1.append(admin["adm1_name"])
        elif "adm2_name" in admin:
            admin_level_2.append(admin["adm2_name"])

    return admin_level_1, admin_level_2


# Parse every Admin Units cell once, then unpack the resulting
# (level 1 names, level 2 names) pairs into their own columns
admin_level_pairs = df_filtered["Admin Units"].apply(extract_admin_levels)
df_filtered["admin_level_1"] = admin_level_pairs.apply(lambda pair: pair[0])
df_filtered["admin_level_2"] = admin_level_pairs.apply(lambda pair: pair[1])

# Show the source column next to the two derived columns as a sanity check
print("\nFirst few rows after Admin Units extraction:")
admin_preview_columns = ["Admin Units", "admin_level_1", "admin_level_2"]
display(df_filtered[admin_preview_columns].head())


- Transform unit from '000 US$ to US$


In [None]:
selected_columns_to_transform = [
    "AID Contribution ('000 US$)",
    "Reconstruction Costs, Adjusted ('000 US$)",
    "Total Damage, Adjusted ('000 US$)",
]


def _usd_column_name(column):
    """Return the snake_case column name used once the ('000 US$) suffix is dropped."""
    without_unit = column.replace("('000 US$)", "").strip().lower()
    return without_unit.replace(", ", "_").replace(" ", "_")


# Convert each monetary column from thousands of US$ to US$ and rename it
for column in selected_columns_to_transform:
    if column not in df_filtered.columns:
        print(f"Column {column} not found in the DataFrame")
        continue

    # Values are expressed in '000 US$, so scale by 1000 to get US$
    df_filtered[column] = df_filtered[column].multiply(1000)

    # The new name no longer mentions the old unit
    new_column_name = _usd_column_name(column)
    df_filtered.rename(columns={column: new_column_name}, inplace=True)

    print(f"Transformed {column} to {new_column_name}")

# Show only the converted columns that are actually present
print("\nFirst few rows after unit transformation:")
converted_columns = [
    name
    for name in map(_usd_column_name, selected_columns_to_transform)
    if name in df_filtered.columns
]
display(df_filtered[converted_columns].head())


- Transform ISO3, Admin name, and Admin level to GADM's standard


In [None]:
import pandas as pd

# Preview only: read the first 10 rows from the mapping workbook (the path is
# relative to the notebook's working directory). This truncated frame is
# replaced when the full workbook is loaded later in the notebook.
emdat_admin_mapping = pd.read_excel(
    "./EMDAT_admin_area_mapping.xlsx", nrows=10
)

# Display the extracted rows
print("First 10 rows of EMDAT admin area mapping:")
display(emdat_admin_mapping)


In [None]:
# Import required libraries
import pandas as pd

# Load the full EMDAT admin area mapping (no nrows limit). The functions defined
# below read this module-level frame directly rather than taking it as an argument.
emdat_admin_mapping = pd.read_excel("./EMDAT_admin_area_mapping.xlsx")


# Update country names and ISO codes
def _gadm_location_names(country_code):
    """
    Collect every admin 1/2 name the mapping lists for one ISO3 code.

    Primary GADM names are taken as-is; the ``*_Alt`` and ``*_Local`` columns
    hold pipe-separated names and are split into individual entries.
    """
    country_data = emdat_admin_mapping[
        emdat_admin_mapping["ISO3"] == country_code
    ]
    locations = set()

    # Handle GADM_Admin1 and GADM_Admin2
    for col in ["GADM_Admin1", "GADM_Admin2"]:
        locations.update(country_data[col].dropna().tolist())

    # Handle alternative and local names
    for col in [
        "GADM_Admin1_Alt",
        "GADM_Admin2_Alt",
        "GADM_Admin1_Local",
        "GADM_Admin2_Local",
    ]:
        for names in country_data[col].dropna().str.split("|"):
            locations.update(names)

    return locations


def update_country_info(row):
    """
    Update the country name and ISO code based on GADM's definition.

    Args:
    row (pandas.Series): A row with "ISO", "Country", "admin_level_1" and
        "admin_level_2" entries. The admin lists are not modified.

    Returns:
    tuple: ``(iso, country, admin_level_1, admin_level_2)`` with updated
    values, or None for an "SCG" row whose admin names match neither Serbia
    nor Montenegro in the mapping (the notebook drops such rows afterwards).
    """
    iso = row["ISO"]
    country = row["Country"]
    # Work on copies: appending to the row's own lists would change the objects
    # stored in the DataFrame, so a partial or repeated run would append twice.
    admin1 = (
        list(row["admin_level_1"])
        if isinstance(row["admin_level_1"], list)
        else []
    )
    admin2 = (
        list(row["admin_level_2"])
        if isinstance(row["admin_level_2"], list)
        else []
    )

    # Special cases: the original country name is kept as an admin level 1 entry
    if iso in ["HKG", "MAC"]:
        admin1.append(country)
        return "CHN", "China", admin1, admin2
    if iso == "ANT":
        admin1.append(country)
        return "NLD", "Netherlands", admin1, admin2
    if iso == "SCG":
        # Assign the row to Serbia or Montenegro (checked in that order)
        # according to which country's known locations its admin names match
        all_locations = set(admin1 + admin2)
        for gadm_iso, gadm_country in (("SRB", "Serbia"), ("MNE", "Montenegro")):
            known_locations = _gadm_location_names(gadm_iso)
            if any(loc in known_locations for loc in all_locations):
                return gadm_iso, gadm_country, admin1, admin2
        # If no match found, return None to indicate this row should be dropped
        return None
    if iso == "AB9":
        admin2.append("Abyei Area")
        return "SDN", "Sudan", admin1, admin2
    if iso == "XKK":
        return "XKO", "Kosovo", admin1, admin2

    # General case: take the country name from the mapping when the ISO is known
    match = emdat_admin_mapping[emdat_admin_mapping["ISO3"] == iso]
    if not match.empty:
        return iso, match.iloc[0]["GADM_Country"], admin1, admin2
    return iso, country, admin1, admin2


# Update country names and ISO codes.
# result_type="expand" spreads each returned 4-tuple across the four target columns.
# NOTE(review): update_country_info returns None for unmatched SCG rows; the
# later dropna(subset=["ISO"]) relies on those rows ending up with a missing
# ISO after expansion - confirm on real data.
df_filtered[["ISO", "Country", "admin_level_1", "admin_level_2"]] = (
    df_filtered.apply(update_country_info, axis=1, result_type="expand")
)


In [None]:
# Remove rows where update_country_info returned None (for unmatched SCG cases).
# Note that this drops every row with a missing ISO, including any whose ISO
# was already empty in the source data.
df_filtered = df_filtered.dropna(subset=["ISO"])


In [None]:
# Function to check if a name matches any of the alternatives
def match_name(name, alternatives):
    """
    Tell whether ``name`` is one of the pipe-separated ``alternatives``.

    Args:
    name (str): The name to look for (compared exactly, case-sensitively).
    alternatives (str): Pipe-separated string of alternative names; may be NaN/None.

    Returns:
    bool: True if ``name`` equals one of the alternatives, False otherwise
    (including when ``alternatives`` is missing).
    """
    return False if pd.isna(alternatives) else name in alternatives.split("|")


# Update admin level 1 names
def update_admin1_name(row, admin1_name):
    """
    Update the admin level 1 name based on ISO3 code and alternative names.

    Args:
    row (pandas.Series): A row from the DataFrame; only ``row["ISO"]`` is read.
    admin1_name (str): The original admin1 name.

    Returns:
    str: The ``GADM_Admin1`` value of the first mapping entry for the row's
    country whose primary, alternative or local admin 1 name equals
    ``admin1_name``; the original name if no entry matches.
    """
    # Narrow to the row's country first, so the Python-level alternative-name
    # checks below run over that country's entries only, not the whole mapping.
    country_rows = emdat_admin_mapping[
        emdat_admin_mapping["ISO3"] == row["ISO"]
    ]
    if country_rows.empty:
        return admin1_name

    name_matches = (
        (country_rows["GADM_Admin1"] == admin1_name)
        | country_rows["GADM_Admin1_Alt"].apply(
            lambda x: match_name(admin1_name, x)
        )
        | country_rows["GADM_Admin1_Local"].apply(
            lambda x: match_name(admin1_name, x)
        )
    )
    matches = country_rows[name_matches]
    if not matches.empty:
        return matches.iloc[0]["GADM_Admin1"]
    return admin1_name


# Update admin level 1 names.
# Each row's list is rebuilt with every name passed through update_admin1_name;
# values that are not lists are left unchanged.
df_filtered["admin_level_1"] = df_filtered.apply(
    lambda row: [
        update_admin1_name(row, name) for name in row["admin_level_1"]
    ]
    if isinstance(row["admin_level_1"], list)
    else row["admin_level_1"],
    axis=1,
)


In [None]:
# Update admin level 2 names
def update_admin2_name(row, admin2_name):
    """
    Update the admin level 2 name based on ISO3 code and alternative names.

    Args:
    row (pandas.Series): A row from the DataFrame; only ``row["ISO"]`` is read.
    admin2_name (str): The original admin2 name.

    Returns:
    str: The ``GADM_Admin2`` value of the first mapping entry for the row's
    country whose primary, alternative or local admin 2 name equals
    ``admin2_name``; the original name if no entry matches.
    """
    # Narrow to the row's country first, so the Python-level alternative-name
    # checks below run over that country's entries only, not the whole mapping.
    country_rows = emdat_admin_mapping[
        emdat_admin_mapping["ISO3"] == row["ISO"]
    ]
    if country_rows.empty:
        return admin2_name

    name_matches = (
        (country_rows["GADM_Admin2"] == admin2_name)
        | country_rows["GADM_Admin2_Alt"].apply(
            lambda x: match_name(admin2_name, x)
        )
        | country_rows["GADM_Admin2_Local"].apply(
            lambda x: match_name(admin2_name, x)
        )
    )
    matches = country_rows[name_matches]
    if not matches.empty:
        return matches.iloc[0]["GADM_Admin2"]
    return admin2_name


# Update admin level 2 names.
# Each row's list is rebuilt with every name passed through update_admin2_name;
# values that are not lists are left unchanged.
df_filtered["admin_level_2"] = df_filtered.apply(
    lambda row: [
        update_admin2_name(row, name) for name in row["admin_level_2"]
    ]
    if isinstance(row["admin_level_2"], list)
    else row["admin_level_2"],
    axis=1,
)

# Display the first few rows to verify changes
print("First few rows after updating country, ISO, admin1, and admin2 names:")
display(
    df_filtered[["ISO", "Country", "admin_level_1", "admin_level_2"]].head()
)


In [None]:
# Rename columns based on the provided mapping (source column -> snake_case name).
# Later cells refer to these columns by the new names only.
column_mapping = {
    "Disaster Group": "disaster_group",
    "Disaster Subgroup": "disaster_subgroup",
    "Disaster Type": "disaster_type",
    "Disaster Subtype": "disaster_subtype",
    "ISO": "iso3_code",
    "Country": "admin_level_0",
    "Total Deaths": "total_deaths",
    "No. Injured": "number_injured",
    "No. Affected": "number_affected",
    "No. Homeless": "number_homeless",
    "Total Affected": "total_affected",
}

# Rename the columns in df_filtered (in place, so the cell is not idempotent:
# on a second run the old names no longer exist and rename leaves the frame as is)
df_filtered.rename(columns=column_mapping, inplace=True)

# Display the first few rows to verify changes
print("First few rows after renaming columns:")
display(df_filtered[list(column_mapping.values())].head())

# Display column names to confirm changes
print("\nUpdated column names:")
print(df_filtered.columns.tolist())


In [None]:
import json

# Define the columns to be included in the metadata
metadata_columns = [
    "DisNo.",
    "Classification Key",
    "Event Name",
    "External IDs",
    "Subregion",
    "Region",
    "Location",
    "Latitude",
    "Longitude",
    "River Basin",
    "Start Year",
    "Start Month",
    "Start Day",
    "End Year",
    "End Month",
    "End Day",
    "Admin Units",
]


def _json_default(value):
    """Fallback for json.dumps: unwrap NumPy scalars, stringify anything else."""
    if isinstance(value, np.generic):
        return value.item()
    return str(value)


# Function to create metadata JSON
def create_metadata(row):
    """
    Create a JSON string containing metadata from specified columns.

    Args:
    row (pandas.Series): A row from the DataFrame; must provide every column
        listed in ``metadata_columns``.

    Returns:
    str: JSON object with one entry per metadata column whose value is not
    missing, in ``metadata_columns`` order.
    """
    metadata = {
        col: row[col] for col in metadata_columns if pd.notna(row[col])
    }
    # json cannot encode NumPy scalar types (e.g. numpy.int64) by itself; the
    # default hook converts them to plain Python values instead of raising.
    return json.dumps(metadata, default=_json_default)


# Create the metadata column (one JSON string per row)
df_filtered["metadata"] = df_filtered.apply(create_metadata, axis=1)

# Display the first few rows to verify the new metadata column
print("First few rows after adding metadata column:")
display(df_filtered[["metadata"] + list(column_mapping.values())].head())

# Remove the original columns that are now in metadata.
# After this the cell cannot be re-run as is: create_metadata reads columns
# that no longer exist in df_filtered.
df_filtered = df_filtered.drop(columns=metadata_columns)

print("\nUpdated column names after removing metadata columns:")
print(df_filtered.columns.tolist())


In [None]:
# Add 'source' column with value 'EMDAT' (same constant for every row)
df_filtered["source"] = "EMDAT"

# Display the first few rows to verify the new column.
# The new column was appended last, so columns[:-1] is every other column;
# 'source' is listed first only for this preview.
print("First few rows after adding 'source' column:")
display(df_filtered[["source"] + df_filtered.columns[:-1].tolist()].head())

# Confirm the new column is added
print("\nUpdated column names:")
print(df_filtered.columns.tolist())


In [None]:
import uuid


def generate_short_uuid(seed=None):
    """
    Generate a short UUID (the first 8 hexadecimal characters of a UUID).

    Args:
    seed (str, optional): When given, the ID is derived from it with UUIDv5,
        so the same seed always produces the same ID. When omitted, a random
        UUIDv4 is used.

    Returns:
    str: Eight lowercase hexadecimal characters.
    """
    if seed is None:
        return str(uuid.uuid4())[:8]
    return str(uuid.uuid5(uuid.NAMESPACE_URL, str(seed)))[:8]


def create_event_name(row):
    """
    Create an event name based on the specified pattern.

    The pattern is ``<iso3>_<disaster type>_<disaster subtype>_<YYYYMMDD>_<id>``.
    The trailing id is derived from the row's "metadata" value when present,
    so processing the same source record again yields the same event name.
    The insert statement later in this notebook skips rows on a conflict that
    includes event_name, which can only happen if names are reproducible.
    Without metadata a random id is used.

    Args:
    row (pandas.Series): A row with "iso3_code", "disaster_type",
        "disaster_subtype" and "start_date"; "metadata" is optional.

    Returns:
    str: Event name string.
    """
    # Define default values for missing data
    iso3 = (
        row["iso3_code"] if pd.notna(row["iso3_code"]) else "UNKNOWN-ISO3-CODE"
    )
    disaster_type = (
        row["disaster_type"]
        if pd.notna(row["disaster_type"])
        else "UNKNOWN-DISASTER-TYPE"
    )
    disaster_subtype = (
        row["disaster_subtype"]
        if pd.notna(row["disaster_subtype"])
        else "UNKNOWN-DISASTER-SUBTYPE"
    )

    # Convert start_date to string format YYYYMMDD
    start_time = (
        row["start_date"].strftime("%Y%m%d")
        if pd.notna(row["start_date"])
        else "UNKNOWN-START-DATE"
    )

    # Generate the short id, reproducibly whenever the row carries metadata
    metadata = row.get("metadata")
    short_uuid = generate_short_uuid(metadata if pd.notna(metadata) else None)

    # Create event name
    event_name = (
        f"{iso3}_{disaster_type}_{disaster_subtype}_{start_time}_{short_uuid}"
    )

    return event_name


# Create the event_name column (one generated name per row)
df_filtered["event_name"] = df_filtered.apply(create_event_name, axis=1)

# Display the first few rows to verify the new event_name column.
# The new column was appended last, so columns[:-1] is every other column;
# event_name is listed first only for this preview.
print("First few rows after adding event_name column:")
display(df_filtered[["event_name"] + df_filtered.columns[:-1].tolist()].head())

# Confirm the new column is added
print("\nUpdated column names:")
print(df_filtered.columns.tolist())


In [None]:
# Save the final df_filtered to a CSV file.
# The file name is relative, so it is written to the current working directory;
# index=False leaves the DataFrame index out of the file.
csv_filename = "preprocessed_emdat.csv"
df_filtered.to_csv(csv_filename, index=False)
print(f"DataFrame saved to {csv_filename}")


In [None]:
from getpass import getpass

import psycopg2
from psycopg2.extras import execute_values


def _missing_to_none(value, cast=None):
    """Map NaN/NaT/None to None; otherwise return the value, passed through ``cast`` if given."""
    if pd.isna(value):
        return None
    return value if cast is None else cast(value)


def _emdat_record_to_values(row):
    """
    Order one record's fields to match the INSERT column list.

    Missing dates and numbers become None so they are stored as NULL. The
    three external ID columns are optional in the record (such a column only
    exists when that ID type occurs in the data) and default to None.
    """
    return (
        row["event_name"],
        row["disaster_group"],
        row["disaster_subgroup"],
        row["disaster_type"],
        row["disaster_subtype"],
        row["iso3_code"],
        row["admin_level_0"],
        row["admin_level_1"],
        row["admin_level_2"],
        # Dates that could not be parsed are NaT, which is not a datetime value
        _missing_to_none(row["start_date"]),
        _missing_to_none(row["end_date"]),
        # Counts are cast to plain Python int, monetary amounts to float
        _missing_to_none(row["total_deaths"], int),
        _missing_to_none(row["number_injured"], int),
        _missing_to_none(row["number_affected"], int),
        _missing_to_none(row["number_homeless"], int),
        _missing_to_none(row["total_affected"], int),
        _missing_to_none(row["total_damage_adjusted"], float),
        _missing_to_none(row["reconstruction_costs_adjusted"], float),
        _missing_to_none(row["aid_contribution"], float),
        row["source"],
        row["metadata"],
        row.get("USGS"),
        row.get("GLIDE"),
        row.get("DFO"),
    )


def insert_emdat_data(df):
    """
    Insert the preprocessed EM-DAT data into the events_emdat table.

    Prompts for the database password and host, connects to the "merge"
    database as user "postgres", and inserts every row in one batch. Rows that
    conflict on (event_name, iso3_code, start_date) are skipped. Errors are
    reported with print() rather than raised.

    Args:
    df (pandas.DataFrame): The preprocessed EM-DAT data.

    Returns:
    None
    """
    # Bind both names up front so the finally block can test them even when
    # the connection attempt itself fails.
    conn = None
    cur = None
    try:
        # Connect to the PostgreSQL database
        conn = psycopg2.connect(
            dbname="merge",
            user="postgres",
            password=getpass("Enter the database password: "),
            host=input("Enter the database host: "),
        )

        # Create a cursor object
        cur = conn.cursor()

        # Prepare the data for insertion
        data = df.to_dict("records")

        # SQL query for inserting data
        insert_query = """
        INSERT INTO events_emdat (
            event_name, disaster_group, disaster_subgroup, disaster_type, disaster_subtype,
            iso3_code, admin_level_0, admin_level_1, admin_level_2, start_date, end_date,
            total_deaths, number_injured, number_affected, number_homeless, total_affected,
            total_damage_adjusted, reconstruction_costs_adjusted, aid_contribution,
            source, metadata, USGS, GLIDE, DFO
        ) VALUES %s
        ON CONFLICT (event_name, iso3_code, start_date) DO NOTHING
        """

        # Convert data to tuple format for psycopg2
        values = [_emdat_record_to_values(row) for row in data]

        # Execute the insert query
        execute_values(cur, insert_query, values)

        # Commit the changes
        conn.commit()

        print("Data inserted successfully into events_emdat table.")

    except (Exception, psycopg2.Error) as error:
        print(f"Error inserting data into events_emdat table: {error}")

    finally:
        # Close the cursor and connection, whichever were actually opened
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()


# Call the function to insert the data (prompts for the password and host)
insert_emdat_data(df_filtered)

# Print a message to confirm the operation is complete.
# insert_emdat_data reports failures with print() instead of raising, so this
# line is reached whether or not the insert succeeded.
print("EM-DAT data insertion process completed.")


# show info of the final df


In [None]:
# Display information about the filtered DataFrame
# (info() prints its report directly, so no display() is needed)
print("Filtered DataFrame Info:")
df_filtered.info()

# Display the first few rows of the filtered DataFrame
print("\nFirst few rows of the filtered DataFrame:")
display(df_filtered.head())

# Display summary statistics for the filtered DataFrame
print("\nSummary statistics for the filtered DataFrame:")
display(df_filtered.describe())

# Count non-null values for each column in the filtered DataFrame
print("\nNon-null value counts for each column:")
display(df_filtered.count())
