In [None]:
import requests
import datetime
import zipfile
import os
import datetime
import pandas as pd

# USA Spending API endpoint and parameters
api_endpoint = "https://api.usaspending.gov/api/v2/bulk_download/awards/"

# (connect, read) timeouts in seconds; without one, requests can wait forever.
REQUEST_TIMEOUT = (10, 300)
# Size of each chunk written to disk while streaming the archive.
DOWNLOAD_CHUNK_BYTES = 1024 * 1024

# Capture "now" once so both ends of the 7-day window come from the same instant.
window_end = datetime.datetime.now()
window_start = window_end - datetime.timedelta(days=7)

api_params = {
    "filters": {
        "prime_award_types": [
            "02",
            "03",
            "04",
            "05",
            "06",
            "07",
            "08",
            "09",
            "10",
            "11",
            "-1"
        ],
        "date_type": "action_date",
        "date_range": {
            "start_date": window_start.strftime("%Y-%m-%d"),
            "end_date": window_end.strftime("%Y-%m-%d")
        }
    },
    "file_format": "csv",
}

# Retrieve federal spending data from the USA Spending API
response = requests.post(api_endpoint, json=api_params, timeout=REQUEST_TIMEOUT)

# An error response is not guaranteed to carry a JSON object, so parse defensively
# instead of letting a decode error hide the real HTTP status.
try:
    response_data = response.json()
except ValueError:
    response_data = {}
if not isinstance(response_data, dict):
    response_data = {}

if response.status_code != 200:
    print("Error:", response.status_code)
    print("Message:", response_data.get("message", response.text))
elif "file_url" not in response_data or "file_name" not in response_data:
    print("Error: response did not include file_url/file_name.")
else:
    download_url = response_data["file_url"]
    # Keep only the final path component so the server-supplied name cannot
    # point outside the working directory.
    file_name = os.path.basename(response_data["file_name"])

    # NOTE(review): bulk downloads appear to be generated asynchronously, so the
    # file may not exist yet when this request is made — confirm against the API
    # contract and poll the status endpoint if so.
    with requests.get(download_url, stream=True, timeout=REQUEST_TIMEOUT) as download_response:
        download_ok = download_response.status_code == 200
        if download_ok:
            # Stream to disk so the whole archive is never held in memory.
            with open(file_name, "wb") as file:
                for chunk in download_response.iter_content(chunk_size=DOWNLOAD_CHUNK_BYTES):
                    file.write(chunk)

    if download_ok:
        print("Data file downloaded successfully.")

        # Unzip the downloaded file
        with zipfile.ZipFile(file_name, "r") as zip_ref:
            zip_ref.extractall("extracted_data")
        print("Data file unzipped successfully.")

        # Remove the downloaded zip file
        os.remove(file_name)
        print("Downloaded zip file removed.")
    else:
        print("Error downloading data file.")

In [74]:
import zipfile
import pandas as pd
import os
import glob

