In [None]:
import os
import pandas as pd
import psycopg2
from psycopg2 import sql
import dotenv
from pathlib import Path
from io import StringIO
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed

# Set the root directory assuming the script is run from within the 'functions' directory.
ROOT = Path().resolve().parent

dotenv.load_dotenv()

# Retrieve the database connection URL from the environment variables.
db_url = os.getenv('DATABASE_URI')

# Define the clean_column_names function.
def clean_column_names(col_name):
    col_name = col_name.split(':', 1)[-1]
    col_name = col_name.split(';', 1)[0]
    return col_name.strip()

# Define a function to hash column names if they exceed the length limit
def hash_column_name(col_name, max_length=63):
    if len(col_name) > max_length:
        hash_object = hashlib.sha256(col_name.encode())
        hashed_col_name = "hashed_" + hash_object.hexdigest()[:max_length-7]
        return hashed_col_name
    return col_name

def create_mappings_table(cursor):
    create_table_query = """
    CREATE TABLE IF NOT EXISTS column_name_mappings (
        original_name TEXT PRIMARY KEY,
        shortened_name TEXT NOT NULL
    );
    """
    cursor.execute(create_table_query)

def load_existing_mappings(cursor):
    cursor.execute("SELECT original_name, shortened_name FROM column_name_mappings")
    return {row[0]: row[1] for row in cursor.fetchall()}

def insert_mapping(cursor, original_name, shortened_name):
    insert_query = """
    INSERT INTO column_name_mappings (original_name, shortened_name)
    VALUES (%s, %s)
    ON CONFLICT (original_name) DO NOTHING;
    """
    cursor.execute(insert_query, (original_name, shortened_name))

def process_csv_file(csv_file, existing_mappings):
    print(f"Processing file {csv_file}")
    try:
        # Connect to the PostgreSQL database
        conn = psycopg2.connect(db_url)
        cursor = conn.cursor()
        
        # Read the file into a pandas DataFrame.
        df = pd.read_csv(csv_file)
        
        # Drop the 'date' column.
        if 'date' in df.columns:
            print(f"Dropping 'date' column")
            df = df.drop(columns=['date'])
        
        # Clean the column names using the clean_column_names function.
        df.columns = [clean_column_names(col) for col in df.columns]
        
        # Rename specific columns.
        if 'geography code' in df.columns:
            print(f"Renaming 'geography code' to 'geocode'")
            df = df.rename(columns={'geography code': 'geocode'})
        if 'geography' in df.columns:
            print(f"Renaming 'geography' to 'geoname'")
            df = df.rename(columns={'geography': 'geoname'})
        
        # Shorten column names if necessary and update the mapping
        shortened_columns = []
        for col in df.columns:
            if col in existing_mappings:
                shortened_col = existing_mappings[col]
            else:
                shortened_col = hash_column_name(col)
                if col != shortened_col:
                    insert_mapping(cursor, col, shortened_col)
                    existing_mappings[col] = shortened_col
            shortened_columns.append(shortened_col)
        
        df.columns = shortened_columns
        
        # Reorder the columns.
        cols = df.columns.tolist()
        if 'geocode' in cols and 'geoname' in cols and 'Total' in cols:
            cols = ['geocode', 'geoname', 'Total'] + [col for col in cols if col not in ['geocode', 'geoname', 'Total']]
            df = df[cols]
        
        # Print the table name and columns to be inserted.
        table_name = csv_file.stem
        print(f"Table name: {table_name}")
        print(f"Columns: {', '.join(df.columns)}")
        
        # Drop the existing table if it exists.
        cursor.execute(sql.SQL("DROP TABLE IF EXISTS {}").format(sql.Identifier(table_name)))
        
        # Create a new table with the appropriate schema based on the DataFrame.
        create_table_query = sql.SQL(
            "CREATE TABLE {} ({}, UNIQUE (geocode), UNIQUE (geoname))"
        ).format(
            sql.Identifier(table_name),
            sql.SQL(', ').join(
                sql.SQL("{} {}").format(
                    sql.Identifier(col),
                    sql.SQL("TEXT") if col in ['geocode', 'geoname'] else sql.SQL("INTEGER")
                ) for col in df.columns
            )
        )
        cursor.execute(create_table_query)
        
        # Convert all columns except 'geocode' and 'geoname' to integers
        for col in df.columns:
            if col not in ['geocode', 'geoname']:
                df[col] = df[col].astype(int)
        
        # Use StringIO to create an in-memory CSV
        csv_buffer = StringIO()
        df.to_csv(csv_buffer, index=False)
        csv_buffer.seek(0)
        
        # Use the COPY command for efficient bulk loading of the CSV data into the table.
        cursor.copy_expert(
            sql.SQL("COPY {} FROM STDIN WITH CSV HEADER").format(sql.Identifier(table_name)),
            csv_buffer
        )
        
        # Commit the transaction.
        conn.commit()
        print(f"Table {table_name} created and data inserted successfully.")
        
        # Fetch and print the updated column names from the database
        cursor.execute(sql.SQL("SELECT column_name FROM information_schema.columns WHERE table_name = {}").format(sql.Literal(table_name)))
        columns = cursor.fetchall()
        print(f"Updated columns in table {table_name}: {', '.join(col[0] for col in columns)}")
        
    except Exception as e:
        print(f"Error processing file {csv_file}: {e}")
        if conn:
            conn.rollback()
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()

