In [None]:
# Turn on IPython's autoreload extension so edits to imported project modules
# are picked up without restarting the kernel.
%load_ext autoreload
# NOTE(review): mode 3 presumably reloads all modules and also picks up newly
# added names; confirm against the installed IPython version.
%autoreload 3

In [None]:
from pathlib import Path

from matplotlib import pyplot as plt
import pandas as pd
import numpy as np
import sqlalchemy as sa

from dagster import AssetKey
from ferc1_eia_match.candidate_set_creation import DataframeEmbedder, SimilaritySearcher
from ferc1_eia_match.name_cleaner import CompanyNameCleaner
from ferc1_eia_match import config
import pudl
from pudl.etl import defs
from splink.duckdb.linker import DuckDBLinker
import splink.duckdb.comparison_template_library as ctl

In [None]:
# Look up the PUDL database location from the local workspace settings, then
# open a SQLAlchemy engine on it for the table reads below.
pudl_db_url = pudl.workspace.setup.PudlPaths().pudl_db
pudl_engine = sa.create_engine(pudl_db_url)

# Get and Clean Inputs

FERC: get the FERC plant and utility information

EIA: get the plant and utility information for unmapped EIA utilities

- There's no FERC plant location data and there's no FERC utility address information, so the match should just be done on plant name and utility name

TODO: more string cleaning on null values

In [None]:
# Read the denormalized FERC Form 1 plant/utility table from the PUDL database.
ferc_df = pd.read_sql(sql="denorm_plants_utilities_ferc1", con=pudl_engine)

In [None]:
# Read the denormalized EIA plant/utility table from the PUDL database.
eia_df = pd.read_sql(sql="denorm_plants_utilities_eia", con=pudl_engine)

In [None]:
# Replace missing raw names with empty strings before the name cleaners run.
# TODO: is this already handled by the company name cleaner?
eia_name_fill = dict.fromkeys(["utility_name_eia", "plant_name_eia"], "")
ferc_name_fill = dict.fromkeys(["utility_name_ferc1", "plant_name_ferc1"], "")
eia_df = eia_df.fillna(value=eia_name_fill)
ferc_df = ferc_df.fillna(value=ferc_name_fill)

In [None]:
# Clean utility names with the cleaner's default rules. Both frames receive the
# result under the shared "utility_name" column name used by later cells.
utility_name_cleaner = CompanyNameCleaner()
clean_utility_names = utility_name_cleaner.get_clean_df
eia_df = clean_utility_names(eia_df, "utility_name_eia", "utility_name")
ferc_df = clean_utility_names(ferc_df, "utility_name_ferc1", "utility_name")

In [None]:
# default rules except keep words in parentheses
# The rule names below are identifiers consumed by CompanyNameCleaner and are
# kept exactly as the library spells them.
plant_name_rules = [
    "replace_amperstand_between_space_by_AND",
    "replace_hyphen_between_spaces_by_single_space",
    "replace_underscore_by_space",
    "replace_underscore_between_spaces_by_single_space",
    "remove_text_puctuation_except_dot",
    "remove_math_symbols",
    "add_space_before_opening_parentheses",
    "add_space_after_closing_parentheses",
    "remove_parentheses",
    "remove_brackets",
    "remove_curly_brackets",
    "enforce_single_space_between_words",
]
plant_name_cleaner = CompanyNameCleaner(cleaning_rules_list=plant_name_rules)
clean_plant_names = plant_name_cleaner.get_clean_df
# Both frames receive the cleaned text under the shared "plant_name" column.
eia_df = clean_plant_names(eia_df, "plant_name_eia", "plant_name")
ferc_df = clean_plant_names(ferc_df, "plant_name_ferc1", "plant_name")

In [None]:
# TODO: figure out what to actually do with nulls
# Strings treated as "no value". Every exact occurrence in either frame is
# replaced with pd.NA; the replacement is applied to all columns, not only
# the name columns.
null_placeholders = ["u.f.", "n o n e", "o", "0", "1", "", ".", "n/a"]
eia_df = eia_df.replace(to_replace=null_placeholders, value=pd.NA)
ferc_df = ferc_df.replace(to_replace=null_placeholders, value=pd.NA)

