# 3.3 signac-flow aggregation example

## About

This notebook contains a minimal example for running workflows on aggregates of jobs using **signac-flow**. 


## Author

Hardik Ojha

## Prerequisites

The following packages are required for this notebook to run successfully:

1. **signac-flow** >= 0.13
2. numpy
3. matplotlib

The commands below install the required packages. The version specifier is quoted so that the shell does not interpret `>` as an output redirection.
```
pip install "signac-flow>=0.13"
pip install matplotlib
pip install numpy
```

# Objective

The goal of this project is to plot the temperature values stored in the **signac** data space together with the average of all those temperatures.

# Environment setup

Before we initialize a **signac** project inside the `projects/tutorial-aggregation` directory, we remove any existing directory of that name so that we start from a clean state.

In [None]:
rm -rf projects/tutorial-aggregation

In [None]:
import random

import signac
from flow import FlowProject, aggregator

import matplotlib.pyplot as plt
import numpy as np

# Setting default figure size
plt.rcParams["figure.figsize"] = (15, 8)


# Initializing a signac project
project = signac.init_project('AggregationTutorialProject', 'projects/tutorial-aggregation')

# Initializing data space

For the purpose of this notebook, we create a synthetic dataset: each temperature is the sum of a randomly chosen base temperature, a constant offset, and a small random per-day variation.

Every **signac** job will have two state point parameters:

1. `day`: Day of the month
2. `temperature`: Average temperature for that day

In [None]:
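days = 31
# Random base temperature between 10 and 30 (a single scalar)
avg_temperature = 10 + np.random.rand() * 20
# Note: `days` is a scalar here, so this term is a constant offset shared by all days
daily_variation = -np.cos(days / 12 * 2 * np.pi) * days
# One random value in [0, 1) per day
random_variation = np.random.rand(days)
# Broadcasting the scalars over the array yields one temperature per day
temperatures = avg_temperature + daily_variation + random_variation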

for i, temp in enumerate(temperatures):
    sp = dict(day=i + 1, temperature=temp)
    # Create a signac job having the state point parameters 'day' and 'temperature'
    project.open_job(sp).init()

# Generating Project

To achieve our goal using **signac-flow**, we need to define a `FlowProject` and add operations to it.
Our workflow consists of the following operations:

1. `compute_average_temperature`: This operation computes the average temperature of the month and stores it in the project document. For this operation, all the jobs in the **signac** project are aggregated together. It is the first operation executed in our workflow.
2. `plot_deviation_from_average`: This operation plots the daily temperatures (as a scatter plot) together with the average temperature of the month. For this operation, all the jobs in the **signac** project are aggregated together, sorted by the state point parameter `day`. It is executed after `compute_average_temperature`.

In [None]:
class AggregationProject(FlowProject):
    pass


@aggregator()
@AggregationProject.operation
@AggregationProject.post(
    lambda *jobs: project.doc.get('average_temperature', False)
)
def compute_average_temperature(*jobs):
    average_temp = sum([job.sp.temperature for job in jobs]) / len(jobs)
    project.doc['average_temperature'] = average_temp

    
@aggregator(sort_by='day')
@AggregationProject.operation
@AggregationProject.pre(
    lambda *jobs: project.doc.get('average_temperature', False)
)
def plot_deviation_from_average(*jobs):
    average_temp = project.doc['average_temperature']
    days = [job.sp.day for job in jobs] 
    plt.plot(days, [job.sp.temperature for job in jobs], 'rx')
    plt.plot(days, [average_temp] * len(days), 'g')
    plt.legend(['Temperature per day', 'Average Temperature'])
    plt.xlabel('Day of month')
    plt.ylabel('Temperature')
    plt.show()
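
To make the aggregation semantics described above more concrete, here is a minimal plain-Python sketch that does not use **signac-flow** at all. `FakeJob`, `StatePoint`, `single_aggregate`, and `days_in_order` are hypothetical names introduced only for illustration; the sketch merely mimics the idea that one aggregate contains every job (optionally sorted by a state point key) and that an aggregate operation receives those jobs as `*jobs`.

```python
from collections import namedtuple

# Hypothetical stand-ins for signac jobs; only the `sp` attribute is modeled.
StatePoint = namedtuple("StatePoint", ["day", "temperature"])
FakeJob = namedtuple("FakeJob", ["sp"])

jobs = [
    FakeJob(StatePoint(day=d, temperature=t))
    for d, t in [(3, 14.0), (1, 10.0), (2, 12.0)]
]


def single_aggregate(jobs, sort_by=None):
    """Return a list holding one aggregate (a tuple) that contains every job."""
    if sort_by is not None:
        jobs = sorted(jobs, key=lambda job: getattr(job.sp, sort_by))
    return [tuple(jobs)]


def compute_average_temperature(*jobs):
    # Same arithmetic as the operation above, but returning the value
    # instead of writing it to a project document.
    return sum(job.sp.temperature for job in jobs) / len(jobs)


def days_in_order(*jobs):
    return [job.sp.day for job in jobs]


(unsorted,) = single_aggregate(jobs)
(by_day,) = single_aggregate(jobs, sort_by="day")

average = compute_average_temperature(*unsorted)
ordered_days = days_in_order(*by_day)
print(average)       # 12.0
print(ordered_days)  # [1, 2, 3]
```

The average does not depend on the order of the jobs, which is why only the plotting operation needs the sorted aggregate.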

# Executing the workflow

### Initializing the FlowProject
To register the operations, conditions, and aggregators associated with the project we created, we need to instantiate the `FlowProject` subclass.
Since the **signac** project is not located in the same directory as the `FlowProject`, we need to pass the configuration of the **signac** project explicitly.

In [None]:
flow_project = AggregationProject(config=project.config)

### Running the workflow
The `FlowProject.run` method executes all eligible operations in the `FlowProject`.

In [None]:
flow_project.run()
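
The ordering of the two operations follows from their conditions rather than from the order in which they are defined. The sketch below is a simplified model of that idea in plain Python, not the actual **signac-flow** implementation: it assumes an operation is eligible when all of its pre-conditions hold and its post-conditions (if any) are not yet all satisfied, and it runs each operation at most once. The names `doc`, `log`, `operations`, and `eligible` are hypothetical, and the conditions test for key membership instead of the truthiness check used in the notebook.

```python
doc = {}  # hypothetical stand-in for the project document
temperatures = [12.0, 15.0, 18.0]
log = []  # records the order in which operations run


def compute_average_temperature():
    doc["average_temperature"] = sum(temperatures) / len(temperatures)
    log.append("compute_average_temperature")


def plot_deviation_from_average():
    # Plotting is omitted; only the execution order is recorded.
    log.append("plot_deviation_from_average")


def average_is_stored():
    return "average_temperature" in doc


# (function, pre-conditions, post-conditions), deliberately listed in the
# "wrong" order to show that the conditions decide what runs first.
operations = [
    (plot_deviation_from_average, [average_is_stored], []),
    (compute_average_temperature, [], [average_is_stored]),
]


def eligible(pre, post):
    pre_met = all(condition() for condition in pre)
    complete = bool(post) and all(condition() for condition in post)
    return pre_met and not complete


executed = set()
progress = True
while progress:
    progress = False
    for func, pre, post in operations:
        if func not in executed and eligible(pre, post):
            func()
            executed.add(func)
            progress = True

print(log)
# ['compute_average_temperature', 'plot_deviation_from_average']
```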

# Summary

We have successfully plotted the temperature values stored in the **signac** data space together with their average, using the aggregation feature of **signac-flow**.

To learn more about aggregation, visit https://docs.signac.io/en/latest/aggregation.html