# Set the path to the directory containing subdirectories with CSV files.
csv_dir = ROOT / 'data' / 'census' / 'EW'

# Collect all CSV files from all subdirectories
csv_files = [csv_file for sub_dir in csv_dir.iterdir() if sub_dir.is_dir() for csv_file in sub_dir.glob('*.csv')]

# Specify the number of worker threads
max_workers = 10

# Use ThreadPoolExecutor to process CSV files in parallel
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    # Connect to the database once to create the mappings table and load existing mappings
    conn = psycopg2.connect(db_url)
    cursor = conn.cursor()
    create_mappings_table(cursor)
    existing_mappings = load_existing_mappings(cursor)
    conn.commit()
    cursor.close()
    conn.close()

    futures = [executor.submit(process_csv_file, csv_file, existing_mappings) for csv_file in csv_files]
    for future in as_completed(futures):
        try:
            future.result()  # This will raise any exceptions caught during the execution
        except Exception as e:
            print(f"Error processing file: {e}")

print("All files processed.")

In [None]:
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.exc import IntegrityError
import dotenv
from pathlib import Path
import os

# Set the root directory assuming the script is run from within the 'functions' directory.
ROOT = Path().resolve().parent

dotenv.load_dotenv()

# Retrieve the database connection URL from the environment variables.
db_url = os.getenv('DATABASE_URI')

# Create a SQLAlchemy engine
engine = create_engine(db_url)

# Load the CSV file into a DataFrame
df = pd.read_csv(ROOT / 'data' / 'census' / 'EW' / 'ew_all_geographies.csv')

# Drop the empty columns
columns_to_drop = ['ltla22nmw', 'utla22nmw', 'rgn22nmw']
df.drop(columns=columns_to_drop, inplace=True, errors='ignore')

# Define a function to determine the partition based on region code
def get_partition(rgn22cd):
    partitions = {
        'E12000001': 'ew_geographies_region1',
        'E12000002': 'ew_geographies_region2',
        'E12000003': 'ew_geographies_region3',
        'E12000004': 'ew_geographies_region4',
        'E12000005': 'ew_geographies_region5',
        'E12000006': 'ew_geographies_region6',
        'E12000007': 'ew_geographies_region7',
        'E12000008': 'ew_geographies_region8',
        'E12000009': 'ew_geographies_region9',
        'W99999999': 'ew_geographies_region10'
    }
    return partitions.get(rgn22cd, None)

