From 7d04568998876e9d2c833686a38eef1a8819f26c Mon Sep 17 00:00:00 2001 From: Shicong Date: Fri, 11 Oct 2024 15:28:37 +0800 Subject: [PATCH 1/5] feat: add execute_update example for rust --- rust/bin/main.rs | 29 +++++++++++++++++++++++++++++ rust/src/client.rs | 46 +++++++++++++++++++++++++--------------------- 2 files changed, 54 insertions(+), 21 deletions(-) diff --git a/rust/bin/main.rs b/rust/bin/main.rs index 3550645..0c6e168 100644 --- a/rust/bin/main.rs +++ b/rust/bin/main.rs @@ -117,5 +117,34 @@ async fn main() -> Result<()> { // +---------------------------+-----+-------+------+ print_batches(&result); + // Closes the prepared statement to notify releasing resources on server side. + client.close_prepared(prepared_stmt).await?; + + // There provides a dedicated interface `execute_update` for executing DMLs, including Insert, Delete. + // This interface directly returns the affected rows which might be convenient for some use cases. + // + // Note, Datalayers does not support Update and the development for Delete is in progress. + sql = r#" + INSERT INTO rust.demo (ts, sid, value, flag) VALUES + ('2024-09-03T10:00:00+08:00', 1, 4.5, 0), + ('2024-09-03T10:05:00+08:00', 2, 11.6, 1); + "#; + let affected_rows = client.execute_update(sql).await?; + // The output should be: + // Affected rows: 2 + println!("Affected rows: {}", affected_rows); + + // Checks that the data are inserted successfully. + sql = "SELECT * FROM rust.demo where ts >= '2024-09-03T10:00:00+08:00'"; + result = client.execute(sql).await?; + // The result should be: + // +---------------------------+-----+-------+------+ + // | ts | sid | value | flag | + // +---------------------------+-----+-------+------+ + // | 2024-09-03T10:00:00+08:00 | 1 | 4.5 | 0 | + // | 2024-09-03T10:05:00+08:00 | 2 | 11.6 | 1 | + // +---------------------------+-----+-------+------+ + print_batches(&result); + Ok(()) } diff --git a/rust/src/client.rs b/rust/src/client.rs index ff4e948..13b853e 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -64,7 +64,7 @@ impl Client { .handshake(&config.username, &config.password) .await .inspect_err(|e| { - println!("Failed to do handshake: {}", filter_message(&e.to_string())); + println!("{}", filter_message(&e.to_string())); exit(1) }); @@ -83,10 +83,7 @@ impl Client { .execute(sql.to_string(), None) .await .inspect_err(|e| { - println!( - "Failed to execute a sql: {}", - filter_message(&e.to_string()) - ); + println!("{}", filter_message(&e.to_string())); exit(1) })?; let ticket = flight_info @@ -100,16 +97,25 @@ impl Client { Ok(batches) } + pub async fn execute_update(&mut self, sql: &str) -> Result { + let affected_rows = self + .inner + .execute_update(sql.to_string(), None) + .await + .inspect_err(|e| { + println!("{}", filter_message(&e.to_string())); + exit(1) + })?; + Ok(affected_rows) + } + pub async fn prepare(&mut self, sql: &str) -> Result> { let prepared_stmt = self .inner .prepare(sql.to_string(), None) .await .inspect_err(|e| { - println!( - "Failed to execute a sql: {}", - filter_message(&e.to_string()) - ); + println!("{}", filter_message(&e.to_string())); exit(1) })?; Ok(prepared_stmt) @@ -124,10 +130,7 @@ impl Client { .set_parameters(binding) .context("Failed to bind a record batch to the prepared statement")?; let flight_info = prepared_stmt.execute().await.inspect_err(|e| { - println!( - "Failed to execute the prepared statement: {}", - filter_message(&e.to_string()) - ); + println!("{}", filter_message(&e.to_string())); exit(1) })?; let ticket = flight_info @@ -141,19 +144,20 @@ impl Client { Ok(batches) } + pub async fn close_prepared(&self, prepared_stmt: PreparedStatement) -> Result<()> { + prepared_stmt + .close() + .await + .context("Failed to close a prepared statement") + } + async fn do_get(&mut self, ticket: Ticket) -> Result> { let stream = self.inner.do_get(ticket).await.inspect_err(|e| { - println!( - "Failed to perform do_get: {}", - filter_message(&e.to_string()) - ); + println!("{}", filter_message(&e.to_string())); exit(1) })?; let batches = stream.try_collect::>().await.inspect_err(|e| { - println!( - "Failed to consume flight record batch stream: {}", - filter_message(&e.to_string()) - ); + println!("{}", filter_message(&e.to_string())); exit(1) })?; if batches.is_empty() { From 70a8ea40973c5cdeb8144e1ead3aedd60dda76fc Mon Sep 17 00:00:00 2001 From: Shicong Date: Fri, 11 Oct 2024 16:16:49 +0800 Subject: [PATCH 2/5] feat: add execute_update example for python --- python/client.py | 18 ++++++++++++++++++ python/main.py | 26 ++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/python/client.py b/python/client.py index 3410533..f9f287a 100644 --- a/python/client.py +++ b/python/client.py @@ -102,6 +102,15 @@ def execute(self, sql: str) -> pandas.DataFrame: df = reader.read_pandas() return df + def execute_update(self, sql: str) -> int: + """ + Executes the sql on Datalayers and returns the affected rows. + This method is meant to be used for executing DMLs, including Insert and Delete. + Note, Datalayers does not support Update and the development of Delete is in progress. + """ + + return self.inner.execute_update(sql, None) + def prepare(self, sql: str) -> PreparedStatement: """ Creates a prepared statement. @@ -122,6 +131,15 @@ def execute_prepared( df = reader.read_pandas() return df + def close_prepared(self, prepared_stmt: PreparedStatement): + """ + Closes the prepared statement. + Note, generally you should not call this method explicitly. + Use with clause to manage the life cycle of a prepared statement instead. + """ + + prepared_stmt.close() + def close(self): """ Closes the inner Arrow FlightSQL client. diff --git a/python/main.py b/python/main.py index c84a23c..2e34b70 100644 --- a/python/main.py +++ b/python/main.py @@ -105,6 +105,32 @@ def main(): # 1 2024-09-02 10:05:00+08:00 2 15.3 1 print(result) + # There provides a dedicated interface `execute_update` for executing DMLs, including Insert, Delete. + # This interface directly returns the affected rows which might be convenient for some use cases. + # + # Note, Datalayers does not support Update and the development for Delete is in progress. + sql = """ + INSERT INTO python.demo (ts, sid, value, flag) VALUES + ('2024-09-03T10:00:00+08:00', 1, 4.5, 0), + ('2024-09-03T10:05:00+08:00', 2, 11.6, 1); + """ + #! It's expected that the affected rows is 2. + #! However, the flightsql-dbapi library does not implement the `execute_update` correctly + #! and the returned affected rows is always 0. + affected_rows = client.execute_update(sql) + # The output should be: + # Affected rows: 2 + # print("Affected rows: {}".format(affected_rows)) + + # Checks that the data are inserted successfully. + sql = "SELECT * FROM python.demo where ts >= '2024-09-03T10:00:00+08:00'" + result = client.execute(sql) + # The result should be: + # ts sid value flag + # 0 2024-09-03 10:00:00+08:00 1 4.5 0 + # 1 2024-09-03 10:05:00+08:00 2 11.6 1 + print(result) + if __name__ == "__main__": main() From c6eb8166e08d208f3070f218638f71ca7e862870 Mon Sep 17 00:00:00 2001 From: Shicong Date: Fri, 11 Oct 2024 16:52:11 +0800 Subject: [PATCH 3/5] feat: add execute_update example for go --- go/client.go | 15 +++++++++++++++ go/main.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/go/client.go b/go/client.go index 448244a..6a4c107 100644 --- a/go/client.go +++ b/go/client.go @@ -102,6 +102,16 @@ func (client *Client) Execute(sql string) ([]arrow.Record, error) { return client.doGet(flightInfo.GetEndpoint()[0].GetTicket()) } +// Executes the sql on Datalayers and returns the affected rows. +// The supported sqls are Insert and Delete. Note, the development for supporting Delete is in progress. +func (client *Client) ExecuteUpdate(sql string) (int64, error) { + affectedRows, err := client.inner.ExecuteUpdate(client.ctx, sql) + if err != nil { + return 0, fmt.Errorf("failed to execute a sql: %v", err) + } + return affectedRows, nil +} + // Creates a prepared statement. func (client *Client) Prepare(sql string) (*flightsql.PreparedStatement, error) { return client.inner.Prepare(client.ctx, sql) @@ -119,6 +129,11 @@ func (client *Client) ExecutePrepared(preparedStmt *flightsql.PreparedStatement, return client.doGet(flightInfo.GetEndpoint()[0].GetTicket()) } +// Closes the prepared statement. +func (client *Client) ClosePrepared(preparedStmt *flightsql.PreparedStatement) error { + return preparedStmt.Close(client.ctx) +} + // Calls the `DoGet` method of the FlightSQL client. func (client *Client) doGet(ticket *flight.Ticket) ([]arrow.Record, error) { reader, err := client.inner.DoGet(client.ctx, ticket) diff --git a/go/main.go b/go/main.go index ec185c5..91def0a 100644 --- a/go/main.go +++ b/go/main.go @@ -147,4 +147,40 @@ func main() { // 2024-09-01 10:05:00 +0800 CST 2 15.30 1 // 2024-09-02 10:05:00 +0800 CST 2 15.30 1 PrintRecords(result) + + // Closes the prepared statement to notify releasing resources on server side. + if err = client.ClosePrepared(preparedStmt); err != nil { + fmt.Println("Failed to close a prepared statement: ", err) + return + } + + // There provides a dedicated interface `execute_update` for executing DMLs, including Insert, Delete. + // This interface directly returns the affected rows which might be convenient for some use cases. + // + // Note, Datalayers does not support Update and the development for Delete is in progress. + sql = ` + INSERT INTO go.demo (ts, sid, value, flag) VALUES + ('2024-09-03T10:00:00+08:00', 1, 4.5, 0), + ('2024-09-03T10:05:00+08:00', 2, 11.6, 1);` + affectedRows, err := client.ExecuteUpdate(sql) + if err != nil { + fmt.Println("Failed to insert data: ", err) + return + } + // The output should be: + // Affected rows: 2 + fmt.Println("Affected rows: ", affectedRows) + + // Checks that the data are inserted successfully. + sql = "SELECT * FROM go.demo where ts >= '2024-09-03T10:00:00+08:00'" + result, err = client.Execute(sql) + if err != nil { + fmt.Println("Failed to scan data: ", err) + return + } + // The result should be: + // ts sid value flag + // 2024-09-03 10:00:00 +0800 CST 1 4.50 0 + // 2024-09-03 10:05:00 +0800 CST 2 11.60 1 + PrintRecords(result) } From d4a0cea9ee8e2efd2f5310290d4e9b55dc82a54c Mon Sep 17 00:00:00 2001 From: Shicong Date: Fri, 11 Oct 2024 16:58:11 +0800 Subject: [PATCH 4/5] chore: add warn for execute_update impl of go and python --- go/main.go | 7 +++++-- python/main.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/go/main.go b/go/main.go index 91def0a..51d5ede 100644 --- a/go/main.go +++ b/go/main.go @@ -162,14 +162,17 @@ func main() { INSERT INTO go.demo (ts, sid, value, flag) VALUES ('2024-09-03T10:00:00+08:00', 1, 4.5, 0), ('2024-09-03T10:05:00+08:00', 2, 11.6, 1);` - affectedRows, err := client.ExecuteUpdate(sql) + _, err = client.ExecuteUpdate(sql) if err != nil { fmt.Println("Failed to insert data: ", err) return } + // It's expected that the affected rows is 2. + // However, the Go implementation of Arrow Flight SQL client might have some flaws and the affected rows is always 0. + // // The output should be: // Affected rows: 2 - fmt.Println("Affected rows: ", affectedRows) + // fmt.Println("Affected rows: ", affectedRows) // Checks that the data are inserted successfully. sql = "SELECT * FROM go.demo where ts >= '2024-09-03T10:00:00+08:00'" diff --git a/python/main.py b/python/main.py index 2e34b70..b9159e1 100644 --- a/python/main.py +++ b/python/main.py @@ -115,7 +115,7 @@ def main(): ('2024-09-03T10:05:00+08:00', 2, 11.6, 1); """ #! It's expected that the affected rows is 2. - #! However, the flightsql-dbapi library does not implement the `execute_update` correctly + #! However, the flightsql-dbapi library seems does not implement the `execute_update` correctly #! and the returned affected rows is always 0. affected_rows = client.execute_update(sql) # The output should be: From 560b7989213352abb1ac7113cc606edd3dab3925 Mon Sep 17 00:00:00 2001 From: Shicong Date: Mon, 17 Feb 2025 16:12:58 +0800 Subject: [PATCH 5/5] remove warn on execute update --- go/main.go | 7 ++----- python/main.py | 5 +---- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/go/main.go b/go/main.go index 51d5ede..91def0a 100644 --- a/go/main.go +++ b/go/main.go @@ -162,17 +162,14 @@ func main() { INSERT INTO go.demo (ts, sid, value, flag) VALUES ('2024-09-03T10:00:00+08:00', 1, 4.5, 0), ('2024-09-03T10:05:00+08:00', 2, 11.6, 1);` - _, err = client.ExecuteUpdate(sql) + affectedRows, err := client.ExecuteUpdate(sql) if err != nil { fmt.Println("Failed to insert data: ", err) return } - // It's expected that the affected rows is 2. - // However, the Go implementation of Arrow Flight SQL client might have some flaws and the affected rows is always 0. - // // The output should be: // Affected rows: 2 - // fmt.Println("Affected rows: ", affectedRows) + fmt.Println("Affected rows: ", affectedRows) // Checks that the data are inserted successfully. sql = "SELECT * FROM go.demo where ts >= '2024-09-03T10:00:00+08:00'" diff --git a/python/main.py b/python/main.py index b9159e1..6cf3cfe 100644 --- a/python/main.py +++ b/python/main.py @@ -114,13 +114,10 @@ def main(): ('2024-09-03T10:00:00+08:00', 1, 4.5, 0), ('2024-09-03T10:05:00+08:00', 2, 11.6, 1); """ - #! It's expected that the affected rows is 2. - #! However, the flightsql-dbapi library seems does not implement the `execute_update` correctly - #! and the returned affected rows is always 0. affected_rows = client.execute_update(sql) # The output should be: # Affected rows: 2 - # print("Affected rows: {}".format(affected_rows)) + print("Affected rows: {}".format(affected_rows)) # Checks that the data are inserted successfully. sql = "SELECT * FROM python.demo where ts >= '2024-09-03T10:00:00+08:00'"