In [29]:
import pandas as pd
from glob import glob
import csv
import numpy as np

In [30]:
def checkIndexValues(dataSlice):
    """
        Checks if a slice of a pandas data series is simply the row numbers, exported previously
        as an index column.
        
        Parameters
        ----------
        dataSlice: pandas data series
            A slice of data or list
            
        Returns
        --------
        boolean
            True if data values are indices based on row number
            False if data values are not indices based on row number
    """
    for idx, val in enumerate(dataSlice):
        if val != idx+1:
            return False
    return True

In [31]:
# Test
print([1,2,3,4])
print(f"Index values of list: {checkIndexValues([1,2,3,4])}")
print([5,2,1,0])
print(f"Index values of list: {checkIndexValues([5,2,1,0])}")

[1, 2, 3, 4]
Index values of list: True
[5, 2, 1, 0]
Index values of list: False


In [32]:
def checkIfHasLeadingG(value):
    """
        Checks if the first character in a string value is a "G"
        and if the rest of the string is an otherwise valid integer. 
        Used to remove artifact leading "G"s from join columns.
        
        Parameters
        ----------
        value: string
            Value to test
            
        Returns
        --------
        boolean
            True if has a G leading a valid integer 
            False if not
        
    """
    # This only applies to string values
    if not isinstance(value, str):
        return False
    
    # If length is zero, this throws an error
    try:
        if value[0] != "G":
            return False
    except:
        return False
    
    # Try to convert the value, sliced from the first index onward to an int
    # If succeeds, return true, all other conditions return false
    try:
        int(value[1:])
        return True
    except:
        return False

In [33]:
# Test
print('G1234')
print(f"Has leading G of example value: {checkIfHasLeadingG('G1234')}")
print('GX1234')
print(f"Has leading G of example value: {checkIfHasLeadingG('GX1234')}")
print('1234')
print(f"Has leading G of example value: {checkIfHasLeadingG('1234')}")

G1234
Has leading G of example value: True
GX1234
Has leading G of example value: False
1234
Has leading G of example value: False


In [80]:
def inferTypes(file):
    """
        Thoroughly checks for numeric values that should remain as strings due to 
        leading 0s that would otherwise get dropped in pandas import.
        
        Parameters
        ----------
        file: string
            filepath to CSV to open
            
        Returns
        --------
        cols: dictionary
            Relevant columns and string labels for optional pandas import parameter
        
    """
    # Declare outside of scope, since I'm still living in a javascript kinda mind
    cols = {}
    colNames = []

    # Open CSV plainly, no type inference
    with open(file, newline='') as csvfile:
        # Access and loop through rows
        data = csv.reader(csvfile, delimiter=' ', quotechar='|')
        for idx, row in enumerate(data): 
            entries = row[0].split(',')
            # Loop through entries, ignore blank values (index column)
            # Trim extraneous single or double quotes eg: ' " Value " '
            for entry in entries:
                if len(entry) == 0:
                    continue 
                    
                if entry[0] == '"' or entry[0] == '"':
                    entry = entry[1:]
                    
                if entry[-1] == '"' or entry[-1] == '"':
                    entry = entry[:-1]
            # If on first iteration, use values as column names, and iterate
            if idx == 0:
                colNames = entries
                continue
            # Check for string values with leading 0, declare as strings in cols dictionary
            for i, val in enumerate(entries):
                val = str(val)[1:-1]
                if isinstance(val, str) and len(val) > 0 and val[0] == '0':
                    try:
                        int(val)
                        cols[colNames[i]] = 'str'
                    except:
                        continue
    return cols

In [35]:
# Use glob to find all files that match file pattern
files = glob('../data_final/*.csv')
print(files[0:5])

['../data_final/PS05_2017_S.csv', '../data_final/PS06_2019_S.csv', '../data_final/DS01_S.csv', '../data_final/BE03_T.csv', '../data_final/BE03_C.csv']


In [36]:
# Test
print(f'Types for {files[0]}')
print(inferTypes(files[0]))

Types for ../data_final/PS05_2017_S.csv
{'Year': 'str'}


