Skip to content

Commit

Permalink
KYLIN-3788 Modify the time conversion time zone of the kafka streamin…
Browse files Browse the repository at this point in the history
…g access

* change simpleformat timezone of   kafka streaming timestamp, such as  hour_start, day_start ...

* Modify the time conversion logic of the  kafka data access; this fix solve the problem that the streaming task time is different from the realtime due to the time zone in kafka streaming.

* KYLIN-3788 This commit Changes SimpleDateformat to FastDateFormat because SimpleDateFormat is forbidden API.
  • Loading branch information
zhaojintaozhao authored and nichunen committed Apr 12, 2019
1 parent ed266aa commit e13f101
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,19 @@ public static Date stringToDate(String str, String pattern) {
return date;
}

public static String formatToTimeStrWithTimeZone(TimeZone timeZone, long mills){
return formatToStrWithTimeZone(timeZone, mills, DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
}

public static String formatToDateStrWithTimeZone(TimeZone timeZone, long mills){
return formatToStrWithTimeZone(timeZone, mills, DEFAULT_DATE_PATTERN);
}

private static String formatToStrWithTimeZone(TimeZone timeZone, long mills, String pattern){
FastDateFormat dateFormat = FastDateFormat.getInstance(pattern, timeZone);
return dateFormat.format(new Date(mills));
}

public static long stringToMillis(String str) {
// try to be smart and guess the date format
if (isAllDigits(str)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,37 @@ public static long getHourStart(long ts) {
}

public static long getDayStart(long ts) {
return ts / ONE_DAY_TS * ONE_DAY_TS;
return getDayStartWithTimeZone(gmt, ts);
}

public static long getDayStartWithTimeZone(TimeZone timeZone, long ts){
Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
calendar.setTimeInMillis(ts);
int year = calendar.get(Calendar.YEAR);
int month = calendar.get(Calendar.MONTH);
int date = calendar.get(Calendar.DATE);
calendar.clear();
calendar.set(year, month, date);
return calendar.getTimeInMillis();
}

public static long getWeekStart(long ts) {
Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
calendar.setTimeInMillis(getDayStart(ts));
return getWeekStartWithTimeZone(gmt, ts);
}

public static long getWeekStartWithTimeZone(TimeZone timeZone, long ts){
Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
calendar.setTimeInMillis(getDayStartWithTimeZone(timeZone, ts));
calendar.add(Calendar.DAY_OF_WEEK, calendar.getFirstDayOfWeek() - calendar.get(Calendar.DAY_OF_WEEK));
return calendar.getTimeInMillis();
}

public static long getMonthStart(long ts) {
Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
return getMonthStartWithTimeZone(gmt, ts);
}

public static long getMonthStartWithTimeZone(TimeZone timeZone, long ts){
Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
calendar.setTimeInMillis(ts);
int year = calendar.get(Calendar.YEAR);
int month = calendar.get(Calendar.MONTH);
Expand All @@ -65,7 +84,11 @@ public static long getMonthStart(long ts) {
}

public static long getQuarterStart(long ts) {
Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
return getQuarterStartWithTimeZone(gmt, ts);
}

public static long getQuarterStartWithTimeZone(TimeZone timeZone, long ts) {
Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
calendar.setTimeInMillis(ts);
int year = calendar.get(Calendar.YEAR);
int month = calendar.get(Calendar.MONTH);
Expand All @@ -75,7 +98,11 @@ public static long getQuarterStart(long ts) {
}

public static long getYearStart(long ts) {
Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
return getYearStartWithTimeZone(gmt, ts);
}

public static long getYearStartWithTimeZone(TimeZone timeZone, long ts) {
Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
calendar.setTimeInMillis(ts);
int year = calendar.get(Calendar.YEAR);
calendar.clear();
Expand All @@ -84,14 +111,22 @@ public static long getYearStart(long ts) {
}

public static long getWeekEnd(long ts) {
Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
calendar.setTimeInMillis(getWeekStart(ts));
return getWeekEndWithTimeZone(gmt, ts);
}

public static long getWeekEndWithTimeZone(TimeZone timeZone, long ts) {
Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
calendar.setTimeInMillis(getWeekStartWithTimeZone(timeZone, ts));
calendar.add(Calendar.DAY_OF_WEEK, 7);
return calendar.getTimeInMillis();
}

public static long getMonthEnd(long ts) {
Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
return getMonthEndWithTimeZone(gmt, ts);
}

public static long getMonthEndWithTimeZone(TimeZone timeZone, long ts) {
Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
calendar.setTimeInMillis(ts);
calendar.set(calendar.get(Calendar.YEAR), calendar.get(Calendar.MONDAY), calendar.get(Calendar.DAY_OF_MONTH), 0, 0, 0);
calendar.set(Calendar.DAY_OF_MONTH, calendar.getActualMaximum(Calendar.DAY_OF_MONTH));
Expand All @@ -100,15 +135,23 @@ public static long getMonthEnd(long ts) {
}

public static long getQuarterEnd(long ts) {
Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
calendar.setTimeInMillis(getQuarterStart(ts));
return getQuarterEndWithTimeZone(gmt, ts);
}

public static long getQuarterEndWithTimeZone(TimeZone timeZone, long ts) {
Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
calendar.setTimeInMillis(getQuarterStartWithTimeZone(timeZone, ts));
calendar.add(Calendar.MONTH, 3);
return calendar.getTimeInMillis();
}

public static long getYearEnd(long ts) {
Calendar calendar = Calendar.getInstance(gmt, Locale.ROOT);
calendar.setTimeInMillis(getYearStart(ts));
return getYearEndWithTimeZone(gmt, ts);
}

public static long getYearEndWithTimeZone(TimeZone timeZone, long ts) {
Calendar calendar = Calendar.getInstance(timeZone, Locale.ROOT);
calendar.setTimeInMillis(getYearStartWithTimeZone(timeZone, ts));
calendar.add(Calendar.YEAR, 1);
return calendar.getTimeInMillis();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,109 @@ public void basicTest() throws ParseException {
Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.MINUTE), TimeUtil.getMinuteStart(t2));

long t3 = dateFormat.parse("2012/12/31 11:02:01").getTime();
Assert.assertEquals(dateFormat.parse("2012/12/31 00:00:00").getTime(), TimeUtil.getDayStart(t3));
Assert.assertEquals(dateFormat.parse("2012/12/30 00:00:00").getTime(), TimeUtil.getWeekStart(t3));
Assert.assertEquals(dateFormat.parse("2012/12/1 00:00:00").getTime(), TimeUtil.getMonthStart(t3));
Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t3));
Assert.assertEquals(dateFormat.parse("2012/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t3));
Assert.assertEquals(dateFormat.parse("2012/12/30 00:00:00").getTime(), TimeUtil.getWeekStart(t3));
Assert.assertEquals(dateFormat.parse("2013/1/6 00:00:00").getTime(), TimeUtil.getWeekEnd(t3));
Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getMonthEnd(t3));
Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getQuarterEnd(t3));
Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getYearEnd(t3));


long t4 = dateFormat.parse("2015/01/01 10:01:30").getTime();
Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getMonthStart(t4));
Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t4));
Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t4));
Assert.assertEquals(dateFormat.parse("2014/12/28 00:00:00").getTime(), TimeUtil.getWeekStart(t4));
long t4 = dateFormat.parse("2012/10/29 11:02:01").getTime();
Assert.assertEquals(dateFormat.parse("2012/10/29 00:00:00").getTime(), TimeUtil.getDayStart(t4));
Assert.assertEquals(dateFormat.parse("2012/10/28 00:00:00").getTime(), TimeUtil.getWeekStart(t4));
Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getMonthStart(t4));
Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t4));
Assert.assertEquals(dateFormat.parse("2012/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t4));
Assert.assertEquals(dateFormat.parse("2012/11/4 00:00:00").getTime(), TimeUtil.getWeekEnd(t4));
Assert.assertEquals(dateFormat.parse("2012/11/1 00:00:00").getTime(), TimeUtil.getMonthEnd(t4));
Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getQuarterEnd(t4));
Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getYearEnd(t4));

