Skip to content

Commit

Permalink
Implement gRPC support in driver (untested).
Browse files Browse the repository at this point in the history
  • Loading branch information
akrennmair committed Jan 7, 2024
1 parent 88caa22 commit 4c80753
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 3 deletions.
114 changes: 111 additions & 3 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/akrennmair/updog/internal/convert"
"github.com/akrennmair/updog/internal/queryparser"
updogv1 "github.com/akrennmair/updog/proto/updog/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

func init() {
Expand Down Expand Up @@ -115,7 +117,12 @@ func (d *updogDriver) openFile(file string, optValues url.Values) (driver.Conn,
}

func (d *updogDriver) openConn(host string, port string) (driver.Conn, error) {
return nil, errors.New("gRPC support not implemented")
conn, err := grpc.Dial(host+":"+port, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("failed to dial: %w", err)
}

return &grpcConn{conn: conn, client: updogv1.NewQueryServiceClient(conn)}, nil
}

type fileConn struct {
Expand Down Expand Up @@ -227,10 +234,10 @@ func (stmt *fileStmt) Close() error {
return nil
}

func (stmt *fileStmt) NumInput() int {
func numInput(q *updogv1.Query) int {
maxPlaceholder := int32(0)

queryparser.Walk(stmt.q, func(e *updogv1.Query_Expression) bool {
queryparser.Walk(q, func(e *updogv1.Query_Expression) bool {
if v, ok := e.Value.(*updogv1.Query_Expression_Eq); ok {
if v.Eq.Placeholder > maxPlaceholder {
maxPlaceholder = v.Eq.Placeholder
Expand All @@ -242,6 +249,10 @@ func (stmt *fileStmt) NumInput() int {
return int(maxPlaceholder)
}

func (stmt *fileStmt) NumInput() int {
return numInput(stmt.q)
}

func (stmt *fileStmt) Exec(args []driver.Value) (driver.Result, error) {
return nil, errors.New("only queries are supported")
}
Expand Down Expand Up @@ -344,3 +355,100 @@ func (r *rows) ColumnTypeNullable(index int) (nullable, ok bool) {
func (r *rows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool) {
return 0, 0, false
}

type grpcConn struct {
conn *grpc.ClientConn
client updogv1.QueryServiceClient
}

func (c *grpcConn) Prepare(query string) (driver.Stmt, error) {
return c.prepare(query)
}

func (c *grpcConn) prepare(query string) (*grpcStmt, error) {
q, err := queryparser.ParseQuery(query)
if err != nil {
return nil, fmt.Errorf("parsing query failed: %v", err)
}

return &grpcStmt{
c: c,
q: q,
}, nil
}

func (c *grpcConn) Close() error {
return c.conn.Close()
}

func (c *grpcConn) Begin() (driver.Tx, error) {
return c, nil
}

func (c *grpcConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
return c, nil
}

func (c *grpcConn) Commit() error {
return nil
}

func (c *grpcConn) Rollback() error {
return nil
}

func (c *grpcConn) Ping(ctx context.Context) error {
return nil
}

func (c *grpcConn) ResetSession(ctx context.Context) error {
return nil
}

func (c *grpcConn) IsValid() bool {
return c.conn != nil && c.client != nil
}

type grpcStmt struct {
c *grpcConn
q *updogv1.Query
}

func (stmt *grpcStmt) Close() error {
return nil
}

func (stmt *grpcStmt) Exec(args []driver.Value) (driver.Result, error) {
return nil, errors.New("only queries are supported")
}

func (stmt *grpcStmt) Query(args []driver.Value) (driver.Rows, error) {
var values []string

for _, a := range args {
values = append(values, fmt.Sprint(a))
}

return stmt.query(values)
}

func (stmt *grpcStmt) query(values []string) (driver.Rows, error) {
q := queryparser.ReplacePlaceholders(stmt.q, values)

result, err := stmt.c.client.Query(context.Background(), &updogv1.QueryRequest{
Queries: []*updogv1.Query{q},
})
if err != nil {
return nil, err
}

if len(result.Results) != 1 {
return nil, fmt.Errorf("expected 1 result, got %d", len(result.Results))
}

return newRows(convert.ToResult(result.Results[0]), q.GroupBy), nil
}

func (stmt *grpcStmt) NumInput() int {
return numInput(stmt.q)
}
21 changes: 21 additions & 0 deletions internal/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,24 @@ func ToProtobufResult(result *updog.Result, qid int32) *proto.Result {

return pbr
}

func ToResult(pr *proto.Result) *updog.Result {
r := &updog.Result{Count: pr.TotalCount}

for _, g := range pr.Groups {
gg := updog.ResultGroup{
Count: g.Count,
}

for _, f := range g.Fields {
gg.Fields = append(gg.Fields, updog.ResultField{
Column: f.Column,
Value: f.Value,
})
}

r.Groups = append(r.Groups, gg)
}

return r
}

0 comments on commit 4c80753

Please sign in to comment.