Skip to content

Commit

Permalink
Merge branch 'apache:master' into export_gcs_options
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen authored and Naireen committed Mar 28, 2024
2 parents 9cb95de + 2f8854a commit b4cc808
Show file tree
Hide file tree
Showing 28 changed files with 2,407 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
Prompt:
What is a coder in Apache Beam?

Response:
In Apache Beam, a `Coder` is an object responsible for specifying how elements in a given `PCollection` are encoded and decoded into byte strings. The `Coder` objects provide type consistency and safety during distributed processing between workers, ensuring the correct interpretation of data based on its original types.

Every `PCollection` in an Apache Beam pipeline requires a corresponding `Coder`. While the Apache Beam SDKs can automatically infer a `Coder` for a `PCollection` based on its element type or the producing transform, there are cases where explicit specification or custom `Coder` development is necessary. It is important to note that multiple `Coder` objects can exist for a single data type.

The Apache Beam SDKs use various mechanisms to automatically infer the `Coder` for a `PCollection`. Each pipeline object contains a `CoderRegistry` object representing a mapping of language types to the default coder for those types.

In the Apache Beam SDKs for Python and Java, the `Coder` type provides the necessary methods for encoding and decoding data. The SDKs offer various `Coder` subclasses working with standard Python and Java types, available in the `apache_beam.coders` package for Python and the `Coder` package for Java.

By default, the Beam SDKs use the typehints (Python) or the type parameters (Java) from the transform's function object (such as `DoFn`) to infer the `Coder` for elements in a `PCollection`. For example, in Apache Beam Python SDK, a `ParDo` annotated with the typehints `@beam.typehints.with_input_types(int)` and `@beam.typehints.with_output_types(str)` indicates that it accepts `int` inputs and produces `str` outputs. The Python SDK automatically infers the default `Coder` for the output `PCollection`, in this case, `BytesCoder` (based on the default `CoderRegistry`).

You can use the `CoderRegistry` to look up the default coder for a given type or register a new default coder. The following tables show the default mappings of coders to standard types for any pipeline created using the Beam SDK for Java and Python:

| **Java Type** | **Default Coder** |
|---------------|-------------------|
| Double | DoubleCoder |
| Instant | InstantCoder |
| Integer | VarIntCoder |
| Iterable | IterableCoder |
| KV | KvCoder |
| List | ListCoder |
| Map | MapCoder |
| Long | VarLongCoder |
| String | StringUtf8Coder |

| **Python Type** | **Default Coder** |
|-----------------|-------------------|
| int | VarIntCoder |
| float | FloatCoder |
| str | BytesCoder |
| bytes | StrUtf8Coder |
| Tuple | TupleCoder |
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
Prompt:
How to reduce the number of external calls when calling external services from my Apache Beam pipeline?

Response:
To reduce the number of external calls when calling external services from your Apache Beam pipeline, you can employ batching techniques by using the `GroupIntoBatches` transform. Batching involves aggregating multiple elements into a single payload, reducing the number of requests sent to the external service and minimizing overhead.

Under the hood, the `GroupIntoBatches` transform utilizes state and timers to grant users precise control over batch size and buffering duration parameters, while abstracting away the implementation details. Key parameters include:
* `maxBufferDuration`: controls the maximum waiting time for a batch to be emitted.
* `batchSize`: determines the maximum number of elements in each batch. Elements are buffered until the specified number is reached, then emitted as a batch.
* `batchSizeBytes` (Java only): limits the byte size of a single batch, determined by the input coder.
* `elementByteSize` (Java only): sets the byte size of a single batch using a user-defined function.

Additionally, you can enhance parallelism by implementing the `withShardedKey()` function, which distributes a single key across multiple threads.

The following sample code snippets illustrate the use of `GroupIntoBatches` to batch elements in an Apache Beam pipeline in Java and Python.

Java:

```java
// Define your input PCollection
PCollection<KV<String, String>> input = ...;

// Specify the batch size
long batchSize = 100L;

// Apply GroupIntoBatches to group elements into batches of the specified size
PCollection<KV<String, Iterable<String>>> batched = input
.apply(GroupIntoBatches.<String, String>ofSize(batchSize))

// Set the coder for the batched PCollection
.setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())))

// Apply ParDo to process each batch element
.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, KV<String, String>>() {
@ProcessElement
public void processElement(@Element KV<String, Iterable<String>> element,
OutputReceiver<KV<String, String>> r) {

// Call the web service with the batch values and output the result
r.output(KV.of(element.getKey(), callWebService(element.getValue())));
}
}));
```

