diff --git a/lib/go-tc/deliveryservice_ssl_keys.go b/lib/go-tc/deliveryservice_ssl_keys.go index 830bebe998..1eddccaa79 100644 --- a/lib/go-tc/deliveryservice_ssl_keys.go +++ b/lib/go-tc/deliveryservice_ssl_keys.go @@ -85,6 +85,11 @@ func (v *DeliveryServiceSSLKeys) UnmarshalJSON(b []byte) (err error) { return err } +type RiakPingResp struct { + Status string `json:"status"` + Server string `json:"server"` +} + // DNSSECKeys is the DNSSEC keys object stored in Riak. The map key strings are both DeliveryServiceNames and CDNNames. type DNSSECKeys map[string]DNSSECKeySet @@ -94,14 +99,14 @@ type DNSSECKeySet struct { } type DNSSECKey struct { - InceptionDateUnix int64 `json:"inceptionDate"` - ExpirationDateUnix int64 `json:"expirationDate"` - Name string `json:"name"` - TTLSeconds uint64 `json:"ttl,string"` - Status string `json:"status"` - EffectiveDateUnix int64 `json:"effectiveDate"` - Public string `json:"public"` - Private string `json:"private"` + InceptionDateUnix int64 `json:"inceptionDate"` + ExpirationDateUnix int64 `json:"expirationDate"` + Name string `json:"name"` + TTLSeconds uint64 `json:"ttl,string"` + Status string `json:"status"` + EffectiveDateUnix int64 `json:"effectiveDate"` + Public string `json:"public"` + Private string `json:"private"` DSRecord *DNSSECKeyDSRecord `json:"dsRecord,omitempty"` } diff --git a/traffic_ops/traffic_ops_golang/ping/keys.go b/traffic_ops/traffic_ops_golang/ping/keys.go new file mode 100644 index 0000000000..000197187a --- /dev/null +++ b/traffic_ops/traffic_ops_golang/ping/keys.go @@ -0,0 +1,45 @@ +package ping + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import ( + "errors" + "net/http" + + "github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/api" + "github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/riaksvc" +) + +func Keys(w http.ResponseWriter, r *http.Request) { + inf, userErr, sysErr, errCode := api.NewInfo(r, nil, nil) + if userErr != nil || sysErr != nil { + api.HandleErr(w, r, errCode, userErr, sysErr) + return + } + defer inf.Close() + + pingResp, err := riaksvc.Ping(inf.Tx.Tx, inf.Config.RiakAuthOptions) + if err != nil { + api.HandleErr(w, r, http.StatusInternalServerError, nil, errors.New("error pinging Riak keys: "+err.Error())) + return + } + *inf.CommitTx = true + api.WriteResp(w, r, pingResp.Status) +} diff --git a/traffic_ops/traffic_ops_golang/ping/riak.go b/traffic_ops/traffic_ops_golang/ping/riak.go new file mode 100644 index 0000000000..9058ff5ab8 --- /dev/null +++ b/traffic_ops/traffic_ops_golang/ping/riak.go @@ -0,0 +1,45 @@ +package ping + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import ( + "errors" + "net/http" + + "github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/api" + "github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/riaksvc" +) + +func Riak(w http.ResponseWriter, r *http.Request) { + inf, userErr, sysErr, errCode := api.NewInfo(r, nil, nil) + if userErr != nil || sysErr != nil { + api.HandleErr(w, r, errCode, userErr, sysErr) + return + } + defer inf.Close() + + pingResp, err := riaksvc.Ping(inf.Tx.Tx, inf.Config.RiakAuthOptions) + if err != nil { + api.HandleErr(w, r, http.StatusInternalServerError, nil, errors.New("error pinging Riak: "+err.Error())) + return + } + *inf.CommitTx = true + api.WriteResp(w, r, pingResp) +} diff --git a/traffic_ops/traffic_ops_golang/riaksvc/dsutil.go b/traffic_ops/traffic_ops_golang/riaksvc/dsutil.go index 9a62735336..98b9186680 100644 --- a/traffic_ops/traffic_ops_golang/riaksvc/dsutil.go +++ b/traffic_ops/traffic_ops_golang/riaksvc/dsutil.go @@ -135,6 +135,37 @@ func PutDeliveryServiceSSLKeysObjTx(key tc.DeliveryServiceSSLKeys, tx *sql.Tx, a return err } +func Ping(tx *sql.Tx, authOpts *riak.AuthOptions) (tc.RiakPingResp, error) { + servers, err := GetRiakServers(tx) + if err != nil { + return tc.RiakPingResp{}, errors.New("getting riak servers: " + err.Error()) + } + log.Errorf("DEBUG: GetRiakServers got: %+v\n", servers) + for _, server := range servers { + cluster, err := RiakServersToCluster([]ServerAddr{server}, authOpts) + if err != nil { + log.Errorf("RiakServersToCluster error for server %+v: %+v\n", server, err.Error()) + continue // try another server + } + if err = cluster.Start(); err != nil { + log.Errorln("starting Riak cluster (for ping): " + err.Error()) + continue + } + if err := PingCluster(cluster); err != nil { + if err := cluster.Stop(); err != nil { + log.Errorln("stopping Riak cluster (after ping error): " + err.Error()) + } + log.Errorf("Riak PingCluster error for server %+v: %+v\n", server, err.Error()) + continue + } + if err := cluster.Stop(); err != nil { + log.Errorln("stopping Riak cluster (after ping success): " + err.Error()) + } + return tc.RiakPingResp{Status: "OK", Server: server.FQDN + ":" + server.Port}, nil + } + return tc.RiakPingResp{}, errors.New("failed to ping any Riak server") +} + func GetDNSSECKeys(cdnName string, tx *sql.Tx, authOpts *riak.AuthOptions) (tc.DNSSECKeys, bool, error) { key := tc.DNSSECKeys{} found := false diff --git a/traffic_ops/traffic_ops_golang/riaksvc/riak_services.go b/traffic_ops/traffic_ops_golang/riaksvc/riak_services.go index fd8a699b0a..08a3248ee8 100644 --- a/traffic_ops/traffic_ops_golang/riaksvc/riak_services.go +++ b/traffic_ops/traffic_ops_golang/riaksvc/riak_services.go @@ -26,6 +26,7 @@ import ( "errors" "fmt" "io/ioutil" + "strconv" "time" "github.com/apache/trafficcontrol/lib/go-log" @@ -114,6 +115,32 @@ func DeleteObject(key string, bucket string, cluster StorageCluster) error { return nil } +// PingCluster pings the given Riak cluster, and returns nil on success, or any error +func PingCluster(cluster StorageCluster) error { + if cluster == nil { + return errors.New("ERROR: No valid cluster on which to execute a command") + } + pingCommandBuilder := riak.PingCommandBuilder{} + iCmd, err := pingCommandBuilder.Build() + if err != nil { + return errors.New("building riak ping command: " + err.Error()) + } + if err := cluster.Execute(iCmd); err != nil { + return errors.New("executing riak ping command: " + err.Error()) + } + cmd, ok := iCmd.(*riak.PingCommand) + if !ok { + return fmt.Errorf("unexpected riak command type: %T", iCmd) + } + if err := cmd.Error(); err != nil { + return errors.New("riak ping command returned error: " + err.Error()) + } + if !cmd.Success() { + return errors.New("riak ping command returned failure, but no error") + } + return nil +} + // fetch an object from riak storage func FetchObjectValues(key string, bucket string, cluster StorageCluster) ([]*riak.Object, error) { if cluster == nil { @@ -166,6 +193,65 @@ func SaveObject(obj *riak.Object, bucket string, cluster StorageCluster) error { return nil } +type ServerAddr struct { + FQDN string + Port string +} + +func GetRiakServers(tx *sql.Tx) ([]ServerAddr, error) { + rows, err := tx.Query(` +SELECT CONCAT(s.host_name, '.', s.domain_name) FROM server s +JOIN type t ON s.type = t.id +JOIN status st ON s.status = st.id +WHERE t.name = 'RIAK' AND st.name = 'ONLINE' +`) + if err != nil { + return nil, errors.New("querying riak servers: " + err.Error()) + } + defer rows.Close() + servers := []ServerAddr{} + portStr := strconv.Itoa(RiakPort) + for rows.Next() { + s := ServerAddr{Port: portStr} + if err := rows.Scan(&s.FQDN); err != nil { + return nil, errors.New("scanning riak servers: " + err.Error()) + } + servers = append(servers, s) + } + return servers, nil +} + +func RiakServersToCluster(servers []ServerAddr, authOptions *riak.AuthOptions) (StorageCluster, error) { + if authOptions == nil { + return nil, errors.New("ERROR: no riak auth information from riak.conf, cannot authenticate to any riak servers") + } + nodes := []*riak.Node{} + for _, srv := range servers { + nodeOpts := &riak.NodeOptions{ + RemoteAddress: srv.FQDN + ":" + srv.Port, + AuthOptions: authOptions, + } + nodeOpts.AuthOptions.TlsConfig.ServerName = srv.FQDN + node, err := riak.NewNode(nodeOpts) + if err != nil { + return nil, errors.New("creating riak node: " + err.Error()) + } + nodes = append(nodes, node) + } + if len(nodes) == 0 { + return nil, errors.New("ERROR: no available riak servers") + } + opts := &riak.ClusterOptions{ + Nodes: nodes, + ExecutionAttempts: MaxCommandExecutionAttempts, + } + cluster, err := riak.NewCluster(opts) + if err != nil { + return nil, errors.New("creating riak cluster: " + err.Error()) + } + return RiakStorageCluster{Cluster: cluster}, nil +} + // returns a riak cluster of online riak nodes. func GetRiakCluster(db *sql.DB, authOptions *riak.AuthOptions) (StorageCluster, error) { riakServerQuery := ` @@ -188,7 +274,6 @@ func GetRiakCluster(db *sql.DB, authOptions *riak.AuthOptions) (StorageCluster, for rows.Next() { var s tc.Server - var n *riak.Node if err := rows.Scan(&s.HostName, &s.DomainName); err != nil { return nil, err } @@ -253,59 +338,16 @@ func WithClusterX(db *sqlx.DB, authOpts *riak.AuthOptions, f func(StorageCluster return f(cluster) } -// returns a riak cluster of online riak nodes. func GetRiakClusterTx(tx *sql.Tx, authOptions *riak.AuthOptions) (StorageCluster, error) { - // TODO remove duplication with GetRiakCluster - - riakServerQuery := ` - SELECT s.host_name, s.domain_name FROM server s - INNER JOIN type t on s.type = t.id - INNER JOIN status st on s.status = st.id - WHERE t.name = 'RIAK' AND st.name = 'ONLINE' - ` - - if authOptions == nil { - return nil, errors.New("ERROR: no riak auth information from riak.conf, cannot authenticate to any riak servers") - } - - var nodes []*riak.Node - rows, err := tx.Query(riakServerQuery) + servers, err := GetRiakServers(tx) if err != nil { - return nil, err + return nil, errors.New("getting riak servers: " + err.Error()) } - defer rows.Close() - - for rows.Next() { - var s tc.Server - var n *riak.Node - if err := rows.Scan(&s.HostName, &s.DomainName); err != nil { - return nil, err - } - addr := fmt.Sprintf("%s.%s:%d", s.HostName, s.DomainName, RiakPort) - nodeOpts := &riak.NodeOptions{ - RemoteAddress: addr, - AuthOptions: authOptions, - } - nodeOpts.AuthOptions.TlsConfig.ServerName = fmt.Sprintf("%s.%s", s.HostName, s.DomainName) - n, err := riak.NewNode(nodeOpts) - if err != nil { - return nil, err - } - nodes = append(nodes, n) - } - - if len(nodes) == 0 { - return nil, errors.New("ERROR: no available riak servers") - } - - opts := &riak.ClusterOptions{ - Nodes: nodes, - ExecutionAttempts: MaxCommandExecutionAttempts, + cluster, err := RiakServersToCluster(servers, authOptions) + if err != nil { + return nil, errors.New("creating riak cluster from servers: " + err.Error()) } - - cluster, err := riak.NewCluster(opts) - - return RiakStorageCluster{Cluster: cluster}, err + return cluster, nil } func WithClusterTx(tx *sql.Tx, authOpts *riak.AuthOptions, f func(StorageCluster) error) error { diff --git a/traffic_ops/traffic_ops_golang/routes.go b/traffic_ops/traffic_ops_golang/routes.go index c33e666282..62cf182bf4 100644 --- a/traffic_ops/traffic_ops_golang/routes.go +++ b/traffic_ops/traffic_ops_golang/routes.go @@ -165,6 +165,8 @@ func Routes(d ServerData) ([]Route, []RawRoute, http.Handler, error) { //Ping {1.1, http.MethodGet, `ping$`, ping.PingHandler(), 0, NoAuth, nil}, + {1.1, http.MethodGet, `riak/ping/?(\.json)?$`, ping.Riak, auth.PrivLevelReadOnly, Authenticated, nil}, + {1.1, http.MethodGet, `keys/ping/?(\.json)?$`, ping.Keys, auth.PrivLevelReadOnly, Authenticated, nil}, //Profile: CRUD {1.1, http.MethodGet, `profiles/?(\.json)?$`, api.ReadHandler(profile.GetTypeSingleton()), auth.PrivLevelReadOnly, Authenticated, nil},