In [71]:
import re
import pandas as pd
import os
from thefuzz import fuzz
from src.data_processing import clean_master_angler_data

from src.snowflake_db import snowflake_writer

from src.ml import index_builder

import recordlinkage

### Perform String Matching

In [73]:
def strip_all_cols(df):

    for col in df.columns:
        df[col] = df[col].str.strip()

In [74]:
builder_object = snowflake_writer.SnowflakeDfWriter().build_session()

with builder_object.create() as session:

    all_species_df  = session.sql('select * from storage_database.CPW_DATA.ALL_SPECIES').to_pandas()
    master_angler_df  = session.sql('select * from storage_database.CPW_DATA.MASTER_ANGLER_AWARD').to_pandas()
strip_all_cols(all_species_df)

2023-12-11 21:53:50,069 [session.py] [            __init__()] [INFO] - Snowpark Session information: 
"version" : 1.10.0,
"python.version" : 3.10.11,
"python.connector.version" : 3.4.1,
"python.connector.session.id" : 16311285063090202,
"os.name" : Windows

2023-12-11 21:53:51,612 [session.py] [               close()] [INFO] - Closing session: 16311285063090202
2023-12-11 21:53:51,613 [session.py] [          cancel_all()] [INFO] - Canceling all running queries
2023-12-11 21:53:51,861 [session.py] [               close()] [INFO] - Closed session: 16311285063090202


In [None]:
### Make the data pipeline
# Step 1 clean all_species_df and master_angler_df
# Step 2 run the matching and write to a new tableS

In [None]:
### CI/CD: create a local deployment script then refactor for github actions
# Each script (.py) gets their own register function
# all scripts share the same snowflake stage

# logging --- will not be repo dependent since ther is only one event table per account

# setup script
# stage
#   

### Case Study: Finding Matches for Lake Trout

In [136]:
# to figure out the best way to match data we will do a matching exercise on lake trout
all_species_df["Main Species"].unique()

array(['Brook', 'Brown', 'Cutthroat', 'Golden', 'Lake', 'Rainbow',
       'Snake River Cutthroat', 'Tiger', 'Unspecified', 'Cutbow'],
      dtype=object)

In [310]:
master_angler_df["Species"] = master_angler_df.Species.str.replace(r'\s*\(Native\)\s*Trout\)*|\s*Trout', "", regex=True)
master_trout_df = master_angler_df.loc[master_angler_df.Species.isin(all_species_df["Main Species"].unique())].copy()
master_trout_df

Unnamed: 0,Angler,Species,Length,Location,Date,Released,Location_clean
3,Aaron Lingwall,Snake River Cutthroat,22,Hohnholz Lake #2,October/2021,Yes,hohnholz #2
4,Aaron Mathew Carter,Cutbow,24,Charlie Meyers (Dream Stream),July/2021,Yes,charlie meyers dream stream
5,Aaron ORTIZ,Rainbow,24,Blue River Silverthorn,May/2021,Yes,blue river silverthorn
17,Adam Allsup,Rainbow,24,Spinney Mountain Reservoir,May/2021,Yes,spinney mountain
20,Adam Spice-Alva,Cutbow,23,Chessman Canyon,June/2021,Yes,chessman canyon
...,...,...,...,...,...,...,...
3899,Wyatt Ivey,Cutbow,25,Elevenmile Reservoir,January/2023,Yes,elevenmile
3905,Zach Ianger,Rainbow,29,Blue River,July/2023,Yes,blue river
3907,Zach Langer,Rainbow,25,Blue River,July/2023,No,blue river
3909,Zachary Baerg,Cutbow,23,Steamboat Lake,January/2023,No,steamboat


In [279]:
# This adds 100 new matches
def clean_text_columns(df: pd.DataFrame, columns = list):
    """ Removes all non character or spaces, makes strings lower case and removes white spaces that may exist after
    removing substrings.
    """
    for col in columns:
        clean_column = col+"_clean"

        df[clean_column] = df[col].str.replace("[^a-zA-Z0-9# ]", "", regex=True)

        df[clean_column] = df[clean_column].str.lower()

        df[clean_column] = df[clean_column].str.replace("lake|reservoir|reservior", "", regex=True)

        df[clean_column] = df[clean_column].str.strip()

clean_text_columns(master_angler_df, ["Location"])
clean_text_columns(all_species_df, ["Water", "Property name"])

