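<h1><center><font color = 'blue'>Prepare ML data</font></center></h1>

Build a training set for AMSU-A METOP-B channel 6. For every observation file in the `fsoi` S3 bucket (December 2014 – February 2015), match each observation to its nearest background grid point. Then append the background values on all model levels at that point, and save the result to `ml_data/amsua_metop_b_ch6.h5`.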

In [1]:
import multiprocessing as mp
import pandas as pd
import numpy as np
import sagemaker
import boto3
import glob
import time
import os
import re

from sagemaker import get_execution_role
from scipy.spatial import cKDTree
from math import trunc

In [2]:
!conda install -y xarray netcdf4

Solving environment: done


  current version: 4.4.10
  latest version: 4.5.8

Please update conda by running

    $ conda update -n base conda



## Package Plan ##

  environment location: /home/ec2-user/anaconda3/envs/python3

  added / updated specs: 
    - netcdf4
    - xarray


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    libnetcdf-4.6.1            |       h13459d8_0         1.2 MB
    h5py-2.8.0                 |   py36h8d01980_0         1.1 MB
    pytables-3.4.4             |   py36ha205bf6_0         1.5 MB
    xarray-0.10.8              |           py36_0         730 KB
    hdf4-4.2.13                |       h3ca952b_2         916 KB
    cftime-1.0.0b1             |   py36h035aef0_0         260 KB
    netcdf4-1.4.0              |   py36ha06eab4_1         536 KB
    ------------------------------------------------------------
                                           Total:  

In [3]:
import multiprocessing as mp
import xarray as xr
import pandas as pd
import numpy as np
import sagemaker
import boto3
import glob
import time
import os
import re

from sagemaker import get_execution_role
from scipy.spatial import cKDTree
from math import trunc

In [4]:
# define a function that loads obs and bkg data into pandas dataframes
def load_data(filepath, date):
    """Download one observation file and its matching background file from S3.

    Both files are fetched from the "fsoi" bucket into /tmp, read into
    pandas, and the temporary copies are removed again (also on failure).

    Parameters
    ----------
    filepath : str
        S3 key of the HDF5 observation file.
    date : str
        Date stamp taken from the observation file name, e.g. "2014120118".
        It is turned into the background key
        "bkg/<date[:4]>_<date[4:6]>/e5130_hyb_01.bkg.eta.<date[:-2]>_<date[-2:]>z.nc4".

    Returns
    -------
    obs : pandas.DataFrame
        Rows for platform "AMSUA_METOP-B", channel 6, without the OBTYPE,
        OBERR and PRESSURE columns. Longitudes above 180 are shifted by -360.
        Sorted by (LONGITUDE, LATITUDE) with a fresh RangeIndex.
    bkg : pandas.DataFrame
        The background dataset flattened with ``to_dataframe``. Its first
        three index levels (lon, lat and lev, judging by how the frame is used)
        become columns. Sorted by (lon, lat) with a fresh RangeIndex.
    """
    resource = boto3.resource("s3")
    bucket = resource.Bucket("fsoi")

    # --- observations -----------------------------------------------------
    obs_local = "/tmp/" + filepath.split("/")[-1]
    bucket.download_file(filepath, obs_local)
    try:
        obs = pd.read_hdf(obs_local).xs(["AMSUA_METOP-B", 6], level=["PLATFORM", "CHANNEL"])
    finally:
        # remove the temporary copy even if reading fails
        os.remove(obs_local)

    obs = obs.reset_index(level=[0, 1])
    obs = obs.drop(["OBTYPE", "OBERR", "PRESSURE"], axis=1)

    # fix lon between -180/180 instead of 0/360. A positional boolean mask
    # (instead of a list of index labels) stays correct even when the
    # remaining index is not unique.
    east = (obs["LONGITUDE"] > 180).values
    obs.loc[east, "LONGITUDE"] = obs.loc[east, "LONGITUDE"] - 360

    # --- background -------------------------------------------------------
    stamp = date[:-2] + "_" + date[-2:]
    month = stamp[:4] + "_" + stamp[4:6] + "/"
    bkg_name = "e5130_hyb_01.bkg.eta." + stamp + "z.nc4"
    bkg_local = "/tmp/" + bkg_name
    bucket.download_file("bkg/" + month + bkg_name, bkg_local)
    try:
        # the context manager closes the netCDF handle before the file is
        # deleted, so long-running workers do not accumulate open files
        with xr.open_dataset(bkg_local) as ds:
            bkg = ds.to_dataframe().reset_index(level=[0, 1, 2])
    finally:
        os.remove(bkg_local)

    obs = obs.sort_values(["LONGITUDE", "LATITUDE"]).reset_index(drop=True)
    # NOTE(review): callers treat each block of consecutive rows as one
    # (lon, lat) profile in file level order. This relies on the multi-key
    # sort keeping the original within-group row order — TODO confirm.
    bkg = bkg.sort_values(["lon", "lat"]).reset_index(drop=True)

    return obs, bkg


# define a kdtree function that returns, for each point in pts2, the index of the nearest point in pts1
def do_kdtree(pts1, pts2):
    """Nearest-neighbour lookup.

    Builds a k-d tree over ``pts1``. For every point of ``pts2``, returns the
    row index of the closest point in ``pts1`` (Euclidean distance, the
    ``cKDTree.query`` default). The distances themselves are discarded.
    """
    return cKDTree(pts1).query(pts2)[1]


# define a function that gets nearest bkg point for each observation
def get_nearest(obs, bkg):
    """Return, for each observation, the closest ``bkg`` row in (lon, lat).

    Parameters
    ----------
    obs : pandas.DataFrame
        Must have LONGITUDE and LATITUDE columns.
    bkg : pandas.DataFrame
        Candidate points; must have lon and lat columns.

    Returns
    -------
    pandas.DataFrame
        One ``bkg`` row per ``obs`` row, in ``obs`` order. The original
        ``bkg`` index labels are kept so callers can look rows up again.
    """
    # .values works on every pandas version; DataFrame.as_matrix() was
    # deprecated in pandas 0.23 and removed in 1.0
    obs_pts = obs[["LONGITUDE", "LATITUDE"]].values
    bkg_pts = bkg[["lon", "lat"]].values

    # NOTE(review): distances are plain Euclidean in degree space, so they
    # ignore the -180/180 seam and the shrinking of longitude degrees toward
    # the poles.
    nearest_ix = do_kdtree(bkg_pts, obs_pts)

    return bkg.iloc[nearest_ix]


# define a function that gets bkg levels data for a particular point
def get_lev_data(bkg, ix, lev_cols, n_lev=72):
    """Flatten the profile that starts at row label ``ix`` into one Series.

    Parameters
    ----------
    bkg : pandas.DataFrame
        Background rows. The rows labelled ``ix`` .. ``ix + n_lev - 1``
        (label slice, inclusive) form one profile.
    ix : int
        Label of the first row of the profile.
    lev_cols : list of str
        Column names. The first entry (the level coordinate) is skipped.
    n_lev : int, optional
        Number of rows per profile (default 72).

    Returns
    -------
    pandas.Series
        Values ordered row by row, with the columns varying fastest, and a
        fresh RangeIndex. NaN cells are kept in place.
    """
    profile = bkg.loc[ix:ix + n_lev - 1, lev_cols[1:]]
    # A row-major ravel gives the same order as DataFrame.stack(), but keeps
    # NaN cells. stack() drops them by default, which would shorten the
    # Series and shift every later value into the wrong named column.
    return pd.Series(profile.values.ravel())


# define a function that adds level data from bkg 3D to 2D by transposing it
def add_lev_data(bkg, bkg_2D, level_cols, levels):
    """Append each point's level profile to ``bkg_2D`` as extra columns.

    Parameters
    ----------
    bkg : pandas.DataFrame
        Full background frame; profiles are looked up in it by row label.
    bkg_2D : pandas.DataFrame
        One row per point. Its index labels are the ``bkg`` labels at which
        each profile starts. It is not modified.
    level_cols : list of str
        Level column names. The first entry (the level coordinate) is skipped.
    levels : list of str
        Level names used as column suffixes, in profile order.

    Returns
    -------
    pandas.DataFrame
        ``bkg_2D`` with a fresh RangeIndex, followed by one column per
        (level, field) named ``<field>_<level>``. Level varies slowest.

    Raises
    ------
    ValueError
        If the number of profile values does not match
        ``len(levels) * (len(level_cols) - 1)``.
    """
    fields = level_cols[1:]
    # Level names come from the caller rather than bkg.lev.unique(),
    # because the lev values differ between files.
    names = [col + '_' + lev for lev in levels for col in fields]

    profiles = [get_lev_data(bkg, ix, level_cols) for ix in bkg_2D.index]

    if profiles:
        lev_data = pd.concat(profiles, axis=1).transpose()
        if lev_data.shape[1] != len(names):
            raise ValueError(
                "expected {} level columns ({} levels x {} fields), got {}".format(
                    len(names), len(levels), len(fields), lev_data.shape[1]))
        lev_data.columns = names
    else:
        # no points: pd.concat([]) would raise, so build an empty frame
        lev_data = pd.DataFrame(columns=names)

    # not in place: the caller's frame keeps its original index
    bkg_2D = bkg_2D.reset_index(drop=True)

    return pd.concat([bkg_2D, lev_data], axis=1)


# define a function that merges our training data from all files
def merge_train_data(filepaths, levels, i, n):
    """Build training rows for every ``n``-th file of ``filepaths``, starting at ``i``.

    The stride lets ``n`` worker processes split the file list: worker ``i``
    handles files ``i, i + n, i + 2n, ...``.

    Parameters
    ----------
    filepaths : list of str
        S3 keys of observation files. The date stamp is the run of digits
        directly before ".h5".
    levels : list of str
        Level names passed on to ``add_lev_data``.
    i : int
        Index of the first file handled.
    n : int
        Stride (the number of workers).

    Returns
    -------
    pandas.DataFrame
        For each handled file, the observations side by side with their
        nearest background point and its level profile, stacked over all
        files. An empty DataFrame if this worker has no files.

    Raises
    ------
    ValueError
        If a file name has no digits directly before ".h5".
    """
    level_cols = ["lev", "delp", "u", "v", "tv", "sphu", "ozone", "qitot", "qltot"]
    merged = []

    for j in range(i, len(filepaths), n):

        start = time.time()

        filepath = filepaths[j]
        # the dot is escaped so it only matches the literal ".h5" extension
        match = re.search(r"([0-9]+)\.h5", filepath)
        if match is None:
            raise ValueError("no date stamp before '.h5' in " + filepath)
        date = match.group(1)
        obs, bkg = load_data(filepath, date)

        # One row per (lon, lat) point: every 72nd row starts a new profile.
        # This assumes 72 levels per point, stored contiguously after
        # load_data's sort.
        mask_2D = np.arange(0, len(bkg), 72)
        bkg_2D = bkg.drop(level_cols, axis=1).loc[mask_2D, :]

        bkg_nearest = get_nearest(obs, bkg_2D)
        bkg_nearest = add_lev_data(bkg, bkg_nearest, level_cols, levels)

        # both frames carry a fresh RangeIndex, so axis=1 pairs rows positionally
        merged.append(pd.concat([obs, bkg_nearest], axis=1))

        # round the total first so the seconds never print as 60
        minutes, seconds = divmod(round(time.time() - start), 60)
        print("Merge {} obs and bkg done in: {} min and {} sec".format(
            date, minutes, seconds
        ))

    # One concat at the end; concatenating inside the loop re-copies the
    # accumulated frame for every file, which is quadratic.
    return pd.concat(merged, axis=0) if merged else pd.DataFrame()

In [5]:
pstart = time.time()

client = boto3.client("s3")
resource = boto3.resource("s3")

months = ["2014_12", "2015_01", "2015_02"]
filepaths = []

# The paginator follows continuation tokens, so a prefix holding more than
# 1000 keys is not silently truncated (plain list_objects stops at 1000).
paginator = client.get_paginator("list_objects_v2")
for month in months:
    month_keys = [
        obj["Key"]
        for page in paginator.paginate(Bucket="fsoi", Prefix="obs/GMAO_" + month + "/GMAO.dry.")
        for obj in page.get("Contents", [])
    ]
    if not month_keys:
        raise ValueError("no observation files found for " + month)
    filepaths += month_keys

# import levels: one name per line. Blank lines (e.g. from a trailing
# newline) are skipped so the count matches the number of profile levels.
filename = "levels.txt"

resource.Bucket("fsoi").download_file(filename, filename)
try:
    with open(filename, "r") as f:
        levels = [lev for lev in f.read().splitlines() if lev.strip()]
finally:
    os.remove(filename)

# define how many processes we want to run
n = 12

# Run our processes. The context manager shuts the worker processes down
# once every result has been collected.
with mp.Pool(processes=n) as pool:
    results = [
        pool.apply_async(merge_train_data, args=(filepaths, levels, i, n))
        for i in range(n)
    ]
    results = [p.get() for p in results]

# merge our training data
ml_data = pd.concat(results, axis=0)

# save and compress training data in hdf5 format
start = time.time()

filename = "amsua_metop_b_ch6.h5"
key = "ml_data/" + filename
filename = "/tmp/" + filename

ml_data = ml_data.sort_values(["DATETIME", "LATITUDE", "LONGITUDE"])\
                 .reset_index(drop=True)

ml_data.to_hdf(filename, key="df", complevel=9)
try:
    resource.Bucket("fsoi").upload_file(filename, key)
finally:
    # do not leave the compressed copy in /tmp if the upload fails
    os.remove(filename)

# round the totals first so the seconds/minutes never print as 60
end = time.time()
minutes, seconds = divmod(round(end - start), 60)
print("Saved and compressed in: {} min and {} sec".format(minutes, seconds))

# display total program time
pend = time.time()
hours, minutes = divmod(round((pend - pstart) / 60), 60)
print("Total program took: {} hours and {} min".format(hours, minutes))

Merge 2014120118 obs and bkg done in: 0 min and 48 sec
Merge 2014120200 obs and bkg done in: 0 min and 48 sec
Merge 2014120312 obs and bkg done in: 0 min and 48 sec
Merge 2014120206 obs and bkg done in: 0 min and 48 sec
Merge 2014120106 obs and bkg done in: 0 min and 48 sec
Merge 2014120100 obs and bkg done in: 0 min and 48 sec
Merge 2014120218 obs and bkg done in: 0 min and 48 sec
Merge 2014120300 obs and bkg done in: 0 min and 49 sec
Merge 2014120112 obs and bkg done in: 0 min and 57 sec
Merge 2014120500 obs and bkg done in: 0 min and 45 sec
Merge 2014120506 obs and bkg done in: 0 min and 45 sec
Merge 2014120612 obs and bkg done in: 0 min and 46 sec
Merge 2014120400 obs and bkg done in: 0 min and 46 sec
Merge 2014120406 obs and bkg done in: 0 min and 46 sec
Merge 2014120518 obs and bkg done in: 0 min and 46 sec
Merge 2014120600 obs and bkg done in: 0 min and 46 sec
Merge 2014120418 obs and bkg done in: 0 min and 47 sec
Merge 2014120212 obs and bkg done in: 1 min and 53 sec
Merge 2014

Merge 2015010306 obs and bkg done in: 0 min and 48 sec
Merge 2015010206 obs and bkg done in: 0 min and 46 sec
Merge 2015010712 obs and bkg done in: 0 min and 47 sec
Merge 2015010418 obs and bkg done in: 0 min and 48 sec
Merge 2015011306 obs and bkg done in: 0 min and 44 sec
Merge 2015011412 obs and bkg done in: 0 min and 46 sec
Merge 2015011300 obs and bkg done in: 0 min and 48 sec
Merge 2015011100 obs and bkg done in: 0 min and 47 sec
Merge 2015011200 obs and bkg done in: 0 min and 47 sec
Merge 2015010218 obs and bkg done in: 0 min and 48 sec
Merge 2015010918 obs and bkg done in: 0 min and 44 sec
Merge 2015011212 obs and bkg done in: 0 min and 47 sec
Merge 2015011012 obs and bkg done in: 0 min and 45 sec
Merge 2015010606 obs and bkg done in: 0 min and 47 sec
Merge 2015010506 obs and bkg done in: 0 min and 47 sec
Merge 2015010718 obs and bkg done in: 0 min and 47 sec
Merge 2015011606 obs and bkg done in: 0 min and 48 sec
Merge 2015011712 obs and bkg done in: 0 min and 48 sec
Merge 2015

Merge 2015021912 obs and bkg done in: 0 min and 48 sec
Merge 2015020718 obs and bkg done in: 0 min and 47 sec
Merge 2015021800 obs and bkg done in: 0 min and 50 sec
Merge 2015021600 obs and bkg done in: 0 min and 49 sec
Merge 2015021006 obs and bkg done in: 0 min and 53 sec
Merge 2015021418 obs and bkg done in: 0 min and 54 sec
Merge 2015021106 obs and bkg done in: 0 min and 56 sec
Merge 2015021712 obs and bkg done in: 0 min and 49 sec
Merge 2015021700 obs and bkg done in: 0 min and 57 sec
Merge 2015021512 obs and bkg done in: 0 min and 49 sec
Merge 2015021218 obs and bkg done in: 0 min and 49 sec
Merge 2015022106 obs and bkg done in: 0 min and 47 sec
Merge 2015022212 obs and bkg done in: 0 min and 47 sec
Merge 2015021018 obs and bkg done in: 0 min and 49 sec
Merge 2015022100 obs and bkg done in: 0 min and 49 sec
Merge 2015021900 obs and bkg done in: 0 min and 50 sec
Merge 2015021718 obs and bkg done in: 0 min and 47 sec
Merge 2015021406 obs and bkg done in: 0 min and 49 sec
Merge 2015