In [None]:
# Keep one EIA row per distinct (cleaned names, PUDL IDs) combination, then
# renumber rows from 0 so labels and positions coincide for the iloc lookups below.
eia_dedupe_cols = ["plant_name", "utility_name", "plant_id_pudl", "utility_id_pudl"]
eia_df = eia_df.drop_duplicates(subset=eia_dedupe_cols).reset_index(drop=True)

In [None]:
# Keep one FERC row per distinct (cleaned names, PUDL IDs) combination, then
# renumber rows from 0 so labels and positions coincide for the iloc lookups below.
ferc_dedupe_cols = ["plant_name", "utility_name", "plant_id_pudl", "utility_id_pudl"]
ferc_df = ferc_df.drop_duplicates(subset=ferc_dedupe_cols).reset_index(drop=True)

In [None]:
# Preview the first rows of the cleaned, de-duplicated EIA frame.
eia_df.head(3)

In [None]:
# Preview the first rows of the cleaned, de-duplicated FERC frame.
ferc_df.head(3)

# Run Blocking

In [None]:
# set configuration for model
# Both cleaned name columns use the same embedding type, and both are listed
# as matching columns.
blocking_cols = ["plant_name", "utility_name"]
embedding_config_dict = {
    "embedding_map": {
        col: {"embedding_type": "tfidf_vectorize"} for col in blocking_cols
    },
    "matching_cols": list(blocking_cols),
}
embedding_config = config.EmbeddingConfig(**embedding_config_dict)

In [None]:
# EIA is the left (query) side and FERC the right (menu) side of the blocking step.
embedder = DataframeEmbedder(
    left_df=eia_df,
    right_df=ferc_df,
    embedding_map=embedding_config.embedding_map,
)

In [None]:
# Run the embedding step. The cells below read the resulting
# left/right embedding matrices and block dictionaries from `embedder`.
embedder.embed_dataframes()

In [None]:
# Shapes of the EIA (left) and FERC (right) embedding matrices.
embedder.left_embedding_matrix.shape, embedder.right_embedding_matrix.shape

In [None]:
# EIA embeddings are the queries; FERC embeddings are the menu searched for candidates.
searcher = SimilaritySearcher(
    query_embedding_matrix=embedder.left_embedding_matrix,
    menu_embedding_matrix=embedder.right_embedding_matrix,
    query_blocks_dict=embedder.left_blocks_dict,
    menu_blocks_dict=embedder.right_blocks_dict,
)

In [None]:
# Passed to the candidate search below and reused to size the block_num array.
# Presumably the number of FERC candidates kept per EIA record; confirm against
# SimilaritySearcher.run_candidate_pair_search.
k = 1

In [None]:
%%time
# cand_set is later indexed per EIA row (cand_set[i]) and used as positional
# indices into ferc_df; distances is indexed the same way.
cand_set, distances = searcher.run_candidate_pair_search(k)

In [None]:
# Shape of the candidate index array returned by the search.
cand_set.shape

In [None]:
# Row position of the example EIA record inspected in the next cells.
i = 1

In [None]:
# Show example EIA record i stacked above the FERC candidate(s) found for it.
cols = ["utility_name", "plant_name", "utility_id_pudl", "plant_id_pudl"]
query_row = eia_df.iloc[[i]][cols]
candidate_rows = ferc_df.iloc[cand_set[i]][cols]
pd.concat([query_row, candidate_rows])

In [None]:
# Distance value(s) the searcher reported for example record i.
distances[i]

