In [1]:
import xarray as xr
import netCDF4 as nc
import pandas as pd
import urllib.request
from urllib.parse import urlparse
import os, json, requests
from bs4 import BeautifulSoup
import ipywidgets as widgets
from collections import defaultdict
import re

debug = False # Set to True to include detailed print statements (for debugging).

# Seconds to wait for any single HTTP request before giving up, so an unresponsive
# server raises an error instead of hanging the notebook indefinitely.
http_timeout = 60

# Get available spurs datasets using podaac URL.
# NOTE(review): the query requests page_size=38 results; if PO.DAAC lists more SPURS
# collections than that, the extra ones are presumably not returned - confirm.
spurs_source='https://podaac.jpl.nasa.gov/api/cmr/dataset?format=umm_json&page_size=38&sortField=score&ids=Projects&values=SPURS&search='

# series: SPURS series name -> URL of its 'USE SERVICE API' link (an OPeNDAP listing).
# Several series can share the same URL, so values may repeat.
series={}
with urllib.request.urlopen(spurs_source, timeout=http_timeout) as url:
    data = json.loads(url.read().decode())
    for item in data['items']:
        for element in item['umm']['RelatedUrls']:
            if element['Type'] == 'USE SERVICE API':
                dataset_name = item['umm']['CollectionCitations'][0]['SeriesName']
                series[dataset_name] = element['URL']

# Count the .nc files linked from each distinct listing page; repeated URLs are
# reported and fetched only once.
total_file_count = 0
urls = set()
for source in series.values():
    if source in urls:
        print ('Found duplicate URL: ' + source)
        continue
    urls.add(source)
    soup = BeautifulSoup(requests.get(source, timeout=http_timeout).content, 'html.parser')
    for a in soup.find_all('a', href=True):
        if a['href'].endswith('.nc') and 'viewers' not in a['href']:
            total_file_count += 1

print ('Finished reading URLS.')


def _read_mapping_pairs(path):
    """Read a 'key : value' mapping file and return its (key, value) pairs in file order.

    The first line of the file is a header and is skipped. Keys and values are
    stripped of surrounding whitespace, and blank lines are ignored.

    Raises:
        ValueError: if a non-blank line does not contain ' : '.
    """
    pairs = []
    with open(path) as fp:
        next(fp, None)  # Skip the header line.
        for line_number, line in enumerate(fp, start=2):
            if not line.strip():
                continue
            match = re.match(r"(.*) : (.*)", line)
            if match is None:
                raise ValueError('%s:%d: expected "key : value", got %r' % (path, line_number, line))
            key, value = match.groups()
            pairs.append((key.strip(), value.strip()))
    return pairs


# varz_to_mapped_name: 'symbol*standard_name' -> mapped (exported) variable name.
# The file name spelling ('varialbes') matches the file on disk.
remapped_pairs = _read_mapping_pairs("preprocess/remapped_varialbes.txt")
varz_to_mapped_name = dict(remapped_pairs)  # later duplicate keys win
mapped_names = {mapped_name for _, mapped_name in remapped_pairs}
sorted_mapped_names = sorted(mapped_names)
print ('Finished reading preprocess/remapped_varialbes.txt')

# Read preprocessed data to populate depth mapping.
# varz_to_depth_assignment: 'symbol*standard_name' -> depth value (kept as a string).
varz_to_depth_assignment = dict(_read_mapping_pairs("preprocess/varz_to_depth_assignment.txt"))
print ('Finished reading preprocess/varz_to_depth_assignment.txt')

print ('done.')

Found duplicate URL: https://podaac-opendap.jpl.nasa.gov/opendap/allData/insitu/L2/spurs1/mooring/
Found duplicate URL: https://podaac-opendap.jpl.nasa.gov/opendap/allData/insitu/L2/spurs2/mooring/
Finished reading URLS.
Finished reading preprocess/remapped_varialbes.txt
Finished reading preprocess/varz_to_depth_assignment.txt
done.


In [2]:
# Display dataset selection widget.
# Option labels are the SPURS series names from `series`; the selected values
# (read later through w1.value) are the matching OPeNDAP listing URLs.

