Skip to content

Commit

Permalink
Add more test tables and update integration tests (#36)
Browse files Browse the repository at this point in the history
Add two more tables and update existing integration tests accordingly:

- `share3.default.table4`: The table column order is not the same as parquet files.
- `share3.default.table5`: An empty table.
  • Loading branch information
zsxwing committed Jun 5, 2021
1 parent 7c5c3ed commit eac84d4
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 6 deletions.
18 changes: 17 additions & 1 deletion python/delta_sharing/tests/test_delta_sharing.py
Expand Up @@ -26,7 +26,7 @@
@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
def test_list_shares(sharing_client: SharingClient):
shares = sharing_client.list_shares()
assert shares == [Share(name="share1"), Share(name="share2")]
assert shares == [Share(name="share1"), Share(name="share2"), Share(name="share3")]


@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
Expand Down Expand Up @@ -57,6 +57,8 @@ def test_list_all_tables(sharing_client: SharingClient):
Table(name="table1", share="share1", schema="default"),
Table(name="table3", share="share1", schema="default"),
Table(name="table2", share="share2", schema="default"),
Table(name="table4", share="share3", schema="default"),
Table(name="table5", share="share3", schema="default"),
]


Expand Down Expand Up @@ -105,6 +107,20 @@ def test_list_all_tables(sharing_client: SharingClient):
),
id="partitioned and different schemas",
),
pytest.param(
"share3.default.table4",
pd.DataFrame(
{
"type": [None, None],
"eventTime": [
pd.Timestamp("2021-04-28 23:33:57.955"),
pd.Timestamp("2021-04-28 23:33:48.719"),
],
"date": [date(2021, 4, 28), date(2021, 4, 28)],
}
),
id="table column order is not the same as parquet files",
),
],
)
def test_load(profile_path: str, fragments: str, expected: pd.DataFrame):
Expand Down
2 changes: 1 addition & 1 deletion python/delta_sharing/tests/test_rest_client.py
Expand Up @@ -31,7 +31,7 @@
@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
def test_list_shares(rest_client: DataSharingRestClient):
response = rest_client.list_shares()
assert response.shares == [Share(name="share1"), Share(name="share2")]
assert response.shares == [Share(name="share1"), Share(name="share2"), Share(name="share3")]


@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
Expand Down
Expand Up @@ -152,7 +152,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
integrationTest("/shares") {
val response = readJson(requestPath("/shares"))
val expected = ListSharesResponse(
Vector(Share().withName("share1"), Share().withName("share2")))
Vector(Share().withName("share1"), Share().withName("share2"), Share().withName("share3")))
assert(expected == JsonFormat.fromJsonString[ListSharesResponse](response))
}

Expand All @@ -165,7 +165,7 @@ class DeltaSharingServiceSuite extends FunSuite with BeforeAndAfterAll {
response = JsonFormat.fromJsonString[ListSharesResponse](readJson(requestPath(s"/shares?pageToken=${response.nextPageToken.get}&maxResults=1")))
shares ++= response.items
}
val expected = Seq(Share().withName("share1"), Share().withName("share2"))
val expected = Seq(Share().withName("share1"), Share().withName("share2"), Share().withName("share3"))
assert(expected == shares)
}

Expand Down
13 changes: 12 additions & 1 deletion server/src/test/scala/io/delta/sharing/server/TestResource.scala
Expand Up @@ -55,7 +55,18 @@ object TestResource {
TableConfig("table2", s"s3a://${TestResource.AWS.bucket}/delta-exchange-test/table2")
)
)
))
)),
ShareConfig("share3",
java.util.Arrays.asList(
SchemaConfig(
"default",
java.util.Arrays.asList(
TableConfig("table4", s"s3a://${TestResource.AWS.bucket}/delta-exchange-test/table4"),
TableConfig("table5", s"s3a://${TestResource.AWS.bucket}/delta-exchange-test/table5")
)
)
)
)
)

val serverConfig = new ServerConfig()
Expand Down
Expand Up @@ -27,7 +27,9 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
val expected = Set(
Table(name = "table1", schema = "default", share = "share1"),
Table(name = "table2", schema = "default", share = "share2"),
Table(name = "table3", schema = "default", share = "share1")
Table(name = "table3", schema = "default", share = "share1"),
Table(name = "table4", schema = "default", share = "share3"),
Table(name = "table5", schema = "default", share = "share3")
)
assert(expected == client.listAllTables().toSet)
} finally {
Expand Down
Expand Up @@ -26,6 +26,7 @@ import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String

class DeltaSharingSuite extends QueryTest with SharedSparkSession with DeltaSharingIntegrationTest {
Expand Down Expand Up @@ -84,6 +85,34 @@ class DeltaSharingSuite extends QueryTest with SharedSparkSession with DeltaShar
}
}

integrationTest("table4: table column order is not the same as parquet files") {
val tablePath = testProfileFile.getCanonicalPath + "#share3.default.table4"
val expected = Seq(
Row(null, sqlTimestamp("2021-04-28 16:33:57.955"), sqlDate("2021-04-28")),
Row(null, sqlTimestamp("2021-04-28 16:33:48.719"), sqlDate("2021-04-28"))
)
checkAnswer(spark.read.format("deltaSharing").load(tablePath), expected)
withTable("delta_sharing_test") {
sql(s"CREATE TABLE delta_sharing_test USING deltaSharing LOCATION '$tablePath'")
checkAnswer(sql(s"SELECT * FROM delta_sharing_test"), expected)
}
}

integrationTest("table5: empty table") {
val tablePath = testProfileFile.getCanonicalPath + "#share3.default.table5"
checkAnswer(spark.read.format("deltaSharing").load(tablePath), Nil)
val expectedSchema = StructType(Array(
StructField("eventTime", TimestampType),
StructField("date", DateType),
StructField("type", StringType).withComment("this is a comment")))
assert(spark.read.format("deltaSharing").load(tablePath).schema == expectedSchema)
withTable("delta_sharing_test") {
sql(s"CREATE TABLE delta_sharing_test USING deltaSharing LOCATION '$tablePath'")
checkAnswer(sql(s"SELECT * FROM delta_sharing_test"), Nil)
assert(sql(s"SELECT * FROM delta_sharing_test").schema == expectedSchema)
}
}

integrationTest("partition pruning") {
val tablePath = testProfileFile.getCanonicalPath + "#share1.default.table3"
val expected = Seq(
Expand Down

0 comments on commit eac84d4

Please sign in to comment.