In [None]:
# Pair every EIA row with its first FERC candidate, side by side.
# FERC columns get a "_right" suffix so they do not collide with the EIA ones.
rename_d = {
    "plant_id_pudl": "plant_id_pudl_right",
    "utility_id_pudl": "utility_id_pudl_right",
    "utility_name": "utility_name_right",
    "plant_name": "plant_name_right",
}
# Select candidate column 0 explicitly instead of calling squeeze(): squeeze()
# only produces a 1-D indexer when k == 1 and there is more than one query row.
# NOTE(review): assumes column 0 holds the closest candidate when k > 1; confirm
# against SimilaritySearcher.
first_cands = cand_set[:, 0]
ferc_first = (
    ferc_df[cols]
    .iloc[first_cands]
    .rename(columns=rename_d)
    # Drop the FERC labels so the frame aligns row-for-row with eia_df in the concat.
    .reset_index(drop=True)
)
result = pd.concat([eia_df[cols], ferc_first], axis=1)

In [None]:
# Attach the search distances to the paired frame.
# NOTE(review): assumes distances carries exactly one value per row of result
# (i.e. k == 1); confirm before raising k.
result["distance"] = distances

In [None]:
# A blocked pair counts as correct when both the utility and the plant PUDL IDs
# agree between the EIA row and its FERC candidate.
util_mask = result["utility_id_pudl"].eq(result["utility_id_pudl_right"])
plant_mask = result["plant_id_pudl"].eq(result["plant_id_pudl_right"])
correct = result.loc[util_mask & plant_mask]

In [None]:
# use 1.5 as thresh?
# Distribution of blocking distance over the pairs whose PUDL IDs agree.
# The figure is labeled so it can be read without the surrounding code.
fig, ax = plt.subplots()
ax.hist(correct.distance, bins=20)
ax.set(
    title="Blocking distance of candidate pairs with matching PUDL IDs",
    xlabel="distance",
    ylabel="number of pairs",
)
plt.show()

In [None]:
# u: distinct FERC row positions that appear anywhere in the candidate set;
# c: how many times each of them appears.
u, c = np.unique(cand_set, return_counts=True)

In [None]:
# Compare frame sizes with the candidate set: EIA rows, FERC rows, candidate
# array shape, and the number of distinct FERC rows that were selected.
len(eia_df), len(ferc_df), cand_set.shape, len(u)

In [None]:
# Expose each EIA row's index value as a "block_num" column; the Splink
# blocking rule later joins the two sides on block_num.
eia_cands = eia_df.reset_index(names="block_num")

In [None]:
# One FERC candidate row per (EIA row, candidate slot), tagged with the block
# number of the EIA row it was retrieved for: row r of cand_set -> block r,
# repeated k times to line up with the flattened candidate indices.
block_nums = np.repeat(np.arange(len(eia_df)), k)
# assign() returns a new frame, which avoids writing a column into the result
# of an iloc selection (the pattern that triggers SettingWithCopyWarning).
ferc_cands = ferc_df.iloc[cand_set.flatten()].assign(block_num=block_nums)

# Match With Splink

In [None]:
# Columns the Splink comparisons are built on.
matching_cols = ["plant_name", "utility_name"]
# Extra columns passed to the linker: the PUDL IDs (used later to score
# predictions), "index" (the unique id column) and "block_num" (the blocking key).
extra_cols = ["plant_id_pudl", "utility_id_pudl", "index", "block_num"]

In [None]:
# reset_index() moves each frame's index into an "index" column, which the
# linker settings below use as the unique id column.
eia_left = eia_cands.reset_index()
# NOTE(review): ferc_cands can contain the same FERC row more than once (one
# copy per EIA row that selected it), so "index" may repeat on the FERC side.
# Confirm Splink tolerates a non-unique id column here.
ferc_right = ferc_cands.reset_index()

In [None]:
# Preview only the first rows rather than dumping the whole frame, consistent
# with the other preview cells in this notebook.
eia_left.head(3)

In [None]:
# Base Splink settings: link the two input tables to each other, identify
# records by the "index" column, and carry the PUDL IDs through to the
# predictions so they can be scored.
settings_dict = dict(
    link_type="link_only",
    unique_id_column_name="index",
    additional_columns_to_retain=["plant_id_pudl", "utility_id_pudl"],
)

