Fix for the high memory usage #935

Merged: 4 commits, Feb 13, 2024
2 changes: 1 addition & 1 deletion docker-entrypoint.sh
@@ -44,7 +44,7 @@ enable_jemalloc

if [[ -z "$JAVA_OPTS" ]]; then
echo "JAVA_OPTS is empty, initialising with default values"
- JAVA_OPTS="-Xms2g -Xmx2g"
+ JAVA_OPTS="-Xms6g -Xmx6g"
fi

# The -Xmx value is to make sure there is a minimum amount of memory; it can be
16 changes: 14 additions & 2 deletions docker/.env
@@ -22,5 +22,17 @@ DWH_ROOT=./dwh
# data dump set `small`, which is the default mode.
DATABASE_DUMP_MODE=small

- # JVM parameters to be used by the application
- JAVA_OPTS=-Xms2g -Xmx2g
+ # JVM parameters to be used by the application. If the application is set to use FlinkRunner
+ # for the Beam pipelines in local cluster mode (currently the default), then the total JVM
+ # memory should be set to at least the minimum value derived from the equation below.
+
+ # Minimum JVM Memory
+ #   = Memory for the JVM stack, metaspace, etc.
+ #   + (#Parallel Pipeline Threads * #Parallel Pipelines * Parquet Row Group Size
+ #      * Parquet Row Group Inflation Factor when the data is decoded)
+ # Refer to com.google.fhir.analytics.FlinkConfiguration.minJVMMemory() for the exact value.
+
+ # For example, on a 32-core machine with a Parquet Row Group size of 32mb and the capability
+ # of running 2 pipelines in parallel, the minimum JVM memory needed would be 6g (including a
+ # 512mb allocation for the JVM stack and other overhead). The Parquet Row Group Inflation
+ # Factor is estimated to be 2.
+ JAVA_OPTS=-Xms6g -Xmx6g
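
As a quick sanity check of the formula above, here is a minimal back-of-the-envelope sketch in Java; the values come from the example in the comment, and the 512mb overhead and inflation factor of 2 are the ones defined in FlinkConfiguration:

public class MinJvmMemoryCheck {
  public static void main(String[] args) {
    // Example values from the comment above; all of these are illustrative assumptions.
    long parallelThreads = 32; // one pipeline thread per core on a 32-core machine
    long parallelPipelines = 2; // e.g., batch and incremental running together
    long rowGroupSize = 32L * 1024 * 1024; // 32mb Parquet row groups
    long inflationFactor = 2; // rough estimate for the decoded in-memory size
    long miscOverhead = 512L * 1024 * 1024; // JVM stack, metaspace, etc.

    long minBytes =
        parallelThreads * parallelPipelines * rowGroupSize * inflationFactor + miscOverhead;
    System.out.println(minBytes / (1024 * 1024) + " MB"); // prints 4608 MB, i.e. ~4.5g
  }
}

This puts the strict floor at roughly 4.5g; the 6g default above leaves some headroom on top of it.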
1 change: 1 addition & 0 deletions docker/config/application.yaml
@@ -38,6 +38,7 @@ fhirdata:
createHiveResourceTables: true
thriftserverHiveConfig: "config/thriftserver-hive-config_local.json"
hiveResourceViewsDir: "config/views"
+ rowGroupSizeForParquetFiles: 33554432 # 32mb
viewDefinitionsDir: "config/views"
sinkDbConfigPath: "config/hapi-postgres-config_local_views.json"

BasePipelineOptions.java (new file)
@@ -0,0 +1,38 @@
/*
* Copyright 2020-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.fhir.analytics;

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

/**
* Common base interface for the batch and incremental pipeline runs, containing the shared
* configuration options.
*/
public interface BasePipelineOptions extends PipelineOptions {
@Description(
"The approximate size (bytes) of the row-groups in Parquet files. When this size is reached,"
+ " the content is flushed to disk. A larger value means more data for one column can fit"
+ " into one big column chunk, which means better compression and faster IO/queries. On"
+ " the downside, a larger value means more memory is needed to hold the data before it"
+ " is written to files. The default value of 0 means the default row-group size of"
+ " Parquet writers is used.")
@Default.Integer(0)
int getRowGroupSizeForParquetFiles();

void setRowGroupSizeForParquetFiles(int value);
}
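
Not part of this diff, but as an illustrative sketch, the new option can be set like any other Beam pipeline option; the class name and flag value below are assumptions for the example (33554432 bytes is the 32mb value used elsewhere in this PR):

import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RowGroupOptionExample {
  public static void main(String[] args) {
    // Parse the new option from a command-line style flag (assumes this class sits in the
    // same package as BasePipelineOptions).
    BasePipelineOptions options =
        PipelineOptionsFactory.fromArgs("--rowGroupSizeForParquetFiles=33554432")
            .as(BasePipelineOptions.class);
    // A value of 0 (the default) keeps the Parquet writers' built-in row-group size.
    System.out.println(options.getRowGroupSizeForParquetFiles());
  }
}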
FhirEtlOptions.java
@@ -17,11 +17,10 @@

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
- import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation.Required;

/** Options supported by {@link FhirEtl}. */
- public interface FhirEtlOptions extends PipelineOptions {
+ public interface FhirEtlOptions extends BasePipelineOptions {

@Description("Fhir source server URL, e.g., http://localhost:8091/fhir, etc.")
@Required
@@ -166,16 +165,6 @@ public interface FhirEtlOptions extends PipelineOptions {

void setSecondsToFlushParquetFiles(int value);

- @Description(
- "The approximate size (bytes) of the row-groups in Parquet files. When this size is reached,"
- + " the content is flushed to disk. This won't be triggered if there are less than 100"
- + " records.\n"
- + "The default 0 uses the default row-group size of Parquet writers.")
- @Default.Integer(0)
- int getRowGroupSizeForParquetFiles();
-
- void setRowGroupSizeForParquetFiles(int value);

// TODO: Either remove this feature or properly implement patient history fetching based on
// Patient Compartment definition.
@Description(
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2023 Google LLC
+ * Copyright 2020-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -157,10 +157,10 @@ public FetchRowsJdbcIo(
StringBuilder builder =
new StringBuilder(
"SELECT res.res_id, hfi.forced_id, res.res_type, res.res_updated, res.res_ver,"
- + " res.res_version, ver.res_encoding, ver.res_text, ver.res_text_vc FROM hfj_resource res JOIN"
- + " hfj_res_ver ver ON res.res_id = ver.res_id AND res.res_ver = ver.res_ver "
- + " LEFT JOIN hfj_forced_id hfi ON res.res_id = hfi.resource_pid WHERE"
- + " res.res_type = ? AND res.res_id % ? = ? AND"
+ + " res.res_version, ver.res_encoding, ver.res_text, ver.res_text_vc FROM"
+ + " hfj_resource res JOIN hfj_res_ver ver ON res.res_id = ver.res_id AND"
+ + " res.res_ver = ver.res_ver LEFT JOIN hfj_forced_id hfi ON res.res_id ="
+ + " hfi.resource_pid WHERE res.res_type = ? AND res.res_id % ? = ? AND"
+ " ver.res_encoding != 'DEL'");
// TODO do date sanity-checking on `since` (note this is partly done by HAPI client call).
if (since != null && !since.isEmpty()) {
ParquetMerger.java
@@ -33,6 +33,7 @@
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.parquet.ParquetIO;
+ import org.apache.beam.sdk.io.parquet.ParquetIO.Sink;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -226,12 +227,16 @@ public void processElement(ProcessContext c) {
.setCoder(
AvroCoder.of(
AvroConversionUtil.getInstance().getResourceSchema(type, fhirContext)));

+ Sink parquetSink =
+ ParquetIO.sink(AvroConversionUtil.getInstance().getResourceSchema(type, fhirContext))
+ .withCompressionCodec(CompressionCodecName.SNAPPY);
+ if (options.getRowGroupSizeForParquetFiles() > 0) {
+ // Sink is immutable; withRowGroupSize returns a new instance, so reassign the result.
+ parquetSink = parquetSink.withRowGroupSize(options.getRowGroupSizeForParquetFiles());
+ }
merged.apply(
FileIO.<GenericRecord>write()
- .via(
- ParquetIO.sink(
- AvroConversionUtil.getInstance().getResourceSchema(type, fhirContext))
- .withCompressionCodec(CompressionCodecName.SNAPPY))
+ .via(parquetSink)
.to(mergedDwhFiles.getResourcePath(type).toString())
.withSuffix(".parquet")
// TODO if we don't set this, DirectRunner works fine but FlinkRunner only writes
ParquetMergerOptions.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2023 Google LLC
+ * Copyright 2020-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,11 +17,10 @@

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
- import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation.Required;

/** Options supported by {@link ParquetMerger}. */
- public interface ParquetMergerOptions extends PipelineOptions {
+ public interface ParquetMergerOptions extends BasePipelineOptions {

@Description(
"The path to the first set of Parquet files to be merged; the data-warehouse file structure"
6 changes: 6 additions & 0 deletions pipelines/controller/config/application.yaml
@@ -135,6 +135,12 @@ fhirdata:
# you can use those predefined views in your dev. env. too.
hiveResourceViewsDir: "config/views"

+ # This is the size of the Parquet Row Group (a logical horizontal partitioning of the rows)
+ # that the pipelines use when creating row groups in Parquet files. A large value means more
+ # data for one column can fit into one big column chunk, which speeds up reading of the
+ # column data. On the downside, more memory is needed to hold the data before writing it to files.
+ rowGroupSizeForParquetFiles: 33554432 # 32mb

# The location from which ViewDefinition resources are read and applied to the
# corresponding input FHIR resources. Any file in this directory that ends
# `.json` is assumed to be a single ViewDefinition. To output these views to a
DataProperties.java
@@ -105,6 +105,8 @@ public class DataProperties {

private String fhirServerOAuthClientSecret;

+ private int rowGroupSizeForParquetFiles;

@PostConstruct
void validateProperties() {
CronExpression.parse(incrementalSchedule);
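The plumbing that carries this property into the pipeline options is outside this hunk; the sketch below is only a hypothetical illustration of the idea, and createBatchOptions is an assumed name, not code from this PR:

// Hypothetical sketch: propagate the configured row-group size into the Beam options,
// falling back to the Parquet writers' default when the property is unset (0).
FhirEtlOptions createBatchOptions(DataProperties dataProperties) {
  FhirEtlOptions options = PipelineOptionsFactory.create().as(FhirEtlOptions.class);
  if (dataProperties.getRowGroupSizeForParquetFiles() > 0) {
    options.setRowGroupSizeForParquetFiles(dataProperties.getRowGroupSizeForParquetFiles());
  }
  return options;
}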
FlinkConfiguration.java
@@ -32,6 +32,7 @@
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
+ import org.apache.parquet.hadoop.ParquetWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.FileSystemUtils;
@@ -62,6 +63,13 @@ public class FlinkConfiguration {

private static final int DEFAULT_PARALLELISM = Runtime.getRuntime().availableProcessors();
static final String DEFAULT_MANAGED_MEMORY_SIZE = "256mb";

+ /**
+ * The factor by which memory usage grows when Parquet row groups are read and decoded into
+ * in-memory objects; 2 is a rough estimate.
+ */
+ private static final int PARQUET_ROW_GROUP_INFLATION_FACTOR = 2;

private static final String KEY_VALUE_FORMAT = "%s: %s";
private String flinkConfDir;

@@ -93,6 +101,7 @@ void initialiseFlinkConfiguration(DataProperties dataProperties) throws IOException {
// Do not generate or validate the Flink configuration in case of non-local mode
if (!isFlinkModelLocal) return;

+ validateHeapMemoryConfig(dataProperties);
if (dataProperties.isAutoGenerateFlinkConfiguration()) {
Path confPath = Files.createTempDirectory(TEMP_FLINK_CONF_DIR);
logger.info("Creating Flink temporary configuration directory at {}", confPath);
@@ -187,6 +196,56 @@ private void deleteFilesRecursivelyOnExit(Path path) {
}));
}

+ /**
+ * Validates that the JVM memory is sufficient to run the pipelines successfully. This
+ * validation helps the application fail fast rather than failing in the middle of a batch run.
+ */
+ private void validateHeapMemoryConfig(DataProperties dataProperties) {
+ long maxMemory = Runtime.getRuntime().maxMemory();
+ long minJVMMemoryRequired = minJVMMemory(dataProperties);
+ if (minJVMMemoryRequired > maxMemory) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Insufficient max JVM memory, required %s MB but provided %s MB",
+ memoryInMB(minJVMMemoryRequired), memoryInMB(maxMemory)));
+ }
+ }

+ /**
+ * The minimum JVM memory needed for the pipelines to run without failure. This is derived from
+ * the number of Parquet row groups that can be read or written in parallel by the pipeline
+ * threads, roughly via the formula below.
+ *
+ * <p>Minimum JVM Memory = Memory for JVM stack, metaspace, etc. + (#Parallel Pipeline Threads
+ * * #Parallel Pipelines * Parquet Row Group Size * Row Group Inflation Factor)
+ */
+ private long minJVMMemory(DataProperties dataProperties) {
+ // If Parquet files were created earlier by the pipelines with a different row group size,
+ // the values below won't apply to them; they only apply to files written and read by the
+ // new pipelines.
+ int rowGroupSize =
+ dataProperties.getRowGroupSizeForParquetFiles() > 0
+ ? dataProperties.getRowGroupSizeForParquetFiles()
+ : ParquetWriter.DEFAULT_BLOCK_SIZE;
+ int parallelism =
+ dataProperties.getNumThreads() > 0 ? dataProperties.getNumThreads() : DEFAULT_PARALLELISM;
+ // The minimum memory required for the pipelines to read/write Parquet row groups in
+ // parallel. The row group size is multiplied by the inflation factor to accommodate the
+ // memory needed when the Parquet rows are read and decoded in memory. Note the cast to
+ // long, which avoids int overflow for large row-group sizes and high parallelism.
+ long memoryNeededForParquetRowGroups =
+ (long) parallelism
+ * EtlUtils.NO_OF_PARALLEL_PIPELINES
+ * rowGroupSize
+ * PARQUET_ROW_GROUP_INFLATION_FACTOR;
+ // Memory reserved for the JVM stack, metaspace, etc.
+ long minJVMMemoryMisc = 512 * 1024 * 1024;
+ return memoryNeededForParquetRowGroups + minJVMMemoryMisc;
+ }
+
+ private long memoryInMB(long memoryInBytes) {
+ return memoryInBytes / (1024 * 1024);
+ }

private boolean isFlinkModeLocal(DataProperties dataProperties) {
// TODO: Enable the pipeline for Flink non-local modes as well
// https://github.com/google/fhir-data-pipes/issues/893
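For illustration only (not part of this PR): a rough sketch of how the new fail-fast check could be exercised from a test, assuming local Flink mode, setter-style access on DataProperties, assertThrows from JUnit 4.13+, and a JVM launched with a heap below the computed floor (e.g. -Xmx1g):

@Test
public void testInsufficientHeapFailsFast() throws IOException {
  // With 32 threads, 2 parallel pipelines, 32mb row groups and an inflation factor of 2,
  // the computed floor is ~4.5g, so initialisation on a 1g heap should throw.
  DataProperties dataProperties = new DataProperties();
  dataProperties.setNumThreads(32);
  dataProperties.setRowGroupSizeForParquetFiles(32 * 1024 * 1024);
  FlinkConfiguration flinkConfiguration = new FlinkConfiguration();
  assertThrows(
      IllegalConfigurationException.class,
      () -> flinkConfiguration.initialiseFlinkConfiguration(dataProperties));
}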
FlinkConfigurationTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2020-2023 Google LLC
+ * Copyright 2020-2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -80,6 +80,7 @@ private static long getNetworkMemory(int numThreads) {
public void testNonAutoGenerationWithDefaultConfiguration() throws IOException {
DataProperties dataProperties = new DataProperties();
dataProperties.setAutoGenerateFlinkConfiguration(false);
+ dataProperties.setNumThreads(1);
try (MockedStatic<GlobalConfiguration> mockedStatic =
Mockito.mockStatic(GlobalConfiguration.class)) {
Configuration defaultConfiguration = new Configuration();