[FLINK-11010] [TABLE] Flink SQL timestamp is inconsistent with currentProcessingTime() #7180

Closed
wants to merge 7 commits

Conversation

lamberken (Member)

What is the purpose of the change

The processing time is implemented by invoking System.currentTimeMillis(), but the long value is then automatically wrapped into a Timestamp with the following statement:
new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));
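A minimal sketch of the effect (standalone, not part of the patch; it assumes a UTC+8 default zone such as Asia/Shanghai for illustration):

import java.sql.Timestamp;
import java.util.TimeZone;

public class OffsetShiftDemo {
    public static void main(String[] args) {
        // Assume a UTC+8 default zone for illustration.
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));

        long time = System.currentTimeMillis();
        // What the wrapping statement above produces: Timestamp.toString() renders
        // in the default zone, so this prints the UTC wall clock, 8 hours behind.
        Timestamp wrapped = new Timestamp(time - TimeZone.getDefault().getOffset(time));
        // What a user comparing against new Date() expects to see.
        Timestamp expected = new Timestamp(time);

        System.out.println("wrapped:  " + wrapped);
        System.out.println("expected: " + expected);
    }
}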

Brief change log

In org.apache.flink.table.codegen.CodeGenerator#generateProctimeTimestamp,
add the default TimeZone offset back.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@lamberken (Member Author)

lamberken commented Nov 27, 2018

For more discussion details, please read the thread Question about Timestamp in Flink SQL on the user mailing list.

Minimal reproducible example:


public static void main(String[] args) throws Exception {

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

  DataStreamSource<String> socket = env.socketTextStream("localhost", 9999);

  tableEnv.registerDataStream("orders", socket, "user,proctime.proctime");

  Table result = tableEnv.sqlQuery("select proctime from orders");
  tableEnv
    .toRetractStream(result, TypeInformation.of(Row.class))
    .addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
        @Override
        public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
            String sysTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
            String procTime = value.f1.toString();

            System.out.println("sysTime:  " + sysTime);
            System.out.println("procTime: " + procTime);
            System.out.println();
        }
    });
  env.execute();
}

Result:

sysTime:  2018-11-27 17:29:53.744
procTime: 2018-11-27 09:29:53.744

@walterddr (Contributor) left a comment:

Thanks for the contribution @lamber-ken, can you kindly add a test to justify your change please?

@lamberken (Member Author)

Thanks for the review @walterddr. Yes, I'd be very glad to.

Read more about Calcite's SqlFunctions#internalToTime.

If I give it an event time with unix timestamp 0, I get Timestamp(-28800000). I am confused about why internalToTimestamp needs to subtract the offset.
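A minimal sketch of that observation (standalone; it mirrors Calcite's internalToTimestamp with the JVM default zone standing in for LOCAL_TZ, and assumes a UTC+8 zone such as Asia/Shanghai):

import java.sql.Timestamp;
import java.util.TimeZone;

public class InternalToTimestampDemo {

    // Mirrors Calcite's SqlFunctions.internalToTimestamp, with LOCAL_TZ
    // replaced by the JVM default zone.
    static Timestamp internalToTimestamp(long v) {
        return new Timestamp(v - TimeZone.getDefault().getOffset(v));
    }

    public static void main(String[] args) {
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai")); // assumed UTC+8

        Timestamp ts = internalToTimestamp(0L);
        System.out.println(ts.getTime()); // -28800000: epoch 0 shifted back 8 hours
    }
}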

@walterddr (Contributor)

walterddr commented Dec 2, 2018

Hi @lamber-ken, the minimal reproducible example should be a good starting point for adding a test/ITCase to justify your change. I also commented on the JIRA ticket.

@lamberken (Member Author)

lamberken commented Dec 4, 2018

Hi @walterddr, I have added a SQL proctime test.

@lamberken (Member Author)

lamberken commented Dec 4, 2018

Hi @tillrohrmann, the CI build failed, but I can't find anything helpful in the log. How can I trigger a rebuild?
Here is the CI log detail.

@lzqdename (Contributor)

lzqdename commented Dec 6, 2018

Let me show how the wrong result is generated.


Background: processing time in a tumbling window, Flink 1.5.0.

The invocation stack is as follows:
[1] org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747)
[2] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:53)
[3] org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply (IncrementalAggregateWindowFunction.scala:74)
[4] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:72)
[5] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:39)
[6] org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process (InternalSingleValueWindowFunction.java:46)
[7] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents (WindowOperator.java:550)
[8] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime (WindowOperator.java:505)
[9] org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime (HeapInternalTimerService.java:266)
[10] org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run (SystemProcessingTimeService.java:281)
[11] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
[12] java.util.concurrent.FutureTask.run (FutureTask.java:266)
[13] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180)
[14] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293)
[15] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142)
[16] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
[17] java.lang.Thread.run (Thread.java:748)

Now we are at [1] org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747),

and the code is as follows:

public static Timestamp internalToTimestamp(long v) {
  return new Timestamp(v - LOCAL_TZ.getOffset(v));
}

Let us print the value of windowStart:
v = 1544074830000
Let us print the value of windowEnd:
v = 1544074833000

After this, we come back to
[1] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:51)

Then we will execute:

if (windowStartOffset.isDefined) {
  output.setField(
    lastFieldPos + windowStartOffset.get,
    SqlFunctions.internalToTimestamp(windowStart))
}

if (windowEndOffset.isDefined) {
  output.setField(
    lastFieldPos + windowEndOffset.get,
    SqlFunctions.internalToTimestamp(windowEnd))
}

Before execution, the output is:
output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,null,null,null"
After execution, the output is:
output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,2018-12-06 05:40:30.0,2018-12-06 05:40:33.0,null"

So, do you think translating
long value 1544074830000 to 2018-12-06 05:40:30.0
long value 1544074833000 to 2018-12-06 05:40:33.0
is right?

I am in China, so I think the timestamps should be 2018-12-06 13:40:30.0 and 2018-12-06 13:40:33.0.

Okay, let us continue.

Now the data will be written to Kafka; before the write, the data is serialized.
Let us see what happens!

The call stack is as follows:
[1] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer._timestamp (DateSerializer.java:41)
[2] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:48)
[3] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:15)
[4] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue (DefaultSerializerProvider.java:130)
[5] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValue (ObjectMapper.java:2,444)
[6] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.valueToTree (ObjectMapper.java:2,586)
[7] org.apache.flink.formats.json.JsonRowSerializationSchema.convert (JsonRowSerializationSchema.java:189)
[8] org.apache.flink.formats.json.JsonRowSerializationSchema.convertRow (JsonRowSerializationSchema.java:128)
[9] org.apache.flink.formats.json.JsonRowSerializationSchema.serialize (JsonRowSerializationSchema.java:102)
[10] org.apache.flink.formats.json.JsonRowSerializationSchema.serialize (JsonRowSerializationSchema.java:51)
[11] org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue (KeyedSerializationSchemaWrapper.java:46)
[12] org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke (FlinkKafkaProducer010.java:355)
[13] org.apache.flink.streaming.api.operators.StreamSink.processElement (StreamSink.java:56)
[14] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
[15] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
[16] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
[17] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
[18] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
[19] org.apache.flink.streaming.api.operators.StreamMap.processElement (StreamMap.java:41)
[20] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
[21] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
[22] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
[23] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
[24] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
[25] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
[26] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:37)
[27] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:28)
[28] DataStreamCalcRule$88.processElement (null)
[29] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:66)
[30] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:35)
[31] org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66)
[32] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
[33] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
[34] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
[35] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
[36] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
[37] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
[38] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:65)
[39] org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply (IncrementalAggregateWindowFunction.scala:74)
[40] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:72)
[41] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:39)
[42] org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process (InternalSingleValueWindowFunction.java:46)
[43] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents (WindowOperator.java:550)
[44] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime (WindowOperator.java:505)
[45] org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime (HeapInternalTimerService.java:266)
[46] org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run (SystemProcessingTimeService.java:281)
[47] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
[48] java.util.concurrent.FutureTask.run (FutureTask.java:266)
[49] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180)
[50] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293)
[51] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142)
[52] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
[53] java.lang.Thread.run (Thread.java:748)

And the code is as follows:

protected long _timestamp(Date value) {
  return value == null ? 0L : value.getTime();
}

Here, using windowEnd as an example, the value is:
value = "2018-12-06 05:40:33.0"
value.getTime() = 1544046033000

See, the initial value is 1544074833000 and the final value is 1544046033000.

The difference is 28800000 ms, i.e. 8 hours, because I am in China (UTC+8).

Why? The key reason is SqlFunctions.internalToTimestamp:

public static Timestamp internalToTimestamp(long v) {
  return new Timestamp(v - LOCAL_TZ.getOffset(v));
}

The code subtracts LOCAL_TZ; I think that is redundant!

@lamberken (Member Author)

@lzqdename, thanks for your detailed description.

@Jennifer-sarah

+1 from my side. I had the same problem. Thanks.

@walterddr (Contributor)

Hi @lamber-ken, sorry for the late response. I am a bit confused by the JIRA ticket description and the PR comments.

I am under the impression that the goal should be to make all internal times GMT-based regardless of where the code runs; thus currentProcessingTime() should return a GMT-based time. If my understanding is correct, shouldn't the fix be on the "timerService" side instead?

@lamberken (Member Author)

@walterddr, you are welcome.
Flink SQL uses the CodeGenerator#generateProctimeTimestamp method to process proctime, and its type is Timestamp; Flink uses Calcite to convert the long value to a Timestamp.

But Calcite's implementation of the method is as below (SqlFunctions#internalToTime):

public static java.sql.Time internalToTime(int v) {
    return new java.sql.Time(v - LOCAL_TZ.getOffset(v));
}

@lzqdename (Contributor)

lzqdename commented Dec 10, 2018

In Flink, getting the processing time produces a correct result, but the result is then transformed several times, for example Long -> Timestamp -> Long -> Timestamp.

When converting Long -> Timestamp, the following code is used (in SqlFunctions):

public static java.sql.Time internalToTime(int v) {
  return new java.sql.Time(v - LOCAL_TZ.getOffset(v));
}

When converting Timestamp -> Long, the following code is used (in SqlFunctions):

// mainly intended for java.sql.Timestamp but works for other dates also
public static long toLong(java.util.Date v, TimeZone timeZone) {
  final long time = v.getTime();
  return time + timeZone.getOffset(time);
}

In the final write step (Timestamp to Long), the following code is used:

protected long _timestamp(Date value) {
  return value == null ? 0L : value.getTime();
}

Its position is org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer._timestamp (DateSerializer.java:41).

Now, let me think: we use two different classes to handle the transformation between Long and Timestamp. The first class, SqlFunctions, takes the TimeZone factor into account, but the second class does not.

The handling of the time transformation is not consistent!
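A minimal sketch of that asymmetry (standalone; the helpers mirror the snippets above rather than calling the real classes, and a UTC+8 zone is assumed):

import java.sql.Timestamp;
import java.util.TimeZone;

public class RoundTripDemo {

    static final TimeZone LOCAL_TZ = TimeZone.getTimeZone("Asia/Shanghai"); // assumed UTC+8

    // Mirrors SqlFunctions.internalToTimestamp: subtracts the zone offset.
    static Timestamp internalToTimestamp(long v) {
        return new Timestamp(v - LOCAL_TZ.getOffset(v));
    }

    // Mirrors SqlFunctions.toLong: adds the offset back, so this pair is symmetric.
    static long toLong(java.util.Date v) {
        final long time = v.getTime();
        return time + LOCAL_TZ.getOffset(time);
    }

    public static void main(String[] args) {
        long windowEnd = 1544074833000L; // value from the walkthrough above

        Timestamp ts = internalToTimestamp(windowEnd);
        System.out.println(toLong(ts));   // 1544074833000 -> symmetric round trip is lossless
        System.out.println(ts.getTime()); // 1544046033000 -> Jackson's getTime() path, 8 hours off
    }
}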

@lamberken (Member Author)

lamberken commented Dec 10, 2018

Hi @twalthr, @dawidwys, cc, could you take a look or discuss? Thanks. :)

@AdolphKK

@lamber-ken thanks, it looks good to me. Our servers are in the EST zone and this problem has been bothering me for a long time.

@samsai

samsai commented Dec 11, 2018

I got the same problem too:

val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ssZ")
val origin: DataStream[TransactionEvent] = env.fromCollection(List(
  TransactionEvent("u1", format.parse("2018-01-02 01:13:30+0800"), 10)
))
val source2 = origin
  .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TransactionEvent] {
    override def extractTimestamp(element: TransactionEvent): Long = {
      val timestamp = element.time.getTime
      println(s"extractTimestamp:$timestamp")
      timestamp
    }
  })
tEnv.fromDataStream(source2, 'user, 'eventTime.rowtime)
  .toAppendStream[Row].print()

I got eventTime as 2018-01-01 17:13:30.0, which is 8 hours behind.

@lamberken (Member Author)

Hi @zentol, cc.

@walterddr (Contributor) left a comment:

Hi @lamber-ken, thanks for the explanation. I am still concerned about whether this is the best solution to the problem, since it works backwards from Calcite. I tagged https://issues.apache.org/jira/browse/FLINK-8353 as a more well-structured plan to add timezone support to Flink.

Timestamp procTimestamp = (Timestamp) value.getField(0);

// validate the second here
long procSecondTime = procTimestamp.getTime() / 1000;
Contributor comment on the snippet above:

This causes me some concern because the check can fail when the two readings cross a second boundary; this is not a stable ITCase in my opinion.

Member Author reply:

Yeah, it's better to validate the hour. I'll update.
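A sketch of an even more stable assertion (hypothetical test fragment, not the actual patch; rather than comparing a formatted second or hour, it bounds the proctime between two wall-clock readings, so no boundary crossing can make it flake):

// Hypothetical ITCase fragment; 'value' is the emitted row as in the snippet above,
// and assertTrue is JUnit's org.junit.Assert.assertTrue.
long before = System.currentTimeMillis();
// ... run the query and capture the emitted row into 'value' ...
Timestamp procTimestamp = (Timestamp) value.getField(0);
long after = System.currentTimeMillis();

// With the fix, the proctime must fall inside [before, after], regardless of
// any second or hour boundary crossed during the run.
assertTrue(procTimestamp.getTime() >= before && procTimestamp.getTime() <= after);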

@lamberken (Member Author)

lamberken commented Dec 11, 2018

@walterddr, as the https://issues.apache.org/jira/browse/FLINK-8353 description says:

  • It needs to modify each function that works with SQL proctime.
  • It's not user-friendly, because users need a UDF to convert proctime.
  • Flink SQL should be consistent with a Flink jar program, where users do not need to care about the default timezone.

@samsai

samsai commented Dec 12, 2018

@walterddr, could you please check the timezone problem when dealing with event time? It seems that lamber-ken's commit only fixes proctime. Would you please refer to my previous comment and take a look at my event-time use case? Thanks a lot!

Update: I found that the correct timestamp is changed in OutputRowtimeProcessFunction, using SqlFunctions.internalToTimestamp(); I'm not sure whether that call is redundant or not. Please fix this problem. Thanks.

@walterddr (Contributor)

> @walterddr, could you please check the timezone problem when dealing with event time? It seems that lamber-ken's commit only fixes proctime. […] I found that the correct timestamp is changed in OutputRowtimeProcessFunction, using SqlFunctions.internalToTimestamp(); I'm not sure whether that call is redundant or not.

FLINK-11010 only reports the bug of utilizing currentProcessingTime(). Maybe we can mark the JIRA as a duplicate of FLINK-8353 and focus on addressing the bigger overall timezone problem instead. What do you think?

@hequn8128 (Contributor) left a comment:

@lamber-ken Thanks for looking into the problem and bringing up the discussion.
I'm afraid we can't simply add the offset. I have answered your question on the user mailing list. Below is a copy:

The two methods (add offset and subtract offset) do not have to appear in pairs. Currently, Flink doesn't support time zone configuration. The timestamp (time of type Timestamp) always means the time in UTC+0. So in the test of your PR [1], the output timestamp means a time in UTC+0, not a time in your timezone. You can verify it by changing your SQL to:
String sqlQuery = "select proctime, LOCALTIMESTAMP from MyTable";

But you raised a good question, and it is true that it would be better to support time zone configuration in Flink, for example via a global timezone configuration. However, it is not a one-or-two-line code change; we need to take all operators into consideration, and it is better to solve it once for all.
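For illustration (the row values below are hypothetical; assuming a UTC+8 local zone, the two columns should differ by the 8-hour offset):

String sqlQuery = "select proctime, LOCALTIMESTAMP from MyTable";
// Expected shape of one output row in a UTC+8 zone (illustrative values only):
// proctime:       2018-12-30 06:00:00.0  (UTC+0)
// LOCALTIMESTAMP: 2018-12-30 14:00:00.0  (local wall clock)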

Best, Hequn

@lamberken (Member Author)

Hi @hequn8128, tableEnv can set a timezone as in the code below, but it only sets the timezone on the config; it is not applied to the Flink system. Yes, it is better to solve it once for all, not just simply add the offset.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env);
tableEnv.getConfig().setTimeZone(TimeZone.getDefault());

Or we could simply add the offset now, and later redesign how to solve this problem (for example, with a global timezone configuration), because that would solve the immediate problem.

@hequn8128 (Contributor)

hequn8128 commented Dec 30, 2018

@lamber-ken Yes, you are right. It would be good if we could solve it in a simple way. However, your PR would bring some side effects.

Basically, Flink returns timestamps in UTC+0. With your PR, the output time for proctime is a local timestamp. However, you only changed one place where a timestamp is represented; there are a lot of other places that still return timestamps in UTC+0, which would bring confusion and inconsistency.

Flink returns a local timestamp only in functions that clearly indicate that the return type is a local timestamp, such as the CURRENT_TIMESTAMP function or the LOCALTIMESTAMP in the query above. This is why the offset is added in the place you linked.

Best, Hequn

@lamberken (Member Author)

@hequn8128, OK, I see. Thanks.

@wangxlong (Contributor)

wangxlong commented Jan 10, 2019

Besides the 'select proctime' case, the windowStart and windowEnd fields also have this problem when using a processing-time window:

if (windowStartOffset.isDefined) {
  output.setField(
    lastFieldPos + windowStartOffset.get,
    SqlFunctions.internalToTimestamp(windowStart))
}
if (windowEndOffset.isDefined) {
  output.setField(
    lastFieldPos + windowEndOffset.get,
    SqlFunctions.internalToTimestamp(windowEnd))
}
