|
28 | 28 | import com.clickhouse.client.ClickHouseException;
|
29 | 29 | import com.clickhouse.client.ClickHouseRequest;
|
30 | 30 | import com.clickhouse.client.ClickHouseResponse;
|
| 31 | +import lombok.extern.slf4j.Slf4j; |
31 | 32 |
|
32 | 33 | import java.io.IOException;
|
33 | 34 | import java.util.ArrayList;
|
34 | 35 | import java.util.HashMap;
|
35 | 36 | import java.util.List;
|
36 | 37 | import java.util.Map;
|
37 | 38 |
|
| 39 | +@Slf4j |
38 | 40 | public class ClickhouseFileSinkAggCommitter
|
39 | 41 | implements SinkAggregatedCommitter<CKFileCommitInfo, CKFileAggCommitInfo> {
|
40 | 42 |
|
@@ -117,14 +119,15 @@ private void attachFileToClickhouse(Shard shard, List<String> clickhouseLocalFil
|
117 | 119 | throws ClickHouseException {
|
118 | 120 | ClickHouseRequest<?> request = getProxy().getClickhouseConnection(shard);
|
119 | 121 | for (String clickhouseLocalFile : clickhouseLocalFiles) {
|
120 |
| - ClickHouseResponse response = |
121 |
| - request.query( |
122 |
| - String.format( |
123 |
| - "ALTER TABLE %s ATTACH PART '%s'", |
124 |
| - clickhouseTable.getLocalTableName(), |
125 |
| - clickhouseLocalFile.substring( |
126 |
| - clickhouseLocalFile.lastIndexOf("/") + 1))) |
127 |
| - .executeAndWait(); |
| 122 | + String attachSql = |
| 123 | + String.format( |
| 124 | + "ALTER TABLE %s ATTACH PART '%s'", |
| 125 | + clickhouseTable.getLocalTableName(), |
| 126 | + clickhouseLocalFile.substring( |
| 127 | + clickhouseLocalFile.lastIndexOf("/") + 1)); |
| 128 | + |
| 129 | + log.info("Attach file to clickhouse table: {}", attachSql); |
| 130 | + ClickHouseResponse response = request.query(attachSql).executeAndWait(); |
128 | 131 | response.close();
|
129 | 132 | }
|
130 | 133 | }
|
|
0 commit comments