In [276]:
def add_df_cols(feature_vectors: pd.DataFrame, left: pd.DataFrame, right: pd.DataFrame, left_columns: [str], right_columns: [str]):
    """ When a comparison is done. It adds columns from the dataframes used to generate the comparison.
    """
    feature_vectors[left_columns] = left.loc[feature_vectors.level_0.to_list()][left_columns].reset_index(drop=True)
    feature_vectors[right_columns] = right.loc[feature_vectors.level_1.to_list()][right_columns].reset_index(drop=True)
    

In [311]:
indexer = recordlinkage.Index()
indexer.block('Water_clean', 'Location_clean')
indexer.block('Property name_clean', 'Location_clean')
indexer.block('Main Species', 'Species')
candidate_links = indexer.index(all_species_df, master_trout_df)

c = recordlinkage.Compare()

c.string('Water_clean', 'Location_clean', method='jarowinkler', label="Jaro Comparison")
c.string('Water_clean', 'Location_clean', method='damerau_levenshtein', label="Levenshtien Comparison")

# The comparison vectors
feature_vectors = c.compute(candidate_links, all_species_df, master_trout_df)

feature_vectors = feature_vectors.reset_index()
add_df_cols(feature_vectors, all_species_df, master_trout_df, ["Water", "Water_clean"], ["Location", "Location_clean", "Angler", "Length", "Date", "Species"])

# drop duplicate matches for each master angler entry
key_cols = [
    'level_1',
    'Jaro Comparison',
    'Levenshtien Comparison',
    'Water',
    'Water_clean',
    'Location',
    'Location_clean',
    'Angler',
    'Length',
    'Date'
 ]
feature_vectors = feature_vectors.drop_duplicates(subset=key_cols)

feature_vectors["combined"] = feature_vectors["Jaro Comparison"] + feature_vectors["Levenshtien Comparison"]

feature_vectors

Unnamed: 0,level_0,level_1,Jaro Comparison,Levenshtien Comparison,Water,Water_clean,Location,Location_clean,Angler,Length,Date,Species,combined
0,0,198,0.416667,0.000000,Alberta Park Reservoir,alberta park,Dillon Reservior,dillon,Candace JoAnne Rich,18,February/2021,Brook,0.416667
1,0,202,0.505556,0.083333,Alberta Park Reservoir,alberta park,Grand Mesa,grand mesa,Cera Nieto,16,August/2021,Brook,0.588889
2,0,210,0.436508,0.000000,Alberta Park Reservoir,alberta park,Cottonwood Lake #1,cottonwood #1,Chandra Sanderson,18,January/2021,Brook,0.436508
3,0,222,0.583333,0.250000,Alberta Park Reservoir,alberta park,Antero Reservoir,antero,Chris Gerk,17,January/2021,Brook,0.833333
4,0,276,0.494444,0.000000,Alberta Park Reservoir,alberta park,Tumblesome Lake,tumblesome,Chuck Kubin,17,July/2021,Brook,0.494444
...,...,...,...,...,...,...,...,...,...,...,...,...,...
369529,1927,3863,0.427579,0.125000,Wrights Lake,wrights,Spinney Mountain Reservoir,spinney mountain,Tyler Finley,25,August/2023,Cutbow,0.552579
369530,1927,3877,0.000000,0.000000,Wrights Lake,wrights,Eleven Mile Reservoir,eleven mile,Victoria Nuanes,23,February/2023,Cutbow,0.000000
369531,1927,3899,0.000000,0.000000,Wrights Lake,wrights,Elevenmile Reservoir,elevenmile,Wyatt Ivey,25,January/2023,Cutbow,0.000000
369532,1927,3909,0.417989,0.000000,Wrights Lake,wrights,Steamboat Lake,steamboat,Zachary Baerg,23,January/2023,Cutbow,0.417989


### Picking a winning match

In [331]:
# We want to end with 1 match of location to out master angler dataset
accounts_with_perfect_match = feature_vectors.loc[(feature_vectors["Levenshtien Comparison"] == 1.0) & (feature_vectors["Jaro Comparison"] == 1.0)].level_1.unique()

In [321]:
# 709 don't have a match, 1847 have a perfect match
print(f"total ids {feature_vectors.level_1.nunique()}")
print(f"Ids with a match {len(accounts_with_perfect_match)}")
print(f"Ids without a perfect match {feature_vectors.loc[~feature_vectors.level_1.isin(accounts_with_perfect_match)].level_1.nunique()}")


total ids 1303
Ids with a match 689
Ids without a perfect match 614


In [317]:
all_species_df.loc[all_species_df.Water.str.contains("Windsor", case=False)]

