Merged

31 commits
6023292
Upgrade Java Client to V2 syncQuery & syncInsert
mzitnik Sep 30, 2025
e5e84d6
Refactor to use the new client v2 api
mzitnik Oct 5, 2025
42b8130
Add timeout to query operation
mzitnik Oct 5, 2025
04a0f20
Clean NodeClient
mzitnik Oct 5, 2025
7424653
Change binary reader
mzitnik Oct 26, 2025
69c8157
Update client version
mzitnik Oct 26, 2025
9550fc9
Fix project to use snapshots
mzitnik Oct 27, 2025
595ee08
Merge branch 'main' into update-java-client-version
mzitnik Oct 27, 2025
54222aa
merge with main
mzitnik Oct 27, 2025
5a388f2
run spotlessScalaApply and implement readAllBytes since java 8 does n…
mzitnik Oct 27, 2025
fba180d
Remove unneeded remarks
mzitnik Oct 30, 2025
c138afb
Change to client version 0.9.3
mzitnik Nov 2, 2025
6677a1b
Update socket timeout in new client
mzitnik Nov 2, 2025
2c63484
Change max connections to 20
mzitnik Nov 2, 2025
5e4fe1e
ConnectTimeout to 1200000
mzitnik Nov 2, 2025
79a2e56
Add 3 sec to sleep
mzitnik Nov 2, 2025
4aebe3c
Setting a new setConnectionRequestTimeout for experiment
mzitnik Nov 2, 2025
3d24e80
spotlessScalaApply fix
mzitnik Nov 2, 2025
26385b6
Fix/json reader fixedstring v2 (#448)
ShimonSte Nov 6, 2025
92dc13e
Added reader and writer tests (#449)
ShimonSte Nov 9, 2025
f1727c7
Fix BinaryReader to handle new Java client types
Nov 9, 2025
bf844d2
Add high-precision decimal tests with tolerance
Nov 9, 2025
b3face6
Merge fix/binary-reader-java-client-types into update-java-client-ver…
Nov 9, 2025
8beb2bc
Simplify build-and-test workflow trigger to run on all pushes
ShimonSte Nov 9, 2025
2687b1f
Fix Scala 2.13 compatibility for nested arrays
ShimonSte Nov 10, 2025
18b4fcb
Update java version to 0.9.4
mzitnik Nov 13, 2025
1e056f0
Enable compression
mzitnik Nov 13, 2025
d35b624
add logging TPCDSClusterSuite & change client buffers
mzitnik Nov 15, 2025
5dfbb77
Change InputStream read code
mzitnik Nov 15, 2025
fe3548f
Remove hard coded settings for experiments
mzitnik Nov 15, 2025
724785f
Clean log from insert method
mzitnik Nov 15, 2025
11 changes: 1 addition & 10 deletions .github/workflows/build-and-test.yml
@@ -13,16 +13,7 @@
#

name: "Build and Test"
on:
push:
branches:
- "branch-*"
- "main"
pull_request:
branches:
- "branch-*"
- "main"
workflow_dispatch:
on: [push]

jobs:
run-tests:
27 changes: 27 additions & 0 deletions .github/workflows/cloud.yml
@@ -28,6 +28,8 @@ on:
jobs:
run-tests-with-clickhouse-cloud:
runs-on: ubuntu-22.04
# Only run on main repository where secrets are available
if: github.repository == 'ClickHouse/spark-clickhouse-connector'
strategy:
max-parallel: 1
fail-fast: false
@@ -44,6 +46,31 @@ jobs:
distribution: zulu
java-version: 8
cache: gradle
- name: Wake up ClickHouse Cloud instance
env:
CLICKHOUSE_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}
run: |
echo "Waking up ClickHouse Cloud instance..."
max_attempts=3
attempt=1

while [ $attempt -le $max_attempts ]; do
echo "Attempt $attempt of $max_attempts"
if curl -sS "https://${CLICKHOUSE_CLOUD_HOST}:8443/?query=SELECT+1" \
--user "default:${CLICKHOUSE_PASSWORD}" \
--max-time 60 > /dev/null; then
echo "Instance is awake!"
break
else
if [ $attempt -eq $max_attempts ]; then
echo "Failed to wake instance after $max_attempts attempts"
exit 1
fi
echo "Retrying in 10 seconds..."
sleep 10
((attempt++))
fi
done
- run: >-
./gradlew clean cloudTest --no-daemon --refresh-dependencies
-Dspark_binary_version=${{ matrix.spark }}
14 changes: 12 additions & 2 deletions build.gradle
@@ -90,7 +90,16 @@ allprojects {
version = getProjectVersion()

repositories {
maven { url = "$mavenCentralMirror" }
maven {
url = "$mavenCentralMirror"
}

maven {
url = "$mavenSnapshotsRepo"
mavenContent {
snapshotsOnly()
}
}
}
}

@@ -218,7 +227,7 @@ project(':clickhouse-core') {
api "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jackson_version"
api "com.fasterxml.jackson.module:jackson-module-scala_$scala_binary_version:$jackson_version"

api("com.clickhouse:clickhouse-jdbc:$clickhouse_jdbc_version:all") { transitive = false }
api("com.clickhouse:client-v2:${clickhouse_client_v2_version}:all") { transitive = false }

compileOnly "jakarta.annotation:jakarta.annotation-api:$jakarta_annotation_api_version"

@@ -239,6 +248,7 @@ project(":clickhouse-core-it") {
testImplementation(testFixtures(project(":clickhouse-core")))

testImplementation("com.clickhouse:clickhouse-jdbc:$clickhouse_jdbc_version:all") { transitive = false }

testImplementation "org.slf4j:slf4j-log4j12:$slf4j_version"
}

com/clickhouse/spark/client/NodeClient.scala
@@ -14,10 +14,16 @@

package com.clickhouse.spark.client

import com.clickhouse.spark.Logging
import com.clickhouse.client._
import com.clickhouse.client.config.ClickHouseClientOption
import com.clickhouse.data.{ClickHouseCompression, ClickHouseFormat}
import com.clickhouse.client.api.{Client, ServerException}
import com.clickhouse.client.api.enums.Protocol
import com.clickhouse.client.api.insert.{InsertResponse, InsertSettings}
import com.clickhouse.client.api.query.{QueryResponse, QuerySettings}
import com.clickhouse.data.ClickHouseFormat
import com.clickhouse.shaded.org.apache.commons.io.IOUtils
import com.clickhouse.spark.Logging

import java.util.concurrent.TimeUnit
import com.clickhouse.spark.exception.{CHClientException, CHException, CHServerException}
import com.clickhouse.spark.format.{
JSONCompactEachRowWithNamesAndTypesSimpleOutput,
@@ -30,7 +36,8 @@ import com.clickhouse.spark.spec.NodeSpec
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode

import java.io.InputStream
import java.io.{ByteArrayInputStream, InputStream}
import java.time.temporal.ChronoUnit
import java.util.UUID
import scala.util.{Failure, Success, Try}

Expand All @@ -40,7 +47,7 @@ object NodeClient {

class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
// TODO: add configurable timeout
private val timeout: Int = 30000
private val timeout: Int = 60000

private lazy val userAgent: String = {
val title = getClass.getPackage.getImplementationTitle
@@ -78,24 +85,26 @@ class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
private def shouldInferRuntime(): Boolean =
nodeSpec.infer_runtime_env.equalsIgnoreCase("true") || nodeSpec.infer_runtime_env == "1"

private val node: ClickHouseNode = ClickHouseNode.builder()
.options(nodeSpec.options)
.host(nodeSpec.host)
.port(nodeSpec.protocol, nodeSpec.port)
.database(nodeSpec.database)
.credentials(ClickHouseCredentials.fromUserAndPassword(nodeSpec.username, nodeSpec.password))
.build()
private def createClickHouseURL(nodeSpec: NodeSpec): String = {
val ssl: Boolean = nodeSpec.options.getOrDefault("ssl", "false").toBoolean
if (ssl) {
s"https://${nodeSpec.host}:${nodeSpec.port}"
} else {
s"http://${nodeSpec.host}:${nodeSpec.port}"
}
}

private val client: ClickHouseClient = ClickHouseClient.builder()
.option(ClickHouseClientOption.FORMAT, ClickHouseFormat.RowBinary)
.option(
ClickHouseClientOption.PRODUCT_NAME,
userAgent
)
.nodeSelector(ClickHouseNodeSelector.of(node.getProtocol))
private val client = new Client.Builder()
.setUsername(nodeSpec.username)
.setPassword(nodeSpec.password)
.setDefaultDatabase(nodeSpec.database)
.setOptions(nodeSpec.options)
.setClientName(userAgent)
.addEndpoint(createClickHouseURL(nodeSpec))
.build()

override def close(): Unit = client.close()
override def close(): Unit =
client.close()

private def nextQueryId(): String = UUID.randomUUID.toString

@@ -119,15 +128,13 @@ class NodeClient(
database: String,
table: String,
inputFormat: String,
inputCompressionType: ClickHouseCompression = ClickHouseCompression.NONE,
data: InputStream,
settings: Map[String, String] = Map.empty
): Either[CHException, SimpleOutput[ObjectNode]] =
syncInsert(
database,
table,
inputFormat,
inputCompressionType,
data,
"JSONEachRow",
JSONEachRowSimpleOutput.deserialize,
@@ -149,24 +156,32 @@ class NodeClient(
database: String,
table: String,
inputFormat: String,
inputCompressionType: ClickHouseCompression,
data: InputStream,
outputFormat: String,
deserializer: InputStream => SimpleOutput[OUT],
settings: Map[String, String]
): Either[CHException, SimpleOutput[OUT]] = {
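// Java 8 has no InputStream#readAllBytes (added in Java 9), so use the shaded commons-io helper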
def readAllBytes(inputStream: InputStream): Array[Byte] =
IOUtils.toByteArray(inputStream)
val queryId = nextQueryId()
val sql = s"INSERT INTO `$database`.`$table` FORMAT $inputFormat"
onExecuteQuery(queryId, sql)
val req = client.write(node)
.query(sql, queryId)
.decompressClientRequest(inputCompressionType)
.format(ClickHouseFormat.valueOf(outputFormat))
settings.foreach { case (k, v) => req.set(k, v) }
Try(req.data(data).executeAndWait()) match {
case Success(resp) => Right(deserializer(resp.getInputStream))
case Failure(ex: ClickHouseException) =>
Left(CHServerException(ex.getErrorCode, ex.getMessage, Some(nodeSpec), Some(ex)))
val insertSettings: InsertSettings = new InsertSettings()
settings.foreach { case (k, v) => insertSettings.setOption(k, v) }
insertSettings.setDatabase(database)
// TODO: check what type of compression is supported by the client v2
insertSettings.compressClientRequest(true)
val payload: Array[Byte] = readAllBytes(data)
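// No response body is consumed from the v2 insert; on success the deserializer is fed an empty stream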
val is: InputStream = new ByteArrayInputStream("".getBytes())
Try(client.insert(
table,
new ByteArrayInputStream(payload),
ClickHouseFormat.valueOf(inputFormat),
insertSettings
).get()) match {
case Success(resp: InsertResponse) => Right(deserializer(is))
case Failure(se: ServerException) =>
Left(CHServerException(se.getCode, se.getMessage, Some(nodeSpec), Some(se)))
case Failure(ex) => Left(CHClientException(ex.getMessage, Some(nodeSpec), Some(ex)))
}
}
@@ -179,16 +194,15 @@ class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
): Either[CHException, SimpleOutput[OUT]] = {
val queryId = nextQueryId()
onExecuteQuery(queryId, sql)
val req = client.read(node)
.query(sql, queryId).asInstanceOf[ClickHouseRequest[_]]
.format(ClickHouseFormat.valueOf(outputFormat)).asInstanceOf[ClickHouseRequest[_]]
.option(ClickHouseClientOption.CONNECTION_TIMEOUT, timeout).asInstanceOf[ClickHouseRequest[_]]
settings.foreach { case (k, v) => req.set(k, v).asInstanceOf[ClickHouseRequest[_]] }
Try(req.executeAndWait()) match {
case Success(resp) => Right(deserializer(resp.getInputStream))
case Failure(ex: ClickHouseException) =>
Left(CHServerException(ex.getErrorCode, ex.getMessage, Some(nodeSpec), Some(ex)))
case Failure(ex) => Left(CHClientException(ex.getMessage, Some(nodeSpec), Some(ex)))
val querySettings: QuerySettings = new QuerySettings()
val clickHouseFormat = ClickHouseFormat.valueOf(outputFormat)
querySettings.setFormat(clickHouseFormat)
querySettings.setQueryId(queryId)
settings.foreach { case (k, v) => querySettings.setOption(k, v) }
Try(client.query(sql, querySettings).get(timeout, TimeUnit.MILLISECONDS)) match {
case Success(response: QueryResponse) => Right(deserializer(response.getInputStream))
case Failure(se: ServerException) => Left(CHServerException(se.getCode, se.getMessage, Some(nodeSpec), Some(se)))
case Failure(ex: Exception) => Left(CHClientException(ex.getMessage, Some(nodeSpec), Some(ex)))
}
}

@@ -203,28 +217,26 @@ class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
}

// //////////////////////////////////////////////////////////////////////////////
// ///////////////////////// ret ClickHouseResponse /////////////////////////////
// ///////////////////////// ret QueryResponse /////////////////////////////
// //////////////////////////////////////////////////////////////////////////////

def queryAndCheck(
sql: String,
outputFormat: String,
outputCompressionType: ClickHouseCompression,
settings: Map[String, String] = Map.empty
): ClickHouseResponse = {
): QueryResponse = {
val queryId = nextQueryId()
onExecuteQuery(queryId, sql)
val req = client.read(node)
.query(sql, queryId).asInstanceOf[ClickHouseRequest[_]]
.compressServerResponse(outputCompressionType).asInstanceOf[ClickHouseRequest[_]]
.format(ClickHouseFormat.valueOf(outputFormat)).asInstanceOf[ClickHouseRequest[_]]
.option(ClickHouseClientOption.CONNECTION_TIMEOUT, timeout).asInstanceOf[ClickHouseRequest[_]]
settings.foreach { case (k, v) => req.set(k, v).asInstanceOf[ClickHouseRequest[_]] }
Try(req.executeAndWait()) match {
case Success(resp) => resp
case Failure(ex: ClickHouseException) =>
throw CHServerException(ex.getErrorCode, ex.getMessage, Some(nodeSpec), Some(ex))
case Failure(ex) => throw CHClientException(ex.getMessage, Some(nodeSpec), Some(ex))
val querySettings: QuerySettings = new QuerySettings()
val clickHouseFormat = ClickHouseFormat.valueOf(outputFormat)
querySettings.setFormat(clickHouseFormat)
querySettings.setQueryId(queryId)
settings.foreach { case (k, v) => querySettings.setOption(k, v) }

Try(client.query(sql, querySettings).get(timeout, TimeUnit.MILLISECONDS)) match {
case Success(response: QueryResponse) => response
case Failure(se: ServerException) => throw CHServerException(se.getCode, se.getMessage, Some(nodeSpec), Some(se))
case Failure(ex: Exception) => throw CHClientException(ex.getMessage, Some(nodeSpec), Some(ex))
}
}

Expand All @@ -238,5 +250,5 @@ class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
|""".stripMargin
)
def ping(timeout: Int = timeout) =
client.ping(node, timeout)
client.ping(timeout)
}
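
Taken together, the NodeClient changes swap the v1 ClickHouseClient/ClickHouseRequest chain for the v2 Client builder plus per-call QuerySettings. A minimal sketch of the new query path, assuming the client-v2 0.9.4 artifact this PR pins (the endpoint, credentials, and SQL below are illustrative, not from the diff):

import java.util.concurrent.TimeUnit
import com.clickhouse.client.api.Client
import com.clickhouse.client.api.query.QuerySettings
import com.clickhouse.data.ClickHouseFormat

val client = new Client.Builder()
  .setUsername("default")
  .setPassword("")
  .setDefaultDatabase("default")
  .addEndpoint("http://localhost:8123") // https:// when ssl=true, mirroring createClickHouseURL
  .build()

val settings = new QuerySettings()
settings.setFormat(ClickHouseFormat.JSONEachRow)
settings.setQueryId(java.util.UUID.randomUUID.toString)

// query() is asynchronous in v2; NodeClient bounds the wait with get(timeout, MILLISECONDS)
val response = client.query("SELECT 1", settings).get(60000, TimeUnit.MILLISECONDS)
val rows = response.getInputStream // rows arrive in the requested format
client.close()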
5 changes: 3 additions & 2 deletions gradle.properties
@@ -13,7 +13,7 @@
#

mavenCentralMirror=https://repo1.maven.org/maven2/
mavenSnapshotsRepo=https://s01.oss.sonatype.org/content/repositories/snapshots/
mavenSnapshotsRepo=https://central.sonatype.com/repository/maven-snapshots/
mavenReleasesRepo=https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/

systemProp.scala_binary_version=2.12
@@ -23,7 +23,8 @@ systemProp.known_spark_binary_versions=3.3,3.4,3.5

group=com.clickhouse.spark

clickhouse_jdbc_version=0.6.3
clickhouse_jdbc_version=0.9.4
clickhouse_client_v2_version=0.9.4

spark_33_version=3.3.4
spark_34_version=3.4.2
TPCDSClusterSuite.scala
@@ -38,6 +38,8 @@ class TPCDSClusterSuite extends SparkClickHouseClusterTest {
spark.sql("CREATE DATABASE tpcds_sf1_cluster WITH DBPROPERTIES (cluster = 'single_replica')")

TPCDSTestUtils.tablePrimaryKeys.foreach { case (table, primaryKeys) =>
println(s"before table ${table} ${primaryKeys}")
val start: Long = System.currentTimeMillis()
spark.sql(
s"""
|CREATE TABLE tpcds_sf1_cluster.$table
@@ -51,6 +53,7 @@ class TPCDSClusterSuite extends SparkClickHouseClusterTest {
|SELECT * FROM tpcds.sf1.$table;
|""".stripMargin
)
println(s"time took table ${table} ${System.currentTimeMillis() - start}")
}

TPCDSTestUtils.tablePrimaryKeys.keys.foreach { table =>
ClickHouseSingleArrowWriterSuite.scala
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.clickhouse.single

import com.clickhouse.spark.base.ClickHouseSingleMixIn
import org.apache.spark.SparkConf

class ClickHouseSingleArrowWriterSuite extends ClickHouseArrowWriterSuite with ClickHouseSingleMixIn

abstract class ClickHouseArrowWriterSuite extends ClickHouseWriterTestBase {

override protected def sparkConf: SparkConf = super.sparkConf
.set("spark.clickhouse.write.format", "arrow")
.set("spark.clickhouse.read.format", "json")

}
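
The new suite reruns the shared writer tests with the Arrow write path enabled via Spark conf. As a hedged sketch, the same toggles in a standalone session would look like the following (the catalog wiring line is an assumption from the connector's package name, not shown in this diff):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog") // assumed catalog class
  .config("spark.clickhouse.write.format", "arrow") // the path exercised by this suite
  .config("spark.clickhouse.read.format", "json")
  .getOrCreate()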