Skip to content
This repository was archived by the owner on Nov 24, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions lib/go-tc/deliveryservice_ssl_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ func (v *DeliveryServiceSSLKeys) UnmarshalJSON(b []byte) (err error) {
return err
}

type RiakPingResp struct {
Status string `json:"status"`
Server string `json:"server"`
}

// DNSSECKeys is the DNSSEC keys object stored in Riak. The map key strings are both DeliveryServiceNames and CDNNames.
type DNSSECKeys map[string]DNSSECKeySet

Expand All @@ -94,14 +99,14 @@ type DNSSECKeySet struct {
}

type DNSSECKey struct {
InceptionDateUnix int64 `json:"inceptionDate"`
ExpirationDateUnix int64 `json:"expirationDate"`
Name string `json:"name"`
TTLSeconds uint64 `json:"ttl,string"`
Status string `json:"status"`
EffectiveDateUnix int64 `json:"effectiveDate"`
Public string `json:"public"`
Private string `json:"private"`
InceptionDateUnix int64 `json:"inceptionDate"`
ExpirationDateUnix int64 `json:"expirationDate"`
Name string `json:"name"`
TTLSeconds uint64 `json:"ttl,string"`
Status string `json:"status"`
EffectiveDateUnix int64 `json:"effectiveDate"`
Public string `json:"public"`
Private string `json:"private"`
DSRecord *DNSSECKeyDSRecord `json:"dsRecord,omitempty"`
}

Expand Down
45 changes: 45 additions & 0 deletions traffic_ops/traffic_ops_golang/ping/keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package ping

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import (
"errors"
"net/http"

"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/api"
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/riaksvc"
)

func Keys(w http.ResponseWriter, r *http.Request) {
inf, userErr, sysErr, errCode := api.NewInfo(r, nil, nil)
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, errCode, userErr, sysErr)
return
}
defer inf.Close()

pingResp, err := riaksvc.Ping(inf.Tx.Tx, inf.Config.RiakAuthOptions)
if err != nil {
api.HandleErr(w, r, http.StatusInternalServerError, nil, errors.New("error pinging Riak keys: "+err.Error()))
return
}
*inf.CommitTx = true
api.WriteResp(w, r, pingResp.Status)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused. Is this function supposed to be identical to the Riak function? Am I just missing something?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Identical, except it directly returns the status string. Yes, it reproduces the existing API. I make no warranty as to the quality or sanity of the existing API.

45 changes: 45 additions & 0 deletions traffic_ops/traffic_ops_golang/ping/riak.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package ping

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import (
"errors"
"net/http"

"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/api"
"github.com/apache/trafficcontrol/traffic_ops/traffic_ops_golang/riaksvc"
)

func Riak(w http.ResponseWriter, r *http.Request) {
inf, userErr, sysErr, errCode := api.NewInfo(r, nil, nil)
if userErr != nil || sysErr != nil {
api.HandleErr(w, r, errCode, userErr, sysErr)
return
}
defer inf.Close()

pingResp, err := riaksvc.Ping(inf.Tx.Tx, inf.Config.RiakAuthOptions)
if err != nil {
api.HandleErr(w, r, http.StatusInternalServerError, nil, errors.New("error pinging Riak: "+err.Error()))
return
}
*inf.CommitTx = true
api.WriteResp(w, r, pingResp)
}
31 changes: 31 additions & 0 deletions traffic_ops/traffic_ops_golang/riaksvc/dsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,37 @@ func PutDeliveryServiceSSLKeysObjTx(key tc.DeliveryServiceSSLKeys, tx *sql.Tx, a
return err
}

