In [1]:
import pandas as pd  
import numpy as np
from glob import glob
from scipy.io import savemat, loadmat
from datetime import datetime, timedelta

import pdb
import pymongo
import psycopg2
from sqlalchemy import create_engine
from sqlalchemy.types import Integer, Float, Numeric, String, Date
import os
import gzip
from pchipOceanSlices import PchipOceanSlices

from importlib import reload

import visualizeProfs as vp
%config InlineBackend.figure_format = 'retina'
from pchipOceanSlices import PchipOceanSlices

# Init database

In [2]:
sqliteDb = "sqlite:////storage/kakapo/JG_profiles.db"
tableName = "profiles"
conn = create_engine(sqliteDb, echo=False)

conn.execute('''DROP TABLE IF EXISTS {};'''.format(tableName))

In [3]:
try:
    smdf = pd.read_sql_query('SELECT * FROM {} LIMIT 10;'.format(tableName), conn)
except Exception as err:
    print(err)
    pass

In [3]:
dtypes = {"latitude": Numeric(precision=3),
          "longitude": Numeric(precision=3),
          "day": Integer(),
          "month": Integer(),
          "year": Integer(),
          "date": Date(),
          "hr": Integer(),
          "min": Integer(),
          "sec": Integer(),
          "WMO": Integer(),
          "DMODE status": String(length=1),
          "pressure": Numeric(precision=3),
          "temperature": Numeric(precision=3),
          "salinity": Numeric(precision=3),
          "cycle_number": Integer()}

index_label = ["year", "month", "WMO", "profile_id"]

# Get reject list of profiles

In [4]:
rList = glob('/storage/kakapo/*temp*')
wl = glob('/storage/kakapo/*.xml')
header = ['platform_number', 'cycle', 'lat', 'lon', 'c1', 'c2', 'c3', 'c4', 'c5']

def get_lines(filename):
    with open(filename, "r") as f:
        data = f.readlines()
        lines = []
        for line in data: 
            line = ' '.join(line.split())
            line = line.split(' ')
            line[-1] = line[-1].replace('/n', '')
            lines.append(line)
        print(filename)
    return lines

def parse_kakapo(filename, header):
    '''parses reject and white list in kakapo'''
    lines = get_lines(filename)
    df = pd.DataFrame(data=lines, columns=header)
    df['profile_id'] = df['platform_number'].apply(lambda x: str(x)) + '_'  + df['cycle'].apply(lambda x: str(x))
    return df

rejectDf = parse_kakapo(rList[0], header)
whiteList = parse_kakapo(wl[0], header)

/storage/kakapo/reject_cycle_fromvar_temp_01x01_finl
/storage/kakapo/ar_whitelist_2019.xml


In [5]:
print(rejectDf.shape)
rejectDf.head()

(2135, 10)


Unnamed: 0,platform_number,cycle,lat,lon,c1,c2,c3,c4,c5,profile_id
0,6903190,18,-60.917,304.997,0,2.154,0.257,6.0xtimes1975.0,,6903190_18
1,5903417,88,-37.717,332.474,1,0.343,0.051,6.0xtimes1975.0,,5903417_88
2,6901697,81,-37.674,332.432,1,0.551,0.051,6.0xtimes1975.0,,6901697_81
3,3900529,66,-37.284,285.835,1,0.23,0.038,6.0xtimes1975.0,,3900529_66
4,5904120,53,-36.369,330.982,1,0.64,0.048,6.0xtimes1975.0,,5904120_53


In [6]:
ok_ids = whiteList['profile_id'].tolist()

In [7]:
rejectDf = rejectDf[~rejectDf['profile_id'].isin(ok_ids)]
rejectList = rejectDf['profile_id'].tolist()

In [8]:
print(rejectDf.shape)
rejectDf.head()

(1113, 10)


