Skip to content

Commit

Permalink
added support for MySQL database, covered with tests, configured Trav…
Browse files Browse the repository at this point in the history
…is CI to use custom user/pwd for databases
  • Loading branch information
darklynx committed Jan 28, 2018
1 parent 74f733b commit 240df9e
Show file tree
Hide file tree
Showing 5 changed files with 587 additions and 33 deletions.
7 changes: 6 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
language: go
services:
- postgresql
- mysql
before_script:
- psql -c 'create database baskets;' -U postgres
- psql -c "CREATE DATABASE baskets;" -U postgres
- psql -c "CREATE USER rbaskets WITH PASSWORD 'pwd';" -U postgres
go:
- tip
before_install:
- mysql -e "CREATE DATABASE IF NOT EXISTS baskets ;"
- mysql -e "CREATE USER 'rbaskets'@'%' IDENTIFIED BY 'pwd' ;"
- mysql -e "GRANT ALL ON baskets.* TO 'rbaskets'@'%' ;"
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
- go get github.com/stretchr/testify/assert
Expand Down
36 changes: 33 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ It is strongly inspired by ideas and application design of the [RequestHub](http
- [Usage](#usage)
- [Persistent storage](#persistent-storage)
- [PostgreSQL](#postgresql)
- [MySQL](#mysql)
- [Docker](#docker)
- [Build docker image](#build-docker-image)
- [Run container as a service](#run-container-as-a-service)
Expand Down Expand Up @@ -143,7 +144,7 @@ Current implementation is based on PostgreSQL syntax. So running Request Baskets
To start the service with PostgreSQL database run:

```bash
$ request-baskets -db sql -conn "postgres://postgres:pwd@localhost/baskets?sslmode=disable"
$ request-baskets -db sql -conn "postgres://rbaskets:pwd@localhost/baskets?sslmode=disable"
2018/01/25 01:06:25 [info] generated master token: mSEAcYvpDlg...
2018/01/25 01:06:25 [info] using SQL database to store baskets
2018/01/25 01:06:25 [info] SQL database type: postgres
Expand All @@ -155,16 +156,45 @@ $ request-baskets -db sql -conn "postgres://postgres:pwd@localhost/baskets?sslmo

See the [Go driver of PostgreSQL](https://godoc.org/github.com/lib/pq) documentation for detailed description of connection string and its parameters.

If you do not have a configured instance of PostgreSQL server to test the Request Baskets service with you can quickly launch one using Docker with following command:
If you do not have a configured instance of PostgreSQL server to test the Request Baskets service with, you can quickly launch one using Docker with following command:

```bash
$ docker run --rm --name pg_baskets -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=pwd \
$ docker run --rm --name pg_baskets -e POSTGRES_USER=rbaskets -e POSTGRES_PASSWORD=pwd \
-e POSTGRES_DB=baskets -d -p 5432:5432 postgres

# following command will stop and destroy the instance of PostgreSQL container
$ docker stop pg_baskets
```

### MySQL

Added driver and support within the SQL basket database for [MySQL](https://www.mysql.com) ([MariaDB](https://mariadb.org)).

To start the service with MySQL database run:

```bash
$ request-baskets -db sql -conn "mysql://rbaskets:pwd@/baskets"
2018/01/28 23:39:59 [info] generated master token: aPgyuLxw723q...
2018/01/28 23:39:59 [info] using SQL database to store baskets
2018/01/28 23:39:59 [info] SQL database type: mysql
2018/01/28 23:39:59 [info] creating database schema
2018/01/28 23:39:59 [info] database is created, version: 1
2018/01/28 23:39:59 [info] HTTP server is listening on 127.0.0.1:55555
...
```

See the [Go driver of MySQL](https://github.com/go-sql-driver/mysql#usage) documentation for detailed description of connection string and its parameters.

If you do not have a configured instance of MySQL server to test the Request Baskets service with, you can quickly launch one using Docker with following command:

```bash
$ docker run --rm --name mysql_baskets -e MYSQL_USER=rbaskets -e MYSQL_PASSWORD=pwd \
-e MYSQL_DATABASE=baskets -e MYSQL_RANDOM_ROOT_PASSWORD=yes -d -p 3306:3306 mysql

# following command will stop and destroy the instance of MySQL container
$ docker stop mysql_baskets
```

## Docker

### Build docker image
Expand Down
115 changes: 87 additions & 28 deletions baskets_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"fmt"
"log"
"net/http"
"regexp"
"strings"

_ "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq"
)

Expand All @@ -26,15 +29,17 @@ var sqlSchema = []string{
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
)`,
`CREATE TABLE rb_responses (
basket_name varchar(250) REFERENCES rb_baskets (basket_name) ON DELETE CASCADE,
basket_name varchar(250) NOT NULL,
http_method varchar(20) NOT NULL,
response text NOT NULL,
PRIMARY KEY (basket_name, http_method)
PRIMARY KEY (basket_name, http_method),
FOREIGN KEY (basket_name) REFERENCES rb_baskets (basket_name) ON DELETE CASCADE
)`,
`CREATE TABLE rb_requests (
basket_name varchar(250) REFERENCES rb_baskets (basket_name) ON DELETE CASCADE,
basket_name varchar(250) NOT NULL,
request text NOT NULL,
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
created_at timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
FOREIGN KEY (basket_name) REFERENCES rb_baskets (basket_name) ON DELETE CASCADE
)`,
`CREATE INDEX rb_requests_name_time_index ON rb_requests (basket_name, created_at)`,
`CREATE TABLE rb_version (
Expand All @@ -51,7 +56,7 @@ type sqlBasket struct {

func (basket *sqlBasket) getInt(sql string, defaultValue int) int {
var value int
if err := basket.db.QueryRow(sql, basket.name).Scan(&value); err != nil {
if err := basket.db.QueryRow(unifySQL(basket.dbType, sql), basket.name).Scan(&value); err != nil {
log.Printf("[error] failed to get counter info about basket: %s - %s", basket.name, err)
return defaultValue
}
Expand All @@ -64,11 +69,18 @@ func (basket *sqlBasket) applyLimit(capacity int) {
size := basket.Size()

if size > capacity {
var cleanupSQL string

// Note: 'ctid' is PostgreSQL specific
// see example for MySQL here: https://stackoverflow.com/questions/5170546
_, err := basket.db.Exec("DELETE FROM rb_requests WHERE ctid IN (SELECT ctid FROM rb_requests WHERE basket_name = $1 ORDER BY created_at LIMIT $2)",
basket.name, size-capacity)
if err != nil {
switch basket.dbType {
case "postgres":
cleanupSQL = "DELETE FROM rb_requests WHERE ctid IN (SELECT ctid FROM rb_requests WHERE basket_name = $1 ORDER BY created_at LIMIT $2)"
default:
cleanupSQL = "DELETE FROM rb_requests WHERE basket_name = ? ORDER BY created_at LIMIT ?"
}

if _, err := basket.db.Exec(cleanupSQL, basket.name, size-capacity); err != nil {
log.Printf("[error] failed to shrink collected requests: %s - %s", basket.name, err)
}
}
Expand All @@ -77,7 +89,8 @@ func (basket *sqlBasket) applyLimit(capacity int) {
func (basket *sqlBasket) Config() BasketConfig {
config := BasketConfig{}

err := basket.db.QueryRow("SELECT capacity, forward_url, insecure_tls, expand_path FROM rb_baskets WHERE basket_name = $1",
err := basket.db.QueryRow(
unifySQL(basket.dbType, "SELECT capacity, forward_url, insecure_tls, expand_path FROM rb_baskets WHERE basket_name = $1"),
basket.name).Scan(&config.Capacity, &config.ForwardURL, &config.InsecureTLS, &config.ExpandPath)
if err != nil {
log.Printf("[error] failed to get basket config: %s - %s", basket.name, err)
Expand All @@ -87,7 +100,8 @@ func (basket *sqlBasket) Config() BasketConfig {
}

func (basket *sqlBasket) Update(config BasketConfig) {
_, err := basket.db.Exec("UPDATE rb_baskets SET capacity = $1, forward_url = $2, insecure_tls = $3, expand_path = $4 WHERE basket_name = $5",
_, err := basket.db.Exec(
unifySQL(basket.dbType, "UPDATE rb_baskets SET capacity = $1, forward_url = $2, insecure_tls = $3, expand_path = $4 WHERE basket_name = $5"),
config.Capacity, config.ForwardURL, config.InsecureTLS, config.ExpandPath, basket.name)
if err != nil {
log.Printf("[error] failed to update basket config: %s - %s", basket.name, err)
Expand All @@ -100,7 +114,8 @@ func (basket *sqlBasket) Update(config BasketConfig) {
func (basket *sqlBasket) Authorize(token string) bool {
var found int

err := basket.db.QueryRow("SELECT COUNT(*) FROM rb_baskets WHERE basket_name = $1 AND token = $2",
err := basket.db.QueryRow(
unifySQL(basket.dbType, "SELECT COUNT(*) FROM rb_baskets WHERE basket_name = $1 AND token = $2"),
basket.name, token).Scan(&found)
if err != nil {
log.Printf("[error] failed authorize access to basket: %s - %s", basket.name, err)
Expand All @@ -113,7 +128,8 @@ func (basket *sqlBasket) Authorize(token string) bool {
func (basket *sqlBasket) GetResponse(method string) *ResponseConfig {
var resp string

err := basket.db.QueryRow("SELECT response FROM rb_responses WHERE basket_name = $1 AND http_method = $2",
err := basket.db.QueryRow(
unifySQL(basket.dbType, "SELECT response FROM rb_responses WHERE basket_name = $1 AND http_method = $2"),
basket.name, method).Scan(&resp)
if err == sql.ErrNoRows {
// no response for this basket + HTTP method
Expand All @@ -135,9 +151,10 @@ func (basket *sqlBasket) GetResponse(method string) *ResponseConfig {
func (basket *sqlBasket) SetResponse(method string, response ResponseConfig) {
if respb, err := json.Marshal(response); err == nil {
// delete existing if present
basket.db.Exec("DELETE FROM rb_responses WHERE basket_name = $1 AND http_method = $2", basket.name, method)
basket.db.Exec(unifySQL(basket.dbType, "DELETE FROM rb_responses WHERE basket_name = $1 AND http_method = $2"), basket.name, method)
// insert new response (ignore concurrency)
_, err = basket.db.Exec("INSERT INTO rb_responses (basket_name, http_method, response) VALUES ($1, $2, $3)",
_, err = basket.db.Exec(
unifySQL(basket.dbType, "INSERT INTO rb_responses (basket_name, http_method, response) VALUES ($1, $2, $3)"),
basket.name, method, string(respb))

if err != nil {
Expand All @@ -149,12 +166,14 @@ func (basket *sqlBasket) SetResponse(method string, response ResponseConfig) {
func (basket *sqlBasket) Add(req *http.Request) *RequestData {
data := ToRequestData(req)
if datab, err := json.Marshal(data); err == nil {
_, err = basket.db.Exec("INSERT INTO rb_requests (basket_name, request) VALUES ($1, $2)", basket.name, string(datab))
_, err = basket.db.Exec(
unifySQL(basket.dbType, "INSERT INTO rb_requests (basket_name, request) VALUES ($1, $2)"), basket.name, string(datab))
if err != nil {
log.Printf("[error] failed to collect incoming HTTP request in basket: %s - %s", basket.name, err)
} else {
// update global counter
_, err = basket.db.Exec("UPDATE rb_baskets SET requests_count = requests_count + 1 WHERE basket_name = $1", basket.name)
_, err = basket.db.Exec(
unifySQL(basket.dbType, "UPDATE rb_baskets SET requests_count = requests_count + 1 WHERE basket_name = $1"), basket.name)
if err != nil {
log.Printf("[error] failed to update requests counter of basket: %s - %s", basket.name, err)
}
Expand All @@ -168,7 +187,7 @@ func (basket *sqlBasket) Add(req *http.Request) *RequestData {
}

func (basket *sqlBasket) Clear() {
if _, err := basket.db.Exec("DELETE FROM rb_requests WHERE basket_name = $1", basket.name); err != nil {
if _, err := basket.db.Exec(unifySQL(basket.dbType, "DELETE FROM rb_requests WHERE basket_name = $1"), basket.name); err != nil {
log.Printf("[error] failed to delete collected requests in basket: %s - %s", basket.name, err)
}
}
Expand All @@ -182,7 +201,8 @@ func (basket *sqlBasket) GetRequests(max int, skip int) RequestsPage {
basket.getInt("SELECT requests_count FROM rb_baskets WHERE basket_name = $1", 0), false}

if max > 0 {
requests, err := basket.db.Query("SELECT request FROM rb_requests WHERE basket_name = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3",
requests, err := basket.db.Query(
unifySQL(basket.dbType, "SELECT request FROM rb_requests WHERE basket_name = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3"),
basket.name, max+1, skip)
if err != nil {
log.Printf("[error] failed to get requests of basket: %s - %s", basket.name, err)
Expand Down Expand Up @@ -215,7 +235,8 @@ func (basket *sqlBasket) GetRequests(max int, skip int) RequestsPage {
func (basket *sqlBasket) FindRequests(query string, in string, max int, skip int) RequestsQueryPage {
page := RequestsQueryPage{make([]*RequestData, 0, max), false}
if max > 0 {
requests, err := basket.db.Query("SELECT request FROM rb_requests WHERE basket_name = $1 ORDER BY created_at DESC", basket.name)
requests, err := basket.db.Query(
unifySQL(basket.dbType, "SELECT request FROM rb_requests WHERE basket_name = $1 ORDER BY created_at DESC"), basket.name)
if err != nil {
log.Printf("[error] failed to find requests of basket: %s - %s", basket.name, err)
return page
Expand Down Expand Up @@ -265,7 +286,8 @@ func (sdb *sqlDatabase) Create(name string, config BasketConfig) (BasketAuth, er
return auth, fmt.Errorf("Failed to generate token: %s", err)
}

basket, err := sdb.db.Exec("INSERT INTO rb_baskets (basket_name, token, capacity, forward_url, insecure_tls, expand_path) VALUES($1, $2, $3, $4, $5, $6)",
basket, err := sdb.db.Exec(
unifySQL(sdb.dbType, "INSERT INTO rb_baskets (basket_name, token, capacity, forward_url, insecure_tls, expand_path) VALUES($1, $2, $3, $4, $5, $6)"),
name, token, config.Capacity, config.ForwardURL, config.InsecureTLS, config.ExpandPath)
if err != nil {
return auth, fmt.Errorf("Failed to create basket: %s - %s", name, err)
Expand All @@ -281,7 +303,7 @@ func (sdb *sqlDatabase) Create(name string, config BasketConfig) (BasketAuth, er

func (sdb *sqlDatabase) Get(name string) Basket {
var bname string
err := sdb.db.QueryRow("SELECT basket_name FROM rb_baskets WHERE basket_name = $1", name).Scan(&bname)
err := sdb.db.QueryRow(unifySQL(sdb.dbType, "SELECT basket_name FROM rb_baskets WHERE basket_name = $1"), name).Scan(&bname)

if err == sql.ErrNoRows {
log.Printf("[warn] no basket found: %s", name)
Expand All @@ -295,7 +317,7 @@ func (sdb *sqlDatabase) Get(name string) Basket {
}

func (sdb *sqlDatabase) Delete(name string) {
if _, err := sdb.db.Exec("DELETE FROM rb_baskets WHERE basket_name = $1", name); err != nil {
if _, err := sdb.db.Exec(unifySQL(sdb.dbType, "DELETE FROM rb_baskets WHERE basket_name = $1"), name); err != nil {
log.Printf("[error] failed to delete basket: %s - %s", name, err)
}
}
Expand All @@ -313,7 +335,7 @@ func (sdb *sqlDatabase) Size() int {
func (sdb *sqlDatabase) GetNames(max int, skip int) BasketNamesPage {
page := BasketNamesPage{make([]string, 0, max), sdb.Size(), false}

names, err := sdb.db.Query("SELECT basket_name FROM rb_baskets ORDER BY basket_name LIMIT $1 OFFSET $2", max+1, skip)
names, err := sdb.db.Query(unifySQL(sdb.dbType, "SELECT basket_name FROM rb_baskets ORDER BY basket_name LIMIT $1 OFFSET $2"), max+1, skip)
if err != nil {
log.Printf("[error] failed to get basket names: %s", err)
return page
Expand All @@ -337,7 +359,8 @@ func (sdb *sqlDatabase) GetNames(max int, skip int) BasketNamesPage {
func (sdb *sqlDatabase) FindNames(query string, max int, skip int) BasketNamesQueryPage {
page := BasketNamesQueryPage{make([]string, 0, max), false}

names, err := sdb.db.Query("SELECT basket_name FROM rb_baskets WHERE basket_name LIKE $1 ORDER BY basket_name LIMIT $2 OFFSET $3",
names, err := sdb.db.Query(
unifySQL(sdb.dbType, "SELECT basket_name FROM rb_baskets WHERE basket_name LIKE $1 ORDER BY basket_name LIMIT $2 OFFSET $3"),
"%"+query+"%", max+1, skip)
if err != nil {
log.Printf("[error] failed to find basket names: %s", err)
Expand Down Expand Up @@ -368,10 +391,13 @@ func (sdb *sqlDatabase) Release() {
func NewSQLDatabase(connection string) BasketsDatabase {
log.Print("[info] using SQL database to store baskets")

dbType := "postgres" // TODO: determine from connection string
log.Printf("[info] SQL database type: %s", dbType)
driver, source := parseConnection(connection)
if len(driver) == 0 {
return nil
}
log.Printf("[info] SQL database type: %s", driver)

db, err := sql.Open(dbType, connection)
db, err := sql.Open(driver, source)
if err != nil {
log.Printf("[error] failed to open database connection: %s - %s", connection, err)
return nil
Expand All @@ -382,13 +408,46 @@ func NewSQLDatabase(connection string) BasketsDatabase {
} else if err = initSchema(db); err != nil {
log.Printf("[error] failed to initialize SQL schema: %s", err)
} else {
return &sqlDatabase{db, dbType}
return &sqlDatabase{db, driver}
}

db.Close()
return nil
}

var pgParams = regexp.MustCompile("\\$\\d+")

func unifySQL(dbType string, sql string) string {
switch dbType {
case "mysql", "sqlite3":
// replace $n with ?
return pgParams.ReplaceAllString(sql, "?")
// case "postgres", "sqlserver":
default:
// statements are already designed to work with postgresql
return sql
}
}

func parseConnection(connection string) (string, string) {
if parts := strings.Split(connection, "://"); len(parts) > 1 {
driver := parts[0]
source := parts[1]

switch driver {
case "postgres":
return driver, connection
case "mysql", "sqlite3":
return driver, source
default:
return driver, connection
}
}

log.Printf("[error] failed to detect database type from connection: %s", connection)
return "", connection
}

func initSchema(db *sql.DB) error {
switch version := getSchemaVersion(db); version {
case 0:
Expand Down

0 comments on commit 240df9e

Please sign in to comment.