# Elections Database Application
## Implementation and Testing

Author: Eric Martin   

This Jupyter notebook implements all functional requirements for the Elections Department database application, including administrative activities and reporting queries.

### Database Connection Setup

In [None]:
import mysql.connector
from mysql.connector import Error
import pandas as pd
import getpass
from datetime import datetime, date
import warnings
warnings.filterwarnings('ignore')


# Database connection configuration


In [None]:

def get_database_connection():
    """Establish connection to the elections database"""
    try:
        # Get connection parameters
        host = input("Enter MySQL host (default: localhost): ") or "localhost"
        user = input("Enter MySQL username (default: elections_user): ") or "elections_user"
        password = getpass.getpass("Enter MySQL password: ")
        database = input("Enter database name (default: elections_db): ") or "elections_db"
        
        connection = mysql.connector.connect(
            host=host,
            user=user,
            password=password,
            database=database,
            autocommit=False
        )
        
        if connection.is_connected():
            print(f"Successfully connected to {database} database")
            return connection
            
    except Error as e:
        print(f"Error connecting to database: {e}")
        return None

# Establish connection
db_connection = get_database_connection()

### Utility Functions

In [None]:
def execute_query(connection, query, params=None, fetch=True):
    """Execute SQL query with error handling"""
    try:
        cursor = connection.cursor()
        cursor.execute(query, params or ())
        
        if fetch:
            columns = [desc[0] for desc in cursor.description] if cursor.description else []
            results = cursor.fetchall()
            cursor.close()
            return pd.DataFrame(results, columns=columns) if results else pd.DataFrame()
        else:
            connection.commit()
            cursor.close()
            return True
            
    except Error as e:
        print(f"Database error: {e}")
        connection.rollback()
        if 'cursor' in locals():
            cursor.close()
        return None if fetch else False

def call_procedure(connection, procedure_name, params):
    """Call stored procedure with error handling"""
    try:
        cursor = connection.cursor()
        cursor.callproc(procedure_name, params)
        
        # Get results if any
        results = []
        for result in cursor.stored_results():
            results.extend(result.fetchall())
            
        cursor.close()
        return results
        
    except Error as e:
        print(f"Procedure error: {e}")
        if 'cursor' in locals():
            cursor.close()
        return None

---
## Administrative Activities

### Poll Management

In [None]:
def create_new_poll():
    """Clerk creating a new poll"""
    print("=== A.1: CREATE NEW POLL ===")
    
    # Get poll details from user
    short_name = input("Enter poll short name (4 alphanumeric characters): ").upper()
    question_text = input("Enter poll question: ")
    
    print("\nEnter availability period:")
    start_date = input("Start date and time (YYYY-MM-DD HH:MM:SS): ")
    end_date = input("End date and time (YYYY-MM-DD HH:MM:SS): ")
    
    # Validate short name format
    if len(short_name) != 4 or not short_name.isalnum():
        print("Error: Short name must be exactly 4 alphanumeric characters")
        return False
    
    # Check if poll already exists
    check_query = "SELECT short_name FROM poll WHERE short_name = %s"
    existing = execute_query(db_connection, check_query, (short_name,))
    
    if not existing.empty:
        print(f"Error: Poll '{short_name}' already exists")
        return False
    
    # Insert new poll
    insert_query = """
        INSERT INTO poll (short_name, question_text, availability_start, availability_end)
        VALUES (%s, %s, %s, %s)
    """
    
    success = execute_query(db_connection, insert_query, 
                           (short_name, question_text, start_date, end_date), fetch=False)
    
    if success:
        print(f"✅ Poll '{short_name}' created successfully!")
        
        # Display the new poll
        display_query = "SELECT * FROM poll WHERE short_name = %s"
        new_poll = execute_query(db_connection, display_query, (short_name,))
        print("\nNew poll details:")
        print(new_poll.to_string(index=False))
        return True
    else:
        print("❌ Failed to create poll")
        return False

# Example usage
create_new_poll()

### Voter Registration

