File tree Expand file tree Collapse file tree
main/java/com/google/cloud/pubsublite/spark
test/java/com/google/cloud/pubsublite/spark Expand file tree Collapse file tree Original file line number Diff line number Diff line change 1818
1919import static com .google .cloud .pubsublite .internal .wire .ServiceClients .addDefaultSettings ;
2020
21+ import com .google .api .gax .rpc .ApiException ;
2122import com .google .auto .value .AutoValue ;
2223import com .google .cloud .pubsublite .AdminClient ;
2324import com .google .cloud .pubsublite .AdminClientSettings ;
@@ -79,9 +80,16 @@ public static PslDataSourceOptions fromSparkDataSourceOptions(DataSourceOptions
7980 options
8081 .get (Constants .MAX_MESSAGE_PER_BATCH_CONFIG_KEY )
8182 .ifPresent (mmpb -> builder .setMaxMessagesPerBatch (Long .parseLong (mmpb )));
83+ String subscriptionPathVal = options .get (Constants .SUBSCRIPTION_CONFIG_KEY ).get ();
84+ SubscriptionPath subscriptionPath ;
85+ try {
86+ subscriptionPath = SubscriptionPath .parse (subscriptionPathVal );
87+ } catch (ApiException e ) {
88+ throw new IllegalArgumentException (
89+ "Unable to parse subscription path " + subscriptionPathVal );
90+ }
8291 return builder
83- .setSubscriptionPath (
84- SubscriptionPath .parse (options .get (Constants .SUBSCRIPTION_CONFIG_KEY ).get ()))
92+ .setSubscriptionPath (subscriptionPath )
8593 .setFlowControlSettings (
8694 FlowControlSettings .builder ()
8795 .setMessagesOutstanding (
Original file line number Diff line number Diff line change 1+ /*
2+ * Copyright 2020 Google LLC
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package com .google .cloud .pubsublite .spark ;
18+
19+ import static org .junit .Assert .assertThrows ;
20+
21+ import com .google .common .collect .ImmutableMap ;
22+ import org .apache .spark .sql .sources .v2 .DataSourceOptions ;
23+ import org .junit .Test ;
24+
25+ public class PslDataSourceOptionsTest {
26+
27+ @ Test
28+ public void testInvalidSubPath () {
29+ DataSourceOptions options =
30+ new DataSourceOptions (ImmutableMap .of (Constants .SUBSCRIPTION_CONFIG_KEY , "invalid/path" ));
31+ assertThrows (
32+ IllegalArgumentException .class ,
33+ () -> PslDataSourceOptions .fromSparkDataSourceOptions (options ));
34+ }
35+ }
You can’t perform that action at this time.
0 commit comments