In [1]:
from typing import Dict

import pandas as pd
import os

from datetime import datetime
from pytz import timezone

In [2]:
# Mount Google Drive at /content/drive; the following cells read from and
# write to paths under /content/drive/MyDrive/.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Stop right away when Google Drive is not mounted: every path below lives on it.
if not os.path.exists('/content/drive/MyDrive/'):
    raise Exception("Error: Mount Google Drive before continuing!")

# Directory layout. Each constant ends with '/' so that a file or folder name
# can be appended to it directly.
BASE_DIR = '/content/drive/MyDrive/Data Science for Social Good - Spring 2022/data/'
SCRAPE_DIR = f'{BASE_DIR}scraped_files/'
DATA_DIR = f'{SCRAPE_DIR}DATA/'    # input folder, one sub-folder per date
CLEAN_DIR = f'{SCRAPE_DIR}CLEAN/'  # output folder, one sub-folder per date

# Today's date in US/Eastern time, formatted MM-DD-YYYY; used as the per-day folder name.
CURRENT_DATE = datetime.now(
    timezone('US/Eastern')
).strftime("%m-%d-%Y")

# Formatter for Consistency

---

We aren't sure if there can be keys other than the ones listed below. 

For previously unseen keys, we fall back to str.upper() as the default value. The web scraper is meant to run unattended, so an unexpected value is reported with a warning instead of stopping the run.

Any previously unseen keys should be added to the dictionary along with a value to ensure consistency.

---

## **Ensure all tuples end with a comma**

If we have a tuple of a single element, it should look like:  
(MY_ELEMENT,)

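If we write (MY_ELEMENT) instead, Python treats it as the plain string MY_ELEMENT rather than a tuple, so iterating over it would yield one character at a time. The Formatter raises an error when a key is not a tuple to catch this mistake.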

In [4]:
# Columns whose values are spelled inconsistently across the scraped sources
INCONSISTENT_COLUMNS = ['Race', 'Sex', 'Eye Color', 'Hair Color']
# Canonical label used for every "unknown" bucket below
UNKNOWN_VAL = 'N/A'

# We use many-to-one mappings (Dict[Tuple, str]) for convenience - easier to add mappings
# We convert them to one-to-one dictionaries later
# Lookups are exact (case-sensitive), so every spelling that must land in a
# bucket has to be listed explicitly; unlisted values fall back to str.upper().
RACE_MAPPINGS = {
    ('B', 'Black', 'BLACK',): 'BLACK',
    ('W', 'White', 'WHITE',): 'WHITE',
    ('H', 'Hispanic', 'HISPANIC',): 'HISPANIC',
    # NOTE(review): 'INDIAN' is bucketed as 'ASIAN' - confirm against the data source
    ('A', 'INDIAN',): 'ASIAN',
    ('Other',): 'OTHER',
    # 'Unknown' is listed so that it maps to UNKNOWN_VAL like 'UNKNOWN' does,
    # instead of falling back to the upper-cased 'UNKNOWN'
    ('U', 'Unknown', 'UNKNOWN', 'N/A',): UNKNOWN_VAL,
}

GENDER_MAPPINGS = {
    ('F', 'Female', 'FEMALE',): 'FEMALE',
    ('M', 'Male', 'MALE',): 'MALE',
    ('',): UNKNOWN_VAL,
}

EYE_COLOR_MAPPINGS = {
    ('GRN', 'Green', 'GREEN',): 'GREEN',
    ('HAZ', 'Hazel', 'HAZEL',): 'HAZEL',
    ('BLU', 'Blue', 'BLUE',): 'BLUE',
    ('BRO', 'Brown', 'BROWN',): 'BROWN',
    ('DARK BROWN',): 'DARK BROWN',
    ('GRY', 'Gray', 'GRAY',): 'GREY',
    ('BLK', 'Black', 'BLACK',): 'BLACK',
    ('MAROON',): 'MAROON',
    ('UNKN', 'Unknown', 'UNKNOWN',): UNKNOWN_VAL,
}

HAIR_COLOR_MAPPINGS = {
    ('BLK', 'Black', 'BLACK',): 'BLACK',
    ('BRO', 'Brown', 'BROWN',): 'BROWN',
    ('Blue', 'BLUE',): 'BLUE',
    ('GRY', 'Gray', 'GRAY',): 'GREY',
    ('WHI', 'White', 'WHITE',): 'WHITE',
    ('Red', 'RED',): 'RED',
    ('BAL', 'Bald', 'BALD',): 'BALD',
    ('Auburn', 'AUBURN',): 'AUBURN',
    ('Sandy', 'SANDY',): 'SANDY',
    ('BLN', 'Blond', 'BLOND', 'Blonde', 'BLONDE',): 'BLONDE',
    ('GREEN',): 'GREEN',
    ('Pink', 'PINK',): 'PINK',
    ('MULTICOLORED',): 'MULTICOLORED',
    # 'UNKNOWN' is listed so that it maps to UNKNOWN_VAL like 'Unknown' does,
    # instead of falling back to itself
    ('XXX', 'Unknown', 'UNKNOWN', 'NONE',): UNKNOWN_VAL,
}

In [5]:
class Formatter():
    """Rewrites inconsistently spelled column values to one canonical label each.

    On construction, the module-level many-to-one mappings (RACE_MAPPINGS,
    GENDER_MAPPINGS, EYE_COLOR_MAPPINGS, HAIR_COLOR_MAPPINGS) are flattened
    into one-to-one lookup tables, one per supported column.
    """

    class FormatterDict(dict):
        """Custom class to handle missing keys by subclassing dict.

        A missing key is reported with a printed warning and its upper-cased
        form is returned; the key is not inserted into the dictionary.
        """
        def __missing__(self, key: str) -> str:
            print("WARNING: Missing key:", key)
            # str() keeps the fallback from raising if a non-string key is looked up
            return str(key).upper()

    # Column name -> name of the instance attribute that holds its lookup table
    _MAPPING_ATTRS = {
        'Race': 'race_mappings',
        'Sex': 'gender_mappings',
        'Eye Color': 'eye_color_mappings',
        'Hair Color': 'hair_color_mappings',
    }

    def __init__(self) -> None:
        self.race_mappings = self._convert_mapping_to_dict(RACE_MAPPINGS)
        self.gender_mappings = self._convert_mapping_to_dict(GENDER_MAPPINGS)
        self.eye_color_mappings = self._convert_mapping_to_dict(EYE_COLOR_MAPPINGS)
        self.hair_color_mappings = self._convert_mapping_to_dict(HAIR_COLOR_MAPPINGS)

    def get_mappings(self, name: str) -> Dict:
        """Return the lookup table for the column called `name`.

        Raises:
            ValueError: if `name` is not 'Race', 'Sex', 'Eye Color' or 'Hair Color'.
        """
        attr_name = self._MAPPING_ATTRS.get(name)
        if attr_name is None:
            raise ValueError("ERROR: Unrecognized name", name)
        return getattr(self, attr_name)

    def format_column(self, column: pd.Series) -> pd.Series:
        """Return a copy of `column` with its string values mapped to canonical labels.

        The lookup table is selected by `column.name`. Trailing whitespace is
        removed from each string before the lookup. Missing values are left
        as they are, and so are values that are not strings, so a column with
        no string values at all (e.g. one that is entirely empty) passes
        through unchanged instead of raising.

        Raises:
            ValueError: if `column.name` is not a supported column name.
        """
        mappings = self.get_mappings(column.name)

        def _format_value(value):
            # Only strings can be stripped and looked up
            if not isinstance(value, str):
                return value
            # Remove any trailing whitespace
            return mappings[value.rstrip()]

        return column.map(_format_value, na_action='ignore')

    def format_df(self, df: pd.DataFrame) -> pd.DataFrame:
        """Normalize every column of `df` that is listed in INCONSISTENT_COLUMNS.

        `df` is modified in place and also returned.
        """
        # Remove any columns not found in the dataframe
        columns_to_fix = [col for col in INCONSISTENT_COLUMNS if col in df.columns]
        if not columns_to_fix:
            # Nothing to normalize
            return df

        # Fix inconsistent data formatting
        df[columns_to_fix] = df[columns_to_fix].apply(self.format_column)
        return df

    def _convert_mapping_to_dict(self, many_to_one: Dict) -> Dict:
        """Convert a many-to-one dictionary to a one-to-one dictionary.
        For example:
        {('key_1', 'key_2'): 'val'} -> {'key_1': 'val', 'key_2': 'val'}

        The result is a FormatterDict, so lookups of unlisted keys fall back
        to the upper-cased key.

        Raises:
            TypeError: if a key of `many_to_one` is not a tuple, e.g. ('A')
                written without the trailing comma.
        """
        one_to_one = self.FormatterDict()
        for key_tuple, val in many_to_one.items():
            # Safety check for user error
            if not isinstance(key_tuple, tuple):
                raise TypeError("ERROR: Key tuple entered incorrectly!", key_tuple)

            for key in key_tuple:
                one_to_one[key] = val
        return one_to_one

# Clean Data

---

This will automatically clean the data for the current day. If needed, this can be expanded to clean data from previous days.

In [6]:
# Dates (MM-DD-YYYY, matching the per-day folder names) whose scraped files get cleaned
DATES_TO_CLEAN = [CURRENT_DATE]
formatter = Formatter()

for date_to_clean in DATES_TO_CLEAN:
    print("Cleaning date:", date_to_clean)
    # Build both folders from the loop variable so that each listed date is
    # read from, and written to, its own folder
    dir_to_clean = f'{DATA_DIR}{date_to_clean}'
    new_dir = f'{CLEAN_DIR}{date_to_clean}'

    for sub_dir, dirs, files in os.walk(dir_to_clean):
        for filename in files:
            cur_file = f'{sub_dir}/{filename}'

            df = pd.read_csv(cur_file)
            df = formatter.format_df(df)

            # Create directory if needed
            os.makedirs(new_dir, exist_ok=True)

            # NOTE(review): every file is written directly into new_dir, so two
            # files with the same name in different sub-folders would overwrite
            # each other; to_csv also writes the DataFrame index as an extra
            # first column - confirm both are intended
            df.to_csv(f'{new_dir}/{filename}')

Cleaning date: 03-20-2022
