Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions snuba/cli/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@
@click.option(
"--clickhouse-port", type=int, help="Clickhouse native port to write to.",
)
@click.option(
"--clickhouse-secure",
type=bool,
default=False,
help="If true, an encrypted connection will be used",
)
@click.option(
"--clickhouse-ca-certs",
type=str,
default=None,
help="An optional path to certificates directory.",
)
@click.option(
"--dry-run",
type=bool,
Expand All @@ -33,6 +45,8 @@ def cleanup(
*,
clickhouse_host: Optional[str],
clickhouse_port: Optional[int],
clickhouse_secure: bool,
clickhouse_ca_certs: Optional[str],
dry_run: bool,
storage_name: str,
log_level: Optional[str] = None,
Expand Down Expand Up @@ -60,6 +74,8 @@ def cleanup(
clickhouse_user,
clickhouse_password,
database,
clickhouse_secure,
clickhouse_ca_certs,
)
elif not cluster.is_single_node():
raise click.ClickException("Provide ClickHouse host and port for cleanup")
Expand Down
16 changes: 16 additions & 0 deletions snuba/cli/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,26 @@ def reverse(
required=True,
default=os.environ.get("CLICKHOUSE_DATABASE", "default"),
)
@click.option(
"--secure",
type=bool,
default=False,
help="If true, an encrypted connection will be used",
)
@click.option(
"--ca-certs",
type=str,
default=None,
help="An optional path to certificates directory.",
)
def add_node(
node_type: str,
storage_set_names: Sequence[str],
host_name: str,
port: int,
database: str,
secure: bool,
ca_certs: Optional[str],
) -> None:
"""
Runs all migrations on a brand new ClickHouse node. This should be performed
Expand Down Expand Up @@ -239,4 +253,6 @@ def add_node(
user=user,
password=password,
database=database,
secure=secure,
ca_certs=ca_certs,
)
16 changes: 16 additions & 0 deletions snuba/cli/optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@
@click.option(
"--clickhouse-port", type=int, help="Clickhouse native port to write to.",
)
@click.option(
"--clickhouse-secure",
type=bool,
default=False,
help="If true, an encrypted connection will be used",
)
@click.option(
"--clickhouse-ca-certs",
type=str,
default=None,
help="An optional path to certificates directory.",
)
@click.option(
"--storage",
"storage_name",
Expand All @@ -28,6 +40,8 @@ def optimize(
*,
clickhouse_host: Optional[str],
clickhouse_port: Optional[int],
clickhouse_secure: bool,
clickhouse_ca_certs: Optional[str],
storage_name: str,
log_level: Optional[str] = None,
) -> None:
Expand Down Expand Up @@ -61,6 +75,8 @@ def optimize(
clickhouse_user,
clickhouse_password,
database,
clickhouse_secure,
clickhouse_ca_certs,
send_receive_timeout=ClickhouseClientSettings.OPTIMIZE.value.timeout,
)
elif not storage.get_cluster().is_single_node():
Expand Down
9 changes: 7 additions & 2 deletions snuba/clickhouse/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from urllib.parse import urlencode

import rapidjson
from urllib3.connectionpool import HTTPConnectionPool
from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool
from urllib3.exceptions import HTTPError

from snuba import settings
Expand Down Expand Up @@ -204,14 +204,19 @@ def __init__(
port: int,
user: str,
password: str,
secure: bool,
ca_certs: Optional[str],
metrics: MetricsBackend, # deprecated
statement: InsertStatement,
encoding: Optional[str],
options: Optional[Mapping[str, Any]] = None,
chunk_size: Optional[int] = None,
buffer_size: int = 0,
):
self.__pool = HTTPConnectionPool(host, port)
if secure:
self.__pool = HTTPSConnectionPool(host, port, ca_certs=ca_certs)
else:
self.__pool = HTTPConnectionPool(host, port)
self.__executor = ThreadPoolExecutor()

self.__options = options if options is not None else {}
Expand Down
6 changes: 6 additions & 0 deletions snuba/clickhouse/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def __init__(
user: str,
password: str,
database: str,
secure: bool = False,
ca_certs: Optional[str] = None,
connect_timeout: int = 1,
send_receive_timeout: Optional[int] = 300,
max_pool_size: int = settings.CLICKHOUSE_MAX_POOL_SIZE,
Expand All @@ -37,6 +39,8 @@ def __init__(
self.user = user
self.password = password
self.database = database
self.secure = secure
self.ca_certs = ca_certs
self.connect_timeout = connect_timeout
self.send_receive_timeout = send_receive_timeout
self.client_settings = client_settings
Expand Down Expand Up @@ -177,6 +181,8 @@ def _create_conn(self) -> Client:
user=self.user,
password=self.password,
database=self.database,
secure=self.secure,
ca_certs=self.ca_certs,
connect_timeout=self.connect_timeout,
send_receive_timeout=self.send_receive_timeout,
settings=self.client_settings,
Expand Down
10 changes: 10 additions & 0 deletions snuba/clusters/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ def __init__(
password: str,
database: str,
http_port: int,
secure: bool,
ca_certs: Optional[str],
storage_sets: Set[str],
single_node: bool,
# The cluster name and distributed cluster name only apply if single_node is set to False
Expand All @@ -164,6 +166,8 @@ def __init__(
self.__password = password
self.__database = database
self.__http_port = http_port
self.__secure = secure
self.__ca_certs = ca_certs
self.__single_node = single_node
self.__cluster_name = cluster_name
self.__distributed_cluster_name = distributed_cluster_name
Expand Down Expand Up @@ -207,6 +211,8 @@ def get_node_connection(
self.__user,
self.__password,
self.__database,
self.__secure,
self.__ca_certs,
client_settings=settings,
send_receive_timeout=timeout,
)
Expand Down Expand Up @@ -234,6 +240,8 @@ def get_batch_writer(
port=self.__http_port,
user=self.__user,
password=self.__password,
secure=self.__secure,
ca_certs=self.__ca_certs,
metrics=metrics,
statement=insert_statement.with_database(self.__database),
encoding=encoding,
Expand Down Expand Up @@ -290,6 +298,8 @@ def __get_cluster_nodes(self, cluster_name: str) -> Sequence[ClickhouseNode]:
password=cluster.get("password", ""),
database=cluster.get("database", "default"),
http_port=cluster["http_port"],
secure=cluster["secure"],
ca_certs=cluster["ca_certs"],
storage_sets=cluster["storage_sets"],
single_node=cluster["single_node"],
cluster_name=cluster["cluster_name"] if "cluster_name" in cluster else None,
Expand Down
4 changes: 4 additions & 0 deletions snuba/migrations/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,8 @@ def add_node(
user: str,
password: str,
database: str,
secure: bool = False,
ca_certs: Optional[str] = None,
) -> None:
client_settings = ClickhouseClientSettings.MIGRATE.value
clickhouse = ClickhousePool(
Expand All @@ -386,6 +388,8 @@ def add_node(
user,
password,
database,
secure,
ca_certs,
client_settings=client_settings.settings,
send_receive_timeout=client_settings.timeout,
)
Expand Down
2 changes: 2 additions & 0 deletions snuba/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
"password": os.environ.get("CLICKHOUSE_PASSWORD", ""),
"database": os.environ.get("CLICKHOUSE_DATABASE", "default"),
"http_port": int(os.environ.get("CLICKHOUSE_HTTP_PORT", 8123)),
"secure": os.environ.get("CLICKHOUSE_SECURE", "0") == "1",
"ca_certs": os.environ.get("CLICKHOUSE_CA_CERTS"),
"storage_sets": {
"discover",
"events",
Expand Down
2 changes: 2 additions & 0 deletions snuba/settings/settings_distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
"password": os.environ.get("CLICKHOUSE_PASSWORD", ""),
"database": os.environ.get("CLICKHOUSE_DATABASE", "default"),
"http_port": int(os.environ.get("CLICKHOUSE_HTTP_PORT", 8123)),
"secure": os.environ.get("CLICKHOUSE_SECURE", "0") == "1",
"ca_certs": os.environ.get("CLICKHOUSE_CA_CERTS"),
"storage_sets": {
"discover",
"events",
Expand Down
6 changes: 5 additions & 1 deletion tests/clusters/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
"password": "",
"database": "default",
"http_port": 8123,
"secure": False,
"ca_certs": None,
"storage_sets": ALL_STORAGE_SETS,
"single_node": True,
},
Expand All @@ -68,6 +70,8 @@
"password": "",
"database": "default",
"http_port": 8123,
"secure": False,
"ca_certs": None,
"storage_sets": {"transactions"},
"single_node": False,
"cluster_name": "clickhouse_hosts",
Expand Down Expand Up @@ -138,7 +142,7 @@ def test_get_local_nodes() -> None:

def test_cache_connections() -> None:
cluster_1 = cluster.ClickhouseCluster(
"localhost", 8000, "default", "", "default", 8001, {"events"}, True
"localhost", 8000, "default", "", "default", 8001, False, None, {"events"}, True
)

assert cluster_1.get_query_connection(
Expand Down
4 changes: 4 additions & 0 deletions tests/migrations/test_table_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
password="",
database="default",
http_port=8123,
secure=False,
ca_certs=None,
storage_sets={"events"},
single_node=True,
)
Expand All @@ -22,6 +24,8 @@
password="",
database="default",
http_port=8123,
secure=False,
ca_certs=None,
storage_sets={"events"},
single_node=False,
cluster_name="cluster_1",
Expand Down
26 changes: 24 additions & 2 deletions tests/test_split.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,18 @@ def do_query(
return QueryResult({}, {})

strategy = SimpleQueryPlanExecutionStrategy(
ClickhouseCluster("localhost", 1024, "default", "", "default", 80, set(), True),
ClickhouseCluster(
"localhost",
1024,
"default",
"",
"default",
80,
False,
None,
set(),
True
),
[],
[
ColumnSplitQueryStrategy(
Expand Down Expand Up @@ -140,7 +151,18 @@ def do_query(
)

strategy = SimpleQueryPlanExecutionStrategy(
ClickhouseCluster("localhost", 1024, "default", "", "default", 80, set(), True),
ClickhouseCluster(
"localhost",
1024,
"default",
"",
"default",
80,
False,
None,
set(),
True
),
[],
[
ColumnSplitQueryStrategy(id_column, project_column, timestamp_column),
Expand Down