Skip to content

Commit

Permalink
feat(advanced analytics): support groupby in resample (#18045)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaoyongjie committed Jan 17, 2022
1 parent 8dea7f5 commit 0c7f728
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
* specific language governing permissions and limitations
* under the License.
*/
import { PostProcessingResample } from '@superset-ui/core';
import {
ensureIsArray,
isPhysicalColumn,
PostProcessingResample,
} from '@superset-ui/core';
import { PostProcessingFactory } from './types';
import { TIME_COLUMN } from './utils';

Expand All @@ -28,13 +32,21 @@ export const resampleOperator: PostProcessingFactory<
const resampleMethod = resampleZeroFill ? 'asfreq' : formData.resample_method;
const resampleRule = formData.resample_rule;
if (resampleMethod && resampleRule) {
const groupby_columns = ensureIsArray(queryObject.columns).map(column => {
if (isPhysicalColumn(column)) {
return column;
}
return column.label;
});

return {
operation: 'resample',
options: {
method: resampleMethod,
rule: resampleRule,
fill_value: resampleZeroFill ? 0 : null,
time_column: TIME_COLUMN,
groupby_columns,
},
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
import { QueryObject, SqlaFormData } from '@superset-ui/core';
import { resampleOperator } from '../../../src';
import { AdhocColumn, QueryObject, SqlaFormData } from '@superset-ui/core';
import { resampleOperator } from '@superset-ui/chart-controls';

const formData: SqlaFormData = {
metrics: [
Expand Down Expand Up @@ -75,6 +75,7 @@ test('should do resample', () => {
rule: '1D',
fill_value: null,
time_column: '__timestamp',
groupby_columns: [],
},
});
});
Expand All @@ -92,6 +93,80 @@ test('should do zerofill resample', () => {
rule: '1D',
fill_value: 0,
time_column: '__timestamp',
groupby_columns: [],
},
});
});

// Physical columns given as plain strings are expected to be forwarded
// unchanged as the resample operation's groupby columns.
test('should append physical column to resample', () => {
  const physicalColumns = ['column1', 'column2'];

  const actual = resampleOperator(
    { ...formData, resample_method: 'zerofill', resample_rule: '1D' },
    { ...queryObject, columns: physicalColumns },
  );

  const expected = {
    operation: 'resample',
    options: {
      method: 'asfreq',
      rule: '1D',
      fill_value: 0,
      time_column: '__timestamp',
      groupby_columns: ['column1', 'column2'],
    },
  };
  expect(actual).toEqual(expected);
});

// An adhoc column is expected to contribute its `label`, while a physical
// column contributes its own name, in the same order as the query columns.
test('should append label of adhoc column and physical column to resample', () => {
  const labelledAdhocColumn = {
    hasCustomLabel: true,
    label: 'concat_a_b',
    expressionType: 'SQL',
    sqlExpression: "'a' + 'b'",
  } as AdhocColumn;

  const actual = resampleOperator(
    { ...formData, resample_method: 'zerofill', resample_rule: '1D' },
    { ...queryObject, columns: [labelledAdhocColumn, 'column2'] },
  );

  const expected = {
    operation: 'resample',
    options: {
      method: 'asfreq',
      rule: '1D',
      fill_value: 0,
      time_column: '__timestamp',
      groupby_columns: ['concat_a_b', 'column2'],
    },
  };
  expect(actual).toEqual(expected);
});

// An adhoc column without a `label` is expected to yield `undefined` in the
// groupby list rather than being dropped.
test('should append `undefined` if adhoc non-existing label', () => {
  const unlabelledAdhocColumn = {
    sqlExpression: "'a' + 'b'",
  } as AdhocColumn;

  const actual = resampleOperator(
    { ...formData, resample_method: 'zerofill', resample_rule: '1D' },
    { ...queryObject, columns: [unlabelledAdhocColumn, 'column2'] },
  );

  const expected = {
    operation: 'resample',
    options: {
      method: 'asfreq',
      rule: '1D',
      fill_value: 0,
      time_column: '__timestamp',
      groupby_columns: [undefined, 'column2'],
    },
  };
  expect(actual).toEqual(expected);
});
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ export interface PostProcessingResample {
rule: string;
fill_value?: number | null;
time_column: string;
// An AdhocColumn without a label produces an `undefined` entry here.
// TODO: require an explicit label for every AdhocColumn.
groupby_columns?: Array<string | undefined>;
};
}