In [37]:
expectedLengths = {
    'C':5,
    'Z':5,
    'T':11,
    'S':2
}

joinCols = {
    'C':'COUNTYFP',
    'Z':'ZCTA',
    'T':'GEOID',
    'S':'STATEFP',
}

expectedColumnLengths = [
    {
        "name":'COUNTYFP',
        "length":5
    },
    {
        "name":'STATEFP',
        "length":2
    },
    {
        "name":'ZCTA',
        "length":5
    },
    {
        "name":'GEOID',
        "length":11
    },
]


def pad(x, length):
    try:
        if len(f"{x}") == length:
            return x
        
        if len(f"{x}") > length:
            return f"{x}"[len(f"{x}")-length:]
        
        if len(f"{x}") != length:
            return f"0{x}"
    except:
        return x
    return x

In [85]:
files = ['../data_final/EC04_T.csv']

In [164]:
# The main loop!
# High level: Open each file, infer types. 
# Find index column, drop it. 
# Find leading G's, remove them.

# Start by looping through each fie
for file in files:
    
    # Infer types in order to figure out which numeric fields need to remain strings
    types = inferTypes(file)
    # Use those types and read in to pandas
    raw = pd.read_csv(file, encoding='latin-1', dtype=types)
    
    # Find the join column based on the last character before the file extension
    try:
        joinCol = joinCols[file.split('.csv')[0][-1]]
    # Others have it in the middle - (ツ)_/¯
    except:
        joinCol = joinCols[file.split('_')[2]]
    
    # IF the join column is not as expected, something is wrong. Some have a vestigial GISJOIN column
    # Otherwise, flag this to check out after the back
    if joinCol not in list(raw.columns):
        print(f'Warning - {file} does not have the proper join column')
        if 'GISJOIN' in list(raw.columns):
            pass
        else:
            continue
            
    for column in raw.columns:
        # If the dreaded GISJOIN, change the column name to the appropriate geographic join
        if column == 'GISJOIN':
            raw = raw.drop(columns=[joinCol])
            tempColumns = list(raw.columns)
            tempColumns[list(raw.columns).index('GISJOIN')] = joinCol
            raw.columns = tempColumns
            column = joinCol
            
        # Markers of an unnamed index column to be removed
        if (column == 'Unnamed: 0' or column == 'X') and checkIndexValues(raw[column][0:10]):
            raw = raw.drop(columns=[column])
            continue
            
        # Find leading G
        if checkIfHasLeadingG(raw[column][0]):
            raw.loc[:,column] = raw.loc[:,column].str.slice(1,-1)
    
    # This section is meant to appropriately pad with leading zeroes
    # However, some columns are not formatted consistently, such as just a county code, not state+county for countyFP
    # Better solution needed.
#     for col in expectedColumnLengths:
#         if col['name'] in list(raw.columns):
#             raw.loc[:, col['name']] = raw.loc[:, col['name']].apply(lambda x: pad(x, col['length']))
    
#     # Check for un-concatenated county and state fp columns
#     if 'STATEFP' in list(raw.columns) and 'COUNTYFP' in list(raw.columns):
#         if len(raw.iloc[0].COUNTYFP) == 3:
#             raw['COUNTYFP'] = raw['STATEFP'].astype(str) + raw['COUNTYFP']
            
#     # Correct missing or null STATEFP based on geoid or countyfp
#     if 'STATEFP' in list(raw.columns):
#         if 'GEOID' in list(raw.columns):
#             raw['STATEFP'] = raw['GEOID'].astype(str).str.slice(0,2)
#         elif 'COUNTYFP' in list(raw.columns):
#             raw['STATEFP'] = raw['COUNTYFP'].astype(str).str.slice(0,2)
            
#     # Correct missing or null STATEFP based on geoid or countyfp
#     if 'COUNTYFP' in list(raw.columns):
#         if 'GEOID' in list(raw.columns):
#             raw['COUNTYFP'] = raw['GEOID'].astype(str).str.slice(0,5)
            
    raw.round(2).to_csv(file, index=False)