Python:

```python
input_data = ... # Replace ... with your input PCollection

def call_web_service(batch):
# Function to call web service with batch of values
pass # Implement your web service call logic here

batch_size = 100
batched = (
input_data
| "GroupIntoBatches" >> beam.GroupIntoBatches(batch_size)
| "ProcessBatch" >> beam.Map(lambda element: (element[0], call_web_service(element[1])))
)
```

The provided examples demonstrate the usage of `GroupIntoBatches` to organize elements into batches of a specified size. Subsequently, the code invokes the designated web service with each batch of values and produces the resulting key-value pairs.

Utilizing `GroupIntoBatches` allows for the efficient processing of elements in bulk, reducing the overhead in your pipeline when making external API calls.

When grouping elements into batches, keep in mind the following considerations:
* Batches exclusively contain elements with the same key.
* Batches retain elements from the same window, preserving windowing logic.
* A batch might encompass elements from more than one bundle.

For more examples, refer to the Apache Beam documentation for the Python and Java `GroupIntoBatches` transforms.
72 changes: 72 additions & 0 deletions learning/prompts/documentation-lookup-nolinks/45_mltransform.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
Prompt:
What is `MLTransform` in Apache Beam and how can I use it to process my data?

Response:
`MLTransform` is a class in Apache Beam that consolidates various transforms for preprocessing and transforming data in machine learning pipelines. It offers a range of capabilities:
* **Consistent preprocessing**: ensuring uniform preprocessing steps across training and inference stages, thereby maintaining consistent results.
* **Text embedding generation**: generating embeddings on text data using Large Language Models (LLMs), facilitating tasks such as semantic analysis.
* **Full dataset pass**: conducting a comprehensive pass on the dataset, beneficial for transformations requiring analysis of the entire dataset.

The feature that sets `MLTransform` apart is its capability to perform a full pass on the dataset, enabling the execution of common preprocessing steps, such as:
* Normalization: normalizing input values using minimum-maximum scaling for better convergence during model training.
* Bucketization: converting floating-point values to integers using bucketization based on the distribution of input data.
* Vocabulary generation: converting strings to integers by generating a vocabulary over the entire dataset, essential for categorical features.
* TF-IDF calculation: calculating TF-IDF (Term Frequency-Inverse Document Frequency) weights for text data, useful for feature representation in natural language processing tasks.

MLTransform facilitates the generation of text embeddings and implementation of specialized processing modules for diverse machine learning tasks. It includes:
* Text embedding transforms: enabling the generation of embeddings for pushing data into vector databases or inference purposes. These include `SentenceTransformerEmbeddings` (leveraging models from the Hugging Face Sentence Transformers) and `VertexAITextEmbeddings` (utilizing models from the Vertex AI text-embeddings API).
* Data processing transforms: a range of transforms, such as `ApplyBuckets`, `Bucketize`, `ComputeAndApplyVocabulary`, and more, sourced from the TensorFlow Transforms (TFT) library.

Apache Beam ensures consistency in preprocessing steps for both training and test data by implementing a workflow for writing and reading artifacts within the `MLTransform` class. These artifacts are additional data elements generated by data transformations, such as minimum and maximum values from a `ScaleTo01` transformation. The `MLTransform` class enables users to specify whether it creates or retrieves artifacts using the `write_artifact_location` and `read_artifact_location` parameters. When using the `write_artifact_location` parameter, `MLTransform` runs specified transformations on the dataset and creates artifacts from these transformations, storing them for future use in the specified location. When employing the `read_artifact_location` parameter, `MLTransform` retrieves artifacts from the specified location and incorporates them into the transform. This artifact workflow ensures that test data undergoes the same preprocessing steps as training data. For instance, after training a machine learning model on data preprocessed by `MLTransform` with the `write_artifact_location` parameter, you can run inference on new test data using the `read_artifact_location` parameter to fetch transformation artifacts created during preprocessing from a specified location.