# Group the DataFrame by partition
df['partition'] = df['rgn22cd'].apply(get_partition)
partitions = df.groupby('partition')

# Insert data into the appropriate partition in bulk
for partition, data in partitions:
    if partition:
        data.drop(columns=['partition'], inplace=True)
        try:
            data.to_sql(partition, engine, if_exists='append', index=False, method='multi')
        except IntegrityError as e:
            print(f"IntegrityError for partition {partition}: {e}")
            # Optionally, you can handle the error by logging or taking other actions

print("Data loaded successfully.")

In [None]:
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.exc import IntegrityError
import dotenv
from pathlib import Path
import os

# Set the root directory assuming the script is run from within the 'functions' directory.
ROOT = Path().resolve().parent

dotenv.load_dotenv()

# Retrieve the database connection URL from the environment variables.
db_url = os.getenv('DATABASE_URI')

# Create a SQLAlchemy engine
engine = create_engine(db_url)

# Load the CSV file into a DataFrame
df = pd.read_csv(ROOT / 'data' / 'elections' / 'ew_political_boundaries.csv')

# Drop the 'ObjectId' column
df.drop(columns=['ObjectId'], inplace=True, errors='ignore')

# Lowercase all column names
df.columns = df.columns.str.lower()

df.drop_duplicates(subset=['wd22cd'], inplace=True)

# Define the metadata
metadata = MetaData()

# Define the ew_political_boundaries table
ew_political_boundaries = Table(
    'ew_political_boundaries', metadata,
    sqlalchemy.Column('wd22cd', sqlalchemy.String, primary_key=True),
    sqlalchemy.Column('wd22nm', sqlalchemy.String),
    sqlalchemy.Column('pcon22cd', sqlalchemy.String),
    sqlalchemy.Column('pcon22nm', sqlalchemy.String),
    sqlalchemy.Column('lad22cd', sqlalchemy.String),
    sqlalchemy.Column('lad22nm', sqlalchemy.String),
    sqlalchemy.Column('utla22cd', sqlalchemy.String),
    sqlalchemy.Column('utla22nm', sqlalchemy.String)
)

# Create the ew_political_boundaries table in the database
metadata.create_all(engine)

# Insert data into the ew_political_boundaries table
try:
    df.to_sql('ew_political_boundaries', engine, if_exists='append', index=False, method='multi')
except IntegrityError as e:
    print(f"IntegrityError: {e}")
    # Optionally, you can handle the error by logging or taking other actions

print("Data loaded successfully.")

In [6]:
import pandas as pd
from sqlalchemy import create_engine, Table, MetaData, Column, String, Integer, Float
from sqlalchemy.dialects.postgresql import insert
import logging
import dotenv
from pathlib import Path
import os

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Load environment variables
dotenv.load_dotenv()

# Retrieve the database connection URL from the environment variables.
db_url = os.getenv('DATABASE_URI')
engine = create_engine(db_url)

# Function to convert to proper noun casing
def proper_noun_casing(name):
    if isinstance(name, str):
        words = name.split()
        return ' '.join([word.capitalize() if word.lower() != 'and' else word for word in words])
    return name

# Function to make vote_share columns unique by appending the previous column name
def make_vote_share_columns_unique(df):
    new_columns = []
    for i, col in enumerate(df.columns):
        if 'vote share' in col.lower():
            if i > 0:
                new_col = f"vote_share_{df.columns[i-1].lower().replace(' ', '_')}"
            else:
                new_col = f"vote_share_{i}"
            new_columns.append(new_col)
        else:
            new_columns.append(col.lower().replace(' ', '_'))
    df.columns = new_columns

# Function to remove unnamed columns
def remove_unnamed_columns(df):
    unnamed_columns = [col for col in df.columns if col.lower().startswith('unnamed') or col.strip() == '']
    if unnamed_columns:
        df.drop(columns=unnamed_columns, inplace=True)

ROOT = Path().resolve().parent
excel_file = ROOT / 'data' / 'elections' / 'election-results.xlsx'

