In [1]:
import pandas as pd

# Function to load and process the dictionaries
def load_dictionaries(supersector_dict_path, industry_dict_path, datatype_dict_path, period_dict_path):
    """Load the four tab-separated lookup files into ``{code: name}`` dicts.

    Each file is parsed as tab-separated text with no header row
    (``header=None``), so a header line, if the file has one, is kept as an
    ordinary entry of the resulting dictionary. Keys and values are stripped
    of surrounding whitespace.

    Column positions used (key column, value column):
        supersector: (0, 1), industry: (0, 3), datatype: (0, 1), period: (0, 2)

    Args:
        supersector_dict_path: Path to the supersector lookup file.
        industry_dict_path: Path to the industry lookup file.
        datatype_dict_path: Path to the data type lookup file.
        period_dict_path: Path to the period lookup file.

    Returns:
        A tuple ``(supersector_dict, industry_dict, datatype_dict, period_dict)``.
    """

    def load_clean_dict(file_path, key_col, value_col):
        """Build a dict from two positional columns of a tab-separated file."""
        # dtype=str keeps codes such as "05" as text. Without it pandas infers
        # integers for a purely numeric column, which drops leading zeros and
        # makes the .str accessor below fail.
        df = pd.read_csv(
            file_path,
            sep='\t',
            header=None,
            usecols=[key_col, value_col],
            dtype=str,
        )
        # read_csv returns the columns in file order regardless of the order
        # given in usecols; select them explicitly so key/value cannot swap.
        df = df[[key_col, value_col]]
        df.columns = ['key', 'value']
        df['key'] = df['key'].str.strip()
        df['value'] = df['value'].str.strip()
        return df.set_index('key')['value'].to_dict()

    supersector_dict = load_clean_dict(supersector_dict_path, 0, 1)
    industry_dict = load_clean_dict(industry_dict_path, 0, 3)
    datatype_dict = load_clean_dict(datatype_dict_path, 0, 1)
    period_dict = load_clean_dict(period_dict_path, 0, 2)

    return supersector_dict, industry_dict, datatype_dict, period_dict


# Function to process the series file
def process_series_file(series_file, supersector_dict, industry_dict, datatype_dict):
    """Read the series file and attach readable names for its code columns.

    The file is parsed as tab-separated text with a header row. Column names
    and the ``series_id`` values are stripped of whitespace. The code columns
    are left-padded with zeros (supersector and data type to 2 characters,
    industry to 8) before being looked up in the supplied dictionaries.
    Cells equal to ``'-'`` are replaced with ``pd.NA``.

    Args:
        series_file: Path to the tab-separated series file.
        supersector_dict: Mapping of supersector code to name.
        industry_dict: Mapping of industry code to name.
        datatype_dict: Mapping of data type code to name.

    Returns:
        DataFrame with columns ``series_id``, ``supersector_code``,
        ``supersector``, ``industry_code``, ``industry``, ``data_type_code``
        and ``datatype``. Codes missing from a dictionary map to NaN.

    Raises:
        KeyError: If an expected column is missing (a message is printed
            before the error is re-raised).
    """
    try:
        # Read every column as text. With type inference a single blank cell
        # turns a code column into floats ("5.0"), which zfill cannot repair
        # and which then never matches a dictionary key.
        series_data = pd.read_csv(series_file, sep='\t', skipinitialspace=True, dtype=str)

        # Header cells may carry padding around the column names
        series_data.columns = series_data.columns.str.strip()

        series_data['series_id'] = series_data['series_id'].astype(str).str.strip()

        # Normalise each code column to its fixed, zero-padded width
        code_widths = (
            ('supersector_code', 2),
            ('industry_code', 8),
            ('data_type_code', 2),
        )
        for column, width in code_widths:
            series_data[column] = series_data[column].astype(str).str.strip().str.zfill(width)

        # Replace placeholder '-' with NA
        series_data.replace('-', pd.NA, inplace=True)

        # Translate codes into their names
        series_data['supersector'] = series_data['supersector_code'].map(supersector_dict)
        series_data['industry'] = series_data['industry_code'].map(industry_dict)
        series_data['datatype'] = series_data['data_type_code'].map(datatype_dict)

        # Keep only the columns needed downstream
        return series_data[['series_id',
                            'supersector_code', 'supersector',
                            'industry_code', 'industry',
                            'data_type_code', 'datatype']]

    except KeyError as e:
        print(f"KeyError encountered: {e}. Check column names or data.")
        raise  # Re-raise the error after logging the message


# Function to process the allearnAE data file
def process_AllEarnAE_file(AllEarnAE_file, series_data, period_dict, start_year):
    """Load the AllEarnAE data file and join it with the series metadata.

    Args:
        AllEarnAE_file: Path to the tab-separated data file.
        series_data: DataFrame with at least ``series_id``, ``supersector``,
            ``industry`` and ``datatype`` columns.
        period_dict: Mapping of period code to period name.
        start_year: Rows with ``year`` below this value are dropped.

    Returns:
        DataFrame with columns ``series_id``, ``year``, ``value``,
        ``supersector``, ``industry``, ``datatype`` and ``period``.

    Raises:
        KeyError: If an expected column is missing (a message is printed
            before the error is re-raised).
    """
    return _process_data_file(AllEarnAE_file, series_data, period_dict, start_year)


