Skip to content

Commit

Permalink
dev: introduce unified retry SQL and place SQL into 'heredoc'
Browse files Browse the repository at this point in the history
  • Loading branch information
sunsingerus committed May 31, 2019
1 parent 7ff23e7 commit 619842e
Show file tree
Hide file tree
Showing 12 changed files with 479 additions and 47 deletions.
9 changes: 9 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions manifests/operator/clickhouse-operator-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,8 @@ data:
<users>
<clickhouse_operator>
<networks>
<ip>127.0.0.1</ip>
<ip>0.0.0.0/0</ip>
<ip>::/0</ip>
</networks>
<password>clickhouse_operator_password</password>
Expand Down
20 changes: 10 additions & 10 deletions pkg/model/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,25 @@ func (c *Conn) Query(query string) (*sql.Rows, error) {
}

dsn := c.makeDsn()
glog.V(1).Infof("Query ClickHouse DSN: %s", dsn)
//glog.V(1).Infof("Query ClickHouse DSN: %s", dsn)
connect, err := sql.Open("clickhouse", dsn)
if err != nil {
glog.V(1).Infof("Q1 %v", err)
glog.V(1).Infof("sql.Open(%s) FAILED %v", dsn, err)
return nil, err
}

if err := connect.Ping(); err != nil {
glog.V(1).Infof("Q2 %v", err)
glog.V(1).Infof("connect.Ping(%s) FAILED %v", dsn, err)
return nil, err
}

rows, err := connect.Query(query)
if err != nil {
glog.V(1).Infof("Q3 %v", err)
glog.V(1).Infof("connect.Query(%s) FAILED %v", dsn, err)
return nil, err
}

glog.V(1).Infof("clickhouseSQL(%s)'%s'", c.Hostname, query)
// glog.V(1).Infof("clickhouse.Query(%s):'%s'", c.Hostname, query)

return rows, nil
}
Expand All @@ -100,26 +100,26 @@ func (c *Conn) Exec(query string) error {
}

dsn := c.makeDsn()
glog.V(1).Infof("Exec ClickHouse DSN: %s", dsn)
//glog.V(1).Infof("Exec ClickHouse DSN: %s", dsn)
connect, err := sql.Open("clickhouse", dsn)
if err != nil {
glog.V(1).Infof("E1 %v", err)
glog.V(1).Infof("sql.Open(%s) FAILED %v", dsn, err)
return err
}

if err := connect.Ping(); err != nil {
glog.V(1).Infof("E2 %v", err)
glog.V(1).Infof("connect.Ping(%d) FAILED %v", dsn, err)
return err
}

_, err = connect.Exec(query)

if err != nil {
glog.V(1).Infof("E3 %v", err)
glog.V(1).Infof("connect.Exec(%s) FAILED %v", dsn, err)
return err
}

glog.V(1).Infof("clickhouse.Exec(%s)'%s'", c.Hostname, query)
// glog.V(1).Infof("clickhouse.Exec(%s):'%s'", c.Hostname, query)

return nil
}
103 changes: 66 additions & 37 deletions pkg/model/schemer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,22 @@
package model

import (
"fmt"
sqlmodule "database/sql"

chi "github.com/altinity/clickhouse-operator/pkg/apis/clickhouse.altinity.com/v1"
"github.com/altinity/clickhouse-operator/pkg/model/clickhouse"
"github.com/altinity/clickhouse-operator/pkg/util"

"github.com/MakeNowJust/heredoc"
"github.com/golang/glog"
"time"
)

const (
// Comma-separated ''-enclosed list of database names to be ignored
ignoredDBs = "'system'"

// Max number of retries for SQL queries
maxRetries = 10
// Max number of tries for SQL queries
maxTries = 10
)