def usa_spending_df(financial_year, zip_file_path=None, data_folder=None):
    """Load one fiscal year of prime-award and sub-award data into DataFrames.

    On the first call for a year the archive is extracted into ``data_folder``
    and the first prime-award and first sub-award ``.txt`` file are renamed to
    ``prime_awards_<year>.txt`` / ``sub_awards_<year>.txt``. Later calls reuse
    those two files. Both files are read as pipe-delimited text with the
    explicit dtype maps below, so the result has the same column types whether
    or not extraction ran.

    Args:
        financial_year: Year used to build the default paths and file names.
        zip_file_path: Archive to extract when the renamed files are missing.
            Defaults to the original OneDrive location for ``financial_year``.
        data_folder: Folder holding the extracted files. Defaults to
            ``../data/usa_spending_data/<financial_year>``.

    Returns:
        Tuple ``(prime_award_df, sub_award_df)``.

    Raises:
        FileNotFoundError: If the archive is missing, or if after extraction no
            file matches the expected prime/sub-award prefix.
        ValueError: Propagated from pandas when a column cannot be converted to
            its declared dtype.
    """
    print(financial_year)
    if zip_file_path is None:
        zip_file_path = f'/Users/jack.mccormick/Library/CloudStorage/OneDrive-Bonterra/USA Spending Data/All_{financial_year}_PrimeTransactionsAndSubawards.zip'

    if data_folder is None:
        data_folder = f'../data/usa_spending_data/{financial_year}'

    prime_dtypes = {
        'assistance_transaction_unique_key':'object',
        'assistance_award_unique_key':'object',
        'award_id_fain':'object',
        'modification_number':'object',
        'award_id_uri':'object',
        'sai_number':'object',
        'federal_action_obligation':'float64',
        'total_obligated_amount':'float64',
        'total_outlayed_amount_for_overall_award':'float64',
        'indirect_cost_federal_share_amount':'float64',
        'non_federal_funding_amount':'float64',
        'total_non_federal_funding_amount':'float64',
        'face_value_of_loan':'float64',
        'original_loan_subsidy_cost':'float64',
        'total_face_value_of_loan':'float64',
        'total_loan_subsidy_cost':'float64',
        'generated_pragmatic_obligations':'float64',
        'disaster_emergency_fund_codes_for_overall_award':'object',
        'outlayed_amount_from_COVID-19_supplementals_for_overall_award':'float64',
        'obligated_amount_from_COVID-19_supplementals_for_overall_award':'float64',
        'outlayed_amount_from_IIJA_supplemental_for_overall_award':'float64',
        'obligated_amount_from_IIJA_supplemental_for_overall_award':'float64',
        'action_date':'object',
        'action_date_fiscal_year':'int64',
        'period_of_performance_start_date':'object',
        'period_of_performance_current_end_date':'object',
        'awarding_agency_code':'int64',
        'awarding_agency_name':'object',
        'awarding_sub_agency_code':'object',
        'awarding_sub_agency_name':'object',
        'awarding_office_code':'object',
        'awarding_office_name':'object',
        'funding_agency_code':'float64',
        'funding_agency_name':'object',
        'funding_sub_agency_code':'object',
        'funding_sub_agency_name':'object',
        'funding_office_code':'object',
        'funding_office_name':'object',
        'treasury_accounts_funding_this_award':'object',
        'federal_accounts_funding_this_award':'object',
        'object_classes_funding_this_award':'object',
        'program_activities_funding_this_award':'object',
        'recipient_uei':'object',
        'recipient_duns':'float64',
        'recipient_name':'object',
        'recipient_name_raw':'object',
        'recipient_parent_uei':'object',
        'recipient_parent_duns':'float64',
        'recipient_parent_name':'object',
        'recipient_parent_name_raw':'object',
        'recipient_country_code':'object',
        'recipient_country_name':'object',
        'recipient_address_line_1':'object',
        'recipient_address_line_2':'object',
        'recipient_city_code':'object',
        'recipient_city_name':'object',
        'prime_award_transaction_recipient_county_fips_code':'float64',
        'recipient_county_name':'object',
        'prime_award_transaction_recipient_state_fips_code':'float64',
        'recipient_state_code':'object',
        'recipient_state_name':'object',
        'recipient_zip_code':'float64',
        'recipient_zip_last_4_code':'float64',
        'prime_award_transaction_recipient_cd_original':'object',
        'prime_award_transaction_recipient_cd_current':'object',
        'recipient_foreign_city_name':'object',
        'recipient_foreign_province_name':'object',
        'recipient_foreign_postal_code':'object',
        'primary_place_of_performance_scope':'object',
        'primary_place_of_performance_country_code':'object',
        'primary_place_of_performance_country_name':'object',
        'primary_place_of_performance_code':'object',
        'primary_place_of_performance_city_name':'object',
        'prime_award_transaction_place_of_performance_county_fips_code':'float64',
        'primary_place_of_performance_county_name':'object',
        'prime_award_transaction_place_of_performance_state_fips_code':'float64',
        'primary_place_of_performance_state_name':'object',
        'primary_place_of_performance_zip_4':'object',
        'prime_award_transaction_place_of_performance_cd_original':'object',
        'prime_award_transaction_place_of_performance_cd_current':'object',
        'primary_place_of_performance_foreign_location':'object',
        'cfda_number':'float64',
        'cfda_title':'object',
        'funding_opportunity_number':'object',
        'funding_opportunity_goals_text':'object',
        'assistance_type_code':'int64',
        'assistance_type_description':'object',
        'transaction_description':'object',
        'prime_award_base_transaction_description':'object',
        'business_funds_indicator_code':'object',
        'business_funds_indicator_description':'object',
        'business_types_code':'object',
        'business_types_description':'object',
        'correction_delete_indicator_code':'object',
        'correction_delete_indicator_description':'object',
        'action_type_code':'object',
        'action_type_description':'object',
        'record_type_code':'int64',
        'record_type_description':'object',
        'highly_compensated_officer_1_name':'object',
        'highly_compensated_officer_1_amount':'float64',
        'highly_compensated_officer_2_name':'object',
        'highly_compensated_officer_2_amount':'float64',
        'highly_compensated_officer_3_name':'object',
        'highly_compensated_officer_3_amount':'float64',
        'highly_compensated_officer_4_name':'object',
        'highly_compensated_officer_4_amount':'float64',
        'highly_compensated_officer_5_name':'object',
        'highly_compensated_officer_5_amount':'float64',
        'usaspending_permalink':'object',
        'initial_report_date':'object',
        'last_modified_date':'object'
        }

    sub_dtypes = {
        'prime_award_unique_key':'object',
        'prime_award_fain':'object',
        'prime_award_amount':'float64',
        'prime_award_disaster_emergency_fund_codes':'object',
        'prime_award_outlayed_amount_from_COVID-19_supplementals':'float64',
        'prime_award_obligated_amount_from_COVID-19_supplementals':'float64',
        'prime_award_outlayed_amount_from_IIJA_supplemental':'float64',
        'prime_award_obligated_amount_from_IIJA_supplemental':'float64',
        'prime_award_total_outlayed_amount':'float64',
        'prime_award_base_action_date':'object',
        'prime_award_base_action_date_fiscal_year':'int64',
        'prime_award_latest_action_date':'object',
        'prime_award_latest_action_date_fiscal_year':'int64',
        'prime_award_period_of_performance_start_date':'object',
        'prime_award_period_of_performance_current_end_date':'object',
        'prime_award_awarding_agency_code':'int64',
        'prime_award_awarding_agency_name':'object',
        'prime_award_awarding_sub_agency_code':'object',
        'prime_award_awarding_sub_agency_name':'object',
        'prime_award_awarding_office_code':'object',
        'prime_award_awarding_office_name':'object',
        'prime_award_funding_agency_code':'float64',
        'prime_award_funding_agency_name':'object',
        'prime_award_funding_sub_agency_code':'object',
        'prime_award_funding_sub_agency_name':'object',
        'prime_award_funding_office_code':'object',
        'prime_award_funding_office_name':'object',
        'prime_award_treasury_accounts_funding_this_award':'object',
        'prime_award_federal_accounts_funding_this_award':'object',
        'prime_award_object_classes_funding_this_award':'object',
        'prime_award_program_activities_funding_this_award':'object',
        'prime_awardee_uei':'object',
        'prime_awardee_duns':'float64',
        'prime_awardee_name':'object',
        'prime_awardee_dba_name':'object',
        'prime_awardee_parent_uei':'object',
        'prime_awardee_parent_duns':'float64',
        'prime_awardee_parent_name':'object',
        'prime_awardee_country_code':'object',
        'prime_awardee_country_name':'object',
        'prime_awardee_address_line_1':'object',
        'prime_awardee_city_name':'object',
        'prime_awardee_county_fips_code':'float64',
        'prime_awardee_county_name':'object',
        'prime_awardee_state_fips_code':'float64',
        'prime_awardee_state_code':'object',
        'prime_awardee_state_name':'object',
        'prime_awardee_zip_code':'object',
        'prime_award_summary_recipient_cd_original':'object',
        'prime_award_summary_recipient_cd_current':'object',
        'prime_awardee_foreign_postal_code':'object',
        'prime_awardee_business_types':'object',
        'prime_award_primary_place_of_performance_scope':'object',
        'prime_award_primary_place_of_performance_city_name':'object',
        'prime_award_primary_place_of_performance_county_fips_code':'float64',
        'prime_award_primary_place_of_performance_county_name':'object',
        'prime_award_primary_place_of_performance_state_fips_code':'float64',
        'prime_award_primary_place_of_performance_state_code':'object',
        'prime_award_primary_place_of_performance_state_name':'object',
        'prime_award_primary_place_of_performance_address_zip_code':'object',
        'prime_award_summary_place_of_performance_cd_original':'object',
        'prime_award_summary_place_of_performance_cd_current':'object',
        'prime_award_primary_place_of_performance_country_code':'object',
        'prime_award_primary_place_of_performance_country_name':'object',
        'prime_award_base_transaction_description':'object',
        'prime_award_cfda_numbers_and_titles':'object',
        'subaward_type':'object',
        'subaward_fsrs_report_id':'object',
        'subaward_fsrs_report_year':'int64',
        'subaward_fsrs_report_month':'int64',
        'subaward_number':'object',
        'subaward_amount':'float64',
        'subaward_action_date':'object',
        'subaward_action_date_fiscal_year':'int64',
        'subawardee_uei':'object',
        'subawardee_duns':'float64',
        'subawardee_name':'object',
        'subawardee_dba_name':'object',
        'subawardee_parent_uei':'object',
        'subawardee_parent_duns':'float64',
        'subawardee_parent_name':'object',
        'subawardee_country_code':'object',
        'subawardee_country_name':'object',
        'subawardee_address_line_1':'object',
        'subawardee_city_name':'object',
        'subawardee_state_code':'object',
        'subawardee_state_name':'object',
        'subawardee_zip_code':'object',
        'subaward_recipient_cd_original':'object',
        'subaward_recipient_cd_current':'object',
        'subawardee_foreign_postal_code':'object',
        'subawardee_business_types':'object',
        'subaward_primary_place_of_performance_city_name':'object',
        'subaward_primary_place_of_performance_state_code':'object',
        'subaward_primary_place_of_performance_state_name':'object',
        'subaward_primary_place_of_performance_address_zip_code':'object',
        'subaward_place_of_performance_cd_original':'object',
        'subaward_place_of_performance_cd_current':'object',
        'subaward_primary_place_of_performance_country_code':'object',
        'subaward_primary_place_of_performance_country_name':'object',
        'subaward_description':'object',
        'subawardee_highly_compensated_officer_1_name':'object',
        'subawardee_highly_compensated_officer_1_amount':'float64',
        'subawardee_highly_compensated_officer_2_name':'object',
        'subawardee_highly_compensated_officer_2_amount':'float64',
        'subawardee_highly_compensated_officer_3_name':'object',
        'subawardee_highly_compensated_officer_3_amount':'float64',
        'subawardee_highly_compensated_officer_4_name':'object',
        'subawardee_highly_compensated_officer_4_amount':'float64',
        'subawardee_highly_compensated_officer_5_name':'object',
        'subawardee_highly_compensated_officer_5_amount':'float64',
        'usaspending_permalink':'object',
        'subaward_fsrs_report_last_modified_date':'object'
    }

    new_prime_file_name = os.path.join(data_folder, f'prime_awards_{financial_year}.txt')
    new_subaward_file_name = os.path.join(data_folder, f'sub_awards_{financial_year}.txt')

    # Decide on the two files we actually need rather than on "folder is empty":
    # a missing folder or a stray file (e.g. .DS_Store) must not break the load.
    already_extracted = (
        os.path.isfile(new_prime_file_name) and os.path.isfile(new_subaward_file_name)
    )

    if not already_extracted:
        os.makedirs(data_folder, exist_ok=True)
        with zipfile.ZipFile(zip_file_path, 'r') as zip_file:
            zip_file.extractall(data_folder)

        rename_plan = (
            ('All_Assistance_Prime', new_prime_file_name, 'prime award'),
            ('All_Assistance_Subaward', new_subaward_file_name, 'subaward'),
        )
        for prefix, target_path, label in rename_plan:
            pattern = os.path.join(glob.escape(data_folder), f'{prefix}*.txt')
            # Sorted so the chosen file is deterministic. Only the first match
            # is renamed; any further matches are left where they are.
            matches = sorted(glob.glob(pattern))
            if not matches:
                raise FileNotFoundError(
                    f"No '{prefix}*.txt' file found in {data_folder} after extracting {zip_file_path}"
                )
            # os.replace overwrites a stale target on every platform.
            os.replace(matches[0], target_path)
            print(f'renamed {label} file')

    # Same dtype maps on both paths so the result does not depend on cache state.
    prime_award_df = pd.read_csv(new_prime_file_name, sep='|', dtype=prime_dtypes)
    sub_award_df = pd.read_csv(new_subaward_file_name, sep='|', dtype=sub_dtypes)

    print('There are',len(prime_award_df),'prime awards')
    print('There are',len(sub_award_df),'subawards')

    return prime_award_df, sub_award_df