func Ping(tx *sql.Tx, authOpts *riak.AuthOptions) (tc.RiakPingResp, error) {
servers, err := GetRiakServers(tx)
if err != nil {
return tc.RiakPingResp{}, errors.New("getting riak servers: " + err.Error())
}
log.Errorf("DEBUG: GetRiakServers got: %+v\n", servers)
for _, server := range servers {
cluster, err := RiakServersToCluster([]ServerAddr{server}, authOpts)
if err != nil {
log.Errorf("RiakServersToCluster error for server %+v: %+v\n", server, err.Error())
continue // try another server
}
if err = cluster.Start(); err != nil {
log.Errorln("starting Riak cluster (for ping): " + err.Error())
continue
}
if err := PingCluster(cluster); err != nil {
if err := cluster.Stop(); err != nil {
log.Errorln("stopping Riak cluster (after ping error): " + err.Error())
}
log.Errorf("Riak PingCluster error for server %+v: %+v\n", server, err.Error())
continue
}
if err := cluster.Stop(); err != nil {
log.Errorln("stopping Riak cluster (after ping success): " + err.Error())
}
return tc.RiakPingResp{Status: "OK", Server: server.FQDN + ":" + server.Port}, nil
}
return tc.RiakPingResp{}, errors.New("failed to ping any Riak server")
}

