Skip to content

Commit

Permalink
Merge pull request #29 from ibm-watson-data-lab/bluemix_cos_support_s…
Browse files Browse the repository at this point in the history
…cala

Bluemix Cloud Object Storage Support [Scala]
  • Loading branch information
bassel-zeidan committed Oct 9, 2017
2 parents fc92a7f + eb33c1a commit 3475774
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 28 deletions.
62 changes: 56 additions & 6 deletions scala/README.md
Expand Up @@ -133,13 +133,10 @@ within a DSX Jupyter notebook, you can obtain your account credentials in the fo
If your Object Storage was created with a Softlayer account, each part of the credentials will
be found as text that you can copy and paste into the example code below.

### IBM Cloud Object Storage / Data Science Experience
### Softlayer IBM Cloud Object Storage (COS)
```scala
import com.ibm.ibmos2spark.CloudObjectStorage

// The credentials HashMap may be created for you with the
// "insert to code" link in your DSX notebook.

var credentials = scala.collection.mutable.HashMap[String, String](
"endPoint"->"https://identity.open.softlayer.com",
"accessKey"->"xx",
Expand All @@ -161,8 +158,61 @@ var dfData1 = spark.
load(cos.url(bucketName, objectname))
```

### Bluemix IBM Cloud Object Storage (COS)
The class CloudObjectStorage allows you to connect to an IBM bluemix COS. You can connect to
a bluemix COS using api keys as follows:
```scala
import com.ibm.ibmos2spark.CloudObjectStorage

var credentials = scala.collection.mutable.HashMap[String, String](
"endPoint"->"xxx",
"apiKey"->"xxx",
"serviceId"->"xxx"
)
var bucketName = "myBucket"
var objectname = "mydata.csv"

var configurationName = "cos_config_name" // you can choose any string you want
var cos = new CloudObjectStorage(sc, credentials, configurationName, "bluemix_cos")
var spark = SparkSession.
builder().
getOrCreate()

var dfData1 = spark.
read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").
option("header", "true").
option("inferSchema", "true").
load(cos.url(bucketName, objectname))
```
Alternatively, you can connect to a bluemix COS using IAM token. Example:
```scala
import com.ibm.ibmos2spark.CloudObjectStorage

// The credentials HashMap may be created for you with the
// "insert to code" link in your DSX notebook.

var credentials = scala.collection.mutable.HashMap[String, String](
"endPoint"->"xxx",
"iamToken"->"xxx",
"serviceId"->"xxx"
)
var bucketName = "myBucket"
var objectname = "mydata.csv"

var configurationName = "cos_config_name" // you can choose any string you want
var cos = new CloudObjectStorage(sc, credentials, configurationName, "bluemix_cos", "iam_token")
var spark = SparkSession.
builder().
getOrCreate()

var dfData1 = spark.
read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").
option("header", "true").
option("inferSchema", "true").
load(cos.url(bucketName, objectname))
```

### Bluemix / Data Science Experience
### Bluemix Swift Object Storage/ Data Science Experience


```scala
Expand All @@ -189,7 +239,7 @@ var rdd = sc.textFile(bmos.url(container , objectname))
```


### Softlayer
### Softlayer Swift Object Storage



Expand Down
104 changes: 82 additions & 22 deletions scala/src/main/scala/Osconfig.scala
Expand Up @@ -129,54 +129,114 @@ class bluemix(sc: SparkContext, name: String, creds: HashMap[String, String],
}

