Extending existing plugins to open Level 3 data (#13)
* Extend the definition of MODIS_L3 to include monthly files.

* Rename the CCI products with a _L2 suffix, to be consistent with the MODIS products.

* Minor, pedantic code tidying.

* Workaround for badly named variables in PP files.

* Remove warnings from Iris 2.0.0 and pyhdf 0.9.0.

* Remove Aeronet from AProduct.get_variable_names() as there are many other suffixes.

* Extend the Aeronet routines to read the files from the Version 3 Direct Sun Algorithm, either version of the
Spectral Decomposition Algorithm, and the Maritime Aerosol Network.
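
As a rough illustration of the classifier this adds (get_aeronet_version() in
cis/data_io/aeronet.py, shown in the diff below); the filenames here are
hypothetical, since classification is based on the file header rather than the name:

```python
from cis.data_io.aeronet import get_aeronet_version

# Possible return values are the keys of AERONET_HEADER_LENGTH in the diff below.
get_aeronet_version("site_direct_sun.lev20")  # e.g. "AERONET/3"
get_aeronet_version("site_sda.ONEILL_20")     # e.g. "AERONET-SDA/3"
get_aeronet_version("cruise.lev15")           # e.g. "MAN/2" or "MAN-SDA/2"
```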

* A useful addition to the HDF_SDS class, returning an OrderedDict of the available dimensions.
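
A minimal usage sketch; it assumes HDF_SDS is constructed from a filename and a
variable name, as elsewhere in cis.data_io.hdf_sd, and the file and variable
here are hypothetical:

```python
from cis.data_io.hdf_sd import HDF_SDS

sds = HDF_SDS("MOD08_M3.A2018001.hdf", "Aerosol_Optical_Depth_Land_Ocean_Mean_Mean")

# dimensions() opens the file, reads the dimension names and lengths,
# then closes it again, e.g. OrderedDict([('YDim', 180), ('XDim', 360)])
for name, length in sds.dimensions().items():
    print(name, length)
```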

* Extend the CCI plugin to open Level 3 data. These products must inherit from NetCDF_Gridded.

* Switch from linecache to islice to obtain a few lines from an Aeronet file.

Aeronet now distributes its all-sites data as a single file, which spans tens
of gigabytes. linecache reads the entire file into memory, which is rather resource
intensive; islice uses minimal resources when the file is only accessed a few times.
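
A minimal sketch of the difference, with a hypothetical all-sites filename:

```python
import linecache
from itertools import islice

FILENAME = "All_Sites_Times_All_Points.lev15"  # hypothetical multi-gigabyte file

# linecache reads (and caches) every line of the file just to return one:
header = linecache.getline(FILENAME, 5)

# islice streams the file and stops once the requested lines have been read:
with open(FILENAME) as fileobj:
    header_lines = list(islice(fileobj, 0, 5))
```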

* A few bits of tidying/warning elimination forgotten in a previous commit.

* Missing values for MAN files were more complicated than originally thought.

* get_aeronet_file_variables() returns a unique list of names, equal in length to the number of data columns.

In some Aeronet versions, only the first Exact_Wavelength field is named, and
AOD_Empty can be repeated. Each of these is now listed in full, with a numeric
.{} suffix appended to the name.

Also, the exact names for the geolocation fields (e.g. date) vary. These are
now standardised for ease of reference.
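
For illustration, the renaming behaves like this sketch (the same Counter-based
logic as the new code in the diff below), with a hypothetical column list:

```python
from collections import Counter

columns = ["AOD_500nm", "Exact_Wavelength", "Exact_Wavelength", "Exact_Wavelength"]

# Number each repeated name upward from .0 so every column is unique
repeated = {var: -1 for var, num in Counter(columns).items() if num > 1}
unique = []
for var in columns:
    if var in repeated:
        repeated[var] += 1
        var += ".{}".format(repeated[var])
    unique.append(var)

print(unique)  # ['AOD_500nm', 'Exact_Wavelength.0', 'Exact_Wavelength.1', 'Exact_Wavelength.2']
```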

* Switch from numpy to pandas to read Aeronet files.

Aeronet now distributes its all-sites data as a single file, which spans tens
of gigabytes. np.genfromtxt() requires twice that much memory to open such a file;
pd.read_csv() manages memory more sensibly.

Also, update the product definition to include level 1.5 data.
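
A minimal sketch of the read pattern adopted in the diff below, with a
hypothetical file and column; the real call also handles per-version header
lengths, missing-value flags and date parsing:

```python
from pandas import read_csv

# Read only the needed columns, treating the Aeronet missing-value flag as NaN.
# read_csv parses the file incrementally, so peak memory stays close to the size
# of the kept columns, unlike np.genfromtxt (noted above as needing ~2x the file).
data = read_csv("a_site.lev15", sep=",", header=6, index_col=False,
                usecols=["AOD_500nm"], na_values=-999.0)
```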

* Update the version number and dependencies.

* Resolve ambiguity in the dependency on the iris package.

* Move to Python 3.6 in the Travis configuration to satisfy the Iris requirement.

* Remove iris from the pip dependencies until they push to V2.1 and remove the matplotlib<1.9 requirement.
adamcpovey authored and duncanwp committed Jan 21, 2019
1 parent 0cf03aa commit a3d344e
Showing 21 changed files with 334 additions and 125 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -3,7 +3,7 @@ sudo: false

#python:
# - 2.7
# - 3.4
# - 3.6

env:
- TEST_TARGET=unit PYTHON_TARGET=2.7
6 changes: 3 additions & 3 deletions cis/__init__.py
@@ -35,7 +35,7 @@ def read_data(filenames, variable, product=None):
    include all files in that directory, or wildcards such as ``*`` or ``?``.
    :type filenames: string or list
    :param str variable: The variable to read from the files
    :param str product: The name of the data reading plugin to use to read the data (e.g. ``Cloud_CCI``).
    :param str product: The name of the data reading plugin to use to read the data (e.g. ``Cloud_CCI_L2``).
    :return: The specified data as either a :class:`GriddedData` or :class:`UngriddedData` object.
    """
    data_list = read_data_list(filenames, variable, product)
@@ -54,7 +54,7 @@ def read_data_list(filenames, variables, product=None, aliases=None):
    :type filenames: string or list
    :param variables: One or more variables to read from the files
    :type variables: string or list
    :param str product: The name of the data reading plugin to use to read the data (e.g. ``Cloud_CCI``).
    :param str product: The name of the data reading plugin to use to read the data (e.g. ``Cloud_CCI_L2``).
    :param aliases: List of aliases to put on each variable's data object as an alternative means of identifying them.
    :type aliases: string or list
    :return: A list of the data read out (either a :class:`GriddedDataList` or :class:`UngriddedDataList` depending on
@@ -78,7 +78,7 @@ def get_variables(filenames, product=None, type=None):
    separated list, or a :class:`list` of string filenames. Filenames can include directories which will be expanded to
    include all files in that directory, or wildcards such as ``*`` or ``?``.
    :type filenames: string or list
    :param str product: The name of the data reading plugin to use to read the data (e.g. ``Cloud_CCI``).
    :param str product: The name of the data reading plugin to use to read the data (e.g. ``Cloud_CCI_L2``).
    :param str type: The type of HDF data to read, i.e. 'VD' or 'SD'
    :return: A list of the variables
    """
214 changes: 154 additions & 60 deletions cis/data_io/aeronet.py
@@ -2,28 +2,129 @@

defaultdeletechars = """~!@#$%^&*=+~\|]}[{'; /?.>,<"""

AERONET_HEADER_LENGTH = {"AERONET-SDA/2" : 5, "AERONET/2" : 5, "MAN-SDA/2" : 5, "MAN/2" : 5,
                         "AERONET-SDA/3" : 7, "AERONET/3" : 7}
AERONET_MISSING_VALUE = {"AERONET-SDA/2" : 'N/A', "AERONET/2" : 'N/A', "MAN-SDA/2" : -999.0, "MAN/2" : (-999.0, -10000),
                         "AERONET-SDA/3" : -999.0, "AERONET/3" : -999.0}

V2_HEADER = "Version 2 Direct Sun Algorithm"
V3_HEADER = "AERONET Version 3;"

AERONET_COORDINATE_RENAME = {
    "Date(dd:mm:yy)": "date",
    "Date(dd-mm-yy)": "date",
    "Date(dd:mm:yyyy)": "date",
    "Date(dd-mm-yyyy)": "date",
    "Date_(dd:mm:yy)": "date",
    "Date_(dd-mm-yy)": "date",
    "Date_(dd:mm:yyyy)": "date",
    "Date_(dd-mm-yyyy)": "date",
    "Time(hh:mm:ss)": "time",
    "Time(hh-mm-ss)": "time",
    "Time_(hh:mm:ss)": "time",
    "Time_(hh-mm-ss)": "time",
    "Latitude": "latitude",
    "Longitude": "longitude",
    "Site_Latitude(Degrees)": "latitude",
    "Site_Longitude(Degrees)": "longitude",
    "Site_Elevation(m)": "altitude",
}

def get_slice_of_lines_from_file(filename, start, end):
    """Grab a subset of lines from a file, defined using slice-style start:end.
    This uses less memory than the equivalent linecache.getline()."""
    from itertools import islice
    with open(filename) as fileobj:
        return list(islice(fileobj, start, end))

def get_aeronet_version(filename):
    """
    Classifies the format of an Aeronet file based on its header.
    :param filename: Full path to the file to read.
    :return str: AERONET or MAN, followed by an optional -SDA, and /# where # is the version number.
    """
    from cis.exceptions import FileFormatError

    first_line, second_line = get_slice_of_lines_from_file(filename, 0, 2)

    man = "Maritime Aerosol Network" in first_line
    sda = "SDA" in first_line

    if man:
        return "MAN-SDA/2" if sda else "MAN/2"

    if first_line.startswith(V3_HEADER):
        return "AERONET-SDA/3" if sda else "AERONET/3"

def get_aeronet_file_variables(filename):
    if second_line.startswith(V2_HEADER):
        return "AERONET-SDA/2" if sda else "AERONET/2"

    raise FileFormatError(["Unable to determine Aeronet file version", filename],
                          "Unable to determine Aeronet file version " + filename)


def get_aeronet_file_variables(filename, version=None):
    """
    Return a list of valid Aeronet file variables with invalid characters removed. We need to remove invalid characters
    primarily for writing back out to CF-compliant NetCDF.
    :param filename: Full path to the file to read
    :return: A list of Aeronet variable names in the order they appear in the file
    """
    import linecache
    vars = linecache.getline(filename, 5).split(",")
    for i in range(0, len(vars)):
    from collections import Counter

    if version is None:
        version = get_aeronet_version(filename)

    try:
        first_line, second_line = get_slice_of_lines_from_file(filename, AERONET_HEADER_LENGTH[version]-1,
                                                               AERONET_HEADER_LENGTH[version]+1)
    except ValueError:
        # Aeronet files can, for some reason, contain no data
        return []

    variables = first_line.replace("\n", "").split(",")

    # The SDA files don't list all of the columns
    if variables[-1] == "Exact_Wavelengths_for_Input_AOD(um)" or variables[-1] == "Exact_Wavelengths_for_Input_AOD(nm)":
        original_name = variables[-1]

        # Find the number of wavelengths from the first data line
        values = second_line.split(",")
        n_wavelengths = int(values[len(variables)-2])
        for i in range(n_wavelengths-1):
            variables.append(original_name)

    repeated_items = {var: -1 for var, num in Counter(variables).items() if num > 1}

    final_variables = []
    for var in variables:
        # Add a numerical counter to repeated variable names
        if var in repeated_items:
            repeated_items[var] += 1
            var += ".{}".format(repeated_items[var])

        # Remove nonstandard characters
        for char in defaultdeletechars:
            vars[i] = vars[i].replace(char, "")
    return [var.strip() for var in vars]
            var = var.replace(char, "")

        var = var.strip()

def load_multiple_aeronet(fnames, variables=None):
        # Enforce standardised names for the coordinate fields
        try:
            final_variables.append(AERONET_COORDINATE_RENAME[var])
        except KeyError:
            final_variables.append(var)

    return final_variables


def load_multiple_aeronet(filenames, variables=None):
    from cis.utils import add_element_to_list_in_dict, concatenate

    adata = {}

    for filename in fnames:
    for filename in filenames:
        logging.debug("reading file: " + filename)

        # reading in all variables into a dictionary:
@@ -38,68 +139,60 @@ def load_multiple_aeronet(fnames, variables=None):
    return adata


def load_aeronet(fname, variables=None):
def load_aeronet(filename, variables=None):
    """
    loads aeronet lev 2.0 csv file.
    Loads aeronet csv file.
    Originally from http://code.google.com/p/metamet/
    License: GNU GPL v3
    :param fname: data file name
    :param filename: data file name
    :param variables: A list of variables to return
    :return: A dictionary of variable names and numpy arrays containing the data for that variable
    """
    import numpy as np
    from numpy import ma
    from datetime import datetime, timedelta
    from cis.time_util import cis_standard_time_unit
    from cis.exceptions import InvalidVariableError
    from cis.time_util import cis_standard_time_unit
    from copy import copy
    from numpy.ma import masked_invalid
    from pandas import read_csv, to_datetime

    std_day = cis_standard_time_unit.num2date(0)

    ordered_vars = get_aeronet_file_variables(fname)
    version = get_aeronet_version(filename)
    ordered_vars = get_aeronet_file_variables(filename, version)
    if len(ordered_vars) == 0:
        return {}

    def date2daynum(datestr):
        the_day = datetime(int(datestr[-4:]), int(datestr[3:5]), int(datestr[:2]))
        return float((the_day - std_day).days)
    # Load all available geolocation information and any requested variables
    cols = [var for var in ("date", "time", "latitude", "longitude", "altitude") if var in ordered_vars]
    if variables is not None:
        cols.extend(variables)

    def time2fractionalday(timestr):
        td = timedelta(hours=int(timestr[:2]), minutes=int(timestr[3:5]), seconds=int(timestr[6:8]))
        return td.total_seconds()/(24.0*60.0*60.0)
    dtypes = {var: 'str' if var in ("date", "time") else "float" for var in cols}

    try:
        rawd = np.genfromtxt(fname, skip_header=5, delimiter=',', names=ordered_vars,
                             converters={0: date2daynum, 1: time2fractionalday, 'Last_Processing_Date': date2daynum},
                             dtype=np.float64, missing_values='N/A', usemask=True)
    except (StopIteration, IndexError) as e:
        raise IOError(e)

    lend = len(rawd)
    # The date and time column are already in days since cis standard time, and fractional days respectively, so we can
    # just add them together
    # Find the columns by number rather than name as some older versions of numpy mangle the special characters
    datetimes = rawd[rawd.dtype.names[0]] + rawd[rawd.dtype.names[1]]

    metadata = get_file_metadata(fname)
    lon = np.zeros(lend) + float(metadata.misc[2][1].split("=")[1])
    lat = np.zeros(lend) + float(metadata.misc[2][2].split("=")[1])
    alt = np.zeros(lend) + float(metadata.misc[2][3].split("=")[1])

    data_dict = {}
    if variables is not None:
        for key in variables:
            try:
                # Again, we can't trust the numpy names so we have to use our pre-read names to index the right column
                data_dict[key] = rawd[rawd.dtype.names[ordered_vars.index(key)]]
            except ValueError:
                raise InvalidVariableError(key + " does not exist in " + fname)

    data_dict["datetime"] = ma.array(datetimes)
    data_dict["longitude"] = ma.array(lon)
    data_dict["latitude"] = ma.array(lat)
    data_dict["altitude"] = ma.array(alt)

    return data_dict
        rawd = read_csv(filename, sep=",", header=AERONET_HEADER_LENGTH[version]-1, names=ordered_vars,
                        index_col=False, usecols=cols, na_values=AERONET_MISSING_VALUE[version], dtype=dtypes,
                        parse_dates={"datetime": ["date", "time"]}, infer_datetime_format=True, dayfirst=True,
                        error_bad_lines=False, warn_bad_lines=True,  # low_memory="All_Sites_Times_All_Points" in filename
                        )
    except ValueError:
        raise InvalidVariableError("{} not available in {}".format(variables, filename))

    # Empty file
    if rawd.shape[0] == 0:
        return {"datetime": [], "latitude": [], "longitude": [], "altitude": []}

    # Convert pandas Timestamps into CIS standard numbers
    rawd["datetime"] = [cis_standard_time_unit.date2num(timestamp.to_pydatetime())
                        for timestamp in to_datetime(rawd["datetime"], format='%d:%m:%Y %H:%M:%S')]

    # Add position metadata that isn't listed in every line for some formats
    if version.startswith("MAN"):
        rawd["altitude"] = 0.

    elif version.endswith("2"):
        metadata = get_file_metadata(filename)
        rawd["longitude"] = float(metadata.misc[2][1].split("=")[1])
        rawd["latitude"] = float(metadata.misc[2][2].split("=")[1])
        rawd["altitude"] = float(metadata.misc[2][3].split("=")[1])

    return {var: masked_invalid(arr) for var, arr in rawd.items()}


def get_file_metadata(filename, variable='', shape=None):
@@ -108,9 +201,10 @@

    if variable is None:
        variable = ''
    version = get_aeronet_version(filename)
    metadata = Metadata(name=variable, long_name=variable, shape=shape)
    lines = []
    for i in range(0, 4):
    for i in range(0, AERONET_HEADER_LENGTH[version]-1):
        lines.append(file.readline().replace("\n", "").split(","))
    metadata.misc = lines
    return metadata
4 changes: 2 additions & 2 deletions cis/data_io/data_reader.py
@@ -144,7 +144,7 @@ def read_datagroups(self, datagroups):
            {'filenames': ['filename1.nc', 'filename2.nc'],
             'variables': ['variable1', 'variable2'],
             'product' : 'Aerosol_CCI'}
             'product' : 'Aerosol_CCI_L2'}
        :return list: A list of CommonData objects (either GriddedData or UngriddedData, *or a combination*)
        """
@@ -170,7 +170,7 @@ def read_single_datagroup(self, datagroup):
            {'filenames': ['filename1.nc', 'filename2.nc'],
             'variables': ['variable1', 'variable2'],
             'product' : 'Aerosol_CCI'}
             'product' : 'Aerosol_CCI_L2'}
        :return CommonDataList: Either a GriddedDataList or an UngriddedDataList
"""
6 changes: 0 additions & 6 deletions cis/data_io/gridded_data.py
@@ -18,8 +18,6 @@ def load_cube(*args, **kwargs):
    :raises ValueError: If 0 or more than one cube is found
    """
    from iris.exceptions import MergeError, ConcatenateError
    # Removes warnings and prepares for future Iris change
    iris.FUTURE.netcdf_promote = True

    cubes = iris.load(*args, **kwargs)

@@ -293,8 +291,6 @@ def save_data(self, output_file):
        # If we have a time coordinate then use that as the unlimited dimension, otherwise don't have any
        if self.coords('time'):
            save_args['unlimited_dimensions'] = ['time']
        else:
            iris.FUTURE.netcdf_no_unlimited = True
        iris.save(self, output_file, **save_args)

    def as_data_frame(self, copy=True):
@@ -442,8 +438,6 @@ def save_data(self, output_file):
        # If we have a time coordinate then use that as the unlimited dimension, otherwise don't have any
        if self.coords('time'):
            save_args['unlimited_dimensions'] = ['time']
        else:
            iris.FUTURE.netcdf_no_unlimited = True

        iris.save(self, output_file, **save_args)

3 changes: 1 addition & 2 deletions cis/data_io/hdf.py
@@ -76,8 +76,7 @@ def _read_hdf4(filename, variables):

        vds_dict = hdf_vd.read(filename, vdvariables)
    except HDF4Error as e:
        joined_up_message = "".join(e)
        raise IOError(joined_up_message)
        raise IOError(str(e))

    for variable in variables:
        if variable not in sds_dict and variable not in vds_dict:
16 changes: 14 additions & 2 deletions cis/data_io/hdf_sd.py
@@ -15,7 +15,7 @@ def get_hdf_SD_file_variables(filename):
    Get all the variables from an HDF SD file
    :param str filename: The filename of the file to get the variables from
    :returns: An OrderedDict containing the variables from the file
    :return: An OrderedDict containing the variables from the file
    """
    if not SD:
        raise ImportError("HDF support was not installed, please reinstall with pyhdf to read HDF files.")
@@ -105,10 +105,22 @@ def info(self):
        finally:
            self._close_sds()

    def dimensions(self):
        """
        Call pyhdf.SD.SDS.dimensions(), opening and closing the file
        """
        from collections import OrderedDict
        try:
            self._open_sds()
            var_description = self._sd.datasets()[self._variable]
            return OrderedDict(zip(var_description[0], var_description[1]))
        finally:
            self._close_sds()


def read(filename, variables=None, datadict=None):
    """
    Reads SD from a HDF4 file into a dictionary.
    Reads SD from a HDF4 file into a dictionary.
    :param str filename: The name (with path) of the HDF file to read.
    :param iterable names: A sequence of variable (dataset) names to read from the
2 changes: 1 addition & 1 deletion cis/data_io/hdf_vd.py
@@ -23,7 +23,7 @@ def get_hdf_VD_file_variables(filename):
    Get all the variables from an HDF VD file
    :param filename: The filename of the file to get the variables from
    :returns: An OrderedDict containing the variables from the file
    :return: An OrderedDict containing the variables from the file
    """
    variables = None
    if not HDF:
