# Merge EE results and store them in a normalized postgresql database

* Purpose of script: This script will join the csv tables from GCS into one file using pandas
* Author: Rutger Hofste
* Kernel used: python35
* Date created: 20170914
* Data revisited: 20171122 

The script requires a file called .password to be stored in the current working directory with the password to the database.


In [1]:
import time, datetime
dateString = time.strftime("Y%YM%mD%d")
timeString = time.strftime("UTC %H:%M")
start = datetime.datetime.now()
print(dateString,timeString)

Y2017M11D22 UTC 09:11


In [11]:
x = 2

In [16]:
"{:02.0f}".format(x)

'02'

In [18]:
GCS_VERSION = 19
OUTPUT_VERSION = 17

SCRIPT_NAME = "Y2017M09D14_RH_merge_EE_results_V02"

GCS_INPUT_PATH = "gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V%0.2d/" %(GCS_VERSION)
EC2_INPUT_PATH = "/volumes/data/%s/input" %(SCRIPT_NAME)
EC2_OUTPUT_PATH = "/volumes/data/%s/output" %(SCRIPT_NAME)
S3_OUTPUT_PATH = "s3://wri-projects/Aqueduct30/processData/%s/output/" %(SCRIPT_NAME)


STRING_TRIM = "V%0.2dee_export.csv" %(GCS_VERSION)
# e.g. IrrLinearWW_monthY2014M12V15ee_export.csv -> IrrLinearWW_monthY2014M12

#Aux files, do not change order i.e. zones, area, extra
AUXFILES = ["Hybas06",
            "area_30s_m2",
            "ones_30s"
           ]

DROP_COLUMNS = [".geo","system:index"]

OUTPUTFILENAME = "mergedZonalStatsEE_V%0.2d" %(OUTPUT_VERSION)

# Database settings
DATABASE_IDENTIFIER = "aqueduct30v01"
DATABASE_NAME = "database01"
TABLE_NAME = str.lower(SCRIPT_NAME)

In [3]:
!rm -r {EC2_INPUT_PATH}
!rm -r {EC2_OUTPUT_PATH}

In [4]:
!mkdir -p {EC2_INPUT_PATH}
!mkdir -p {EC2_OUTPUT_PATH}

In [5]:
!gsutil cp -r {GCS_INPUT_PATH} {EC2_INPUT_PATH} 



Updates are available for some Cloud SDK components.  To install them,
please run:
  $ gcloud components update

Copying gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V19/Hybas06V19ee_export.csv...
Copying gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V19/IrrLinearWN_monthY2014M01V19ee_export.csv...
Copying gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V19/IrrLinearWN_monthY2014M02V19ee_export.csv...
Copying gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V19/IrrLinearWN_monthY2014M03V19ee_export.csv...
\ [4 files][  1.9 MiB/  1.9 MiB]                                                
==> NOTE: You are performing a sequence of gsutil operations that may
run significantly faster if you instead use gsutil -m -o ... Please
see the -m section under "gsutil help options" for further information
about when gsutil -m can be advantageous.

Copying gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V19/IrrLinearWN_monthY2014M04V19ee_export.csv...
Copying gs://aqueduct30_v01/Y2017M0

Copying gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V19/PIndWW_monthY2014M10V19ee_export.csv...
Copying gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V19/PIndWW_monthY2014M11V19ee_export.csv...
Copying gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V19/PIndWW_monthY2014M12V19ee_export.csv...
Copying gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V19/PIndWW_yearY2014M12V19ee_export.csv...
Copying gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V19/PIrrWN_monthY2014M01V19ee_export.csv...
Copying gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V19/PIrrWN_monthY2014M02V19ee_export.csv...
Copying gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V19/PIrrWN_monthY2014M03V19ee_export.csv...
Copying gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V19/PIrrWN_monthY2014M04V19ee_export.csv...
Copying gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V19/PIrrWN_monthY2014M05V19ee_export.csv...
Copying gs://aqueduct30_v01/Y2017M09D11_RH_zonal_stats_EE_V19/PIrrWN_monthY2014M06V