Unnamed: 0,platform_number,cycle,lat,lon,c1,c2,c3,c4,c5,profile_id
0,6903190,18,-60.917,304.997,0,2.154,0.257,6.0xtimes1975.0,,6903190_18
1,5903417,88,-37.717,332.474,1,0.343,0.051,6.0xtimes1975.0,,5903417_88
2,6901697,81,-37.674,332.432,1,0.551,0.051,6.0xtimes1975.0,,6901697_81
3,3900529,66,-37.284,285.835,1,0.23,0.038,6.0xtimes1975.0,,3900529_66
4,5904120,53,-36.369,330.982,1,0.64,0.048,6.0xtimes1975.0,,5904120_53


In [9]:
def create_collection(dbName, collectionName):
    dbUrl = 'mongodb://localhost:27017/'
    client = pymongo.MongoClient(dbUrl)
    db = client[dbName]
    coll = db[collectionName]
    coll = init_profiles_collection(coll)
    return coll    

def init_profiles_collection(coll):
    try:
        coll.create_index([('date', pymongo.DESCENDING)])
        coll.create_index([('dac', pymongo.DESCENDING)])
        coll.create_index([('lat', pymongo.DESCENDING)])
        coll.create_index([('lon', pymongo.DESCENDING)])
    except:
        logging.warning('not able to get collections or set indexes')
    return coll

coll = create_collection('JG', 'profiles')

# Parse data files

In [10]:
files = glob('/storage/kakapo/*_padj.dat.gz')
header = ['latitude', 'longitude', 'day', 'month', 'year', 'hr', 'min',
          'sec', 'pressure', 'temperature', 'potential temperature',
          'salinity', 'DMODE status', 'WMO', 'cycle number', 'original netcdf file']



def get_gzip_lines(filename):
    with gzip.open(filename, "r") as f:
        data = f.readlines()
        lines = []
        for line in data:
            line = line.decode('utf-8')
            line = ' '.join(line.split())
            line = line.split(' ')
            line[-1] = line[-1].replace('/n', '')
            lines.append(line)
    return lines

def make_docs_from_lines(lines, presRange=[0,2000]):
    df = pd.DataFrame(data=lines, columns=header)
    df['profile_id'] = df['WMO'].apply(lambda x: str(x)) + '_'  + df['cycle number'].apply(lambda x: str(x))
    df = df[~df['profile_id'].isin(rejectList)]
    docs = []
    df = df.rename({'latitude': 'lat', 'longitude': 'lon', 'pressure':'pres',
                    'salinity':'psal', 'temperature': 'temp'}, axis=1)
    floatCols = ['lat', 'lon', 'pres', 'temp', 'psal']
    df[floatCols] = df[floatCols].astype(float)
    if presRange:
        df = df[(df['pres'].astype(float) >= presRange[0]) & (df['pres'].astype(float) <= presRange[1])]
    for profile_id, profDf in df.groupby('profile_id'):
        dl = profDf[['year', 'month', 'day']].astype(int).iloc[0]
        date = datetime(*dl)
        doc = {}
        doc['_id'] = profile_id
        doc['lat'] = profDf.lat.iloc[0]
        doc['lon'] = profDf.lon.iloc[0]
        doc['date'] = date
        meas = profDf[['pres', 'temp', 'psal']].to_dict(orient='records')
        if len(meas) == 0:
            continue
        
        doc['measurements'] = meas
        docs.append(doc)
        
    return docs
        
        

In [11]:
#coll.drop()
for fdx, filename in enumerate(files):
    lines = get_gzip_lines(filename)
    documents = make_docs_from_lines(lines)
    print('index: {0}, filename: {1} lines: {2}'.format(fdx, filename, len(documents)))
    try:
        coll.insert_many(documents, ordered=False)
    except pymongo.errors.BulkWriteError as bwe:
        writeErrors = bwe.details['writeErrors']
        problem_idx = []
        for we in writeErrors:
            problem_idx.append(we['index'])
        trouble_list = [documents[i] for i in problem_idx]
        for doc in trouble_list:
            coll.replace_one({'_id': doc['_id']}, doc, upsert=True)