In [None]:
# Load the fiscal-year 2023 prime-award and sub-award DataFrames.
fy_2023_prime_awards, fy_2023_sub_awards = usa_spending_df(2023)

In [None]:
# Print one "<column> <SQL TYPE>," line per sub-award column, for pasting into DDL.
for column_name in fy_2023_sub_awards.columns:
    column_dtype = fy_2023_sub_awards[column_name].dtype
    # Hyphens (e.g. "COVID-19") are not valid in unquoted SQL identifiers.
    sql_name = column_name.replace('-', '_')
    # Check numeric dtypes first: integer columns such as "*_date_fiscal_year"
    # contain "date" in their name but hold a year, not a date.
    if column_dtype == 'float64' or column_dtype == 'int64':
        print(sql_name, 'NUMBER,')
    elif 'date' in column_name.lower():
        print(sql_name, 'DATE,')
    elif column_dtype == 'object':
        print(sql_name, 'VARCHAR,')

In [None]:
# Print one Snowpark StructField(...) line per sub-award column, for pasting
# into a StructType. Emits DoubleType for float columns, so the pasting cell
# must import DoubleType from snowflake.snowpark.types as well.
for column_name in fy_2023_sub_awards.columns:
    column_dtype = fy_2023_sub_awards[column_name].dtype
    # Hyphens (e.g. "COVID-19") become underscores in the generated field names.
    field_name = column_name.replace('-', '_')
    # Numeric dtypes are checked before the name heuristic: integer columns such
    # as "*_date_fiscal_year" hold a year, not a date.
    if column_dtype == 'int64':
        print(f"StructField('{field_name}', IntegerType()),")
    elif column_dtype == 'float64':
        # Float columns carry fractional values; IntegerType would drop them.
        print(f"StructField('{field_name}', DoubleType()),")
    elif 'date' in column_name.lower():
        print(f"StructField('{field_name}', DateType()),")
    elif column_dtype == 'object':
        print(f"StructField('{field_name}', StringType()),")

