English | 简体中文
[TDengine] provides Go database/sql driver as [taosSql][driver-go].
v2 is not compatible with v3 version and corresponds to the TDengine version as follows:
| driver-go version | TDengine version | major features |
|---|---|---|
| v3.5.1 | 3.2.1.0+ / 3.1.1.13+ | native stmt query and geometry support |
| v3.5.0 | 3.0.5.0+ | tmq: get assignment and seek offset |
| v3.3.1 | 3.0.4.1+ | schemaless insert over websocket |
| v3.1.0 | 3.0.2.2+ | provide tmq apis close to kafka |
| v3.0.4 | 3.0.2.2+ | add request id |
| v3.0.3 | 3.0.1.5+ | statement insert over websocket |
| v3.0.2 | 3.0.1.5+ | bulk pulling over websocket |
| v3.0.1 | 3.0.0.0+ | tmq over websocket |
| v3.0.0 | 3.0.0.0+ | adapt to TDengine 3.0 query and insert |
Go 1.14+ is highly recommended for newly created projects.
go mod init taos-demoimport taosSql:
import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosSql"
)Use go mod for module management:
go mod tidyOr go get to directly install it:
go get github.com/taosdata/driver-go/v3/taosSqlA simple use case:
package main
import (
"database/sql"
"fmt"
"time"
_ "github.com/taosdata/driver-go/v3/taosSql"
)
func main() {
var taosUri = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosUri)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
}
defer taos.Close()
taos.Exec("create database if not exists test")
taos.Exec("use test")
taos.Exec("create table if not exists tb1 (ts timestamp, a int)")
_, err = taos.Exec("insert into tb1 values(now, 0)(now+1s,1)(now+2s,2)(now+3s,3)")
if err != nil {
fmt.Println("failed to insert, err:", err)
return
}
rows, err := taos.Query("select * from tb1")
if err != nil {
fmt.Println("failed to select from table, err:", err)
return
}
defer rows.Close()
for rows.Next() {
var r struct {
ts time.Time
a int
}
err := rows.Scan(&r.ts, &r.a)
if err != nil {
fmt.Println("scan error:\n", err)
return
}
fmt.Println(r.ts, r.a)
}
}APIs that are worthy to have a check:
-
sql.Open(DRIVER_NAME string, dataSourceName string) *DBThis API will create a
database/sqlDB object, results with type*DB.DRIVER_NAMEshould be set astaosSql, anddataSourceNameshould be a URI likeuser:password@tcp(host:port)/dbname. For HA use case, useuser:password@cfg(/etc/taos)/dbnameto apply configs in/etc/taos/taos.cfg. -
func (db *DB) Exec(query string, args ...interface{}) (Result, error)Execute non resultset SQLs, like
create,alteretc. -
func (db *DB) Query(query string, args ...interface{}) (*Rows, error)Execute a query with resultset.
-
func (db *DB) Close() errorClose an DB object and disconnect.
Create consumer:
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)Subscribe single topic:
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) errorSubscribe topics:
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) errorPoll message:
func (c *Consumer) Poll(timeoutMs int) tmq.EventCommit message:
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)Get assignment:
func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)Seek offset:
func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) errorUnsubscribe:
func (c *Consumer) Unsubscribe() errorClose consumer:
func (c *Consumer) Close() errorExample code: examples/tmq/main.go.
InfluxDB format:
func (conn *Connector) InfluxDBInsertLines(lines []string, precision string) errorExample code: examples/schemaless/influx/main.go.
OpenTSDB telnet format:
func (conn *Connector) OpenTSDBInsertTelnetLines(lines []string) errorExample code: examples/schemaless/telnet/main.go.
OpenTSDB json format:
func (conn *Connector) OpenTSDBInsertJsonPayload(payload string) errorExample code: examples/schemaless/json/main.go.
Prepare sql:
func (stmt *InsertStmt) Prepare(sql string) errorSet the child table name:
func (stmt *InsertStmt) SetSubTableName(name string) errorSet the table name:
func (stmt *InsertStmt) SetTableName(name string) errorSet the subtable name and tags:
func (stmt *InsertStmt) SetTableNameWithTags(tableName string, tags *param.Param) errorBind parameters:
func (stmt *InsertStmt) BindParam(params []*param.Param, bindType *param.ColumnType) errorAdd batch:
func (stmt *InsertStmt) AddBatch() errorimplement:
func (stmt *InsertStmt) Execute() errorGet the number of affected rows:
func (stmt *InsertStmt) GetAffectedRows() intClose stmt:
func (stmt *InsertStmt) Close() errorExample code: examples/stmtinsert/main.go.
A simple use case:
package main
import (
"database/sql"
"fmt"
"time"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
func main() {
var taosDSN = "root:taosdata@http(localhost:6041)/"
taos, err := sql.Open("taosRestful", taosDSN)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
}
defer taos.Close()
taos.Exec("create database if not exists test")
taos.Exec("create table if not exists test.tb1 (ts timestamp, a int)")
_, err = taos.Exec("insert into test.tb1 values(now, 0)(now+1s,1)(now+2s,2)(now+3s,3)")
if err != nil {
fmt.Println("failed to insert, err:", err)
return
}
rows, err := taos.Query("select * from test.tb1")
if err != nil {
fmt.Println("failed to select from table, err:", err)
return
}
defer rows.Close()
for rows.Next() {
var r struct {
ts time.Time
a int
}
err := rows.Scan(&r.ts, &r.a)
if err != nil {
fmt.Println("scan error:\n", err)
return
}
fmt.Println(r.ts, r.a)
}
}import
import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)The driverName of sql.Open is taosRestful
The DSN format is:
database username:database password@connection-method(domain or ip:port)/[database][? Parameter]
Example:
root:taosdata@http(localhost:6041)/test?readBufferSize=52428800
Parameters:
disableCompressionWhether to accept compressed data, default istrueDo not accept compressed data, set tofalseif the transferred data is compressed using gzip.readBufferSizeThe default size of the buffer for reading data is 4K (4096), which can be adjusted upwards when there is a lot of data in the query result.
Since the restful interface is stateless, the use db syntax will not work, you need to put the db name into the sql statement, e.g. create table if not exists tb1 (ts timestamp, a int) to create table if not exists test.tb1 (ts timestamp, a int) otherwise it will report an error [0x217] Database not specified or available.
You can also put the db name in the DSN by changing root:taosdata@http(localhost:6041)/ to root:taosdata@http(localhost:6041)/test. Executing the create database statement when the specified db does not exist will not report an error, while executing other queries or inserts will report an error. The example is as follows:
package main
import (
"database/sql"
"fmt"
"time"
_ "github.com/taosdata/driver-go/v3/taosRestful"
)
func main() {
var taosDSN = "root:taosdata@http(localhost:6041)/test"
taos, err := sql.Open("taosRestful", taosDSN)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
}
defer taos.Close()
taos.Exec("create database if not exists test")
taos.Exec("create table if not exists tb1 (ts timestamp, a int)")
_, err = taos.Exec("insert into tb1 values(now, 0)(now+1s,1)(now+2s,2)(now+3s,3)")
if err != nil {
fmt.Println("failed to insert, err:", err)
return
}
rows, err := taos.Query("select * from tb1")
if err != nil {
fmt.Println("failed to select from table, err:", err)
return
}
defer rows.Close()
for rows.Next() {
var r struct {
ts time.Time
a int
}
err := rows.Scan(&r.ts, &r.a)
if err != nil {
fmt.Println("scan error:\n", err)
return
}
fmt.Println(r.ts, r.a)
}
}A simple use case:
package main
import (
"database/sql"
"fmt"
"time"
_ "github.com/taosdata/driver-go/v3/taosWS"
)
func main() {
var taosDSN = "root:taosdata@ws(localhost:6041)/"
taos, err := sql.Open("taosWS", taosDSN)
if err != nil {
fmt.Println("failed to connect TDengine, err:", err)
return
}
defer taos.Close()
taos.Exec("create database if not exists test")
taos.Exec("create table if not exists test.tb1 (ts timestamp, a int)")
_, err = taos.Exec("insert into test.tb1 values(now, 0)(now+1s,1)(now+2s,2)(now+3s,3)")
if err != nil {
fmt.Println("failed to insert, err:", err)
return
}
rows, err := taos.Query("select * from test.tb1")
if err != nil {
fmt.Println("failed to select from table, err:", err)
return
}
defer rows.Close()
for rows.Next() {
var r struct {
ts time.Time
a int
}
err := rows.Scan(&r.ts, &r.a)
if err != nil {
fmt.Println("scan error:\n", err)
return
}
fmt.Println(r.ts, r.a)
}
}import
import (
"database/sql"
_ "github.com/taosdata/driver-go/v3/taosWS"
)The driverName of sql.Open is taosWS
The DSN format is:
database username:database password@connection-method(domain or ip:port)/[database][? parameter]
Example:
root:taosdata@ws(localhost:6041)/test?writeTimeout=10s&readTimeout=10m
Parameters:
writeTimeoutThe timeout to send data via websocket.readTimeoutThe timeout to receive response data via websocket.enableCompressionWhether to compress the transmitted data, the default isfalseand no compressed data is sent.
Use tmq over websocket. The server needs to start taoAdapter.
func NewConfig(url string, chanLength uint) *Config
Create a configuration, pass in the websocket address and the length of the sending channel.
func (c *Config) SetConnectUser(user string) error
Set username.
func (c *Config) SetConnectPass(pass string) error
Set password.
func (c *Config) SetClientID(clientID string) error
Set the client ID.
func (c *Config) SetGroupID(groupID string) error
Set the subscription group ID.
func (c *Config) SetWriteWait(writeWait time.Duration) error
Set the waiting time for sending messages.
func (c *Config) SetMessageTimeout(timeout time.Duration) error
Set the message timeout.
func (c *Config) SetErrorHandler(f func(consumer *Consumer, err error))
Set the error handler.
func (c *Config) SetCloseHandler(f func())
Set the close handler.
func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)
Create a consumer.
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
Subscribe a topic.
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error
Subscribe to topics.
func (c *Consumer) Poll(timeoutMs int) tmq.Event
Poll messages.
func (c *Consumer) Commit() ([]tmq.TopicPartition, error)
Commit message.
func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)
Get assignment.
func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error
Seek offset.
func (c *Consumer) Close() error
Close the connection.
Example code: examples/tmqoverws/main.go.
Use stmt via websocket. The server needs to start taoAdapter.
-
func NewConfig(url string, chanLength uint) *ConfigCreate a configuration item, pass in the websocket address and the length of the sending pipe.
-
func (c *Config) SetCloseHandler(f func())Set close handler.
-
func (c *Config) SetConnectDB(db string) errorSet connect DB.
-
func (c *Config) SetConnectPass(pass string) errorSet password.
-
func (c *Config) SetConnectUser(user string) errorSet username.
-
func (c *Config) SetErrorHandler(f func(connector *Connector, err error))Set error handler.
-
func (c *Config) SetMessageTimeout(timeout time.Duration) errorSet the message timeout.
-
func (c *Config) SetWriteWait(writeWait time.Duration) errorSet the waiting time for sending messages.
-
func NewConnector(config *Config) (*Connector, error)Create a connection.
-
func (c *Connector) Init() (*Stmt, error)Initialize the parameters.
-
func (c *Connector) Close() errorClose the connection.
-
func (s *Stmt) Prepare(sql string) errorParameter binding preprocessing SQL statement.
-
func (s *Stmt) SetTableName(name string) errorBind the table name parameter.
-
func (s *Stmt) SetTags(tags *param.Param, bindType *param.ColumnType) errorBind tags.
-
func (s *Stmt) BindParam(params []*param.Param, bindType *param.ColumnType) errorParameter bind multiple rows of data.
-
func (s *Stmt) AddBatch() errorAdd to a parameter-bound batch.
-
func (s *Stmt) Exec() errorExecute a parameter binding.
-
func (s *Stmt) GetAffectedRows() intGets the number of affected rows inserted by the parameter binding.
-
func (s *Stmt) Close() errorCloses the parameter binding.
For a complete example of parameter binding, see GitHub example file
driver-go
├── af //advanced function
├── common //common function and constants
├── errors // error type
├── examples //examples
├── taosRestful // database operation standard interface (restful)
├── taosSql // database operation standard interface
├── types // inner type
├── wrapper // cgo wrapper
└── ws // websocket
driver-go: https://github.com/taosdata/driver-go
TDengine: https://github.com/taosdata/TDengine