# Read all sheet names
sheet_names = pd.ExcelFile(excel_file).sheet_names

all_data = []

for sheet in sheet_names:
    if sheet == 'University Seats':
        continue
    # Read the sheet into a DataFrame
    df = pd.read_excel(excel_file, sheet_name=sheet, header=None)
    
    # Remove empty columns & rows
    df = df.dropna(axis=1, how='all')
    df = df.dropna(axis=0, how='all')
    df = df.dropna(thresh=3, axis=1)
    df = df.dropna(thresh=3, axis=0)
    
    # Merge the first and second rows to create column headers
    new_headers = df.iloc[0].fillna('') + ' ' + df.iloc[1].fillna('')
    df.columns = new_headers.str.strip()  # Remove leading and trailing whitespace
    df = df[2:]
    
    # Drop the 'id' column if it exists
    df.drop(columns=['id'], inplace=True, errors='ignore')
    
    # Ensure all column names are strings to avoid AttributeError
    df.columns = df.columns.map(str)
    
    # Rename columns
    df = df.rename(columns={
        'ONS id': 'pconyycd',
        'Constituency': 'pconyynm',
        'Country/Region': 'rgn',
        'Country': 'ctry'
    })
    
    # Apply proper noun casing to 'pconyynm' column
    if 'pconyynm' in df.columns:
        df['pconyynm'] = df['pconyynm'].apply(lambda x: proper_noun_casing(str(x)))
    
    # Make vote_share columns unique
    make_vote_share_columns_unique(df)
    
    # Remove unnamed columns
    remove_unnamed_columns(df)
    
    # Add a 'year' column
    df['year'] = sheet
    
    # Create a unique 'pconyynm_year' column
    if 'pconyynm' in df.columns:
        df['pconyynm_year'] = df['pconyynm'] + '_' + df['year'].astype(str)
    
    # Append the DataFrame to the list
    all_data.append(df)

# Concatenate all DataFrames into a single DataFrame
combined_df = pd.concat(all_data, ignore_index=True)

# Ensure all column names are in lowercase
combined_df.columns = combined_df.columns.str.lower()

# Reorder the columns
ordered_columns = ['pconyynm_year', 'pconyynm', 'year', 'pconyycd'] + [col for col in combined_df.columns if col not in ['pconyynm_year', 'pconyynm', 'year', 'pconyycd']]
combined_df = combined_df[ordered_columns]

# Remove any columns with blank names
combined_df = combined_df.loc[:, combined_df.columns.str.strip() != '']

# Check for duplicates in 'pconyynm_year' and append 'rgn' to make them unique
duplicates = combined_df[combined_df.duplicated('pconyynm_year', keep=False)]
if not duplicates.empty:
    combined_df.loc[combined_df.duplicated('pconyynm_year', keep=False), 'pconyynm_year'] = (
        combined_df['pconyynm_year'] + '_' + combined_df['rgn']
    )

# Save the combined DataFrame to a single CSV file
combined_csv_file = ROOT / 'data' / 'elections' / 'election-results-combined.csv'
combined_df.to_csv(combined_csv_file, index=False)

# Define the metadata
metadata = MetaData()

# Define the election_results table
election_results = Table(
    'election_results', metadata,
    Column('pconyynm_year', String, primary_key=True),
    Column('pconyynm', String),
    Column('year', String),
    Column('pconyycd', String),
    # Add other columns as needed
    *(Column(col, String) for col in combined_df.columns if col not in ['pconyynm_year', 'pconyynm', 'year', 'pconyycd'])
)

# Create the table in the remote database
metadata.create_all(engine)

# Use COPY to load data into the PostgreSQL database
with engine.connect() as connection:
    with open(combined_csv_file, 'r') as f:
        # Skip the header row for COPY command
        next(f)
        # COPY command to load the data from the CSV file
        copy_sql = f"""
        COPY election_results FROM stdin WITH CSV HEADER
        DELIMITER as ','
        """
        try:
            connection.connection.cursor().copy_expert(sql=copy_sql, file=f)
            connection.connection.commit()
            logging.info("Data loaded successfully.")
        except Exception as e:
            logging.error(f"Error loading data: {e}")
            connection.connection.rollback()