long t5 = dateFormat.parse("2012/8/29 23:12:41").getTime();
Assert.assertEquals(dateFormat.parse("2012/8/29 00:00:00").getTime(), TimeUtil.getDayStart(t5));
Assert.assertEquals(dateFormat.parse("2012/8/26 00:00:00").getTime(), TimeUtil.getWeekStart(t5));
Assert.assertEquals(dateFormat.parse("2012/8/1 00:00:00").getTime(), TimeUtil.getMonthStart(t5));
Assert.assertEquals(dateFormat.parse("2012/7/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t5));
Assert.assertEquals(dateFormat.parse("2012/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t5));
Assert.assertEquals(dateFormat.parse("2012/9/2 00:00:00").getTime(), TimeUtil.getWeekEnd(t5));
Assert.assertEquals(dateFormat.parse("2012/9/1 00:00:00").getTime(), TimeUtil.getMonthEnd(t5));
Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getQuarterEnd(t5));
Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getYearEnd(t5));

long t6 = dateFormat.parse("2015/01/01 10:01:30").getTime();
Assert.assertEquals(dateFormat.parse("2015/01/01 00:00:00").getTime(), TimeUtil.getDayStart(t6));
Assert.assertEquals(dateFormat.parse("2014/12/28 00:00:00").getTime(), TimeUtil.getWeekStart(t6));
Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getMonthStart(t6));
Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t6));
Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t6));
Assert.assertEquals(dateFormat.parse("2015/1/4 00:00:00").getTime(), TimeUtil.getWeekEnd(t6));
Assert.assertEquals(dateFormat.parse("2015/2/1 00:00:00").getTime(), TimeUtil.getMonthEnd(t6));
Assert.assertEquals(dateFormat.parse("2015/4/1 00:00:00").getTime(), TimeUtil.getQuarterEnd(t6));
Assert.assertEquals(dateFormat.parse("2016/1/1 00:00:00").getTime(), TimeUtil.getYearEnd(t6));
}


