# In [None]:

import csv
from datetime import datetime
from email.utils import parsedate_to_datetime
import os
import pandas as pd
import psycopg
from sqlalchemy import create_engine, text
import shutil
import urllib.request
import zipfile

#####################################################
# NORTH CAROLINA STATE BOARD OF ELECTIONS PUBLIC DATA
#####################################################

# URLs for the voter registration and history files
VREG_URL = "https://s3.amazonaws.com/dl.ncsbe.gov/data/ncvoter_Statewide.zip"
VHIS_URL = "https://s3.amazonaws.com/dl.ncsbe.gov/data/ncvhis_Statewide.zip"
# Both URLs, registration first; main() hands this list to downloadFiles(), which reads the data date from the first entry
fileUrls = [VREG_URL, VHIS_URL]

# Specify the encoding of the files
FILE_ENCODING = "latin-1"

# Get the download filenames, which are zip files
# The filename is the last "/"-separated segment of each URL
vRegZipName = VREG_URL.split("/")[-1]
vHisZipName = VHIS_URL.split("/")[-1]
fileZips = [vRegZipName, vHisZipName]

# Get the stems of the filenames
# The stem is everything before the first "." in the zip filename
vRegFileStem = vRegZipName.split(".")[0]
vHisFileStem = vHisZipName.split(".")[0]
# NOTE(review): only the registration stem is listed here, unlike fileUrls and fileZips which list both files
# Nothing in this file reads vFileStems, so confirm whether the history stem was left out on purpose
vFileStems = [vRegFileStem]

# print(f"vRegZipName: {vRegZipName}")
# print(f"vHisZipName: {vHisZipName}")
# print(f"vRegFileStem: {vRegFileStem}")
# print(f"vHisFileStem: {vHisFileStem}")

##################################
# POSTGRESQL DB CONNECTION DETAILS
##################################

# Fill in every value below before running; they are all blank placeholders

# Your PostgreSQL username
DB_USER = ""
# Your PostgreSQL password
DB_PASSWORD = ""
# Your actual host
DB_HOST = ""
# Your PostgreSQL port (PostgreSQL's default is 5432)
DB_PORT = ""
# Your target database name
DB_NAME = ""
# Your target database schema
DB_SCHEMA = ""
# Naming schema for your target table(s)
TABLE_PREFIX = "nc_"
TABLE_SUFFIX = "_history"

# Connection URL used for both the psycopg connection (archive step) and the SQLAlchemy engine (processing step)
# NOTE(review): the values are interpolated verbatim; a username or password containing URL-reserved characters
# (for example "@", ":" or "/") presumably needs to be percent-encoded before being placed here - confirm
# NOTE(review): SQLAlchemy chooses its own default PostgreSQL driver for a plain "postgresql://" URL, which may not be
# the psycopg package imported at the top of this file - confirm the driver SQLAlchemy expects is installed
dbPath = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
vRegTablename = TABLE_PREFIX + "vreg" + TABLE_SUFFIX
vHisTablename = TABLE_PREFIX + "vhis" + TABLE_SUFFIX
# A "table path" is dbPath with the table name appended as a final "/" segment
# The functions below split on the last "/" to recover either the table name or the database URL
vRegTablePath = dbPath + "/" + vRegTablename
vHisTablePath = dbPath + "/" + vHisTablename
dbTablePaths = [vRegTablePath, vHisTablePath]

# print(f"dbPath: {dbPath}")
# print(f"vRegTablename: {vRegTablename}")
# print(f"vHisTablename: {vHisTablename}")
# print(f"vRegTablePath: {vRegTablePath}")
# print(f"vHisTablePath: {vHisTablePath}")
# print(f"dbTablePaths: {dbTablePaths}")

##########################
# FILE TO SQL RELATIONSHIP
##########################

# Maps each file stem to the table path its data is loaded into
# processFiles() walks this mapping, reading "<stem>.txt" from the download folder for each entry
voterFilesDict = {vRegFileStem: vRegTablePath, vHisFileStem: vHisTablePath}

# print(voterFilesDict)

################
# DIRECTORY INFO
################

# Both paths are blank placeholders; fill them in before running

# Download folder path
# The zip files are downloaded and unzipped here, and processFiles() reads the .txt files from here
DOWNLOAD_PATH = r""
# Archive path
# Receives the .tsv exports of the database tables and the date-prefixed copies of the downloaded zips
ARCHIVE_PATH = r""

# print(f"DOWNLOAD_PATH: {DOWNLOAD_PATH}")
# print(f"ARCHIVE_PATH: {ARCHIVE_PATH}")

############
# CORE LOGIC
############

