# Aggregate
---
Performs a rolling aggregation on the input data according to the given parameters.

Given:
- `df_artifact`: Input DataFrame
- `keys`: Index labels (keys) to aggregate by
- `metrics`: List of metrics to perform the aggregate task on 
- `labels`: List of labels to perform the label aggregate on 
- `metric_aggs`: Array of aggregations to run on the metrics  
  **ex:** `['mean', 'sum']`
- `label_aggs`: Array of aggregations to run on the labels  
  **ex:** `['max']`
- `suffix`: A suffix to add to the aggregations (**ex:** '_daily')
- `window`: Window size, either an `int` number of rows or a time-interval string when a timestamp index is available
- `center`: If `True`, the result of each window is assigned to the sample at the window's center
- `inplace`:
  - **True:** Returns only the newly aggregated data
  - **False:** Returns the newly aggregated data with the original dataset
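
The parameters above can be illustrated with a minimal pandas sketch. This is not the function's actual implementation: `rolling_aggregate_sketch`, the toy DataFrame, the use of a plain key column, and the `<column>_<agg><suffix>` naming scheme are all assumptions made for illustration.

```python
import pandas as pd


def rolling_aggregate_sketch(df, keys, metrics, labels, metric_aggs, label_aggs,
                             suffix="", window=3, center=False, inplace=False):
    """Illustrative stand-in (hypothetical), not the MLRun function's code."""
    grouped = df.groupby(keys)
    new_columns = {}
    # Metrics and labels each get their own list of aggregations.
    for columns, aggs in ((metrics, metric_aggs), (labels, label_aggs)):
        for column in columns:
            for agg in aggs:
                # Roll within each key group; transform keeps the original row index.
                # The output column name is an assumed naming scheme.
                new_columns[f"{column}_{agg}{suffix}"] = grouped[column].transform(
                    lambda s, agg=agg: getattr(s.rolling(window, center=center), agg)()
                )
    aggregated = pd.DataFrame(new_columns, index=df.index)
    # inplace=True: only the new columns; inplace=False: original data plus new columns.
    return aggregated if inplace else df.join(aggregated)


# Hypothetical toy data with one key column, one metric and one label.
df = pd.DataFrame({
    "device": ["a", "a", "a", "a", "b", "b", "b", "b"],
    "cpu_utilization": [10.0, 20.0, 30.0, 40.0, 1.0, 2.0, 3.0, 4.0],
    "is_error": [0, 0, 1, 0, 0, 0, 0, 1],
})

result = rolling_aggregate_sketch(
    df, keys=["device"], metrics=["cpu_utilization"], labels=["is_error"],
    metric_aggs=["mean", "sum"], label_aggs=["max"], suffix="_daily", window=2,
)
print(result)
```

With an integer `window`, the first `window - 1` rows of each key group have no complete window and come out as `NaN`. In pandas, a time-interval window additionally requires a datetime-like index.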

## Examples
> This example uses the metrics data created by the [Generator function](https://github.com/mlrun/demo-network-operations/blob/master/notebooks/generator.ipynb) from MLRun's [Network Operations Demo](https://github.com/mlrun/demo-network-operations).  
To run it yourself, generate this dataset or use any of your own CSV/Parquet datasets.
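
If the demo dataset is not at hand, a small stand-in file can be written with pandas. The sketch below is only an assumption about what a usable input might look like: the `cpu_utilization` and `is_error` column names come from the example task in this notebook, while the `timestamp` and `device` columns, the values, and the `sample_metrics.csv` file name are made up.

```python
import os
import tempfile

import numpy as np
import pandas as pd

# Hypothetical schema: only cpu_utilization and is_error appear in the example task;
# everything else here is invented for illustration.
rng = np.random.default_rng(seed=0)
timestamps = pd.date_range("2021-01-01", periods=60, freq="min")

sample = pd.DataFrame({
    "timestamp": timestamps,
    "device": "device-0",
    "cpu_utilization": rng.uniform(0, 100, size=len(timestamps)),
})
# Flag high-utilization samples as errors so the label column is not constant.
sample["is_error"] = (sample["cpu_utilization"] > 90).astype(int)

# Write the stand-in dataset to a temporary CSV file.
sample_path = os.path.join(tempfile.gettempdir(), "sample_metrics.csv")
sample.to_csv(sample_path, index=False)
print(sample_path)
```

Pointing `metrics_path` at `sample_path` would then stand in for the demo's parquet file.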

In [1]:
from mlrun import code_to_function, mount_v3io, NewTask, mlconf, run_local
mlconf.dbpath = mlconf.dbpath or 'http://mlrun-api:8080'

In [2]:
metrics_path = '/User/demo-network-operations/data/metrics.pq'

### Local Example
Define the aggregate example task

In [3]:
from aggregate import aggregate

aggregate_task = NewTask(name='aggregate',
                         project='network-operations',
                         params={'metrics': ['cpu_utilization'],
                                 'labels': ['is_error'],
                                 'metric_aggs': ['mean', 'sum'],
                                 'label_aggs': ['max'],
                                 'suffix': 'daily',
                                 'inplace': False,
                                 'window': 5,
                                 'center': True,
                                 'save_to': 'aggregate.pq',
                                 'files_to_select': 2},
                         inputs={'df_artifact': metrics_path},
                         handler=aggregate)

In [4]:
aggregate_run = run_local(aggregate_task)

> 2021-02-16 13:40:05,369 [info] starting run aggregate uid=1ff88863c7464498a21c2507108c8419 DB=http://mlrun-api:8080
> 2021-02-16 13:40:05,453 [info] Aggregating /User/demo-network-operations/data/metrics.pq
> 2021-02-16 13:40:05,515 [error] Traceback (most recent call last):
  File "/User/.pythonlibs/jupyter-eyals/lib/python3.7/site-packages/mlrun/runtimes/local.py", line 217, in exec_from_params
    val = handler(*args_list)
  File "/User/functions/aggregate/aggregate.py", line 73, in aggregate
    input_df = df_artifact.as_df()
  File "/User/.pythonlibs/jupyter-eyals/lib/python3.7/site-packages/mlrun/datastore/base.py", line 242, in as_df
    self._path, columns=columns, df_module=df_module, format=format, **kwargs
  File "/User/.pythonlibs/jupyter-eyals/lib/python3.7/site-packages/mlrun/datastore/base.py", line 124, in as_df
    return reader(self._join(key), **kwargs)
  File "/conda/lib/python3.7/site-packages/pandas/io/parquet.py", line 317, in read_parquet
    return impl.read(

/User/demo-network-operations/data/metrics.pq


| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|---|
| network-operations | ...108c8419 | 0 | Feb 16 13:40:05 | error | aggregate | v3io_user=eyals; kind=handler; owner=eyals; host=jupyter-eyals-666bf556fc-5v7bf | df_artifact | metrics=['cpu_utilization']; labels=['is_error']; metric_aggs=['mean', 'sum']; label_aggs=['max']; suffix=daily; inplace=False; window=5; center=True; save_to=aggregate.pq; files_to_select=2 | | |


to track results use .show() or .logs() or in CLI: 
!mlrun get run 1ff88863c7464498a21c2507108c8419 --project network-operations , !mlrun logs 1ff88863c7464498a21c2507108c8419 --project network-operations
> 2021-02-16 13:40:05,706 [info] run executed, status=error


RunError: /User/demo-network-operations/data/metrics.pq

The run above ends with `status=error`: the traceback stops in `read_parquet`, and the error message is the metrics path itself, which suggests the file was not available at `metrics_path`. Generate the dataset, or point `metrics_path` to an existing file, before running the task.

### Example on cluster

Convert the code to an MLRun function

In [None]:
import yaml

# Read the function metadata from item.yaml
with open("item.yaml") as item_file:
    items = yaml.safe_load(item_file)

# code_to_function was imported from mlrun in the first cell
fn = code_to_function(
    items["name"],
    handler=items["spec"]["handler"],
    filename=items["spec"]["filename"],
    description=items["description"],
    categories=items["categories"],
    labels=items["labels"],
)
# Export the function spec to a YAML file
fn.export('aggregate.yaml')

In [None]:
import os
aggregate_run = fn.apply(mount_v3io()).run(aggregate_task, artifact_path=os.path.abspath('./'))

### Show results

In [None]:
import pandas as pd
pd.read_parquet(aggregate_run.artifact('aggregate')['target_path'])