Skip to content

Commit 3b9fa6c

Browse files
committed
Exercise 07 - Querying Kafka Topics
1 parent 75d8357 commit 3b9fa6c

File tree

13 files changed

+615
-1
lines changed

13 files changed

+615
-1
lines changed

build.sh

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ function validate() {
2424

2525
cp -r $EXERCISES_DIR $TMP_DIR
2626
cp -r $SOLUTIONS_DIR $TMP_DIR
27-
cp -r $STAGING_DIR $TMP_DIR
27+
cp -r $STAGING_DIR $TMP_DIR
28+
cp $EXERCISES_DIR/src/main/resources/cloud.properties $TMP_DIR/$EXERCISES_DIR/cloud.properties
2829

2930
cd $TMP_DIR/$EXERCISES_DIR
3031

@@ -36,6 +37,8 @@ function validate() {
3637
./exercise.sh solve $EXERCISE
3738
echo "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"
3839

40+
cp cloud.properties src/main/resources/cloud.properties
41+
3942
if [ -f "pom.xml" ]; then
4043
mvn clean test
4144
fi
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
# Querying Flink Tables with the Table API (Exercise)
2+
3+
In this exercise, we will execute some Flink Table API queries using the built-in Marketplace tables in Confluent Cloud.
4+
5+
We'll start with basic select statements to set a foundation for future exercises.
6+
7+
## Setup the exercise
8+
9+
Some code has been prepared for you. Load the code into your project by executing the following:
10+
11+
```
12+
./exercise.sh stage 07
13+
```
14+
15+
This will copy source and test files from the staging folder into your exercises directory.
16+
17+
You should now see a `CustomerService.java` and `OrderService.java` in your `marketplace` package and a series of tests for each.
18+
19+
You can run the tests from your IDE, or by executing the following Maven command:
20+
21+
```
22+
mvn clean test
23+
```
24+
25+
Currently, these tests will fail because the methods haven't been implemented.
26+
27+
While working through the exercise, it can be helpful to periodically run the tests to measure progress.
28+
29+
## Query all customers
30+
31+
The first task is to implement the `CustomerService.allCustomers` method.
32+
33+
This is an easy-to-understand query. We are trying to obtain the details for all of the customers.
34+
35+
Most Flink applications will modify the data before sending it to another table. Here we use the data as is and don't send it anywhere. This means it is of limited use.
36+
37+
However, it isn't useless. These queries can often be useful when debugging an application to understand what data is coming in.
38+
39+
More importantly, it creates a foundation for future work.
40+
41+
**Note:** It's important to remember that the data in the table is streaming and unbounded. Once the query is executed it will run forever until it is terminated.
42+
43+
### Implement the method
44+
45+
Implement the `CustomerService.allCustomers` method as follows:
46+
47+
- Select all fields from all customers in the `examples.marketplace.customers` table.
48+
- Execute and return the result.
49+
50+
<details>
51+
<summary>**Hint**</summary>
52+
<p>
53+
You can implement a basic select statement as follows:
54+
55+
```
56+
env.from("TABLE NAME")
57+
.select($("*"))
58+
.execute();
59+
```
60+
</p>
61+
</details>
62+
63+
Run the tests to verify your solution.
64+
65+
### Run the marketplace
66+
67+
Next, we can try the query in the `Marketplace.java` class.
68+
69+
Modify `Marketplace.java` as follows.
70+
71+
- In the `main` method, create an instance of the `CustomerService`.
72+
- Call the `allCustomers` method on the service to get a `TableResult`.
73+
- Call the `print` method on the `TableResult` to print all records to the standard output.
74+
75+
Now, if you run the `Marketplace` it should print out `Customer` records until you terminate the application.
76+
77+
### Inspect the query
78+
79+
Executing this query generates a corresponding SQL query in Confluent Cloud. We can inspect that query to see what it looks like.
80+
81+
![Add Cloud Environment](images/07-querying-flink-tables/flink-statements.png)
82+
83+
- Open Confluent Cloud.
84+
- Navigate to the **flink-table-api-java** environment.
85+
- Select the **Flink** tab and go to **Flink Statements**.
86+
- Click on the most recent statement and look at the query.
87+
88+
It won't be a direct match to the code that you wrote. The records have a hidden `$rowtime` field. Don't worry about that for now. We will return to it later in the course.
89+
90+
## Query all customer addresses
91+
92+
Next, we will implement the `CustomerService.allCustomerAddresses` method. The shipping department will require addresses so they know where to deliver the orders. However, they don't need other customer details. Here, we will create a query that gives them only the details they need.
93+
94+
Implement the `CustomerService.allCustomerAddresses` method as follows:
95+
96+
- Select all customers
97+
- Using the `$('fieldName')` syntax return the following fields:
98+
- `customer_id`
99+
- `address`
100+
- `postcode`
101+
- `city`
102+
- **Note:** The `$('fieldName')` syntax is used to build what is known as an API Expression. These API Expressions will get more complex throughout the course.
103+
104+
<details>
105+
<summary>**Hint**</summary>
106+
<p>
107+
You can specify `$('fieldName')` multiple times, separated by a comma.
108+
</p>
109+
</details>
110+
111+
Run the tests to verify your solution.
112+
113+
You can also print the results inside the `Marketplace.java` file. However, if you try to print results from multiple unbounded queries, only the first one will succeed. The others will be stuck waiting for a query that never finishes.
114+
115+
Have a look at the query in Confluent Cloud. Is it what you expected?
116+
117+
## Query orders over 50 dollars
118+
119+
The previous query filtered out specific columns or fields from the records. Now, we'll implement a query that filters out specific records.
120+
121+
We will implement the `OrderService.ordersOver50Dollars` method. The eCommerce site has a policy that grants free shipping on all orders over 50 dollars. We want to determine which orders qualify for free shipping.
122+
123+
Implement the `OrderService.ordersOver50Dollars` method as follows:
124+
125+
- Select all fields from the `examples.marketplace.orders` table.
126+
- Use a `where` clause to check if the `price` field `isGreaterOrEqual` to 50.
127+
- Execute and return the results.
128+
129+
<details>
130+
<summary>**Hint**</summary>
131+
<p>
132+
The `where` method takes an API Expression (eg. `$('fieldName')`) as a parameter. You can call additional methods such as the `isGreaterOrEqual` method on those expressions.
133+
</p>
134+
</details>
135+
136+
Run the tests.
137+
138+
Execute the method inside the `Marketplace.java` to see the results.
139+
140+
## Query order prices after tax
141+
142+
The final task will be a little more difficult. We will implement the `OrderService.pricesWithTax` method. It will compute a new price after applying a tax rate (eg. Tax Rate = 1.15). This seems simple enough but there is a hidden issue in the data.
143+
144+
Implement the `OrderService.pricesWithTax` method as follows:
145+
146+
- Select all rows from the `examples.marketplace.orders` table.
147+
- Return the following fields, transformed as required:
148+
- Return the `order_id` as is.
149+
- Use the `as` API Expression to rename the `price` field to `original_price`.
150+
- Using the `price` again:
151+
- Use the `times` method to multiply it by the tax amount.
152+
- Use the `round` method to round to 2 decimal places.
153+
- Use the `as` method to rename the result to `price_with_tax`.
154+
155+
<details>
156+
<summary>**Hint**</summary>
157+
<p>
158+
Using API Expressions you can obtain the value of the same field multiple times, but apply different transformations to each. For example, you can say
159+
160+
```
161+
$("my_string")
162+
.as("original_string"),
163+
$("my_string")
164+
.upperCase()
165+
.as("uppercase_string")
166+
```
167+
168+
</p>
169+
</details>
170+
171+
Run the following test: `pricesWithTax_shouldReturnTheCorrectPrices`
172+
173+
```
174+
mvn test -Dtest=OrderServiceTest#pricesWithTax_shouldReturnTheCorrectPrices
175+
```
176+
177+
The test should fail. Inspecting the output should reveal that the expected and actual amounts differ by exactly one cent. But why?
178+
179+
### Investigating the failure
180+
181+
Let's take a moment to try and understand what happened.
182+
183+
![Add Cloud Environment](images/07-querying-flink-tables/stream-processing.png)
184+
185+
- Inspect the SQL in Confluent Cloud. It should look more or less correct.
186+
- Copy the query.
187+
- In Confluent Cloud, navigate to the `Stream processing` section on the left.
188+
- Click the `Create workspace` button to create a new workspace.
189+
- Paste the query into the workspace.
190+
- Modify it as follows:
191+
- Replace ```(ROUND(`price` * 1.15, 2))``` with ```(`price` * 1.15)```.
192+
- Run the query and observe the results.
193+
194+
Many of the `price_with_tax` entries have a long sequence of trailing decimal places (eg. 99999...). However, if you manually do the calculation, you won't get those long trailing decimal places. This is a clue to the problem.
195+
196+
- Create a new statement using the `+` button beside your existing statment. Use the following query:
197+
- `SHOW CREATE TABLE examples.marketplace.orders;`
198+
- Run the query and inspect the results.
199+
200+
Observe the data type for the `price` field. It is listed as a `DOUBLE`.
201+
202+
Floats and doubles are difficult to represent in a binary format. As a result, small errors can creep into floating-point arithmetic. These small errors are notoriously bad when dealing with currencies and result in rounding errors.
203+
204+
### Fixing the problem
205+
206+
To fix this, we need to stop using the `DOUBLE` type and instead use `DECIMAL(10, 2)` which is better for working with currency. We can't change the source table. But that doesn't mean we can't fix the problem.
207+
208+
- Modify the query in your workspace as follows:
209+
- Replace ````price```` with ```CAST(`price` AS DECIMAL(10, 2))```.
210+
- Rerun the query and observe the results. You should see that every result has exactly 4 decimal places. No more trailing decimals.
211+
212+
**Note:** We have 4 decimals rather than 2 (Remember, it's a DECIMAL(10,2). But why? The price has 2 decimals and the tax rate has 2 decimals. When you multiply them 4 decimals are required to obtain the necessary precision.
213+
214+
Next, translate this solution into the `OrderService.priceWithTax` method as follows:
215+
216+
- Use the `cast` method to convert the `price_with_tax` field to a `DataTypes.DECIMAL(10, 2)`.
217+
- For completeness, let's apply the same cast to the `original_price`, even though it isn't affected by this problem.
218+
219+
<details>
220+
<summary>**Hint**</summary>
221+
<p>
222+
The `cast` method is part of the API expression, similar to the `as` method or the `round` method.
223+
224+
Be careful. The location of the cast is important. Should it come before, or after your multiplication?
225+
</p>
226+
</details>
227+
228+
Run all the tests and verify they pass.
229+
230+
Execute the method inside the `Marketplace.java` to see the results (Use whatever tax rate you want).
231+
232+
## Using Confluent Cloud to debug queries
233+
234+
One of the important things we learned in this exercise is how to use Confluent Cloud to debug Table API queries.
235+
236+
The Flink Statement browser allows us to collect details about the SQL statements that are generated by the Table API. Meanwhile, the Stream Processing workspaces allow us to manually edit and execute SQL statements to help locate problems and their solutions.
237+
238+
These are valuable tools you can use throughout the rest of the course if you find yourself getting stuck on an exercise. If you aren't sure what's going on, look at the SQL statement in Confluent Cloud. See if you can modify it to work there, then go back to the code and try to replicate that change.
239+
240+
## Finish
241+
242+
This brings us to the end of this exercise.
192 KB
Loading
50.6 KB
Loading
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package marketplace;
2+
3+
import org.apache.flink.table.api.TableEnvironment;
4+
import org.apache.flink.table.api.TableResult;
5+
6+
import static org.apache.flink.table.api.Expressions.$;
7+
8+
public class CustomerService {
9+
private final TableEnvironment env;
10+
11+
public CustomerService(TableEnvironment env) {
12+
this.env = env;
13+
}
14+
15+
public TableResult allCustomers() {
16+
return env.from("examples.marketplace.customers")
17+
.select($("*"))
18+
.execute();
19+
}
20+
21+
public TableResult allCustomerAddresses() {
22+
return env.from("examples.marketplace.customers")
23+
.select(
24+
$("customer_id"),
25+
$("address"),
26+
$("postcode"),
27+
$("city")
28+
).execute();
29+
}
30+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package marketplace;
2+
3+
import io.confluent.flink.plugin.ConfluentSettings;
4+
import org.apache.flink.table.api.TableEnvironment;
5+
6+
import java.math.BigDecimal;
7+
import java.util.Arrays;
8+
9+
public class Marketplace {
10+
11+
public static void main(String[] args) throws Exception {
12+
ConfluentSettings.Builder settings = ConfluentSettings.newBuilder("/cloud.properties");
13+
14+
TableEnvironment env = TableEnvironment.create(settings.build());
15+
CustomerService customers = new CustomerService(env);
16+
OrderService orders = new OrderService(env);
17+
18+
env.useCatalog("examples");
19+
env.useDatabase("marketplace");
20+
21+
Arrays.stream(env.listTables()).forEach(System.out::println);
22+
23+
customers.allCustomers();
24+
customers.allCustomerAddresses();
25+
orders.ordersOver50Dollars();
26+
orders.pricesWithTax(BigDecimal.valueOf(1.1));
27+
}
28+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package marketplace;
2+
3+
import org.apache.flink.table.api.DataTypes;
4+
import org.apache.flink.table.api.TableEnvironment;
5+
import org.apache.flink.table.api.TableResult;
6+
7+
import java.math.BigDecimal;
8+
9+
import static org.apache.flink.table.api.Expressions.$;
10+
11+
public class OrderService {
12+
protected TableEnvironment env;
13+
14+
public OrderService(TableEnvironment env) {
15+
this.env = env;
16+
}
17+
18+
public TableResult ordersOver50Dollars() {
19+
return env.from("examples.marketplace.orders")
20+
.select($("*"))
21+
.where($("price").isGreaterOrEqual(50))
22+
.execute();
23+
}
24+
25+
public TableResult pricesWithTax(BigDecimal taxAmount) {
26+
return env.from("examples.marketplace.orders")
27+
.select(
28+
$("order_id"),
29+
$("price")
30+
.cast(DataTypes.DECIMAL(10, 2))
31+
.as("original_price"),
32+
$("price")
33+
.cast(DataTypes.DECIMAL(10, 2))
34+
.times(taxAmount)
35+
.round(2)
36+
.as("price_with_tax")
37+
).execute();
38+
}
39+
}

0 commit comments

Comments
 (0)