Skip to content

Commit aa3c094

Browse files
authored
Recent updates in Confluent Cloud have improved the reliability of queries. (#11)
This negates the need for the retries that were previously used. The retry logic has been removed as a consequence.
1 parent 39d6781 commit aa3c094

File tree

27 files changed

+71
-183
lines changed

27 files changed

+71
-183
lines changed

CHANGELOG.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,10 @@
2323
## Version 0.3.3
2424

2525
* Added a dev container file.
26-
* Updated the Gitpod configuration.
26+
* Updated the Gitpod configuration.
27+
28+
## Version 0.3.4
29+
30+
* Recent updates in Confluent Cloud have improved the reliability of queries.
31+
This negates the need for the retries that were previously used.
32+
The retry logic has been removed as a consequence.

solutions/01-connecting-to-confluent-cloud/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ under the License.
198198
</dependency>
199199
</dependencies>
200200
<configuration>
201-
<rerunFailingTestsCount>5</rerunFailingTestsCount>
202201
<argLine>-XX:+EnableDynamicAgentLoading</argLine>
203202
</configuration>
204203
</plugin>

solutions/02-querying-flink-tables/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ under the License.
198198
</dependency>
199199
</dependencies>
200200
<configuration>
201-
<rerunFailingTestsCount>5</rerunFailingTestsCount>
202201
<argLine>-XX:+EnableDynamicAgentLoading</argLine>
203202
</configuration>
204203
</plugin>

solutions/02-querying-flink-tables/src/test/java/marketplace/CustomerServiceIntegrationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void allCustomers_shouldReturnTheDetailsOfAllCustomers() throws Exception
5656
env.fromValues(customers).insertInto(customersTableName).execute();
5757

5858
// Execute the query.
59-
TableResult results = retry(() -> customerService.allCustomers());
59+
TableResult results = customerService.allCustomers();
6060

6161
// Fetch the actual results.
6262
List<Row> actual = fetchRows(results)
@@ -90,7 +90,7 @@ public void allCustomerAddresses_shouldReturnTheAddressesOfAllCustomers() throws
9090
env.fromValues(customers).insertInto(customersTableName).execute();
9191

9292
// Execute the query.
93-
TableResult results = retry(() -> customerService.allCustomerAddresses());
93+
TableResult results = customerService.allCustomerAddresses();
9494

9595
// Fetch the actual results.
9696
List<Row> actual = fetchRows(results)

solutions/02-querying-flink-tables/src/test/java/marketplace/FlinkIntegrationTest.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -93,26 +93,6 @@ public void mainTeardown() {
9393
}
9494
}
9595

96-
protected TableResult retry(Supplier<TableResult> supplier) {
97-
return retry(3, supplier);
98-
}
99-
100-
protected TableResult retry(int tries, Supplier<TableResult> supplier) {
101-
try {
102-
return supplier.get();
103-
} catch (Exception e) {
104-
logger.error("Failed on retryable command.", e);
105-
106-
if(tries > 0) {
107-
logger.info("Retrying");
108-
return retry(tries - 1, supplier);
109-
} else {
110-
logger.info("Maximum number of tries exceeded. Failing...");
111-
throw e;
112-
}
113-
}
114-
}
115-
11696
protected TableResult cancelOnExit(TableResult tableResult) {
11797
jobsToCancel.add(tableResult);
11898
return tableResult;

solutions/02-querying-flink-tables/src/test/java/marketplace/OrderServiceIntegrationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void ordersOver50Dollars_shouldOnlyReturnOrdersWithAPriceOf50DollarsOrMor
7272
env.fromValues(orders).insertInto(ordersTableName).execute();
7373

7474
// Execute the query.
75-
TableResult results = retry(() -> orderService.ordersOver50Dollars());
75+
TableResult results = orderService.ordersOver50Dollars();
7676

7777
// Build the expected results.
7878
List<Row> expected = orders.stream().filter(row -> row.<Double>getFieldAs(indexOf("price")) >= 50).toList();
@@ -114,7 +114,7 @@ public void pricesWithTax_shouldReturnTheCorrectPrices() {
114114
env.fromValues(orders).insertInto(ordersTableName).execute();
115115

116116
// Execute the query.
117-
TableResult results = retry(() -> orderService.pricesWithTax(taxAmount));
117+
TableResult results = orderService.pricesWithTax(taxAmount);
118118

119119
// Fetch the actual results.
120120
List<Row> actual = fetchRows(results)

solutions/03-building-a-streaming-pipeline/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ under the License.
198198
</dependency>
199199
</dependencies>
200200
<configuration>
201-
<rerunFailingTestsCount>5</rerunFailingTestsCount>
202201
<argLine>-XX:+EnableDynamicAgentLoading</argLine>
203202
</configuration>
204203
</plugin>

solutions/03-building-a-streaming-pipeline/src/test/java/marketplace/CustomerServiceIntegrationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void allCustomers_shouldReturnTheDetailsOfAllCustomers() throws Exception
5656
env.fromValues(customers).insertInto(customersTableName).execute();
5757

5858
// Execute the query.
59-
TableResult results = retry(() -> customerService.allCustomers());
59+
TableResult results = customerService.allCustomers();
6060

6161
// Fetch the actual results.
6262
List<Row> actual = fetchRows(results)
@@ -90,7 +90,7 @@ public void allCustomerAddresses_shouldReturnTheAddressesOfAllCustomers() throws
9090
env.fromValues(customers).insertInto(customersTableName).execute();
9191

9292
// Execute the query.
93-
TableResult results = retry(() -> customerService.allCustomerAddresses());
93+
TableResult results = customerService.allCustomerAddresses();
9494

9595
// Fetch the actual results.
9696
List<Row> actual = fetchRows(results)

solutions/03-building-a-streaming-pipeline/src/test/java/marketplace/FlinkIntegrationTest.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -93,26 +93,6 @@ public void mainTeardown() {
9393
}
9494
}
9595

96-
protected TableResult retry(Supplier<TableResult> supplier) {
97-
return retry(3, supplier);
98-
}
99-
100-
protected TableResult retry(int tries, Supplier<TableResult> supplier) {
101-
try {
102-
return supplier.get();
103-
} catch (Exception e) {
104-
logger.error("Failed on retryable command.", e);
105-
106-
if(tries > 0) {
107-
logger.info("Retrying");
108-
return retry(tries - 1, supplier);
109-
} else {
110-
logger.info("Maximum number of tries exceeded. Failing...");
111-
throw e;
112-
}
113-
}
114-
}
115-
11696
protected TableResult cancelOnExit(TableResult tableResult) {
11797
jobsToCancel.add(tableResult);
11898
return tableResult;

solutions/03-building-a-streaming-pipeline/src/test/java/marketplace/OrderServiceIntegrationTest.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void ordersOver50Dollars_shouldOnlyReturnOrdersWithAPriceOf50DollarsOrMor
8686
env.fromValues(orders).insertInto(ordersTableName).execute();
8787

8888
// Execute the query.
89-
TableResult results = retry(() -> orderService.ordersOver50Dollars());
89+
TableResult results = orderService.ordersOver50Dollars();
9090

9191
// Build the expected results.
9292
List<Row> expected = orders.stream().filter(row -> row.<Double>getFieldAs(indexOf("price")) >= 50).toList();
@@ -128,7 +128,7 @@ public void pricesWithTax_shouldReturnTheCorrectPrices() {
128128
env.fromValues(orders).insertInto(ordersTableName).execute();
129129

130130
// Execute the query.
131-
TableResult results = retry(() -> orderService.pricesWithTax(taxAmount));
131+
TableResult results = orderService.pricesWithTax(taxAmount);
132132

133133
// Fetch the actual results.
134134
List<Row> actual = fetchRows(results)
@@ -162,6 +162,7 @@ public void pricesWithTax_shouldReturnTheCorrectPrices() {
162162
@Test
163163
@Timeout(60)
164164
public void createFreeShippingTable_shouldCreateTheTable() {
165+
deleteTable(orderQualifiedForFreeShippingTableName);
165166
deleteTableOnExit(orderQualifiedForFreeShippingTableName);
166167

167168
TableResult result = orderService.createFreeShippingTable();
@@ -221,11 +222,9 @@ public void streamOrdersOver50Dollars_shouldStreamRecordsToTheTable() throws Exc
221222
cancelOnExit(orderService.streamOrdersOver50Dollars());
222223

223224
// Query the destination table.
224-
TableResult queryResult = retry(() ->
225-
env.from(orderQualifiedForFreeShippingTableName)
226-
.select($("*"))
227-
.execute()
228-
);
225+
TableResult queryResult = env.from(orderQualifiedForFreeShippingTableName)
226+
.select($("*"))
227+
.execute();
229228

230229
// Obtain the actual results.
231230
List<Row> actual = fetchRows(queryResult)

0 commit comments

Comments
 (0)