In [1]:
%%javascript
IPython.OutputArea.prototype._should_scroll = function(lines) {
    return false;
}


<IPython.core.display.Javascript object>

In [2]:
from lib.p3_ProcessLogger import ProcessLogger
cell_log = ProcessLogger() 

 # Project: Adopt a Drain
 * Author: James Wilfong, wilfongjt@gmail.com
 
## Basics
* data processed in a local clone of source-data 
* intermediate files are put into source-data repo
* the final data.world data set name is the same as the raw-data file name
* the source-data repo folders /raw-data, /clean-data, /notebook are updated during the process

## Raw-data Process
* input: raw-data/ 
* use python via jupyter notebook to manipulate data into usable file
* update results to github
* output: clean-data/

## GIT Process
* input: clean-data/
* process: add, commit, push files from raw-data/, clean-data/, notebook/ folders
* output: GitHub source-data repo

## Data.World Process
* input: GitHub source-data/clean-data/
* process: transfer github clean-data/ to data.world
* output: data.world

## Table of Contents
* [Introduction](#intro)

* [Data Wrangling](#wrangling_steps)


<a id='intro'></a>
## Introduction
* why adopt a drain


<a id='prerequisites'></a>
## Prerequisites
* create [Github](#github) repository to hold raw data
* create [Data World](#data-world) account
* [Notebook Config](#notebook-config)
* [Environment Variable Setup](#env-setup)

<a id='data-world'></a>
## Dataworld
* Set up an account
* DW_AUTH_TOKEN value comes from your [data.world](https://data.world/) account-settings-advanced-Admin.
* Application data is stored in data.world
* A Data.world dataset is mostly read-only
* A Data.world is updated via file replacement


<a id='github'></a>
## Github

* raw-data is loaded from the remote source-data repo on Github
* raw-data is stored in the /raw-data folder of the source-data repo
* raw-data is pushed to the remote source-data repo before running this notebook

<a id='env-setup'></a>
## Environment Variable Setup
* Create a file .env and put in the /notebook folder
* .env does not get included in the github repository. Exclude .env from github in the .gitignore file
* Add environment variables to .env file
    * DW_USER=your-data-world-user-name
    * GH_URL=https://raw.githubusercontent.com/Wilfongjt/source-data/master/raw-data/
    * DW_DB_URL=https://api.data.world/v0/datasets/wilfongjt/
    * DW_DB_RW_TOKEN=dataworld-token
    * DW_ADM_TOKEN=dataworld-adm-token


In [3]:
cell_log.clear()
cell_log.collect('## Load Packages')
# import dotenv
cell_log.collect('* Load environment variables')
from settings import *
cell_log.collect('* Import third party packages')
# from exceptions import ApiException
from datadotworld.client import _swagger
from datadotworld.client.api import RestApiError
import datadotworld as dw

import numpy as np 
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import csv # read and write csv files
from IPython.display import display, HTML
from IPython.display import Markdown
from pprint import pprint
import time
import os
import subprocess

# convenience functions -- cleaning
cell_log.collect('* Import custom packages')
from lib.p3_CellCounts import CellCounts
import lib.p3_clean as clean
from lib.p3_configuration import get_configuration
import lib.p3_explore as explore
import lib.p3_gather as gather # gathering functions
import lib.p3_helper_functions as helper
import lib.p3_map as maps

Markdown('''{}'''.format(cell_log.getMarkdown()))

settings


## Load Packages
* Load environment variables
* Import third party packages
* Import custom packages

In [4]:
%env

cell_log.clear()
cell_log.collect("<a id='notebook-config'></a>")
cell_log.collect("## Notebook Config")
# ------------ environment variable magic

# Install a pip packages in the current Jupyter kernel
# ------------ Python-dotenv
cell_log.collect("* python-dotenv")
import sys
!{sys.executable} -m pip install python-dotenv
# ------------ data.world API 
cell_log.collect("* datadotworld")
!{sys.executable} -m pip install datadotworld[pandas]

Markdown('''{}'''.format(cell_log.getMarkdown()))

[33mYou are using pip version 9.0.1, however version 18.0 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m
[33mYou are using pip version 9.0.1, however version 18.0 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


<a id='notebook-config'></a>
## Notebook Config
* python-dotenv
* datadotworld

# Process
## Prepare Data
* download github repo with data
* put new file in raw-data/
* make copy of this jupyter notebook 
* configure to transform raw-data/ into clean-data/
* put clean data into clean/ folder
* push final changes to github
## Load Data
* 

<a id='wrangling_steps'></a>
# Data Wrangling


<a id='wrangle-process'></a>
## Process

# Download Data

In [5]:
MODE='prod' # dev, prod
LOCAL_RAW_FOLDER = os.getcwd().replace('notebook','raw-data') + '/'
LOCAL_CLEAN_FOLDER = os.getcwd().replace('notebook','clean-data') + '/'
print('MODE: ', MODE)
print('LOCAL_RAW_FOLDER: ', LOCAL_RAW_FOLDER)
print('LOCAL_CLEAN_FOLDER: ', LOCAL_CLEAN_FOLDER)


MODE:  prod
LOCAL_RAW_FOLDER:  /Users/jameswilfong/Documents/Github/Wilfongjt/source-data/raw-data/
LOCAL_CLEAN_FOLDER:  /Users/jameswilfong/Documents/Github/Wilfongjt/source-data/clean-data/


In [16]:
def getSourceData(tblDef):
    return pd.read_csv(tblDef["local_raw"])

def deprecated_getEvaluation(df_source):
    #
    limit = 0
    flds = df_source.columns.tolist()
    dups =   {} # {fldname: count of duplicates, ...}
    values = {} # {fldname:[list of values], ...}
    blanks = {} # {fldname: count of blanks}
    minmax = {} # {fldname: {min: value, max: value}
    evaluation = {"duplicates": dups, "blanks": blanks, "minmax": minmax}
    for fld in flds:
        if fld not in dups:
            dups[fld] = 0
        if fld not in values:
            values[fld] = []
        if fld not in blanks:
            blanks[fld]=0
        if fld not in minmax:
            minmax[fld]={}
            
    cnt = 0
    # print(flds)
    
    for row in df_source.values: # loop drains
        cnt += 1
        # print(row)
        for fld in flds:  # loop field names

            ifld = flds.index(fld)
            if row[ifld] in values[fld]:
                dups[fld] += 1
            else:
                values[fld].append(row[ifld])
            
            if row[ifld] == ' ' or row[ifld] == None:
                blanks[fld] += 1
                
            if 'min' in minmax[fld]:
                if row[ifld] < minmax[fld]['min']:
                    minmax[fld]['min'] = row[ifld]
            else:
                minmax[fld]['min'] = row[ifld]
                
            if 'max' not in minmax[fld]:  
                minmax[fld]['max'] = row[ifld]
            
            if row[ifld] > minmax[fld]['max']:
                minmax[fld]['max'] = row[ifld]
            
                
                
        if limit > 0 and limit < cnt:
            break
            
    return evaluation
        
def getTableDef(table_name, ext='csv'):
    return { "owner_id": DW_USER, 
             "title": table_name, 
             "gh_url": GH_URL + table_name, 
             "visibility": "OPEN", 
             "license": "Public Domain",
             "files": {table_name + '.' + 'csv': {"url": GH_URL + table_name + '.' + ext}},
             "dw_url": DW_DB_URL + table_name + '.' + ext, 
             "local_raw": LOCAL_RAW_FOLDER + table_name + '.' + ext,
             "local_clean": LOCAL_CLEAN_FOLDER + table_name + '.' + ext
           }


def loadDataWorld(tbl_def):
    '''
        Takes a csv file and imports it into dataworld
        tbl_def is { "owner_id": DW_USER, 
                     "title": table_name, 
                     "gh_url": GH_URL + table_name, 
                     "visibility": "OPEN", 
                     "license": "Public Domain",
                     "files": {table_name + '.csv': {"url": GH_URL + table_name + '.csv'}},
                     "dw_url": DW_DB_URL + table_name + '.csv' 
                    }
                    
    '''
    # api_client.create_dataset(
    dw.api_client().create_dataset(    
        owner_id=tbl_def["owner_id"], 
        title=tbl_def["title"], 
        visibility=tbl_def["visibility"],
        license=tbl_def['license'],
        files=tbl_def["files"]
    )
    
    
def deleteDataWorld(tbl_def):
    '''
    Removes table from data.world
    tbl_def is { "owner_id": DW_USER, 
                     "title": table_name, 
                     "gh_url": GH_URL + table_name, 
                     "visibility": "OPEN", 
                     "license": "Public Domain",
                     "files": {table_name + '.csv': {"url": GH_URL + table_name + '.csv'}},
                     "dw_url": DW_DB_URL + table_name + '.csv' 
                    }
    '''
    print("user/dataset: " + tbl_def["owner_id"] + "/" + tbl_def["title"])
    return dw.api_client().delete_dataset(       
        tbl_def["owner_id"] + "/" + tbl_def["dw_url"]
    )
        
    
# def renameColumns(df,):    
#    df = df.rename(columns=clean_column_names)    

def setDuplicates(default_value, dup_list):
    return lambda default_value : a * n

## Configure Process

In [17]:
'''
------------- configure source csv
'''
table_name = 'gr_drains'
repo_branch = 'refresh-data'
'''
------------- configure source csv
'''
tables = [
    getTableDef(table_name)
]
'''
------------- configure outliers
'''
_outliers = {
  'outliers': [
    {'column':'dr_facility_id',
     'range':(1, 50000000),
     'reason':'ignore {} outliers (1 <= dr_facility_id or => 50000000).',
     'count': 0
    }, 
    {'column':'dr_lon',
     'range':(-90.0, -80.0),
     'reason':'Remove {} observations too far west or east.',
     'count': 0
    },  
    {'column':'dr_lat',
     'range':(40.0, 50.0),
     'reason':'Remove {} observations too far north or south.',
     'count': 0
    }
  ]
}

In [18]:
cell_log.clear()


cell_log.collect("# CSV Process")
'''
--------------------------------- input
'''
for tbl in tables:
    cell_log.collect("* input:  {}".format( tbl["local_raw"]))

'''
--------------------------------- load data
''' 
tbl = tables[0]
# df_source = pd.read_csv(tbl["local_raw"])
print(tbl)
df_source = getSourceData(tbl)
cell_log.collect("* input: {} observations".format(len(df_source)))
cell_log.collect("* input: columns {}".format(df_source.columns.values))

'''
--------------------------------- clean column names
'''
cell_log.collect('* format: Apply a style of lowercase and underscores to column names.')##############################
df_source = clean.clean_column_names(df_source)

'''
--------------------------------- rename columns
'''
# df_source['lon'] = df_source['trk_crnt_x_cord']
# df_source['lat'] = df_source['trk_crnt_y_cord']
df_source = df_source.rename(columns={
    "subtype": "dr_subtype",
    "drain__owner": "dr_owner",
    "local__id": "dr_local_id",
    "facilityid": "dr_facility_id",
    "drain__jurisdiction": "dr_jurisdiction",
    "subwatershed": "dr_subwatershed",
    "point__x":"dr_lon", 
    "point__y":"dr_lat"})

# print('info: ',df_source.info())
'''
--------------------------------- change empty values
'''
# change '', ' ', None, and NaN to -1

# def_facility_id = _outliers['outliers'][0]['range'][0] - 1

# def_facility_id_dup =  def_facility_id - 1

# cell_log.collect("* clean: mark bad dr_facility_id values ('',' ',None,NaN') with {}".format(def_facility_id))

## ------------------------------ DROP empty Facility id
# mark all empties with same value
df_source['dr_facility_id'] = df_source['dr_facility_id'].apply(lambda x:  np.nan if x != x or x == '' or x == ' ' or x == None else x)
scnt = len(df_source)
df_source = df_source.dropna(subset=['dr_facility_id', 'soure__id','dr_lon', 'dr_lat'])
ecnt = len(df_source)
cell_log.collect("* clean: dropped {} observations with empty dr_facility_id, soure___id, dr_lon, or dr_lat".format(scnt - ecnt))


'''
--------------------------------- change column types
'''
cell_log.collect('* format: convert dr_facility_id column to int64')
df_source['dr_facility_id'] = df_source['dr_facility_id'].astype('int64')

'''
--------------------------------- remove numbers from df_source_id
'''
df_source.info()
# df_source['dr_source_id'] = df_source['dr_source_id'].apply(lambda x: x.split('_')[0] if '_' in x else x ) 
# df_source['dr_source_id'] = df_source['dr_source_id'].apply(lambda x: x if '_' in x else x )
df_source['soure__id'] = df_source['soure__id'].apply(lambda x: x.split('_')[0] + '_' if isinstance(x, str) else 'XXX_') 

'''
--------------------------------- create a sync id
'''
df_source['dr_sync_id'] = df_source['soure__id'] + df_source['dr_facility_id'].astype(str)

'''
--------------------------------- drop soure__id
'''
df_source = df_source.drop(['soure__id'], axis=1)

'''
--------------------------------- combine source_id and facility_id
'''
# df_source['dr_sync_id'] = df_source['dr_source_id'] + df_source['dr_facility_id'].astype(str)

df_source.info()

'''
--------------------------------- outliers
'''
df_source = clean.remove_obvious_outliers(_outliers, df_source)
# cell_log.collect('# Outliers')
for r in _outliers['outliers']:##############################
    cell_log.collect('* outlier: {}'.format(r['reason']))

'''
--------------------------------- Drop DUPLICATES
'''
scnt = len(df_source)
df_source = df_source.drop_duplicates('dr_facility_id',keep=False)
ecnt = len(df_source)
cell_log.collect('* duplicates: dropped {} duplicate facility ids'.format(scnt - ecnt))


'''
--------------------------------- Final 
'''

'''
--------------------------------- save csv 
'''
# cell_log.collect('# Output')
# assume new file and remove old one
if os.path.isfile(tbl["local_clean"]):
    os.remove(tbl['local_clean'])
    cell_log.collect('* system: remove {} '.format(tbl['local_clean']))

cell_log.collect("* inter-output: columns {}".format(df_source.columns.values))
cell_log.collect('* inter-output: {} obs to {}'.format(len(df_source) , tbl["local_clean"]))


df_source.to_csv(tbl["local_clean"], index=False)


if MODE == 'dev':
    print('info: ',df_source.info())
    print('head: ',df_source.head())  
    print('outliers: ', _outliers)
    
    
if MODE == 'prod':    
    '''
    run extra git commands
    run import to data.word
    '''
    '''
    --------------------------------- GIT Process 
    '''
    cell_log.collect('')
    cell_log.collect('# GIT Process')
    '''
    --------------------------------- input
    '''
    cell_log.collect('* input: ' + tbl["local_clean"])
    '''
    --------------------------------- git add
    '''
 
    cell_log.collect('* git add raw-data/ -A')
    output = subprocess.check_output(["git", "add", "../raw-data" ,"-A"])
    cell_log.collect('* git add clean-data/ -A' )
    output = subprocess.check_output(["git", "add", "../clean-data" ,"-A"])
    cell_log.collect('* git add notebook/ -A' )
    output = subprocess.check_output(["git", "add", "../notebook"])
    cell_log.collect('* git add ../README.md -A' )
    output = subprocess.check_output(["git", "add", "../README.md"])
    '''
    --------------------------------- git commit
    '''
    # cell_log.collect('* XXXXXXX git commit -m "update raw-data {}"'.format(tbl["local_raw"]) )
    # cell_log.collect('* XXXXXXX git commit -m "update clean-data {}"'.format("local_clean") )
    cell_log.collect('* git commit -m "update raw-data, clean-data, and notebook files "' )

    # output = subprocess.check_output(["git", "commit", "-m", "'update raw-data, clean-data, and notebook files'"])

    try:
        output = subprocess.check_output(["git", "commit", "-m", "'update raw-data, clean-data, and notebook files'"])
    except subprocess.CalledProcessError as error:
        print(error)
    except:
        cell_log.collect('* unknown error' )
    '''
    --------------------------------- git push
    '''
    cell_log.collect('* git push origin ' + repo_branch)
    output = subprocess.check_output(["git", "push", "origin", repo_branch])

    '''
    --------------------------------- Data World Process 
    '''
    
    cell_log.collect('')
    cell_log.collect('# Data.World Process')
    cell_log.collect('* input: {}'.format(tbl["gh_url"] + ".csv") )
    cell_log.collect('* load data into data.world')
    
    deleteDataWorld(tbl)
        
    try:
        print('try')
        loadDataWorld(tbl)
        cell_log.collect('* output: {}'.format(tbl["dw_url"] ))
   
 
    except RestApiError as ex:
        print('RestApiException ' + str(ex) )
        cell_log.collect('* LOAD FAIL: {}'.format(str(ex)))

    



Markdown('''{}'''.format(cell_log.getMarkdown()))


{'owner_id': 'wilfongjt', 'title': 'gr_drains', 'gh_url': 'https://raw.githubusercontent.com/Wilfongjt/source-data/refresh-data/clean-data/gr_drains', 'visibility': 'OPEN', 'license': 'Public Domain', 'files': {'gr_drains.csv': {'url': 'https://raw.githubusercontent.com/Wilfongjt/source-data/refresh-data/clean-data/gr_drains.csv'}}, 'dw_url': 'https://api.data.world/v0/datasets/wilfongjt/gr_drains.csv', 'local_raw': '/Users/jameswilfong/Documents/Github/Wilfongjt/source-data/raw-data/gr_drains.csv', 'local_clean': '/Users/jameswilfong/Documents/Github/Wilfongjt/source-data/clean-data/gr_drains.csv'}
* clean_column_names: 0.0057489871978759766 sec
<class 'pandas.core.frame.DataFrame'>
Int64Index: 40189 entries, 0 to 40197
Data columns (total 9 columns):
dr_subtype         40189 non-null float64
dr_jurisdiction    40189 non-null object
dr_owner           40189 non-null object
soure__id          40189 non-null object
dr_local_id        40189 non-null object
dr_facility_id     40189 non-nu

ValueError: Invalid dataset key. Key must include user and dataset names, separated by (i.e. user/dataset).