In [None]:
import pandas as pd
import os
import re
import warnings
import sqlite3
import numpy as np
import gradio as gr

class create_TW_presidentional_election_2024_DB:
  def __init__(self, folder_path ='data'):
    """
      Extracts county names from file names in a specified folder.

      Args:
          folder_path (str): The path to the folder. Defaults to 'data'.

      Returns:
          list: A list of extracted county names.
      """
    try: 
      file_names_lst = os.listdir(folder_path)
    # The regular expression pattern
    # This pattern looks for:
    # \(  - an opening parenthesis (needs to be escaped with \)
    # (.+) - a capturing group that matches any character (.) one or more times (+)
    # \)  - a closing parenthesis
    # when we use the full pattern \((.+)\), we are telling the computer: "Find an opening parenthesis \(, then capture whatever follows (.+), 
    # and stop when you find a closing parenthesis \). The captured part is what you need."
      pattern = r"\((.+)\)"
      county_names_lst = []
      for file_name in file_names_lst:
        match = re.search(pattern, file_name)
        if match:
          extracted_name = match.group(1)
        else:
          print(f"City name not found in the {file_name}.")
        county_names_lst.append(extracted_name)
        
      self.county_names = county_names_lst
      print(f">> county list saved to '_self_.county_names_lst'\n")
    
    except FileNotFoundError:
      print(f"The folder '{folder_path}' does not exist.\n")
      self.county_names = []

  def tidy_county_df(self, county_name:str):
    """
    Extracts and tidies election vote data for a specified county.

    The workbook is looked up inside ``self.folder_path`` when that attribute
    exists and inside 'data' otherwise.

    Args:
        county_name (str): The name of the county for which to process data.
            It is inserted between the parentheses of the expected file name.

    Returns:
        pd.DataFrame: Long-format frame with the columns 'town', 'village',
        'polling_place', 'candidates_names', 'votes' and 'county'.
    """

    # --- Import and tidy up each county's votes data
    # Build the path with os.path.join: a backslash inside a normal string
    # literal is an invalid escape sequence and only works as a separator on
    # Windows.
    file_name = f'總統-A05-4-候選人得票數一覽表-各投開票所({county_name}).xlsx'
    file_path = os.path.join(getattr(self, 'folder_path', 'data'), file_name)
    # Sheet rows 0, 3 and 4 are skipped on import.
    city_df = pd.read_excel(file_path, skiprows=[0, 3, 4])

    # Keep the first six columns; the first remaining row carries the
    # candidate labels in columns 3-5, which become the column headers.
    city_df = city_df.iloc[:, :6]
    candidates_names = city_df.iloc[0, 3:6].values.tolist()
    city_df.columns = ['town', 'village', 'polling_place'] + candidates_names # rename the headers

    # Forward-fill the town name onto the rows below it, then trim whitespace.
    city_df['town'] = city_df['town'].ffill()
    city_df['town'] = city_df['town'].str.strip()

    # Drop every row with a missing value (per the original author: the
    # candidate-label row and the per-town subtotal rows).
    city_df = city_df.dropna()
    city_df['polling_place'] = city_df['polling_place'].astype(int)

    # --- Melt the DataFrame into a long data form ---
    id_vars = ['town', 'village', 'polling_place']
    melt_df = pd.melt(city_df, id_vars=id_vars, var_name='candidates_names', value_name='votes')
    melt_df['county'] = county_name
    return melt_df
  
  def aggregate_county_dataframe(self):    
    # Aggregate all the counties into one Dataframe 
    agg_county_df = pd.DataFrame()
    for county_name in self.county_names:
      county_df = self.tidy_county_df(county_name)
      agg_county_df = pd.concat([agg_county_df, county_df])
    agg_county_df = agg_county_df.reset_index(drop=True) # drop the old indices
    print(">> Finished aggregating the county data and put it into the 'agg_county_df'.")
    self.agg_county_df = agg_county_df
    print(f">> aggregated dataframe saved to '_self_.agg_county_df'\n")
    return agg_county_df
  
  def extract_columns_to_df (self, data, cols:list):
    """
    Split the dfs into several databases based on the columns needed:.

    Args:
        data (pd.DataFrame): The aggregated data from all the polling stations.
        cols (list) : A list of the column names to extract from the aggregated dataframe.

    Returns:
        pd.DataFrame: A cleaned DataFrame in a long format, ready for analysis.
    """
    new_df =  data.groupby(cols).count().reset_index()
    new_df = new_df[cols].reset_index()
    new_df['id'] = new_df.index + 1
    print(f"  Successifully extracted {cols} to a new DataFrame.\n") 
    return new_df
  
# ---- The main function of this class -----
  def create_national_database(self, path='taiwan_presidential_election_2024.db'):
    # Separate the candidate numbers from the names in column = candidates_names
    numbers, candidates = [], []
    agg_county_df = self.aggregate_county_dataframe()
    col_to_split = agg_county_df['candidates_names']
    for row in col_to_split:
      number_pattern = r"\((.+)\)" 
      number_match = re.search(number_pattern, row)
      number = number_match.group(1)
      numbers.append(number)
      name_pattern = row.split('\n')
      candidate = name_pattern[1] + '/' + name_pattern[2]
      candidates.append(candidate)
    print(">> Finished tidy up the candidate information, including names and ids.")

    # Drop the candidate_names and join the newly split numbers and names
    updated_national_df = agg_county_df.copy()
    updated_national_df['candidate_number'] = numbers
    updated_national_df['candidate_name'] = candidates
    updated_national_df = updated_national_df.drop(columns=["candidates_names"])
    updated_national_df['votes'] = agg_county_df['votes'].values
    print(">> Updated the candidates' information to the aggregated dataframe.")
    
    # Extract different pooling locations
    print(">> Extracting Dataframes...\n")
    groupby_polling_places = self.extract_columns_to_df(updated_national_df,['town', 'village', 'polling_place', 'county'])
    # Extract the candidates information
    groupby_candidates = self.extract_columns_to_df(updated_national_df,['candidate_number','candidate_name'])
    
    # Join the two dfs
    keys = ['county', 'town', 'village', 'polling_place']
    national_votes_by_polling_places = pd.merge(
                                                updated_national_df, 
                                                groupby_polling_places,
                                                left_on=keys,
                                                right_on=keys,
                                                how = 'left') # updated_national_df on the left
    national_votes_by_polling_places = national_votes_by_polling_places[['id','candidate_number','votes']]
    national_votes_by_polling_places = national_votes_by_polling_places.rename(columns={'id':'polling_place_id','candidate_number':'candidate_id'})
    print(">> Finished joining the two Dataframes: national voting data & polling places.\n")
    
    # Create a SQLite database by sending query syntax to the app
    absolute_path = os.path.abspath(path)
    
    # ADDED TIMEOUT: Gives other apps 20 seconds to finish before crashing
    connection = sqlite3.connect(absolute_path, timeout=20.0)
    connection = sqlite3.connect(absolute_path)
    self.dfs = {
            'polling_places': groupby_polling_places,
            'candidates': groupby_candidates,
            'votes': national_votes_by_polling_places
              }
    
    print(f">> Dataframes saved to '_self_.dfs'. Database at: {absolute_path}")
    
    try: # Added try block to ensure closure
      self.path = absolute_path 
      connection.execute("PRAGMA journal_mode=WAL;") # ENABLE WAL MODE: Crucial for concurrency (reading while writing)
      connection.execute("BEGIN IMMEDIATE;") # START AN IMMEDIATE TRANSACTION: Locks the write-path early to prevent "No such table" errors
      
      groupby_polling_places.to_sql("polling_places", con=connection,if_exists='replace', index=False)
      groupby_candidates.to_sql("candidates", con=connection,if_exists='replace', index=False)
      national_votes_by_polling_places.to_sql("votes", con=connection,if_exists='replace', index=False)
      connection.commit() # commit the changes to make the new tables permanent
      print(">> Successfully imported tables: polling_places, candidates, and votes.\n")
      
      ### SQL syntax 
      cur = connection.cursor()
      cur.execute("BEGIN IMMEDIATE;")
      cur.execute("DROP VIEW IF EXISTS votes_by_village;")
      create_view_sql = """
      CREATE VIEW votes_by_village AS 
      SELECT 
        polling_places.county,
        polling_places.town, 
        polling_places.village,
        candidates.candidate_number AS number,
        SUM(votes."votes") AS sum_votes
        
      FROM votes
      LEFT JOIN polling_places
        ON votes.polling_place_id = polling_places.id
      LEFT JOIN candidates
        ON votes.candidate_id = candidates.id
        
      GROUP BY 
        polling_places.county,
        polling_places.town, 
        polling_places.village,
        candidates.candidate_number;
      """
      cur.execute(create_view_sql)
      connection.commit()
      print(">> Successfully created view: votes_by_village.\n")
      
      # Verification: Check if the view actually returns rows
      cur.execute("SELECT * FROM votes_by_village LIMIT 5;")
      sample_rows = cur.fetchall()

      if sample_rows:
          print(f">> View Verified! Sample data: {sample_rows[0]}")
      else:
          print("!! View created, but it appears to be empty. Check join keys.")
      
    except Exception as e: # Catch errors to see exactly why it failed
      # If anything fails, try to roll back to keep the DB stable
      try:
          connection.rollback()
      except:
          pass
      print(f"!! An error occurred: {e}")
      raise
  
    finally: #This ensures the connection closes even if the code crashes
      connection.close()
      print(">> Database connection closed.")
  # Filtering function to select a certain county, town, and village.

  def create_cosine_df(self):
    # Load the previously created view from sql
    query = """
    SELECT * 
    FROM votes_by_village; 
    """
    connection = sqlite3.connect(self.path)
    votes_by_village = pd.read_sql(query,con=connection)
    connection.close()

    # Calculate the total polling percentage 
    total_votes = votes_by_village['sum_votes'].sum()
    country_percentage = votes_by_village.groupby('number')['sum_votes'].sum()/total_votes
    vector_a = country_percentage.values # translate the series to a one-dimensional ndarray

    # Calculate the poling percentage by different municiapal combinations that include: county, town, and village
    grouping_vars = ['county', 'town','village']
    village_total_votes= votes_by_village.groupby(grouping_vars)['sum_votes'].sum()
    add_village_total_votes = pd.merge(
      votes_by_village, 
      village_total_votes, 
      left_on=grouping_vars,
      right_on=grouping_vars,
      how='left')

    votes_by_candidates = 'sum_votes_x'
    total_votes_per_village = 'sum_votes_y'
    add_village_total_votes['village_percentage'] = add_village_total_votes[votes_by_candidates]/add_village_total_votes[total_votes_per_village]


    # Transpose the Dataframe from long format to wide format so that every municipal combination will has a column for each candidate's votes in total polling precentage
    pivot_df = add_village_total_votes.pivot(
                                            index=grouping_vars,
                                            columns='number',
                                            values='village_percentage',
                                            ).reset_index()


    # Calculate the 'Cosine Similarity' for each municipal combinations
    cosine_similarities = []
    length_vector_a = pow((vector_a**2).sum(), 0.5)

    cols_dict = {
    "candidate_pair1": "1",
    "candidate_pair2": "2",
    "candidate_pair3": "3",
    }

    for row in pivot_df.iterrows():
      data_series = row[1]
      vector_bi = np.array(
                          [data_series.loc[cols_dict["candidate_pair1"]], 
                          data_series.loc[cols_dict["candidate_pair2"]],
                          data_series.loc[cols_dict["candidate_pair3"]]])
      vector_a_dot_vector_bi = np.dot(vector_a, vector_bi)
      length_vector_bi = pow((vector_bi**2).sum(), 0.5)
      cosine_similarity = vector_a_dot_vector_bi/(length_vector_a * length_vector_bi)
      cosine_similarities.append(cosine_similarity)

    # Cosine Similarity DataFrame
    cosine_similarity_df = pivot_df.copy()
    cosine_similarity_df['cosine_similarity'] = cosine_similarities
    
    cosine_similarity_df = cosine_similarity_df.sort_values(
                                                            ['cosine_similarity','county','town','village'],
                                                            ascending=[False, True, True, True]
                                                            ).reset_index(drop=True) # removed the previously set indices
    
    cosine_similarity_df.reset_index() # create the new index after value sorting
    cosine_similarity_df['similarity_rank'] = cosine_similarity_df.index + 1
    cosine_similarity_df = cosine_similarity_df.rename(columns=cols_dict)
    return vector_a, cosine_similarity_df


  def filter_county_town_village(self, df, county_name:str, town_name:str, village_name:str):
    county_condition =  df["county"] == county_name
    town_condition =  df["town"] == town_name
    village_condition =  df["village"] == village_name
    return  df[county_condition & town_condition & village_condition]
  
  def launch_radio_filter_UI(self):
    """
    Start a Gradio app for looking up a single county/town/village.

    The table from ``self.create_cosine_df`` is offered as the data input,
    three text boxes supply the names passed to
    ``self.filter_county_town_village``, and the three national vote shares
    are shown in the description with six decimals.
    """
    national_shares, similarity_table = self.create_cosine_df()
    # Exactly three shares are expected; the description labels them in this order.
    ko_wu, lai_hsiao, hou_chao = national_shares
    summary = f"輸入欲篩選的縣市、鄉鎮區與村鄰里: (柯吳配，賴蕭配，侯趙配) = ({ko_wu:.6f}, {lai_hsiao:.6f}, {hou_chao:.6f})"

    input_components = [gr.Dataframe(similarity_table), "text", "text", "text"]
    app = gr.Interface(
      fn=self.filter_county_town_village,
      inputs=input_components,
      outputs="dataframe",
      title="找出章魚里",
      description=summary,
    )
    app.launch()