In [25]:
import pandas as pd
import os
import re
import boto3
import botocore
from sqlalchemy import *

In [28]:
# RDS Connection
def rdsConnect(database_identifier,database_name):
    rds = boto3.client('rds')
    F = open(".password","r")
    password = F.read().splitlines()[0]
    F.close()
    response = rds.describe_db_instances(DBInstanceIdentifier="%s"%(database_identifier))
    status = response["DBInstances"][0]["DBInstanceStatus"]
    print("Status:",status)
    endpoint = response["DBInstances"][0]["Endpoint"]["Address"]
    print("Endpoint:",endpoint)
    engine = create_engine('postgresql://rutgerhofste:%s@%s:5432/%s' %(password,endpoint,database_name))
    connection = engine.connect()
    return engine, connection






In [29]:
engine, connection = rdsConnect(DATABASE_IDENTIFIER,DATABASE_NAME)

Status: available
Endpoint: aqueduct30v01.cgpnumwmfcqc.eu-central-1.rds.amazonaws.com


In [15]:
folder = os.path.join(EC2_INPUT_PATH,"Y2017M09D11_RH_zonal_stats_EE_V%0.2d/" %(GCS_VERSION)) 

In [16]:
files = os.listdir(folder)

In [31]:
oneFile = files[0]

In [34]:
df = pd.read_csv(os.path.join(folder,oneFile))

In [35]:
df.head()

Unnamed: 0,system:index,PfafID_PIndWN_monthY2014M03,count_PIndWN_monthY2014M03,mean_PIndWN_monthY2014M03,.geo
0,0,111011,2536,0.000859,
1,1,111012,3921,9.1e-05,
2,2,111013,1194,0.000278,
3,3,111014,5605,7e-06,
4,4,111015,21769,0.000217,


In [7]:
def createRegex(aList):
    return '|'.join(aList)

def prepareFile(oneFile):
        trimFileName = oneFile[:-len(STRING_TRIM)]
        d ={}
        d["df"] = pd.read_csv(os.path.join(folder,oneFile))
        d["df"] = prepareDf(d["df"])
        d["trimFileName"] = trimFileName
        return d         
        

def prepareDf(df):
    for column in df.columns:
        if re.search("PfafID",column):
            df2 = df.set_index(column)
            df2 = df2.drop(DROP_COLUMNS,1)        
            return df2
        


    
    


In [8]:
folder = os.path.join(EC2_INPUT_PATH,"Y2017M09D11_RH_zonal_stats_EE_V%0.2d/" %(GCS_VERSION)) 

In [9]:
files = os.listdir(folder)

## Process Auxiliary Datasets (PfafID, Area, Ones)

In [10]:
dAux ={}
for regex in AUXFILES:
    r = re.compile(regex)
    newList = filter(r.match, files)
    oneFile = list(newList)[0]
    dAux[regex] = prepareFile(oneFile)   

In [11]:
regex = createRegex(AUXFILES)

In [12]:
print(regex)

Hybas06|area_30s_m2|ones_30s


In [13]:
d ={}
dAux ={}
for oneFile in files: 
    trimFileName = oneFile[:-len(STRING_TRIM)]    
    if not re.search(regex,oneFile):
        d[trimFileName] = prepareFile(oneFile)
        
    elif re.search(regex,oneFile):
        dAux[trimFileName] = prepareFile(oneFile)
    
    else:
        print("Unrecognized file name, check STRING_TRIM variable")
        

In [14]:
dfLeft = dAux[AUXFILES[0]]["df"]

# Adding area to shapes

In [15]:
dAux[AUXFILES[1]]["df"]["total_%s" %(AUXFILES[1])] = dAux[AUXFILES[1]]["df"]["count_%s" %(AUXFILES[1])] * dAux[AUXFILES[1]]["df"]["mean_%s" %(AUXFILES[1])]

In [16]:
dfMerge = dAux[AUXFILES[0]]["df"].merge(dAux[AUXFILES[1]]["df"],
                       how="outer",
                       left_index=True,
                       right_index=True,
                       sort=True
                      )

