Skip to content

Commit

Permalink
Move conf.go into its own package, hadoopconf
Browse files Browse the repository at this point in the history
Tweak some interfaces along the way. Also, add some code to ignore
logical cluster names.

Fixes #127.
  • Loading branch information
colinmarc committed Aug 5, 2018
1 parent 7ed2427 commit 59eb6f6
Show file tree
Hide file tree
Showing 10 changed files with 188 additions and 134 deletions.
35 changes: 21 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

hdfs "github.com/colinmarc/hdfs/internal/protocol/hadoop_hdfs"
"github.com/colinmarc/hdfs/internal/rpc"

"github.com/colinmarc/hdfs/hadoopconf"
krb "gopkg.in/jcmturner/gokrb5.v5/client"
)

Expand Down Expand Up @@ -92,14 +94,13 @@ type ClientOptions struct {
// actually configured, you should check for whether KerberosClient is set in
// the resulting ClientOptions before proceeding:
//
// options, _ := ClientOptionsFromConf(conf)
// options := ClientOptionsFromConf(conf)
// if options.KerberosClient != nil {
// // Replace with a valid credentialed client.
// options.KerberosClient = getKerberosClient()
// }
func ClientOptionsFromConf(conf HadoopConf) (ClientOptions, error) {
namenodes, err := conf.Namenodes()
options := ClientOptions{Addresses: namenodes}
func ClientOptionsFromConf(conf hadoopconf.HadoopConf) ClientOptions {
options := ClientOptions{Addresses: conf.Namenodes()}

options.UseDatanodeHostname = (conf["dfs.client.use.datanode.hostname"] == "true")

Expand All @@ -114,7 +115,7 @@ func ClientOptionsFromConf(conf HadoopConf) (ClientOptions, error) {
options.KerberosServicePrincipleName = strings.Split(conf["dfs.namenode.kerberos.principal"], "@")[0]
}

return options, err
return options
}

// NewClient returns a connected Client for the given options, or an error if
Expand Down Expand Up @@ -146,19 +147,25 @@ func NewClient(options ClientOptions) (*Client, error) {
return &Client{namenode: namenode, options: options}, nil
}

// New returns a connected Client, or an error if it can't connect. The user
// will be the current system user. Any relevantoptions (including the
// address(es) of the namenode(s), if an empty string is passed) will be loaded
// from the Hadoop configuration present at HADOOP_CONF_DIR. Note, however,
// that New will not attempt any Kerberos authentication; use NewClient if you
// need that.
// New returns Client connected to the namenode(s) specified by address, or an
// error if it can't connect. Multiple namenodes can be specified by separating
// them with commas, for example "nn1:9000,nn2:9000".
//
// The user will be the current system user. Any other relevant options
// (including the address(es) of the namenode(s), if an empty string is passed)
// will be loaded from the Hadoop configuration present at HADOOP_CONF_DIR or
// HADOOP_HOME, as specified by hadoopconf.LoadFromEnvironment and
// ClientOptionsFromConf.
//
// Note, however, that New will not attempt any Kerberos authentication; use
// NewClient if you need that.
func New(address string) (*Client, error) {
conf := LoadHadoopConf("")
options, err := ClientOptionsFromConf(conf)
conf, err := hadoopconf.LoadFromEnvironment()
if err != nil {
options = ClientOptions{}
return nil, err
}

options := ClientOptionsFromConf(conf)
if address != "" {
options.Addresses = strings.Split(address, ",")
}
Expand Down
18 changes: 12 additions & 6 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path/filepath"
"testing"

"github.com/colinmarc/hdfs/hadoopconf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
krb "gopkg.in/jcmturner/gokrb5.v5/client"
Expand Down Expand Up @@ -35,10 +36,14 @@ func getClientForUser(t *testing.T, username string) *Client {
return c
}

conf := LoadHadoopConf("")
options, _ := ClientOptionsFromConf(conf)
conf, err := hadoopconf.LoadFromEnvironment()
if err != nil || conf == nil {
t.Fatal("Couldn't load ambient config", err)
}

options := ClientOptionsFromConf(conf)
if options.Addresses == nil {
t.Fatal("No hadoop configuration found at HADOOP_CONF_DIR")
t.Fatal("Missing namenode addresses in ambient config")
}

if options.KerberosClient != nil {
Expand Down Expand Up @@ -139,12 +144,13 @@ func assertPathError(t *testing.T, err error, op, path string, wrappedErr error)
}

func TestNewWithMultipleNodes(t *testing.T) {
conf := LoadHadoopConf("")
nns, err := conf.Namenodes()
conf, err := hadoopconf.LoadFromEnvironment()
if err != nil {
t.Fatal("No hadoop configuration found at HADOOP_CONF_DIR")
t.Fatal("Couldn't load ambient config", err)
}

nns := conf.Namenodes()

nns = append([]string{"localhost:100"}, nns...)
_, err = NewClient(ClientOptions{Addresses: nns, User: "gohdfs1"})
assert.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/hdfs/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func completePath(fragment string) {

fullPath := paths[0]
if fullPath == "" {
fullPath = userDir() + "/"
fullPath = userDir(client) + "/"
} else if hasGlob(fullPath) {
return
}
Expand Down
12 changes: 7 additions & 5 deletions cmd/hdfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/colinmarc/hdfs"
"github.com/colinmarc/hdfs/hadoopconf"
"github.com/pborman/getopt"
)

Expand Down Expand Up @@ -181,10 +182,12 @@ func getClient(namenode string) (*hdfs.Client, error) {
namenode = os.Getenv("HADOOP_NAMENODE")
}

// Ignore errors here, since we don't care if the conf doesn't exist if the
// namenode was specified.
conf := hdfs.LoadHadoopConf("")
options, _ := hdfs.ClientOptionsFromConf(conf)
conf, err := hadoopconf.LoadFromEnvironment()
if err != nil {
return nil, fmt.Errorf("Problem loading configuration: %s", err)
}

options := hdfs.ClientOptionsFromConf(conf)
if namenode != "" {
options.Addresses = []string{namenode}
}
Expand All @@ -193,7 +196,6 @@ func getClient(namenode string) (*hdfs.Client, error) {
return nil, errors.New("Couldn't find a namenode to connect to. You should specify hdfs://<namenode>:<port> in your paths. Alternatively, set HADOOP_NAMENODE or HADOOP_CONF_DIR in your environment.")
}

var err error
if options.KerberosClient != nil {
options.KerberosClient, err = getKerberosClient()
if err != nil {
Expand Down
91 changes: 0 additions & 91 deletions conf.go

This file was deleted.

132 changes: 132 additions & 0 deletions hadoopconf/hadoopconf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Package hadoopconf provides utilities for reading and parsing Hadoop's xml
// configuration files.
package hadoopconf

import (
"encoding/xml"
"fmt"
"io/ioutil"
"net/url"
"os"
"path/filepath"
"sort"
"strings"
)

// property is a single <property> element from a Hadoop XML config file.
type property struct {
	Name  string `xml:"name"`
	Value string `xml:"value"`
}

// propertyList matches the top-level <configuration> document body.
type propertyList struct {
	Property []property `xml:"property"`
}

// confFiles lists the well-known Hadoop config files parsed by Load.
var confFiles = []string{"core-site.xml", "hdfs-site.xml", "mapred-site.xml"}

// HadoopConf represents a map of all the key value configuration
// pairs found in a user's hadoop configuration files.
type HadoopConf map[string]string

// LoadFromEnvironment tries to locate the Hadoop configuration files based on
// the environment, and returns a HadoopConf object representing the parsed
// configuration. If the HADOOP_CONF_DIR environment variable is specified, it
// uses that, or if HADOOP_HOME is specified, it uses $HADOOP_HOME/conf.
//
// If no configuration can be found, it returns a nil map. If the configuration
// files exist but there was an error opening or parsing them, that is returned
// as well.
// LoadFromEnvironment locates Hadoop configuration files via the process
// environment and returns the parsed result. HADOOP_CONF_DIR is consulted
// first; failing that, $HADOOP_HOME/conf is tried.
//
// If neither location yields any configuration, a nil map and nil error are
// returned. An error opening or parsing existing files is propagated.
func LoadFromEnvironment() (HadoopConf, error) {
	var candidates []string
	if dir := os.Getenv("HADOOP_CONF_DIR"); dir != "" {
		candidates = append(candidates, dir)
	}
	if home := os.Getenv("HADOOP_HOME"); home != "" {
		candidates = append(candidates, filepath.Join(home, "conf"))
	}

	for _, dir := range candidates {
		conf, err := Load(dir)
		if conf != nil || err != nil {
			return conf, err
		}
	}

	return nil, nil
}

// Load returns a HadoopConf object representing configuration from the
// specified path. It will parse core-site.xml, hdfs-site.xml, and
// mapred-site.xml.
//
// If no configuration files could be found, Load returns a nil map. If the
// configuration files exist but there was an error opening or parsing them,
// that is returned as well.
// Load returns a HadoopConf object representing configuration from the
// specified path. It will parse core-site.xml, hdfs-site.xml, and
// mapred-site.xml.
//
// If no configuration files could be found, Load returns a nil map. If the
// configuration files exist but there was an error opening or parsing them,
// that is returned as well.
func Load(path string) (HadoopConf, error) {
	var conf HadoopConf

	for _, file := range confFiles {
		name := filepath.Join(path, file)
		f, err := ioutil.ReadFile(name)
		if os.IsNotExist(err) {
			// Each file is optional; skip the ones that aren't present.
			continue
		} else if err != nil {
			return conf, err
		}

		pList := propertyList{}
		err = xml.Unmarshal(f, &pList)
		if err != nil {
			// Report the specific file that failed to parse, not just the
			// directory (the previous message only included the directory).
			return conf, fmt.Errorf("%s: %s", name, err)
		}

		// Allocate lazily so that a directory with no config files yields a
		// nil map, per the contract above.
		if conf == nil {
			conf = make(HadoopConf)
		}

		for _, prop := range pList.Property {
			conf[prop.Name] = prop.Value
		}
	}

	return conf, nil
}

// Namenodes returns the namenode hosts present in the configuration. The
// returned slice will be sorted and deduped. The values are loaded from
// fs.defaultFS (or the deprecated fs.default.name), or fields beginning with
// dfs.namenode.rpc-address.
//
// To handle 'logical' clusters Namenodes will not return any cluster names
// found in dfs.ha.namenodes.<clustername> properties.
//
// If no namenode addresses can be found, Namenodes returns a nil slice.
func (conf HadoopConf) Namenodes() []string {
	nns := make(map[string]bool)
	var clusterNames []string

	for key, value := range conf {
		if strings.Contains(key, "fs.default") {
			// fs.defaultFS values look like "hdfs://namenode:8020". Skip
			// values that don't parse rather than dereferencing a nil URL
			// (the previous code ignored the error and could panic).
			nnURL, err := url.Parse(value)
			if err != nil {
				continue
			}

			nns[nnURL.Host] = true
		} else if strings.HasPrefix(key, "dfs.namenode.rpc-address.") {
			nns[value] = true
		} else if strings.HasPrefix(key, "dfs.ha.namenodes.") {
			clusterNames = append(clusterNames, key[len("dfs.ha.namenodes."):])
		}
	}

	// Drop logical cluster names, which are not real addresses.
	for _, cn := range clusterNames {
		delete(nns, cn)
	}

	if len(nns) == 0 {
		return nil
	}

	keys := make([]string, 0, len(nns))
	for k := range nns {
		keys = append(keys, k)
	}

	sort.Strings(keys)
	return keys
}
Loading

0 comments on commit 59eb6f6

Please sign in to comment.