In [None]:
def folk_register_to_vote():
    """Flok registering to vote at a center"""
    print("=== A.2: FOLK REGISTRATION ===")
    
    # Get registration details
    folk_id = input("Enter your 16-digit personal ID: ")
    
    # Validate folk exists
    folk_check = "SELECT personal_id, first_name, last_name FROM folk WHERE personal_id = %s"
    folk_data = execute_query(db_connection, folk_check, (folk_id,))
    
    if folk_data.empty:
        print("❌ Folk ID not found")
        return False
    
    print(f"Welcome, {folk_data.iloc[0]['first_name']} {folk_data.iloc[0]['last_name']}!\n")
    
    # Show available polls
    polls_query = "SELECT short_name, question_text, availability_start, availability_end FROM poll ORDER BY short_name"
    polls = execute_query(db_connection, polls_query)
    print("Available polls:")
    print(polls.to_string(index=False))
    
    poll_id = input("\nEnter poll short name: ").upper()
    voting_date = input("Enter desired voting date (YYYY-MM-DD): ")
    
    # Show voting centers with current registrations for the date
    centers_query = """
        SELECT 
            vc.acronym,
            p.city,
            p.state,
            COUNT(r.folk_id) AS current_registrations
        FROM voting_center vc
        JOIN place p ON vc.place_id = p.place_id
        LEFT JOIN registration r ON vc.place_id = r.center_id 
            AND r.voting_date = %s 
            AND r.is_valid = TRUE
        GROUP BY vc.place_id, vc.acronym, p.city, p.state
        ORDER BY current_registrations, vc.acronym
    """
    
    centers = execute_query(db_connection, centers_query, (voting_date,))
    print("\nVoting centers and current registrations for", voting_date, ":")
    print(centers.to_string(index=False))
    
    center_acronym = input("\nChoose voting center acronym: ").upper()
    
    # Get center ID
    center_query = "SELECT place_id FROM voting_center WHERE acronym = %s"
    center_data = execute_query(db_connection, center_query, (center_acronym,))
    
    if center_data.empty:
        print("❌ Invalid voting center acronym")
        return False
    
    center_id = center_data.iloc[0]['place_id']
    
    # Check for existing registration
    existing_query = """
        SELECT registration_id FROM registration 
        WHERE folk_id = %s AND poll_id = %s AND center_id = %s AND voting_date = %s
    """
    existing = execute_query(db_connection, existing_query, (folk_id, poll_id, center_id, voting_date))
    
    if not existing.empty:
        print("❌ You are already registered for this poll at this center on this date")
        return False
    
    # Insert registration
    register_query = """
        INSERT INTO registration (folk_id, poll_id, center_id, voting_date)
        VALUES (%s, %s, %s, %s)
    """
    
    success = execute_query(db_connection, register_query, 
                           (folk_id, poll_id, center_id, voting_date), fetch=False)
    
    if success:
        print("✅ Registration successful!")
        
        # Show registration details
        details_query = """
            SELECT r.registration_id, r.folk_id, r.poll_id, vc.acronym, 
                   r.voting_date, r.is_valid
            FROM registration r
            JOIN voting_center vc ON r.center_id = vc.place_id
            WHERE r.folk_id = %s AND r.poll_id = %s AND r.center_id = %s AND r.voting_date = %s
            ORDER BY r.registration_datetime DESC LIMIT 1
        """
        details = execute_query(db_connection, details_query, (folk_id, poll_id, center_id, voting_date))
        print("\nRegistration details:")
        print(details.to_string(index=False))
        return True
    else:
        print("❌ Registration failed")
        return False

# Example usage
folk_register_to_vote()

### Poll Administration