# Verify the data has been added
query = "SELECT * FROM election_results LIMIT 10;"
with engine.connect() as connection:
    df = pd.read_sql(query, connection)
    print(df.head())

2024-06-14 18:07:43,570 - INFO - Data loaded successfully.


                     pconyynm_year                    pconyynm  year pconyycd  \
0             Battersea South_1918             Battersea South  1918     None   
1      Bermondsey Rotherhithe_1918      Bermondsey Rotherhithe  1918     None   
2  Bermondsey West Bermondsey_1918  Bermondsey West Bermondsey  1918     None   
3    Bethnal Green North East_1918    Bethnal Green North East  1918     None   
4    Bethnal Green South West_1918    Bethnal Green South West  1918     None   

  seats  county     rgn electorate conservative_party_votes  \
0     1  London  London      43036                    15670   
1     1  London  London      25008                     5639   
2     1  London  London      23100                     None   
3     1  London  London      25253                     None   
4     1  London  London      19510                     4240   

  vote_share_conservative_party_votes  ... vote_share_uu_votes  \
0                  0.6818082930861942  ...                None   
1 

In [31]:
#  Not implemented - university seats are currently excluded
import pandas as pd
from pathlib import Path

def format_university_seats_csv(file_path):
    # Read the CSV file
    df = pd.read_csv(file_path, header=None)
    
    # Remove empty rows and columns
    df = df.dropna(axis=0, how='all').dropna(axis=1, how='all')
    
    # Save the formatted DataFrame back to CSV
    df.to_csv(file_path, index=False)

def main():
    ROOT = Path().resolve().parent
    csv_file = ROOT / 'data' / 'elections' / 'election-results-University Seats.csv'
    
    format_university_seats_csv(csv_file)
    print(f"Formatted {csv_file}")

if __name__ == "__main__":
    main()

Formatted /Users/cardigan/llm-datawarehouse/data/elections/election-results-University Seats.csv


In [11]:
import json
import psycopg2
from psycopg2 import sql
import dotenv
from pathlib import Path

# Load environment variables
dotenv.load_dotenv()

# Retrieve the database connection URL from the environment variables.
db_url = os.getenv('DATABASE_URI')

# Path to the JSON file
json_file_path = Path('../api/table_titles.json')

# Load JSON data
with open(json_file_path, 'r') as file:
    data = json.load(file)

# Function to create the table_titles table
def create_table_titles_table(cursor):
    create_table_query = """
    CREATE TABLE IF NOT EXISTS table_titles (
        country TEXT NOT NULL,
        code TEXT NOT NULL,
        name TEXT NOT NULL,
        PRIMARY KEY (country, code)
    );
    """
    cursor.execute(create_table_query)

# Function to insert data into the table_titles table
def insert_table_titles_data(cursor, country, name, code):
    insert_query = """
    INSERT INTO table_titles (country, code, name)
    VALUES (%s, %s, %s)
    ON CONFLICT (country, code) DO NOTHING;
    """
    cursor.execute(insert_query, (country, code.lower(), name))

# Main function to process the JSON data and insert it into the table
def process_json_data(data):
    try:
        # Connect to the PostgreSQL database
        conn = psycopg2.connect(db_url)
        cursor = conn.cursor()
        
        # Create the table_titles table if it doesn't exist
        create_table_titles_table(cursor)
        
        # Insert data into the table_titles table
        for country, titles in data.items():
            for name, code in titles.items():
                insert_table_titles_data(cursor, country, code, name)
        
        # Commit the transaction
        conn.commit()
        print("Data inserted successfully into table_titles.")
        
    except Exception as e:
        print(f"Error processing JSON data: {e}")
        if conn:
            conn.rollback()
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()

