Skip to content

Commit

Permalink
fix(cubestore): return physical memory to the system at rest (#1981)
Browse files Browse the repository at this point in the history
We seem to hit limitations of `malloc` from glibc, which causes physical
memory to be retained by the allocator even after calls to `free`.

Call `malloc_trim` on allocation-heavy operations to ensure we return
freed memory to the system.

See the comment in the code for details.
  • Loading branch information
ilya-biryukov committed Feb 2, 2021
1 parent 54e0407 commit 7249a7d
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 0 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/cubestore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,4 @@ cloud-storage = "0.7.0"
tokio-util = { version = "0.6.2", features=["compat"] }
futures-timer = "3.0.2"
tokio-stream = { version = "0.1.2", features=["io-util"] }
scopeguard = "1.1.0"
5 changes: 5 additions & 0 deletions rust/cubestore/src/import/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::metastore::is_valid_hll;
use crate::metastore::{Column, ColumnType, ImportFormat, MetaStore};
use crate::sql::timestamp_from_string;
use crate::store::{DataFrame, WALDataStore};
use crate::sys::malloc::trim_allocs;
use crate::table::{Row, TableValue};
use crate::CubeError;
use async_std::io::SeekFrom;
Expand All @@ -12,6 +13,7 @@ use core::mem;
use futures::{Stream, StreamExt};
use itertools::Itertools;
use mockall::automock;
use scopeguard::defer;
use std::fs;
use std::path::PathBuf;
use std::pin::Pin;
Expand Down Expand Up @@ -275,6 +277,9 @@ impl ImportService for ImportServiceImpl {
&mut temp_files,
)
.await?;

defer!(trim_allocs());

let mut rows = Vec::new();
while let Some(row) = row_stream.next().await {
if let Some(row) = row? {
Expand Down
1 change: 1 addition & 0 deletions rust/cubestore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub mod remotefs;
pub mod scheduler;
pub mod sql;
pub mod store;
pub mod sys;
pub mod table;
pub mod telemetry;
pub mod util;
Expand Down
5 changes: 5 additions & 0 deletions rust/cubestore/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::cluster::{Cluster, JobEvent};
use crate::metastore::job::JobType;
use crate::queryplanner::query_executor::QueryExecutor;
use crate::sql::parser::CubeStoreParser;
use crate::sys::malloc::trim_allocs;
use chrono::{TimeZone, Utc};
use datafusion::physical_plan::datetime_expressions::string_to_timestamp_nanos;
use datafusion::sql::parser::Statement as DFStatement;
Expand Down Expand Up @@ -269,6 +270,8 @@ impl Dialect for MySqlDialectWithBackTicks {
#[async_trait]
impl SqlService for SqlServiceImpl {
async fn exec_query(&self, q: &str) -> Result<DataFrame, CubeError> {
let _trim_allocs_guard; // scope guard to call trim_alloc() when necessary.

if !q.to_lowercase().starts_with("insert") {
trace!("Query: '{}'", q);
}
Expand Down Expand Up @@ -401,6 +404,8 @@ impl SqlService for SqlServiceImpl {
columns,
source,
}) => {
_trim_allocs_guard = scopeguard::guard((), |_| trim_allocs());

let data = if let SetExpr::Values(Values(data_series)) = &source.body {
data_series
} else {
Expand Down
6 changes: 6 additions & 0 deletions rust/cubestore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ use std::{
sync::Arc,
};

use crate::sys::malloc::trim_allocs;
use crate::table::parquet::ParquetTableStore;
use arrow::array::{Array, Int64Builder, StringBuilder};
use arrow::record_batch::RecordBatch;
use log::trace;
use mockall::automock;
use scopeguard::defer;

#[derive(Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct DataFrame {
Expand Down Expand Up @@ -300,6 +302,8 @@ impl ChunkStore {
#[async_trait]
impl ChunkDataStore for ChunkStore {
async fn partition(&self, wal_id: u64) -> Result<(), CubeError> {
defer!(trim_allocs());

let wal = self.meta_store.get_wal(wal_id).await?;
let table_id = wal.get_row().table_id();
let data = self.wal_store.get_wal(wal_id).await?;
Expand Down Expand Up @@ -328,6 +332,8 @@ impl ChunkDataStore for ChunkStore {
}

async fn repartition(&self, partition_id: u64) -> Result<(), CubeError> {
defer!(trim_allocs());

let partition = self.meta_store.get_partition(partition_id).await?;
if partition.get_row().is_active() {
return Err(CubeError::internal(format!(
Expand Down
24 changes: 24 additions & 0 deletions rust/cubestore/src/sys/malloc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/// Ask the memory allocator to returned the freed memory to the system.
/// This only has effect when compiled for glibc, this is a no-op on other systems.
///
/// Cubestore produces allocation patterns that hit the limitations of glibc`s malloc, which results
/// in too many physical memory pages being retained in the allocator's arena. This leads to the
/// resident set size growing over the acceptable limits.
/// Probably related to https://sourceware.org/bugzilla/show_bug.cgi?id=11261.
///
/// Use this function after code that produces considerable amount of memory allocations that
/// **have been already freed**.
#[cfg(target_os = "linux")] // We use `linux` to test for glibc.
pub fn trim_allocs() {
unsafe {
malloc_trim(0);
}
}

#[cfg(not(target_os = "linux"))]
pub fn trim_memory() {}

#[cfg(target_os = "linux")] // we assume glibc is linked on linux.
extern "C" {
fn malloc_trim(pad: usize) -> i32;
}
1 change: 1 addition & 0 deletions rust/cubestore/src/sys/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod malloc;

0 comments on commit 7249a7d

Please sign in to comment.