To preprocess data using `MLTransform`, follow these steps:
* Import necessary modules. Import the required modules from Apache Beam, including `MLTransform` and any specific transforms you intend to use.
* Prepare your data. Organize your data in a suitable format, such as a list of dictionaries, where each dictionary represents a data record.
* Set up artifact location. Create a temporary directory or specify a location where artifacts generated during preprocessing will be stored.
* Instantiate transform objects. Create instances of the transforms you want to apply to your data, specifying any required parameters such as columns to transform.
* Build your pipeline. Construct your Apache Beam pipeline, including the necessary steps to create your data collection, apply the `MLTransform` with specified transformations, and any additional processing or output steps.

Here is the code snippet utilizing `MLTransform` that you can modify and integrate into your Apache Beam pipeline:

```python
import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import <TRANSFORM_NAME>
import tempfile

data = [
{
<DATA>
},
]

artifact_location = tempfile.mkdtemp()
<TRANSFORM_FUNCTION_NAME> = <TRANSFORM_NAME>(columns=['x'])

with beam.Pipeline() as p:
transformed_data = (
p
| beam.Create(data)
| MLTransform(write_artifact_location=artifact_location).with_transform(
<TRANSFORM_FUNCTION_NAME>)
| beam.Map(print))
```

In this example, `MLTransform` writes artifacts to the specified location using `write_artifact_location`. In the provided code snippet, replace the following values:
* TRANSFORM_NAME denotes the transform to be used.
* DATA denotes input data.
* TRANSFORM_FUNCTION_NAME denotes the name that you assign to your transform function in your code.

The following example illustrates using `MLTransform` to normalize data between 0 and 1 using the `ScaleToZScore` transformation:

```python
scale_to_z_score_transform = ScaleToZScore(columns=['x', 'y'])
with beam.Pipeline() as p:
(data | MLTransform(write_artifact_location=artifact_location).with_transform(scale_to_z_score_transform))
```

You can pass data processing transforms to `MLTransform` using either the `with_transform` method or a list. The `with_transform` method is useful when you want to apply a single transform to your dataset. On the other hand, providing a list of transforms is beneficial when you need to apply multiple transforms sequentially:

```python
MLTransform(transforms=transforms, write_artifact_location=write_artifact_location)
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
Prompt:
How can I explore and analyze my ML dataset in Apache Beam?

Response:
Apache Beam provides a robust framework for creating data processing pipelines for machine learning (ML) applications, offering various capabilities for preprocessing and analyzing data. Alongside powerful transforms like `MLTransform`, Apache Beam provides a rich set of I/O connectors, facilitating seamless integration with existing file systems, databases, or messaging queues.

In AI/ML projects, the following stages of data processing are essential:
* **Data exploration**: analyzing and understanding the characteristics, patterns, and distributions within a dataset to gain insight and understand the relationship between different variables.
* **Data preprocessing**: cleaning, transforming, and preparing raw data to make it suitable for machine learning algorithms.
* **Data postprocessing**: applying additional transformations to the output of a machine learning model after inference for interpretation and readability.
* **Data validation**: assessing the quality, consistency, and correctness of the data to ensure that the data meets certain standards or criteria and is suitable for the intended analysis or application.

You can implement all these data processing stages in Apache Beam pipelines.

For initial data exploration, the Apache Beam Python SDK provides a DataFrame API built on top of the pandas implementation to declare and define pipelines.

Pandas is an open-source Python library that provides data structures for data manipulation and analysis. To simplify working with relational or labeled data, pandas uses DataFrames, a data structure that contains two-dimensional tabular data and provides labeled rows and columns for the data. Pandas DataFrames are widely used for data exploration and preprocessing due to their ease of use and comprehensive functionality.

Beam DataFrames offers a pandas-like API to declare and define Beam processing pipelines, providing a familiar interface for building complex data-processing pipelines using standard pandas commands. You can think of Beam DataFrames as a domain-specific language (DSL) for Beam pipelines built into the Beam Python SDK. Using this DSL, you can create pipelines without referencing standard Beam constructs like `ParDo` or `CombinePerKey`. The DataFrame API enables the conversion of a `PCollection` to a DataFrame, allowing interaction with the data using standard pandas commands. Beam DataFrames facilitates iterative development and visualization of pipeline graphs by using the Apache Beam interactive runner with JupyterLab notebooks.

Here is an example of data exploration in Apache Beam using a notebook:

```python
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

p = beam.Pipeline(InteractiveRunner())
beam_df = p | beam.dataframe.io.read_csv(input_path)

