# Running Temporian on Apache Beam

This guide shows how to run Temporian programs on large datasets with Beam.

**WARNING:** Temporian with Apache Beam is experimental. The API might change, some optimizations are not implemented, and some operators are not available.

The reader is assumed to be familiar with Temporian in-process execution. Please read [3 Minutes to Temporian](../3_minutes) or the [User Guide](../user_guide) first.


## Introduction

The Temporian [User Guide](user_guide.md) teaches how to run a Temporian program in-process using Python. This approach is ideal for quick experimentation and for datasets that are small enough to fit in the memory of a single computer. However, for large datasets with billions or even trillions of events, this approach runs out of memory. Instead, you can execute Temporian programs on large datasets using [Apache Beam](https://beam.apache.org/).

**Info:** Apache Beam is a library for large-scale distributed computation. 

Executing a Temporian program in-process and executing it with Beam are very similar. There are only two things to watch out for:

- The Temporian program needs to be defined in [graph execution mode](user_guide.md#eager-mode-vs-graph-mode). Eager execution mode is not compatible with Beam.
- Dataset I/O needs to be Beam-compatible. To enable this, replace the normal dataset I/O functions with their counterparts in `temporian.beam` (for example, `temporian.from_csv` becomes `temporian.beam.from_csv`). Alternatively, you can implement your own I/O code using [Beam IO connectors](https://beam.apache.org/documentation/io/connectors/). Dataset I/O with Beam is explained in more detail in [this section](#input-and-output-data).


## Install dependencies

We need to install Temporian and Apache Beam.

```python
%pip install temporian[beam] -q
```

## A simple example

In this example, we run a Temporian program on a CSV file with both the in-process and the Beam approaches.

First, we create a small CSV file containing our input data. This dataset contains 5 events with two features (`a` and `b`). Each event is stored in a separate row.


```python
%%writefile input.csv
timestamp,a,b
1,x,1
2,y,2
13,z,3
14,x,2
15,y,1
```

**Note:** In practice, large datasets should be divided into multiple files called "shards" to facilitate the distribution of the work among machines. In this case, dataset paths can be glob expressions, e.g., `input-*.csv`. For more information, see the "Speed-up pipeline" section.

**Note 2:** In this file, each event is presented in a separate row. For dataset formats that support it, it is more efficient to group events with the same index value together. For more information, see the "Speed-up pipeline" section.

The following code processes our CSV file and writes the result to `output_in_process.csv` using the in-process eager execution mode.

```python
import temporian as tp

# Our processing function
def my_processing(data):
    # Re-index the events according to "a",
    # and apply a 4 time-unit moving sum.
    return data.add_index("a").moving_sum(4)

# Load the csv in memory into an EventSet
input_evset = tp.from_csv("input.csv")

# Apply the processing
output_evset = my_processing(input_evset)

# Save the results to a csv file
tp.to_csv(output_evset, path="output_in_process.csv")
```

The next code applies the same processing and writes the result to `output_beam.csv` using the Beam execution mode.

```python
import temporian as tp
import temporian.beam as tpb  # Import Temporian's Beam capabilities
import apache_beam as beam

# Same processing as before
def my_processing(data):
    return data.add_index("a").moving_sum(4)

# Create the input node, i.e., the schema of our input.
input_node = tp.input_node([("a", tp.str_), ("b", tp.float32)])
# Create a Temporian graph. No computation is applied so far.
output_node = my_processing(input_node)

# Define a Beam pipeline. The pipeline is executed when the "with" block
# exits, so calling pipeline.run() explicitly would run it a second time.
with beam.Pipeline() as pipeline:
    (
        pipeline
        # Read events from the csv file.
        | tpb.from_csv("input.csv", input_node.schema)
        # Apply the processing.
        | tpb.run(input=input_node, output=output_node)
        # Save the results to a csv file.
        # Note: shard_name_template="" outputs the results in a single csv file.
        | tpb.to_csv("output_beam.csv", output_node.schema, shard_name_template="")
    )
```

Finally, we check the content of the output csv files.

```python
%cat output_in_process.csv
```

```python
%cat output_beam.csv
```

Both files contain the same events. However, the events are not stored in the same order: Temporian does not constrain the order of events, and different implementations and backends are free to change it.
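Because the row order can differ, comparing the two output files line by line is not meaningful. A minimal sketch of an order-insensitive comparison is shown below; `same_events` is a hypothetical helper, not part of Temporian. It assumes both files have a header row, matches columns by name, and compares values as raw strings, so it may report a difference if the two backends format numbers differently (for example `1` vs `1.0`).

```python
import csv
from collections import Counter


def same_events(path_a: str, path_b: str) -> bool:
    """Returns True if two CSV files contain the same rows, ignoring row order.

    Columns are matched by name, so the column order may also differ.
    Values are compared as raw strings (no numeric normalization).
    """

    def load(path):
        with open(path, newline="") as f:
            reader = csv.DictReader(f)
            columns = sorted(reader.fieldnames or [])
            # Count each row so that duplicated events are taken into account.
            rows = Counter(tuple(row[c] for c in columns) for row in reader)
        return columns, rows

    return load(path_a) == load(path_b)
```

For instance, `same_events("output_in_process.csv", "output_beam.csv")` checks whether both runs produced the same events in any order.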

## Input and output data

**Info:** Apache Beam represents data using PCollections. A PCollection is essentially a distributed and homogeneous collection of Python values. For example, a PCollection can contain strings, lists of strings, or more complex objects.

You can import and export data from files using `temporian.beam.to_<format>` and `temporian.beam.from_<format>` functions, or you can write your own importers and exporters on top of the [Beam IO connectors](https://beam.apache.org/documentation/io/connectors/).

Here are some examples of data IO functions:

- `temporian.beam.{from|to}_csv`: Import and export data to and from CSV files.
- `temporian.beam.{from|to}_tensorflow_record`: Import and export data to and from TensorFlow Record files.

The methods `temporian.beam.to_event_set` and `temporian.beam.to_dict` respectively convert dictionaries of values into Temporian Beam EventSets, and Temporian Beam EventSets back into dictionaries of values. For example:

```python
| ... # Generate dictionaries of values.
| tpb.to_event_set(input_node.schema)
| tpb.run(input=input_node, output=output_node)
| tpb.to_dict(output_node.schema)
| ... # Do something with the results
```

Some IO functions, such as `to_event_set`, `to_dict` and `temporian.beam.{from|to}_tensorflow_record`, support several representations of events, selected with the `format` argument:

- `format="single_events"`: Each event is a dictionary mapping feature and index names to values. Each event is a separate PCollection item.
- `format="grouped_by_index"` (default): Events with the same index value are grouped into a single PCollection item. Each item is a dictionary mapping feature and index names to values. Index values are represented as Python primitives (e.g., int), while timestamps and features are represented as NumPy arrays.

The next code shows how to feed three events, with two features "a" and "b" and one index "c", into Temporian using both the `format="single_events"` and the `format="grouped_by_index"` formats:

```python
import apache_beam as beam
import numpy as np
import temporian.beam as tpb

# With format="single_events"
pipeline
| beam.Create([
    # Index and features always have one value.
    {"timestamp": 1., "a": 4., "b": b"X", "c": 10},
    {"timestamp": 2., "a": 5., "b": b"Y", "c": 10},
    {"timestamp": 3., "a": 6., "b": b"Z", "c": 11},
    ])
| tpb.to_event_set(input_node.schema, format="single_events")
| tpb.run(...)
| ...

# With format="grouped_by_index" (recommended)
pipeline
| beam.Create([
    {"timestamp": np.array([1., 2.], dtype=np.float64),
     # "a" is a feature, so there is one value per timestamp.
     "a": np.array([4., 5.], dtype=np.float32),
     "b": np.array([b"X", b"Y"], dtype=np.bytes_),
     "c": 10},  # "c" is an index, so there is only one value.

    {"timestamp": np.array([3.], dtype=np.float64),
     "a": np.array([6.], dtype=np.float32),
     "b": np.array([b"Z"], dtype=np.bytes_),
     "c": 11},
])
| tpb.to_event_set(input_node.schema, format="grouped_by_index")
| tpb.run(...)
| ...
```

**Warning:** Temporian dtypes are strict. For example, a `tp.float32` feature only accepts a NumPy array of `np.float32`. Feeding a Python list or a NumPy array of `np.float64` will fail.

The `format="grouped_by_index"` format is significantly faster than the `format="single_events"` format. When possible, use `format="grouped_by_index"`.
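If your events are available one by one, the conversion from the `single_events` representation to the `grouped_by_index` representation can be sketched as follows. `group_by_index` is a hypothetical helper (not part of `temporian.beam`) that runs in memory on a single machine. It assumes timestamps are stored as `np.float64`, as in the example above, and that you pass the exact NumPy dtype of each feature, since Temporian does not convert dtypes for you. Sorting each group by timestamp is a defensive choice here, not a documented requirement.

```python
from collections import defaultdict
from typing import Any, Dict, List, Sequence

import numpy as np


def group_by_index(
    events: Sequence[Dict[str, Any]],
    index_names: Sequence[str],
    feature_dtypes: Dict[str, Any],
) -> List[Dict[str, Any]]:
    """Converts "single_events" dictionaries into "grouped_by_index" dictionaries.

    feature_dtypes maps each feature name to the exact NumPy dtype expected by
    the Temporian schema (e.g. np.float32 for a tp.float32 feature).
    """
    # Group the events by the tuple of their index values.
    groups = defaultdict(list)
    for event in events:
        key = tuple(event[name] for name in index_names)
        groups[key].append(event)

    grouped = []
    for key, group in groups.items():
        group = sorted(group, key=lambda e: e["timestamp"])
        item = {
            "timestamp": np.array([e["timestamp"] for e in group], dtype=np.float64)
        }
        # Features: one value per timestamp, with an explicit dtype.
        for name, dtype in feature_dtypes.items():
            item[name] = np.array([e[name] for e in group], dtype=dtype)
        # Indexes: a single Python value per group.
        item.update(zip(index_names, key))
        grouped.append(item)
    return grouped
```

Applied to the three single events above with `index_names=["c"]` and `feature_dtypes={"a": np.float32, "b": np.bytes_}`, this returns two items equivalent to the `grouped_by_index` example. On a large dataset, the grouping itself is a distributed step, so the speed-up mostly comes from storing data already grouped.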

Using `tpb.to_event_set` and `tpb.to_dict`, you can import and export data with other Beam IO connectors. The next example shows how to import events stored in TF.Records into Temporian.

**Note:** This is just an example: TF.Records are natively supported by Temporian with `tpb.from_tensorflow_record`.

```python
from typing import Dict, Union

import apache_beam as beam
import temporian.beam as tpb
from tensorflow.core.example import example_pb2


def _parse_single_event_tf_example(
    example: example_pb2.Example,
) -> Dict[str, Union[float, int, bytes]]:
    """Converts a TF.Example proto holding one event into a dictionary of values."""
    dict_example = {}
    for k, v in example.features.feature.items():
        # Each feature holds exactly one value (single event per example).
        if v.HasField("int64_list"):
            dict_example[k] = v.int64_list.value[0]
        elif v.HasField("float_list"):
            dict_example[k] = v.float_list.value[0]
        elif v.HasField("bytes_list"):
            dict_example[k] = v.bytes_list.value[0]
        else:
            raise ValueError(f"Feature {k!r} has no value")
    return dict_example


# input_path: placeholder for the file pattern of the input TFRecord files.
| beam.io.tfrecordio.ReadFromTFRecord(
    file_pattern=input_path,
    coder=beam.coders.ProtoCoder(example_pb2.Example),
    compression_type=beam.io.filesystem.CompressionTypes.GZIP)
| beam.Map(_parse_single_event_tf_example)
| tpb.to_event_set(input_node.schema, format="single_events")
| tpb.run(...)
```

## Speed-up pipeline

Following are some considerations to create fast Beam+Temporian pipelines on large datasets.

### Consider in-process execution mode

The in-process execution mode runs on a single machine. If your dataset fits in the memory of a single machine, in-process execution (in graph mode, or with the `@tp.compile` decorator) is likely to be faster than the Beam execution mode.

You can see the memory usage of an EventSet when printing it (e.g., `print(evset)`) or with the `evset.memory_usage()` method.
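Before loading any data, a back-of-the-envelope estimate can help decide between the two modes. The sketch below is a hypothetical helper, not a Temporian function. It assumes 8 bytes per timestamp (`np.float64`, as in the `grouped_by_index` example above) plus a fixed number of bytes per feature value, and it ignores index storage and string overheads, so treat the result as a lower bound rather than the value `evset.memory_usage()` would report.

```python
from typing import Dict

TIMESTAMP_BYTES = 8  # Assumption: timestamps are stored as float64.


def estimate_memory_bytes(num_events: int, bytes_per_feature: Dict[str, int]) -> int:
    """Rough lower bound of the in-memory size of an EventSet, in bytes."""
    return num_events * (TIMESTAMP_BYTES + sum(bytes_per_feature.values()))
```

For example, one billion events with two `float32` features give `estimate_memory_bytes(10**9, {"a": 4, "b": 4})`, i.e., 16 * 10**9 bytes (16 GB) before any overhead.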

### Divide dataset into multiple files

Ensure that any large dataset is divided into multiple files. As a rule of thumb, (rule 1) files should not be smaller than 10MB and (rule 2) there should be at least 10 files for each worker (unless rule 1 prevents it).

For instance, if your dataset contains 100GB of data and you have 100 workers, the input dataset should be divided into 1000 files of size 100MB.
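The two rules of thumb can be written as a small calculation. `suggested_num_shards` is a hypothetical helper (not part of Temporian or Beam); it uses decimal units (1 MB = 10**6 bytes) and caps the number of files so that the average file size stays at or above the minimum.

```python
def suggested_num_shards(
    dataset_bytes: int,
    num_workers: int,
    min_shard_bytes: int = 10 * 10**6,  # Rule 1: files of at least 10 MB.
    shards_per_worker: int = 10,  # Rule 2: at least 10 files per worker.
) -> int:
    """Number of input files suggested by the two rules of thumb."""
    target = num_workers * shards_per_worker
    # Rule 1 takes precedence: never go below the minimum average file size.
    max_by_size = max(1, dataset_bytes // min_shard_bytes)
    return min(target, max_by_size)
```

With the numbers above, `suggested_num_shards(100 * 10**9, num_workers=100)` returns 1000, i.e., files of 100 MB. For a 50 MB dataset, rule 1 limits the result to 5 files.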

Most Beam sharded output operations will automatically determine the number of output shards.
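For an existing single file such as `input.csv`, one way to produce input shards is to split its rows round-robin into several CSV files that each repeat the header. The helper below is a hypothetical plain-Python sketch; it assumes that every file matched by a glob such as `input-*.csv` carries its own header row, and that the file can be read on the machine performing the split. For truly large datasets, sharding normally happens when the data is produced.

```python
import csv
from typing import List


def shard_csv(input_path: str, output_prefix: str, num_shards: int) -> List[str]:
    """Splits a CSV file into `num_shards` CSV files with the same header.

    Rows are assigned round-robin, so the shards have roughly the same size.
    Output files are named f"{output_prefix}-{i:05d}.csv" (hypothetical naming).
    """
    paths = [f"{output_prefix}-{i:05d}.csv" for i in range(num_shards)]
    files = [open(p, "w", newline="") for p in paths]
    try:
        writers = [csv.writer(f) for f in files]
        with open(input_path, newline="") as src:
            reader = csv.reader(src)
            header = next(reader)
            for writer in writers:
                writer.writerow(header)
            for i, row in enumerate(reader):
                writers[i % num_shards].writerow(row)
    finally:
        for f in files:
            f.close()
    return paths
```

For example, `shard_csv("input.csv", "input", num_shards=2)` writes `input-00000.csv` and `input-00001.csv`, which match the glob `input-*.csv`.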

### Import and export events using the grouped_by_index format

Prefer the `format="grouped_by_index"` event format to the `format="single_events"` format. The format is specified with the `format` argument of the import function `tpb.to_event_set` and of the export function `tpb.to_dict`.

### Avoid individual index values with a large number of events

**Note:** Temporian indexes are currently implemented using Beam keys. This means that all the values of a given feature and a given index key must be able to fit into the memory of a single worker.
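To check whether a dataset contains such index values, you can count events per index value on a sample of the data. `largest_index_keys` is a hypothetical plain-Python helper working on `single_events`-style dictionaries. On the full dataset, the same count could be computed inside Beam, for example by keying each event by its index values and applying `beam.combiners.Count.PerKey()`.

```python
from collections import Counter
from typing import Any, Dict, Iterable, List, Sequence, Tuple


def largest_index_keys(
    events: Iterable[Dict[str, Any]],
    index_names: Sequence[str],
    top_k: int = 5,
) -> List[Tuple[Tuple[Any, ...], int]]:
    """Returns the `top_k` index values with the most events, with their counts."""
    counts = Counter(tuple(e[name] for name in index_names) for e in events)
    return counts.most_common(top_k)
```

Index values whose counts are far above the others are the ones most likely to exceed the memory of a single worker.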

When intermediate values of a computation depend on index values with a large number of events, consider the following recommendations:

Add indexes before removing them. For instance, prefer `x.add_index("a").drop_index("b")` over `x.drop_index("b").add_index("a")`.

Apply aggregations before removing indexes. For instance, prefer `x.moving_sum(5).drop_index("a").moving_sum(tp.duration.shortest)` over `x.drop_index("a").moving_sum(5)`.


### Import indexed EventSet

**Note:** This rule is a special case of the two rules above on individual index values with a large number of events and on the event format used for import and export.

When possible, your pipeline should consume and return indexed EventSets instead of creating those indexes during the computation.

The following two examples return the same result. However, the first example can be significantly faster.

**Example 1:** Load an indexed EventSet and then apply some transformations.

```python
input_node = tp.input_node(
    features=[("f", tp.int32)],
    indexes=[("x", tp.str_)],
)
output_node = input_node.moving_sum(5)

...
| tpb.from_csv("input.csv", input_node.schema)
| tpb.run(input=input_node, output=output_node)
...
```

**Example 2:** Load a non-indexed EventSet, add an index with the `add_index` operator, and apply the same transformations.

```python
input_node = tp.input_node(features=[("f", tp.int32), ("x", tp.str_)])
output_node = input_node.add_index("x").moving_sum(5)

...
| tpb.from_csv("input.csv", input_node.schema)
| tpb.run(input=input_node, output=output_node)
...
```