Unnamed: 0,Main Species,Fish Species,Water,County,Property name,Ease of access,Boating,Fishing pressure,Stocked,Elevation(ft),Latitude,Longitude,Notes,Water_clean,Property name_clean
1030,Cutthroat,"Cutthroat, Golden, Lake",Windsor Lake,Lake,Mount Massive Wilderness Area,Difficult,,Low,Sub-catchables,11629,39.24234 N,-106.48675 W,,windsor,mount massive wilderness area
1038,Golden,"Cutthroat, Golden, Lake",Windsor Lake,Lake,Mount Massive Wilderness Area,Difficult,,Low,Sub-catchables,11629,39.24234 N,-106.48675 W,,windsor,mount massive wilderness area
1070,Lake,"Cutthroat, Golden, Lake",Windsor Lake,Lake,Mount Massive Wilderness Area,Difficult,,Low,Sub-catchables,11629,39.24234 N,-106.48675 W,,windsor,mount massive wilderness area
1695,Rainbow,"Largemouth,Bluegill, Common, Black, Cutbow, Ra...",Windsor Lake,Weld,Boardwalk Community Park,Easy,Motor boats,Medium,"Catchable trout, Sub-catchables and Warmwater",4793,40.48499 N,-104.90111 W,,windsor,boardwalk community park
1925,Cutbow,"Largemouth,Bluegill, Common, Black, Cutbow, Ra...",Windsor Lake,Weld,Boardwalk Community Park,Easy,Motor boats,Medium,"Catchable trout, Sub-catchables and Warmwater",4793,40.48499 N,-104.90111 W,,windsor,boardwalk community park


In [363]:
# Use one or both scores?

# Get all the coin tosses and see if a fixed point one works
# 963,95,287,360,467- has match, 182- should have a match but it doesn't even show, 372- has mach, 642 has a match but it's not showing up, 824 has match, 894- has match, 
# 931 - has match, 1003 has match but is 2nd number

# A 1.4 would successfully account for most of these places
#
# For almost all pulling the largest score works
feature_vectors[(feature_vectors.level_1 == 3720)].sort_values(by="Levenshtien Comparison", ascending=False)

Unnamed: 0,level_0,level_1,Jaro Comparison,Levenshtien Comparison,Water,Water_clean,Location,Location_clean,Angler,Length,Date,Species,combined
15840,204,3720,0.651587,0.380952,Los Pinos River,los pinos river,Allen Basin Reserovir,allen basin reserovir,Samuel Ortiz,19,July/2023,Brook,1.032540
557,5,3720,0.648629,0.380952,Arkansas River,arkansas river,Allen Basin Reserovir,allen basin reserovir,Samuel Ortiz,19,July/2023,Brook,1.029582
11248,144,3720,0.635201,0.333333,Glacier Springs Retention Pond,glacier springs retention pond,Allen Basin Reserovir,allen basin reserovir,Samuel Ortiz,19,July/2023,Brook,0.968534
146,1,3720,0.608466,0.333333,Alexander Lake,alexander,Allen Basin Reserovir,allen basin reserovir,Samuel Ortiz,19,July/2023,Brook,0.941799
10712,137,3720,0.580952,0.333333,Fryingpan River,fryingpan river,Allen Basin Reserovir,allen basin reserovir,Samuel Ortiz,19,July/2023,Brook,0.914286
...,...,...,...,...,...,...,...,...,...,...,...,...,...
15764,203,3720,0.531746,0.095238,Long Lake,long,Allen Basin Reserovir,allen basin reserovir,Samuel Ortiz,19,July/2023,Brook,0.626984
17146,221,3720,0.519841,0.095238,Mesa Lake,mesa,Allen Basin Reserovir,allen basin reserovir,Samuel Ortiz,19,July/2023,Brook,0.615079
6523,82,3720,0.408730,0.047619,Cottonwood Lake #1,cottonwood #1,Allen Basin Reserovir,allen basin reserovir,Samuel Ortiz,19,July/2023,Brook,0.456349
8734,111,3720,0.000000,0.047619,Dowdy Lake,dowdy,Allen Basin Reserovir,allen basin reserovir,Samuel Ortiz,19,July/2023,Brook,0.047619


In [360]:
all_species_df.loc[all_species_df.Water.str.contains('Cascade', case=False)]

Unnamed: 0,Main Species,Fish Species,Water,County,Property name,Ease of access,Boating,Fishing pressure,Stocked,Elevation(ft),Latitude,Longitude,Notes,Water_clean,Property name_clean