In [None]:
def modify_poll_availability():
    """Clerk modifying availability period of a current poll"""
    print("=== A.3: MODIFY POLL AVAILABILITY ===")
    
    # Show current polls
    polls_query = "SELECT * FROM poll ORDER BY short_name"
    polls = execute_query(db_connection, polls_query)
    print("Current polls:")
    print(polls.to_string(index=False))
    
    poll_id = input("\nEnter poll short name to modify: ").upper()
    
    # Check if poll exists
    poll_check = "SELECT * FROM poll WHERE short_name = %s"
    poll_data = execute_query(db_connection, poll_check, (poll_id,))
    
    if poll_data.empty:
        print("❌ Poll not found")
        return False
    
    print("\nCurrent poll details:")
    print(poll_data.to_string(index=False))
    
    # Check for existing ballots (will be prevented by trigger)
    ballot_check = "SELECT COUNT(*) as ballot_count FROM ballot WHERE poll_id = %s"
    ballot_count = execute_query(db_connection, ballot_check, (poll_id,))
    
    if ballot_count.iloc[0]['ballot_count'] > 0:
        print(f"❌ Cannot modify poll: {ballot_count.iloc[0]['ballot_count']} ballots have been cast")
        return False
    
    # Get new availability period
    print("\nEnter new availability period:")
    new_start = input("New start date and time (YYYY-MM-DD HH:MM:SS): ")
    new_end = input("New end date and time (YYYY-MM-DD HH:MM:SS): ")
    
    # Update poll
    update_query = """
        UPDATE poll 
        SET availability_start = %s, availability_end = %s
        WHERE short_name = %s
    """
    
    success = execute_query(db_connection, update_query, 
                           (new_start, new_end, poll_id), fetch=False)
    
    if success:
        print("✅ Poll availability period updated successfully!")
        
        # Show updated poll
        updated_poll = execute_query(db_connection, poll_check, (poll_id,))
        print("\nUpdated poll details:")
        print(updated_poll.to_string(index=False))
        return True
    else:
        print("❌ Failed to update poll")
        return False

# Example usage
modify_poll_availability()

### Ballot Casting (with Transaction Support)

In [None]:
def cast_ballot_with_transaction():
    """Voter casting a ballot while confirming valid registration"""
    print("=== A.4: CAST BALLOT (TRANSACTIONAL) ===")
    
    # Get voter details
    folk_id = input("Enter your 16-digit personal ID: ")
    
    # Validate folk exists
    folk_check = "SELECT personal_id, first_name, last_name FROM folk WHERE personal_id = %s"
    folk_data = execute_query(db_connection, folk_check, (folk_id,))
    
    if folk_data.empty:
        print("❌ Folk ID not found")
        return False
    
    print(f"Welcome, {folk_data.iloc[0]['first_name']} {folk_data.iloc[0]['last_name']}!\n")
    
    # Show their valid registrations
    registrations_query = """
        SELECT r.registration_id, r.poll_id, p.question_text, vc.acronym, 
               r.voting_date, r.center_id
        FROM registration r
        JOIN poll p ON r.poll_id = p.short_name
        JOIN voting_center vc ON r.center_id = vc.place_id
        WHERE r.folk_id = %s AND r.is_valid = TRUE
        ORDER BY r.voting_date
    """
    
    registrations = execute_query(db_connection, registrations_query, (folk_id,))
    
    if registrations.empty:
        print("❌ No valid registrations found")
        return False
    
    print("Your valid registrations:")
    print(registrations.to_string(index=False))
    
    # Get voting choice
    poll_id = input("\nEnter poll ID to vote on: ").upper()
    center_id = int(input("Enter center ID: "))
    voting_date = input("Enter voting date (YYYY-MM-DD): ")
    
    print("\nVote choices: YES, NO, ABSTAIN")
    vote_choice = input("Enter your vote: ").upper()
    
    if vote_choice not in ['YES', 'NO', 'ABSTAIN']:
        print("❌ Invalid vote choice")
        return False
    
    # Use the transactional stored procedure
    try:
        cursor = db_connection.cursor()
        
        # Call the transactional procedure
        args = [folk_id, poll_id, vote_choice, center_id, voting_date, 0, '']
        cursor.callproc('cast_ballot_transaction', args)
        
        # Get the result
        cursor.execute("SELECT @_cast_ballot_transaction_5 AS result_code, @_cast_ballot_transaction_6 AS result_message")
        result = cursor.fetchone()
        
        result_code = result[0]
        result_message = result[1]
        
        cursor.close()
        
        if result_code == 0:
            print("✅", result_message)
            
            # Show the cast ballot
            ballot_query = """
                SELECT b.folk_id, b.poll_id, b.vote_choice, b.cast_datetime,
                       vc.acronym AS center
                FROM ballot b
                JOIN registration r ON b.registration_id = r.registration_id
                JOIN voting_center vc ON r.center_id = vc.place_id
                WHERE b.folk_id = %s AND b.poll_id = %s
            """
            ballot = execute_query(db_connection, ballot_query, (folk_id, poll_id))
            print("\nBallot details:")
            print(ballot.to_string(index=False))
            return True
        else:
            print("❌", result_message)
            return False
            
    except Error as e:
        print(f"❌ Transaction error: {e}")
        return False

