## Aggregate Function Example
> Time-series aggregation function: performs a rolling aggregation on {df_artifact} over {window}, grouped by the selected {keys}, applying {metric_aggs} to {metrics} and {label_aggs} to {labels}, and adding {suffix} to the new feature names.

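To make the parameters above concrete, here is a minimal pandas sketch of a rolling aggregation. It is not the hub function's source: the function name `rolling_aggregate`, the `{column}_{agg}_{suffix}` naming, the cast of boolean labels to `uint8`, and dropping rows with incomplete windows are assumptions chosen to resemble the outputs shown later in this notebook.

```python
import pandas as pd


def rolling_aggregate(df, window, metrics, metric_aggs, labels=(), label_aggs=(),
                      keys=None, suffix="", center=False):
    """Hypothetical rolling aggregation loosely mirroring the hub parameters.

    A pandas sketch for illustration only, not the hub function's code.
    """
    # Work on a numeric copy: boolean label columns are cast to uint8 so that
    # rolling reductions such as max() operate on numbers.
    numeric = df.copy()
    for col in labels:
        if numeric[col].dtype == bool:
            numeric[col] = numeric[col].astype("uint8")

    out = df.copy()
    new_cols = []
    jobs = [(c, a) for c in metrics for a in metric_aggs]
    jobs += [(c, a) for c in labels for a in label_aggs]
    for col, agg in jobs:
        def roll(s, agg=agg):
            # Apply the named reduction ("mean", "sum", "max", ...) per window.
            return s.rolling(window, center=center).agg(agg)

        if keys:
            # Restart the window for every combination of key values.
            values = numeric.groupby(keys)[col].transform(roll)
        else:
            values = roll(numeric[col])
        name = f"{col}_{agg}_{suffix}" if suffix else f"{col}_{agg}"
        out[name] = values
        new_cols.append(name)

    # Drop rows whose window was incomplete (NaN in any new column).
    return out.dropna(subset=new_cols)


# Usage on a tiny hypothetical frame with two hosts as the grouping key.
df = pd.DataFrame({
    "host": ["a", "a", "a", "b", "b", "b"],
    "cpu": [1.0, 2.0, 3.0, 10.0, 20.0, 30.0],
    "err": [False, True, False, False, False, True],
})
res = rolling_aggregate(df, window=2, metrics=["cpu"], metric_aggs=["mean", "sum"],
                        labels=["err"], label_aggs=["max"], keys=["host"],
                        suffix="daily")
print(res)
```

Grouping with `transform` keeps the result aligned with the original index, so the new columns can be attached to the unmodified frame.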
In [1]:
# Make sure the mlrun package is aligned with the cluster version
# !/User/align_mlrun.sh
# !pip install scikit-learn

In [2]:
import pandas as pd
import mlrun
from sklearn.datasets import load_iris
mlrun.set_environment(project='function-marketplace')

> 2021-10-05 10:40:16,310 [info] loaded project function-marketplace from MLRun DB


('function-marketplace', 'v3io:///projects/{{run.project}}/artifacts')

### Simple aggregations on sklearn's iris dataset

In [3]:
dataset = load_iris()
df = pd.DataFrame(dataset['data'], columns=dataset['feature_names'])
df['target'] = [dataset['target_names'][i] for i in dataset['target']]
df.to_parquet("iris.parquet")
df.head()

| | sepal length (cm) | sepal width (cm) | petal length (cm) | petal width (cm) | target |
|---|---|---|---|---|---|
| 0 | 5.1 | 3.5 | 1.4 | 0.2 | setosa |
| 1 | 4.9 | 3.0 | 1.4 | 0.2 | setosa |
| 2 | 4.7 | 3.2 | 1.3 | 0.2 | setosa |
| 3 | 4.6 | 3.1 | 1.5 | 0.2 | setosa |
| 4 | 5.0 | 3.6 | 1.4 | 0.2 | setosa |


In [4]:
# Import the aggregate function from the MLRun function hub and apply the auto-detected storage mount
aggregate_function = mlrun.import_function("hub://aggregate").apply(mlrun.auto_mount())

In [5]:
import os
os.path.abspath('iris.parquet')

'/User/test/functions/aggregate/iris.parquet'

In [6]:
aggregate_run = aggregate_function.run(
    params={
        'metrics': dataset['feature_names'][0],
        'metric_aggs': ['mean', 'sum'],
        'suffix': 'aggregated',
        'inplace': False,
        'window': 2,
    },
    inputs={'df_artifact': os.path.abspath('iris.parquet')},
)

> 2021-10-05 10:40:16,613 [info] starting run aggregate-aggregate uid=c531676eb37a498fb6fe8f867b9d9a7d DB=http://mlrun-api:8080
> 2021-10-05 10:40:16,753 [info] Job is running in the background, pod: aggregate-aggregate-9vqmq
> 2021-10-05 10:40:22,212 [info] Aggregating /User/test/functions/aggregate/iris.parquet
> 2021-10-05 10:40:22,243 [info] Logging artifact
> 2021-10-05 10:40:22,397 [info] run executed, status=completed
final state: completed


| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|---|
| function-marketplace | ...7b9d9a7d | 0 | Oct 05 10:40:22 | completed | aggregate-aggregate | v3io_user=dani, kind=job, owner=dani, host=aggregate-aggregate-9vqmq | df_artifact | metrics=sepal length (cm), metric_aggs=['mean', 'sum'], suffix=aggregated, inplace=False, window=2 | | aggregate |
> 2021-10-05 10:40:22,907 [info] run executed, status=completed


### Show results

In [7]:
aggregate_run.artifact('aggregate').as_df().head().style.set_caption("Aggregated Dataframe !")

| | sepal length (cm) | sepal width (cm) | petal length (cm) | petal width (cm) | target | m_e_a_n_aggregated | s_u_m_aggregated |
|---|---|---|---|---|---|---|---|
| 1 | 4.9 | 3.0 | 1.4 | 0.2 | setosa | 5.0 | 10.0 |
| 2 | 4.7 | 3.2 | 1.3 | 0.2 | setosa | 4.8 | 9.6 |
| 3 | 4.6 | 3.1 | 1.5 | 0.2 | setosa | 4.65 | 9.3 |
| 4 | 5.0 | 3.6 | 1.4 | 0.2 | setosa | 4.8 | 9.6 |
| 5 | 5.4 | 3.9 | 1.7 | 0.4 | setosa | 5.2 | 10.4 |

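The aggregated values above can be checked by hand with plain pandas. The sketch below assumes the function applies a trailing 2-row `rolling` window over the rows in file order and drops the first row, whose window is incomplete; that assumption is consistent with the table starting at row 1. It reproduces only the numbers, not the column names.

```python
import pandas as pd

# First six "sepal length (cm)" values of the iris dataset, as shown in the
# tables above (rows 0-5).
sepal_length = pd.Series([5.1, 4.9, 4.7, 4.6, 5.0, 5.4], name="sepal length (cm)")

# A trailing 2-row window: each row is combined with the row before it.
window = sepal_length.rolling(2)
result = pd.DataFrame({"mean": window.mean(), "sum": window.sum()}).dropna()
print(result)
```

Row 0 has no predecessor, so its window yields NaN and is dropped; for example, row 1 averages 5.1 and 4.9 to get 5.0.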

### More complex aggregations

In [8]:
# Getting & inspecting original dataset
DATA = "https://s3.wasabisys.com/iguazio/data/market-palce/aggregate/metrics.pq"
original = pd.read_parquet(DATA)
original.head().style.set_caption("Original Dataframe !")

| timestamp | company | data_center | device | cpu_utilization | latency | packet_loss | throughput | is_error |
|---|---|---|---|---|---|---|---|---|
| 2021-04-27 14:46:46.780000 | Smith_Group | Denise_Crest | 5124209057231 | 75.598891 | 0.0 | 0.0 | 252.445971 | False |
| 2021-04-27 14:46:46.780000 | Smith_Group | Denise_Crest | 2891755865712 | 50.090373 | 3.280849 | 0.0 | 229.889187 | False |
| 2021-04-27 14:46:46.780000 | Smith_Group | Debra_Gateway | 388020295311 | 73.243063 | 9.372341 | 2.170138 | 260.883807 | False |
| 2021-04-27 14:46:46.780000 | Smith_Group | Debra_Gateway | 9633813691441 | 60.83042 | 12.241878 | 2.295717 | 244.238613 | False |
| 2021-04-27 14:46:46.780000 | Ferrell_Ltd | Murphy_Meadow | 1517129765931 | 72.647964 | 0.535463 | 0.0 | 212.944943 | False |


In [9]:
aggregate_run = aggregate_function.run(
    params={
        'metrics': ['cpu_utilization'],
        'labels': ['is_error'],
        'metric_aggs': ['mean', 'sum'],
        'label_aggs': ['max'],
        'suffix': 'daily',
        'inplace': False,
        'window': 5,
        'center': True,
        'files_to_select': 2,
    },
    inputs={'df_artifact': DATA},
)

> 2021-10-05 10:40:24,797 [info] starting run aggregate-aggregate uid=a874ee2e49894f98adc092aa8138db53 DB=http://mlrun-api:8080
> 2021-10-05 10:40:24,931 [info] Job is running in the background, pod: aggregate-aggregate-mpf4p
> 2021-10-05 10:40:30,245 [info] Aggregating https://s3.wasabisys.com/iguazio/data/market-palce/aggregate/metrics.pq
> 2021-10-05 10:40:31,547 [info] Logging artifact
> 2021-10-05 10:40:31,708 [info] run executed, status=completed
Converting input from bool to <class 'numpy.uint8'> for compatibility.
final state: completed


| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|---|
| function-marketplace | ...8138db53 | 0 | Oct 05 10:40:30 | completed | aggregate-aggregate | v3io_user=dani, kind=job, owner=dani, host=aggregate-aggregate-mpf4p | df_artifact | metrics=['cpu_utilization'], labels=['is_error'], metric_aggs=['mean', 'sum'], label_aggs=['max'], suffix=daily, inplace=False, window=5, center=True, files_to_select=2 | | aggregate |
> 2021-10-05 10:40:34,126 [info] run executed, status=completed



### Show results

In [10]:
aggregate_run.artifact('aggregate').as_df().head().style.set_caption("Aggregated Dataframe !")

| timestamp | company | data_center | device | cpu_utilization | latency | packet_loss | throughput | is_error | cpu_utilization_mean_daily | cpu_utilization_sum_daily | is_error_max_daily |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 2021-04-27 14:46:46.780000 | Smith_Group | Debra_Gateway | 388020295311 | 73.243063 | 9.372341 | 2.170138 | 260.883807 | False | 66.482142 | 332.410712 | 0.0 |
| 2021-04-27 14:46:46.780000 | Smith_Group | Debra_Gateway | 9633813691441 | 60.83042 | 12.241878 | 2.295717 | 244.238613 | False | 63.92082 | 319.604098 | 0.0 |
| 2021-04-27 14:46:46.780000 | Ferrell_Ltd | Murphy_Meadow | 1517129765931 | 72.647964 | 0.535463 | 0.0 | 212.944943 | False | 68.4882 | 342.441002 | 0.0 |
| 2021-04-27 14:46:46.780000 | Ferrell_Ltd | Murphy_Meadow | 6964486699383 | 62.792277 | 1.758166 | 3.840787 | 238.125042 | False | 64.804826 | 324.024128 | 0.0 |
| 2021-04-27 14:46:46.780000 | Ferrell_Ltd | Nicholas_Estate | 8002897098167 | 72.927277 | 4.996304 | 1.528991 | 235.680379 | False | 71.184971 | 355.924855 | 0.0 |

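The centered window can also be reproduced by hand. The sketch below assumes the function applies a 5-row `rolling(..., center=True)` window over the rows in file order, casts the boolean label to `uint8` (as the run log reports), and drops rows with incomplete windows. In this run the windows appear to span both Smith_Group and Ferrell_Ltd rows, which suggests no per-key grouping was applied. The input values are copied from the two tables above, rounded to six decimals as displayed.

```python
import pandas as pd

# cpu_utilization and is_error for the first seven rows of metrics.pq, copied
# from the original and aggregated tables above.
rows = pd.DataFrame({
    "cpu_utilization": [75.598891, 50.090373, 73.243063, 60.83042,
                        72.647964, 62.792277, 72.927277],
    "is_error": [False] * 7,
})

# Centered window of 5: row i aggregates rows i-2 .. i+2.
cpu = rows["cpu_utilization"].rolling(5, center=True)
# Cast the boolean label to uint8 first, mirroring the conversion in the run log.
err = rows["is_error"].astype("uint8").rolling(5, center=True)

result = pd.DataFrame({
    "cpu_utilization_mean_daily": cpu.mean(),
    "cpu_utilization_sum_daily": cpu.sum(),
    "is_error_max_daily": err.max(),
}).dropna()
print(result)
```

Rows 0-1 and 5-6 lack two neighbours on one side, so only rows 2-4 keep complete windows here; their means and sums match the first three rows of the aggregated table to within rounding.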

### Removing generated files

In [11]:
os.remove('iris.parquet')