def main(fileDate="YYYYMMDD"):
    # Entry point for a refresh run
    #
    # Conditional Bypass!
    # If you're updating the voter history/registration files for the week and have already run the script but had errors after:
    # a) successfully running the copySqlTablesToArchive() function the full way through
    # b) successfully running the downloadFiles() function the full way through
    # You can skip those functions by supplying the data_date for the current run as fileDate
    # Either pass it in, for example main("20250101"), or change the default value above
    # The date should be formatted as a string and follow the pattern YYYYMMDD; for example, "20250101" or "20241031"
    # If the fileDate is equal to "YYYYMMDD" then the code will run all the functions
    # Otherwise, it will pick up with the processFiles() function
    if fileDate == "YYYYMMDD":
        # Run the full pipeline; archive >>> download >>> process
        copySqlTablesToArchive(dbPath, dbTablePaths, ARCHIVE_PATH)
        fileDate = downloadFiles(fileUrls, DOWNLOAD_PATH, ARCHIVE_PATH)
        processFiles(DOWNLOAD_PATH, voterFilesDict, FILE_ENCODING, fileDate)
        print("Processing complete!")
        return

    # Validate the fileDate is in YYYYMMDD format
    # strptime() on its own accepts shortened values such as "2025011", so the shape is checked first
    isValidDate = (
        isinstance(fileDate, str)
        and len(fileDate) == 8
        and fileDate.isascii()
        and fileDate.isdigit()
    )
    if isValidDate:
        try:
            datetime.strptime(fileDate, "%Y%m%d")
        except ValueError:
            # Eight digits that do not form a real calendar date, for example "20251301"
            isValidDate = False
    if not isValidDate:
        print(f"Invalid fileDate: {fileDate}. Must be a string in YYYYMMDD format.")
        return

    # If valid, continue straight to the processFiles() function
    # This call sits outside the validation try/except so an error raised while processing is not reported as a bad fileDate
    processFiles(DOWNLOAD_PATH, voterFilesDict, FILE_ENCODING, fileDate)
    print("Processing complete!")

# At the start of the process, copy the target tables so that in the event something goes wrong while the script is running, the data is preserved
# Each table is exported as a tab-delimited file with a header row, named <table>.tsv, inside the destination directory
# An existing <table>.tsv from an earlier run is overwritten
# At the moment, the function requires three arguments:
# The first is a string of the path to the database as a URL
# The second is a list of the tables in the database that are going to be copied/archived; these should also be in URL format
# The third is a path to the directory where the tables are going to be copied/archived to
def copySqlTablesToArchive(databasePath, tablePathsList, destination):
    # Make sure the archive directory exists before any file is opened in it
    if destination:
        os.makedirs(destination, exist_ok=True)
    # Connect once and export every table inside the same transaction
    with psycopg.connect(databasePath) as conn:
        # Ask psycopg to open its transaction as REPEATABLE READ and READ ONLY
        # With one transaction for the whole loop, every table is exported from the same snapshot
        # These settings must be applied before the first statement is executed on the connection
        conn.isolation_level = psycopg.IsolationLevel.REPEATABLE_READ
        conn.read_only = True
        # Open a cursor to perform database operations
        with conn.cursor() as cur:
            for tablePath in tablePathsList:
                # Get tablename; it is the last "/" segment of the table path
                table = tablePath.split("/")[-1]
                # Define outfile name
                outfileName = os.path.join(destination, f"{table}.tsv")
                print(f"Preparing to copy and archive the {table} table...")
                # NOTE(review): the table name is not schema-qualified here, while processFiles() qualifies it with DB_SCHEMA
                # This relies on the table being reachable through the connection's search_path - confirm
                copySql = f"""
                COPY "{table}"
                TO STDOUT WITH (FORMAT csv, DELIMITER E'\\t', HEADER true, ENCODING 'UTF8')
                """
                # Stream the export to disk in chunks rather than holding the whole table in memory
                with open(outfileName, "wb") as outfile, cur.copy(copySql) as cp:
                    for chunk in cp:
                        outfile.write(chunk)
                print("Wrote: ", outfileName)