# Investigate columns and data types
beam_df.dtypes

# Generate descriptive statistics
ib.collect(beam_df.describe())

# Investigate missing values
ib.collect(beam_df.isnull())
```

The following example demonstrates using Beam DataFrames to read the New York City taxi data from a CSV file, perform a grouped aggregation, and write the output back to CSV:

```python
from apache_beam.dataframe.io import read_csv

with pipeline as p:
rides = p | read_csv(input_path)

# Count the number of passengers dropped off per LocationID
agg = rides.groupby('DOLocationID').passenger_count.sum()
agg.to_csv(output_path)
```

In this example, pandas can infer column names from the first row of the CSV data, which is where `passenger_count` and `DOLocationID` come from.
To use the DataFrames API in a larger pipeline, you can convert a `PCollection` to a DataFrame, process the DataFrame, and then convert the DataFrame back to a `PCollection`. To achieve this conversion, you need to use schema-aware `PCollection` objects.

For more information and examples of using Beam DataFrames and embedding them in a pipeline, consult the Beam DataFrames section in the Apache Beam documentation and explore sample DataFrame pipelines available in the Apache Beam GitHub repository.
65 changes: 65 additions & 0 deletions learning/prompts/documentation-lookup-nolinks/47_resource_hints.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
Prompt:
How to customize compute resources for an Apache Beam pipeline?

Response:
Resource hints allow pipeline authors to provide Apache Beam runners with additional information, suggesting desired compute resource requirements. Runners can apply such hints to configure the execution environment for specific transforms or the entire pipeline.

Currently, Apache Beam supports two resource hints:
* `min_ram`: specifies the minimum amount of RAM to allocate to workers, expressed in MB, GB, MiB, or GiB (for example, `min_ram="4GB"`). This value serves as a lower limit when allocating memory to workers.
* `accelerator`: describes a hardware accelerator for processing a transform, such as specifying GPU type, count, and configuration options.

Resource hints can be nested, allowing hints to be specified on sub-transforms (inner transforms) of a composite transform (parent transform). When applying hints, runners follow these rules:
* For `min_ram`, the largest value among a transform and its parent transforms is used.
*Example: if an inner transform sets `min_ram` to 16GB and the parent transform sets it to 8GB, the inner transform's value (16GB) is used for steps within the inner transform, and the parent transform's value (8GB) is used for steps outside the inner transform. If an inner transform sets min_ram to 16GB and the parent transform sets it to 32GB, a hint of 32GB is used for all steps in the entire transform.*
* For `accelerator`, the innermost value takes precedence.
*Example: if an inner transform's accelerator hint differs from a parent transform's, the inner transform's hint is used.*

***Specify Resource Hints for a Pipeline:***

In Apache Beam's Java SDK, you can specify hints via the `resourceHints` pipeline option. Example:

```java
mvn compile exec:java -Dexec.mainClass=com.example.MyPipeline \
-Dexec.args="... \
--resourceHints=min_ram=<N>GB \
--resourceHints=accelerator='hint'" \
-Pdirect-runner
```

In the Python SDK, you can specify hints via the `resource_hints` pipeline option. Example:

```python
python my_pipeline.py \
... \
--resource_hints min_ram=<N>GB \
--resource_hints accelerator="hint"
```

***Specify Resource Hints for a Transform:***

In Apache Beam’s Java SDK, you can specify resource hints programmatically on pipeline steps (transforms) using the `setResourceHints` method. Example:

```java
pcoll.apply(MyCompositeTransform.of(...)
.setResourceHints(
ResourceHints.create()
.withMinRam("15GB")
.withAccelerator("type:nvidia-tesla-k80;count:1;install-nvidia-driver")))

pcoll.apply(ParDo.of(new BigMemFn())
.setResourceHints(
ResourceHints.create().withMinRam("30GB")))
```

In the Python SDK, you can specify resource hints programmatically on pipeline steps (transforms) using the `PTransforms.with_resource_hints` method. Example:

```python
pcoll | MyPTransform().with_resource_hints(
min_ram="4GB",
accelerator="type:nvidia-tesla-k80;count:1;install-nvidia-driver")

pcoll | beam.ParDo(BigMemFn()).with_resource_hints(
min_ram="30GB")
```

The interpretation and actuation of resource hints can vary between runners.

0 comments on commit b4cc808

Please sign in to comment.