In [2]:
# Run the pipeline

## Ignore the openpyxl "no default style" UserWarning; the filter is registered
## globally, so it stays active for the rest of the session.
warnings.filterwarnings(
  "ignore",
  category=UserWarning,
  message="Workbook contains no default style, apply openpyxl's default",
)

## Scan the data folder, build the SQLite database, then open the filter UI
election_db = create_TW_presidentional_election_2024_DB()
election_db.create_national_database()
election_db.launch_radio_filter_UI()

>> county list saved to '_self_.county_names_lst'

>> Finished aggregating the county data and put it into the 'agg_county_df'.
>> aggregated dataframe saved to '_self_.agg_county_df'

>> Finished tidy up the candidate information, including names and ids.
>> Updated the candidates' information to the aggregated dataframe.
>> Extracting Dataframes...

  Successifully extracted ['town', 'village', 'polling_place', 'county'] to a new DataFrame.

  Successifully extracted ['candidate_number', 'candidate_name'] to a new DataFrame.

>> Finished joining the two Dataframes: national voting data & polling places.

>> Dataframes saved to '_self_.dfs'. Database at: g:\My Drive\{Research} Projects\2025_7 data analytic projects\4_taiwan_presidential_election_2024\taiwan_presidential_election_2024.db
>> Successfully imported tables: polling_places, candidates, and votes.

>> Successfully created view: votes_by_village.

>> View Verified! Sample data: ('南投縣', '中寮鄉', '中寮村', '1', 81)
>> Database conne