index: 0, filename: /storage/kakapo/200401_padj.dat.gz lines: 1787
index: 1, filename: /storage/kakapo/200402_padj.dat.gz lines: 1707
index: 2, filename: /storage/kakapo/200403_padj.dat.gz lines: 1914
index: 3, filename: /storage/kakapo/200404_padj.dat.gz lines: 1875
index: 4, filename: /storage/kakapo/200405_padj.dat.gz lines: 2062
index: 5, filename: /storage/kakapo/200406_padj.dat.gz lines: 2009
index: 6, filename: /storage/kakapo/200407_padj.dat.gz lines: 2063
index: 7, filename: /storage/kakapo/200408_padj.dat.gz lines: 2179
index: 8, filename: /storage/kakapo/200409_padj.dat.gz lines: 2340
index: 9, filename: /storage/kakapo/200410_padj.dat.gz lines: 2527
index: 10, filename: /storage/kakapo/200411_padj.dat.gz lines: 2489
index: 11, filename: /storage/kakapo/200412_padj.dat.gz lines: 2701
index: 12, filename: /storage/kakapo/200501_padj.dat.gz lines: 2811
index: 13, filename: /storage/kakapo/200502_padj.dat.gz lines: 2660
index: 14, filename: /storage/kakapo/200503_padj.dat.gz li

index: 121, filename: /storage/kakapo/201402_padj.dat.gz lines: 10111
index: 122, filename: /storage/kakapo/201403_padj.dat.gz lines: 11188
index: 123, filename: /storage/kakapo/201404_padj.dat.gz lines: 11076
index: 124, filename: /storage/kakapo/201405_padj.dat.gz lines: 11399
index: 125, filename: /storage/kakapo/201406_padj.dat.gz lines: 11033
index: 126, filename: /storage/kakapo/201407_padj.dat.gz lines: 11431
index: 127, filename: /storage/kakapo/201408_padj.dat.gz lines: 11358
index: 128, filename: /storage/kakapo/201409_padj.dat.gz lines: 10888
index: 129, filename: /storage/kakapo/201410_padj.dat.gz lines: 11121
index: 130, filename: /storage/kakapo/201411_padj.dat.gz lines: 10910
index: 131, filename: /storage/kakapo/201412_padj.dat.gz lines: 11551
index: 132, filename: /storage/kakapo/201501_padj.dat.gz lines: 11853
index: 133, filename: /storage/kakapo/201502_padj.dat.gz lines: 10736
index: 134, filename: /storage/kakapo/201503_padj.dat.gz lines: 11701
index: 135, filename

# Interpolate

In [6]:
pc = PchipOceanSlices([10,20])
cursor = coll.find().batch_size(1000)
xintp = 10
tdx = 0
profiles = []
for profile in cursor:
    #interpolate profile
    profiles.append(profile)
    if len(profiles) > 25000:
        iTempDf = pc.make_interpolated_df(profiles, xintp, xLab='pres', yLab='temp')
        pc.saveIDF(iTempDf, 'JG-profiles-10.csv', tdx)
        tdx += 1
        profiles=[]
        

In [12]:
x_dup_idx = unique_idxs(x)

In [13]:
x_dup_idx

[0, 2]

In [15]:
xu = [x[idx] for idx in x_dup_idx]

In [16]:
xu

[9.0, 13.0]

{'_id': '5900038_38',
 'lat': -33.339,
 'lon': 111.249,
 'date': datetime.datetime(2004, 2, 8, 0, 0),
 'measurements': [{'pres': 5.7, 'temp': 19.543, 'psal': 35.767},
  {'pres': 10.9, 'temp': 19.513, 'psal': 35.902}]}

In [10]:



#conn.execute('''DROP TABLE IF EXISTS {};'''.format(tableName))
for fdx, filename in enumerate(files):
    if fdx <= 158:
        continue
    lines = get_gzip_lines(filename)
    df = pd.DataFrame(data=lines, columns=header)
    df['profile_id'] = df['WMO'].apply(lambda x: str(x)) + '_'  + df['cycle number'].apply(lambda x: str(x))
    dllist = df[['year', 'month', 'day', 'hr']].astype(int).values.tolist()
    dateList = [datetime(*dl) for dl in dllist]
    df['date'] = dateList
    df = df[~df['profile_id'].isin(rejectList)]
    print('index: {0}, filename: {1} lines: {2}, dfLen: {3}'.format(fdx, filename, len(lines), df.shape[0]))
    df.to_sql(tableName, con=conn, index=False, if_exists='append', dtype=dtypes, index_label=index_label)