In [None]:
# Give the linker only the matching columns plus the id/blocking columns from
# each side; the aliases label the two inputs as EIA and FERC.
linker_inputs = [
    frame[matching_cols + extra_cols] for frame in (eia_left, ferc_right)
]
linker = DuckDBLinker(
    linker_inputs,
    input_table_aliases=["eia_df", "ferc_df"],
    settings_dict=settings_dict,
)

In [None]:
# Profile the two matching columns, limited to the 10 top and 5 bottom values
# per the top_n / bottom_n arguments.
linker.profile_columns(matching_cols, top_n=10, bottom_n=5)

In [None]:
# TODO: try with levenshtein thresholds
# Both comparisons pass an empty Damerau-Levenshtein threshold list and rely on
# Jaro-Winkler levels only. Plant names get one extra, looser level (0.7);
# utility names additionally enable term-frequency adjustments.
plant_name_comparison = ctl.name_comparison(
    "plant_name",
    damerau_levenshtein_thresholds=[],
    jaro_winkler_thresholds=[0.9, 0.8, 0.7],
)
utility_name_comparison = ctl.name_comparison(
    "utility_name",
    damerau_levenshtein_thresholds=[],
    jaro_winkler_thresholds=[0.9, 0.8],
    term_frequency_adjustments=True,
)

From docs: "For linkages in DuckDB on a standard laptop, we suggest using blocking rules that create no more than about 20 million comparisons. For Spark and Athena, try starting with fewer than a billion comparisons, before scaling up."

In [None]:
# Only compare records that share a block number, i.e. an EIA row with the
# FERC candidates retrieved for it during blocking.
blocking_rule = "l.block_num = r.block_num"
# Check how many pairwise comparisons this rule generates before using it.
count = linker.count_num_comparisons_from_blocking_rule(blocking_rule)
print(f"Number of comparisons generated by '{blocking_rule}': {count:,.0f}")

In [None]:
# Complete the settings with the comparisons, the prediction blocking rule and
# the prior match probability.
settings_dict.update(
    comparisons=[plant_name_comparison, utility_name_comparison],
    blocking_rules_to_generate_predictions=[blocking_rule],
    retain_matching_columns=True,
    retain_intermediate_calculation_columns=False,
    # is this correct?
    probability_two_random_records_match=1 / len(eia_df),
)

In [None]:
# Hand the completed settings (comparisons, blocking rule, prior) to the linker.
linker.load_settings(settings_dict)

In [None]:
%%time
# Estimate the u parameters from randomly sampled record pairs, capped at 1e7 pairs.
# NOTE(review): no seed is passed, so the estimates may differ between runs;
# check whether the installed Splink version accepts a `seed` argument.
linker.estimate_u_using_random_sampling(max_pairs=1e7)

In [None]:
# First EM training pass, restricted to pairs whose plant names have a
# Jaro-Winkler similarity of at least 0.9.
training_blocking_rule_1 = "jaro_winkler_similarity(l.plant_name, r.plant_name) >= 0.9"
training_session_1 = linker.estimate_parameters_using_expectation_maximisation(training_blocking_rule_1)

In [None]:
# Second EM training pass, restricted to pairs whose utility names have a
# Jaro-Winkler similarity of at least 0.9.
training_blocking_rule_2 = "jaro_winkler_similarity(l.utility_name, r.utility_name) >= 0.9"
training_session_2 = linker.estimate_parameters_using_expectation_maximisation(training_blocking_rule_2)

In [None]:
# Chart the trained match weights for each comparison level.
# add more jaro_winkler thresholds in?
linker.match_weights_chart()

In [None]:
# Chart the estimated m and u parameters behind the match weights.
linker.m_u_parameters_chart()

In [None]:
# Version suffix used in the saved model settings file name.
n = 0

