Skip to content

Commit

Permalink
Integration test for Google GCS hosted table (#105)
Browse files Browse the repository at this point in the history
* extend delta sharing protocol to include share id and table id

* more documentation on the id

* add integration test for gcs hosted tables

* update readme

* format python

* format

* address comments

* fix test

* fix format

* test python dependency

* another try

* read google key from github secret and write temp file in integration test

* add condition

* fix yaml

* log it

* log it more

* fix spark test

* address comments

* format python

* enable python integration test

* rm imports
  • Loading branch information
zhuansunxt committed Jan 11, 2022
1 parent 7dbf108 commit eb0e5bd
Show file tree
Hide file tree
Showing 15 changed files with 127 additions and 36 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ jobs:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AZURE_TEST_ACCOUNT_KEY: ${{ secrets.AZURE_TEST_ACCOUNT_KEY }}
GOOGLE_APPLICATION_CREDENTIALS: /tmp/google_service_account_key.json
GOOGLE_SERVICE_ACCOUNT_KEY: ${{ secrets.GOOGLE_SERVICE_ACCOUNT_KEY }}
steps:
- name: Checkout repository
uses: actions/checkout@v2
Expand Down Expand Up @@ -42,6 +44,8 @@ jobs:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
AZURE_TEST_ACCOUNT_KEY: ${{ secrets.AZURE_TEST_ACCOUNT_KEY }}
GOOGLE_APPLICATION_CREDENTIALS: /tmp/google_service_account_key.json
GOOGLE_SERVICE_ACCOUNT_KEY: ${{ secrets.GOOGLE_SERVICE_ACCOUNT_KEY }}
# Github Actions' default miniconda
CONDA_PREFIX: /usr/share/miniconda
steps:
Expand Down
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,17 @@ The server is using `hadoop-azure` to read Azure Data Lake Storage Gen2. We supp
```
`YOUR-ACCOUNT-NAME` is your Azure storage account and `YOUR-ACCOUNT-KEY` is your account key.

More cloud storage supports will be added in the future.
### Google Cloud Storage

We support using [Service Account](https://cloud.google.com/iam/docs/service-accounts) to read Google Cloud Storage. You can find more details in [GCP Authentication Doc](https://cloud.google.com/docs/authentication/getting-started).

To set up the Service Account credentials, you can specify the environment GOOGLE_APPLICATION_CREDENTIALS before starting the Delta Sharing Server.

```
export GOOGLE_APPLICATION_CREDENTIALS="KEY_PATH"
```

Replace `KEY_PATH` with path of the JSON file that contains your service account key.

## Authorization

Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ lazy val server = (project in file("server")) enablePlugins(JavaAppPackaging) se
ExclusionRule("com.fasterxml.jackson.module"),
ExclusionRule("com.google.guava", "guava")
),
"com.google.cloud" % "google-cloud-storage" % "2.2.1" excludeAll(
"com.google.cloud" % "google-cloud-storage" % "2.2.2" excludeAll(
ExclusionRule("com.fasterxml.jackson.core"),
ExclusionRule("com.fasterxml.jackson.module")
),
"com.google.cloud.bigdataoss" % "gcs-connector" % "hadoop2-2.2.3" excludeAll(
"com.google.cloud.bigdataoss" % "gcs-connector" % "hadoop2-2.2.4" excludeAll(
ExclusionRule("com.fasterxml.jackson.core"),
ExclusionRule("com.fasterxml.jackson.module")
),
Expand Down
8 changes: 8 additions & 0 deletions python/delta_sharing/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@
from delta_sharing.protocol import AddFile, Table
from delta_sharing.rest_client import DataSharingRestClient

try:
from yarl import URL
from yarl._quoting import _Quoter

URL._PATH_REQUOTER = _Quoter(safe="@:", protected="/+=") # type: ignore
except:
pass


class DeltaSharingReader:
def __init__(
Expand Down
8 changes: 8 additions & 0 deletions python/delta_sharing/tests/test_delta_sharing.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def test_list_shares(sharing_client: SharingClient):
Share(name="share6"),
Share(name="share7"),
Share(name="share_azure"),
Share(name="share_gcp"),
]


Expand Down Expand Up @@ -88,6 +89,7 @@ def _verify_all_tables_result(tables: Sequence[Table]):
Table(name="table9", share="share7", schema="schema2"),
Table(name="table_wasb", share="share_azure", schema="default"),
Table(name="table_abfs", share="share_azure", schema="default"),
Table(name="table_gcs", share="share_gcp", schema="default"),
]


Expand Down Expand Up @@ -280,6 +282,12 @@ def list_all_tables(
pd.DataFrame({"c1": ["foo bar"], "c2": ["foo bar"],}),
id="Azure Data Lake Storage Gen2",
),
pytest.param(
"share_gcp.default.table_gcs",
None,
pd.DataFrame({"c1": ["foo bar"], "c2": ["foo bar"],}),
id="Google Cloud Storage",
),
],
)
def test_load(profile_path: str, fragments: str, limit: Optional[int], expected: pd.DataFrame):
Expand Down
1 change: 1 addition & 0 deletions python/delta_sharing/tests/test_rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def test_list_shares(rest_client: DataSharingRestClient):
Share(name="share6"),
Share(name="share7"),
Share(name="share_azure"),
Share(name="share_gcp"),
]


Expand Down
1 change: 1 addition & 0 deletions python/requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pyarrow>=4.0.0
fsspec>=0.7.4
requests
aiohttp
yarl>=1.6.0

# Linter
mypy==0.812
Expand Down
1 change: 1 addition & 0 deletions python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
'requests',
'aiohttp',
'dataclasses;python_version<"3.7"',
'yarl>=1.6.0',
],
extras_require={
's3': ['s3fs'],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ import io.delta.sharing.server.util.JsonUtils
class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {

def shouldRunIntegrationTest: Boolean = {
sys.env.get("AWS_ACCESS_KEY_ID").exists(_.length > 0)
sys.env.get("AWS_ACCESS_KEY_ID").exists(_.length > 0) &&
sys.env.get("AZURE_TEST_ACCOUNT_KEY").exists(_.length > 0) &&
sys.env.get("GOOGLE_APPLICATION_CREDENTIALS").exists(_.length > 0)
}

private var serverConfig: ServerConfig = _
Expand All @@ -65,6 +67,20 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid)
}

private def verifyPreSignedUrl(url: String, expectedLength: Int): Unit = {
// We should be able to read from the url
assert(IOUtils.toByteArray(new URL(url)).size == expectedLength)

// Modifying the file to access a different path should fail. This ensures the url is scoped
// down to the specific file.
val urlForDifferentObject = url.replaceAll("\\.parquet", ".orc")
assert(url != urlForDifferentObject)
val e = intercept[IOException] {
IOUtils.toByteArray(new URL(urlForDifferentObject))
}
assert(e.getMessage.contains("Server returned HTTP response code: 403")) // 403 Forbidden
}

def requestPath(path: String): String = {
s"https://${serverConfig.getHost}:${serverConfig.getPort}${serverConfig.getEndpoint}$path"
}
Expand Down Expand Up @@ -160,7 +176,8 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
Share().withName("share5"),
Share().withName("share6"),
Share().withName("share7"),
Share().withName("share_azure")
Share().withName("share_azure"),
Share().withName("share_gcp")
)
)
assert(expected == JsonFormat.fromJsonString[ListSharesResponse](response))
Expand All @@ -183,7 +200,8 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
Share().withName("share5"),
Share().withName("share6"),
Share().withName("share7"),
Share().withName("share_azure")
Share().withName("share_azure"),
Share().withName("share_gcp")
)
assert(expected == shares)
}
Expand Down Expand Up @@ -544,24 +562,9 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
}
}

private def verifyPreSignedUrl(url: String, expectedLength: Int): Unit = {
// We should be able to read from the url
assert(IOUtils.toByteArray(new URL(url)).size == expectedLength)

// Modifying the file to access a different path should fail. This ensures the url is scoped
// down to the specific file.
val urlForDifferentObject = url.replaceAll("\\.parquet", ".orc")
assert(url != urlForDifferentObject)
val e = intercept[IOException] {
IOUtils.toByteArray(new URL(urlForDifferentObject))
}
assert(e.getMessage.contains("Server returned HTTP response code: 403")) // 403 Forbidden
}

ignore("gcs support") {
assume(shouldRunIntegrationTest)
integrationTest("gcp support") {
val gcsTableName = "table_gcs"
val response = readNDJson(requestPath(s"/shares/share_gcs/schemas/default/tables/${gcsTableName}/query"), Some("POST"), Some("{}"), Some(0))
val response = readNDJson(requestPath(s"/shares/share_gcp/schemas/default/tables/${gcsTableName}/query"), Some("POST"), Some("{}"), Some(0))
val lines = response.split("\n")
val protocol = lines(0)
val metadata = lines(1)
Expand Down
49 changes: 39 additions & 10 deletions server/src/test/scala/io/delta/sharing/server/TestResource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package io.delta.sharing.server

import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files

import org.apache.commons.io.FileUtils

import io.delta.sharing.server.config._

object TestResource {
Expand All @@ -35,30 +38,46 @@ object TestResource {
val container = "delta-sharing-test-container"
}

object GCP {
val bucket = "delta-sharing-dev"
}

val TEST_PORT = 12345

val testAuthorizationToken = "dapi5e3574ec767ca1548ae5bbed1a2dc04d"

def maybeSetupGoogleServiceAccountCredentials: Unit = {
// Only setup Google Service Account credentials when it is provided through env variable.
if (sys.env.get("GOOGLE_SERVICE_ACCOUNT_KEY").exists(_.length > 0)
&& sys.env.get("GOOGLE_APPLICATION_CREDENTIALS").exists(_.length > 0)) {
val serviceAccountKey = sys.env("GOOGLE_SERVICE_ACCOUNT_KEY")
val credFilePath = new File(sys.env("GOOGLE_APPLICATION_CREDENTIALS"))
credFilePath.deleteOnExit()
FileUtils.writeStringToFile(credFilePath, serviceAccountKey, UTF_8, false)
}
}

def setupTestTables(): File = {
val testConfigFile = Files.createTempFile("delta-sharing", ".yaml").toFile
testConfigFile.deleteOnExit()
maybeSetupGoogleServiceAccountCredentials
val shares = java.util.Arrays.asList(
ShareConfig("share1",
java.util.Arrays.asList(
SchemaConfig(
"default",
java.util.Arrays.asList(
TableConfig("table1", s"s3a://${TestResource.AWS.bucket}/delta-exchange-test/table1"),
TableConfig("table3", s"s3a://${TestResource.AWS.bucket}/delta-exchange-test/table3"),
TableConfig("table7", s"s3a://${TestResource.AWS.bucket}/delta-exchange-test/table7")
TableConfig("table1", s"s3a://${AWS.bucket}/delta-exchange-test/table1"),
TableConfig("table3", s"s3a://${AWS.bucket}/delta-exchange-test/table3"),
TableConfig("table7", s"s3a://${AWS.bucket}/delta-exchange-test/table7")
)
)
)
),
ShareConfig("share2",
java.util.Arrays.asList(
SchemaConfig("default", java.util.Arrays.asList(
TableConfig("table2", s"s3a://${TestResource.AWS.bucket}/delta-exchange-test/table2")
TableConfig("table2", s"s3a://${AWS.bucket}/delta-exchange-test/table2")
)
)
)),
Expand All @@ -67,8 +86,8 @@ object TestResource {
SchemaConfig(
"default",
java.util.Arrays.asList(
TableConfig("table4", s"s3a://${TestResource.AWS.bucket}/delta-exchange-test/table4"),
TableConfig("table5", s"s3a://${TestResource.AWS.bucket}/delta-exchange-test/table5")
TableConfig("table4", s"s3a://${AWS.bucket}/delta-exchange-test/table4"),
TableConfig("table5", s"s3a://${AWS.bucket}/delta-exchange-test/table5")
)
)
)
Expand All @@ -79,7 +98,7 @@ object TestResource {
"default",
java.util.Arrays.asList(
// table made with spark.sql.parquet.compression.codec=gzip
TableConfig("test_gzip", s"s3a://${TestResource.AWS.bucket}/compress-test/table1")
TableConfig("test_gzip", s"s3a://${AWS.bucket}/compress-test/table1")
)
)
)
Expand All @@ -100,13 +119,13 @@ object TestResource {
SchemaConfig(
"schema1",
java.util.Arrays.asList(
TableConfig("table8", s"s3a://${TestResource.AWS.bucket}/delta-exchange-test/table8")
TableConfig("table8", s"s3a://${AWS.bucket}/delta-exchange-test/table8")
)
),
SchemaConfig(
"schema2",
java.util.Arrays.asList(
TableConfig("table9", s"s3a://${TestResource.AWS.bucket}/delta-exchange-test/table9")
TableConfig("table9", s"s3a://${AWS.bucket}/delta-exchange-test/table9")
)
)
)
Expand All @@ -122,8 +141,18 @@ object TestResource {
)
)
)
)
),
// scalastyle:on
ShareConfig("share_gcp",
java.util.Arrays.asList(
SchemaConfig(
"default",
java.util.Arrays.asList(
TableConfig("table_gcs", s"gs://${GCP.bucket}/delta-sharing-test/table1")
)
)
)
)
)

val serverConfig = new ServerConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ class ServerConfigSuite extends FunSuite {
"table3",
"abfss://<container-name>@<account-name}.dfs.core.windows.net/<the-table-path>")
))
)),
ShareConfig("share3", Arrays.asList(
SchemaConfig("schema3", Arrays.asList(
TableConfig(
"table4",
"gs://<bucket-name>/<the-table-path>")
))
))
)
val serverConfig = new ServerConfig()
Expand Down
7 changes: 7 additions & 0 deletions server/src/universal/conf/delta-sharing-server.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ shares:
- name: "table3"
# Azure Data Lake Storage Gen2. See https://github.com/delta-io/delta-sharing#azure-data-lake-storage-gen2 for how to config the credentials
location: "abfss://<container-name>@<account-name}.dfs.core.windows.net/<the-table-path>"
- name: "share3"
schemas:
- name: "schema3"
tables:
- name: "table4"
# Google Cloud Storage (GCS). See https://github.com/delta-io/delta-sharing#google-cloud-storage for how to config the credentials
location: "gs://<bucket-name>/<the-table-path>"
# Set the host name that the server will use
host: "localhost"
# Set the port that the server will listen on. Note: using ports below 1024
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import org.scalatest.BeforeAndAfterAll
trait DeltaSharingIntegrationTest extends SparkFunSuite with BeforeAndAfterAll {

def shouldRunIntegrationTest: Boolean = {
sys.env.get("AWS_ACCESS_KEY_ID").exists(_.length > 0)
sys.env.get("AWS_ACCESS_KEY_ID").exists(_.length > 0) &&
sys.env.get("AZURE_TEST_ACCOUNT_KEY").exists(_.length > 0) &&
sys.env.get("GOOGLE_APPLICATION_CREDENTIALS").exists(_.length > 0)
}

@volatile private var process: Process = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
Table(name = "table9", schema = "schema2", share = "share7"),
Table(name = "test_gzip", schema = "default", share = "share4"),
Table(name = "table_wasb", schema = "default", share = "share_azure"),
Table(name = "table_abfs", schema = "default", share = "share_azure")
Table(name = "table_abfs", schema = "default", share = "share_azure"),
Table(name = "table_gcs", schema = "default", share = "share_gcp")
)
assert(expected == client.listAllTables().toSet)
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,19 @@ class DeltaSharingSuite extends QueryTest with SharedSparkSession with DeltaShar
val tablePath = testProfileFile.getCanonicalPath + s"#share_azure.default.${azureTableName}"
checkAnswer(
spark.read.format("deltaSharing").load(tablePath),
Row("foo bar", "foo bar") :: Nil)
Row("foo bar", "foo bar") :: Nil
)
}
}

integrationTest("gcp support") {
val tablePath = testProfileFile.getCanonicalPath + s"#share_gcp.default.table_gcs"
checkAnswer(
spark.read.format("deltaSharing").load(tablePath),
Row("foo bar", "foo bar") :: Nil
)
}

integrationTest("random access stream") {
// Set maxConnections to 1 so that if we leak any connection, we will hang forever because any
// further request won't be able to get a free connection from the pool.
Expand Down

0 comments on commit eb0e5bd

Please sign in to comment.