Skip to content

Commit

Permalink
Merge branch 'master' into fix-restore-with-async-replicated-database
Browse files Browse the repository at this point in the history
  • Loading branch information
antonio2368 committed Mar 7, 2024
2 parents 38e94e2 + 9e7894d commit e2741a6
Show file tree
Hide file tree
Showing 117 changed files with 1,724 additions and 1,911 deletions.
855 changes: 50 additions & 805 deletions .github/workflows/pull_request.yml

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions .github/workflows/reusable_build_stage.yml
@@ -0,0 +1,32 @@
### FIXME: merge reusable_test.yml and reusable_build.yml as they are almost identical
# and then merge reusable_build_stage.yml and reusable_test_stage.yml

name: BuildStageWF
'on':
workflow_call:
inputs:
stage:
description: stage name
type: string
required: true
data:
description: ci data
type: string
required: true

jobs:
s:
if: ${{ !failure() && !cancelled() }}
strategy:
fail-fast: false
matrix:
job_name_and_runner_type: ${{ fromJson(inputs.data).stages_data[inputs.stage] }}
uses: ./.github/workflows/reusable_build.yml
with:
build_name: ${{ matrix.job_name_and_runner_type.job_name }}
runner_type: ${{ matrix.job_name_and_runner_type.runner_type }}
# don't forget to pass force flag (no ci cache/no reuse) - once it's needed
force: false
# for now let's do I deep checkout for builds
checkout_depth: 0
data: ${{ inputs.data }}
25 changes: 25 additions & 0 deletions .github/workflows/reusable_test_stage.yml
@@ -0,0 +1,25 @@
name: StageWF
'on':
workflow_call:
inputs:
stage:
description: stage name
type: string
required: true
data:
description: ci data
type: string
required: true

