In [204]:
import pandas as pd
from collections import OrderedDict
from pathlib import Path
import dask.dataframe as ddf

In [205]:
# Relative locations of the raw OPM non-DoD extracts. The dynamic and status files live in
# sibling directories under one root; the *_dask names are glob patterns over the files in
# each directory (presumably for dask's glob-based readers -- confirm at the call site).
_opm_non_dod_root = Path('../rawdata/opm-federal-employment-data/data/1973-09-to-2014-06/non-dod')
opm_dynamic_dir = _opm_non_dod_root / 'dynamic'
opm_status_dir = _opm_non_dod_root / 'status'
opm_dynamic_dir_dask = opm_dynamic_dir / '*.NONDOD.FO05M3.TXT'
opm_status_dir_dask = opm_status_dir / 'Status_Non_DoD_*_*.txt'

In [206]:
# Define the fwf dict for reading OPM files
# Dynamic-file layout: column name -> (start, stop) character offsets, passed as
# `colspecs` to pd.read_fwf (zero-based, stop-exclusive). The keys double as the column
# names, so they must match the keys of the dtype mapping exactly for a dtype to apply.
opm_dynamic_fwf_dict = {
    'Pseudo-ID': (0, 9),
    'Employee Name': (9, 32),
    'Agency/Subelement': (32, 36),
    'Accession/Separation Indicator': (36, 38),
    'Effective Date (year)': (38, 42),
    'Effective Date (month)': (42, 44),
    'Effective Date (day)': (44, 46),
    'Age': (46, 52),
    'Pay Plan': (52, 54),
    'Grade': (54, 56),
    'LOS Level': (56, 62),
    'Duty Station': (62, 71),
    'Occupation': (71, 75),
    # Spelling fixed ("Cateogy" -> "Category") so this column picks up its declared dtype.
    'Occupational Category (PATCO)': (75, 76),
    'Adjusted Basic Pay': (76, 82),
    'Type of Appointment': (82, 84),
    'Work Schedule': (84, 85)
}

# Status-file layout: column name -> (start, stop) character offsets, in file order.
# Listed as (name, start, stop) triples so each field reads as one record.
opm_status_fwf_dict = {
    field_name: (start, stop)
    for field_name, start, stop in [
        ('Pseudo-ID', 0, 9),
        ('Employee Name', 9, 32),
        ('File Date (year)', 32, 36),
        ('File Date (month)', 36, 38),
        ('File Date (day)', 38, 40),
        ('Agency/Subelement', 40, 44),
        ('Duty Station', 44, 53),
        ('Age Range', 53, 59),
        ('Education Level', 59, 61),
        ('Pay Plan', 61, 63),
        ('Grade', 63, 65),
        ('LOS Level', 65, 71),
        ('Occupation', 71, 75),
        ('Occupational Category (PATCO)', 75, 76),
        ('Adjusted Basic Pay', 76, 82),
        ('Supervisory Status', 82, 83),
        ('Type of Appointment', 83, 85),
        ('Work Schedule', 85, 86),
        ('NSFTP Indicator', 86, 87),
    ]
}

In [207]:
# Define the dicts for column dtypes
# Column name -> pandas dtype for the dynamic files, in file order.
# Capitalised 'Int*' names are pandas' nullable integer dtypes.
opm_dynamic_dtype_dict = dict([
    ('Pseudo-ID', 'Int32'),
    ('Employee Name', 'string'),
    ('Agency/Subelement', 'category'),
    ('Accession/Separation Indicator', 'category'),
    ('Effective Date (year)', 'Int16'),
    ('Effective Date (month)', 'Int8'),
    ('Effective Date (day)', 'Int8'),
    ('Age', 'category'),
    ('Pay Plan', 'category'),
    ('Grade', 'category'),
    ('LOS Level', 'category'),
    ('Duty Station', 'category'),
    ('Occupation', 'category'),
    ('Occupational Category (PATCO)', 'category'),
    ('Adjusted Basic Pay', 'Int32'),
    ('Type of Appointment', 'category'),
    ('Work Schedule', 'category'),
])