In [84]:
import configparser
from snowflake.snowpark import Session
from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType, DateType

def push_to_snowflake(financial_year):
    """Stage one fiscal year of award data in Snowflake and load it into tables.

    Steps: load the year's DataFrames with ``usa_spending_df``, write each to
    its own year-specific location under ``@federal_grant_stage``, delete the
    local extracted files, then read the staged files back with an explicit
    schema and copy them into ``federal_prime_awards`` / ``federal_sub_awards``.

    Args:
        financial_year: Year passed to ``usa_spending_df`` and used in the
            stage location names.

    Raises:
        FileNotFoundError: If ``../credentials.ini`` cannot be read.
        KeyError: If the ``bdw_snowflake`` section or one of its keys is missing.
    """
    # Must be the folder usa_spending_df() extracts into; otherwise the cleanup
    # below removes nothing and the extracted files are never refreshed.
    data_folder_path = f'../data/usa_spending_data/{financial_year}'

    prime_awards, sub_awards = usa_spending_df(financial_year)

    config = configparser.ConfigParser()
    config_path = os.path.join("..", "credentials.ini")
    # ConfigParser.read() silently skips unreadable files; fail with a clear
    # message instead of a KeyError on the section lookup further down.
    if not config.read(config_path):
        raise FileNotFoundError(f"Snowflake credentials file not found: {config_path}")

    # NOTE(review): the schemas below type "*_fiscal_year" columns as DateType and
    # amount/code columns that pandas reads as float64 as IntegerType. They are
    # left as-is because the target tables already exist with matching columns;
    # confirm whether the tables and these schemas should be corrected together.
    prime_schema = StructType([
        StructField('assistance_transaction_unique_key', StringType()),
        StructField('assistance_award_unique_key', StringType()),
        StructField('award_id_fain', StringType()),
        StructField('modification_number', StringType()),
        StructField('award_id_uri', StringType()),
        StructField('sai_number', StringType()),
        StructField('federal_action_obligation', IntegerType()),
        StructField('total_obligated_amount', IntegerType()),
        StructField('total_outlayed_amount_for_overall_award', IntegerType()),
        StructField('indirect_cost_federal_share_amount', IntegerType()),
        StructField('non_federal_funding_amount', IntegerType()),
        StructField('total_non_federal_funding_amount', IntegerType()),
        StructField('face_value_of_loan', IntegerType()),
        StructField('original_loan_subsidy_cost', IntegerType()),
        StructField('total_face_value_of_loan', IntegerType()),
        StructField('total_loan_subsidy_cost', IntegerType()),
        StructField('generated_pragmatic_obligations', IntegerType()),
        StructField('disaster_emergency_fund_codes_for_overall_award', StringType()),
        StructField('outlayed_amount_from_COVID_19_supplementals_for_overall_award', IntegerType()),
        StructField('obligated_amount_from_COVID_19_supplementals_for_overall_award', IntegerType()),
        StructField('outlayed_amount_from_IIJA_supplemental_for_overall_award', IntegerType()),
        StructField('obligated_amount_from_IIJA_supplemental_for_overall_award', IntegerType()),
        StructField('action_date', DateType()),
        StructField('action_date_fiscal_year', DateType()),
        StructField('period_of_performance_start_date', DateType()),
        StructField('period_of_performance_current_end_date', DateType()),
        StructField('awarding_agency_code', IntegerType()),
        StructField('awarding_agency_name', StringType()),
        StructField('awarding_sub_agency_code', StringType()),
        StructField('awarding_sub_agency_name', StringType()),
        StructField('awarding_office_code', StringType()),
        StructField('awarding_office_name', StringType()),
        StructField('funding_agency_code', IntegerType()),
        StructField('funding_agency_name', StringType()),
        StructField('funding_sub_agency_code', StringType()),
        StructField('funding_sub_agency_name', StringType()),
        StructField('funding_office_code', StringType()),
        StructField('funding_office_name', StringType()),
        StructField('treasury_accounts_funding_this_award', StringType()),
        StructField('federal_accounts_funding_this_award', StringType()),
        StructField('object_classes_funding_this_award', StringType()),
        StructField('program_activities_funding_this_award', StringType()),
        StructField('recipient_uei', StringType()),
        StructField('recipient_duns', IntegerType()),
        StructField('recipient_name', StringType()),
        StructField('recipient_name_raw', StringType()),
        StructField('recipient_parent_uei', StringType()),
        StructField('recipient_parent_duns', IntegerType()),
        StructField('recipient_parent_name', StringType()),
        StructField('recipient_parent_name_raw', StringType()),
        StructField('recipient_country_code', StringType()),
        StructField('recipient_country_name', StringType()),
        StructField('recipient_address_line_1', StringType()),
        StructField('recipient_address_line_2', StringType()),
        StructField('recipient_city_code', StringType()),
        StructField('recipient_city_name', StringType()),
        StructField('prime_award_transaction_recipient_county_fips_code', IntegerType()),
        StructField('recipient_county_name', StringType()),
        StructField('prime_award_transaction_recipient_state_fips_code', IntegerType()),
        StructField('recipient_state_code', StringType()),
        StructField('recipient_state_name', StringType()),
        StructField('recipient_zip_code', IntegerType()),
        StructField('recipient_zip_last_4_code', IntegerType()),
        StructField('prime_award_transaction_recipient_cd_original', StringType()),
        StructField('prime_award_transaction_recipient_cd_current', StringType()),
        StructField('recipient_foreign_city_name', StringType()),
        StructField('recipient_foreign_province_name', StringType()),
        StructField('recipient_foreign_postal_code', StringType()),
        StructField('primary_place_of_performance_scope', StringType()),
        StructField('primary_place_of_performance_country_code', StringType()),
        StructField('primary_place_of_performance_country_name', StringType()),
        StructField('primary_place_of_performance_code', StringType()),
        StructField('primary_place_of_performance_city_name', StringType()),
        StructField('prime_award_transaction_place_of_performance_county_fips_code', IntegerType()),
        StructField('primary_place_of_performance_county_name', StringType()),
        StructField('prime_award_transaction_place_of_performance_state_fips_code', IntegerType()),
        StructField('primary_place_of_performance_state_name', StringType()),
        StructField('primary_place_of_performance_zip_4', StringType()),
        StructField('prime_award_transaction_place_of_performance_cd_original', StringType()),
        StructField('prime_award_transaction_place_of_performance_cd_current', StringType()),
        StructField('primary_place_of_performance_foreign_location', StringType()),
        StructField('cfda_number', IntegerType()),
        StructField('cfda_title', StringType()),
        StructField('funding_opportunity_number', StringType()),
        StructField('funding_opportunity_goals_text', StringType()),
        StructField('assistance_type_code', IntegerType()),
        StructField('assistance_type_description', StringType()),
        StructField('transaction_description', StringType()),
        StructField('prime_award_base_transaction_description', StringType()),
        StructField('business_funds_indicator_code', StringType()),
        StructField('business_funds_indicator_description', StringType()),
        StructField('business_types_code', StringType()),
        StructField('business_types_description', StringType()),
        StructField('correction_delete_indicator_code', StringType()),
        StructField('correction_delete_indicator_description', StringType()),
        StructField('action_type_code', StringType()),
        StructField('action_type_description', StringType()),
        StructField('record_type_code', IntegerType()),
        StructField('record_type_description', StringType()),
        StructField('highly_compensated_officer_1_name', StringType()),
        StructField('highly_compensated_officer_1_amount', IntegerType()),
        StructField('highly_compensated_officer_2_name', StringType()),
        StructField('highly_compensated_officer_2_amount', IntegerType()),
        StructField('highly_compensated_officer_3_name', StringType()),
        StructField('highly_compensated_officer_3_amount', IntegerType()),
        StructField('highly_compensated_officer_4_name', StringType()),
        StructField('highly_compensated_officer_4_amount', IntegerType()),
        StructField('highly_compensated_officer_5_name', StringType()),
        StructField('highly_compensated_officer_5_amount', IntegerType()),
        StructField('usaspending_permalink', StringType()),
        StructField('initial_report_date', DateType()),
        StructField('last_modified_date', DateType())
    ])

    sub_schema = StructType([
        StructField('prime_award_unique_key', StringType()),
        StructField('prime_award_fain', StringType()),
        StructField('prime_award_amount', IntegerType()),
        StructField('prime_award_disaster_emergency_fund_codes', StringType()),
        StructField('prime_award_outlayed_amount_from_COVID_19_supplementals', IntegerType()),
        StructField('prime_award_obligated_amount_from_COVID_19_supplementals', IntegerType()),
        StructField('prime_award_outlayed_amount_from_IIJA_supplemental', IntegerType()),
        StructField('prime_award_obligated_amount_from_IIJA_supplemental', IntegerType()),
        StructField('prime_award_total_outlayed_amount', IntegerType()),
        StructField('prime_award_base_action_date', DateType()),
        StructField('prime_award_base_action_date_fiscal_year', DateType()),
        StructField('prime_award_latest_action_date', DateType()),
        StructField('prime_award_latest_action_date_fiscal_year', DateType()),
        StructField('prime_award_period_of_performance_start_date', DateType()),
        StructField('prime_award_period_of_performance_current_end_date', DateType()),
        StructField('prime_award_awarding_agency_code', IntegerType()),
        StructField('prime_award_awarding_agency_name', StringType()),
        StructField('prime_award_awarding_sub_agency_code', StringType()),
        StructField('prime_award_awarding_sub_agency_name', StringType()),
        StructField('prime_award_awarding_office_code', StringType()),
        StructField('prime_award_awarding_office_name', StringType()),
        StructField('prime_award_funding_agency_code', IntegerType()),
        StructField('prime_award_funding_agency_name', StringType()),
        StructField('prime_award_funding_sub_agency_code', StringType()),
        StructField('prime_award_funding_sub_agency_name', StringType()),
        StructField('prime_award_funding_office_code', StringType()),
        StructField('prime_award_funding_office_name', StringType()),
        StructField('prime_award_treasury_accounts_funding_this_award', StringType()),
        StructField('prime_award_federal_accounts_funding_this_award', StringType()),
        StructField('prime_award_object_classes_funding_this_award', StringType()),
        StructField('prime_award_program_activities_funding_this_award', StringType()),
        StructField('prime_awardee_uei', StringType()),
        StructField('prime_awardee_duns', IntegerType()),
        StructField('prime_awardee_name', StringType()),
        StructField('prime_awardee_dba_name', StringType()),
        StructField('prime_awardee_parent_uei', StringType()),
        StructField('prime_awardee_parent_duns', IntegerType()),
        StructField('prime_awardee_parent_name', StringType()),
        StructField('prime_awardee_country_code', StringType()),
        StructField('prime_awardee_country_name', StringType()),
        StructField('prime_awardee_address_line_1', StringType()),
        StructField('prime_awardee_city_name', StringType()),
        StructField('prime_awardee_county_fips_code', IntegerType()),
        StructField('prime_awardee_county_name', StringType()),
        StructField('prime_awardee_state_fips_code', IntegerType()),
        StructField('prime_awardee_state_code', StringType()),
        StructField('prime_awardee_state_name', StringType()),
        StructField('prime_awardee_zip_code', StringType()),
        StructField('prime_award_summary_recipient_cd_original', StringType()),
        StructField('prime_award_summary_recipient_cd_current', StringType()),
        StructField('prime_awardee_foreign_postal_code', StringType()),
        StructField('prime_awardee_business_types', StringType()),
        StructField('prime_award_primary_place_of_performance_scope', StringType()),
        StructField('prime_award_primary_place_of_performance_city_name', StringType()),
        StructField('prime_award_primary_place_of_performance_county_fips_code', IntegerType()),
        StructField('prime_award_primary_place_of_performance_county_name', StringType()),
        StructField('prime_award_primary_place_of_performance_state_fips_code', IntegerType()),
        StructField('prime_award_primary_place_of_performance_state_code', StringType()),
        StructField('prime_award_primary_place_of_performance_state_name', StringType()),
        StructField('prime_award_primary_place_of_performance_address_zip_code', StringType()),
        StructField('prime_award_summary_place_of_performance_cd_original', StringType()),
        StructField('prime_award_summary_place_of_performance_cd_current', StringType()),
        StructField('prime_award_primary_place_of_performance_country_code', StringType()),
        StructField('prime_award_primary_place_of_performance_country_name', StringType()),
        StructField('prime_award_base_transaction_description', StringType()),
        StructField('prime_award_cfda_numbers_and_titles', StringType()),
        StructField('subaward_type', StringType()),
        StructField('subaward_fsrs_report_id', StringType()),
        StructField('subaward_fsrs_report_year', IntegerType()),
        StructField('subaward_fsrs_report_month', IntegerType()),
        StructField('subaward_number', StringType()),
        StructField('subaward_amount', IntegerType()),
        StructField('subaward_action_date', DateType()),
        StructField('subaward_action_date_fiscal_year', DateType()),
        StructField('subawardee_uei', StringType()),
        StructField('subawardee_duns', IntegerType()),
        StructField('subawardee_name', StringType()),
        StructField('subawardee_dba_name', StringType()),
        StructField('subawardee_parent_uei', StringType()),
        StructField('subawardee_parent_duns', IntegerType()),
        StructField('subawardee_parent_name', StringType()),
        StructField('subawardee_country_code', StringType()),
        StructField('subawardee_country_name', StringType()),
        StructField('subawardee_address_line_1', StringType()),
        StructField('subawardee_city_name', StringType()),
        StructField('subawardee_state_code', StringType()),
        StructField('subawardee_state_name', StringType()),
        StructField('subawardee_zip_code', StringType()),
        StructField('subaward_recipient_cd_original', StringType()),
        StructField('subaward_recipient_cd_current', StringType()),
        StructField('subawardee_foreign_postal_code', StringType()),
        StructField('subawardee_business_types', StringType()),
        StructField('subaward_primary_place_of_performance_city_name', StringType()),
        StructField('subaward_primary_place_of_performance_state_code', StringType()),
        StructField('subaward_primary_place_of_performance_state_name', StringType()),
        StructField('subaward_primary_place_of_performance_address_zip_code', StringType()),
        StructField('subaward_place_of_performance_cd_original', StringType()),
        StructField('subaward_place_of_performance_cd_current', StringType()),
        StructField('subaward_primary_place_of_performance_country_code', StringType()),
        StructField('subaward_primary_place_of_performance_country_name', StringType()),
        StructField('subaward_description', StringType()),
        StructField('subawardee_highly_compensated_officer_1_name', StringType()),
        StructField('subawardee_highly_compensated_officer_1_amount', IntegerType()),
        StructField('subawardee_highly_compensated_officer_2_name', StringType()),
        StructField('subawardee_highly_compensated_officer_2_amount', IntegerType()),
        StructField('subawardee_highly_compensated_officer_3_name', StringType()),
        StructField('subawardee_highly_compensated_officer_3_amount', IntegerType()),
        StructField('subawardee_highly_compensated_officer_4_name', StringType()),
        StructField('subawardee_highly_compensated_officer_4_amount', IntegerType()),
        StructField('subawardee_highly_compensated_officer_5_name', StringType()),
        StructField('subawardee_highly_compensated_officer_5_amount', IntegerType()),
        StructField('usaspending_permalink', StringType()),
        StructField('subaward_fsrs_report_last_modified_date', DateType())
    ])

    # Year-specific stage locations, used for both the write and the read-back
    # so the load and the reported counts cover only this year's files.
    prime_stage_location = f"@federal_grant_stage/usa_spending_prime/fy_{financial_year}_data"
    sub_stage_location = f"@federal_grant_stage/usa_spending_subaward/fy_{financial_year}_data"

    # Create a Snowflake session
    session = Session.builder.configs({
        'account':config["bdw_snowflake"]["ACCOUNT"],
        'user':config["bdw_snowflake"]["USER"],
        'password':config["bdw_snowflake"]["PASSWORD"],
        'role':config["bdw_snowflake"]["ROLE"],
        'warehouse':config["bdw_snowflake"]["WAREHOUSE"],
        'database':'DATA_SCIENCE',
        'schema':'FEDERAL_GRANTS'
    }).create()

    try:
        # CREATE SNOWPARK DATAFRAME
        prime_snowpark_df = session.create_dataframe(prime_awards)
        sub_snowpark_df = session.create_dataframe(sub_awards)

        # COPY INTO STAGE
        prime_snowpark_df.write.copy_into_location(prime_stage_location, overwrite=True)
        sub_snowpark_df.write.copy_into_location(sub_stage_location, overwrite=True)

        # DELETE FILES FROM LOCAL DIRECTORY
        if os.path.exists(data_folder_path):
            for filename in os.listdir(data_folder_path):
                file_path = os.path.join(data_folder_path, filename)
                if os.path.isfile(file_path):
                    os.remove(file_path)
        else:
            os.makedirs(data_folder_path, exist_ok=True)

        print("Old files deleted.")

        # Read the staged files back with an explicit schema
        staged_prime_df = session.read.options({"field_delimiter": ","}).schema(prime_schema).csv(prime_stage_location)
        staged_sub_df = session.read.options({"field_delimiter": ","}).schema(sub_schema).csv(sub_stage_location)

        staged_prime_df.copy_into_table('federal_prime_awards')
        print(staged_prime_df.count(), "new rows added to existing federal_prime_awards table.")
        staged_sub_df.copy_into_table('federal_sub_awards')
        print(staged_sub_df.count(), "new rows added to existing federal_sub_awards table.")
    finally:
        # Release the connection even when staging or loading fails.
        session.close()

In [85]:
# Stage and load the fiscal-year 2021 award data into Snowflake.
push_to_snowflake(2021)

2334580 new rows added to existing federal_prime_awards table.
1504293 new rows added to existing federal_sub_awards table.