index: 159, filename: /storage/kakapo/201704_padj.dat.gz lines: 5307015, dfLen: 5304828
index: 160, filename: /storage/kakapo/201705_padj.dat.gz lines: 5438359, dfLen: 5437042
index: 161, filename: /storage/kakapo/201706_padj.dat.gz lines: 5125342, dfLen: 5124233
index: 162, filename: /storage/kakapo/201707_padj.dat.gz lines: 5319867, dfLen: 5318640
index: 163, filename: /storage/kakapo/201708_padj.dat.gz lines: 5444536, dfLen: 5442363
index: 164, filename: /storage/kakapo/201709_padj.dat.gz lines: 5322740, dfLen: 5319026
index: 165, filename: /storage/kakapo/201710_padj.dat.gz lines: 5644258, dfLen: 5642081
index: 166, filename: /storage/kakapo/201711_padj.dat.gz lines: 5572708, dfLen: 5565489
index: 167, filename: /storage/kakapo/201712_padj.dat.gz lines: 5797202, dfLen: 5791986
index: 168, filename: /storage/kakapo/201801_padj.dat.gz lines: 5735981, dfLen: 5734770
index: 169, filename: /storage/kakapo/201802_padj.dat.gz lines: 5257042, dfLen: 5253413
index: 170, filename: /storage/k

# Querying

In [11]:
df[['year', 'month', 'day', 'hr', 'min']].astype(int).values.tolist()[0]

Exception ignored in: <function WeakKeyDictionary.__init__.<locals>.remove at 0x7f23284eb7b8>
Traceback (most recent call last):
  File "/home/tyler/anaconda3/envs/argo/lib/python3.6/weakref.py", line 356, in remove
    def remove(k, selfref=ref(self)):
KeyboardInterrupt


[2019, 7, 7, 7, 41]

In [12]:
profile_ids = pd.read_sql_query(
    '''
    SELECT DISTINCT profile_id FROM {};
    '''.format(tableName), conn)

In [13]:
print(profile_ids.shape)
profile_ids.head()

(1588159, 1)


Unnamed: 0,profile_id
0,1900022_67
1,1900022_68
2,1900022_69
3,1900037_76
4,1900037_77


In [14]:
cursor = conn.execute('''SELECT COUNT(*) FROM {};'''.format(tableName))

In [15]:
cursor.fetchall()

[(452563449,)]

In [16]:
cursor = conn.execute(
    '''SELECT sql 
FROM sqlite_master 
WHERE name = '{}';
'''.format(tableName))

In [18]:
cursor.fetchall()

[('CREATE TABLE profiles (\n\tlatitude NUMERIC(3), \n\tlongitude NUMERIC(3), \n\tday INTEGER, \n\tmonth INTEGER, \n\tyear INTEGER, \n\thr INTEGER, \n\tm ... (137 characters truncated) ... 3), \n\t"DMODE status" VARCHAR(1), \n\t"WMO" INTEGER, \n\t"cycle number" TEXT, \n\t"original netcdf file" TEXT, \n\tprofile_id TEXT, \n\tdate DATE\n)',)]

In [27]:
cursor = conn.execute('''CREATE INDEX idx_contacts_name 
ON {0} (profile_id);'''.format(tableName))