style = {'description_width': 'initial'}
w1 = widgets.SelectMultiple(
    description='Please select SPURS data sources',
    options=series,
    rows=10,
    disabled=False,
    style=style,
    layout=widgets.Layout(width='600px'),
)
display(w1)

SelectMultiple(description='Please select SPURS data sources', layout=Layout(width='600px'), options={'SPURS-1…

In [8]:
import dask.dataframe
from datetime import datetime

# Get NetCDF files for selected datasets. Load data in dataframes.

debug = False # set to True to print more debugging statements.
# NOTE(review): no cell visible here reads ignore_depth_assignment; the export cell always
# applies depth assignments, writing one CSV row per matched depth value rather than
# raising an error for files with several values - confirm intended.
ignore_depth_assignment = True
url_prefix = 'https://podaac-opendap.jpl.nasa.gov/opendap/allData/insitu/L2/'
base_dir = 'netcdf/'  # downloaded NetCDF files are written here (created if missing)
http_timeout = 60  # seconds to wait on a single HTTP request before giving up

file_name_to_dataset_name = {}
file_name_to_var_name_dict = {}
file_name_to_coords = {}
file_name_to_records = {}
file_name_to_dask_records = {}
file_name_to_matched_depth_assignment_varz_to_depth_map = {}

# Candidate names of the time variable; if several are present, the last one listed wins.
times = ['time', 'Time', 'TIME', 'JULD', 'ctd_time']
# Data variables whose stripped, lower-cased name is listed here are treated as coords.
special_cols = ['time', 'lat', 'latitude', 'lon', 'longitude', 'depth', 'z', 'juld']


def _list_netcdf_hrefs(listing_url, timeout=http_timeout):
    """Return the hrefs of the .nc files linked from an OPeNDAP listing page.

    Links containing 'viewers' are excluded.
    """
    response = requests.get(listing_url, timeout=timeout)
    soup = BeautifulSoup(response.content, 'html.parser')
    return [a['href'] for a in soup.find_all('a', href=True)
            if a['href'].endswith('.nc') and 'viewers' not in a['href']]


def _has_standard_name(value):
    """filter_by_attrs predicate: keep variables whose 'standard_name' attribute is set."""
    return value is not None


def _find_time_var(dataset, candidates):
    """Return the last name in `candidates` present in `dataset`, or None if none is."""
    found = None
    for name in candidates:
        if name in dataset.keys():
            found = name
    return found


def _chunk_params(sizes):
    """Return {dim: chunk_size} for Dataset.chunk from a {dim: length} mapping.

    A dimension of length v is split into chunks of v // 1000 elements (about 1000
    chunks) when that is larger than 1; shorter dimensions are left unchunked.
    """
    return {dim: size // 1000 for dim, size in sizes.items() if size // 1000 > 1}


def _make_time_converter(unit, origin):
    """Return a function converting one raw time value to a pandas Timestamp.

    unit/origin are bound here because dask evaluates the mapped function lazily
    (when rows are finally computed); reading module-level variables at that point
    would pick up the values of whichever file was processed last.
    """
    def convert(ts):
        return pd.to_datetime(ts, unit=unit, origin=origin)
    return convert


# Fetch each distinct selected listing page once and remember its .nc links, so the
# processing pass below neither downloads the page again nor processes a URL twice.
total_file_count = 0
urls = set()
source_to_hrefs = {}
for source in w1.value:
    if source in urls:
        print ('Found duplicate URL: ' + source)
        continue
    urls.add(source)
    source_to_hrefs[source] = _list_netcdf_hrefs(source)
    total_file_count += len(source_to_hrefs[source])

os.makedirs(base_dir, exist_ok=True)

processed_files = []
for source, hrefs in source_to_hrefs.items():
    url_index = source.index(url_prefix)
    # use url to get dataset name, for example that name for dataset in https://podaac/../../spurs1/adcp/ will be spurs1_adcp
    dataset_name = source[(url_index + len(url_prefix)):-1].replace('/', '_')
    print ("selected: " + source)
    print ('dataset_name: ' + dataset_name)
    for href in hrefs:
        netcdf = source + href

        # Download and process files for this dataset.
        full_file_name = os.path.basename(urlparse(netcdf).path)
        file_name = full_file_name[0:full_file_name.index('.nc')]
        local_path = base_dir + file_name
        urllib.request.urlretrieve(netcdf, local_path)
        netcdf_data = xr.open_dataset(local_path, decode_times=False, engine='netcdf4')
        # Keep only variables that carry a standard_name attribute.
        netcdf_data = netcdf_data.filter_by_attrs(standard_name=_has_standard_name)

        print ('processing file: ' + file_name)

        time_var = _find_time_var(netcdf_data, times)
        if time_var is None:
            # Rows cannot be timestamped without a time variable; skip the file instead of
            # reusing the previous file's time variable name.
            print ('skipping file: ' + file_name + ' - no time variable found (looked for: ' + ', '.join(times) + ')')
            continue

        file_name_to_dataset_name[file_name] = dataset_name
        file_name_to_var_name_dict[file_name] = {}
        file_name_to_coords[file_name] = list(netcdf_data.coords)

        # Required for depth_assignment changes.
        matched_depth_assignment_varz_to_depth_map = {}

        # Here symbol is like u, v, etc (i.e., variable name present in data frame)
        for symbol in netcdf_data.keys():
            if symbol.strip().lower() in special_cols:
                file_name_to_coords[file_name].append(symbol)
                if debug:
                    print ('symbol: ' + symbol + ' is treated as coord as it is in special cols: ' + ','.join(special_cols))
                continue
            standard_name = netcdf_data[symbol].standard_name
            file_name_to_var_name_dict[file_name][symbol] = standard_name
            key = symbol + '*' + standard_name
            # Record the depth assignment of every varz that is both listed in
            # preprocess/remapped_varialbes.txt and has an entry in varz_to_depth_assignment.
            # For example, if air_temp_10*air_temperature is present, its rows get depth 10.
            if key in varz_to_mapped_name and key in varz_to_depth_assignment:
                matched_depth_assignment_varz_to_depth_map[key] = varz_to_depth_assignment[key]
        file_name_to_matched_depth_assignment_varz_to_depth_map[file_name] = matched_depth_assignment_varz_to_depth_map

        if debug:
            print (netcdf_data)

        # Units look like '<unit> since <origin>...'; anything other than seconds is treated as days.
        time_units = netcdf_data[time_var].units.split(' ')
        time_origin = time_units[2] # 1950-01-01
        time_unit = 's' if time_units[0] == 'seconds' else 'd'

        if debug:
            if time_var == 'JULD':
                print ('Found JULD')
            if time_origin != '1970-01-01':
                print ('Found time_origin: ' + time_origin)

            print (time_var)
            print (time_unit)
            print (time_origin)

        chunkprams = _chunk_params(netcdf_data.sizes)
        print ('chunking the dataframe (to handle memory issues) - chunkparams: ')
        print (chunkprams)

        dd = netcdf_data.chunk(chunkprams).to_dask_dataframe()
        dd[time_var] = dd[time_var].map(_make_time_converter(time_unit, time_origin.rstrip('Z')))
        file_name_to_dask_records[file_name] = dd
        processed_files.append(href)

        if debug:
            print (file_name_to_var_name_dict[file_name])
            print (file_name_to_coords[file_name])

        print ('Processed ' + str(len(processed_files)) + ' / ' + str(total_file_count) + ' files.')
print ('\n')

if debug:
    for file_name, matched_depth_assignment_varz_to_depth_map in file_name_to_matched_depth_assignment_varz_to_depth_map.items():
        print ('File: ' + file_name + ' has depth_assignment.')
        for varz, depth in matched_depth_assignment_varz_to_depth_map.items():
            print ('symbol*standard_name : depth -> ' + varz +  ' : ' +str(depth))

print ('done.')

selected: https://podaac-opendap.jpl.nasa.gov/opendap/allData/insitu/L2/spurs1/ecomapper/
dataset_name: spurs1_ecomapper
processing file: SPURS_Knorr_Ecomapper1
chunking the dataframe (to handle memory issues) - chunkparams: 
{'obs': 5}
Processed 1 / 2 files.
processing file: SPURS_Knorr_Ecomapper2
chunking the dataframe (to handle memory issues) - chunkparams: 
{'obs': 55}
Processed 2 / 2 files.


done.


In [9]:
# Display coords/variable selection widget.
# One SelectMultiple widget per loaded file; its description is '<n>. <file_name>'.
# Format of coords displayed: 'coord : <mapped name>', taken from the first key in
#   varz_to_mapped_name whose symbol or standard name equals the coord (coords with
#   no such key are not offered).
# Format of vars displayed: '<var_name> : <mapped name>', only for vars whose
#   'symbol*standard_name' key is in varz_to_mapped_name.

variable_widgets = []
file_name_to_values = {}
for file_name, var_to_standard_name in file_name_to_var_name_dict.items():
    values = []
    for c in file_name_to_coords[file_name]:
        for k, mapped in varz_to_mapped_name.items():
            keys = k.split('*')
            if c in (keys[0], keys[1]):
                values.append('coord : ' + mapped)
                break
    values.extend(
        symbol + ' : ' + varz_to_mapped_name[symbol + '*' + standard_name]
        for symbol, standard_name in var_to_standard_name.items()
        if symbol + '*' + standard_name in varz_to_mapped_name
    )
    file_name_to_values[file_name] = values

for file_number, file_name in enumerate(file_name_to_var_name_dict, start=1):
    style = {'description_width': 'initial'}
    w = widgets.SelectMultiple(
        description=str(file_number) + ". " + file_name,
        options=file_name_to_values[file_name],
        rows=10,
        disabled=False,
        style=style,
        layout=widgets.Layout(width='600px'),
    )
    variable_widgets.append(w)
    display(w)

SelectMultiple(description='1. SPURS_Knorr_Ecomapper1', layout=Layout(width='600px'), options=('coord : time',…

SelectMultiple(description='2. SPURS_Knorr_Ecomapper2', layout=Layout(width='600px'), options=('coord : time',…

In [10]:
# Set this to True in order to include all coords for a file, when vars are selected.
should_include_all_coords = True


def _coord_option_labels(coord):
    """Return the widget labels under which `coord` may appear in a file's selection widget.

    The coords/vars widgets label a coord 'coord : <mapped name>', using the first
    'symbol*standard_name' key in varz_to_mapped_name whose symbol or standard name
    equals the coord. The raw 'coord : <coord>' label is accepted as well, so coords
    whose mapped name equals their own name keep matching as before.
    """
    labels = {'coord : ' + coord}
    for varz, mapped_name in varz_to_mapped_name.items():
        parts = varz.split('*')
        if coord == parts[0] or coord == parts[1]:
            labels.add('coord : ' + mapped_name)
            break
    return labels


file_name_to_selected_coords = defaultdict(list)
file_name_to_selected_varz = defaultdict(list)
for w in variable_widgets:
    # Widget descriptions look like '<n>. <file_name>'.
    idx = w.description.index(' ')
    file_name = w.description[idx+1:]
    for file_var, file_standard_var in file_name_to_var_name_dict[file_name].items():
        varz = file_var + '*' + file_standard_var
        if varz in varz_to_mapped_name:
            if file_var + ' : ' + varz_to_mapped_name[varz] in w.value:
                file_name_to_selected_varz[file_name].append(varz)
    include_all_coords = should_include_all_coords and len(file_name_to_selected_varz[file_name]) > 0
    for coord in file_name_to_coords[file_name]:
        # Match against the label the widget actually shows (the mapped name), not only the raw coord.
        if include_all_coords or not _coord_option_labels(coord).isdisjoint(w.value):
            file_name_to_selected_coords[file_name].append(coord)

if should_include_all_coords:
    print ('Including all coords for selected vars ...')
else:
    print ('Including only selected coords and vars ...')

for file_name in sorted(file_name_to_selected_coords):
    selected_coords = file_name_to_selected_coords[file_name]
    if len(selected_coords) == 0:
        continue
    print (str(file_name) + " : coords: " + ','.join(selected_coords))
for file_name in sorted(file_name_to_selected_varz.keys()):
    selected_vars = file_name_to_selected_varz[file_name]
    if len(selected_vars) == 0:
        continue
    print (str(file_name) + " : vars: " + ','.join(selected_vars))
print ('done.')

Including all coords for selected vars ...
SPURS_Knorr_Ecomapper1 : coords: time,z,longitude,latitude
SPURS_Knorr_Ecomapper2 : coords: time,z,longitude,latitude
SPURS_Knorr_Ecomapper1 : vars: conductivity*conductivity
SPURS_Knorr_Ecomapper2 : vars: YSI_temperature*temperature
done.


In [11]:
# Read csv header from preprocess/final_header.txt.
import csv

debug = False

# Column order: dataset_name, file_name, then one column per non-blank header line.
# Blank lines are skipped: an empty column name is meaningless and would collide in header_dict.
csv_cols = ['dataset_name', 'file_name']
with open('preprocess/final_header.txt', 'r') as f:
    csv_cols.extend(line.strip() for line in f if line.strip())

# Write csv header to output/spurs_data.csv, truncating any output from a previous run.
os.makedirs('output', exist_ok=True)
with open('output/spurs_data.csv', 'w', newline='') as outcsv:
    csv.writer(outcsv).writerow(csv_cols)
print ('Total csv_cols (coords + vars): ' + str(len(csv_cols)))

# Column name -> index in the CSV row (if a name repeats, its last index wins).
header_dict = {col: idx for idx, col in enumerate(csv_cols)}

if debug:
    print (header_dict)
    print (file_name_to_selected_coords)
    print (file_name_to_selected_varz)
print ('Done.')

Total csv_cols (coords + vars): 101
Done.


In [12]:
import csv
pd.options.mode.chained_assignment = None

debug = False

# Limit the rows exported per file during the demo, as the CSV file grows quickly.
# Set to -1 to load all records for every selected file.
max_records_per_file = 10

# Consider following special columns as coordinates.
special_cols = ['time', 'lat', 'latitude', 'lon', 'longitude', 'depth', 'z', 'juld']


def _canonical_special_col(special_name):
    """Return the CSV column name for a stripped, lower-cased special column name.

    Names containing 'lat' -> 'latitude', 'lon' -> 'longitude', 'z' -> 'depth',
    'juld' -> 'time'; any other name (e.g. 'time', 'depth') is returned unchanged.
    Shared by the coord and var paths so both canonicalize the same way.
    """
    if 'lat' in special_name:
        return 'latitude'
    if 'lon' in special_name:
        return 'longitude'
    if 'z' in special_name:
        return 'depth'
    if 'juld' in special_name:
        return 'time'
    return special_name


# Files with selected coords (sorted), followed by any other file with selected vars.
selected_file_names = sorted(file_name_to_selected_coords.keys())
for file_name in sorted(file_name_to_selected_varz.keys()):
    if len(file_name_to_selected_varz[file_name]) == 0:
        continue
    if file_name not in selected_file_names:
        selected_file_names.append(file_name)
if debug:
    print ('selected_file_names: ' + ','.join(selected_file_names))

for file_name in selected_file_names:
    dataset_name = file_name_to_dataset_name[file_name]
    print ('Data for file: ' + file_name + ' in dataset: ' + dataset_name)

    # Store df coords/vars to csv coords/vars for easier lookup while preparing CSV row from df row.
    df_cols = []
    df_to_csv_dict = {}

    for coord in file_name_to_selected_coords[file_name]:
        special_coord = coord.lower().strip()
        csv_coord = _canonical_special_col(special_coord) if special_coord in special_cols else coord
        if csv_coord not in csv_cols:
            print ('coord: ' + csv_coord + ' NOT FOUND in csv cols - THIS SHOULD NOT HAPPEN.')
        else:
            df_cols.append(coord)
            df_to_csv_dict[coord] = csv_coord

    if debug:
        print ('coords...')
        print ('df_cols: ' + ','.join(df_cols))
        print ('df_to_csv_dict: ' + str(df_to_csv_dict))

    for key in file_name_to_selected_varz[file_name]:
        # key = symbol + '*' + standard_name
        if key not in varz_to_mapped_name:
            continue
        mapped_name = varz_to_mapped_name[key]
        symbol = key.split('*')[0]
        special_symbol = symbol.lower().strip()
        if symbol in df_to_csv_dict or special_symbol in df_to_csv_dict:
            print ('symbol: ' + symbol + ' already present in df_to_csv_dict - THIS SHOULD BE RARE.')
            continue
        if special_symbol in special_cols:
            print ('symbol: ' + symbol + ' is in special cols, treat it as coordinate - THIS SHOULD BE RARE.')
            mapped_name = _canonical_special_col(special_symbol)
        df_cols.append(symbol)
        df_to_csv_dict[symbol] = mapped_name
    if debug:
        print ('vars...')
        print ('df_cols: ' + ','.join(df_cols))
        print ('df_to_csv_dict: ' + str(df_to_csv_dict))

    df_header = list(df_to_csv_dict.keys())
    dd = file_name_to_dask_records[file_name]
    _df = dd[df_header]

    matched_depth_assignment_varz_to_depth_map = file_name_to_matched_depth_assignment_varz_to_depth_map[file_name]
    # NOTE(review): ignore_depth_assignment is not consulted here; depth assignments are always applied.

    # Depth assignments depend only on which columns are exported (not on row values),
    # so collect them once per file, using the same column filters as the row loop below.
    depth_values = [
        depth_assignment
        for df_col in df_cols
        if df_col in df_to_csv_dict and df_to_csv_dict[df_col] in header_dict
        for varz, depth_assignment in matched_depth_assignment_varz_to_depth_map.items()
        if df_col == varz.split('*')[0]
    ]

    # Template row, sized from csv_cols so it always lines up with the header written to the CSV
    # (header_dict is shorter if a column name repeats). dataset_name/file_name are fixed;
    # the selected coord/var cells are overwritten for every df row.
    csv_row = [''] * len(csv_cols)
    csv_row[0] = dataset_name
    csv_row[1] = file_name
    csv_row_count = 0

    with open('output/spurs_data.csv', 'a', newline='') as out_csv:
        writer = csv.writer(out_csv)
        for _, df_row in _df.iterrows():
            for df_col in df_cols:
                if df_col not in df_to_csv_dict:
                    print ('df coord/var: ' + df_col + ' not in df_to_csv_dict - THIS SHOULD NOT HAPPEN.')
                    continue
                csv_col = df_to_csv_dict[df_col]

                if debug:
                    print ('df_col: ' + df_col)
                    print ('csv_col: ' + csv_col)

                if csv_col not in header_dict:
                    print ('csv coord/var: ' + csv_col + ' not in csv header cols - THIS SHOULD NOT HAPPEN.')
                    continue

                csv_idx = header_dict[csv_col]
                csv_row[csv_idx] = df_row[df_col]

                if debug:
                    print ('appending csv_col: ' + csv_col + ' at csv_idx: ' + str(csv_idx) + ' value: ' + str(df_row[df_col]) + ' for df_col: ' + df_col)

            # One output row per matched depth assignment, otherwise the row as-is.
            if depth_values:
                depth_idx = header_dict['depth']
                for depth_value in depth_values:
                    # Copy so the depth override does not leak into the shared template row.
                    csv_row_with_depth = list(csv_row)
                    csv_row_with_depth[depth_idx] = depth_value
                    print (csv_row_with_depth)
                    writer.writerow(csv_row_with_depth)
            else:
                print (csv_row)
                writer.writerow(csv_row)
            csv_row_count += 1
            if max_records_per_file != -1 and csv_row_count >= max_records_per_file:
                print ('reached max_records_per_file: ' + str(max_records_per_file))
                break

print ('Done.')

Data for file: SPURS_Knorr_Ecomapper1 in dataset: spurs1_ecomapper
['spurs1_ecomapper', 'SPURS_Knorr_Ecomapper1', Timestamp('2012-09-30 16:02:50.860003328'), 0.06300000101327896, 26.1494083404541, -38.33799743652344, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', 6.203744411468506, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
['spurs1_ecomapper', 'SPURS_Knorr_Ecomapper1', Timestamp('2012-09-30 16:02:51.879993600'), 0.02800000086426735, 26.14940643310547, -38.3380012512207, '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', ''