# Download the zip files from the NC State Board of Elections
# Requires a list of URLs to the zip files, the directory to download into, and the archive directory
# Each zip is downloaded, copied to the archive with the data date as a filename prefix, unzipped in place, and then deleted
# Returns the date string (YYYYMMDD) that will populate the "data_date" field
def downloadFiles(urlsList, downloadDir, archiveDir):
    # Get the string that will populate the "data_date" field
    # Both files from the state are refreshed on the same day; only one needs to be referenced for the correct date
    dateStr = getLastModifiedDate(urlsList[0])
    for url in urlsList:
        zipFilename = os.path.basename(url)
        zipDest = os.path.join(downloadDir, zipFilename)
        try:
            # Download the zip file
            print(f"Downloading {zipFilename} to {downloadDir}...")
            urllib.request.urlretrieve(url, zipDest)
            # Create a copy of the download in the archive
            copyDownloadToArchive(zipDest, archiveDir, dateStr)
            # Unzip the download
            unzipFile(zipDest)
            # Delete the zip from the downloads; archive already has a copy
            os.remove(zipDest)
        except Exception as exc:
            # Best effort: report the failure and move on to the next URL
            # NOTE(review): the failure is not re-raised, so the caller carries on to the processing step - confirm this is intended
            print(f"Error downloading:\n{url}")
            # Report the error that actually occurred, not the Exception class itself
            print(f"Exception:\n{type(exc).__name__}: {exc}")
    return dateStr

# Process the unzipped files
# For the voter history file, this entails using an anti-join against the master table to identify what is new as the data in the public file is only looking back 10 years
# For the voter registration file, this entails going through each record in the latest voter registration file to see if anything doesn't match exactly to the most recent data for each NCID
# These changes can be anything, from a party affiliation change to moving to redistricting to becoming a year older
# In fact, at the start of the year, every record will be treated as different because there is a field that contains the voters age at the end of the year
#
# Requires four arguments:
# 1- the directory that holds the unzipped <stem>.txt files
# 2- a dictionary mapping each file stem to its table path (the database URL followed by "/" and the table name)
# 3- the text encoding used to read the .txt files
# 4- the date string (YYYYMMDD) written to the 'data_date' field of new registration records
#
# Each .txt file is deleted only after its table has been updated without error
# If the database step fails, the file stays in place so processFiles() can be run again without downloading again
def processFiles(downloadPath, filesDict, encoding, dateStr):
    # Store the filenames for each file type; they will be processed in different ways
    registrationFile = "ncvoter_Statewide"
    historyFile = "ncvhis_Statewide"
    for stem, sqlTable in filesDict.items():
        if stem == registrationFile:
            tableName = vRegTablename
            # This query will return the latest record, using the data_date field, per NCID
            # This will be used to determine if there have been any changes in the latest data
            query = text(f"""
            SELECT DISTINCT ON (ncid) *
            FROM {DB_SCHEMA}.{tableName}
            ORDER BY ncid, data_date DESC
            """)
        elif stem == historyFile:
            tableName = vHisTablename
            # This query will return everything in the history table
            query = text(f"""
            SELECT *
            FROM {DB_SCHEMA}.{tableName}
            """)
        else:
            # Nothing is known about this file type, so leave the file untouched
            print(f"No processing rule for {stem}; skipping.")
            continue
        # Construct the location of the file so it can be read into a dataframe
        fileLocation = os.path.join(downloadPath, f"{stem}.txt")
        # Every field is read as text and blanks are kept as empty strings instead of NaN
        # The encoding has historically been latin-1 but it's possible this can change in the future
        newDataDf = pd.read_csv(fileLocation, sep="\t", dtype="str", na_filter=False, encoding=encoding)
        print(f"Successfully loaded {stem} into a dataframe.")
        # Clean up the extraneous whitespace
        newDataDf = normalizeWhitespace(newDataDf)
        # Drop the table name from the table path in sqlTable; what is left is the database URL
        sqlDb = sqlTable.rsplit("/", 1)[0]
        engine = create_engine(sqlDb, pool_pre_ping=True)
        try:
            with engine.connect() as connection:
                masterDataDf = pd.read_sql_query(query, connection)
            if stem == registrationFile:
                # Add the 'data_date' field
                newDataDf["data_date"] = dateStr
                dedupedDataDf = _selectNewRegistrationRows(masterDataDf, newDataDf)
            else:
                dedupedDataDf = _selectNewHistoryRows(masterDataDf, newDataDf)
            if dedupedDataDf.empty:
                print(f"No new data found in {stem}.txt; nothing appended.")
            else:
                # Append the truly new data to the master table in SQL
                with engine.begin() as connection:
                    dedupedDataDf.to_sql(
                        name=tableName,
                        schema=DB_SCHEMA,
                        con=connection,
                        if_exists="append",
                        index=False
                    )
                # Report the schema and table only; sqlTable contains the connection URL, password included
                print(f"New data from {stem}.txt appended to {DB_SCHEMA}.{tableName}.")
        finally:
            # Release the pooled connections whether or not the load succeeded
            engine.dispose()
        # Delete the .txt file now that its data is safely in the database
        os.remove(fileLocation)