OperationalError: (sqlite3.OperationalError) database or disk is full
[SQL: CREATE INDEX idx_contacts_name 
ON profiles (profile_id);]
(Background on this error at: http://sqlalche.me/e/e3q8)

In [24]:
cursor.fetchall()

[]

In [129]:
profile_ids[~profile_ids['profile_id'].isin(rejectList)]

Unnamed: 0,profile_id
0,1900022_67
1,1900022_68
2,1900022_69
3,1900037_76
4,1900037_77
5,1900037_78
6,1900039_76
7,1900039_77
8,1900039_78
9,1900040_76


In [56]:
smdf = pd.read_sql_query(
    '''
    SELECT * FROM {} 
    WHERE year = 2004
    AND month = 1
    AND pressure BETWEEN 5 AND 15;
    '''.format(tableName), conn)

In [77]:
smdf = smdf[['latitude', 'longitude', 'day', 'month', 'year', 'hr', 'min', 'sec', 'pressure', 'temperature', 'profile_id']]
smdf.head()

Unnamed: 0,latitude,longitude,day,month,year,hr,min,sec,pressure,temperature,profile_id
0,24.393,298.67,3,1,2004,15,52,26,10.0,25.408,1900022_67
1,24.63,298.537,13,1,2004,16,1,30,10.0,24.904,1900022_68
2,24.853,297.923,23,1,2004,16,40,40,10.0,24.608,1900022_69
3,31.505,329.864,19,1,2004,11,24,51,10.0,20.552,1900037_77
4,32.704,311.464,10,1,2004,6,52,37,10.0,20.736,1900039_76


In [114]:
profiles = []
for profile_id, df in smdf.groupby('profile_id'):
    p = {}
    dl = df[['year', 'month', 'day', 'hr', 'min']].iloc[0].tolist()
    dt = datetime(*dl)
    p['date'] = dt
    p['profile_id'] = profile_id
    p['lat'] = df['latitude'].iloc[0]
    p['lon'] = df['longitude'].iloc[0]
    meas = df[['pressure', 'temperature']]
    meas = meas.rename({'pressure':'pres', 'temperature': 'temp'}, axis=1)
    p['measurements'] = meas.to_dict(orient='records')
    profiles.append(p)

In [112]:
dl = df[['year', 'month', 'day', 'hr', 'min']].iloc[0].tolist()
print(dl)
dt = datetime(*dl)
print(dt)

[2004, 1, 30, 12, 15]
2004-01-30 12:15:00


In [107]:
meas = df[['pressure', 'temperature']]
meas = meas.rename({'pressure':'pres', 'temperature': 'temp'}, axis=1)
meas.to_dict(orient='records')

[{'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres': 5.0, 'temp': 0.314},
 {'pres'

In [86]:
len(smdf.pressure.unique())

113

In [88]:
np.sort(smdf.pressure.unique())


array([ 5.   ,  5.1  ,  5.2  ,  5.3  ,  5.4  ,  5.5  ,  5.6  ,  5.7  ,
        5.8  ,  5.9  ,  6.   ,  6.1  ,  6.2  ,  6.3  ,  6.4  ,  6.425,
        6.5  ,  6.58 ,  6.6  ,  6.7  ,  6.8  ,  6.9  ,  7.   ,  7.1  ,
        7.14 ,  7.2  ,  7.3  ,  7.4  ,  7.5  ,  7.6  ,  7.65 ,  7.7  ,
        7.8  ,  7.84 ,  7.9  ,  8.   ,  8.1  ,  8.2  ,  8.3  ,  8.4  ,
        8.5  ,  8.6  ,  8.7  ,  8.8  ,  8.9  ,  8.96 ,  9.   ,  9.1  ,
        9.2  ,  9.225,  9.3  ,  9.32 ,  9.4  ,  9.5  ,  9.533,  9.6  ,
        9.7  ,  9.733,  9.8  ,  9.9  , 10.   , 10.1  , 10.2  , 10.3  ,
       10.4  , 10.5  , 10.6  , 10.7  , 10.8  , 10.825, 10.9  , 11.   ,
       11.1  , 11.2  , 11.3  , 11.4  , 11.5  , 11.6  , 11.7  , 11.8  ,
       11.9  , 12.   , 12.1  , 12.2  , 12.3  , 12.4  , 12.45 , 12.5  ,
       12.6  , 12.7  , 12.8  , 12.9  , 13.   , 13.1  , 13.2  , 13.3  ,
       13.4  , 13.5  , 13.6  , 13.7  , 13.8  , 13.9  , 14.   , 14.1  ,
       14.2  , 14.3  , 14.4  , 14.5  , 14.6  , 14.7  , 14.8  , 14.9  ,
      