
Commit

add template for tests
s-kat committed May 1, 2023
1 parent 9b936fd commit 624d846
Showing 6 changed files with 185 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/Storages/S3Queue/S3QueueSource.cpp
@@ -259,7 +259,7 @@ StorageS3QueueSource::StorageS3QueueSource(
, zookeeper(current_zookeeper)
, zookeeper_path(zookeeper_path_)
, create_reader_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, 1)
-    , create_reader_scheduler(threadPoolCallbackRunner<ReaderHolder>(create_reader_pool, "CreateS3QueueReader"))
+    , create_reader_scheduler(threadPoolCallbackRunner<ReaderHolder>(create_reader_pool, "CreateS3QReader"))
{
reader = createReader();
if (reader)
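The only change to existing sources shortens the reader thread's name from "CreateS3QueueReader" to "CreateS3QReader". The commit does not say why, but a likely reason is that Linux caps thread names at 15 visible characters (16 bytes including the trailing NUL), a limit ClickHouse enforces when naming pool threads; the old name is too long, the new one fits exactly. A quick length check (illustrative only):

old_name, new_name = "CreateS3QueueReader", "CreateS3QReader"
print(len(old_name), len(new_name))  # 19 vs. 15 -- only the new name fits within 15 characters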
Empty file.
11 changes: 11 additions & 0 deletions tests/integration/test_storage_s3_queue/configs/defaultS3.xml
@@ -0,0 +1,11 @@
<clickhouse>
<s3>
<s3_mock>
<endpoint>http://resolver:8080</endpoint>
<header>Authorization: Bearer TOKEN</header>
</s3_mock>
<s3_mock_restricted_directory>
<endpoint>http://resolver:8080/root-with-auth/restricteddirectory/</endpoint>
</s3_mock_restricted_directory>
</s3>
</clickhouse>
43 changes: 43 additions & 0 deletions tests/integration/test_storage_s3_queue/configs/named_collections.xml
@@ -0,0 +1,43 @@
<clickhouse>
<named_collections>
<s3_conf1>
<url>http://minio1:9001/root/test_table</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_conf1>
<s3_parquet>
<url>http://minio1:9001/root/test_parquet</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_parquet>
<s3_parquet_gz>
<url>http://minio1:9001/root/test_parquet_gz</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_parquet_gz>
<s3_orc>
<url>http://minio1:9001/root/test_orc</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_orc>
<s3_native>
<url>http://minio1:9001/root/test_native</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_native>
<s3_arrow>
<url>http://minio1:9001/root/test.arrow</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_arrow>
<s3_parquet2>
<url>http://minio1:9001/root/test.parquet</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_parquet2>
<s3_json_no_sign>
<url>http://minio1:9001/root/test_cache4.jsonl</url>
<no_sign_request>true</no_sign_request>
</s3_json_no_sign>
</named_collections>
</clickhouse>
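These named collections mirror the ones used by the other S3 integration tests and are not referenced by the template yet. For reference, a collection can be passed by name to the s3 table function instead of spelling out the URL and credentials; a minimal sketch using the run_query helper from test.py below (hypothetical, assuming an object exists at the collection's URL):

run_query(
    instance,
    "SELECT count() FROM s3(s3_conf1, format='CSV', structure='column1 UInt32')",
)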
7 changes: 7 additions & 0 deletions tests/integration/test_storage_s3_queue/configs/users.xml
@@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<stream_like_engine_allow_direct_select>1</stream_like_engine_allow_direct_select>
</default>
</profiles>
</clickhouse>
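Stream-like engines such as S3Queue reject direct SELECTs by default; enabling stream_like_engine_allow_direct_select in the default profile is what lets test_get_file below query the table directly. The same setting could instead be passed per query, e.g. (sketch using the instance and table from test_get_file below, not part of the commit):

instance.query(
    "SELECT * FROM test",
    settings={"stream_like_engine_allow_direct_select": 1},
)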
123 changes: 123 additions & 0 deletions tests/integration/test_storage_s3_queue/test.py
@@ -0,0 +1,123 @@
import gzip
import json
import logging
import os
import io
import random
import threading
import time

import helpers.client
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.network import PartitionManager
from helpers.mock_servers import start_mock_servers
from helpers.test_tools import exec_query_with_retry
from helpers.s3_tools import prepare_s3_bucket

cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"instance",
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
)


MINIO_INTERNAL_PORT = 9001

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))


def put_s3_file_content(started_cluster, bucket, filename, data):
buf = io.BytesIO(data)
started_cluster.minio_client.put_object(bucket, filename, buf, len(data))


# Returns the content of the given S3 file as a string (bytes if decode is False).
def get_s3_file_content(started_cluster, bucket, filename, decode=True):
# type: (ClickHouseCluster, str, str, bool) -> str

data = started_cluster.minio_client.get_object(bucket, filename)
data_str = b""
for chunk in data.stream():
data_str += chunk
if decode:
return data_str.decode()
return data_str


@pytest.fixture(scope="module")
def started_cluster():
try:
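        # Note: this fixture builds its own cluster and "instance" with the S3
        # and named-collection configs; the module-level cluster/instance
        # defined above are not started here.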
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"instance",
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
main_configs=[
"configs/defaultS3.xml",
"configs/named_collections.xml"
],
)

logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")

prepare_s3_bucket(cluster)
# logging.info("S3 bucket created")
# run_s3_mocks(cluster)

yield cluster
finally:
cluster.shutdown()


def run_query(instance, query, stdin=None, settings=None):
# type: (ClickHouseInstance, str, object, dict) -> str

logging.info("Running query '{}'...".format(query))
result = instance.query(query, stdin=stdin, settings=settings)
logging.info("Query finished")

return result


def test_get_file(started_cluster):
auth = "'minio','minio123',"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
values = [
[12549, 2463, 19893],
[64021, 38652, 66703],
[81611, 39650, 83516],
[11079, 59507, 61546],
[51764, 69952, 6876],
[41165, 90293, 29095],
[40167, 78432, 48309],
[81629, 81327, 11855],
[55852, 21643, 98507],
[6738, 54643, 41155],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
filename = f"test.csv"
put_s3_file_content(started_cluster, bucket, filename, values_csv)

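    # Create an S3Queue table over the bucket: files matching the glob are
    # processed in 'unordered' mode, with per-file state kept under keeper_path.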
instance.query(
f"create table test ({table_format}) engine=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV') SETTINGS mode = 'unordered', keeper_path = '/clickhouse/testing'"
)

get_query = f"SELECT * FROM test"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == values

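    # A second direct read returns no rows: S3Queue processes each file only once.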
get_query = f"SELECT * FROM test"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
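The empty second read is the core S3Queue property this template exercises: every matching file is consumed exactly once. A possible next step, sketched here as a hypothetical continuation of test_get_file (not part of this commit; the file name and values are made up), would upload a second file and assert that only its rows come back:

    extra_values = [[1, 2, 3], [4, 5, 6]]
    extra_csv = (
        "\n".join(",".join(map(str, row)) for row in extra_values) + "\n"
    ).encode()
    put_s3_file_content(started_cluster, bucket, "test_2.csv", extra_csv)
    assert [
        list(map(int, line.split()))
        for line in run_query(instance, "SELECT * FROM test").splitlines()
    ] == extra_values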