# Example usage
cast_ballot_with_transaction()

### User Management

In [None]:
def remove_folk():
    """Removing a user and all their associated information"""
    print("=== A.5: REMOVE FOLK ===")
    
    # Show all folk
    folk_query = """
        SELECT personal_id, first_name, last_name, 
               CASE WHEN s.personal_id IS NOT NULL THEN 'Staff' ELSE 'Citizen' END AS type
        FROM folk f
        LEFT JOIN staff s ON f.personal_id = s.personal_id
        ORDER BY f.last_name, f.first_name
    """
    
    folk_list = execute_query(db_connection, folk_query)
    print("All folk in database:")
    print(folk_list.to_string(index=False))
    
    folk_id = input("\nEnter personal ID of folk to remove: ")
    
    # Check if folk exists and show their data
    folk_check = "SELECT * FROM folk WHERE personal_id = %s"
    folk_data = execute_query(db_connection, folk_check, (folk_id,))
    
    if folk_data.empty:
        print("❌ Folk ID not found")
        return False
    
    print("\nFolk to be removed:")
    print(folk_data.to_string(index=False))
    
    # Show associated data that will be removed
    print("\nAssociated data that will be removed:")
    
    # Registrations
    reg_query = "SELECT COUNT(*) AS registrations FROM registration WHERE folk_id = %s"
    reg_count = execute_query(db_connection, reg_query, (folk_id,))
    print(f"- {reg_count.iloc[0]['registrations']} registrations")
    
    # Ballots
    ballot_query = "SELECT COUNT(*) AS ballots FROM ballot WHERE folk_id = %s"
    ballot_count = execute_query(db_connection, ballot_query, (folk_id,))
    print(f"- {ballot_count.iloc[0]['ballots']} ballots")
    
    # Emails
    email_query = "SELECT COUNT(*) AS emails FROM email WHERE folk_id = %s"
    email_count = execute_query(db_connection, email_query, (folk_id,))
    print(f"- {email_count.iloc[0]['emails']} email addresses")
    
    # Staff schedules
    schedule_query = "SELECT COUNT(*) AS schedules FROM staff_schedule WHERE staff_id = %s"
    schedule_count = execute_query(db_connection, schedule_query, (folk_id,))
    print(f"- {schedule_count.iloc[0]['schedules']} staff schedules")
    
    confirm = input("\n⚠️  Are you sure you want to remove this folk and all associated data? (yes/no): ")
    
    if confirm.lower() != 'yes':
        print("Operation cancelled")
        return False
    
    # Remove folk (cascading deletes will handle associated data)
    try:
        db_connection.start_transaction()
        
        # Delete folk (cascades to staff, email, registrations, ballots, schedules)
        delete_query = "DELETE FROM folk WHERE personal_id = %s"
        cursor = db_connection.cursor()
        cursor.execute(delete_query, (folk_id,))
        
        if cursor.rowcount > 0:
            db_connection.commit()
            print("✅ Folk and all associated data removed successfully")
            cursor.close()
            return True
        else:
            db_connection.rollback()
            print("❌ No folk was removed")
            cursor.close()
            return False
            
    except Error as e:
        db_connection.rollback()
        print(f"❌ Error removing folk: {e}")
        if 'cursor' in locals():
            cursor.close()
        return False

