
[FLINK-19530] [docs] Refactor Joins and Dynamic Table english content pages #14373

Open
wants to merge 1 commit into base: master

Conversation


@KKcorps KKcorps commented Dec 13, 2020

What is the purpose of the change

The PR addresses the refactoring of the documentation under FLIP-60. It updates the table concepts documentation and makes changes so that the flow is more streamlined.

Brief change log

  • Move Idle State Retention Time configuration from Query Configuration to Dynamic Tables Page.
  • Streamline Dynamic Tables page. Use unified examples throughout the documentation.
  • Explain in more detail why regular joins are not feasible on unbounded data.

Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot
Collaborator

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 5363cc0 (Sun Dec 13 13:32:59 UTC 2020)

Warnings:

  • Documentation files were touched, but no .zh.md files: Update Chinese documentation or file Jira ticket.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Dec 13, 2020

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

- **State Size:** Continuous queries are evaluated on unbounded streams and are often supposed to run for weeks or months. Hence, the total amount of data that a continuous query processes can be very large. Queries that have to update previously emitted results need to maintain all emitted rows in order to be able to update them. For instance, the first example query needs to store the URL count for each user to be able to increase the count and sent out a new result when the input table receives a new row. If only registered users are tracked, the number of counts to maintain might not be too high. However, if non-registered users get a unique user name assigned, the number of counts to maintain would grow over time and might eventually cause the query to fail.

{% highlight sql %}
SELECT user, COUNT(url)
Contributor

`user` is a keyword in Flink SQL. We should surround it with backticks.

Contributor

Please do that, or rename it to something else since this is a made-up query. It could be user_id :)
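For illustration, here is roughly what the suggested fix would look like on the full query. Note that the `FROM` clause is not visible in this diff view; `clicks` is inferred from the table mentioned elsewhere in this thread:

{% highlight sql %}
-- backticks escape the reserved keyword; alternatively rename the column to user_id
SELECT `user`, COUNT(url)
FROM clicks
GROUP BY `user`;
{% endhighlight %}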

- **Computing Updates:** Some queries require to recompute and update a large fraction of the emitted result rows even if only a single input record is added or updated. Clearly, such queries are not well suited to be executed as continuous queries. An example is the following query which computes for each user a `RANK` based on the time of the last click. As soon as the `clicks` table receives a new row, the `lastAction` of the user is updated and a new rank must be computed. However since two rows cannot have the same rank, all lower ranked rows need to be updated as well.

{% highlight sql %}
SELECT user, RANK() OVER (ORDER BY lastAction)
Contributor

Same problem as above.
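Applied to the ranking example, the fix would look roughly as follows. Only the first line of the query is visible in this diff view; the inner aggregation is a plausible reconstruction based on the surrounding prose about `lastAction` and the `clicks` table:

{% highlight sql %}
SELECT `user`, RANK() OVER (ORDER BY lastAction)
FROM (
  -- cTime (the click timestamp) is assumed, not shown in this diff view
  SELECT `user`, MAX(cTime) AS lastAction FROM clicks GROUP BY `user`
);
{% endhighlight %}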

Contributor

@sjwiesman sjwiesman left a comment

I've made it most of the way through the dynamic tables page. I haven't looked at joins or versioned tables yet. Please run your work through a spell checker. I just copied it into a Google Doc and found a number of issues.


The following table compares traditional relational algebra and stream processing for input data, execution, and output results.
First, let's try to understand the challenges processing unbounded data presents as compared to bounded data.
Contributor

I like making this less "academic sounding" but I'd like to keep the tone more positive.
What about something like:

Suggested change
First, let's try to understand the challenges processing unbounded data presents as compared to bounded data.
First, let's try to understand how the processing of unbounded data differs from bounded data.


The following table compares traditional relational algebra and stream processing for input data, execution, and output results.
First, let's try to understand the challenges processing unbounded data presents as compared to bounded data.
In Flink, unbounded data is represented as data stream while bounded data is represented as batch table.
Contributor

grammar nits

Suggested change
In Flink, unbounded data is represented as data stream while bounded data is represented as batch table.
In Flink, unbounded data is represented as a data stream, while bounded data is represented as a batch table.

</table>

Despite these differences, relational queries and SQL provide a powerful toolset for processing streams. Advanced relational database systems offer a feature called *Materialized Views*. A materialized view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual view, a materialized view caches the query result such that the query does not need to be evaluated when it is accessed. A common challenge for caching is to prevent a cache from serving outdated results. A materialized view becomes obsolete when the base tables of its definition query are modified. *Eager View Maintenance* is a technique to update a materialized view as soon as its base tables are updated.
Despite these challenges, processing streams with relational queries and SQL is not impossible. We take inspirationg from the *Materialized Views* feature of the relational database systems. A materialized view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual view, a materialized view caches the result of the query such that the query does not need to be evaluated when the view is accessed. However, a common challenge for Materialized Views is to prevent the cache from serving outdated results when the base tables of its definition query are modified. Databases use *Eager View Maintenance* technique to update a materialized view as soon as its base tables are updated.
Contributor

Again, I think we should aim for a more positive tone.

Despite these challenges, processing streams with relational queries and SQL is not impossible.

phrases like "challenges" and "not impossible" make it sound like this is going to be doable but hard. What we want to convey is that Flink makes streaming processing approachable.

Why don't we replace the first sentence of this paragraph with the original first sentence.

Despite these differences, relational queries and SQL provide a powerful toolset for processing streams.

There are also some spelling errors in this paragraph. "inspirationg" among others.

Contributor

+1
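As background for the materialized-view analogy under discussion, here is a sketch of how such a view is defined in a database that supports them (PostgreSQL-style syntax; the view and column names are illustrative):

{% highlight sql %}
-- unlike a virtual view, the result is cached and must be refreshed
-- (eagerly or lazily) whenever the base table `clicks` changes
CREATE MATERIALIZED VIEW user_click_counts AS
SELECT user_id, COUNT(url) AS cnt
FROM clicks
GROUP BY user_id;
{% endhighlight %}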


Dynamic Tables &amp; Continuous Queries
---------------------------------------

*Dynamic tables* are the core concept of Flink's Table API and SQL support for streaming data. In contrast to the static tables that represent batch data, dynamic tables change over time. But just like static batch tables, systems can execute queries over dynamic tables. Querying dynamic tables yields a *Continuous Query*. A continuous query never terminates and produces dynamic results - another dynamic table. The query continuously updates its (dynamic) result table to reflect changes on its (dynamic) input tables. Essentially, a continuous query on a dynamic table is very similar to a query that defines a materialized view.
*Dynamic tables* are the core concept of Flink's Table API and SQL to support queries on streaming data. In contrast to the static tables that represent batch data, dynamic tables change over time. However, they can be queried like static batch tables. A query on Dynamic Tables never terminates and continuously updates its result to reflect the changes on its input tables. These queries are known as *Continuous Queries*. If Dynamic Table is equivalent to a Materialized View then continuous queries are equivalent to the SQL query defined on the base table.
Contributor

The sentence "However, they can be queried like static batch tables." feels out of place in this paragraph. Drop it? I'm not sure what you're trying to say with it.

Contributor

Aren't the continuous queries what is comparable to a materialized view, and not dynamic tables themselves?

These queries are known as Continuous Queries. If Dynamic Table is equivalent to a Materialized View then continuous queries are equivalent to the SQL query defined on the base table.


It is important to note that a continuous query output is always semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input tables.
It is important to note that the result of a continuous query is always semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input tables.
Contributor

"the result of" makes the sentence passive and its not actually necessary in the context. dropping it gives an active voice.

Suggested change
It is important to note that the result of a continuous query is always semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input tables.
It is important to note that a continuous query is always semantically equivalent to the result of the same query executed in batch mode on a snapshot of the input tables.

</center>
<br><br>

The API to convert a dynamic table into a `DataStream` is discussed on the [Common Concepts]({% link dev/table/common.md %}#convert-a-table-into-a-datastream) page. Please note that only append and retract streams are supported when converting a dynamic table into a `DataStream`. The `TableSink` interface to emit a dynamic table to an external system are discussed on the [TableSources and TableSinks](../sourceSinks.html#define-a-tablesink) page.
The API to convert a dynamic table into a `DataStream` is discussed on the [Common Concepts]({{ site.baseurl }}/dev/table/common.html#convert-a-table-into-a-datastream) page. Please note that only Append and Retract streams are supported when converting a dynamic table into a `DataStream`. The `TableSink` interface to emit a dynamic table to an external system are discussed on the [TableSources and TableSinks](../sourceSinks.html#define-a-tablesink) page.
Contributor

please revert the link to common concepts to use the link tag.
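For context, the conversion under discussion looks roughly like this with the Flink 1.12-era bridge API; `tableEnv` and `result` are the `StreamTableEnvironment` and `Table` from the page's other snippets:

{% highlight java %}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

// append stream: only valid if the dynamic table is modified by INSERT changes only
DataStream<Row> appendStream = tableEnv.toAppendStream(result, Row.class);

// retract stream: the Boolean flag marks add messages (true) and retract messages (false)
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(result, Row.class);
{% endhighlight %}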

Challenges in Unbounded Data Processing
----------------------------------------

Most of the semantically valid queries can be evaluated as continuous queries on streams. However, there are some queries that are too expensive to compute, either due to the size of state that they need to maintain or because computing updates is too expensive.
Contributor

This sentence is very wordy. I think we can strip it down.

Suggested change
Most of the semantically valid queries can be evaluated as continuous queries on streams. However, there are some queries that are too expensive to compute, either due to the size of state that they need to maintain or because computing updates is too expensive.
Most of the semantically valid queries can be evaluated as continuous queries on streams. However, some queries are too expensive to compute either due to the size of state that they need to maintain or because computing updates is too costly.



- **State Size:** Continuous queries are evaluated on unbounded streams and are often supposed to run for weeks or months. Hence, the total amount of data that a continuous query processes can be very large. Queries that have to update previously emitted results need to maintain all emitted rows in order to be able to update them. For instance, the first example query needs to store the URL count for each user to be able to increase the count and sent out a new result when the input table receives a new row. If only registered users are tracked, the number of counts to maintain might not be too high. However, if non-registered users get a unique user name assigned, the number of counts to maintain would grow over time and might eventually cause the query to fail.
Contributor

There are some filler words in this paragraph we can drop. Additionally, I think we need to mention that state size for the first query is only problematic if the keyspace is unbounded, especially since it needs to store aggregates.

Suggested change
- **State Size:** Continuous queries are evaluated on unbounded streams and are often supposed to run for weeks or months. Hence, the total amount of data that a continuous query processes can be very large. Queries that have to update previously emitted results need to maintain all emitted rows in order to be able to update them. For instance, the first example query needs to store the URL count for each user to be able to increase the count and sent out a new result when the input table receives a new row. If only registered users are tracked, the number of counts to maintain might not be too high. However, if non-registered users get a unique user name assigned, the number of counts to maintain would grow over time and might eventually cause the query to fail.
- **State Size:** Continuous queries are evaluated on unbounded streams and are often supposed to run for weeks or months. Hence, the total amount of data that a continuous query processes can be very large. Queries that have to update previously emitted results need to maintain all emitted rows to update them. For instance, the first example query needs to store the URL and count for each user to increase the count and send out a new result when the input table receives a new row. If only registered users are tracked, the number of counts to maintain might not be too high. However, if non-registered users get a unique user name assigned, the number of counts to maintain would grow over time and eventually cause the query to fail.


{% highlight sql %}
SELECT user, COUNT(url)
Contributor

Please do that, or rename it to something else since this is a made-up query. It could be user_id :)

GROUP BY user;
{% endhighlight %}

- **Computing Updates:** Some queries require to recompute and update a large fraction of the emitted result rows even if only a single input record is added or updated. Clearly, such queries are not well suited to be executed as continuous queries. An example is the following query which computes for each user a `RANK` based on the time of the last click. As soon as the `clicks` table receives a new row, the `lastAction` of the user is updated and a new rank must be computed. However since two rows cannot have the same rank, all lower ranked rows need to be updated as well.
Contributor

Two points

  1. There are a number of grammar issues with this paragraph.

  2. I disagree with the premise. Users need to be cognizant of state size for queries like this, but that doesn't mean they aren't reasonable queries to run in production if you know what you're doing.

Contributor

@morsapaes morsapaes left a comment

Left some comments on top of @sjwiesman's. As I see it, the "Dynamic Tables" page was not in need of a complete refactor (most explanations were simpler before, IMO), but more of an update to include some new concepts that were not covered (e.g. changelog sources), as well as some structure changes to make it less "heavy" — which you addressed.

I think it could help if we better aligned on the goals for each page you'd like to refactor before working on them.


The following table compares traditional relational algebra and stream processing for input data, execution, and output results.
First, let's try to understand the challenges processing unbounded data presents as compared to bounded data.
Contributor

Suggested change
First, let's try to understand the challenges processing unbounded data presents as compared to bounded data.
First, let's try to understand how the processing of unbounded data differs from that of bounded data.

Contributor

I think here it doesn't make sense to go into bounded and unbounded data, but to state how SQL on streams differs from traditional "batch" SQL. The main point should be to bridge what people are familiar with from the RDBMS world and Flink SQL.

</table>

Despite these differences, relational queries and SQL provide a powerful toolset for processing streams. Advanced relational database systems offer a feature called *Materialized Views*. A materialized view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual view, a materialized view caches the query result such that the query does not need to be evaluated when it is accessed. A common challenge for caching is to prevent a cache from serving outdated results. A materialized view becomes obsolete when the base tables of its definition query are modified. *Eager View Maintenance* is a technique to update a materialized view as soon as its base tables are updated.
Despite these challenges, processing streams with relational queries and SQL is not impossible. We take inspirationg from the *Materialized Views* feature of the relational database systems. A materialized view is defined as a SQL query, just like a regular virtual view. In contrast to a virtual view, a materialized view caches the result of the query such that the query does not need to be evaluated when the view is accessed. However, a common challenge for Materialized Views is to prevent the cache from serving outdated results when the base tables of its definition query are modified. Databases use *Eager View Maintenance* technique to update a materialized view as soon as its base tables are updated.
Contributor

+1

Comment on lines -57 to +64
The connection between eager view maintenance and SQL queries on streams becomes evident if we consider the following:
We can draw the following analogy between Eager view maintenance and Streaming SQL queries:

- A database table results from a *stream* of `INSERT`, `UPDATE`, and `DELETE` DML statements, often called *changelog stream*.
- A materialized view is defined as a SQL query. To update the view, queries must continuously process the changelog streams of the view's base relations.
- The materialized view is the result of the streaming SQL query.
- A table in most of the databases as well as Flink is the result of a *stream* of `INSERT`, `UPDATE`, and `DELETE` DML statements, often called *changelog stream*.
- The result of Streaming SQL query is equivalent to a Materialized View.
- The SQL query continuously processes the changelog streams of the base tables to update the result stream.

We introduce the following concept of *Dynamic tables* in the next section with these points in mind.
Flink Table API defines these materialized view as *Dynamic tables*. Let's take a look at them in the next section.
Contributor

In this case, the original version might be clearer than the proposed changes.

@@ -28,161 +28,285 @@ This page describes how relational concepts elegantly translate to streaming, al
* This will be replaced by the TOC
{:toc}

Relational Queries on Data Streams
----------------------------------
Relational Queries on Unbounded Data
Contributor

I wouldn't change the title. "Unbounded Data" is not as straightforward to understand as "Data Streams".


Dynamic Tables &amp; Continuous Queries
---------------------------------------

*Dynamic tables* are the core concept of Flink's Table API and SQL support for streaming data. In contrast to the static tables that represent batch data, dynamic tables change over time. But just like static batch tables, systems can execute queries over dynamic tables. Querying dynamic tables yields a *Continuous Query*. A continuous query never terminates and produces dynamic results - another dynamic table. The query continuously updates its (dynamic) result table to reflect changes on its (dynamic) input tables. Essentially, a continuous query on a dynamic table is very similar to a query that defines a materialized view.
*Dynamic tables* are the core concept of Flink's Table API and SQL to support queries on streaming data. In contrast to the static tables that represent batch data, dynamic tables change over time. However, they can be queried like static batch tables. A query on Dynamic Tables never terminates and continuously updates its result to reflect the changes on its input tables. These queries are known as *Continuous Queries*. If Dynamic Table is equivalent to a Materialized View then continuous queries are equivalent to the SQL query defined on the base table.
Contributor

Aren't the continuous queries what is comparable to a materialized view, and not dynamic tables themselves?

These queries are known as Continuous Queries. If Dynamic Table is equivalent to a Materialized View then continuous queries are equivalent to the SQL query defined on the base table.


* **Delete messages** - `DELETE` changes as delete messages. The stream consuming operator needs to be aware of the unique key attribute in order to apply messages correctly.

The main difference between and upstert stream and a retract stream is that `UPDATE` changes are encoded with a single message and hence more efficient. The following figure visualizes the conversion of a dynamic table into an upsert stream.
Contributor

Suggested change
The main difference between and upstert stream and a retract stream is that `UPDATE` changes are encoded with a single message and hence more efficient. The following figure visualizes the conversion of a dynamic table into an upsert stream.
The main difference between an upsert stream and a retract stream is that `UPDATE` changes are encoded with a single message and hence more efficient. The following figure visualizes the conversion of a dynamic table into an upsert stream.

The API to convert a dynamic table into a `DataStream` is discussed on the [Common Concepts]({% link dev/table/common.md %}#convert-a-table-into-a-datastream) page. Please note that only append and retract streams are supported when converting a dynamic table into a `DataStream`. The `TableSink` interface to emit a dynamic table to an external system are discussed on the [TableSources and TableSinks](../sourceSinks.html#define-a-tablesink) page.
The API to convert a dynamic table into a `DataStream` is discussed on the [Common Concepts]({{ site.baseurl }}/dev/table/common.html#convert-a-table-into-a-datastream) page. Please note that only Append and Retract streams are supported when converting a dynamic table into a `DataStream`. The `TableSink` interface to emit a dynamic table to an external system are discussed on the [TableSources and TableSinks](../sourceSinks.html#define-a-tablesink) page.

Challenges in Unbounded Data Processing
Contributor

Again, I'd keep the original section.

Comment on lines +205 to +310

Currently, you can tune the idle state retention time in Flink. By removing the state of a key, the continuous query completely forgets that it has seen this key before. If a record with a key whose state has been removed before is processed, the record will be treated as if it were the first record with the respective key. For example, in the queries mentioned in the [Challenges in Unbounded Data Processing](#challenges-in-unbounded-data-processing) section, we can allow Flink to forget about a `user` after 1 hour. If a new event arrives for the same user after the state has been cleared, the count will be initialized to `0`.

There are two parameters to configure the idle state retention time:
- The **minimum idle state retention time** defines how long the state of an inactive key is at least kept before it is removed.
- The **maximum idle state retention time** defines how long the state of an inactive key is at most kept before it is removed.



<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// obtain query configuration from TableEnvironment
TableConfig tConfig = tableEnv.getConfig();
// set query parameters
tConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24));

// define query
Table result = ...

// create TableSink
TableSink<Row> sink = ...

// register TableSink
tableEnv.registerTableSink(
    "outputTable",               // table name
    new String[]{...},           // field names
    new TypeInformation[]{...},  // field types
    sink);                       // table sink

// emit result Table via a TableSink
result.executeInsert("outputTable");

// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class);

{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// obtain query configuration from TableEnvironment
val tConfig: TableConfig = tableEnv.getConfig
// set query parameters
tConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))

// define query
val result: Table = ???

// create TableSink
val sink: TableSink[Row] = ???

// register TableSink
tableEnv.registerTableSink(
    "outputTable",                   // table name
    Array[String](...),              // field names
    Array[TypeInformation[_]](...),  // field types
    sink)                            // table sink

// emit result Table via a TableSink
result.executeInsert("outputTable")

// convert result Table into a DataStream[Row]
val stream: DataStream[Row] = result.toAppendStream[Row]

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
from datetime import timedelta

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, TableConfig

# use TableConfig in python API
t_config = TableConfig()
# set query parameters
t_config.set_idle_state_retention_time(timedelta(hours=12), timedelta(hours=24))

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env, t_config)

# define query
result = ...

# create TableSink
sink = ...

# register TableSink
table_env.register_table_sink("outputTable",  # table name
                              sink)           # table sink

# emit result Table via a TableSink
result.insert_into("outputTable")

{% endhighlight %}
</div>
</div>

Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of `minTime` and `maxTime`. The difference between `minTime` and `maxTime` must be at least 5 minutes.
Contributor

This is way too involved for a documentation page introducing dynamic tables. Is there a particular reason to mention this here when idle state retention is explained in "Query Configuration"?

Author

Yes, because the Query Configuration page makes sense when there are a lot of configurations to tune and those configurations can be used in multiple scenarios, but that's not the case here.
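As a side note on the snippet under discussion, the 5-minute constraint from its last paragraph is enforced at configuration time. A sketch with illustrative values, reusing the `tConfig` from the snippet above:

{% highlight java %}
// OK: the difference between minTime and maxTime is at least 5 minutes
tConfig.setIdleStateRetentionTime(Time.hours(12), Time.hours(24));

// rejected with an IllegalArgumentException: only a 2 minute difference
// tConfig.setIdleStateRetentionTime(Time.minutes(10), Time.minutes(12));
{% endhighlight %}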

Comment on lines 40 to 42
SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
Contributor

Suggested change
SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
SELECT *
FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id

Comment on lines +47 to +56
The regular joins present a lot of challenges though. Let's assume all the rows in a table are represented as `(t, k, v)`, where t is the timestamp of arrival, k is the key of the row (which may or may not be unique), and v represents the value to be joined. Each of them can be followed by the suffix `l` or `r`, where l implies the left side of the join and r the right side.

Considering a single key K, we can have the following cases -
* **tl != tr** - This implies that to do a join, you have to keep the value from the left table in state until the value in the right table arrives for the same key. The question is how to decide how long to hold the key, since the delay between the streams is generally not constant.

* **tl = Infinity or tr = Infinity** - What if one of the streams doesn't contain any values for the key? Do you keep holding the value from one of the streams indefinitely, or do you discard it after some time? If you want to discard it, what is the right interval after which it is safe to discard the key?

The only solution is to keep both sides of the join input in Flink's state forever.
Thus, the resource usage will grow indefinitely as well, if one or both input tables are continuously growing.
Flink offers `Interval Joins` to tackle the indefinitely growing state problem. Let's take a look at these in the next section.
Contributor

As @sjwiesman mentioned a couple of times before, these changes give it a negative spin and are also more lengthy than necessary to introduce regular joins.
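For reference, the interval join that the new text alludes to bounds state by correlating the two inputs' time attributes. A sketch with illustrative table and column names, loosely following the Flink docs:

{% highlight sql %}
-- state for each side can be dropped once the 4-hour window has passed
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
  AND s.ship_time BETWEEN o.order_time AND o.order_time + INTERVAL '4' HOUR;
{% endhighlight %}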