In [None]:
# Save the trained model settings so they can be reloaded by another linker.
model_path = f"./splink_model_settings/pudl_id_model_settings_{n}.json"
# Create the output directory first so the save does not depend on the folder
# already existing (e.g. on a fresh checkout).
Path(model_path).parent.mkdir(parents=True, exist_ok=True)
# overwrite=False is kept as in the original call; presumably the save is
# refused when the file already exists, so bump `n` to store a new version.
settings = linker.save_model_to_json(model_path, overwrite=False)

In [None]:
# Score the record pairs with the trained model; the result is converted to a
# pandas frame in the next cell.
df_preds = linker.predict()

In [None]:
# Convert the predictions to pandas and rank pairs from most to least likely match.
preds_df = df_preds.as_pandas_dataframe()
sorted_preds_df = preds_df.sort_values(by="match_probability", ascending=False)

In [None]:
# Compare each predicted pair against the known PUDL IDs: the plant ID alone,
# the utility ID alone, and both together.
plant_id_match = sorted_preds_df["plant_id_pudl_l"].eq(sorted_preds_df["plant_id_pudl_r"])
utility_id_match = sorted_preds_df["utility_id_pudl_l"].eq(sorted_preds_df["utility_id_pudl_r"])
plant_matches = sorted_preds_df.loc[plant_id_match]
utility_matches = sorted_preds_df.loc[utility_id_match]
matches = sorted_preds_df.loc[plant_id_match & utility_id_match]

In [None]:
# really should look at how many of the blocks have a correct match
# Shares of EIA rows with a pair that agrees on both IDs, on the plant ID,
# and on the utility ID, in that order.
n_eia = len(eia_df)
tuple(len(subset) / n_eia for subset in (matches, plant_matches, utility_matches))

In [None]:
# Show the highest-probability pair among those whose PUDL IDs agree.
matches.head(1)

In [None]:
# Distribution of predicted match probability over the pairs whose plant and
# utility PUDL IDs agree. The figure is labeled so it can be read on its own.
fig, ax = plt.subplots()
ax.hist(matches.match_probability, bins=30)
ax.set(
    title="Match probability of predicted pairs with matching PUDL IDs",
    xlabel="match_probability",
    ylabel="number of pairs",
)
plt.show()

# Use model to predict on owner utilities

This section is only an example and is no longer relevant, because the EIA owner utilities didn't seem to have a good match. To use the model to predict on new utilities, you would loosely follow these steps: first clean the inputs and run blocking, then predict with the saved Splink model.

In [None]:
# NOTE(review): `eia_own` is not defined in any cell visible in this notebook.
# It must be loaded and cleaned (same steps as eia_df) before this section can run.
# reset_index() moves the index into an "index" column, used later as the linker's id.
eia_own_left = eia_own.reset_index()
# temporarily need to fill in utility_id_pudl
eia_own_left["utility_id_pudl"] = 0

Run blocking - use same embedding config

In [None]:
# Same embedding configuration as before; the owner-utility frame is now the left side.
embedder = DataframeEmbedder(
    left_df=eia_own_left,
    right_df=ferc_df,
    embedding_map=embedding_config.embedding_map,
)

In [None]:
# Run the embedding step for the owner-utility / FERC pair of frames.
embedder.embed_dataframes()

In [None]:
# Shapes of the owner-utility (left) and FERC (right) embedding matrices.
embedder.left_embedding_matrix.shape, embedder.right_embedding_matrix.shape

In [None]:
# Owner-utility embeddings are the queries; FERC embeddings are the menu.
searcher = SimilaritySearcher(
    query_embedding_matrix=embedder.left_embedding_matrix,
    menu_embedding_matrix=embedder.right_embedding_matrix,
    query_blocks_dict=embedder.left_blocks_dict,
    menu_blocks_dict=embedder.right_blocks_dict,
)

In [None]:
# Passed to the candidate search below and reused to size the block_num array.
k = 1

In [None]:
%%time
# Overwrites cand_set / distances from the first half of the notebook with the
# results for the owner-utility frame.
cand_set, distances = searcher.run_candidate_pair_search(k)

In [None]:
# Shape of the distance array returned by the search.
distances.shape

