Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Commit

Permalink
Merge pull request #745 from CCI-Tools/707_dzelge_data_frame_aggregate
Browse files Browse the repository at this point in the history
707 dzelge data frame aggregate
  • Loading branch information
forman committed Sep 14, 2018
2 parents 20abda6 + 8114baf commit 56eef6f
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 4 deletions.
6 changes: 3 additions & 3 deletions cate/core/workspace.py
Expand Up @@ -368,21 +368,21 @@ def _update_resource_json_from_feature_collection(cls, resource_json, features:
if scalar_value is not UNDEFINED:
variable_descriptors[0]['value'] = scalar_value

geometry = features.schema.get('geometry')
geometry_type = features.schema.get('geometry')
crs = str(features.crs)
crs_wkt = str(features.crs_wkt)
driver = features.driver

attributes = {
'driver': driver,
'geometry': geometry,
'geometryType': geometry_type,
'crs': crs,
'crsWkt': crs_wkt,
'numFeatures': num_features,
}

resource_json.update(variables=variable_descriptors,
geometry=geometry,
geometry=geometry_type,
numFeatures=num_features,
attributes=attributes)

Expand Down
93 changes: 93 additions & 0 deletions cate/ops/data_frame.py
Expand Up @@ -36,6 +36,7 @@
import numpy as np
import pandas as pd
import shapely.geometry
from shapely.geometry import MultiPolygon

from cate.core.op import op, op_input
from cate.core.types import VarName, DataFrameLike, GeometryLike, ValidationError, VarNamesLike, PolygonLike
Expand Down Expand Up @@ -267,6 +268,98 @@ def data_frame_find_closest(gdf: gpd.GeoDataFrame,
return new_gdf


@op(tags=['arithmetic'], version='1.0')
@op_input('df', data_type=DataFrameLike)
@op_input('var_names', data_type=VarNamesLike)
@op_input('aggregate_geometry', data_type=bool)
def data_frame_aggregate(df: DataFrameLike.TYPE,
                         var_names: VarNamesLike.TYPE = None,
                         aggregate_geometry: bool = False,
                         monitor: Monitor = Monitor.NONE) -> pd.DataFrame:
    """
    Aggregate columns into count, mean, median, sum, std, min, and max. Return a
    new single-row data frame containing one column per (variable, aggregation)
    pair, named ``<variable>_<aggregation>``.

    Only columns of dtype ``float64``, ``int64`` or ``bool`` are aggregated. For a
    GeoDataFrame, ``geometry`` is also accepted in *var_names*, but it never
    contributes statistics columns; it is only merged (union-like) when
    *aggregate_geometry* is true.

    :param df: The (Geo)DataFrame to be analysed
    :param var_names: Variables to be aggregated ('None' uses all aggregatable columns)
    :param aggregate_geometry: Aggregate (union like) the geometry and add it to the resulting GeoDataFrame
    :param monitor: Monitor for progress bar
    :return: a GeoDataFrame (carrying the CRS of *df*) if *df* is a GeoDataFrame and
             *aggregate_geometry* is true, otherwise a plain pandas DataFrame
    :raise ValidationError: if a requested variable does not exist in *df*, is not
             aggregatable, or if *df* is a GeoDataFrame without a ``geometry`` column
    """
    vns = VarNamesLike.convert(var_names)

    df_is_geo = isinstance(df, gpd.GeoDataFrame)
    aggregations = ["count", "mean", "median", "sum", "std", "min", "max"]

    # Check var names integrity (aggregatable, exists in data frame)
    types_accepted_for_agg = ['float64', 'int64', 'bool']
    agg_columns = list(df.select_dtypes(include=types_accepted_for_agg).columns)

    if df_is_geo:
        # 'geometry' is a legal variable name for geo data frames; it is filtered
        # out again by select_dtypes() before the statistics are computed.
        agg_columns.append('geometry')

    if vns is None:
        vns = agg_columns

    # Sort the offending names so that the error messages are deterministic.
    missing = set(vns) - set(df.columns)
    if missing:
        raise ValidationError('Variable ' + ','.join(sorted(missing)) + ' not in data frame!')

    not_aggregatable = set(vns) - set(agg_columns)
    if not_aggregatable:
        raise ValidationError('Variable(s) ' + ','.join(sorted(not_aggregatable)) + ' not aggregatable!')

    # The geometry column is only mandatory for geo data frames; a plain
    # DataFrame without geometry must not be rejected here.
    if df_is_geo and 'geometry' not in df.columns:
        raise ValidationError('Variable geometry not in GEO data frame!')

    # Aggregate columns (vns is never None at this point)
    df_buff = df[vns].select_dtypes(include=types_accepted_for_agg).agg(aggregations)

    # Flatten the (aggregation x variable) table into a single row.
    # str() keeps this working for non-string column labels.
    res = {str(n) + '_' + a: [df_buff.loc[a, n]]
           for n in df_buff.columns
           for a in aggregations}

    df_agg = pd.DataFrame(res)

    # Aggregate (union) geometry if GeoDataFrame
    if df_is_geo and aggregate_geometry:
        total_work = 100
        # Report progress only every num_work_rows rows so that at most
        # total_work progress steps are emitted.
        num_work_rows = 1 + len(df) // total_work
        with monitor.starting('Aggregating geometry: ', total_work):
            multi_polygon = MultiPolygon()
            for i, rec in enumerate(df.geometry):
                if monitor.is_cancelled():
                    # On cancellation the union accumulated so far is returned.
                    break
                # noinspection PyBroadException
                try:
                    multi_polygon = multi_polygon.union(other=rec)
                except Exception:
                    # Deliberate best effort: a geometry that cannot be merged
                    # is skipped instead of aborting the whole aggregation.
                    pass

                if i % num_work_rows == 0:
                    monitor.progress(work=1)

        df_agg = gpd.GeoDataFrame(df_agg, geometry=[multi_polygon])
        df_agg.crs = df.crs

    return df_agg


def great_circle_distance(p1: shapely.geometry.Point, p2: shapely.geometry.Point) -> float:
"""
Compute great-circle distance on a Sphere in degrees.
Expand Down
54 changes: 53 additions & 1 deletion test/ops/test_data_frame.py
Expand Up @@ -7,8 +7,9 @@
import shapely.wkt
from shapely.geometry import Point

from cate.core.types import ValidationError
from cate.ops.data_frame import data_frame_min, data_frame_max, data_frame_query, data_frame_find_closest, \
great_circle_distance, data_frame_subset
great_circle_distance, data_frame_aggregate, data_frame_subset


class TestDataFrameOps(TestCase):
Expand Down Expand Up @@ -199,6 +200,57 @@ def test_data_frame_find_closest(self):
self.assertEqual(shapely.wkt.loads('POINT(20 30)'), df2['geometry'].iloc[0])
self.assertEqual(shapely.wkt.loads('POINT(20 20)'), df2['geometry'].iloc[1])

def test_data_frame_aggregate(self):
    """
    Exercise data_frame_aggregate() with invalid inputs (expecting a
    ValidationError) and with valid plain and geo data frames (checking
    the shape of the aggregated result).
    """
    # Mock data: one text column and two numeric columns
    frame = pd.DataFrame({'name': ['A', 'B', 'C'],
                          'lat': [45, 46, 47.5],
                          'lon': [-120, -121.2, -122.9]})
    # Must be a copy, taken before the geometry is attached below
    geo_frame_without_geometry = gpd.GeoDataFrame(frame).copy()
    points = [Point(xy) for xy in zip(frame['lon'], frame['lat'])]
    geo_frame = gpd.GeoDataFrame(frame, geometry=points)

    names_with_unknown_field = 'asdc, lat, lon'
    names_with_text_field = 'name, lat, lon'
    numeric_names = ['lat', 'lon']
    stat_names = ["count", "mean", "median", "sum", "std", "min", "max"]
    expected_stat_columns = len(stat_names) * len(numeric_names)

    # A missing data frame is rejected
    with self.assertRaises(ValidationError):
        data_frame_aggregate(df=None)

    # Unknown fields first, then non-aggregatable fields, are rejected
    for invalid_names in (names_with_unknown_field, names_with_text_field):
        with self.assertRaises(ValidationError):
            data_frame_aggregate(df=frame, var_names=invalid_names)

    # A GeoDataFrame lacking its geometry is rejected, with and without
    # explicit variable names
    for requested_names in (None, 'lat'):
        with self.assertRaises(ValidationError):
            data_frame_aggregate(df=geo_frame_without_geometry, var_names=requested_names)

    # The aggregated result consists of exactly one row
    result = data_frame_aggregate(df=geo_frame, var_names=numeric_names)
    self.assertEqual(len(result), 1)

    # With var_names=None a DataFrame yields one column per (variable, statistic) pair
    result = data_frame_aggregate(df=frame, var_names=None)
    self.assertEqual(len(result.columns), expected_stat_columns)

    # With var_names=None a GeoDataFrame additionally yields the geometry column
    result = data_frame_aggregate(df=geo_frame, var_names=None, aggregate_geometry=True)
    self.assertEqual(len(result.columns), expected_stat_columns + 1)

    # The geometry union is present in the result
    result = data_frame_aggregate(df=geo_frame, var_names=numeric_names, aggregate_geometry=True)
    self.assertIsNotNone(result.geometry)


class GreatCircleDistanceTest(TestCase):
def test_great_circle_distance(self):
Expand Down

0 comments on commit 56eef6f

Please sign in to comment.