In [5]:
greater_than_df = (feature_vectors.groupby(by='level_1').combined.max() > 1.3).reset_index()

no_match_ids = greater_than_df[greater_than_df.combined == False].level_1.unique()

print(len(no_match_ids))
no_match_ids

NameError: name 'feature_vectors' is not defined

### Registering a sproc

In [4]:
from src.snowflake_ import snowflake_writer

from snowflake.snowpark import stored_procedure

sproc_fpath = r"C:\Users\hanna\Desktop\Python\scraping_master_angler_data\src\snowflake_\test_function.py"

session_builder = snowflake_writer.SnowflakeDfWriter().build_session()

with session_builder.create() as session:

    session.sproc.register_from_file(
        file_path=sproc_fpath,
        name=["STORAGE_DATABASE", "CPW_DATA", "test2"],
        func_name="main",
        is_permanent=True,
        replace=True,
        stage_location="STORAGE_DATABASE.CPW_DATA.TEST_STAGE",
        packages=[
            "snowflake-snowpark-python",
            "pandas"
        ]
    )

2023-12-16 16:57:46,624 [session.py] [            __init__()] [INFO] - Snowpark Session information: 
"version" : 1.10.0,
"python.version" : 3.10.11,
"python.connector.version" : 3.4.1,
"python.connector.session.id" : 16311285063151650,
"os.name" : Windows

2023-12-16 16:57:50,409 [session.py] [               close()] [INFO] - Closing session: 16311285063151650
2023-12-16 16:57:50,410 [session.py] [          cancel_all()] [INFO] - Canceling all running queries
2023-12-16 16:57:50,584 [session.py] [               close()] [INFO] - Closed session: 16311285063151650


In [26]:
import toml  

with open('snowflake_setup.toml', 'r') as f:
    setup_file = toml.load(f)

setup_file

{'procedures': [{'fname': 'test_function.py',
   'function_name': 'main',
   'procedure_name': ['STORAGE_DATABASE', 'CPW_DATA', 'test2'],
   'packages': ['snowflake-snowpark-python', 'pandas']},
  {'fname': 'sproc_elt.py',
   'function_name': 'combine_trout_tables',
   'procedure_name': ['STORAGE_DATABASE', 'CPW_DATA', 'sproc_elt'],
   'packages': ['snowflake-snowpark-python', 'pandas']}],
 'test_stage': {'fully_qualified_name': 'STORAGE_DATABASE.CPW_DATA.TEST_STAGE',
  'stage_database': 'STORAGE_DATABASE',
  'stage_schema': 'CPW_DATA',
  'stage_name': 'TEST_STAGE'}}

In [7]:
import os 
import json
import toml  
import logging

from snowflake.snowpark import Session

logging.getLogger("snowflake.connector").setLevel(logging.WARNING)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(filename)s] [%(funcName)20s()] [%(levelname)s] - %(message)s",
    # stream=sys.stdout,
)
logger = logging.getLogger("STORAGE_DATABASE.CPW_DATA.LOG_OUTPUTS")

with open('snowflake_setup.toml', 'r') as f:
    setup_file = toml.load(f)


# Create the base path for procedures
working_directory = os.getcwd()
snowflake_folder = os.path.join(working_directory, "src\\snowflake_")

# Connec to the db
current_directory = os.getcwd()

config_file_path = os.path.join(current_directory, "config.json")

with open(config_file_path, "r") as config_file:
    config_data = json.load(config_file)

builder_obj = Session.builder.configs(config_data)

with builder_obj.create() as session:
    # Create stage if it does not exist
    create_statement_result = session.sql("CREATE STAGE IF NOT EXISTS {}".format(setup_file["stage"]["fully_qualified_name"])).collect()

    logger.info(f"Successfully created table. Statement result: {create_statement_result[0]}")

    # Deploy Procedures
    for procedure in setup_file['procedures']:

        session.use_database(setup_file["stage"]["stage_database"])
        session.use_schema(setup_file["stage"]["stage_schema"])

        new_procedure = session.sproc.register_from_file(
            file_path=os.path.join(snowflake_folder, procedure["fname"]),
            name= procedure["procedure_name"],
            func_name= procedure["function_name"],
            stage_location= setup_file["stage"]["fully_qualified_name"],
            packages= procedure['packages'],
            is_permanent= True,
            replace= True,
            source_code_display=True,
            execute_as= 'owner'
        )

        logger.info(f"Created procedure: {new_procedure.name} for function {new_procedure.name}")