func GetDNSSECKeys(cdnName string, tx *sql.Tx, authOpts *riak.AuthOptions) (tc.DNSSECKeys, bool, error) {
key := tc.DNSSECKeys{}
found := false
Expand Down
142 changes: 92 additions & 50 deletions traffic_ops/traffic_ops_golang/riaksvc/riak_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"strconv"
"time"

"github.com/apache/trafficcontrol/lib/go-log"
Expand Down Expand Up @@ -114,6 +115,32 @@ func DeleteObject(key string, bucket string, cluster StorageCluster) error {
return nil
}

// PingCluster pings the given Riak cluster, and returns nil on success, or any error
func PingCluster(cluster StorageCluster) error {
if cluster == nil {
return errors.New("ERROR: No valid cluster on which to execute a command")
}
pingCommandBuilder := riak.PingCommandBuilder{}
iCmd, err := pingCommandBuilder.Build()
if err != nil {
return errors.New("building riak ping command: " + err.Error())
}
if err := cluster.Execute(iCmd); err != nil {
return errors.New("executing riak ping command: " + err.Error())
}
cmd, ok := iCmd.(*riak.PingCommand)
if !ok {
return fmt.Errorf("unexpected riak command type: %T", iCmd)
}
if err := cmd.Error(); err != nil {
return errors.New("riak ping command returned error: " + err.Error())
}
if !cmd.Success() {
return errors.New("riak ping command returned failure, but no error")
}
return nil
}

// fetch an object from riak storage
func FetchObjectValues(key string, bucket string, cluster StorageCluster) ([]*riak.Object, error) {
if cluster == nil {
Expand Down Expand Up @@ -166,6 +193,65 @@ func SaveObject(obj *riak.Object, bucket string, cluster StorageCluster) error {
return nil
}

type ServerAddr struct {
FQDN string
Port string
}

func GetRiakServers(tx *sql.Tx) ([]ServerAddr, error) {
rows, err := tx.Query(`
SELECT CONCAT(s.host_name, '.', s.domain_name) FROM server s
JOIN type t ON s.type = t.id
JOIN status st ON s.status = st.id
WHERE t.name = 'RIAK' AND st.name = 'ONLINE'
`)
if err != nil {
return nil, errors.New("querying riak servers: " + err.Error())
}
defer rows.Close()
servers := []ServerAddr{}
portStr := strconv.Itoa(RiakPort)
for rows.Next() {
s := ServerAddr{Port: portStr}
if err := rows.Scan(&s.FQDN); err != nil {
return nil, errors.New("scanning riak servers: " + err.Error())
}
servers = append(servers, s)
}
return servers, nil
}

func RiakServersToCluster(servers []ServerAddr, authOptions *riak.AuthOptions) (StorageCluster, error) {
if authOptions == nil {
return nil, errors.New("ERROR: no riak auth information from riak.conf, cannot authenticate to any riak servers")
}
nodes := []*riak.Node{}
for _, srv := range servers {
nodeOpts := &riak.NodeOptions{
RemoteAddress: srv.FQDN + ":" + srv.Port,
AuthOptions: authOptions,
}
nodeOpts.AuthOptions.TlsConfig.ServerName = srv.FQDN
node, err := riak.NewNode(nodeOpts)
if err != nil {
return nil, errors.New("creating riak node: " + err.Error())
}
nodes = append(nodes, node)
}
if len(nodes) == 0 {
return nil, errors.New("ERROR: no available riak servers")
}
opts := &riak.ClusterOptions{
Nodes: nodes,
ExecutionAttempts: MaxCommandExecutionAttempts,
}
cluster, err := riak.NewCluster(opts)
if err != nil {
return nil, errors.New("creating riak cluster: " + err.Error())
}
return RiakStorageCluster{Cluster: cluster}, nil
}

// returns a riak cluster of online riak nodes.
func GetRiakCluster(db *sql.DB, authOptions *riak.AuthOptions) (StorageCluster, error) {
riakServerQuery := `
Expand All @@ -188,7 +274,6 @@ func GetRiakCluster(db *sql.DB, authOptions *riak.AuthOptions) (StorageCluster,

for rows.Next() {
var s tc.Server
var n *riak.Node
if err := rows.Scan(&s.HostName, &s.DomainName); err != nil {
return nil, err
}
Expand Down Expand Up @@ -253,59 +338,16 @@ func WithClusterX(db *sqlx.DB, authOpts *riak.AuthOptions, f func(StorageCluster
return f(cluster)
}

// returns a riak cluster of online riak nodes.
func GetRiakClusterTx(tx *sql.Tx, authOptions *riak.AuthOptions) (StorageCluster, error) {
// TODO remove duplication with GetRiakCluster

riakServerQuery := `
SELECT s.host_name, s.domain_name FROM server s
INNER JOIN type t on s.type = t.id
INNER JOIN status st on s.status = st.id
WHERE t.name = 'RIAK' AND st.name = 'ONLINE'
`

if authOptions == nil {
return nil, errors.New("ERROR: no riak auth information from riak.conf, cannot authenticate to any riak servers")
}

var nodes []*riak.Node
rows, err := tx.Query(riakServerQuery)
servers, err := GetRiakServers(tx)
if err != nil {
return nil, err
return nil, errors.New("getting riak servers: " + err.Error())
}
defer rows.Close()

for rows.Next() {
var s tc.Server
var n *riak.Node
if err := rows.Scan(&s.HostName, &s.DomainName); err != nil {
return nil, err
}
addr := fmt.Sprintf("%s.%s:%d", s.HostName, s.DomainName, RiakPort)
nodeOpts := &riak.NodeOptions{
RemoteAddress: addr,
AuthOptions: authOptions,
}
nodeOpts.AuthOptions.TlsConfig.ServerName = fmt.Sprintf("%s.%s", s.HostName, s.DomainName)
n, err := riak.NewNode(nodeOpts)
if err != nil {
return nil, err
}
nodes = append(nodes, n)
}

if len(nodes) == 0 {
return nil, errors.New("ERROR: no available riak servers")
}

opts := &riak.ClusterOptions{
Nodes: nodes,
ExecutionAttempts: MaxCommandExecutionAttempts,
cluster, err := RiakServersToCluster(servers, authOptions)
if err != nil {
return nil, errors.New("creating riak cluster from servers: " + err.Error())
}

cluster, err := riak.NewCluster(opts)

return RiakStorageCluster{Cluster: cluster}, err
return cluster, nil
}

func WithClusterTx(tx *sql.Tx, authOpts *riak.AuthOptions, f func(StorageCluster) error) error {
Expand Down
2 changes: 2 additions & 0 deletions traffic_ops/traffic_ops_golang/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ func Routes(d ServerData) ([]Route, []RawRoute, http.Handler, error) {

//Ping
{1.1, http.MethodGet, `ping$`, ping.PingHandler(), 0, NoAuth, nil},
{1.1, http.MethodGet, `riak/ping/?(\.json)?$`, ping.Riak, auth.PrivLevelReadOnly, Authenticated, nil},
{1.1, http.MethodGet, `keys/ping/?(\.json)?$`, ping.Keys, auth.PrivLevelReadOnly, Authenticated, nil},

//Profile: CRUD
{1.1, http.MethodGet, `profiles/?(\.json)?$`, api.ReadHandler(profile.GetTypeSingleton()), auth.PrivLevelReadOnly, Authenticated, nil},
Expand Down