Commit 81cc54a: CSV uploads
calherries committed May 10, 2024
1 parent: c7d051c

Showing 2 changed files with 353 additions and 1 deletion.
77 changes: 76 additions & 1 deletion src/metabase/driver/clickhouse.clj
@@ -2,6 +2,7 @@
"Driver for ClickHouse databases"
#_{:clj-kondo/ignore [:unsorted-required-namespaces]}
(:require [clojure.string :as str]
[honey.sql :as sql]
[metabase [config :as config]]
[metabase.driver :as driver]
[metabase.driver.clickhouse-introspection]
@@ -12,7 +13,11 @@
[metabase.driver.sql-jdbc [common :as sql-jdbc.common]
[connection :as sql-jdbc.conn]]
[metabase.driver.sql-jdbc.execute :as sql-jdbc.execute]
[metabase.driver.sql.query-processor :as sql.qp]
[metabase.driver.sql.util :as sql.u]
[metabase.query-processor.writeback :as qp.writeback]
[metabase.test.data.sql :as sql.tx]
[metabase.upload :as upload]
[metabase.util.log :as log]))

(set! *warn-on-reflection* true)
@@ -32,7 +37,9 @@
:test/jvm-timezone-setting false
:connection-impersonation false
:schemas true
-   :datetime-diff true}]
+   :uploads true
+   :datetime-diff true
+   :upload-with-auto-pk false}]

(defmethod driver/database-supports? [:clickhouse feature] [_driver _feature _db] supported?))

@@ -112,6 +119,74 @@
:semantic-version {:major (.getInt rset 2)
:minor (.getInt rset 3)}})))))

(defmethod driver/upload-type->database-type :clickhouse
[_driver upload-type]
(case upload-type
::upload/varchar-255 "Nullable(String)"
::upload/text "Nullable(String)"
::upload/int "Nullable(Int64)"
::upload/float "Nullable(Float64)"
::upload/boolean "Nullable(Boolean)"
::upload/date "Nullable(Date32)"
::upload/datetime "Nullable(DateTime64(3))"
;; FIXME: should be `Nullable(DateTime64(3))`
::upload/offset-datetime nil))
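
;; A hedged usage sketch (illustrative, not part of the commit): every
;; supported upload type maps to a Nullable ClickHouse column type, e.g.
;;   (driver/upload-type->database-type :clickhouse ::upload/int)
;;   ;; => "Nullable(Int64)"
;;   (driver/upload-type->database-type :clickhouse ::upload/offset-datetime)
;;   ;; => nil (rejected for now; see the FIXME above)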

(defmethod driver/table-name-length-limit :clickhouse
[_driver]
  ;; FIXME: This is a lie because you're really limited by the filesystem's limits, since ClickHouse
  ;; uses table/column names as filenames. But it's an approximation.
206)

(defn- quote-name [s]
(let [parts (str/split (name s) #"\.")]
(str/join "." (map #(str "`" % "`") parts))))
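
;; Illustrative only: quote-name backtick-quotes each dot-separated segment,
;; so schema-qualified names survive quoting, e.g.
;;   (quote-name "default.my_table") ;; => "`default`.`my_table`"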

(defn- create-table!-sql
[driver table-name column-definitions & {:keys [primary-key]}]
(str/join "\n"
[(first (sql/format {:create-table (keyword table-name)
:with-columns (mapv (fn [[name type-spec]]
(vec (cons name [[:raw type-spec]])))
column-definitions)}
:quoted true
:dialect (sql.qp/quote-style driver)))
"ENGINE = MergeTree"
(format "PRIMARY KEY (%s)" (str/join ", " (map quote-name primary-key)))
"ORDER BY ()"]))

(defmethod driver/create-table! :clickhouse
[driver db-id table-name column-definitions & {:keys [primary-key]}]
(let [sql (create-table!-sql driver table-name column-definitions :primary-key primary-key)]
(qp.writeback/execute-write-sql! db-id sql)))

(defmethod driver/insert-into! :clickhouse
[driver db-id table-name column-names values]
(when (seq values)
(sql-jdbc.execute/do-with-connection-with-options
driver
db-id
{:write? true}
(fn [^java.sql.Connection conn]
(let [sql (format "insert into %s (%s)" (quote-name table-name) (str/join ", " (map quote-name column-names)))]
(with-open [ps (.prepareStatement conn sql)]
(doseq [row values]
(when (seq row)
(doseq [[idx v] (map-indexed (fn [x y] [(inc x) y]) row)]
(condp isa? (type v)
java.lang.String (.setString ps idx v)
java.lang.Boolean (.setBoolean ps idx v)
java.lang.Long (.setLong ps idx v)
java.lang.Double (.setDouble ps idx v)
java.math.BigInteger (.setObject ps idx v)
java.time.LocalDate (.setObject ps idx v)
java.time.LocalDateTime (.setObject ps idx v)
(.setString ps idx v)))
(.addBatch ps)))
(doall (.executeBatch ps))))))))
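
;; Hedged usage sketch (hypothetical db-id, table, and rows): each row's
;; values are bound positionally on the prepared statement and flushed as
;; a single JDBC batch, e.g.
;;   (driver/insert-into! :clickhouse 1 "t" ["one" "two"] [[1 "a"] [2 "b"]])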

(defmethod sql.tx/session-schema :clickhouse [_] "default")

;;; ------------------------------------------ User Impersonation ------------------------------------------

(defmethod driver.sql/set-role-statement :clickhouse
277 changes: 277 additions & 0 deletions src/metabase/driver/scratch.clj
@@ -0,0 +1,277 @@
(ns metabase.driver.scratch
(:require [clojure.java.io :as io]
[clojure.java.jdbc :as jdbc]
[metabase [config :as config]]
[metabase.driver.clickhouse-introspection]
[metabase.driver.clickhouse-nippy]
[metabase.driver.clickhouse-qp]
[metabase.driver.sql-jdbc [common :as sql-jdbc.common]
[connection :as sql-jdbc.conn]]
[metabase.driver.sql-jdbc.execute :as sql-jdbc.execute]
[metabase.query-processor.writeback :as qp.writeback]
[metabase.test :as mt])
(:import [java.io ByteArrayInputStream]
[java.nio.charset StandardCharsets]
[com.clickhouse.data.value ClickHouseArrayValue]
[com.clickhouse.data ClickHouseFormat ClickHouseFile ClickHouseCompression ClickHouseDataStreamFactory
ClickHousePipedOutputStream ClickHouseInputStream]
[com.clickhouse.client ClickHouseNode ClickHouseProtocol ClickHouseCredentials ClickHouseClient]))

(def server
(.. (ClickHouseNode/builder)
(host "127.0.0.1")
(port ClickHouseProtocol/HTTP)
(database "default")
(credentials (ClickHouseCredentials/fromUserAndPassword "default" ""))
(build)))

(def file
(ClickHouseFile/of "/Users/callumherries/meta/mb49/t.csv" ClickHouseCompression/NONE ClickHouseFormat/CSV))

(def client
(ClickHouseClient/newInstance (into-array ClickHouseProtocol [(.getProtocol server)])))

(.. client
(write server)
(set "input_format_csv_skip_first_lines" "1")
(set "format_csv_delimiter" ",")
(table "t")
(data file)
(executeAndWait))

(defn- new-input-stream [content]
(ByteArrayInputStream. (.getBytes content StandardCharsets/US_ASCII)))

(let [input-stream (new-input-stream "1,1\n2,3")]
(.. client
(write server)
(set "input_format_csv_skip_first_lines" "1")
(set "format_csv_delimiter" ",")
(format ClickHouseFormat/CSV)
(table "t")
(data input-stream)
(executeAndWait)))
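
;; Both writes above go through the ClickHouse Java client directly over the
;; HTTP protocol, bypassing JDBC: the first streams a CSV file from disk, the
;; second an in-memory ByteArrayInputStream. Each skips the CSV header line
;; and uses "," as the delimiter via the corresponding format settings.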

(q "SELECT * FROM t")
(e! "DELETE FROM t WHERE 1=1")

(e! "CREATE TABLE t (one Int32, two Int32) ENGINE = MergeTree ORDER BY ()")
(e! "DROP TABLE IF EXISTS t")

(comment
(defmethod driver/create-table! :clickhouse
[driver db-id table-name column-definitions & {:keys [primary-key]}]
(let [sql (create-table!-sql driver table-name column-definitions :primary-key primary-key)]
(qp.writeback/execute-write-sql! db-id sql)))

(def db-id 6)

(def table-name "t")

(defn e! [e]
(jdbc/with-db-transaction [conn (sql-jdbc.conn/db->pooled-connection-spec db-id)]
(jdbc/execute! conn e)))

(defn q [e]
(jdbc/with-db-transaction [conn (sql-jdbc.conn/db->pooled-connection-spec db-id)]
(jdbc/query conn e)))

(e! "CREATE TABLE t (one Int32, two Int32) ENGINE = MergeTree ORDER BY ()")
(e! "INSERT INTO t FROM INFILE 't.csv' FORMAT CSV")
(q "SELECT * FROM t")
)

;; ClickHouseNode server = ClickHouseNode.builder()
;; .host( "127.0.0.1" )
;; .port( ClickHouseProtocol.HTTP )
;; .database( "mydatabase" )
;; .credentials( ClickHouseCredentials
;; .fromUserAndPassword( "username", "password" ) )
;; .build();

;; // Create a CSV file reference

;; ClickHouseFile file = ClickHouseFile.of(
;; "/path/to/file.csv", ClickHouseCompression.NONE, ClickHouseFormat.CSV );

;; // Create a client

;; ClickHouseClient client = ClickHouseClient.newInstance( server.getProtocol() );

;; // Specify settings, load data to the specified table and wait for completion

;; client.write( server )
;; .set( "input_format_csv_skip_first_lines", "1" )
;; .set( "format_csv_delimiter", "," )
;; .table( "mytable" )
;; .data( file )
;; .executeAndWait();


;; https://github.com/ClickHouse/clickhouse-java/blob/6a4ffc4ae0b76a6f65a57ee0db3246d45fbc78c1/examples/jdbc/src/main/java/com/clickhouse/examples/jdbc/Advanced.java#L37
;; private static ByteArrayInputStream newInputStream (String content)
;; {return new ByteArrayInputStream (content.getBytes (StandardCharsets.US_ASCII));
;; }

;; static String exteralTables(String url) throws SQLException {
;; String sql = "select a.name as n1, b.name as n2 from {tt 'table1'} a inner join {tt 'table2'} b on a.id=b.id";
;; try (Connection conn = getConnection(url); PreparedStatement ps = conn.prepareStatement(sql)) {
;; ps.setObject(1,
;; ClickHouseExternalTable.builder().name("table1").columns("id Int32, name Nullable(String)")
;; .format(ClickHouseFormat.CSV)
;; .content(newInputStream("1,a\n2,b")).build());
;; ps.setObject(2,
;; ClickHouseExternalTable.builder().name("table2").columns("id Int32, name String")
;; .format(ClickHouseFormat.JSONEachRow)
;; .content(newInputStream("{\"id\":3,\"name\":\"c\"}\n{\"id\":1,\"name\":\"d\"}")).build());
;; try (ResultSet rs = ps.executeQuery()) {
;; if (!rs.next()) {
;; throw new IllegalStateException("Should have at least one record");
;; }

;; // n1=a, n2=d
;; return String.format("n1=%s, n2=%s", rs.getString(1), rs.getString(2));
;; }
;; }
;; }

;; static long insert(ClickHouseNode server, String table) throws ClickHouseException {
;; try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) {
;; ClickHouseRequest.Mutation request = client.read(server).write().table(table)
;; .format(ClickHouseFormat.RowBinary);
;; ClickHouseConfig config = request.getConfig();
;; CompletableFuture<ClickHouseResponse> future;
;; // back-pressuring is not supported, you can adjust the first two arguments
;; try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
;; .createPipedOutputStream(config, (Runnable) null)) {
;; // in async mode, which is default, execution happens in a worker thread
;; future = request.data(stream.getInputStream()).execute();

;; // writing happens in main thread
;; for (int i = 0; i < 10_000; i++) {
;; BinaryStreamUtils.writeString(stream, String.valueOf(i % 16));
;; BinaryStreamUtils.writeNonNull(stream);
;; BinaryStreamUtils.writeString(stream, UUID.randomUUID().toString());
;; }
;; }

;; // response should be always closed
;; try (ClickHouseResponse response = future.get()) {
;; ClickHouseResponseSummary summary = response.getSummary();
;; return summary.getWrittenRows();
;; }
;; } catch (InterruptedException e) {
;; Thread.currentThread().interrupt();
;; throw ClickHouseException.forCancellation(e, server);
;; } catch (ExecutionException | IOException e) {
;; throw ClickHouseException.of(e, server);
;; }
;; }


(defn create-file [file-path content]
(with-open [wrtr (io/writer file-path)]
(.write wrtr content)))

(create-file "f.csv" "1,one\n2,two")

(mt/db)

(def spec (sql-jdbc.conn/connection-details->spec :clickhouse (:details (mt/db))))

(sql-jdbc.execute/do-with-connection-with-options
:clickhouse spec nil
(fn [^java.sql.Connection conn]
(jdbc/execute! {:connection conn} "SELECT 1")))

(let [props (doto (java.util.Properties.)
(.setProperty "localFile" "true"))
f (io/file "f.csv")
spec (sql-jdbc.conn/connection-details->spec :clickhouse (:details (mt/db)))]
#_(when (.exists f)
(io/delete-file f))

(qp.writeback/execute-write-sql! (mt/id) (str "drop table if exists test_load_infile_with_params;"))
(qp.writeback/execute-write-sql! (mt/id) (str "create table test_load_infile_with_params(n Int32, s String) engine=MergeTree ORDER BY ()"))
;; (qp.writeback/execute-write-sql! (mt/id) "insert into test_load_infile_with_params from infile 'f.csv' FORMAT CSV")
(with-open [conn (.getConnection (metabase.driver.sql-jdbc.execute/do-with-resolved-connection-data-source :clickhouse spec {}))
stmt (.prepareStatement conn "insert into test_load_infile_with_params from infile ? format CSV")]
(.setString stmt 1 (.getName f))
(.addBatch stmt)
(.setString stmt 1 (.getName f))
(.addBatch stmt)
(.setString stmt 1 (str (.getName f) "!"))
(.addBatch stmt)
(.executeBatch stmt))
#_(sql-jdbc.execute/do-with-connection-with-options
:clickhouse spec nil
(fn [^java.sql.Connection conn]
(with-open [stmt (.prepareStatement conn "insert into test_load_infile_with_params from infile ? format CSV")]
(.setString stmt 1 (.getName f))
(.addBatch stmt)
(.setString stmt 1 (.getName f))
(.addBatch stmt)
(.setString stmt 1 (str (.getName f) "!"))
(.addBatch stmt)
(.executeBatch stmt)))))
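
;; Editorial note: the batched insert above relies on the JDBC driver's local
;; INFILE support. The `props` map with "localFile" defined at the top of this
;; `let` is never passed to the connection as written, so it presumably needs
;; to be merged into the connection spec for local file reads to work.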

;; Use non-CSV-formatted data
;; // 4. fastest (close to the Java client) but requires manual serialization and it's
;; // NOT portable (it's limited to ClickHouse)
;; // 'format RowBinary' is the hint to use streaming mode; you may use a different
;; // format like JSONEachRow as needed
;; sql = String.format("insert into %s format RowBinary", TABLE_NAME);
;; try (PreparedStatement ps = conn.prepareStatement(sql)) {
;; // it's streaming so there's only one parameter(could be one of String, byte[],
;; // InputStream, File, ClickHouseWriter), and you don't have to process batch by
;; // batch
;; ps.setObject(1, new ClickHouseWriter() {
;; @Override
;; public void write(ClickHouseOutputStream output) throws IOException {
;; // this will be executed in a separate thread
;; for (int i = 0; i < 1_000_000; i++) {
;; output.writeUnicodeString("a-" + i);
;; output.writeBoolean(false); // non-null
;; output.writeUnicodeString("b-" + i);
;; }
;; }
;; });
;; ps.executeUpdate();
;; }
;; ByteArrayInputStream
;; try (InputStream inputStream = Files.newInputStream(file);
;; ClickHouseStatement statement = clickHouseConnection.createStatement()) {
;; statement.write()
;; .table(tableName))
;; .option("format_csv_delimiter", ";")
;; .data(inputStream, ClickHouseFormat.CSV)
;; .send();
;; }