@Test
public void basicTestWithTimeZone() throws ParseException {
java.text.DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss", Locale.ROOT);
TimeZone timeZone = TimeZone.getTimeZone("GMT+8");
dateFormat.setTimeZone(timeZone);

long t1 = dateFormat.parse("2012/01/01 00:00:01").getTime();
Assert.assertEquals(normalizeTime(t1, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t1));
Assert.assertEquals(normalizeTime(t1, NormalizedTimeUnit.MINUTE), TimeUtil.getMinuteStart(t1));

long t2 = dateFormat.parse("2012/12/31 11:02:01").getTime();
Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t2));
Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.MINUTE), TimeUtil.getMinuteStart(t2));

long t3 = dateFormat.parse("2012/12/31 11:02:01").getTime();
Assert.assertEquals(dateFormat.parse("2012/12/31 00:00:00").getTime(), TimeUtil.getDayStartWithTimeZone(timeZone, t3));
Assert.assertEquals(dateFormat.parse("2012/12/30 00:00:00").getTime(), TimeUtil.getWeekStartWithTimeZone(timeZone, t3));
Assert.assertEquals(dateFormat.parse("2012/12/1 00:00:00").getTime(), TimeUtil.getMonthStartWithTimeZone(timeZone, t3));
Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getQuarterStartWithTimeZone(timeZone, t3));
Assert.assertEquals(dateFormat.parse("2012/1/1 00:00:00").getTime(), TimeUtil.getYearStartWithTimeZone(timeZone, t3));
Assert.assertEquals(dateFormat.parse("2013/1/6 00:00:00").getTime(), TimeUtil.getWeekEndWithTimeZone(timeZone, t3));
Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getMonthEndWithTimeZone(timeZone, t3));
Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getQuarterEndWithTimeZone(timeZone, t3));
Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getYearEndWithTimeZone(timeZone, t3));


long t4 = dateFormat.parse("2012/10/29 11:02:01").getTime();
Assert.assertEquals(dateFormat.parse("2012/10/29 00:00:00").getTime(), TimeUtil.getDayStartWithTimeZone(timeZone, t4));
Assert.assertEquals(dateFormat.parse("2012/10/28 00:00:00").getTime(), TimeUtil.getWeekStartWithTimeZone(timeZone, t4));
Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getMonthStartWithTimeZone(timeZone, t4));
Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getQuarterStartWithTimeZone(timeZone, t4));
Assert.assertEquals(dateFormat.parse("2012/1/1 00:00:00").getTime(), TimeUtil.getYearStartWithTimeZone(timeZone, t4));
Assert.assertEquals(dateFormat.parse("2012/11/4 00:00:00").getTime(), TimeUtil.getWeekEndWithTimeZone(timeZone, t4));
Assert.assertEquals(dateFormat.parse("2012/11/1 00:00:00").getTime(), TimeUtil.getMonthEndWithTimeZone(timeZone, t4));
Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getQuarterEndWithTimeZone(timeZone, t4));
Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getYearEndWithTimeZone(timeZone, t4));

long t5 = dateFormat.parse("2012/8/29 23:12:41").getTime();
Assert.assertEquals(dateFormat.parse("2012/8/29 00:00:00").getTime(), TimeUtil.getDayStartWithTimeZone(timeZone, t5));
Assert.assertEquals(dateFormat.parse("2012/8/26 00:00:00").getTime(), TimeUtil.getWeekStartWithTimeZone(timeZone, t5));
Assert.assertEquals(dateFormat.parse("2012/8/1 00:00:00").getTime(), TimeUtil.getMonthStartWithTimeZone(timeZone, t5));
Assert.assertEquals(dateFormat.parse("2012/7/1 00:00:00").getTime(), TimeUtil.getQuarterStartWithTimeZone(timeZone, t5));
Assert.assertEquals(dateFormat.parse("2012/1/1 00:00:00").getTime(), TimeUtil.getYearStartWithTimeZone(timeZone, t5));
Assert.assertEquals(dateFormat.parse("2012/9/2 00:00:00").getTime(), TimeUtil.getWeekEndWithTimeZone(timeZone, t5));
Assert.assertEquals(dateFormat.parse("2012/9/1 00:00:00").getTime(), TimeUtil.getMonthEndWithTimeZone(timeZone, t5));
Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getQuarterEndWithTimeZone(timeZone, t5));
Assert.assertEquals(dateFormat.parse("2013/1/1 00:00:00").getTime(), TimeUtil.getYearEndWithTimeZone(timeZone, t5));