In [None]:
# Shape of the candidate index array returned by the search.
cand_set.shape

In [None]:
# Row position of the example owner-utility record inspected in the next cells.
i = 0

In [None]:
# Show example owner-utility record i stacked above its FERC candidate(s), names only.
name_cols = ["utility_name", "plant_name"]
own_query_row = eia_own_left.iloc[[i]][name_cols]
own_candidate_rows = ferc_df.iloc[cand_set[i]][name_cols]
pd.concat([own_query_row, own_candidate_rows])

In [None]:
# Distance value(s) the searcher reported for example record i.
distances[i]

In [None]:
# Pair every owner-utility row with its first FERC candidate, side by side.
# FERC columns get a "_right" suffix so they do not collide with the EIA ones.
cols = ["utility_name", "plant_name", "utility_id_pudl", "plant_id_pudl"]
rename_d = {
    "plant_id_pudl": "plant_id_pudl_right",
    "utility_id_pudl": "utility_id_pudl_right",
    "utility_name": "utility_name_right",
    "plant_name": "plant_name_right",
}
# Select candidate column 0 explicitly instead of calling squeeze(): squeeze()
# only produces a 1-D indexer when k == 1 and there is more than one query row.
# NOTE(review): assumes column 0 holds the closest candidate when k > 1; confirm
# against SimilaritySearcher.
own_first_cands = cand_set[:, 0]
ferc_own_first = (
    ferc_df[cols]
    .iloc[own_first_cands]
    .rename(columns=rename_d)
    # Drop the FERC labels so the frame aligns row-for-row with eia_own_left.
    .reset_index(drop=True)
)
result = pd.concat([eia_own_left[cols], ferc_own_first], axis=1)

In [None]:
# Attach the search distances to the paired frame.
# NOTE(review): assumes distances carries exactly one value per row of result
# (i.e. k == 1); confirm before raising k.
result["distance"] = distances

In [None]:
# Export the preliminary matches whose blocking distance is at most 2.
close_pairs = result.loc[result["distance"] <= 2]
close_pairs.to_csv("owner_util_prelim_match.csv")

In [None]:
# Expose each owner-utility row's index value as a "block_num" column, the key
# the Splink blocking rule joins on.
eia_cands = eia_own_left.reset_index(names="block_num")

In [None]:
# One FERC candidate row per (owner-utility row, candidate slot), tagged with
# the block number of the row it was retrieved for: row r of cand_set -> block r,
# repeated k times to line up with the flattened candidate indices.
block_nums = np.repeat(np.arange(len(eia_own_left)), k)
# assign() returns a new frame, which avoids writing a column into the result
# of an iloc selection (the pattern that triggers SettingWithCopyWarning).
ferc_cands = ferc_df.iloc[cand_set.flatten()].assign(block_num=block_nums)

Load linker

In [None]:
# eia_cands is used as-is: it should already hold the "index" column created by
# the earlier reset_index on eia_own, alongside "block_num".
# NOTE(review): this rebinds eia_own_left, so re-running the cell that builds
# eia_cands from it would start from a frame that already has a block_num column.
eia_own_left = eia_cands
# reset_index() moves the FERC row labels into an "index" column for the linker.
ferc_own_right = ferc_cands.reset_index()

In [None]:
# Build a fresh linker on the owner-utility inputs and load the previously
# saved model settings instead of retraining.
own_linker_inputs = [
    frame[matching_cols + extra_cols] for frame in (eia_own_left, ferc_own_right)
]
own_linker = DuckDBLinker(own_linker_inputs)
own_linker.load_settings(f"./splink_model_settings/pudl_id_model_settings_{n}.json")

In [None]:
# Score the owner-utility / FERC pairs with the loaded model; converted to
# pandas in the next cell.
df = own_linker.predict()

In [None]:
# Convert the predictions to pandas and rank pairs from most to least likely match.
own_preds_df = df.as_pandas_dataframe()
sorted_preds = own_preds_df.sort_values(by="match_probability", ascending=False)