Expand Down
26 changes: 20 additions & 6 deletions superset/utils/pandas_postprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -958,27 +958,41 @@ def outliers(series: Series) -> Set[float]:
return aggregate(df, groupby=groupby, aggregates=aggregates)


def resample(
@validate_column_args("groupby_columns")
def resample( # pylint: disable=too-many-arguments
df: DataFrame,
rule: str,
method: str,
time_column: str,
groupby_columns: Optional[Tuple[Optional[str], ...]] = None,
fill_value: Optional[Union[float, int]] = None,
) -> DataFrame:
"""
resample a timeseries dataframe.
support upsampling in resample
:param df: DataFrame to resample.
:param rule: The offset string representing target conversion.
:param method: How to fill the NaN value after resample.
:param time_column: existing columns in DataFrame.
:param groupby_columns: columns except time_column in dataframe
:param fill_value: The value used to fill missing entries.
:return: DataFrame after resample
:raises QueryObjectValidationError: If the request is incorrect
"""
df = df.set_index(time_column)
if method == "asfreq" and fill_value is not None:
df = df.resample(rule).asfreq(fill_value=fill_value)

def _upsampling(_df: DataFrame) -> DataFrame:
_df = _df.set_index(time_column)
if method == "asfreq" and fill_value is not None:
return _df.resample(rule).asfreq(fill_value=fill_value)
return getattr(_df.resample(rule), method)()

if groupby_columns:
df = (
df.set_index(keys=list(groupby_columns))
.groupby(by=list(groupby_columns))
.apply(_upsampling)
)
df = df.reset_index().set_index(time_column).sort_index()
else:
df = getattr(df.resample(rule), method)()
df = _upsampling(df)
return df.reset_index()
67 changes: 67 additions & 0 deletions tests/integration_tests/pandas_postprocessing_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1029,3 +1029,70 @@ def test_resample(self):
)
self.assertListEqual(post_df["label"].tolist(), ["x", "y", 0, 0, "z", 0, "q"])
self.assertListEqual(post_df["y"].tolist(), [1.0, 2.0, 0, 0, 3.0, 0, 4.0])

def test_resample_with_groupby(self):
    """
    Exercise ``proc.resample`` with ``groupby_columns`` on a frame holding a
    timestamp column, a string column and a numeric column:

       __timestamp     city  val
    0  2022-01-13  Chicago  6.0
    1  2022-01-13       LA  5.0
    2  2022-01-13       NY  4.0
    3  2022-01-11  Chicago  3.0
    4  2022-01-11       LA  2.0
    5  2022-01-11       NY  1.0
    """
    timestamps = ["2022-01-13"] * 3 + ["2022-01-11"] * 3
    cities = ["Chicago", "LA", "NY"] * 2
    df = DataFrame(
        {
            "__timestamp": to_datetime(timestamps),
            "city": cities,
            "val": [6.0, 5.0, 4.0, 3.0, 2.0, 1.0],
        }
    )

    # Arguments shared by every resample call in this test.
    common_kwargs = dict(
        rule="1D",
        method="asfreq",
        fill_value=0,
        time_column="__timestamp",
    )

    post_df = proc.resample(df=df, groupby_columns=("city",), **common_kwargs)

    expected_columns = ["__timestamp", "city", "val"]
    assert list(post_df.columns) == expected_columns

    # Three rows are expected per day, including the day (2022-01-12) that is
    # absent from the input.
    expected_days = [
        day
        for day in ("2022-01-11", "2022-01-12", "2022-01-13")
        for _ in range(3)
    ]
    actual_days = [str(stamp.date()) for stamp in post_df["__timestamp"]]
    assert actual_days == expected_days

    # The rows for the missing day carry the fill value 0.
    assert list(post_df["val"]) == [3.0, 2.0, 1.0, 0, 0, 0, 6.0, 5.0, 4.0]

    # A non-existent column, or a None entry, in the groupby list must be
    # rejected with a validation error.
    invalid_groupbys = (
        ("city", "unkonw_column"),
        ("city", None),
    )
    for bad_columns in invalid_groupbys:
        with pytest.raises(QueryObjectValidationError):
            proc.resample(df=df, groupby_columns=bad_columns, **common_kwargs)

0 comments on commit 0c7f728

Please sign in to comment.