-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Description
Search before asking
- I have searched the existing issues and found no similar ones.
Version
4.0.1
What's Wrong?
Data was lost during the data parsing process:
Error msg:
When Doris parses the incoming data, a specified field that does not exist in a record should normally be loaded as NULL. However, the log shows otherwise: the fields are missing entirely instead of being set to NULL.
Flink SQL code:
-- 1. Kafka source table: reads JSON records from the rs_m_custom_event topic,
--    starting from the earliest offset.
--    Column names are backtick-quoted and separated from their types; the
--    flattened form had fused each name with its type (e.g. idBIGINT), which
--    does not parse. The names match the SELECT list of the INSERT statement.
CREATE TABLE Kafka_Table (
    `id` BIGINT NOT NULL COMMENT '自增id',
    `_app_user_id` STRING COMMENT '应用中用户ID',
    `analysis_date` DATE COMMENT '日期',
    `_event_name` STRING COMMENT '预',
    `_app_id` STRING COMMENT '',
    `_package_name` STRING COMMENT 'An',
    `_app_channel_id` STRING COMMENT '包)',
    `_app_version` STRING COMMENT 'A)',
    `_app_game_version` STRING COMMENT '应 ',
    `_app_res_version` STRING COMMENT '应',
    `_app_install_id` STRING COMMENT '应',
    `_app_activate_id` STRING COMMENT '应',
    `_device_id` STRING COMMENT '应',
    `_ad_id` STRING COMMENT 'a',
    `_androidid` STRING COMMENT 'D',
    `_idfv` STRING COMMENT 'fv',
    `_os_type` STRING COMMENT '操',
    `_os_version` STRING COMMENT '',
    `_app_lang` STRING COMMENT '言',
    `_os_timezone_offset` STRING COMMENT '区',
    `_os_timestamp` BIGINT COMMENT ')',
    `_device_network_type` INT COMMENT '事',
    `stime` TIMESTAMP COMMENT '间',
    `_user_ip` STRING COMMENT '址',
    `_country_code` STRING COMMENT 'd',
    `analysis_hour` STRING COMMENT '',
    `_sub_event_name` STRING COMMENT '',
    `properties` STRING
)
WITH
(
    'connector' = 'kafka',
    'topic' = 'rs_m_custom_event',
    'properties.bootstrap.servers' = 'alikafka-pre-public-intl-sg-x1e48g70301-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-public-intl-sg-x1e48g70301-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-public-intl-sg-x1e48g70301-3-vpc.alikafka.aliyuncs.com:9092',
    'properties.group.id' = 'flinkcdccustom',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
-- 2. Doris sink table: target of the Kafka-to-Doris pipeline.
--    Columns mirror the Kafka source columns without the leading underscore;
--    country_id is the sink-side name for the source column _country_code.
CREATE TABLE doris_to (
    id BIGINT NOT NULL COMMENT '自增id',
    app_user_id STRING COMMENT '应用中用户ID',
    analysis_date DATE COMMENT '日期',
    event_name STRING COMMENT '预称',
    app_id STRING COMMENT 'Ap ',
    package_name STRING COMMENT 'AleID',
    app_channel_id STRING COMMENT ')',
    app_version STRING COMMENT ')',
    app_game_version STRING COMMENT '号 ',
    app_res_version STRING COMMENT '号',
    app_install_id STRING COMMENT '',
    app_activate_id STRING COMMENT '',
    device_id STRING COMMENT '一ID',
    ad_id STRING COMMENT 'a',
    androidid STRING COMMENT '',
    idfv STRING COMMENT '',
    os_type STRING COMMENT '',
    os_version STRING COMMENT '',
    app_lang STRING COMMENT '',
    os_timezone_offset STRING COMMENT '',
    os_timestamp BIGINT COMMENT '',
    device_network_type INT COMMENT '',
    stime TIMESTAMP COMMENT '',
    user_ip STRING COMMENT '',
    -- COMMENT requires a string literal; the three columns below had none,
    -- so an empty literal is supplied to match the neighbouring columns.
    country_id STRING COMMENT '',
    analysis_hour STRING COMMENT '',
    sub_event_name STRING COMMENT '',
    properties STRING,
    PRIMARY KEY (id, app_user_id, analysis_date) NOT ENFORCED
)
WITH
(
    'connector' = 'doris',
    'table.identifier' = '*****',
    -- NOTE(review): ${flinkdoris} is a placeholder substituted before the job
    -- is submitted; presumably it expands to the remaining Doris connection
    -- options followed by a trailing comma -- confirm against the deployment.
    ${flinkdoris} 'sink.label-prefix' = 'kafka2doris_label'
);
-- 3. Write: copy the rows of one event type from the Kafka source into the
--    Doris sink. Source and sink columns are paired by position, so the two
--    lists below must stay in the same order.
INSERT INTO doris_to (
    id,
    app_user_id,
    analysis_date,
    event_name,
    app_id,
    package_name,
    app_channel_id,
    app_version,
    app_game_version,
    app_res_version,
    app_install_id,
    app_activate_id,
    device_id,
    ad_id,
    androidid,
    idfv,
    os_type,
    os_version,
    app_lang,
    os_timezone_offset,
    os_timestamp,
    device_network_type,
    stime,
    user_ip,
    country_id,
    analysis_hour,
    sub_event_name,
    properties
)
SELECT
    id,
    _app_user_id,
    analysis_date,
    _event_name,
    _app_id,
    _package_name,
    _app_channel_id,
    _app_version,
    _app_game_version,
    _app_res_version,
    _app_install_id,
    _app_activate_id,
    _device_id,
    _ad_id,
    _androidid,
    _idfv,
    _os_type,
    _os_version,
    _app_lang,
    _os_timezone_offset,
    _os_timestamp,
    _device_network_type,
    stime,
    _user_ip,
    -- NOTE(review): _country_code is written to country_id; the names differ,
    -- confirm this mapping is intended.
    _country_code,
    analysis_hour,
    _sub_event_name,
    properties
FROM Kafka_Table
WHERE _event_name = '****';
What You Expected?
Please fix the bug, thanks.
How to Reproduce?
No response
Anything Else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct