In [719]:
%%javascript
// Replace the notebook's output-area scroll check with one that always
// answers "no" (presumably so long cell outputs are shown in full instead
// of inside a scrolling box -- confirm against the notebook front-end).
IPython.OutputArea.prototype._should_scroll = function(lines) {
    return false;
}



<IPython.core.display.Javascript object>

# Project: Adopt a Drain Data
 * Author: James Wilfong, wilfongjt@gmail.com

# Goals
* Merge drains from multiple communities across the watershed
* Resolve dataset conflicts, i.e., drain identifier uniqueness, data types, column names, and value ranges.

## Limits
 * Outputs from this script need to be manually pushed to the data.world repository (repo)
 * Doesn't handle deleted drains
 
## Assumptions
* Assume the community-dataset is a subset of all drains in the universe ;)
* Assume record identifiers are not unique across communities 
* Assume the community-dataset's column names are not the same across communities 
* Assume the community-dataset's data types are not the same across communities
* Assume the community-dataset's data-values ranges are not the same across communities
* Assume the community-dataset has duplicate records
* Assume the community-dataset's format is Comma Separated Values (CSV) or Excel.

## Inputs
 * The current live dataset is downloaded from data.world 
 * Updates are put into the raw-data/adopt-a-drain of the data.world repo.

## Process
  The process is initiated by running this Jupyter Notebook.
 * load current drains dataset from data.world
 * load dataset(s) from raw-data/adopt-a-drain folder
 * clean data: create unique id from facility id et al.
 * clean data: remove characters from facility ids
 * clean data: map input columns to expected output columns
 * clean data: fix common data problems
 * condense data: remove drains with no facility id
 * condense data: remove unneeded columns
 * condense data: remove outliers
 * condense: merge dataworld and new data 
 * condense data: remove duplicate drains (keep the first duplicate)
 * concat: Make one big dataset from one or many

## Outputs
 * Clean data is output to the clean-data/adopt-a-drain folder.
 * save big dataset to clean-data/adopt-a-drain/grb_drains.csv
 * save big dataset to clean-data/grb_drains-2019-08-020.csv
 * 
 
## Next Steps
 * Push updates to repo
 * Sync data from github to dataworld
     * Sync Manually
     * or Wait for the weekly auto-sync
 * Update the Adopt a Drain database
     * Run Ruby rake process in Heroku 

In [720]:
# %matplotlib notebook
import settings
from IPython.display import display, HTML
from IPython.display import Markdown
from lib.p3_ProcessLogger import ProcessLogger

import helper

In [721]:
import os.path
from os import path
import sys
import time
import numpy as np 
import pandas as pd
import csv # read and write csv files
from pprint import pprint
import os

import datadotworld as dw
from shutil import copyfile
# import subprocess

# convenience functions -- cleaning
# cell_log.collect('* Import custom packages')
from lib.p3_CellCounts import CellCounts
# import lib.p3_clean as clean
from lib.p3_configuration import get_configuration
import lib.p3_explore as explore
#import lib.p3_gather as gather # gathering functions
# import lib.p3_helper_functions as helper
import lib.p3_map as maps
from lib.p3_ProcessLogger import ProcessLogger

# IMPORTANT
## Configuring the Data Transfer
Configure the settings in the cells below before choosing "Run All" from the Cell menu.

In [722]:


'''
    Input:   CSV file in the raw_data/ folder
    Process: clean (conform, condense)
    Output:  directed to the clean-data/ folder

    metadata['output_file_name'] is the stamped output file name
    (<table_name>-<daystamp>.csv); metadata['copy_file_name'] is the
    un-stamped <table_name>.csv.
    Per the original note, the table name is used to name the data.world
    table: it should start with a letter and may contain letters, numbers
    and underscores.
'''
table_name = 'grb_drains'

# notebook-level logger, emptied at the start of every run
cell_log = ProcessLogger()
cell_log.clear()

metadata = {
    'output_file_name': '{}-{}.csv'.format(table_name, helper.get_daystamp()),
    'copy_file_name': '{}.csv'.format(table_name),
    'gh_file_type': 'csv',
    'title': 'GRB Storm Drains',
    'desc': 'Storm Drains of the Grand River Basin, Michigan',
    'table_name': table_name
}

# positional order expected by the helper: file name, file type, title, description, table name
helper.exportMaintainerConfig(
    *[metadata[key] for key in ('output_file_name', 'gh_file_type', 'title', 'desc', 'table_name')]
)

print('Metadata =============')
pprint(metadata)


kill log:  ./maintainer/maintainer-config.json
{'copy_file_name': 'grb_drains.csv',
 'desc': 'Storm Drains of the Grand River Basin, Michigan',
 'gh_file_type': 'csv',
 'output_file_name': 'grb_drains-2019-09-026.csv',
 'table_name': 'grb_drains',
 'title': 'GRB Storm Drains'}


In [723]:
# Assemble names of: application, raw data file, clean data file.
# The raw and clean file names start out unset (None).
local_config = dict(
    app_name=helper.get_app_name(),
    local_raw=None,
    local_clean=None,
)

# ------------- configure outliers
# Each entry names a column and the inclusive (low, high) range of values to
# keep; 'reason' is a message template that receives the removed-row count.
outlier_settings = {
    'outliers': [
        {
            'column': 'dr_lon',
            'range': (-90.0, -80.0),
            'reason': 'Remove {} observations too far west or east.',
            'count': 0,
        },
        {
            'column': 'dr_lat',
            'range': (40.0, 50.0),
            'reason': 'Remove {} observations too far north or south.',
            'count': 0,
        },
    ]
}

ENV_ERROR = False

for _title, _settings in (("local_config ===========", local_config),
                          ("outlier_settings ===========", outlier_settings)):
    print(_title)
    pprint(_settings)

{'app_name': 'adopt-a-drain', 'local_clean': None, 'local_raw': None}
{'outliers': [{'column': 'dr_lon',
               'count': 0,
               'range': (-90.0, -80.0),
               'reason': 'Remove {} observations too far west or east.'},
              {'column': 'dr_lat',
               'count': 0,
               'range': (40.0, 50.0),
               'reason': 'Remove {} observations too far north or south.'}]}


In [724]:
# When the environment is not set up, queue a failure notice in the cell log.
if ENV_ERROR:
    for _notice in ("# Script Failure!!",
                    "# !!! Missing Environment Variables !!!",
                    "### see [Environment Variable Setup](#env-setup)"):
        cell_log.collect(_notice)

# last expression of the cell: render whatever the log collected as markdown
Markdown('''{}'''.format(cell_log.getMarkdown()))



In [725]:
# Lookup tables used while cleaning:
#   commonNameMap - column names commonly found in imported files (after the
#                   column-name clean-up) mapped to the expected dr_*/del_* names
#   region_map    - owner / jurisdiction names mapped to short community codes
# NOTE(review): this assignment rebinds the name `maps`, which the import cell
# also binds via `import lib.p3_map as maps`; after this cell runs the module
# is no longer reachable under that name.

maps ={
    "commonNameMap": { 
        "subtype": "dr_subtype",
        "jurisdicti": "dr_jurisdiction",
        "drain__owner": "dr_owner",
        "owner":"dr_owner",
        "local__id": "dr_local_id",
        "facilityid": "dr_facility_id",
        "drain__jurisdiction": "dr_jurisdiction",
        "subwatershed": "dr_subwatershed",
        "subbasin": "dr_subwatershed",
        "point__x":"dr_lon", 
        "long": "dr_lon",
        "point__y":"dr_lat",
        "lat":"dr_lat",
        # NOTE(review): "soure__id" may be a misspelling of "source__id", or it may
        # deliberately match a misspelled raw column -- confirm before changing.
        "soure__id": "del_source_id"
    },

    "region_map": {
        "Kent County Road Commission": "KCRC",
        "KENT COUNTY ROAD COMMISSION":"KCRC",
        "City of East Grand Rapids": "EGR",
        "City of Grandville": "GRANDV",
        "City of Wyoming": "CWY",
        "City of Kentwood": "CK",
        "Grand Rapids Township": "GRTWP",
        "City of Walker": "CW",
        "CGR": "CGR",
        "City of Grand Rapids": "CGR",
        "Georgetown Township": "GTWP",
        "City of Hudsonville": "CHV",
        "Jamestown Township": "JTTWP",
        "Cascade Township": "CASTWP",
        "Algoma Township": "ALGTWP",
        "Grattan Township": "GRATWP",
        "Gaines Township": "GAITWP",
        "Vergennes Township": "VERTWP",
        "Lowell Township": "LOWTWP",
        "Oakfield Township": "OAKTWP",
        "Cannon Township": "CANTWP",
        "Sparta Township": "SPATWP",
        "Solon Township": "SOLTWP",
        "Ada Township": "ADATWP",
        "City of Lowell": "CLO",
        "Bowne Township": "BOWTWP",
        "Tyrone Township": "TYRTWP",
        "Caledonia Township": "CALTWP",
        "Courtland Township": "COUTWP",
        "Spencer Township": "SPETWP",
        "Village of Sparta": "VSP",
        "BYRON TOWNSHIP": "BYRTWP",
        # NOTE(review): the upper-case spelling maps to "CALETWP" while
        # "Caledonia Township" above maps to "CALTWP" -- confirm whether two
        # different codes for the same township are intended.
        "CALEDONIA TOWNSHIP": "CALETWP",
        "City of Rockford": "CRF",
        "Alpine Township": "ALPTWP",
        "Plainfield Township": "PLATWP",
        "Byron Township": "BYRTWP",
        "OCWRC": "OCWRC",
        "City of Grand Haven DPW":"CGH",
        "Village of Spring Lake DPW": "VSL",
        "Ottawa County Road Commission": "OCRC",
        "OCRC": "OCRC",
        # NOTE(review): maps to "OCRC" (the Road Commission code) although an
        # "OCWRC" code exists above -- confirm which code is intended.
        "Ottawa County Water Resource Commissioner":"OCRC",
        "Village of Fruitport":"VF"
    }
}
# Temporary columns. If you add a temporary column while processing, add its
# name to this list so that it is removed before saving.
extraColumns = [
    'del_source',
    'del_fid',
    'del_gid',
    'source_code',
    'dr_local_id',
    'dr_facility_id',
    'dr_location',
    'dup_coordinate',
]

# Column names expected during processing.
expected_process_columns_list = [
    'facility_prefix',
    'dr_subtype',
    'dr_jurisdiction',
    'dr_owner',
    'dr_subwatershed',
    'dr_lon',
    'dr_lat',
    'dr_asset_id',
    'dr_type',
    'dr_sync_id',
    'dr_discharge',
]

# Column names expected in the saved output; add new output columns here.
expected_output_columns_list = [
    'dr_asset_id',
    'dr_jurisdiction',
    'dr_lat',
    'dr_lon',
    'dr_owner',
    'dr_subtype',
    'dr_subwatershed',
    'dr_type',
    'dr_discharge',
]
    


In [726]:
class Process():
    '''
    Base class for one step of the pipeline.

    Every step owns a ``summary`` dict (results and counts) and a
    ProcessLogger constructed with './logs/<ClassName>.log'.
    Subclasses implement process(); callers invoke run().
    '''
    def __init__(self):
        self.summary = {}  # stash process results, counts, here
        self.logger = ProcessLogger('./logs/{}.log'.format(self.getClassName()))

    def getClassName(self):
        '''Return the name of the concrete (sub)class of this instance.'''
        return self.__class__.__name__

    def getLogger(self):
        '''Return the ProcessLogger created for this step.'''
        return self.logger

    def getSummary(self):
        '''Return the summary dict (the live object, not a copy).'''
        return self.summary

    def setSummary(self, summary):
        '''Replace the summary dict; returns self so calls can be chained.'''
        self.summary = summary
        return self

    def filename(self, in_f):
        '''Return the part of ``in_f`` after the last '/' (all of it when there is none).'''
        return in_f.split('/')[-1]

    def process(self):
        '''
        Do the work of this step. Must be implemented by subclasses.

        :raises NotImplementedError: always, in this base class
        '''
        # NotImplementedError is the conventional signal for an abstract method
        # and is still caught by callers that catch Exception.
        raise NotImplementedError('Overload process() in {}'.format(self.getClassName()))

    def run(self):
        '''Execute process() and return self so results can be read in a chain.'''
        self.process()
        return self

class Load(Process):
    '''
    Base class for loading steps: remembers where the data comes from and
    holds the dataframe once a subclass has loaded it.
    '''
    def __init__(self, import_file_name):
        super().__init__()
        # import_file_name is full local file name or url to source
        self.import_file_name = import_file_name
        self.dataframe = None  # filled in by the subclass

    def get_dataframe(self):
        '''Return the loaded dataframe (None until one has been set).'''
        return self.dataframe

    def set_dataframe(self, dataframe):
        '''Store ``dataframe`` as the current data of this step.'''
        self.dataframe = dataframe
        
class LoadDrains(Load):
    '''
    Loads one raw drain file (CSV) into a pandas dataframe and records the
    number of loaded records in the summary.
    '''
    def __init__(self, import_file_name):
        Load.__init__(self, import_file_name)
        self.summary_key = '02'

    def get_class_key(self):
        '''Return the summary key of this step: '<summary_key>.<ClassName>'.'''
        return '{}.{}'.format(self.summary_key, self.getClassName())

    def get_app_name(self):
        '''
        returns application name from script path
        (the last component of the current working directory)
        '''
        return os.path.basename(os.getcwd())

    def get_repo_folder(self):
        '''
        returns path to the repo folder from script path

        The working directory is expected to be <repo>/scripts/<app> (or
        <repo>/<app>): the trailing <app> component is removed, and then a
        trailing 'scripts' component if there is one. Only trailing components
        are removed, so a repo path that merely contains the app name or
        'scripts' further up is left intact.
        '''
        folder = os.path.dirname(os.getcwd())
        if os.path.basename(folder) == 'scripts':
            folder = os.path.dirname(folder)
        return folder

    def get_raw_data_folder(self):
        '''
        returns path to raw data from script path: <repo>/raw-data/<app>
        '''
        return self.get_repo_folder() + '/raw-data/' + self.get_app_name()

    def process(self):
        '''
        Read self.import_file_name (the full path and name of the import file)
        with pandas and keep the original raw data as self.dataframe.

        Summary entry written under get_class_key(): 'name' (file name without
        folders), 'before' (always 0), 'after' (records loaded) and 'diff'.
        '''
        name = self.filename(self.import_file_name)
        print('* Load Drains', name)

        entry = {'name': name, 'before': 0}
        self.getSummary()[self.get_class_key()] = entry

        self.dataframe = pd.read_csv(self.import_file_name)

        entry['after'] = len(self.dataframe)
        entry['diff'] = entry['after'] - entry['before']

class LoadDataWorld(Load):
    '''
    creates a dataframe with a fresh copy of the data.world dataset
    dont forget to run
    '''
    def __init__(self, import_file_name):
        Load.__init__(self, import_file_name)
        self.summary_key = '01'

    def get_class_key(self):
        '''Return the summary key of this step: '<summary_key>.<ClassName>'.'''
        return '{}.{}'.format(self.summary_key, self.getClassName())

    def addColumns(self):
        '''Add dr_discharge (a copy of dr_subwatershed) when the dataset does not have it yet.'''
        if 'dr_discharge' not in self.dataframe.columns.values:
            self.dataframe['dr_discharge'] = self.dataframe['dr_subwatershed']

    def _dataset_key(self):
        '''
        Return the last two '/'-separated parts of import_file_name, so that both
        'owner/dataset' and a URL ending in 'owner/dataset' give 'owner/dataset'.
        '''
        return '/'.join(self.import_file_name.rstrip('/').split('/')[-2:])

    def process(self):
        '''
        Refresh the local data.world copy of the dataset named by
        import_file_name and read its grb_drains.csv into self.dataframe.

        Summary entry written under get_class_key(): 'name', 'before'
        (always 0), 'after' (records loaded) and 'diff'.
        '''
        print('* Load Data.World')
        entry = {'name': self.filename(self.import_file_name), 'before': 0}
        self.getSummary()[self.get_class_key()] = entry

        # download to ~/.dw/cache/<owner>/<dataset>/latest/data/grb_drains.csv
        # (the returned dataset object is not used; the csv is read directly)
        dw.load_dataset(self.import_file_name, auto_update=True)

        # Read the cache of the dataset that was actually requested. This path
        # used to be hard-coded to citizenlabs/grb-storm-drains-2019-04-03.
        # NOTE(review): the table file name grb_drains.csv is still fixed.
        fstr = os.path.expanduser(
            '~/.dw/cache/{}/latest/data/grb_drains.csv'.format(self._dataset_key())
        )
        self.dataframe = pd.read_csv(fstr)

        self.addColumns()

        for col in self.get_dataframe().columns.values:
            print( ' -- column:', col)

        # SUMMARIZE
        entry['after'] = len(self.dataframe)
        entry['diff'] = entry['after'] - entry['before']
        
#LoadDataWorld('xxx.xxx').getLogger().log('hi') 
        
#test_import_file_name = '/Users/jameswilfong/Documents/Github/Wilfongjt/01-AAD-data-world/01-In-Progress/00-03-load-spring-lake/data.world/raw-data/adopt-a-drain/CatchBasins_7_17_2019.xls.csv'
# assert Load(testfile).get_app_name() == 'adopt-a-drain'
#print(Load().get_repo_folder())
#print(Load().get_raw_data_folder())
#print(helper.get_raw_files('csv'))
# assert Load().get_repo_folder() == 'adopt-a-drain'
#
#df=LoadCSV(test_import_file_name).run().get_dataframe()
#df.info
#df.head


#df=LoadDW('citizenlabs/grb-storm-drains-2019-04-03').run().get_dataframe()

#print(df)
#df.info
#df.head


          
# Smoke checks: freshly constructed steps remember their source name and
# start with an empty summary.
process = Process()
load = Load('afilename')
loadDrains = LoadDrains('afilename')
loadDataWorld = LoadDataWorld('afilename')

for _loader in (loadDataWorld, loadDrains, load):
    assert _loader.import_file_name == 'afilename'
    assert _loader.getSummary() == {}
assert process.getSummary() == {}

In [727]:
class PatchDW20190925(LoadDataWorld):
    '''
    Delete records with duplicate coordinates from data.world dataset
    * These records were inserted from file CatchBasins_7_17_2019.xls
    * This patch was run only once on 09-25-2019
    * This patch affects the dataworld file citizenlabs/grb-storm-drains-2019-04-03
    * The CatchBasins_7_17_2019.xls were replaced by AAD_Grandville_fruitport_SpringLake_OCRC9162019
    * A record of this patch can be found in Patch20190925.log
    * Patch will not run if PatchDW20190925.log is found in logs/Patch20190925.log
    * replace with LoadDataWorld once everything passes
    '''
    def __init__(self, import_file_name):
        LoadDataWorld.__init__(self, import_file_name)
        self.summary_key = '01'

    def get_class_key(self):
        '''Return the summary key of this step: '<summary_key>.<ClassName>'.'''
        return '{}.{}'.format(self.summary_key, self.getClassName())

    def process(self):
        '''
        Load the data.world dataset, then delete every record of the three
        patched jurisdictions and record before/after counts in the summary.

        Jurisdictions that are not present are skipped instead of raising
        KeyError, so the patch also works on a dataset that was already patched.
        '''
        super().process()
        # NOTE(review): kill() is called before the "already ran" check; if
        # kill() removes the log file the check below can never be true --
        # confirm against ProcessLogger.
        self.getLogger().kill()
        # skip if already run
        if path.exists('./logs/{}.log'.format(self.getClassName())):
            print('Already ran {}...skipping'.format(self.getClassName()))
            return

        entry = {
            'name': self.filename(self.import_file_name),
            'before': len(self.get_dataframe()),
        }
        self.getSummary()[self.get_class_key()] = entry

        # NOTE(review): after set_index the jurisdiction lives in the index,
        # not in a 'dr_jurisdiction' column; kept as-is because later steps are
        # not visible here -- confirm they expect this.
        data = self.get_dataframe().set_index("dr_jurisdiction")

        # Delete all rows of these jurisdictions
        data = data.drop(
            ["Ottawa County Road Commission",
             "Village of Spring Lake DPW",
             "City of Grandville"],
            axis=0,
            errors='ignore',
        )

        self.set_dataframe(data)
        entry['after'] = len(self.get_dataframe())
        entry['diff'] = entry['after'] - entry['before']
        self.getLogger().log('Run')
        
#patch = PatchDW20190925('ssssss.ss').run()
#patch.getLogger().log('some')

In [728]:
class Clean(Process):
    '''
    Base class for cleaning steps: keeps the dataframe to clean together with
    the column-name map and the region map.
    '''
    def __init__(self, df_source, commonNameMap, region_map):
        super().__init__()
        self.dataframe, self.commonNameMap, self.region_map = df_source, commonNameMap, region_map
        
class CleanDrains(Clean):
    '''
    Cleans one raw drain dataframe: normalises column names, maps them to the
    expected dr_* names and derives source_code, dr_asset_id, dr_type and
    dr_discharge.
    '''
    def __init__(self, df_source, commonNameMap, region_map):
        Clean.__init__(self, df_source, commonNameMap, region_map)
        self.summary_key = '03'

    def get_class_key(self):
        '''Return the summary key of this step: '<summary_key>.<ClassName>'.'''
        return '{}.{}'.format(self.summary_key, self.getClassName())

    def clean_column_names(self):
        '''
        convert each column name to lowercase with underscore separation

        Spaces and hyphens become underscores; every upper-case letter (A-Z)
        is lowered and, unless it starts the name or directly follows another
        upper-case letter, gets an underscore in front of it.

        e.g., ID to id
        e.g., FACILITYID to facilityid
        e.g., County ID to county__id (separator underscore + case underscore)
        e.g., County-ID to county__id

        :return: a renamed copy of self.dataframe (self.dataframe is not changed)
        '''
        renamed = {}
        for original in self.dataframe.columns:
            # Replace BOTH separators. The second replace used to start again
            # from the original name, which dropped the space replacement for
            # names containing a space and a hyphen.
            spaced = original.replace(' ', '_').replace('-', '_')

            pieces = []
            prev_upper = True  # start-of-name counts as "after an upper": no leading underscore
            for ch in spaced:
                if ch in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ':
                    pieces.append(ch.lower() if prev_upper else '_' + ch.lower())
                    prev_upper = True
                else:
                    pieces.append(ch)
                    prev_upper = False

            renamed[original] = ''.join(pieces)

        return self.dataframe.rename(columns=renamed)

    def inferName(self, col_name):
        '''
        select a column name based on previous names found in file

        :param col_name: cleaned column name
        :return: the mapped name from commonNameMap, or 'del_<col_name>' when
                 the name is not in the map
        '''
        if col_name not in self.commonNameMap:
            # mark madeup names for easy id later
            return 'del_{}'.format(col_name)

        return self.commonNameMap[col_name]

    def getColumnDict(self):
        '''Return {current column name: inferred name} for every column of the dataframe.'''
        return {nm: self.inferName(nm) for nm in self.dataframe.columns.values}

    def remove_char(self, columnList):
        '''
        some facility ids have characters mixed with numbers
        we need just the number part

        Every character of str(item) that is not a digit 0-9 is REPLACED by
        '0' (it is not removed), e.g. 'A12' gives '012'.
        NOTE(review): this also applies to '.' and to missing values
        (str(nan) is 'nan', which gives '000') -- confirm this is intended.

        :param columnList: iterable of facility ids
        :return: list of digit-only strings, one per item
        '''
        return [
            ''.join(ch if ch in '0123456789' else '0' for ch in str(item))
            for item in columnList
        ]

    def regional_codes(self, df_source, _owner):
        '''
        regional codes identify the data's source community
        codes are added over time. this method checks and throws an error when
        a name is not found. fix by adding the new owner and code to region_map

        :param df_source: not used; the values are read from self.dataframe
        :param _owner: name of the column holding the owner names
        :return: list with one regional code per row
        :raises Exception: when an owner name has no entry in region_map
        '''
        rc = []

        # look at data in the _owner column
        for jur in self.dataframe[_owner]:
            # check if jur is in the codes
            if jur not in self.region_map:
                print('bad name', jur)
                raise Exception('Regional-Code for ({}) is not available... add new '.format(jur))
            rc.append(self.region_map[jur])

        return rc

    def get_dataframe(self):
        '''Return the dataframe being cleaned.'''
        return self.dataframe

    def process(self):
        '''
        clean up the df_source

        Steps, in order: normalise and map column names, build dup_coordinate,
        copy dr_owner into dr_jurisdiction, normalise dr_facility_id to int64,
        derive source_code, dr_asset_id, dr_type and dr_discharge.
        Before/after record counts are written to the summary.
        '''
        entry = {'before': len(self.get_dataframe())}
        self.getSummary()[self.get_class_key()] = entry

        self.dataframe = self.clean_column_names()
        self.dataframe = self.dataframe.rename(columns=self.getColumnDict())

        # create col to compare duplicate coordinates
        print(' -- add coordinate')
        self.dataframe['dup_coordinate'] = '(' + self.dataframe['dr_lon'].astype(str) + ' ' + self.dataframe['dr_lat'].astype(str) + ')'

        # patch up bad owner and jurisdiction names
        print(' -- replace dr_jurisdiction with dr_owner')
        self.dataframe['dr_jurisdiction'] = self.dataframe['dr_owner'] # is what it is

        # mark all empties with nan (x != x is true only for nan)
        print(' -- identify empty dr_facility_ids')
        self.dataframe['dr_facility_id'] = self.dataframe['dr_facility_id'].apply(lambda x:  np.nan if x != x or x == '' or x == ' ' or x == None else x)

        # some dr_facilities have alpha numeric values ... clean up
        print(' -- remove char from dr_facility_id')
        self.dataframe['dr_facility_id'] = self.remove_char(self.dataframe['dr_facility_id'])

        # add column to id the source of data records
        print(' -- create field: source_code from dr_owner')
        self.dataframe['source_code'] = self.regional_codes(self.dataframe, 'dr_owner')

        # convert type to integer
        print(' -- int-ify field: dr_facility_id')
        self.dataframe['dr_facility_id'] = self.dataframe['dr_facility_id'].astype('int64')

        # create final id aka dr_asset_id
        print(' -- create field: dr_asset_id from source_code, dr_facility_id')
        self.dataframe['dr_asset_id'] = self.dataframe['source_code'] + '_' + self.dataframe['dr_facility_id'].astype(str)

        print(' -- create field: dr_type from constant "Storm Water Inlet Drain"')
        self.dataframe['dr_type'] = 'Storm Water Inlet Drain'

        # add dr_discharge column
        print(' -- create field: dr_discharge from dr_subwatershed')
        self.dataframe['dr_discharge'] = self.dataframe['dr_subwatershed']

        entry['after'] = len(self.get_dataframe())
        entry['diff'] = entry['after'] - entry['before']
        
# Smoke checks: cleaning steps start with an empty summary.
clean = Clean(None, maps['commonNameMap'], maps['region_map'])
cleanDrains = CleanDrains(None, maps['commonNameMap'], maps['region_map'])
for _cleaner in (cleanDrains, clean):
    assert _cleaner.getSummary() == {}

#CleanDrains(self.get_dataframe(), self.maps['commonNameMap'], self.maps['region_map'])

In [729]:
class Outputs(Process):
    '''
    Base class for output steps: keeps the dataframe to write and the list of
    column names that are allowed in the output.
    '''
    def __init__(self, df_source, expected_output_columns_list):
        super().__init__()
        self.dataframe, self.expected_output_columns_list = df_source, expected_output_columns_list
        

class OutputDrains(Outputs):
    '''
    Writes the clean drain data to disk: the stamped output file, an
    un-stamped copy of it, and a short test file.
    '''
    def __init__(self, df_source, expected_output_columns_list):
        Outputs.__init__(self, df_source, expected_output_columns_list)
        self.summary_key = '05'

    def get_class_key(self):
        '''Return the summary key of this step: '<summary_key>.<ClassName>'.'''
        return '{}.{}'.format(self.summary_key, self.getClassName())

    def set_dataframe(self, dataframe):
        '''Store ``dataframe`` as the data to write.'''
        self.dataframe = dataframe

    def get_dataframe(self):
        '''Return the dataframe that will be written.'''
        return self.dataframe

    def validateOutputColumns(self):
        '''
        examine for extra cols not needed for data.world

        :raises Exception: for the first column that is not in
                           expected_output_columns_list
        '''
        for nm in self.get_dataframe().columns.values:
            if nm in self.expected_output_columns_list:
                continue
            raise Exception('{} is unexpected output for clean data'.format(nm))

    def toCSV(self):
        '''Write the dataframe, without its index, to local_config["local_clean"].'''
        target = local_config["local_clean"]
        self.get_dataframe().to_csv(target, index=False)

    def process(self):
        '''
        Validate the columns, write the clean file, copy it to the un-stamped
        file name and write a short file (at most 5000 City of Grand Rapids
        rows) for testing.
        '''
        print('* Outputs')
        self.validateOutputColumns()
        self.toCSV() # write data to disk

        print(' - output file: ','/data.world/clean-data/adopt-a-drain/', metadata['output_file_name'] )
        print(' - data.world file: ','/data.world/clean-data/adopt-a-drain/', metadata['copy_file_name'] )

        for colname in self.get_dataframe().columns.values:
            print(' -- column: ', colname )

        # copy the stamped output file to the un-stamped name
        stamped_file = '{}/{}'.format(helper.get_clean_data_folder(), metadata['output_file_name'])
        plain_file = '{}/{}'.format(helper.get_clean_data_folder(), metadata['copy_file_name'])
        copyfile(stamped_file, plain_file)

        # set up a smaller version of file
        test_file = '{}/{}'.format(helper.get_test_version_folder(), metadata['copy_file_name'])
        print(' - make short file for testing: ', '/data.world/test-data/adopt-a-drain/', metadata['copy_file_name'])
        grand_rapids_only = self.get_dataframe().query("dr_jurisdiction == 'City of Grand Rapids'")
        grand_rapids_only.head(5000).to_csv(test_file, index=False)
        
# Smoke checks: output steps start with an empty summary.
outputs = Outputs(None, expected_output_columns_list)
outputDrains = OutputDrains(None, expected_output_columns_list)
for _writer in (outputDrains, outputs):
    assert _writer.getSummary() == {}

In [730]:
class Condense(Process):
    '''
    Condenses a drain dataframe: removes duplicate drains, outliers and
    temporary columns, then checks that the expected columns are present.
    '''
    def __init__(self, dataframe, expected_output_columns_list, extraColumns, outlier_settings):
        Process.__init__(self)
        self.dataframe = dataframe
        self.expected_output_columns_list = expected_output_columns_list
        self.extraColumns = extraColumns
        self.outlier_settings = outlier_settings
        self.summary_key = '06'

    def get_class_key(self):
        '''Return the summary key of this step: '<summary_key>.<ClassName>'.'''
        return '{}.{}'.format(self.summary_key, self.getClassName())

    def _summary_entry(self):
        '''Return this step's summary entry, creating an empty one when missing.'''
        return self.getSummary().setdefault(self.get_class_key(), {})

    def validateColumns(self):
        '''
        check the dataframe's column names for the expected column names

        :raises Exception: naming the first expected column that is missing
        '''
        print(' - validate columns')
        present = self.get_dataframe().columns.values
        for nm in self.expected_output_columns_list:
            if nm not in present:
                # The message used to be built from an undefined function, so a
                # missing column surfaced as a NameError instead of this error.
                raise Exception('{} is missing from the dataframe columns'.format(nm))

    def removeExtraColumns(self):
        '''Drop every column listed in extraColumns that exists in the dataframe.'''
        print(' - Remove extra columns')
        for colname in self.extraColumns:
            if colname in self.get_dataframe().columns.values:
                print(' -- drop column: ', colname)
                self.set_dataframe(self.get_dataframe().drop([colname], axis=1))

    def remove_obvious_outliers(self):
        '''
        remove individual observations
        remove range of observation

        For every entry of outlier_settings['outliers']:
        * 'range': keep rows whose value is within (low, high), both inclusive;
          the column is converted with pd.to_datetime when low is a
          numpy datetime64
        * 'categories': keep rows whose value is in the given list
        The number of removed rows is stored in the entry's 'count'. When the
        entry has a 'reason', it is re-rendered from its original template with
        that count; the template is kept under 'reason_template' so the text
        stays correct when this method runs more than once on the same settings.
        '''
        print(' - remove outliers')

        for outlier in self.outlier_settings['outliers']:
            col_name = outlier['column']
            size_before = len(self.get_dataframe())

            if 'range' in outlier:
                low = outlier['range'][0]
                high = outlier['range'][1]

                values = self.get_dataframe()[col_name]
                if isinstance(low, np.datetime64):
                    # a Series has no to_datetime() method; use the pandas function
                    values = pd.to_datetime(values)

                self.set_dataframe(self.get_dataframe()[(values >= low) & (values <= high)])
                outlier["count"] = size_before - len(self.get_dataframe())

            elif 'categories' in outlier:
                _list = outlier['categories']
                self.set_dataframe(
                    self.get_dataframe()[self.get_dataframe()[col_name].isin(_list)]
                )
                outlier["count"] = size_before - len(self.get_dataframe())

            if "reason" in outlier:
                # keep the unformatted template; formatting the already
                # formatted text would silently keep the first count forever
                template = outlier.setdefault("reason_template", outlier["reason"])
                outlier["reason"] = template.format(str(outlier["count"]))

    def remove_duplicate_assets(self):
        '''Keep the first row of every dr_asset_id and record before/after/diff in the summary.'''
        print(' - drop duplicate assets')

        sz1 = len(self.get_dataframe())
        self.set_dataframe(self.get_dataframe().drop_duplicates('dr_asset_id', keep='first'))
        sz2 = len(self.get_dataframe())

        self._summary_entry().setdefault('duplicates', []).append(
            {'dr_asset_id': {'before': sz1, 'after': sz2, 'diff': (sz2 - sz1)}}
        )

    def remove_duplicate_coordinates(self):
        '''
        Sort by dup_coordinate, keep the first row of every coordinate and
        record before/after/diff in the summary.
        '''
        print(' - drop duplicate coordinates')
        print(' - cols ', self.get_dataframe().columns.values)

        sz1 = len(self.get_dataframe())

        self.set_dataframe(self.get_dataframe().sort_values('dup_coordinate'))
        self.set_dataframe(self.get_dataframe().drop_duplicates('dup_coordinate', keep='first'))

        sz2 = len(self.get_dataframe())

        self._summary_entry().setdefault('duplicates', []).append(
            {'coordinates': {'before': sz1, 'after': sz2, 'diff': (sz2 - sz1)}}
        )

    def get_dataframe(self):
        '''Return the dataframe being condensed.'''
        return self.dataframe

    def set_dataframe(self, dataframe):
        '''Store ``dataframe`` as the current data of this step.'''
        self.dataframe = dataframe

    def process(self):
        '''
        Remove duplicate assets, outliers and extra columns, validate the
        columns and record before/after/diff record counts in the summary.
        'diff' is after - before, the same sign as the other steps.
        '''
        print('* Condense')
        entry = {'before': len(self.get_dataframe())}
        self.getSummary()[self.get_class_key()] = entry

        self.remove_duplicate_assets()
        # self.remove_duplicate_coordinates()
        self.remove_obvious_outliers()

        self.removeExtraColumns()

        self.validateColumns()

        entry['after'] = len(self.get_dataframe())
        entry['diff'] = entry['after'] - entry['before']
        
        
class RowCondense(Condense):
    """Condense step that only removes rows.

    Runs duplicate-asset removal, duplicate-coordinate removal and outlier
    removal; it does not run the column steps.
    """

    def __init__(self, dataframe, expected_output_columns_list, extraColumns, outlier_settings):
        super().__init__(dataframe, expected_output_columns_list, extraColumns, outlier_settings)
        self.summary_key = '04'

    def get_class_key(self):
        """Return this step's summary key, '<summary_key>.<class name>'."""
        return '{prefix}.{name}'.format(prefix=self.summary_key, name=self.getClassName())

    def process(self):
        """Run the row-removal steps and record 'before', 'after' and 'diff'.

        Resets this step's summary entry first. 'diff' is after - before.
        """
        print('* RowCondense')
        key = self.get_class_key()
        self.getSummary()[key] = {}
        self.getSummary()[key]['before'] = len(self.get_dataframe())

        self.remove_duplicate_assets()
        self.remove_duplicate_coordinates()
        self.remove_obvious_outliers()

        stats = self.getSummary()[key]
        stats['after'] = len(self.get_dataframe())
        stats['diff'] = stats['after'] - stats['before']
        

class ColCondense(Condense):
    """Condense step that only works on columns.

    Runs removeExtraColumns and validateColumns, and records the column names
    seen before and after in the summary.
    """

    def __init__(self, dataframe, expected_output_columns_list, extraColumns, outlier_settings):
        super().__init__(dataframe, expected_output_columns_list, extraColumns, outlier_settings)
        self.summary_key = '05'

    def get_class_key(self):
        """Return this step's summary key, '<summary_key>.<class name>'."""
        return '{prefix}.{name}'.format(prefix=self.summary_key, name=self.getClassName())

    def process(self):
        """Run the column steps; store column names under 'from' and 'to'.

        Resets this step's summary entry first.
        """
        print('* ColCondense')
        key = self.get_class_key()
        self.getSummary()[key] = {}
        self.getSummary()[key]['from'] = list(self.get_dataframe().columns)

        self.removeExtraColumns()
        self.validateColumns()

        self.getSummary()[key]['to'] = list(self.get_dataframe().columns)
        
                
        
# Smoke test: a Condense step can be constructed without a dataframe.
condense=Condense(None, expected_output_columns_list, extraColumns, outlier_settings)
# NOTE(review): the asserts below inspect `loadDataWorld`, which is not created in
# this cell, rather than the `condense` instance built above -- this looks
# copy-pasted from another cell's test; confirm what should be asserted here.
assert loadDataWorld.import_file_name == 'afilename'
assert loadDataWorld.getSummary() == {}  

In [731]:
class Wrangle(Process):
    """Top-level pipeline step: load, clean, condense, combine and output drains.

    Parameters
    ----------
    maps : mapping
        Must provide the 'commonNameMap' and 'region_map' entries that are
        handed to CleanDrains.
    expected_output_columns_list : list
        Column list forwarded to the condense steps and to OutputDrains.

    Notes
    -----
    process() also reads these notebook-level names: helper, dw_source,
    extraColumns, outlier_settings, local_config, metadata and cell_log.
    """

    def __init__(self, maps, expected_output_columns_list):
        Process.__init__(self)
        self.maps=maps
        self.expected_output_columns_list=expected_output_columns_list
        self.summary_key='00'

    def get_class_key(self):
        """Return this step's summary key, '<summary_key>.<class name>'."""
        return '{}.{}'.format(self.summary_key, self.getClassName())

    def get_dataframe(self):
        """Return the dataframe currently held by this step."""
        return self.dataframe

    def set_dataframe(self, dataframe):
        """Replace the dataframe held by this step with `dataframe`."""
        self.dataframe = dataframe

    def xls2csv(self,xls_name):
        """Write the first sheet of an Excel workbook to '<xls_name>.csv'.

        Every field is quoted (csv.QUOTE_ALL) and the output is UTF-8 encoded.

        Parameters
        ----------
        xls_name : str
            Path of the workbook to convert; the CSV path is this path with
            '.csv' appended.
        """
        # Local import: only needed when Excel inputs are actually converted.
        # NOTE(review): xlrd 2.0+ reads only .xls; .xlsx inputs need xlrd < 2.0
        # -- confirm the installed version.
        import xlrd

        wb = xlrd.open_workbook(xls_name)
        sh = wb.sheet_by_index(0)

        # newline='' is what the csv module asks for on files it writes; the
        # with-block closes the file even when writing a row raises.
        with open('{}.csv'.format(xls_name), 'w', encoding='utf8', newline='') as csv_file:
            wr = csv.writer(csv_file, quoting=csv.QUOTE_ALL)
            for rownum in range(sh.nrows):
                wr.writerow(sh.row_values(rownum))

    def filename(self, in_f):
        """Return the last '/'-separated component of `in_f`."""
        return in_f.split('/')[-1]

    def conversions(self):
        """Convert every raw 'xls' and 'xlsx' file reported by helper to CSV."""
        for extension in ('xls', 'xlsx'):
            for workbook in helper.get_raw_files(extension):
                print(' -- convert ', workbook)
                self.xls2csv(workbook)

    # Disabled:
    # def finalCSV(self):
    #     self.get_dataframe().to_csv(local_config["local_clean"], index=False)

    def _wrangle_raw_file(self, in_f):
        """Load, clean, row-condense and column-condense one raw CSV file.

        Each stage's result is stored on this step via set_dataframe before the
        next stage runs. Returns the dataframe left after the last stage.
        """
        print('####### LOAD DRAINS 02')
        loadDrains = LoadDrains(in_f)\
            .setSummary(self.getSummary())\
            .run() # b
        self.set_dataframe(loadDrains.get_dataframe())

        print('####### CLEAN DRAINS 03')
        cleanDrains = CleanDrains(self.get_dataframe(),
                                  self.maps['commonNameMap'],
                                  self.maps['region_map'])\
            .setSummary(self.getSummary())\
            .run() # c
        self.set_dataframe(cleanDrains.get_dataframe())

        # Condense the individual dataset: rows first, then columns.
        print('####### ROWCONDENCE DRAINS 04')
        rowCondense = RowCondense(self.get_dataframe(),
                                  self.expected_output_columns_list,
                                  extraColumns,
                                  outlier_settings)\
            .setSummary(self.getSummary())\
            .run() # d
        self.set_dataframe(rowCondense.get_dataframe())

        print('####### COLCONDENCE DRAINS 04')
        colCondense = ColCondense(self.get_dataframe(),
                                  self.expected_output_columns_list,
                                  extraColumns,
                                  outlier_settings)\
            .setSummary(self.getSummary())\
            .run() # d
        self.set_dataframe(colCondense.get_dataframe())

        print('####### COMPILE DRAINS')
        return self.get_dataframe()

    def process(self):
        """Run the whole pipeline and hand the combined dataframe to OutputDrains.

        Steps, in order:
          1. convert raw Excel files to CSV;
          2. load the current dataset through PatchDW20190925(dw_source);
          3. load, clean and condense every raw CSV file;
          4. concatenate all dataframes and condense the result;
          5. delete a previous output file if one exists, print the summary,
             and run OutputDrains on the final dataframe.
        """
        print('* Wrangle')

        print('self.get_class_key()', self.get_class_key())

        print(' - raw folder ', helper.get_raw_data_folder())
        clean_folder = helper.get_clean_data_folder()

        concat_list = []

        self.conversions() # convert excel files to csv
        print('####### LOAD DW 01')
        # The current data.world dataset goes into the list first.
        loadDataWorld = PatchDW20190925(dw_source)

        loadDataWorld\
            .setSummary(self.getSummary())\
            .run() # a
        concat_list.append( loadDataWorld.get_dataframe() )

        # Then every file found in the raw data folder.
        for in_f in helper.get_raw_files('csv'):
            concat_list.append(self._wrangle_raw_file(in_f))

        # --------------------------------- combine datasets
        print('concat_list', len(concat_list))
        for ds in concat_list:
            print('df cnt',len(ds))
        self.set_dataframe( pd.concat(concat_list) )

        # --------------------------------- condense dataset (cols, rows)
        print('####### CONDENSE ALL 05')

        condense = Condense(self.get_dataframe(),
                            self.expected_output_columns_list,
                            extraColumns,
                            outlier_settings)\
            .setSummary(self.getSummary())\
            .run() # e

        self.set_dataframe( condense.get_dataframe() )

        # --------------------------------- save csv
        # Assume a new file will be written, so remove the old one.
        local_clean = '{}/{}'.format(clean_folder, metadata['output_file_name'])
        local_config["local_clean"] = local_clean

        if os.path.isfile(local_clean):
            os.remove(local_clean)
            cell_log.collect('* deleted {} '.format(local_clean))

        pprint(self.getSummary())

        OutputDrains(self.get_dataframe(), self.expected_output_columns_list).run()
        

## Wrangling Script

In [732]:
# Run the wrangling pipeline against the current data.world dataset.
dw_source = 'citizenlabs/grb-storm-drains-2019-04-03'  # read by Wrangle.process
summary = {}
wrangle=Wrangle(maps, expected_output_columns_list).setSummary(summary)
pprint(wrangle.getSummary())
if not ENV_ERROR:
    # Environment is configured: run the pipeline.
    wrangle.run()
else:
    for failure_note in ("# Script Failure!!",
                         "# !!! Missing Environment Variables !!!",
                         "### see [Environment Variable Setup](#env-setup)"):
        cell_log.collect(failure_note)

#print('wrangle dr_discharge', wrangle.get_dataframe()['dr_discharge'])    

{}
* Wrangle
self.get_class_key() 00.Wrangle
 - raw folder  /Users/jameswilfong/Documents/Github/Wilfongjt/01-AAD-data-world/01-In-Progress/06-20190924-data-load/data.world/raw-data/adopt-a-drain
 -- convert  /Users/jameswilfong/Documents/Github/Wilfongjt/01-AAD-data-world/01-In-Progress/06-20190924-data-load/data.world/raw-data/adopt-a-drain/AAD_Grandville_fruitport_SpringLake_OCRC9162019.xlsx
####### LOAD DW 01
* Load Data.World
 -- column: dr_asset_id
 -- column: dr_discharge
 -- column: dr_jurisdiction
 -- column: dr_lat
 -- column: dr_lon
 -- column: dr_owner
 -- column: dr_subtype
 -- column: dr_subwatershed
 -- column: dr_type
 -- column: dr_location
kill log:  ./logs/PatchDW20190925.log
####### LOAD DRAINS 02
* Load Drains AAD_Grandville_fruitport_SpringLake_OCRC9162019.xlsx.csv
####### CLEAN DRAINS 03
 -- add coordinate
 -- replace dr_jurisdiction with dr_owner
 -- identify empty dr_facility_ids
 -- remove char from dr_facility_id
 -- create field: source_code from dr_owner
 -

# Output new CSV File
* replacement for data.world and the production db
* the small version for the test db

In [733]:
# Inspect the wrangled result: column dtypes and non-null counts, then the first rows.
wrangle.get_dataframe().info()
wrangle.get_dataframe().head()

<class 'pandas.core.frame.DataFrame'>
Index: 45534 entries, City of Wyoming to 1072
Data columns (total 9 columns):
dr_asset_id        45534 non-null object
dr_discharge       45412 non-null object
dr_jurisdiction    1517 non-null object
dr_lat             45534 non-null float64
dr_lon             45534 non-null float64
dr_owner           45534 non-null object
dr_subtype         45534 non-null float64
dr_subwatershed    45412 non-null object
dr_type            45534 non-null object
dtypes: float64(3), object(6)
memory usage: 3.5+ MB


Unnamed: 0,dr_asset_id,dr_discharge,dr_jurisdiction,dr_lat,dr_lon,dr_owner,dr_subtype,dr_subwatershed,dr_type
City of Wyoming,CWY_40089252,Buck Creek,,42.892421,-85.704451,City of Wyoming,15.0,Buck Creek,Storm Water Inlet Drain
City of Wyoming,CWY_40089269,Buck Creek,,42.898591,-85.649908,City of Wyoming,15.0,Buck Creek,Storm Water Inlet Drain
Kent County Road Commission,KCRC_40088536,Plaster Creek,,42.833919,-85.615018,Kent County Road Commission,16.0,Plaster Creek,Storm Water Inlet Drain
Kent County Road Commission,KCRC_40088538,Direct Drainage to Lower Grand River,,43.065199,-85.636698,Kent County Road Commission,15.0,Direct Drainage to Lower Grand River,Storm Water Inlet Drain
City of Wyoming,CWY_40088561,Buck Creek,,42.887989,-85.68953,City of Wyoming,15.0,Buck Creek,Storm Water Inlet Drain