In [17]:
for key, value in d.items():
    dfNew = value["df"].copy()
    # total new value = area in m^2 times mean flux 
    dfNew["total_volume_%s" %(value["trimFileName"])] = dAux[AUXFILES[1]]["df"]["total_%s" %(AUXFILES[1])] * value["df"]["mean_%s" %(value["trimFileName"])]
    
     
    
    dfMerge = dfMerge.merge(dfNew,
                           how="outer",
                           left_index=True,
                           right_index=True,
                           sort=True                   
                           )

In [18]:
dfMerge.head()

Unnamed: 0_level_0,count_Hybas06,mean_Hybas06,count_area_30s_m2,mean_area_30s_m2,total_area_30s_m2,count_PDomWN_monthY2014M09,mean_PDomWN_monthY2014M09,total_volume_PDomWN_monthY2014M09,count_reducedmeanrunoff_month_Y1960Y2014M04,mean_reducedmeanrunoff_month_Y1960Y2014M04,...,total_volume_PLivWN_monthY2014M09,count_PLivWW_monthY2014M01,mean_PLivWW_monthY2014M01,total_volume_PLivWW_monthY2014M01,count_runoff_monthY2014M07,mean_runoff_monthY2014M07,total_volume_runoff_monthY2014M07,count_IrrLinearWN_monthY2014M06,mean_IrrLinearWN_monthY2014M06,total_volume_IrrLinearWN_monthY2014M06
PfafID_Hybas06,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
111011,2536,111011.0,2536,743658.186761,1885917000.0,2536,0.000395,745809.175699,2479,3.0981e-05,...,257.785492,2536,1.137619e-07,214.5456,2479,-2.767351e-07,-521.899501,2536,5.4e-05,101765.706562
111012,3921,111012.0,3921,746186.462653,2925797000.0,3921,5e-06,13455.826007,3914,8.585877e-06,...,1938.331951,3921,5.535384e-07,1619.541163,3914,-7.916655e-08,-231.625276,3921,0.0,0.0
111013,1194,111013.0,1194,747422.836265,892422900.0,1194,1.3e-05,11996.855094,1141,8.16737e-06,...,45.720323,1194,4.26933e-08,38.100481,1141,-7.598471e-08,-67.810494,1194,0.0,0.0
111014,5605,111014.0,5605,750449.27045,4206268000.0,5605,3e-06,11148.519841,5605,3.193022e-06,...,888.152682,5605,1.771844e-07,745.284928,5605,0.0,0.0,5605,0.0,0.0
111015,21873,111015.0,21873,758792.279231,16597060000.0,21769,1.8e-05,306753.393803,21086,5.18205e-07,...,446.54922,21769,2.249575e-08,373.363372,21086,0.0,0.0,21769,0.0,0.0


In [19]:
dfMerge.to_csv(os.path.join(EC2_OUTPUT_PATH,OUTPUTFILENAME+".csv"))

In [20]:
dfMerge.to_pickle(os.path.join(EC2_OUTPUT_PATH,OUTPUTFILENAME+".pkl"))

In [21]:
outputLocation = os.path.join(S3_OUTPUT_PATH,OUTPUTFILENAME)

In [22]:
!aws s3 cp --recursive {EC2_OUTPUT_PATH} {S3_OUTPUT_PATH}

upload: ../../../../data/Y2017M09D14_RH_merge_EE_results_V01/output/mergedZonalStatsEE_V17.csv to s3://wri-projects/Aqueduct30/processData/Y2017M09D14_RH_merge_EE_results_V01/output/mergedZonalStatsEE_V17.csv
upload: ../../../../data/Y2017M09D14_RH_merge_EE_results_V01/output/mergedZonalStatsEE_V17.pkl to s3://wri-projects/Aqueduct30/processData/Y2017M09D14_RH_merge_EE_results_V01/output/mergedZonalStatsEE_V17.pkl


In [23]:
end = datetime.datetime.now()
elapsed = end - start
print(elapsed)

0:01:54.615969
