diff --git a/go/client.go b/go/client.go index 448244a..6a4c107 100644 --- a/go/client.go +++ b/go/client.go @@ -102,6 +102,16 @@ func (client *Client) Execute(sql string) ([]arrow.Record, error) { return client.doGet(flightInfo.GetEndpoint()[0].GetTicket()) } +// Executes the sql on Datalayers and returns the affected rows. +// The supported sqls are Insert and Delete. Note, the development for supporting Delete is in progress. +func (client *Client) ExecuteUpdate(sql string) (int64, error) { + affectedRows, err := client.inner.ExecuteUpdate(client.ctx, sql) + if err != nil { + return 0, fmt.Errorf("failed to execute a sql: %v", err) + } + return affectedRows, nil +} + // Creates a prepared statement. func (client *Client) Prepare(sql string) (*flightsql.PreparedStatement, error) { return client.inner.Prepare(client.ctx, sql) @@ -119,6 +129,11 @@ func (client *Client) ExecutePrepared(preparedStmt *flightsql.PreparedStatement, return client.doGet(flightInfo.GetEndpoint()[0].GetTicket()) } +// Closes the prepared statement. +func (client *Client) ClosePrepared(preparedStmt *flightsql.PreparedStatement) error { + return preparedStmt.Close(client.ctx) +} + // Calls the `DoGet` method of the FlightSQL client. func (client *Client) doGet(ticket *flight.Ticket) ([]arrow.Record, error) { reader, err := client.inner.DoGet(client.ctx, ticket) diff --git a/go/main.go b/go/main.go index ec185c5..91def0a 100644 --- a/go/main.go +++ b/go/main.go @@ -147,4 +147,40 @@ func main() { // 2024-09-01 10:05:00 +0800 CST 2 15.30 1 // 2024-09-02 10:05:00 +0800 CST 2 15.30 1 PrintRecords(result) + + // Closes the prepared statement to notify releasing resources on server side. + if err = client.ClosePrepared(preparedStmt); err != nil { + fmt.Println("Failed to close a prepared statement: ", err) + return + } + + // There provides a dedicated interface `execute_update` for executing DMLs, including Insert, Delete. + // This interface directly returns the affected rows which might be convenient for some use cases. + // + // Note, Datalayers does not support Update and the development for Delete is in progress. + sql = ` + INSERT INTO go.demo (ts, sid, value, flag) VALUES + ('2024-09-03T10:00:00+08:00', 1, 4.5, 0), + ('2024-09-03T10:05:00+08:00', 2, 11.6, 1);` + affectedRows, err := client.ExecuteUpdate(sql) + if err != nil { + fmt.Println("Failed to insert data: ", err) + return + } + // The output should be: + // Affected rows: 2 + fmt.Println("Affected rows: ", affectedRows) + + // Checks that the data are inserted successfully. + sql = "SELECT * FROM go.demo where ts >= '2024-09-03T10:00:00+08:00'" + result, err = client.Execute(sql) + if err != nil { + fmt.Println("Failed to scan data: ", err) + return + } + // The result should be: + // ts sid value flag + // 2024-09-03 10:00:00 +0800 CST 1 4.50 0 + // 2024-09-03 10:05:00 +0800 CST 2 11.60 1 + PrintRecords(result) } diff --git a/python/client.py b/python/client.py index 3410533..f9f287a 100644 --- a/python/client.py +++ b/python/client.py @@ -102,6 +102,15 @@ def execute(self, sql: str) -> pandas.DataFrame: df = reader.read_pandas() return df + def execute_update(self, sql: str) -> int: + """ + Executes the sql on Datalayers and returns the affected rows. + This method is meant to be used for executing DMLs, including Insert and Delete. + Note, Datalayers does not support Update and the development of Delete is in progress. + """ + + return self.inner.execute_update(sql, None) + def prepare(self, sql: str) -> PreparedStatement: """ Creates a prepared statement. @@ -122,6 +131,15 @@ def execute_prepared( df = reader.read_pandas() return df + def close_prepared(self, prepared_stmt: PreparedStatement): + """ + Closes the prepared statement. + Note, generally you should not call this method explicitly. + Use with clause to manage the life cycle of a prepared statement instead. + """ + + prepared_stmt.close() + def close(self): """ Closes the inner Arrow FlightSQL client. diff --git a/python/main.py b/python/main.py index c84a23c..6cf3cfe 100644 --- a/python/main.py +++ b/python/main.py @@ -105,6 +105,29 @@ def main(): # 1 2024-09-02 10:05:00+08:00 2 15.3 1 print(result) + # There provides a dedicated interface `execute_update` for executing DMLs, including Insert, Delete. + # This interface directly returns the affected rows which might be convenient for some use cases. + # + # Note, Datalayers does not support Update and the development for Delete is in progress. + sql = """ + INSERT INTO python.demo (ts, sid, value, flag) VALUES + ('2024-09-03T10:00:00+08:00', 1, 4.5, 0), + ('2024-09-03T10:05:00+08:00', 2, 11.6, 1); + """ + affected_rows = client.execute_update(sql) + # The output should be: + # Affected rows: 2 + print("Affected rows: {}".format(affected_rows)) + + # Checks that the data are inserted successfully. + sql = "SELECT * FROM python.demo where ts >= '2024-09-03T10:00:00+08:00'" + result = client.execute(sql) + # The result should be: + # ts sid value flag + # 0 2024-09-03 10:00:00+08:00 1 4.5 0 + # 1 2024-09-03 10:05:00+08:00 2 11.6 1 + print(result) + if __name__ == "__main__": main() diff --git a/rust/bin/main.rs b/rust/bin/main.rs index 3550645..0c6e168 100644 --- a/rust/bin/main.rs +++ b/rust/bin/main.rs @@ -117,5 +117,34 @@ async fn main() -> Result<()> { // +---------------------------+-----+-------+------+ print_batches(&result); + // Closes the prepared statement to notify releasing resources on server side. + client.close_prepared(prepared_stmt).await?; + + // There provides a dedicated interface `execute_update` for executing DMLs, including Insert, Delete. + // This interface directly returns the affected rows which might be convenient for some use cases. + // + // Note, Datalayers does not support Update and the development for Delete is in progress. + sql = r#" + INSERT INTO rust.demo (ts, sid, value, flag) VALUES + ('2024-09-03T10:00:00+08:00', 1, 4.5, 0), + ('2024-09-03T10:05:00+08:00', 2, 11.6, 1); + "#; + let affected_rows = client.execute_update(sql).await?; + // The output should be: + // Affected rows: 2 + println!("Affected rows: {}", affected_rows); + + // Checks that the data are inserted successfully. + sql = "SELECT * FROM rust.demo where ts >= '2024-09-03T10:00:00+08:00'"; + result = client.execute(sql).await?; + // The result should be: + // +---------------------------+-----+-------+------+ + // | ts | sid | value | flag | + // +---------------------------+-----+-------+------+ + // | 2024-09-03T10:00:00+08:00 | 1 | 4.5 | 0 | + // | 2024-09-03T10:05:00+08:00 | 2 | 11.6 | 1 | + // +---------------------------+-----+-------+------+ + print_batches(&result); + Ok(()) } diff --git a/rust/src/client.rs b/rust/src/client.rs index ff4e948..13b853e 100644 --- a/rust/src/client.rs +++ b/rust/src/client.rs @@ -64,7 +64,7 @@ impl Client { .handshake(&config.username, &config.password) .await .inspect_err(|e| { - println!("Failed to do handshake: {}", filter_message(&e.to_string())); + println!("{}", filter_message(&e.to_string())); exit(1) }); @@ -83,10 +83,7 @@ impl Client { .execute(sql.to_string(), None) .await .inspect_err(|e| { - println!( - "Failed to execute a sql: {}", - filter_message(&e.to_string()) - ); + println!("{}", filter_message(&e.to_string())); exit(1) })?; let ticket = flight_info @@ -100,16 +97,25 @@ impl Client { Ok(batches) } + pub async fn execute_update(&mut self, sql: &str) -> Result { + let affected_rows = self + .inner + .execute_update(sql.to_string(), None) + .await + .inspect_err(|e| { + println!("{}", filter_message(&e.to_string())); + exit(1) + })?; + Ok(affected_rows) + } + pub async fn prepare(&mut self, sql: &str) -> Result> { let prepared_stmt = self .inner .prepare(sql.to_string(), None) .await .inspect_err(|e| { - println!( - "Failed to execute a sql: {}", - filter_message(&e.to_string()) - ); + println!("{}", filter_message(&e.to_string())); exit(1) })?; Ok(prepared_stmt) @@ -124,10 +130,7 @@ impl Client { .set_parameters(binding) .context("Failed to bind a record batch to the prepared statement")?; let flight_info = prepared_stmt.execute().await.inspect_err(|e| { - println!( - "Failed to execute the prepared statement: {}", - filter_message(&e.to_string()) - ); + println!("{}", filter_message(&e.to_string())); exit(1) })?; let ticket = flight_info @@ -141,19 +144,20 @@ impl Client { Ok(batches) } + pub async fn close_prepared(&self, prepared_stmt: PreparedStatement) -> Result<()> { + prepared_stmt + .close() + .await + .context("Failed to close a prepared statement") + } + async fn do_get(&mut self, ticket: Ticket) -> Result> { let stream = self.inner.do_get(ticket).await.inspect_err(|e| { - println!( - "Failed to perform do_get: {}", - filter_message(&e.to_string()) - ); + println!("{}", filter_message(&e.to_string())); exit(1) })?; let batches = stream.try_collect::>().await.inspect_err(|e| { - println!( - "Failed to consume flight record batch stream: {}", - filter_message(&e.to_string()) - ); + println!("{}", filter_message(&e.to_string())); exit(1) })?; if batches.is_empty() {