type Schemer struct {
Expand All @@ -48,25 +51,36 @@ func (s *Schemer) newConn(hostname string) *clickhouse.Conn {
return clickhouse.New(hostname, s.Username, s.Password, s.Port)
}

// ClusterGetCreateDatabases returns set of 'CREATE DATABASE ...' SQLs
// ClusterGetCreateDatabases returns list of DB names and list of 'CREATE DATABASE ...' SQLs for these DBs
func (s *Schemer) ClusterGetCreateDatabases(chi *chi.ClickHouseInstallation, cluster *chi.ChiCluster) ([]string, []string, error) {
sql := `
// Results
dbNames := make([]string, 0)
createStatements := make([]string, 0)

sql := heredoc.Docf(`
SELECT
distinct name AS name,
concat('CREATE DATABASE IF NOT EXISTS ', name) AS create_db_query
FROM cluster('%s', system, databases)
WHERE name not in (%s)
ORDER BY name
SETTINGS skip_unavailable_shards = 1`
sql = fmt.Sprintf(sql, cluster.Name, ignoredDBs)
SETTINGS skip_unavailable_shards = 1
`,
cluster.Name,
ignoredDBs)

dbNames := make([]string, 0)
createStatements := make([]string, 0)
glog.V(1).Info(CreateChiServiceFQDN(chi))
conn := s.newConn(CreateChiServiceFQDN(chi))
if rows, err := conn.Query(sql); err != nil {
var rows *sqlmodule.Rows = nil
var err error
err = util.Retry(maxTries, sql, func() error {
rows, err = conn.Query(sql)
return err
})
if err != nil {
return nil, nil, err
} else {
// Some data fetched
for rows.Next() {
var name, create string
if err := rows.Scan(&name, &create); err == nil {
Expand All @@ -80,26 +94,37 @@ func (s *Schemer) ClusterGetCreateDatabases(chi *chi.ClickHouseInstallation, clu
return dbNames, createStatements, nil
}

// ClusterGetCreateTables returns set of 'CREATE TABLE ...' SQLs
// ClusterGetCreateTables returns list of table names and list of 'CREATE TABLE ...' SQLs for these tables
func (s *Schemer) ClusterGetCreateTables(chi *chi.ClickHouseInstallation, cluster *chi.ChiCluster) ([]string, []string, error) {
sql := `
// Results
tableNames := make([]string, 0)
createStatements := make([]string, 0)

sql := heredoc.Docf(`
SELECT
distinct name,
replaceRegexpOne(create_table_query, 'CREATE (TABLE|VIEW|MATERIALIZED VIEW)', 'CREATE \\1 IF NOT EXISTS')
FROM cluster('%s', system, tables)
WHERE database not in (%s)
AND name not like '.inner.%%'
ORDER BY multiIf(engine not in ('Distributed', 'View', 'MaterializedView'), 1, engine = 'MaterializedView', 2, engine = 'Distributed', 3, 4), name
SETTINGS skip_unavailable_shards = 1`
sql = fmt.Sprintf(sql, cluster.Name, ignoredDBs)
SETTINGS skip_unavailable_shards = 1
`,
cluster.Name,
ignoredDBs)

tableNames := make([]string, 0)
createStatements := make([]string, 0)
glog.V(1).Info(CreateChiServiceFQDN(chi))
conn := s.newConn(CreateChiServiceFQDN(chi))
if rows, err := conn.Query(sql); err != nil {
var rows *sqlmodule.Rows = nil
var err error
err = util.Retry(maxTries, sql, func() error {
rows, err = conn.Query(sql)
return err
})
if err != nil {
return nil, nil, err
} else {
// Some data fetched
for rows.Next() {
var name, create string
if err := rows.Scan(&name, &create); err == nil {
Expand All @@ -115,25 +140,34 @@ func (s *Schemer) ClusterGetCreateTables(chi *chi.ClickHouseInstallation, cluste

// ReplicaGetDropTables returns set of 'DROP TABLE ...' SQLs
func (s *Schemer) ReplicaGetDropTables(replica *chi.ChiReplica) ([]string, []string, error) {
// Results
tableNames := make([]string, 0)
dropStatements := make([]string, 0)

// There isn't a separate query for deleting views. To delete a view, use DROP TABLE
// See https://clickhouse.yandex/docs/en/query_language/create/

sql := `
sql := heredoc.Docf(`
SELECT
distinct name,
concat('DROP TABLE IF EXISTS ', database, '.', name)
FROM system.tables
WHERE database not in (%s)
AND engine like 'Replicated%%'`
sql = fmt.Sprintf(sql, ignoredDBs)
AND engine like 'Replicated%%'
`,
ignoredDBs)

tableNames := make([]string, 0)
dropStatements := make([]string, 0)
glog.V(1).Info(CreatePodFQDN(replica))
conn := s.newConn(CreatePodFQDN(replica))
if rows, err := conn.Query(sql); err != nil {
var rows *sqlmodule.Rows = nil
var err error
err = util.Retry(maxTries, sql, func() error {
rows, err = conn.Query(sql)
return err
})
if err != nil {
return nil, nil, err
} else {
// Some data fetched
for rows.Next() {
var name, create string
if err := rows.Scan(&name, &create); err == nil {
Expand Down Expand Up @@ -182,18 +216,13 @@ func (s *Schemer) applySQLs(hosts []string, sqls []string, retry bool) error {
// Skip malformed SQL query, move to the next SQL query
continue
}
// Now retry this SQL query on particular host
for retryCount := 0; retryCount < maxRetries; retryCount++ {
glog.V(1).Infof("applySQL(%s)", sql)
err = conn.Exec(sql)
if (err == nil) || !retry {
// Either all is good or we are not interested in retries anyway
// Move on to the next SQL query on this host
break
}
glog.V(1).Infof("attempt %d of %d failed, sleep and retry", retryCount, maxRetries)
seconds := (retryCount + 1) * 5
time.Sleep(time.Duration(seconds) * time.Second)
err = util.Retry(maxTries, sql, func() error {
return conn.Exec(sql)
})
if err != nil {
// Do not run any more SQL queries on this host in case of failure
// Move to next host
break
}
}
}
Expand Down
46 changes: 46 additions & 0 deletions pkg/util/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2019 Altinity Ltd and/or its affiliates. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"github.com/golang/glog"
"time"
)

// Retry
func Retry(tries int, desc string, f func() error) error {
var err error
for try := 1; try <= tries; try++ {
err = f()
if err == nil {
// All ok, no need to retry more
if try > 1 {
// Done, but after some retries, this is not 'clean'
glog.V(1).Infof("attempt %d of %d is finally DONE: %s", try, tries, desc)
}
return nil
}
if try < tries {
// Try failed, need to sleep and retry
seconds := try * 5
glog.V(1).Infof("attempt %d of %d FAILED, sleep %d sec and retry: %s", try, tries, seconds, desc)
time.Sleep(time.Duration(seconds) * time.Second)
} else {
// On last try no need to wait more
glog.V(1).Infof("all %d attempts FAILED, ABORT retry: %s", tries, desc)
}
}
return err
}
13 changes: 13 additions & 0 deletions vendor/github.com/MakeNowJust/heredoc/.circleci/config.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions vendor/github.com/MakeNowJust/heredoc/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 619842e

Please sign in to comment.