# Column name -> pandas dtype for the status files, in file order.
# Capitalised 'Int*' names are pandas' nullable integer dtypes.
opm_status_dtype_dict = dict([
    ('Pseudo-ID', 'Int32'),
    ('Employee Name', 'string'),
    ('File Date (year)', 'Int16'),
    ('File Date (month)', 'Int8'),
    ('File Date (day)', 'Int8'),
    ('Agency/Subelement', 'category'),
    ('Duty Station', 'category'),
    ('Age Range', 'category'),
    ('Education Level', 'category'),
    ('Pay Plan', 'category'),
    ('Grade', 'category'),
    ('LOS Level', 'category'),
    ('Occupation', 'category'),
    ('Occupational Category (PATCO)', 'category'),
    ('Adjusted Basic Pay', 'Int32'),
    ('Supervisory Status', 'category'),
    ('Type of Appointment', 'category'),
    ('Work Schedule', 'category'),
    ('NSFTP Indicator', 'category'),
])

In [208]:
# Define functions that return a dict of files in the status or dynamic directory, indexed by year and quarter
def produce_opm_dynamic_path_dict(input_path):
    """Index the dynamic files directly inside ``input_path`` by (year, quarter).

    The first seven characters of each file name are parsed with ``pd.to_datetime``
    and the resulting year and quarter form the key.

    Parameters
    ----------
    input_path : str or Path
        Directory to scan (not recursive).

    Returns
    -------
    dict
        ``{(year, quarter): Path}``. If two files map to the same quarter, the one
        iterated last wins.

    Notes
    -----
    Sub-directories and hidden entries (names starting with ``.``, such as
    ``.ipynb_checkpoints`` or ``.DS_Store``) are skipped. For any other file whose
    name prefix is not a parseable date, the ``pd.to_datetime`` error propagates.
    """
    output_dict = {}
    for file_path in Path(input_path).iterdir():
        file_name = file_path.name
        # Only regular, non-hidden files are data files; anything else would make
        # the date parsing below fail on an unrelated name.
        if file_name.startswith('.') or not file_path.is_file():
            continue
        file_date = pd.to_datetime(file_name[0:7])
        output_dict[(file_date.year, file_date.quarter)] = file_path

    return output_dict

def produce_opm_status_path_dict(input_path):
    """Index the status files directly inside ``input_path`` by (year, quarter).

    Characters 15-21 of each file name (the ``YYYY_MM`` part of names shaped like
    ``Status_Non_DoD_YYYY_MM.txt``) are turned into ``YYYY-MM`` and parsed with
    ``pd.to_datetime``; the resulting year and quarter form the key.

    Parameters
    ----------
    input_path : str or Path
        Directory to scan (not recursive).

    Returns
    -------
    dict
        ``{(year, quarter): Path}``. If two files map to the same quarter, the one
        iterated last wins.

    Notes
    -----
    Sub-directories and hidden entries (names starting with ``.``, such as
    ``.ipynb_checkpoints`` or ``.DS_Store``) are skipped. For any other file whose
    name slice is not a parseable date, the ``pd.to_datetime`` error propagates.
    """
    output_dict = {}
    for file_path in Path(input_path).iterdir():
        file_name = file_path.name
        # Only regular, non-hidden files are data files; anything else would make
        # the date parsing below fail on an unrelated name.
        if file_name.startswith('.') or not file_path.is_file():
            continue
        file_date = pd.to_datetime(file_name[15:22].replace('_', '-'))
        output_dict[(file_date.year, file_date.quarter)] = file_path

    return output_dict

In [209]:
# Index the raw files in each directory by (year, quarter) so that later cells can
# look up the file path for a given quarter directly.
opm_dynamic_path_dict = produce_opm_dynamic_path_dict(opm_dynamic_dir)
opm_status_path_dict = produce_opm_status_path_dict(opm_status_dir)