# Example usage
remove_folk()

---
## Queries and Analysis

### Voter Information Query

In [None]:
def get_folks_info():
    """Retrieve comprehensive voter contact information"""
    print("=== VOTER INFORMATION QUERY ===")
    
    query = """
        SELECT 
            CONCAT(f.first_name, ' ', f.last_name) AS full_name,
            pl.city,
            e.email_address
        FROM folk f
        JOIN residence r ON f.residence_id = r.place_id
        JOIN place pl ON r.place_id = pl.place_id
        LEFT JOIN email e ON f.personal_id = e.folk_id
        ORDER BY f.last_name, f.first_name, e.email_address
    """
    
    result = execute_query(db_connection, query)
    print(f"Found {len(result)} records")
    print(result.to_string(index=False))
    return result

get_folks_info()

### Population Demographics by City

In [None]:
def get_city_demographics():
    """Retrieve population demographics organized by city and state"""
    print("=== POPULATION DEMOGRAPHICS ===")
    
    query = """
        SELECT 
            pl.city,
            pl.state,
            COUNT(f.personal_id) AS num_residents
        FROM place pl
        JOIN residence r ON pl.place_id = r.place_id
        JOIN folk f ON r.place_id = f.residence_id
        GROUP BY pl.city, pl.state
        HAVING COUNT(f.personal_id) > 0
        ORDER BY num_residents DESC, pl.city
    """
    
    result = execute_query(db_connection, query)
    print(f"Found {len(result)} cities with residents")
    print(result.to_string(index=False))
    return result

get_city_demographics()

### Voting Center Registration Summary

In [None]:
def get_center_registration_summary():
    """List each center with number of registered folks by zipcode (ascending)"""
    print("=== CENTER REGISTRATION SUMMARY ===")
    
    query = """
        SELECT 
            vc.acronym AS center_acronym,
            pl.city,
            pl.state,
            pl.zipcode,
            COUNT(DISTINCT reg.folk_id) AS registered_folks_count
        FROM voting_center vc
        JOIN place pl ON vc.place_id = pl.place_id
        LEFT JOIN registration reg ON vc.place_id = reg.center_id AND reg.is_valid = TRUE
        GROUP BY vc.place_id, vc.acronym, pl.city, pl.state, pl.zipcode
        ORDER BY pl.zipcode ASC
    """
    
    result = execute_query(db_connection, query)
    print(f"Found {len(result)} voting centers")
    print(result.to_string(index=False))
    return result

get_center_registration_summary()

### Center Registration Lookup

In [None]:
def lookup_center_registrations():
    """Find distinct identifiers and names of folks registered at given center in time period"""
    print("=== CENTER REGISTRATION LOOKUP ===")
    
    # Get parameters
    center_acronym = input("Enter voting center acronym: ").upper()
    start_date = input("Enter start date (YYYY-MM-DD): ")
    end_date = input("Enter end date (YYYY-MM-DD): ")
    
    query = """
        SELECT DISTINCT
            f.personal_id,
            CONCAT(f.first_name, ' ', f.last_name) AS full_name
        FROM folk f
        JOIN registration reg ON f.personal_id = reg.folk_id
        JOIN voting_center vc ON reg.center_id = vc.place_id
        WHERE vc.acronym = %s
          AND reg.voting_date BETWEEN %s AND %s
          AND reg.is_valid = TRUE
        ORDER BY f.last_name, f.first_name
    """
    
    result = execute_query(db_connection, query, (center_acronym, start_date, end_date))
    print(f"\nFound {len(result)} folks registered at {center_acronym} between {start_date} and {end_date}")
    print(result.to_string(index=False))
    return result

lookup_center_registrations()

### Geographic Registration Analysis