/**
* CloudObjectStorage class sets up a s3d connection between an IBM Spark service
* instance and an IBM Cloud Object Storage instance.
* This class allows you to connect to an IBM cloud object storage (COS) instance. It also support
connecting to a COS instance that is being hosted on bluemix.
* Constructor arguments:
* sparkcontext: a SparkContext object.
* credentials: a dictionary with the following required keys:
*
* endpoint
* accessKey
* secretKey
* credentials: a dictionary with the following required keys to connect to COS.
The required keys differ according to the type of COS.
* - for COS type "softlayer_cos" the following key are required:
* endPoint [required]
* accessKey [required]
* secretKey [required]
* - for COS type "bluemix_cos", here are the required/optional key:
* endPoint [required]
* serviceId [required]
* apiKey OR iamToken depends on the selected authorization method (authMethod) [required]
* iamServiceEndpoint [optional] (default: https://iam.ng.bluemix.net/oidc/token)
* v2SignerType [optional]
* configurationName [optional]: string that identifies this configuration. You can
use any string you like. This allows you to create
multiple configurations to different Object Storage accounts.
if a configuration name is not passed the default one will be used "service".
* cosType [optional]: string that identifies the type of COS to connect to. The supported types of COS
are "softlayer_cos" and "bluemix_cos". "softlayer_cos" will be chosen as default if no cosType is passed.
* authMethod [optional]: string that identifies the type of authorization method to use when connecting to COS. This parameter
is not reqired for softlayer_cos but only needed for bluemix_cos. Two options can be chosen for this params
"api_key" or "iam_token". "api_key" will be chosen as default if the value is not set.
*/
class CloudObjectStorage(sc: SparkContext, credentials: HashMap[String, String], configurationName: String = "") {
class CloudObjectStorage(sc: SparkContext, credentials: HashMap[String, String],
configurationName: String = "", cosType: String = "softlayer_cos",
authMethod: String = "api_key") {

// check if all credentials are available
val requiredValues = Array("endPoint", "accessKey", "secretKey")
for ( key <- requiredValues ) {
if (!credentials.contains(key)) {
throw new IllegalArgumentException("Invalid input: missing required input [" + key + "]")
}
}
// check for valid credentials
_validate_credentials(credentials, cosType, authMethod)

// set config
val hadoopConf = sc.hadoopConfiguration
val prefix = "fs.cos." + getConfigurationName()
val prefix = "fs.cos." + _getConfigurationName()

hadoopConf.set(prefix + ".endpoint", credentials("endPoint"))
hadoopConf.set(prefix + ".access.key", credentials("accessKey"))
hadoopConf.set(prefix + ".secret.key", credentials("secretKey"))

private def getConfigurationName() : String = {
if (cosType == "softlayer_cos") {
// softlayer cos case
hadoopConf.set(prefix + ".access.key", credentials("accessKey"))
hadoopConf.set(prefix + ".secret.key", credentials("secretKey"))
} else if (cosType == "bluemix_cos") {
// bluemix COS case
hadoopConf.set(prefix + ".iam.service.id", credentials("serviceId"))
if (authMethod == "api_key") {
hadoopConf.set(prefix + ".iam.api.key", credentials("apiKey"))
} else if (authMethod == "iam_token") {
hadoopConf.set(prefix + ".iam.token", credentials("iamToken"))
}

if (credentials.contains("iamServiceEndpoint")) {
hadoopConf.set(prefix + ".iam.endpoint", credentials("iamServiceEndpoint"))
}

if (credentials.contains("v2SignerType")) {
hadoopConf.set(prefix + ".v2.signer.type", credentials("v2SignerType"))
}
}

private def _getConfigurationName() : String = {
if (configurationName != "") {
return configurationName
} else {
return globalVariables.DEFAULT_SERVICE_NAME
}
}

private def _validate_credentials(credentials : HashMap[String, String], cosType : String, authMethod : String) = {
val requiredKeys = _get_required_key_array(cosType, authMethod)

// check the existence of all required values in credentials
for ( key <- requiredKeys ) {
if (!credentials.contains(key)) {
throw new IllegalArgumentException("Invalid input: missing required input [" + key + "]")
}
}
}

private def _get_required_key_array(cosType : String, authMethod : String) : Array[String] = {
val requiredKeySoftlayerCos = Array("endPoint", "accessKey", "secretKey")
val requiredKeyListIamApiKey = Array("endPoint", "apiKey", "serviceId")
val requiredKeyListIamToken = Array("endPoint", "iamToken", "serviceId")

if (cosType == "bluemix_cos") {
if (authMethod == "api_key") {
return requiredKeyListIamApiKey
} else if (authMethod == "iam_token") {
return requiredKeyListIamToken
} else {
throw new IllegalArgumentException("Invalid input: authMethod. authMethod is optional but if set, it should have one of the following values: api_key, iam_token")
}
} else if (cosType == "softlayer_cos") {
return requiredKeySoftlayerCos
} else {
throw new IllegalArgumentException("Invalid input: cosType. cosType is optional but if set, it should have one of the following values: softlayer_cos, bluemix_cos")
}
}

def url(bucketName: String, objectName: String) : String = {
var serviceName = getConfigurationName()
var serviceName = _getConfigurationName()
return "cos://" + bucketName + "." + serviceName + "/" + objectName
}
}

0 comments on commit 3475774

Please sign in to comment.