# Function to process the TotalPrivateAEHrsEarn data file
def process_TotalPrivateAEHrsEarn_file(TotalPrivateAEHrsEarn_file, series_data, period_dict, start_year):
    """Load the TotalPrivateAEHrsEarn data file, join it with the series
    metadata and drop the rows whose data type starts with "INDEXES".

    Args:
        TotalPrivateAEHrsEarn_file: Path to the tab-separated data file.
        series_data: DataFrame with at least ``series_id``, ``supersector``,
            ``industry`` and ``datatype`` columns.
        period_dict: Mapping of period code to period name.
        start_year: Rows with ``year`` below this value are dropped.

    Returns:
        DataFrame with columns ``series_id``, ``year``, ``value``,
        ``supersector``, ``industry``, ``datatype`` and ``period``. Rows with
        no matching series (``datatype`` missing) are kept.

    Raises:
        KeyError: If an expected column is missing (a message is printed
            before the error is re-raised).
    """
    final2_data = _process_data_file(TotalPrivateAEHrsEarn_file, series_data, period_dict, start_year)

    # astype(str) keeps the .str accessor usable even when no series matched
    # and the column is entirely NaN; na=False keeps rows with a missing
    # datatype in the result.
    is_index_row = final2_data['datatype'].astype(str).str.startswith("INDEXES", na=False)
    return final2_data[~is_index_row]


# Shared implementation behind the process_*_file functions
def _process_data_file(data_file, series_data, period_dict, start_year):
    """Read one tab-separated data file, filter it by year, translate the
    period codes and left-join the series metadata on ``series_id``."""
    try:
        data = pd.read_csv(data_file, sep='\t', engine='python')

        # Header cells may carry padding around the column names
        data.columns = data.columns.str.strip()

        # Clean the join key so it matches the cleaned series_id in series_data
        data['series_id'] = data['series_id'].astype(str).str.strip()

        # Non-numeric years become NaN and are dropped by the comparison below
        data['year'] = pd.to_numeric(data['year'], errors='coerce')

        # Copy the filtered rows so the column assignment below writes to an
        # independent frame instead of a slice of the original
        data = data[data['year'] >= start_year].copy()

        # Strip the period codes before the lookup, as is done for series_id
        data['period'] = data['period'].astype(str).str.strip().map(period_dict)

        merged = pd.merge(data, series_data, on='series_id', how='left')

        return merged[['series_id', 'year', 'value',
                       'supersector', 'industry', 'datatype', 'period']]

    except KeyError as e:
        print(f"KeyError encountered: {e}. Check column names or data.")
        raise  # Re-raise the error after logging the message


# Main function to execute the process
def main():
    """Run the pipeline: load the lookup dictionaries, process the series
    file, process the two data files and write each result to a CSV file.

    Each loading/processing step prints a progress message first. If a step
    raises, the error is printed and the function returns early without
    writing any output.
    """
    # Define the start year for filtering to decrease output file size; change as needed
    start_year = 2014

    # Lookup (dictionary) files
    supersector_dict_path = 'dictionaries/ce.supersector.txt'
    industry_dict_path = 'dictionaries/ce.industry.txt'
    datatype_dict_path = 'dictionaries/ce.datatype.txt'
    period_dict_path = 'dictionaries/ce.period.txt'

    # Input files
    series_file = 'ce.series.txt'
    AllEarnAE_file = 'ce.data.02b.AllRealEarningsAE.txt'
    TotalPrivateAEHrsEarn_file = 'ce.data.05b.TotalPrivate.AllEmployeeHoursAndEarnings.txt'

    # Output files
    output_file1 = 'ce_AllEarnAE_processed_data.csv'
    output_file2 = 'ce_TotalPrivateAEHrsEarn_processed_data.csv'

    # NOTE: paths for six further sector files (ce.data.50b ... ce.data.80b)
    # and their outputs used to be defined here but were never read or
    # written, so they were removed. Re-add them together with the code that
    # processes them.

    # Load dictionaries
    print("Loading dictionaries...")
    try:
        supersector_dict, industry_dict, datatype_dict, period_dict = load_dictionaries(
            supersector_dict_path, industry_dict_path, datatype_dict_path, period_dict_path
        )
    except Exception as e:
        print(f"Error loading dictionaries: {e}")
        return  # Nothing else can run without the dictionaries

    # Process series file
    print("Processing series file...")
    try:
        series_data = process_series_file(series_file, supersector_dict, industry_dict, datatype_dict)
    except Exception as e:
        print(f"Error processing series file: {e}")
        return  # The data files cannot be joined without the series data

    # Process AllEarnAE data file
    print("Processing AllEarnAE data file...")
    try:
        final1_data = process_AllEarnAE_file(AllEarnAE_file, series_data, period_dict, start_year)
    except Exception as e:
        print(f"Error processing allearnAE data file: {e}")
        return

    # Process TotalPrivateAEHrsEarn data file
    print("Processing TotalPrivateAEHrsEarn data file...")
    try:
        final2_data = process_TotalPrivateAEHrsEarn_file(TotalPrivateAEHrsEarn_file, series_data, period_dict, start_year)
    except Exception as e:
        print(f"Error processing TotalPrivateAEHrsEarn data file: {e}")
        return

    # Save final processed data to csv (only reached when every step succeeded)
    final1_data.to_csv(output_file1, index=False)
    final2_data.to_csv(output_file2, index=False)
    print("Data processing complete!")
    print("Number of rows in final1_data:", len(final1_data))
    print("Number of rows in final2_data:", len(final2_data))

# Run the pipeline only when this file is executed directly, not when imported
if __name__ == "__main__":
    main()


Loading dictionaries...
Processing series file...
Processing allearnAE data file...
Processing TotalPrivateAEHrsEarn data file...
Data processing complete!
Number of rows in final1_data: 302992
Number of rows in final2_data: 5713