In [None]:
def analyze_geographic_registrations():
    """Find registrations within 3 miles from Megapolis, excluding given centers"""
    print("=== GEOGRAPHIC REGISTRATION ANALYSIS ===")
    
    # Get parameters
    target_month = input("Enter target month (YYYY-MM): ")
    exclusion_list = input("Enter centers to exclude (comma-separated, e.g., NRTH,SVAL): ")
    
    query = """
        SELECT 
            COUNT(DISTINCT reg.registration_id) AS unique_registrations_count
        FROM registration reg
        JOIN voting_center vc ON reg.center_id = vc.place_id
        JOIN place pl_vc ON vc.place_id = pl_vc.place_id
        JOIN place pl_megapolis ON pl_megapolis.city = 'Megapolis' AND pl_megapolis.place_type = 'VOTING_CENTER'
        WHERE DATE_FORMAT(reg.voting_date, '%Y-%m') = %s
          AND reg.is_valid = TRUE
          AND SQRT(POW(pl_vc.x_coordinate - pl_megapolis.x_coordinate, 2) + 
                   POW(pl_vc.y_coordinate - pl_megapolis.y_coordinate, 2)) <= 3
          AND FIND_IN_SET(vc.acronym, %s) = 0
    """
    
    result = execute_query(db_connection, query, (target_month, exclusion_list))
    print(f"\nRegistrations within 3 miles of Megapolis in {target_month}, excluding {exclusion_list}:")
    print(result.to_string(index=False))
    return result

analyze_geographic_registrations()

### Center Popularity Analytics

In [None]:
def get_popular_centers():
    """Find most popular voting center(s) in given city for time period"""
    print("=== CENTER POPULARITY ANALYTICS ===")
    
    # Get parameters
    city_name = input("Enter city name: ")
    start_period = input("Enter start date (YYYY-MM-DD): ")
    end_period = input("Enter end date (YYYY-MM-DD): ")
    
    query = """
        SELECT 
            vc.acronym,
            pl.city,
            COUNT(reg.registration_id) AS total_registrations
        FROM voting_center vc
        JOIN place pl ON vc.place_id = pl.place_id
        JOIN registration reg ON vc.place_id = reg.center_id
        WHERE pl.city = %s
          AND reg.voting_date BETWEEN %s AND %s
          AND reg.is_valid = TRUE
        GROUP BY vc.place_id, vc.acronym, pl.city
        HAVING COUNT(reg.registration_id) = (
            SELECT MAX(reg_count) 
            FROM (
                SELECT COUNT(r.registration_id) AS reg_count
                FROM voting_center vc2
                JOIN place pl2 ON vc2.place_id = pl2.place_id  
                JOIN registration r ON vc2.place_id = r.center_id
                WHERE pl2.city = %s
                  AND r.voting_date BETWEEN %s AND %s
                  AND r.is_valid = TRUE
                GROUP BY vc2.place_id
            ) AS subquery
        )
        ORDER BY vc.acronym
    """
    
    result = execute_query(db_connection, query, (city_name, start_period, end_period, city_name, start_period, end_period))
    print(f"\nMost popular voting centers in {city_name} between {start_period} and {end_period}:")
    print(result.to_string(index=False))
    return result

get_popular_centers()

### Universal Registration Report

In [None]:
def get_universal_registrants():
    """Find folks with valid registrations at every voting center in given state"""
    print("=== UNIVERSAL REGISTRATION REPORT ===")
    
    # Get parameter
    state_name = input("Enter state name: ")
    
    query = """
        SELECT DISTINCT
            f.personal_id,
            CONCAT(f.first_name, ' ', f.last_name) AS full_name
        FROM folk f
        WHERE NOT EXISTS (
            -- Find voting centers in the state where this folk is NOT registered
            SELECT vc.place_id
            FROM voting_center vc
            JOIN place pl ON vc.place_id = pl.place_id
            WHERE pl.state = %s
            AND NOT EXISTS (
                SELECT 1
                FROM registration reg
                WHERE reg.folk_id = f.personal_id
                  AND reg.center_id = vc.place_id
                  AND reg.is_valid = TRUE
            )
        )
        -- Ensure the folk has at least one registration in the state
        AND EXISTS (
            SELECT 1
            FROM registration reg
            JOIN voting_center vc ON reg.center_id = vc.place_id
            JOIN place pl ON vc.place_id = pl.place_id
            WHERE reg.folk_id = f.personal_id
              AND pl.state = %s
              AND reg.is_valid = TRUE
        )
        ORDER BY f.last_name, f.first_name
    """
    
    result = execute_query(db_connection, query, (state_name, state_name))
    print(f"\nFolks registered at every voting center in {state_name}:")
    print(result.to_string(index=False))
    return result