jobs:
s:
if: ${{ !failure() && !cancelled() }}
strategy:
fail-fast: false
matrix:
job_name_and_runner_type: ${{ fromJson(inputs.data).stages_data[inputs.stage] }}
uses: ./.github/workflows/reusable_test.yml
with:
test_name: ${{ matrix.job_name_and_runner_type.job_name }}
runner_type: ${{ matrix.job_name_and_runner_type.runner_type }}
data: ${{ inputs.data }}
2 changes: 1 addition & 1 deletion contrib/aws
3 changes: 2 additions & 1 deletion docker/test/style/Dockerfile
Expand Up @@ -18,7 +18,8 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes \
python3-pip \
yamllint \
locales \
&& pip3 install black==23.1.0 boto3 codespell==2.2.1 mypy==1.3.0 PyGithub unidiff pylint==2.6.2 \
&& pip3 install black==23.12.0 boto3 codespell==2.2.1 mypy==1.8.0 PyGithub unidiff pylint==3.1.0 \
requests types-requests \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* /var/cache/debconf /tmp/* \
&& rm -rf /root/.cache/pip
Expand Down
24 changes: 24 additions & 0 deletions docker/test/upgrade/run.sh
Expand Up @@ -79,6 +79,18 @@ remove_keeper_config "async_replication" "1"
# create_if_not_exists feature flag doesn't exist on some older versions
remove_keeper_config "create_if_not_exists" "[01]"

#todo: remove these after 24.3 released.
sudo cat /etc/clickhouse-server/config.d/azure_storage_conf.xml \
| sed "s|<object_storage_type>azure|<object_storage_type>azure_blob_storage|" \
> /etc/clickhouse-server/config.d/azure_storage_conf.xml.tmp
sudo mv /etc/clickhouse-server/config.d/azure_storage_conf.xml.tmp /etc/clickhouse-server/config.d/azure_storage_conf.xml

#todo: remove these after 24.3 released.
sudo cat /etc/clickhouse-server/config.d/storage_conf.xml \
| sed "s|<object_storage_type>local|<object_storage_type>local_blob_storage|" \
> /etc/clickhouse-server/config.d/storage_conf.xml.tmp
sudo mv /etc/clickhouse-server/config.d/storage_conf.xml.tmp /etc/clickhouse-server/config.d/storage_conf.xml

# latest_logs_cache_size_threshold setting doesn't exist on some older versions
remove_keeper_config "latest_logs_cache_size_threshold" "[[:digit:]]\+"

Expand Down Expand Up @@ -113,6 +125,18 @@ sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
> /etc/clickhouse-server/config.d/keeper_port.xml.tmp
sudo mv /etc/clickhouse-server/config.d/keeper_port.xml.tmp /etc/clickhouse-server/config.d/keeper_port.xml

#todo: remove these after 24.3 released.
sudo cat /etc/clickhouse-server/config.d/azure_storage_conf.xml \
| sed "s|<object_storage_type>azure|<object_storage_type>azure_blob_storage|" \
> /etc/clickhouse-server/config.d/azure_storage_conf.xml.tmp
sudo mv /etc/clickhouse-server/config.d/azure_storage_conf.xml.tmp /etc/clickhouse-server/config.d/azure_storage_conf.xml

#todo: remove these after 24.3 released.
sudo cat /etc/clickhouse-server/config.d/storage_conf.xml \
| sed "s|<object_storage_type>local|<object_storage_type>local_blob_storage|" \
> /etc/clickhouse-server/config.d/storage_conf.xml.tmp
sudo mv /etc/clickhouse-server/config.d/storage_conf.xml.tmp /etc/clickhouse-server/config.d/storage_conf.xml

# async_replication setting doesn't exist on some older versions
remove_keeper_config "async_replication" "1"

Expand Down
Expand Up @@ -14,8 +14,6 @@

- `N` – The number of elements to return.

If the parameter is omitted, default value is the size of input.

- `column` – The value (Integer, String, Float and other Generic types).

**Example**
Expand All @@ -36,13 +34,12 @@
Gets all the String implementations of all numbers in column:

``` sql
SELECT groupArraySorted(str) FROM (SELECT toString(number) as str FROM numbers(5));
SELECT groupArraySorted(5)(str) FROM (SELECT toString(number) as str FROM numbers(5));

```

``` text
┌─groupArraySorted(str)────────┐
│ ['0','1','2','3','4'] │
└──────────────────────────────┘
```

┌─groupArraySorted(5)(str)─┐
│ ['0','1','2','3','4'] │
└──────────────────────────┘
```
2 changes: 1 addition & 1 deletion programs/local/LocalServer.cpp
Expand Up @@ -841,7 +841,7 @@ void LocalServer::addOptions(OptionsDescription & options_description)

/// If structure argument is omitted then initial query is not generated
("structure,S", po::value<std::string>(), "structure of the initial table (list of column and type names)")
("file,f", po::value<std::string>(), "path to file with data of the initial table (stdin if not specified)")
("file,F", po::value<std::string>(), "path to file with data of the initial table (stdin if not specified)")

("input-format", po::value<std::string>(), "input format of the initial table data")
("output-format", po::value<std::string>(), "default output format")
Expand Down
23 changes: 10 additions & 13 deletions .pylintrc → pyproject.toml
@@ -1,6 +1,4 @@
# vim: ft=config

[BASIC]
[tool.pylint.BASIC]
max-module-lines=2000
# due to SQL
max-line-length=200
Expand All @@ -9,11 +7,13 @@ max-branches=50
max-nested-blocks=10
max-statements=200

[FORMAT]
ignore-long-lines = (# )?<?https?://\S+>?$
[tool.pylint.FORMAT]
#ignore-long-lines = (# )?<?https?://\S+>?$

[MESSAGES CONTROL]
disable = missing-docstring,
[tool.pylint.'MESSAGES CONTROL']
# pytest.mark.parametrize is not callable (not-callable)
disable = '''
missing-docstring,
too-few-public-methods,
invalid-name,
too-many-arguments,
Expand All @@ -26,18 +26,15 @@ disable = missing-docstring,
wildcard-import,
unused-wildcard-import,
singleton-comparison,
# pytest.mark.parametrize is not callable (not-callable)
not-callable,
# https://github.com/PyCQA/pylint/issues/3882
# [Python 3.9] Value 'Optional' is unsubscriptable (unsubscriptable-object) (also Union)
unsubscriptable-object,
# Drop them one day:
redefined-outer-name,
broad-except,
bare-except,
no-else-return,
global-statement
'''

[SIMILARITIES]
[tool.pylint.SIMILARITIES]
# due to SQL
min-similarity-lines=1000

5 changes: 3 additions & 2 deletions src/Analyzer/Passes/QueryAnalysisPass.cpp
Expand Up @@ -6651,7 +6651,6 @@ void QueryAnalyzer::initializeTableExpressionData(const QueryTreeNodePtr & table
if (column_default && column_default->kind == ColumnDefaultKind::Alias)
{
auto alias_expression = buildQueryTree(column_default->expression, scope.context);
alias_expression = buildCastFunction(alias_expression, column_name_and_type.type, scope.context, false /*resolve*/);
auto column_node = std::make_shared<ColumnNode>(column_name_and_type, std::move(alias_expression), table_expression_node);
column_name_to_column_node.emplace(column_name_and_type.name, column_node);
alias_columns_to_resolve.emplace_back(column_name_and_type.name, column_node);
Expand Down Expand Up @@ -6684,7 +6683,9 @@ void QueryAnalyzer::initializeTableExpressionData(const QueryTreeNodePtr & table
alias_column_resolve_scope,
false /*allow_lambda_expression*/,
false /*allow_table_expression*/);