In [210]:
# Load window, both ends inclusive: (year, quarter) of the first and last file to read.
start_year, start_qtr = 1983, 3
end_year, end_qtr = 1983, 3

# Make sure the output directory for processed dataframes (binary format, for quick
# access later) exists before anything tries to write to it.
Path('../data/').mkdir(parents=True, exist_ok=True)

In [211]:
# Load fwf
def _load_opm_quarter(label, path_dict, fwf_dict, dtype_dict, year, qtr):
    """Read one quarter's fixed-width file and report the outcome.

    Parameters
    ----------
    label : str
        'status' or 'dynamic'; only used in the printed messages.
    path_dict : dict
        ``{(year, quarter): path}`` lookup of available files.
    fwf_dict : dict
        Column name -> (start, stop) offsets; keys become the column names.
    dtype_dict : dict
        Column name -> dtype, forwarded to ``pd.read_fwf``.
    year, qtr : int
        Quarter to load.

    Returns
    -------
    DataFrame or None
        The loaded frame, or None when the file is missing or could not be read.
        A message is printed in every case; nothing is raised for load failures,
        so the surrounding loop stays best-effort.
    """
    file_path = path_dict.get((year, qtr))
    if file_path is None:
        print(f'{label.capitalize()} file for year {year}, quarter {qtr} does not exist.')
        return None

    try:
        loaded_df = pd.read_fwf(file_path, colspecs = list(fwf_dict.values()), names = list(fwf_dict.keys()), dtype = dtype_dict)
    except FileNotFoundError:
        # The path was indexed earlier but the file has since disappeared.
        print(f'{label.capitalize()} file for year {year}, quarter {qtr} does not exist.')
        return None
    except Exception as exc:
        # Parsing/dtype/encoding problems are not "missing file": report the real cause.
        print(f'Failed to load {label} file for year {year}, quarter {qtr}: {exc!r}')
        return None

    print(f'Loaded {label} file for year {year}, quarter {qtr}.')
    return loaded_df


for year in range(start_year, end_year + 1):
    for qtr in range(1, 5):
        # Skip quarters outside the inclusive (start, end) window.
        if (year == start_year) and (qtr < start_qtr):
            continue
        if (year == end_year) and (qtr > end_qtr):
            continue

        # On failure the previous value of status_df / dynamic_df is left untouched.
        loaded_df = _load_opm_quarter('status', opm_status_path_dict, opm_status_fwf_dict, opm_status_dtype_dict, year, qtr)
        if loaded_df is not None:
            status_df = loaded_df
            # TODO: '../data/converted_to_csv/' is not created anywhere in this notebook;
            # create it before enabling the export below.
            target_path = f'../data/converted_to_csv/status_{year}_{qtr}.csv'
            #status_df.to_csv(target_path)

        loaded_df = _load_opm_quarter('dynamic', opm_dynamic_path_dict, opm_dynamic_fwf_dict, opm_dynamic_dtype_dict, year, qtr)
        if loaded_df is not None:
            dynamic_df = loaded_df
            target_path = f'../data/converted_to_csv/dynamic_{year}_{qtr}.csv'
            #dynamic_df.to_csv(target_path)

Loaded status file for year 1983, quarter 3.
Loaded dynamic file for year 1983, quarter 3.


In [213]:
# Inspect the column dtypes of the most recently loaded status frame.
# NOTE(review): the saved output below lists every column as `object`, which does not
# match the dtype mapping passed to read_fwf in the load cell, and the execution counts
# in this notebook are out of order -- presumably stale output; confirm with a
# Restart & Run All.
status_df.dtypes

Pseudo-ID                        object
Employee Name                    object
File Date (year)                 object
File Date (month)                object
File Date (day)                  object
Agency/Subelement                object
Duty Station                     object
Age Range                        object
Education Level                  object
Pay Plan                         object
Grade                            object
LOS Level                        object
Occupation                       object
Occupational Category (PATCO)    object
Adjusted Basic Pay               object
Supervisory Status               object
Type of Appointment              object
Work Schedule                    object
NSFTP Indicator                  object
dtype: object

