In [1]:
from pymongo import MongoClient
import os
import time
import pandas as pd
import datetime as dt
import multiprocessing as mp

client = MongoClient("localhost", 27017, client= False)
db = client['usgs']

In [3]:
def parse_datetime(d, t, tz):
    dt_obj = dt.datetime(year   = int(d[0:4]), 
                         month  = int(d[4:6]), 
                         day    = int(d[6:8]), 
                         hour   = int(t[0:2]), 
                         minute = int(t[2:4]), 
                         second = int(t[4:6]))
    return int(time.mktime(dt_obj.utctimetuple()) + tz*3600)


def load_sites_info(db, data_folder, sites_info_file, overwrite = True, verbosity = 100):
    filelist = [f for f in os.listdir(data_folder) if os.path.isfile(os.path.join(data_folder, f))]
    df = pd.read_csv(sites_info_file)

    collection = db['sites']
    if overwrite:
        collection.delete_many({})

    for i, f in enumerate(filelist):
        tags = f.split('.')    
        if i % verbosity == 0:
            print(i, f)
            
        if tags[3] == 'web' or tags[3] == 'comp':
            continue

        with open(os.path.join(data_folder, f)) as file:
            while file.readline()[0] == '#':
                pass
            file.readline()    
            line = file.readline()
            if len(line) > 0:
                prec = int(line.split('\t')[4])
            else:
                prec = -1

        site_no = int(tags[1])
        site_info = df.loc[df['SITE_NO'] == site_no].iloc[0]
        site = {
            "site_no": site_no,
            "description": site_info['STATION_NM'],
            "lat": site_info['DEC_LAT_VA'],
            "lon": site_info['DEC_LONG_V'],
            "state": site_info['STATE_NM'],
            "district": site_info['DISTRICT_N'],
            "drain_area": site_info['DRAIN_AREA'],
            "status": site_info['STATUS_15'],
            "precision": prec,
        }
        collection.insert_one(site)  
        
    return collection


def load_data_worker(measured, computed, filelist, data_folder):
    tz_codes = {'AKDT': -8, 'AKST': -9, 'AST' : -4, 'CDT' : -6, 'CST' : -5, 'EDT' : -4, 'EST' : -5, 'GST' : -2, 
                'HST' : -10, 'MDT' : -6, 'MST' : -7, 'PDT' : -7, 'PST' : -8}    
    for i, f in enumerate(filelist):
        tags = f.split('.')            
        if tags[3] != 'meas' and tags[3] != 'comp':
            continue
        site_no = int(tags[1])
        
        utc = list()
        gh = list()
        with open(os.path.join(data_folder, f)) as file:
            while file.readline()[0] == '#':
                pass
            file.readline()        
            for line in file.readlines():
                if len(line) < 1:
                    continue
                val = line.split('\t')
                ts = parse_datetime(val[0], val[1], tz_codes[val[2]])
                utc.append(ts)
                gh.append(float(val[3]))        
        if len(utc) < 2 or len(gh) < 2:
            continue            
        measurement = {"site_no": site_no, "utc": utc, "gh": gh} 
        if tags[3] == "meas":
            measured.insert_one(measurement)
        else:
            computed.insert_one(measurement)        
    return

        
def load_measurement_data(db, data_folder, n_jobs = 4, overwrite = True, verbosity = 100):
    filelist = [f for f in os.listdir(data_folder) if os.path.isfile(os.path.join(data_folder, f))]
        
    if overwrite:
        db['measured'].delete_many({})
        db['computed'].delete_many({})
        
    n = len(filelist)
    k = n // n_jobs

    jobs = list()
    for i in range(n_jobs):
        chunk = filelist[i*k:min((i+1)*k, n)]
        p = mp.Process(target = load_data_worker, 
                       args   = (db['measured'], db['computed'], chunk, data_folder))
        jobs.append(p)
        p.start()   
    
    for job in jobs:
        job.join()
    
    return db['meas'], db['comp']

In [4]:
print('Loading sites info...')
#sites = load_sites_info(db, data_folder = 'data/', sites_info_file = 'sites.csv', overwrite = True, verbosity = 100)
print('...done.')

print('Loading measurments data...')
measured, verified = load_measurement_data(db, data_folder = 'data/', overwrite = True, verbosity = 5)
print('...done.')

print('Creating indexes...')
#sites.create_index('site_no')
measured.create_index('site_no')
verified.create_index('site_no')
print('...done.')

Loading sites info...
...done.
Loading measurments data...


  "MongoClient opened before fork. Create MongoClient "
  "MongoClient opened before fork. Create MongoClient "
  "MongoClient opened before fork. Create MongoClient "
  "MongoClient opened before fork. Create MongoClient "


...done.
Creating indexes...
...done.


In [22]:
db['measured'].count()

0