Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Implemented querying concurrent schema #45

Closed
16 changes: 10 additions & 6 deletions go/cmd/ccql/main.go
Expand Up @@ -15,6 +15,7 @@ import (

golib_log "github.com/outbrain/golib/log"
"gopkg.in/gcfg.v1"
"strings"
)

var AppVersion string
Expand All @@ -34,12 +35,13 @@ func main() {
osUser = usr.Username
}

osUser = "root"
help := flag.Bool("help", false, "Display usage")
user := flag.String("u", osUser, "MySQL username")
password := flag.String("p", "", "MySQL password")
askPassword := flag.Bool("ask-pass", false, "prompt for MySQL password")
credentialsFile := flag.String("C", "", "Credentials file, expecting [client] scope, with 'user', 'password' fields. Overrides -u and -p")
defaultSchema := flag.String("d", "information_schema", "Default schema to use")
schemaList := flag.String("s", "information_schema", "List of Schema to query from.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking, incompatible change. Please avoid doing this. Find a backwards compatible way.

hostsList := flag.String("h", "", "Comma or space delimited list of hosts in hostname[:port] format. If not given, hosts read from stdin")
hostsFile := flag.String("H", "", "Hosts file, hostname[:port] comma or space or newline delimited format. If not given, hosts read from stdin")
queriesText := flag.String("q", "", "Query/queries to execute")
Expand Down Expand Up @@ -94,9 +96,9 @@ func main() {
if *credentialsFile != "" {
mySQLConfig := struct {
Client struct {
User string
Password string
}
User string
Password string
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gofmt will fail on this change of indentation.

}{}
gcfg.RelaxedParserMode = true
err := gcfg.ReadFileInto(&mySQLConfig, *credentialsFile)
Expand All @@ -117,7 +119,9 @@ func main() {
*password = string(passwd)
}

if err := logic.QueryHosts(hosts, *user, *password, *defaultSchema, queries, *maxConcurrency, *timeout); err != nil {
schemas := strings.Split(*schemaList, ",")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please handle whitespace


if err := logic.QuerySchemas(hosts, *user, *password, schemas, queries, *maxConcurrency, *timeout); err != nil {
os.Exit(1)
}
}
}
45 changes: 44 additions & 1 deletion go/logic/ccql.go
Expand Up @@ -11,6 +11,7 @@ import (
// queryHost connects to a given host, issues the given set of queries, and outputs the results
// line per row in tab delimited format
func queryHost(host string, user string, password string, defaultSchema string, queries []string, timeout float64) error {
log.Println("Running for schema", defaultSchema)
mysqlURI := fmt.Sprintf("%s:%s@tcp(%s)/%s?timeout=%fs", user, password, host, defaultSchema, timeout)
db, _, err := sqlutils.GetDB(mysqlURI)
if err != nil {
Expand All @@ -28,7 +29,7 @@ func queryHost(host string, user string, password string, defaultSchema string,
output = append(output, rowCell.String)
}
rowOutput := strings.Join(output, "\t")
fmt.Println(rowOutput)
fmt.Println(defaultSchema, rowOutput)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaking change

}
}
return nil
Expand Down Expand Up @@ -57,3 +58,45 @@ func QueryHosts(hosts []string, user string, password string, defaultSchema stri
}
return anyError
}

func QuerySchemas(hosts []string, user string, password string, schemas []string, queries []string, maxConcurrency uint, timeout float64) (anyError error) {
concurrentHosts := make(chan bool, maxConcurrency)
completedHosts := make(chan bool)

concurrentSchemas := make(chan bool, maxConcurrency)
completedSchemas := make(chan bool)

for _, host := range hosts {
go func(host string) {
concurrentHosts <- true
//For each host, run all queries for the respective schema
for _, schema := range schemas {
go func(schema string) {
concurrentSchemas <- true
if err := queryHost(host, user, password, schema, queries, timeout); err != nil {
anyError = err
log.Printf("%s %s", host, err.Error())
}
<-concurrentSchemas
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the <- should be in a defer function call at the beginning of this routine

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I encourage you to use WaitGroup

completedSchemas <- true
}(schema)
}

// Barrier. Wait for all to complete
for range schemas {
<-completedSchemas
}

<-concurrentHosts

completedHosts <- true
}(host)
}
// Barrier. Wait for all to complete
for range hosts {
<-completedHosts
}

return anyError
}