get_universal_registrants()

### Non-Optimal Registration Analysis

In [None]:
def analyze_distant_registrations():
    """Find folks registered at centers farther than closest center to their residence"""
    print("=== NON-OPTIMAL REGISTRATION ANALYSIS ===")
    
    query = """
        SELECT DISTINCT
            f.personal_id,
            CONCAT(f.first_name, ' ', f.last_name) AS full_name,
            reg_vc.acronym AS registered_center,
            ROUND(SQRT(POW(reg_pl.x_coordinate - res_pl.x_coordinate, 2) + 
                       POW(reg_pl.y_coordinate - res_pl.y_coordinate, 2)), 3) AS registered_distance
        FROM folk f
        JOIN residence res ON f.residence_id = res.place_id
        JOIN place res_pl ON res.place_id = res_pl.place_id
        JOIN registration reg ON f.personal_id = reg.folk_id
        JOIN voting_center reg_vc ON reg.center_id = reg_vc.place_id  
        JOIN place reg_pl ON reg_vc.place_id = reg_pl.place_id
        WHERE reg.is_valid = TRUE
          AND SQRT(POW(reg_pl.x_coordinate - res_pl.x_coordinate, 2) + 
                   POW(reg_pl.y_coordinate - res_pl.y_coordinate, 2)) > (
                SELECT MIN(SQRT(POW(pl2.x_coordinate - res_pl.x_coordinate, 2) + 
                               POW(pl2.y_coordinate - res_pl.y_coordinate, 2)))
                FROM voting_center vc2
                JOIN place pl2 ON vc2.place_id = pl2.place_id
                JOIN operating_period op2 ON vc2.place_id = op2.center_id
                WHERE reg.voting_date >= DATE(op2.period_start)
                  AND reg.voting_date <= DATE(op2.period_end)
            )
        ORDER BY f.last_name, f.first_name, reg_vc.acronym
    """
    
    result = execute_query(db_connection, query)
    print(f"Found {len(result)} folks registered at centers farther than their closest center")
    print(result.to_string(index=False))
    return result

analyze_distant_registrations()

### Geographic Center Assignment

In [None]:
def test_center_assignment_function():
    """Test SQL function to get closest voting center"""
    print("=== GEOGRAPHIC CENTER ASSIGNMENT ===")
    
    # Get parameters
    folk_id = input("Enter folk ID: ")
    target_date = input("Enter target date (YYYY-MM-DD): ")
    
    query = """
        SELECT 
            %s AS folk_id,
            %s AS target_date,
            get_closest_voting_center(%s, %s) AS closest_center_acronym
    """
    
    result = execute_query(db_connection, query, (folk_id, target_date, folk_id, target_date))
    print(f"\nClosest center result:")
    print(result.to_string(index=False))
    
    # Test with multiple folks
    print("\nTesting with all folks for date 2025-02-15:")
    multi_query = """
        SELECT 
            f.personal_id,
            CONCAT(f.first_name, ' ', f.last_name) AS full_name,
            '2025-02-15' AS test_date,
            get_closest_voting_center(f.personal_id, '2025-02-15') AS closest_center
        FROM folk f
        ORDER BY f.last_name, f.first_name
    """
    
    multi_result = execute_query(db_connection, multi_query)
    print(multi_result.to_string(index=False))
    return result, multi_result

test_center_assignment_function()

### Voting Pattern Analysis

