Skip to content

Commit

Permalink
Merge pull request #206 from dstansby/mypypypypy
Browse files Browse the repository at this point in the history
More typing
  • Loading branch information
dstansby committed Jun 1, 2023
2 parents 1b528a1 + f362406 commit c36e861
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 110 deletions.
116 changes: 54 additions & 62 deletions cdflib/cdf_to_xarray.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import re
from typing import Any, Dict, List, Tuple, Union
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, cast

import numpy as np
import numpy.typing as npt
import xarray as xr

from cdflib import CDF
from cdflib.dataclasses import VDRInfo
from cdflib.dataclasses import AttData, VDRInfo
from cdflib.epochs import CDFepoch as cdfepoch

ISTP_TO_XARRAY_ATTRS = {"FIELDNAM": "standard_name", "LABLAXIS": "long_name", "UNITS": "units"}


def _find_xarray_plotting_values(var_att_dict) -> Dict[str, str]:
def _find_xarray_plotting_values(var_att_dict: Dict[str, str]) -> Dict[str, str]:
"""
This is a simple function that looks through a variable attribute dictionary for ISTP attributes that are similar
to ones used natively by Xarray, specifically their plotting routines. If some are found, this returns a dictionary
Expand All @@ -30,7 +31,7 @@ def _find_xarray_plotting_values(var_att_dict) -> Dict[str, str]:


def _convert_cdf_time_types(
data: npt.ArrayLike, atts, properties: VDRInfo, to_datetime: bool = False, to_unixtime: bool = False
data: npt.ArrayLike, atts: Dict[str, AttData], properties: VDRInfo, to_datetime: bool = False, to_unixtime: bool = False
) -> Tuple[npt.NDArray, Dict[str, Any]]:
"""
# Converts CDF time types into either datetime objects, unixtime, or nothing
Expand Down Expand Up @@ -66,7 +67,7 @@ def _convert_cdf_time_types(
new_atts = {}
for att in atts:
data_type = atts[att].Data_Type
data = np.atleast_1d(atts[att]["Data"])
data = atts[att].Data
if len(data) == 0 or data_type not in ("CDF_EPOCH", "CDF_EPOCH16", "CDF_TIME_TT2000"):
new_atts[att] = data
else:
Expand All @@ -84,7 +85,7 @@ def _convert_cdf_time_types(


def _convert_cdf_to_dicts(
filename, to_datetime: bool = False, to_unixtime: bool = False
filename: Union[str, Path], to_datetime: bool = False, to_unixtime: bool = False
) -> Tuple[Dict[str, List[Union[str, np.ndarray]]], Dict[str, Any], Dict[str, npt.NDArray], Dict[str, VDRInfo]]:
# Open the CDF file
# Converts the entire CDF file into python dictionary objects
Expand All @@ -107,7 +108,7 @@ def _convert_cdf_to_dicts(
for var_name in all_cdf_variables:
var_attribute_list = cdf_file.varattsget(var_name)
var_data_temp: Dict[str, Union[str, np.ndarray]] = {}
var_atts_temp = {}
var_atts_temp: Dict[str, AttData] = {}
for att in var_attribute_list:
var_atts_temp[att] = cdf_file.attget(att, var_name)
variable_properties[var_name] = cdf_file.varinq(var_name)
Expand All @@ -125,7 +126,11 @@ def _convert_cdf_to_dicts(


def _verify_depend_dimensions(
dataset, dimension_number, primary_variable_name, coordinate_variable_name, primary_variable_properties: VDRInfo
dataset: Dict[str, npt.NDArray],
dimension_number: int,
primary_variable_name: str,
coordinate_variable_name: str,
primary_variable_properties: VDRInfo,
) -> bool:
primary_data = np.array(dataset[primary_variable_name])
coordinate_data = np.array(dataset[coordinate_variable_name])
Expand Down Expand Up @@ -187,7 +192,7 @@ def _verify_depend_dimensions(
return True


def _discover_depend_variables(vardata, varatts, varprops) -> List[str]:
def _discover_depend_variables(vardata: Dict[str, npt.NDArray], varatts: Dict[str, Any], varprops: Dict[str, VDRInfo]) -> List[str]:
# This loops through the variable attributes to discover which variables are the coordinates of other variables,
# Unfortunately, there is no easy way to tell this by looking at the variable ITSELF,
# you need to look at all variables and see if one points to it.
Expand All @@ -206,7 +211,7 @@ def _discover_depend_variables(vardata, varatts, varprops) -> List[str]:
return list(set(list_of_depend_vars))


def _discover_uncertainty_variables(varatts) -> Dict[str, str]:
def _discover_uncertainty_variables(varatts: Dict[str, Any]) -> Dict[str, str]:
# This loops through the variable attributes to discover which variables are the labels of other variables
# Unfortunately, there is no easy way to tell this by looking at the label variable itself
# This returns a KEY:VALUE pair, with the LABEL VARIABLE corresponding to which dimension it covers.
Expand All @@ -221,7 +226,9 @@ def _discover_uncertainty_variables(varatts) -> Dict[str, str]:
return list_of_label_vars


def _discover_label_variables(varatts, all_variable_properties: Dict[str, VDRInfo], all_variable_data) -> Dict[str, str]:
def _discover_label_variables(
varatts: Dict[str, Any], all_variable_properties: Dict[str, VDRInfo], all_variable_data: Dict[str, npt.NDArray]
) -> Dict[str, str]:
# This loops through the variable attributes to discover which variables are the labels of other variables
# Unfortunately, there is no easy way to tell this by looking at the label variable itself
# This returns a KEY:VALUE pair, with the LABEL VARIABLE corresponding to which dimension it covers.
Expand Down Expand Up @@ -260,7 +267,7 @@ def _discover_label_variables(varatts, all_variable_properties: Dict[str, VDRInf
return list_of_label_vars


def _convert_fillvals_to_nan(var_data, var_atts, var_properties: VDRInfo) -> npt.NDArray:
def _convert_fillvals_to_nan(var_data: npt.NDArray, var_atts: Dict[str, Any], var_properties: VDRInfo) -> npt.NDArray:
if var_atts is None:
return var_data
if var_data is None:
Expand All @@ -287,25 +294,17 @@ def _convert_fillvals_to_nan(var_data, var_atts, var_properties: VDRInfo) -> npt


def _determine_record_dimensions(
var_name,
var_atts,
var_data,
var_name: str,
var_atts: Dict[str, Any],
var_data: npt.NDArray,
var_props: VDRInfo,
depend_variables,
all_variable_data,
all_variable_properties,
created_unlimited_dims,
depend_variables: List[str],
all_variable_data: Dict[str, npt.NDArray],
all_variable_properties: Dict[str, VDRInfo],
created_unlimited_dims: Dict[str, int],
) -> Tuple[str, bool, bool]:
"""
Determines the name of the
:param var_name:
:param var_atts:
:param var_data:
:param var_props:
:param depend_variables:
:param all_variable_data:
:param all_variable_properties:
:param created_unlimited_dims:
"""

if var_props.Rec_Vary and var_props.Last_Rec > 0:
Expand Down Expand Up @@ -387,35 +386,22 @@ def _determine_record_dimensions(


def _determine_dimension_names(
var_name,
var_atts,
var_data,
var_name: str,
var_atts: Dict[str, Any],
var_data: npt.NDArray,
var_props: VDRInfo,
depend_variables,
all_variable_data,
all_variable_properties,
created_regular_dims,
record_name_found,
depend_variables: List[str],
all_variable_data: Dict[str, npt.NDArray],
all_variable_properties: Dict[str, VDRInfo],
created_regular_dims: Dict[str, int],
record_name_found: str,
) -> List[Tuple[str, int, bool, bool]]:
"""
:param var_name:
:param var_atts:
:param var_props:
:param depend_variables:
:param all_variable_data:
:param all_variable_properties:
:param created_regular_dims:
:return:
"""

return_list = []

if len(var_props.Dim_Sizes) != 0 and var_props.Last_Rec >= 0:
i = 0
skip_first_dim = record_name_found
for dim_size in var_data.shape:
if skip_first_dim:
skip_first_dim = False
if record_name_found:
continue

i += 1
Expand Down Expand Up @@ -526,7 +512,9 @@ def _determine_dimension_names(
return return_list


def _reformat_variable_dims_and_data(var_dims, var_data) -> Tuple[str, npt.NDArray]:
def _reformat_variable_dims_and_data(
var_dims: List[str], var_data: Optional[np.ndarray]
) -> Tuple[Union[str, List[str]], npt.NDArray]:
if len(var_dims) > 0 and var_data is None:
var_data = np.array([])

Expand All @@ -539,14 +527,19 @@ def _reformat_variable_dims_and_data(var_dims, var_data) -> Tuple[str, npt.NDArr

# Check if both dimensions and data are empty, if so, set the dimension to the empty dimension
if var_data.size == 0 and not len(var_dims):
var_dims = "dim_empty"
var_dims_: Union[str, List[str]] = "dim_empty"
else:
var_dims_ = var_dims

return var_dims, var_data
return var_dims_, var_data


def _generate_xarray_data_variables(
all_variable_data, all_variable_attributes, all_variable_properties, fillval_to_nan
) -> Tuple[Dict[str, "xr.Variable"], Dict[str, int]]:
all_variable_data: Dict[str, npt.NDArray],
all_variable_attributes: Dict[str, Any],
all_variable_properties: Dict[str, VDRInfo],
fillval_to_nan: bool,
) -> Tuple[Dict[str, xr.Variable], Dict[str, int]]:
# Import here to avoid xarray as a dependency of all of cdflib
import xarray as xr

Expand All @@ -563,7 +556,7 @@ def _generate_xarray_data_variables(
created_vars: Dict[str, xr.Variable] = {}

for var_name in all_variable_data:
var_dims = []
var_dims: List[str] = []
var_atts = all_variable_attributes[var_name]
var_data = np.array(all_variable_data[var_name])
var_props = all_variable_properties[var_name]
Expand Down Expand Up @@ -629,9 +622,10 @@ def _generate_xarray_data_variables(
return created_vars, depend_dimensions


def _verify_dimension_sizes(created_data_vars, created_coord_vars) -> None:
def _verify_dimension_sizes(created_data_vars: Dict[str, xr.Variable], created_coord_vars: Dict[str, xr.Variable]) -> None:
for var in created_data_vars:
for d in created_data_vars[var].dims:
d = cast(str, d)
if d in created_data_vars:
if created_data_vars[d].dims != (d,):
raise Exception(
Expand All @@ -652,6 +646,7 @@ def _verify_dimension_sizes(created_data_vars, created_coord_vars) -> None:
)
for var in created_coord_vars:
for d in created_coord_vars[var].dims:
d = cast(str, d)
if d in created_data_vars:
if created_data_vars[d].dims != (d,):
raise Exception(
Expand All @@ -672,7 +667,7 @@ def _verify_dimension_sizes(created_data_vars, created_coord_vars) -> None:
)


def cdf_to_xarray(filename, to_datetime=False, to_unixtime=False, fillval_to_nan=False) -> xr.Dataset:
def cdf_to_xarray(filename: str, to_datetime: bool = False, to_unixtime: bool = False, fillval_to_nan: bool = False) -> xr.Dataset:
"""
This function converts CDF files into XArray Dataset Objects.
Expand Down Expand Up @@ -750,9 +745,6 @@ def cdf_to_xarray(filename, to_datetime=False, to_unixtime=False, fillval_to_nan
3. Gather all global scope attributes in the CDF file
4. Create an XArray Dataset objects with the data variables, coordinate variables, and global attributes.
"""
# Import here to avoid xarray as a dependency of all of cdflib
import xarray as xr

# Convert the CDF file into a series of dicts, so we don't need to keep reading the file
global_attributes, all_variable_attributes, all_variable_data, all_variable_properties = _convert_cdf_to_dicts(
filename, to_datetime=to_datetime, to_unixtime=to_unixtime
Expand All @@ -768,8 +760,8 @@ def cdf_to_xarray(filename, to_datetime=False, to_unixtime=False, fillval_to_nan
# Determine which dimensions are coordinates vs actual data
# Variables are considered coordinates if one of the other dimensions depends on them.
# Otherwise, they are considered data coordinates.
created_coord_vars = {}
created_data_vars = {}
created_coord_vars: Dict[str, xr.Variable] = {}
created_data_vars: Dict[str, xr.Variable] = {}
for var_name in created_vars:
if var_name in label_variables:
# If these are label variables, we'll deal with these later when the DEPEND variables come up
Expand Down

0 comments on commit c36e861

Please sign in to comment.