long t6 = dateFormat.parse("2015/01/01 10:01:30").getTime();
Assert.assertEquals(dateFormat.parse("2015/01/01 00:00:00").getTime(), TimeUtil.getDayStartWithTimeZone(timeZone, t6));
Assert.assertEquals(dateFormat.parse("2014/12/28 00:00:00").getTime(), TimeUtil.getWeekStartWithTimeZone(timeZone, t6));
Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getMonthStartWithTimeZone(timeZone, t6));
Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getQuarterStartWithTimeZone(timeZone, t6));
Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getYearStartWithTimeZone(timeZone, t6));
Assert.assertEquals(dateFormat.parse("2015/1/4 00:00:00").getTime(), TimeUtil.getWeekEndWithTimeZone(timeZone, t6));
Assert.assertEquals(dateFormat.parse("2015/2/1 00:00:00").getTime(), TimeUtil.getMonthEndWithTimeZone(timeZone, t6));
Assert.assertEquals(dateFormat.parse("2015/4/1 00:00:00").getTime(), TimeUtil.getQuarterEndWithTimeZone(timeZone, t6));
Assert.assertEquals(dateFormat.parse("2016/1/1 00:00:00").getTime(), TimeUtil.getYearEndWithTimeZone(timeZone, t6));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import java.nio.ByteBuffer;
import java.util.TimeZone;

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.StreamingMessageRow;
import org.apache.kylin.common.util.TimeUtil;
Expand Down Expand Up @@ -108,6 +111,8 @@ public static Map<String, String> parseProperties(String propertiesStr) {
* @return true if the columnName is a derived time column; otherwise false;
*/
public static final boolean populateDerivedTimeColumns(String columnName, List<String> result, long t) {
String timeZoneStr = KylinConfig.getInstanceFromEnv().getTimeZone();
TimeZone timeZone = TimeZone.getTimeZone(timeZoneStr);

Integer derivedTimeColumn = derivedTimeColumns.get(columnName);
if (derivedTimeColumn == null) {
Expand All @@ -118,31 +123,31 @@ public static final boolean populateDerivedTimeColumns(String columnName, List<S
switch (derivedTimeColumn) {
case 1:
normalized = TimeUtil.getMinuteStart(t);
result.add(DateFormat.formatToTimeWithoutMilliStr(normalized));
result.add(DateFormat.formatToTimeStrWithTimeZone(timeZone, normalized));
break;
case 2:
normalized = TimeUtil.getHourStart(t);
result.add(DateFormat.formatToTimeWithoutMilliStr(normalized));
result.add(DateFormat.formatToTimeStrWithTimeZone(timeZone, normalized));
break;
case 3:
normalized = TimeUtil.getDayStart(t);
result.add(DateFormat.formatToDateStr(normalized));
normalized = TimeUtil.getDayStartWithTimeZone(timeZone, t);
result.add(DateFormat.formatToDateStrWithTimeZone(timeZone, normalized));
break;
case 4:
normalized = TimeUtil.getWeekStart(t);
result.add(DateFormat.formatToDateStr(normalized));
normalized = TimeUtil.getWeekStartWithTimeZone(timeZone, t);
result.add(DateFormat.formatToDateStrWithTimeZone(timeZone, normalized));
break;
case 5:
normalized = TimeUtil.getMonthStart(t);
result.add(DateFormat.formatToDateStr(normalized));
normalized = TimeUtil.getMonthStartWithTimeZone(timeZone, t);
result.add(DateFormat.formatToDateStrWithTimeZone(timeZone, normalized));
break;
case 6:
normalized = TimeUtil.getQuarterStart(t);
result.add(DateFormat.formatToDateStr(normalized));
normalized = TimeUtil.getQuarterStartWithTimeZone(timeZone, t);
result.add(DateFormat.formatToDateStrWithTimeZone(timeZone, normalized));
break;
case 7:
normalized = TimeUtil.getYearStart(t);
result.add(DateFormat.formatToDateStr(normalized));
normalized = TimeUtil.getYearStartWithTimeZone(timeZone, t);
result.add(DateFormat.formatToDateStrWithTimeZone(timeZone, normalized));
break;
default:
throw new IllegalStateException();
Expand Down

0 comments on commit e13f101

Please sign in to comment.