auto & resolved_expression = alias_column_to_resolve->getExpression();
if (!resolved_expression->getResultType()->equals(*alias_column_to_resolve->getResultType()))
resolved_expression = buildCastFunction(resolved_expression, alias_column_to_resolve->getResultType(), scope.context, true);
column_name_to_column_node = std::move(alias_column_resolve_scope.column_name_to_column_node);
column_name_to_column_node[alias_column_to_resolve_name] = alias_column_to_resolve;
}
Expand Down
42 changes: 23 additions & 19 deletions src/Common/PageCache.cpp
Expand Up @@ -191,7 +191,7 @@ size_t PageCache::maxChunks() const { return chunks_per_mmap_target * max_mmaps;

size_t PageCache::getPinnedSize() const
{
std::unique_lock lock(global_mutex);
std::lock_guard lock(global_mutex);
return (total_chunks - lru.size()) * bytes_per_page * pages_per_chunk;
}

Expand All @@ -202,8 +202,11 @@ PageCache::MemoryStats PageCache::getResidentSetSize() const
if (use_madv_free)
{
std::unordered_set<UInt64> cache_mmap_addrs;
for (const auto & m : mmaps)
cache_mmap_addrs.insert(reinterpret_cast<UInt64>(m.ptr));
{
std::lock_guard lock(global_mutex);
for (const auto & m : mmaps)
cache_mmap_addrs.insert(reinterpret_cast<UInt64>(m.ptr));
}

ReadBufferFromFile in("/proc/self/smaps");

Expand Down Expand Up @@ -283,6 +286,7 @@ PageCache::MemoryStats PageCache::getResidentSetSize() const
}
#endif

std::lock_guard lock(global_mutex);
stats.page_cache_rss = bytes_per_page * pages_per_chunk * total_chunks;
return stats;
}
Expand All @@ -294,12 +298,12 @@ PinnedPageChunk PageCache::getOrSet(PageCacheKey key, bool detached_if_missing,
bool incremented_profile_events = false;

{
std::unique_lock lock(global_mutex);
std::lock_guard lock(global_mutex);

auto * it = chunk_by_key.find(key);
if (it == chunk_by_key.end())
{
chunk = getFreeChunk(lock);
chunk = getFreeChunk();
chassert(!chunk->key.has_value());

if (!detached_if_missing)
Expand Down Expand Up @@ -331,14 +335,14 @@ PinnedPageChunk PageCache::getOrSet(PageCacheKey key, bool detached_if_missing,
/// otherwise we may detach a chunk pinned by someone else, which may be unexpected
/// for that someone else. Or maybe the latter is fine, dropCache() already does it.)
if (chunk->pages_populated.get(0) && reinterpret_cast<volatile std::atomic<char>*>(chunk->data)->load(std::memory_order_relaxed) == 0)
evictChunk(chunk, lock);
evictChunk(chunk);
}

if (inject_eviction && chunk->key.has_value() && rng() % 10 == 0)
{
/// Simulate eviction of the chunk or some of its pages.
if (rng() % 2 == 0)
evictChunk(chunk, lock);
evictChunk(chunk);
else
for (size_t i = 0; i < 20; ++i)
chunk->pages_populated.unset(rng() % (chunk->size / chunk->page_size));
Expand All @@ -353,7 +357,7 @@ PinnedPageChunk PageCache::getOrSet(PageCacheKey key, bool detached_if_missing,
}

{
std::unique_lock chunk_lock(chunk->chunk_mutex);
std::lock_guard chunk_lock(chunk->chunk_mutex);

if (chunk->pages_state == PageChunkState::Limbo)
{
Expand Down Expand Up @@ -383,7 +387,7 @@ void PageCache::removeRef(PageChunk * chunk) noexcept
return;

{
std::unique_lock lock(global_mutex);
std::lock_guard lock(global_mutex);

prev_pin_count = chunk->pin_count.fetch_sub(1);
if (prev_pin_count > 1)
Expand All @@ -398,7 +402,7 @@ void PageCache::removeRef(PageChunk * chunk) noexcept
}

{
std::unique_lock chunk_lock(chunk->chunk_mutex);
std::lock_guard chunk_lock(chunk->chunk_mutex);

/// Need to be extra careful here because we unlocked global_mutex above, so other
/// getOrSet()/removeRef() calls could have happened during this brief period.
Expand All @@ -421,7 +425,7 @@ static void logUnexpectedSyscallError(std::string name)
#endif
}

void PageCache::sendChunkToLimbo(PageChunk * chunk [[maybe_unused]], std::unique_lock<std::mutex> & /* chunk_mutex */) const noexcept
void PageCache::sendChunkToLimbo(PageChunk * chunk [[maybe_unused]], std::lock_guard<std::mutex> & /* chunk_mutex */) const noexcept
{
#ifdef MADV_FREE // if we're not on a very old version of Linux
chassert(chunk->size == bytes_per_page * pages_per_chunk);
Expand Down Expand Up @@ -454,7 +458,7 @@ void PageCache::sendChunkToLimbo(PageChunk * chunk [[maybe_unused]], std::unique
#endif
}

std::pair<size_t, size_t> PageCache::restoreChunkFromLimbo(PageChunk * chunk, std::unique_lock<std::mutex> & /* chunk_mutex */) const noexcept
std::pair<size_t, size_t> PageCache::restoreChunkFromLimbo(PageChunk * chunk, std::lock_guard<std::mutex> & /* chunk_mutex */) const noexcept
{
static_assert(sizeof(std::atomic<char>) == 1, "char is not atomic?");
// Make sure our strategic memory reads/writes are not reordered or optimized out.
Expand Down Expand Up @@ -505,10 +509,10 @@ std::pair<size_t, size_t> PageCache::restoreChunkFromLimbo(PageChunk * chunk, st
return {pages_restored, pages_evicted};
}

PageChunk * PageCache::getFreeChunk(std::unique_lock<std::mutex> & lock /* global_mutex */)
PageChunk * PageCache::getFreeChunk()
{
if (lru.empty() || (mmaps.size() < max_mmaps && lru.front().key.has_value()))
addMmap(lock);
addMmap();
if (lru.empty())
throw Exception(ErrorCodes::MEMORY_LIMIT_EXCEEDED, "All chunks in the entire page cache ({:.3} GiB) are pinned.",
bytes_per_page * pages_per_chunk * total_chunks * 1. / (1l << 30));
Expand All @@ -519,12 +523,12 @@ PageChunk * PageCache::getFreeChunk(std::unique_lock<std::mutex> & lock /* globa
size_t prev_pin_count = chunk->pin_count.fetch_add(1);
chassert(prev_pin_count == 0);

evictChunk(chunk, lock);
evictChunk(chunk);

return chunk;
}

void PageCache::evictChunk(PageChunk * chunk, std::unique_lock<std::mutex> & /* global_mutex */)
void PageCache::evictChunk(PageChunk * chunk)
{
if (chunk->key.has_value())
{
Expand All @@ -548,7 +552,7 @@ void PageCache::evictChunk(PageChunk * chunk, std::unique_lock<std::mutex> & /*
chunk->pages_populated.unsetAll();
}

void PageCache::addMmap(std::unique_lock<std::mutex> & /* global_mutex */)
void PageCache::addMmap()
{
/// ASLR by hand.
void * address_hint = reinterpret_cast<void *>(std::uniform_int_distribution<size_t>(0x100000000000UL, 0x700000000000UL)(rng));
Expand All @@ -564,13 +568,13 @@ void PageCache::addMmap(std::unique_lock<std::mutex> & /* global_mutex */)

void PageCache::dropCache()
{
std::unique_lock lock(global_mutex);
std::lock_guard lock(global_mutex);

/// Detach and free unpinned chunks.
bool logged_error = false;
for (PageChunk & chunk : lru)
{
evictChunk(&chunk, lock);
evictChunk(&chunk);

if (use_madv_free)
{
Expand Down

0 comments on commit e2741a6

Please sign in to comment.