In [None]:
def analyze_voting_patterns():
    """Cross-tabulation of voting centers to poll answers for given poll"""
    print("=== VOTING PATTERN ANALYSIS ===")
    
    # Show available polls with ballots
    polls_query = """
        SELECT p.short_name, p.question_text, COUNT(b.folk_id) AS ballot_count
        FROM poll p
        LEFT JOIN ballot b ON p.short_name = b.poll_id
        GROUP BY p.short_name, p.question_text
        ORDER BY p.short_name
    """
    
    polls = execute_query(db_connection, polls_query)
    print("Available polls:")
    print(polls.to_string(index=False))
    
    poll_id = input("\nEnter poll ID for cross-tabulation: ").upper()
    
    query = """
        SELECT 
            vc.acronym AS voting_center,
            SUM(CASE WHEN b.vote_choice = 'YES' THEN 1 ELSE 0 END) AS YES_votes,
            SUM(CASE WHEN b.vote_choice = 'NO' THEN 1 ELSE 0 END) AS NO_votes,
            SUM(CASE WHEN b.vote_choice = 'ABSTAIN' THEN 1 ELSE 0 END) AS ABSTAIN_votes,
            COUNT(b.folk_id) AS total_ballots
        FROM voting_center vc
        LEFT JOIN registration reg ON vc.place_id = reg.center_id AND reg.poll_id = %s
        LEFT JOIN ballot b ON reg.folk_id = b.folk_id AND reg.poll_id = b.poll_id
        GROUP BY vc.place_id, vc.acronym
        ORDER BY vc.acronym
    """
    
    result = execute_query(db_connection, query, (poll_id,))
    print(f"\nCross-tabulation for poll {poll_id}:")
    print(result.to_string(index=False))
    
    # Calculate totals
    if not result.empty:
        totals = result[['YES_votes', 'NO_votes', 'ABSTAIN_votes', 'total_ballots']].sum()
        print("\nTotals:")
        for col, val in totals.items():
            print(f"{col}: {val}")
    
    return result

analyze_voting_patterns()

---
## Database Summary and Statistics

In [None]:
def database_summary():
    """Display comprehensive database summary"""
    print("=== DATABASE SUMMARY ===")
    
    # Table counts
    tables = ['folk', 'staff', 'place', 'voting_center', 'residence', 
              'poll', 'registration', 'ballot', 'email', 'staff_schedule', 'operating_period']
    
    print("\nTable Record Counts:")
    for table in tables:
        count_query = f"SELECT COUNT(*) AS count FROM {table}"
        result = execute_query(db_connection, count_query)
        print(f"{table}: {result.iloc[0]['count']}")
    
    # Geographic distribution
    geo_query = """
        SELECT state, city, COUNT(*) AS place_count,
               GROUP_CONCAT(DISTINCT place_type) AS types
        FROM place
        GROUP BY state, city
        ORDER BY state, city
    """
    
    geo_result = execute_query(db_connection, geo_query)
    print("\nGeographic Distribution:")
    print(geo_result.to_string(index=False))
    
    # Voting statistics
    stats_query = """
        SELECT 
            COUNT(DISTINCT r.folk_id) AS unique_registered_voters,
            COUNT(r.registration_id) AS total_registrations,
            COUNT(DISTINCT b.folk_id) AS unique_voters_who_cast_ballots,
            COUNT(b.folk_id) AS total_ballots_cast,
            ROUND(COUNT(b.folk_id) / COUNT(r.registration_id) * 100, 2) AS turnout_percentage
        FROM registration r
        LEFT JOIN ballot b ON r.folk_id = b.folk_id AND r.poll_id = b.poll_id
        WHERE r.is_valid = TRUE
    """
    
    stats_result = execute_query(db_connection, stats_query)
    print("\nVoting Statistics:")
    print(stats_result.to_string(index=False))

database_summary()

---
## Cleanup and Connection Management

In [None]:
def close_connection():
    """Safely close database connection"""
    if db_connection and db_connection.is_connected():
        db_connection.close()
        print("Database connection closed successfully")
    else:
        print("No active database connection to close")

# Uncomment the line below to close the connection
# close_connection()