In [10]:
# Define function that imports files into dict within a certain timeframe
def produce_opm_df_dict(opm_path_dict, opm_fwf_dict, start_year, start_qtr, end_year, end_qtr):
    """Read every quarter in an inclusive window into a dict of DataFrames.

    Parameters
    ----------
    opm_path_dict : dict
        ``{(year, quarter): path}`` lookup of the files to read.
    opm_fwf_dict : dict
        Column name -> (start, stop) offsets; keys become the column names.
    start_year, start_qtr : int
        First quarter to load (inclusive).
    end_year, end_qtr : int
        Last quarter to load (inclusive).

    Returns
    -------
    dict
        ``{(year, quarter): DataFrame}`` in chronological order, every column read
        as ``str``. A quarter missing from ``opm_path_dict`` raises ``KeyError``.
    """
    first_period = (start_year, start_qtr)
    last_period = (end_year, end_qtr)

    # Tuple comparison orders periods by year first, then quarter, which trims the
    # leading quarters of the first year and the trailing quarters of the last.
    wanted_periods = [
        (yr, quarter)
        for yr in range(start_year, end_year + 1)
        for quarter in (1, 2, 3, 4)
        if first_period <= (yr, quarter) <= last_period
    ]

    return {
        period: pd.read_fwf(
            opm_path_dict[period],
            colspecs=list(opm_fwf_dict.values()),
            names=list(opm_fwf_dict.keys()),
            dtype=str,
        )
        for period in wanted_periods
    }

opm_status_df_dict = {}
opm_dynamic_df_dict = {}
error_status_df_list = []
error_dynamic_df_list = []

# Scan the dynamic file of every quarter in the load window for 0xFF bytes that
# cannot be decoded as UTF-8.
for year in range(start_year, end_year + 1):
    for qtr in range(1, 5):
        # Skip quarters outside the inclusive (start, end) window.
        if (year == start_year) and (qtr < start_qtr):
            continue
        if (year == end_year) and (qtr > end_qtr):
            continue

        print(f'Loading year {year}, quarter {qtr}.')

        #try:
        #    pd.read_fwf(opm_status_path_dict[(year, qtr)], colspecs = list(opm_status_fwf_dict.values()), names = list(opm_status_fwf_dict.keys()), dtype = str)
        #except:
        #    error_status_df_list.append((year, qtr))
        #    print(f'Error for status file in {year},q{qtr}.')

        #try:
        #    pd.read_fwf(opm_dynamic_path_dict[(year, qtr)], colspecs = list(opm_dynamic_fwf_dict.values()), names = list(opm_dynamic_fwf_dict.keys()), dtype = str)
        #except:
        #    error_dynamic_df_list.append((year, qtr))
        #    print(f'Error for dynamic file in {year},q{qtr}.')

        # Kept at 0 for an empty file, otherwise ends as the number of lines read.
        line_counter = 0
        # Decode explicitly as UTF-8: with errors='backslashreplace' an undecodable
        # 0xFF byte becomes the literal text "\xff", which is what the check below
        # looks for. Relying on the platform default encoding (e.g. cp1252, where
        # 0xFF is a valid character) would make the check silently find nothing.
        with open(opm_dynamic_path_dict[(year, qtr)], 'r', encoding = 'utf-8', errors = 'backslashreplace') as f:
            for line_counter, line in enumerate(f, start = 1):
                if '\\xff' in line:
                    print('Error found')
                    #print(f'Error found at line {line_counter}.')
                    #if line.find('\\xff') != 85:
                    #    print('Error not at position 85')

        #opm_status_df_dict[(year, qtr)].to_csv('test_latin1.csv')

#opm_status_df_dict[(1982, 1)] = opm_status_df_dict[(1982, 1)][:-1] # Drop last row which has name of other files for some reason