# Process the JSON data
process_json_data(data)

Data inserted successfully into table_titles.


In [4]:
import psycopg2
from psycopg2 import sql
import bcrypt
import dotenv
import os

dotenv.load_dotenv()

# Retrieve the database connection URL from the environment variables.
db_url = os.getenv('DATABASE_URI')
# Connect to the database
conn = psycopg2.connect(db_url, sslmode='require')
cur = conn.cursor()

# Create the 'users' table with a larger password column
create_table_query = """
CREATE TABLE IF NOT EXISTS users (
    username VARCHAR(50) PRIMARY KEY,
    password VARCHAR(60) NOT NULL
);
"""
cur.execute(create_table_query)
conn.commit()

# Hash the password
password = 'password'
hashed_password = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())

# Insert entries into the 'users' table
insert_query = """
INSERT INTO users (username, password) VALUES (%s, %s)
ON CONFLICT (username) DO NOTHING;
"""
cur.execute(insert_query, ('admin', hashed_password.decode('utf-8')))
conn.commit()

# Verify the insertion
cur.execute("SELECT * FROM users;")
rows = cur.fetchall()
for row in rows:
    print(row)

# Close the cursor and connection
cur.close()
conn.close()

('admin', '$2b$12$xKWIW1h1Jx46supKSJmmV.ccYW68XrO6i9Tma8Z/nJUP7qbdDKtgC')


In [None]:
import csv

with open('data/Scotland/OA_TO_HIGHER_AREAS.csv', 'r') as file:
    reader = csv.reader(file)
    
    # Read header row to get column names
    headers = next(reader)
    
    # Initialize dictionary to store unique values for each column
    unique_values = {col: set() for col in headers}
    
    # Iterate over each row
    for row in reader:
        # Iterate over each column in the row
        for col, value in zip(headers, row):
            unique_values[col].add(value)

# Print number of unique values for each column            
for col, values in unique_values.items():
    print(f"{col}: {len(values)} unique values")

In [8]:
# Define the list of historic UK political parties
party_columns = [
    'independent_labour', 'independent_conservative', 'independent_unionist', 'labour_unionist_(ireland)', 'independent_liberal', 'independent', 'national',
'sinn_fein', 'nationalist_(ireland)', 'independent_nationalist', 'other',
'national_liberal', 'communist', 'constitutionalist', 'national_labour',
'independent_labour_party', 'pc/snp', 'national_independent', 'common_wealth_movement',
'northern_ireland_labour_party', 'national_liberal/national_liberal_and_conservative',
'nationalist(wales/scotland)', 'snp/plaid_cymru',
'unionist(pro-assembly)', 'dup_(uuuc)', 'vanguard_unionist_progressive_party_(uuuc)',
'unionist_(uuuc)', 'sdlp', 'alliance', 'oup_(uuuc)', 'oup',
'alliance_(northern_ireland)', 'workers_party', 'uup',
'sf', 'ukup', 'uu'
]

# Sort the list alphabetically
sorted_party_columns = sorted(party_columns)

# Print the sorted list
print(sorted_party_columns)


['alliance', 'alliance_(northern_ireland)', 'common_wealth_movement', 'communist', 'constitutionalist', 'dup_(uuuc)', 'independent', 'independent_conservative', 'independent_labour', 'independent_labour_party', 'independent_liberal', 'independent_nationalist', 'independent_unionist', 'labour_unionist_(ireland)', 'national', 'national_independent', 'national_labour', 'national_liberal', 'national_liberal/national_liberal_and_conservative', 'nationalist(wales/scotland)', 'nationalist_(ireland)', 'northern_ireland_labour_party', 'other', 'oup', 'oup_(uuuc)', 'pc/snp', 'sdlp', 'sf', 'sinn_fein', 'snp/plaid_cymru', 'ukup', 'unionist(pro-assembly)', 'unionist_(uuuc)', 'uu', 'uup', 'vanguard_unionist_progressive_party_(uuuc)', 'workers_party']
