In [None]:
import os
import json
from google.oauth2 import service_account 
import requests
import zipfile
import pandas as pd
import numpy as np

In [None]:
import csv
import datetime
import difflib
import glob
import os

import pandas as pd

from datetime import timedelta, datetime

from fuzzywuzzy import fuzz
from fuzzywuzzy import process


In [None]:
# Google Cloud project used for the BigQuery uploads (None when the variable is unset).
bq_project_id = os.environ.get('BQ_PROJECT_ID')

# Establish BigQuery credentials from the service-account JSON stored in an
# environment variable. Fail early with a clear message: json.loads(None)
# would otherwise raise an opaque TypeError.
if os.environ.get('BQ_ACCOUNT_CREDS') is None:
    raise RuntimeError(
        "Environment variable BQ_ACCOUNT_CREDS is not set; "
        "it must hold the service-account key as a JSON string."
    )
bq_account_creds = json.loads(os.environ['BQ_ACCOUNT_CREDS'])
bq_credentials = service_account.Credentials.from_service_account_info(
    bq_account_creds,
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

In [None]:
# Maps a BigQuery column type to the dtype label that set_dtypes works with.
# 'datetime' is never handed to astype(); set_dtypes routes it to pd.to_datetime.
panda_dtypes_dict = dict(
    STRING='str',
    INT64='int64',
    FLOAT64='float64',
    DATETIME='datetime',
)

# BigQuery table schema for the Contribution files, one dict per column in
# file order. It is passed to sos_to_gbq, which uses it both to pick pandas
# dtypes (via set_dtypes) and as the table_schema for the upload.
contribution_schema_lst = [
    {'name': field_name, 'type': field_type, 'mode': field_mode}
    for field_name, field_type, field_mode in (
        ('RecordID', 'INT64', 'REQUIRED'),
        ('AmendedRecordID', 'INT64', 'NULLABLE'),
        ('Amended', 'STRING', 'NULLABLE'),
        ('Amendment', 'STRING', 'NULLABLE'),
        ('CO_ID', 'INT64', 'REQUIRED'),
        ('CommitteeType', 'STRING', 'NULLABLE'),
        ('CommitteeName', 'STRING', 'NULLABLE'),
        ('CandidateName', 'STRING', 'NULLABLE'),
        ('ContributionType', 'STRING', 'NULLABLE'),
        ('ReceiptType', 'STRING', 'NULLABLE'),
        ('ContributionAmount', 'FLOAT64', 'NULLABLE'),
        ('ContributionDate', 'DATETIME', 'REQUIRED'),
        ('FiledDate', 'DATETIME', 'REQUIRED'),
        ('LastName', 'STRING', 'NULLABLE'),
        ('FirstName', 'STRING', 'NULLABLE'),
        ('MI', 'STRING', 'NULLABLE'),
        ('Suffix', 'STRING', 'NULLABLE'),
        ('Address1', 'STRING', 'NULLABLE'),
        ('Address2', 'STRING', 'NULLABLE'),
        ('City', 'STRING', 'NULLABLE'),
        ('State', 'STRING', 'NULLABLE'),
        ('Zip', 'STRING', 'NULLABLE'),
        ('ContributorType', 'STRING', 'NULLABLE'),
        ('Employer', 'STRING', 'NULLABLE'),
        ('Occupation', 'STRING', 'NULLABLE'),
        ('OccupationComments', 'STRING', 'NULLABLE'),
        ('Electioneering', 'STRING', 'NULLABLE'),
        ('Jurisdiction', 'STRING', 'NULLABLE'),
        ('Explanation', 'STRING', 'NULLABLE'),
    )
]

# BigQuery table schema for the Expenditure files, one dict per column in
# file order. It is passed to sos_to_gbq, which uses it both to pick pandas
# dtypes (via set_dtypes) and as the table_schema for the upload.
expenditure_schema_lst = [
    {'name': field_name, 'type': field_type, 'mode': field_mode}
    for field_name, field_type, field_mode in (
        ('RecordID', 'INT64', 'REQUIRED'),
        ('AmendedRecordID', 'INT64', 'NULLABLE'),
        ('Amended', 'STRING', 'NULLABLE'),
        ('Amendment', 'STRING', 'NULLABLE'),
        ('CO_ID', 'INT64', 'REQUIRED'),
        ('CommitteeType', 'STRING', 'NULLABLE'),
        ('CommitteeName', 'STRING', 'NULLABLE'),
        ('CandidateName', 'STRING', 'NULLABLE'),
        ('ExpenditureType', 'STRING', 'NULLABLE'),
        ('PaymentType', 'STRING', 'NULLABLE'),
        ('ExpenditureAmount', 'FLOAT64', 'NULLABLE'),
        ('ExpenditureDate', 'DATETIME', 'REQUIRED'),
        ('FiledDate', 'DATETIME', 'REQUIRED'),
        ('LastName', 'STRING', 'NULLABLE'),
        ('FirstName', 'STRING', 'NULLABLE'),
        ('MI', 'STRING', 'NULLABLE'),
        ('Suffix', 'STRING', 'NULLABLE'),
        ('Address1', 'STRING', 'NULLABLE'),
        ('Address2', 'STRING', 'NULLABLE'),
        ('City', 'STRING', 'NULLABLE'),
        ('State', 'STRING', 'NULLABLE'),
        ('Zip', 'STRING', 'NULLABLE'),
        ('DisbursementType', 'STRING', 'NULLABLE'),
        ('Employer', 'STRING', 'NULLABLE'),
        ('Occupation', 'STRING', 'NULLABLE'),
        ('Electioneering', 'STRING', 'NULLABLE'),
        ('Jurisdiction', 'STRING', 'NULLABLE'),
        ('Explanation', 'STRING', 'NULLABLE'),
    )
]

# BigQuery table schema for the Loan files, one dict per column in file
# order. It is passed to sos_to_gbq, which uses it both to pick pandas
# dtypes (via set_dtypes) and as the table_schema for the upload.
loan_schema_lst = [
    {'name': field_name, 'type': field_type, 'mode': field_mode}
    for field_name, field_type, field_mode in (
        ('RecordID', 'INT64', 'REQUIRED'),
        ('AmendedRecordID', 'INT64', 'NULLABLE'),
        ('Amended', 'STRING', 'NULLABLE'),
        ('Amendment', 'STRING', 'NULLABLE'),
        ('CO_ID', 'INT64', 'REQUIRED'),
        ('CommitteeType', 'STRING', 'NULLABLE'),
        ('CommitteeName', 'STRING', 'NULLABLE'),
        ('CandidateName', 'STRING', 'NULLABLE'),
        ('PaymentAmount', 'FLOAT64', 'NULLABLE'),
        ('PaymentDate', 'DATETIME', 'REQUIRED'),
        ('FiledDate', 'DATETIME', 'REQUIRED'),
        ('Name', 'STRING', 'NULLABLE'),
        ('Address1', 'STRING', 'NULLABLE'),
        ('Address2', 'STRING', 'NULLABLE'),
        ('City', 'STRING', 'NULLABLE'),
        ('State', 'STRING', 'NULLABLE'),
        ('Zip', 'STRING', 'NULLABLE'),
        ('Type', 'STRING', 'NULLABLE'),
        ('Jurisdiction', 'STRING', 'NULLABLE'),
        ('Description', 'STRING', 'NULLABLE'),
        ('LoanSourceType', 'STRING', 'NULLABLE'),
        ('LoanAmount', 'FLOAT64', 'NULLABLE'),
        ('LoanBalance', 'FLOAT64', 'NULLABLE'),
        ('LoanDate', 'DATETIME', 'REQUIRED'),
        ('LoanSource', 'STRING', 'NULLABLE'),
        ('InterestRate', 'FLOAT64', 'NULLABLE'),
        ('InterestPayment', 'FLOAT64', 'NULLABLE'),
    )
]

In [None]:
def get_sos_files(category_str: str, end_year_int: int, timeout_sec: float = 60.0):
    """Download the yearly bulk-data zip archives for one category.

    One archive is requested for every year from 2001 up to, but not
    including, ``end_year_int``. Each is written to the current working
    directory under the file name taken from the URL
    (``<year>_<category>Data.csv.zip``).

    Args:
        category_str: Category name embedded in the file name, e.g. 'Contribution'.
        end_year_int: Exclusive upper bound of the years to download.
        timeout_sec: Seconds to wait on the server before a request is abandoned.

    Returns:
        None. Progress is reported through print().

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not respond within ``timeout_sec``.
    """
    url_stem = 'https://tracer.sos.colorado.gov/PublicSite/Docs/BulkDataDownloads/'

    for year in range(2001, end_year_int):
        print(f"Collecting {year} {category_str} data")
        url = url_stem + str(year) + '_' + category_str + 'Data.csv.zip'

        req = requests.get(url, timeout=timeout_sec)
        # Without this check an HTTP error page would be saved under a
        # ".zip" name and only fail later, when the archive is opened.
        req.raise_for_status()
        filename = url.split('/')[-1]

        with open(filename, 'wb') as output_file:
            output_file.write(req.content)
        print(f"Finished downloading {year} data")

    # NOTE(review): the years actually fetched are 2001..end_year_int - 1;
    # the message text is kept as it was.
    print(f"Finished downloading all {category_str} data for 2000 to {str(end_year_int)}")


In [None]:
def unzip_sos_files(category_str: str, end_year_int: int):
    """Extract the downloaded yearly archives into the current directory.

    For every year from 2001 up to, but not including, ``end_year_int`` the
    archive ``<year>_<category>Data.csv.zip`` is opened from the current
    working directory and all of its members are extracted to ``'.'``.

    Args:
        category_str: Category name embedded in the file name, e.g. 'Contribution'.
        end_year_int: Exclusive upper bound of the years to extract.

    Returns:
        A completion message string.

    Raises:
        FileNotFoundError: If an expected archive is missing.
        zipfile.BadZipFile: If a file is not a valid zip archive.
    """
    for year in range(2001, end_year_int):
        filename = str(year) + '_' + category_str + 'Data.csv.zip'
        print(f"Unzipping {filename}")

        # The context manager closes the archive even when extraction fails.
        with zipfile.ZipFile(filename) as zip_ref:
            zip_ref.extractall('.')

    return f"Finished unzipping all {category_str} data."

In [None]:
def set_dtypes(df, schema_lst):
    """Cast each column of ``df`` to the pandas dtype implied by ``schema_lst``.

    Every column is first converted to text and the frame is sorted by it.
    The target dtype is looked up through ``panda_dtypes_dict``:

    * ``'datetime'``: rows whose text value is on or after
      ``'2100-01-01 00:00:00'`` are dropped, then ``pd.to_datetime`` is applied.
    * ``'int64'``: ``'nan'`` becomes ``'0'`` and a trailing ``.0`` (left by
      columns that pandas read as floats) is stripped before the integer cast.
    * anything else: a plain ``astype`` to the looked-up dtype.

    When a conversion raises, the error text selects a recovery: messages
    mentioning "nan" drop the null rows and retry; messages containing
    "invalid", "format" or "timestamp" drop the rows at or beyond the
    offending value and retry. Any other error is re-raised.

    Args:
        df: DataFrame to convert. The first column assignment is made on the
            passed-in frame itself, before sorting produces a new frame.
        schema_lst: List of dicts with at least 'name' and 'type' keys.

    Returns:
        The converted DataFrame, possibly with rows removed, ordered by the
        last column that was processed.

    Raises:
        Exception: Any conversion error that matches no recovery pattern, or
            that is raised again by the retry inside a recovery branch.
    """
    col_dtypes_dict = {x.get('name'): panda_dtypes_dict.get(x.get('type')) for x in schema_lst}

    for col in list(df):
        print(f"Setting values on {col} column.")
        # NOTE(review): columns absent from schema_lst give None here - confirm
        # that every CSV column is expected to appear in the schema.
        target_dtype = col_dtypes_dict.get(col)
        try:
            # Work on the text form of the column; missing values become 'nan'.
            df[col] = df[col].astype('str')
            df = df.sort_values(col)

            if target_dtype == 'datetime':
                date_pre_int = len(df)
                # Text comparison: 'nan' also compares >= '2100...', so it is
                # excluded explicitly to keep rows with a missing date.
                df = df.drop((df.index[(df[col] >= '2100-01-01 00:00:00')  & (df[col] != 'nan')].tolist()))
                print(f"Purged {date_pre_int - len(df)} from column {col}")
                df[col] = pd.to_datetime(df[col])
            elif target_dtype == 'int64':
                df[col] = df[col].replace('nan', '0')
                # Strip only a trailing ".0". Removing every "." would turn the
                # text '123.0' into 1230, ten times the real value.
                df[col] = df[col].str.replace(r'\.0+$', '', regex=True).astype('int64')
            else:
                df[col] = df[col].astype(target_dtype, errors='raise')
        except Exception as e:
            err_text = str(e)
            if 'nan' in err_text.lower():
                print("Bad nan value")
                df = df.drop(df.index[df[col].isnull()].tolist())

                if target_dtype == 'datetime':
                    df[col] = pd.to_datetime(df[col])
                else:
                    df[col] = df[col].astype(target_dtype, errors='ignore')

            elif ('invalid' in err_text) or ('format' in err_text) or ('timestamp' in err_text):
                # Assumes the message has the shape "<description>: <value>".
                bad_value = err_text.split(': ')[1]
                print(f"Bad data point, {bad_value} in column {col}. Skipping.")

                if df[col].astype('str').min() == bad_value.replace("'", ""):
                    # The bad value is the smallest text value: drop it and
                    # the largest one.
                    df = df.drop(df.index[df[col] == bad_value.replace("'", "")])
                    df = df.drop(df.index[df[col] == df[col].astype('str').max()])
                else:
                    # Otherwise drop everything that compares at or beyond it.
                    df = df.drop(df.index[df[col] >= bad_value.replace("'", "")].tolist())

                if target_dtype == 'datetime':
                    df[col] = pd.to_datetime(df[col])
                else:
                    df[col] = df[col].astype(target_dtype, errors='raise')
            else:
                raise

    return df


In [None]:
def sos_to_gbq(category_str: str, schema_lst: list, end_year_int: int):
    """Load each yearly CSV of a category, type it, and upload it to BigQuery.

    Years run from 2001 up to, but not including, ``end_year_int``. The file
    for the first year replaces the destination table
    ``co_campaign_finance.<category lower-cased>``; later years are appended.

    Args:
        category_str: Category name used for the folder, file name and table.
        schema_lst: BigQuery schema, used for dtype conversion and the upload.
        end_year_int: Exclusive upper bound of the years to process.

    Returns:
        DataFrame with one row per year and the columns 'year', 'csv' (rows
        read from the file) and 'uploaded' (rows left after set_dtypes).
    """
    row_counts = pd.DataFrame(columns=['year', 'csv', 'uploaded'])

    for year in range(2001, end_year_int):
        offset = year - 2000
        # Only the first year rebuilds the table.
        write_mode = 'replace' if offset == 1 else 'append'

        filename = category_str + '/' + str(year) + '_' + category_str + 'Data.csv'
        print(f"Reading {filename} to DataFrame.")
        year_df = pd.read_csv(
            filename,
            encoding='cp437',
            on_bad_lines='warn',
            low_memory=False,
        )
        row_counts.loc[offset, 'year'] = str(year)
        row_counts.loc[offset, 'csv'] = len(year_df)

        print(f"Setting {filename} data types.")
        year_df = set_dtypes(year_df, schema_lst)

        print(f"Uploading {filename} to BigQuery.")
        year_df.to_gbq(
            destination_table='co_campaign_finance.' + category_str.lower(),
            project_id=bq_project_id,
            if_exists=write_mode,
            table_schema=schema_lst,
            credentials=bq_credentials,
        )
        row_counts.loc[offset, 'uploaded'] = len(year_df)

    return row_counts

In [None]:
# Download the yearly Contribution archives (years 2001 through 2020).
get_sos_files(category_str='Contribution', end_year_int=2021)

In [None]:
# Extract the downloaded Contribution archives into the working directory.
unzip_sos_files(category_str='Contribution', end_year_int=2021)

In [None]:
# Type and upload the Contribution CSVs; keep the per-year row counts.
totals_df = sos_to_gbq(
    category_str='Contribution',
    schema_lst=contribution_schema_lst,
    end_year_int=2021,
)

In [None]:
# Per-year counts for Contribution: rows read from the CSV vs. rows uploaded.
totals_df

In [None]:
# Download the yearly Expenditure archives (years 2001 through 2020).
get_sos_files(category_str='Expenditure', end_year_int=2021)

In [None]:
# Extract the downloaded Expenditure archives into the working directory.
unzip_sos_files(category_str='Expenditure', end_year_int=2021)

In [None]:
# Type and upload the Expenditure CSVs; keep the per-year row counts.
totals_df = sos_to_gbq(
    category_str='Expenditure',
    schema_lst=expenditure_schema_lst,
    end_year_int=2021,
)

In [None]:
# Per-year counts for Expenditure: rows read from the CSV vs. rows uploaded.
totals_df

In [None]:
# Download the yearly Loan archives (years 2001 through 2020).
get_sos_files(category_str='Loan', end_year_int=2021)

In [None]:
# Extract the downloaded Loan archives into the working directory.
unzip_sos_files(category_str='Loan', end_year_int=2021)

In [None]:
# Type and upload the Loan CSVs; keep the per-year row counts.
totals_df = sos_to_gbq(
    category_str='Loan',
    schema_lst=loan_schema_lst,
    end_year_int=2021,
)

In [None]:
# Per-year counts for Loan: rows read from the CSV vs. rows uploaded.
totals_df

In [None]:
# Re-read the 2001 Loan file for inspection. The folder name is spelled as
# sos_to_gbq builds it for category 'Loan', so the path also resolves on
# case-sensitive filesystems.
loans_df = pd.read_csv(r'Loan/2001_LoanData.csv', encoding='cp437', on_bad_lines='warn', low_memory=False)

In [None]:
# Row labels whose PaymentDate, compared as text, is on or after 2100-01-01
# and is not the 'nan' placeholder for a missing value.
loans_df.index[
    loans_df['PaymentDate'].astype('str').ge('2100-01-01 00:00:00')
    & loans_df['PaymentDate'].astype('str').ne('nan')
].tolist()


In [None]:
# Far-future-date probe in the form set_dtypes applies it to a column.
# `df` and `col` exist only inside set_dtypes, so the column is bound
# explicitly here, and both sides of `&` are boolean Series.
date_col = 'PaymentDate'
date_text = loans_df[date_col].astype('str')
date_text.index[(date_text >= '2100-01-01 00:00:00') & (date_text != 'nan')].tolist()

In [None]:
# Show the 2001 Loan frame loaded for inspection.
loans_df