# Return the registration records from the new file that differ from the latest stored record
# Expects the new data to already carry its 'data_date' field
# NOTE(review): the new file is read entirely as text, so this comparison presumably relies on the SQL table storing every column as text - confirm
def _selectNewRegistrationRows(masterDataDf, newDataDf):
    # Ensure the same column order as in the SQL table
    newDataDf = newDataDf[masterDataDf.columns]
    # Stack the new data underneath the master data (the most recent data from the SQL database)
    combinedDataDf = pd.concat([masterDataDf, newDataDf], ignore_index=True)
    # A record counts as a repeat when every field except data_date matches an earlier row
    fieldSubset = list(combinedDataDf.columns.drop("data_date"))
    isRepeat = combinedDataDf.duplicated(subset=fieldSubset, keep="first")
    # Rows from the new file are identified by their position after the master rows
    # Filtering on data_date instead would also pick up master rows already loaded under the same date and append them a second time
    isFromNewFile = pd.Series(combinedDataDf.index >= len(masterDataDf), index=combinedDataDf.index)
    return combinedDataDf[~isRepeat & isFromNewFile].reset_index(drop=True)


# Return the history records from the new file that are not already in the master table (an anti-join on every column)
# NOTE(review): as with the registration data, matching presumably relies on the SQL table storing every column as text - confirm
def _selectNewHistoryRows(masterDataDf, newDataDf):
    # Ensure the columns align exactly (order and names)
    cols = list(masterDataDf.columns)
    newDataDf = newDataDf[cols].copy()
    # The indicator column records whether each new row found a match in the master data
    mergedDf = newDataDf.merge(masterDataDf, how="left", on=cols, indicator=True)
    return mergedDf.loc[mergedDf["_merge"] == "left_only", cols].reset_index(drop=True)

# Obtain the "Last Modified" date on the new NC voter files
# This is used to populate the 'data_date' field in the destination table(s)
# The 'data_date' is the date that exact record was published
# If "Last Modified" is missing or cannot be parsed, the current date (UTC) will be used instead
# Requires the URL of the file; returns the date as a string in YYYYMMDD format
# If the URL cannot be opened by either request, the error from the second request is raised
def getLastModifiedDate(downloadUrl):
    # Imported here because the file-level import only brings in the datetime class
    from datetime import timezone

    # Try HEAD first
    try:
        req = urllib.request.Request(downloadUrl, method="HEAD")
        with urllib.request.urlopen(req) as resp:
            lastModified = resp.headers.get("Last-Modified")
    # Fallback to GET
    except Exception:
        with urllib.request.urlopen(downloadUrl) as resp:
            lastModified = resp.headers.get("Last-Modified")
    if lastModified:
        try:
            return parsedate_to_datetime(lastModified).strftime("%Y%m%d")
        except (TypeError, ValueError):
            # The header was present but is not a valid date; proceed to use the current date
            pass
    print(f"Unable to access/determine the modification date of the file for {downloadUrl}. Using the current date instead.")
    return datetime.now(timezone.utc).strftime("%Y%m%d")

# Tidy the whitespace in every text cell of a dataframe
# Leading and trailing whitespace is removed and each internal run of whitespace becomes a single space
# Cells that are not strings are passed through untouched
# Returns a new dataframe
def normalizeWhitespace(dataframe):
    def collapseSpaces(value):
        if not isinstance(value, str):
            return value
        return ' '.join(value.split())

    def cleanColumn(column):
        return column.map(collapseSpaces)

    return dataframe.apply(cleanColumn)

# Keep a copy of a downloaded file in the archive in case it's needed in the future
# Call this before the download is unzipped and processed
# Requires three arguments:
# 1- the path to the downloaded file; a zip in this case
# 2- the path to the directory where the copy will be preserved
# 3- the date string placed in front of the filename of the copy, followed by an underscore; signifies the date of the data/download
def copyDownloadToArchive(download, archiveDir, prefixDate):
    originalName = os.path.split(download)[1]
    datedName = f"{prefixDate}_{originalName}"
    shutil.copy(download, os.path.join(archiveDir, datedName))
    print(f"A copy of {originalName} has been copied to {archiveDir} as {datedName}.")

# Extract everything in a zip file into the directory that holds the zip file
# Requires the path to the zip file; the zip file itself is left in place
def unzipFile(zipPath):
    # Split the path once into the containing directory and the filename
    zipDir, zipName = os.path.split(zipPath)
    with zipfile.ZipFile(zipPath, mode='r') as archive:
        archive.extractall(zipDir)
    print(f"{zipName} has been unzipped in {zipDir}")

########################
# CALL THE MAIN FUNCTION
########################

# Run the pipeline only when this file is executed directly; importing it only defines the settings and functions
if __name__ == "__main__":
    main()
