Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions go/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,16 @@ func (client *Client) Execute(sql string) ([]arrow.Record, error) {
return client.doGet(flightInfo.GetEndpoint()[0].GetTicket())
}

// Executes the sql on Datalayers and returns the affected rows.
// The supported sqls are Insert and Delete. Note, the development for supporting Delete is in progress.
func (client *Client) ExecuteUpdate(sql string) (int64, error) {
affectedRows, err := client.inner.ExecuteUpdate(client.ctx, sql)
if err != nil {
return 0, fmt.Errorf("failed to execute a sql: %v", err)
}
return affectedRows, nil
}

// Creates a prepared statement.
func (client *Client) Prepare(sql string) (*flightsql.PreparedStatement, error) {
return client.inner.Prepare(client.ctx, sql)
Expand All @@ -119,6 +129,11 @@ func (client *Client) ExecutePrepared(preparedStmt *flightsql.PreparedStatement,
return client.doGet(flightInfo.GetEndpoint()[0].GetTicket())
}

// Closes the prepared statement.
func (client *Client) ClosePrepared(preparedStmt *flightsql.PreparedStatement) error {
return preparedStmt.Close(client.ctx)
}

// Calls the `DoGet` method of the FlightSQL client.
func (client *Client) doGet(ticket *flight.Ticket) ([]arrow.Record, error) {
reader, err := client.inner.DoGet(client.ctx, ticket)
Expand Down
36 changes: 36 additions & 0 deletions go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,40 @@ func main() {
// 2024-09-01 10:05:00 +0800 CST 2 15.30 1
// 2024-09-02 10:05:00 +0800 CST 2 15.30 1
PrintRecords(result)

// Closes the prepared statement to notify releasing resources on server side.
if err = client.ClosePrepared(preparedStmt); err != nil {
fmt.Println("Failed to close a prepared statement: ", err)
return
}

// There provides a dedicated interface `execute_update` for executing DMLs, including Insert, Delete.
// This interface directly returns the affected rows which might be convenient for some use cases.
//
// Note, Datalayers does not support Update and the development for Delete is in progress.
sql = `
INSERT INTO go.demo (ts, sid, value, flag) VALUES
('2024-09-03T10:00:00+08:00', 1, 4.5, 0),
('2024-09-03T10:05:00+08:00', 2, 11.6, 1);`
affectedRows, err := client.ExecuteUpdate(sql)
if err != nil {
fmt.Println("Failed to insert data: ", err)
return
}
// The output should be:
// Affected rows: 2
fmt.Println("Affected rows: ", affectedRows)

// Checks that the data are inserted successfully.
sql = "SELECT * FROM go.demo where ts >= '2024-09-03T10:00:00+08:00'"
result, err = client.Execute(sql)
if err != nil {
fmt.Println("Failed to scan data: ", err)
return
}
// The result should be:
// ts sid value flag
// 2024-09-03 10:00:00 +0800 CST 1 4.50 0
// 2024-09-03 10:05:00 +0800 CST 2 11.60 1
PrintRecords(result)
}
18 changes: 18 additions & 0 deletions python/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ def execute(self, sql: str) -> pandas.DataFrame:
df = reader.read_pandas()
return df

def execute_update(self, sql: str) -> int:
"""
Executes the sql on Datalayers and returns the affected rows.
This method is meant to be used for executing DMLs, including Insert and Delete.
Note, Datalayers does not support Update and the development of Delete is in progress.
"""

return self.inner.execute_update(sql, None)

def prepare(self, sql: str) -> PreparedStatement:
"""
Creates a prepared statement.
Expand All @@ -122,6 +131,15 @@ def execute_prepared(
df = reader.read_pandas()
return df

def close_prepared(self, prepared_stmt: PreparedStatement):
"""
Closes the prepared statement.
Note, generally you should not call this method explicitly.
Use with clause to manage the life cycle of a prepared statement instead.
"""

prepared_stmt.close()

def close(self):
"""
Closes the inner Arrow FlightSQL client.
Expand Down
23 changes: 23 additions & 0 deletions python/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,29 @@ def main():
# 1 2024-09-02 10:05:00+08:00 2 15.3 1
print(result)

# There provides a dedicated interface `execute_update` for executing DMLs, including Insert, Delete.
# This interface directly returns the affected rows which might be convenient for some use cases.
#
# Note, Datalayers does not support Update and the development for Delete is in progress.
sql = """
INSERT INTO python.demo (ts, sid, value, flag) VALUES
('2024-09-03T10:00:00+08:00', 1, 4.5, 0),
('2024-09-03T10:05:00+08:00', 2, 11.6, 1);
"""
affected_rows = client.execute_update(sql)
# The output should be:
# Affected rows: 2
print("Affected rows: {}".format(affected_rows))

# Checks that the data are inserted successfully.
sql = "SELECT * FROM python.demo where ts >= '2024-09-03T10:00:00+08:00'"
result = client.execute(sql)
# The result should be:
# ts sid value flag
# 0 2024-09-03 10:00:00+08:00 1 4.5 0
# 1 2024-09-03 10:05:00+08:00 2 11.6 1
print(result)


if __name__ == "__main__":
main()
29 changes: 29 additions & 0 deletions rust/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,34 @@ async fn main() -> Result<()> {
// +---------------------------+-----+-------+------+
print_batches(&result);

// Closes the prepared statement to notify releasing resources on server side.
client.close_prepared(prepared_stmt).await?;

// There provides a dedicated interface `execute_update` for executing DMLs, including Insert, Delete.
// This interface directly returns the affected rows which might be convenient for some use cases.
//
// Note, Datalayers does not support Update and the development for Delete is in progress.
sql = r#"
INSERT INTO rust.demo (ts, sid, value, flag) VALUES
('2024-09-03T10:00:00+08:00', 1, 4.5, 0),
('2024-09-03T10:05:00+08:00', 2, 11.6, 1);
"#;
let affected_rows = client.execute_update(sql).await?;
// The output should be:
// Affected rows: 2
println!("Affected rows: {}", affected_rows);

// Checks that the data are inserted successfully.
sql = "SELECT * FROM rust.demo where ts >= '2024-09-03T10:00:00+08:00'";
result = client.execute(sql).await?;
// The result should be:
// +---------------------------+-----+-------+------+
// | ts | sid | value | flag |
// +---------------------------+-----+-------+------+
// | 2024-09-03T10:00:00+08:00 | 1 | 4.5 | 0 |
// | 2024-09-03T10:05:00+08:00 | 2 | 11.6 | 1 |
// +---------------------------+-----+-------+------+
print_batches(&result);

Ok(())
}
46 changes: 25 additions & 21 deletions rust/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl Client {
.handshake(&config.username, &config.password)
.await
.inspect_err(|e| {
println!("Failed to do handshake: {}", filter_message(&e.to_string()));
println!("{}", filter_message(&e.to_string()));
exit(1)
});

Expand All @@ -83,10 +83,7 @@ impl Client {
.execute(sql.to_string(), None)
.await
.inspect_err(|e| {
println!(
"Failed to execute a sql: {}",
filter_message(&e.to_string())
);
println!("{}", filter_message(&e.to_string()));
exit(1)
})?;
let ticket = flight_info
Expand All @@ -100,16 +97,25 @@ impl Client {
Ok(batches)
}

pub async fn execute_update(&mut self, sql: &str) -> Result<i64> {
let affected_rows = self
.inner
.execute_update(sql.to_string(), None)
.await
.inspect_err(|e| {
println!("{}", filter_message(&e.to_string()));
exit(1)
})?;
Ok(affected_rows)
}

pub async fn prepare(&mut self, sql: &str) -> Result<PreparedStatement<Channel>> {
let prepared_stmt = self
.inner
.prepare(sql.to_string(), None)
.await
.inspect_err(|e| {
println!(
"Failed to execute a sql: {}",
filter_message(&e.to_string())
);
println!("{}", filter_message(&e.to_string()));
exit(1)
})?;
Ok(prepared_stmt)
Expand All @@ -124,10 +130,7 @@ impl Client {
.set_parameters(binding)
.context("Failed to bind a record batch to the prepared statement")?;
let flight_info = prepared_stmt.execute().await.inspect_err(|e| {
println!(
"Failed to execute the prepared statement: {}",
filter_message(&e.to_string())
);
println!("{}", filter_message(&e.to_string()));
exit(1)
})?;
let ticket = flight_info
Expand All @@ -141,19 +144,20 @@ impl Client {
Ok(batches)
}

pub async fn close_prepared(&self, prepared_stmt: PreparedStatement<Channel>) -> Result<()> {
prepared_stmt
.close()
.await
.context("Failed to close a prepared statement")
}

async fn do_get(&mut self, ticket: Ticket) -> Result<Vec<RecordBatch>> {
let stream = self.inner.do_get(ticket).await.inspect_err(|e| {
println!(
"Failed to perform do_get: {}",
filter_message(&e.to_string())
);
println!("{}", filter_message(&e.to_string()));
exit(1)
})?;
let batches = stream.try_collect::<Vec<_>>().await.inspect_err(|e| {
println!(
"Failed to consume flight record batch stream: {}",
filter_message(&e.to_string())
);
println!("{}", filter_message(&e.to_string()));
exit(1)
})?;
if batches.is_empty() {
Expand Down