Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http-x插件问题(已经解决,代码合并到了master分支了) #1497

Closed
2 of 4 tasks
taoyameng opened this issue Feb 9, 2023 · 6 comments
Closed
2 of 4 tasks
Labels
question Further information is requested

Comments

@taoyameng
Copy link
Contributor

Search before asking

  • I had searched in the issues and found no similar question.

  • I had googled my question but i didn't get any help.

  • I had read the documentation: ChunJun doc but it didn't help me.

Description

根据chunjun文档里的http插件示例,使用sql模式运行,出错了,调试了半天暂时没有找到原因,下面是sql脚本和http接口的返回值信息

CREATE TABLE source
(
name VARCHAR,
age int,
sex varchar,
hobby varchar
)
WITH (
'connector' = 'http-x',
'url' = 'http://192.168.14.236:8090/test/test',
'intervalTime' = '3000',
'method' = 'get',
'decode' = 'json',
'dataSubject' = '${data}',
'fields' = 'name,age,sex,hobby',
'column' = '[
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "sex",
"type": "string"
},
{
"name": "hobby",
"type": "string"
}
]'
);
CREATE TABLE sink
(
name VARCHAR,
age int,
sex varchar,
hobby varchar
) WITH (
'connector' = 'stream-x'
);
INSERT INTO sink
SELECT *
FROM source u;

下面是http接口的返回值
{
"code":"0000",
"msg":"请求成功",
"data":[
{
"sex":"男",
"name":"臭桑",
"age":24,
"hobby":"打游戏"
}
],
"timestamp":"2023-02-09 13:28:39",
"total":0
}

最后是报错信息
image
其中 converter.toInternal(data) 这个方法接收的是string类型的参数,但是传递的是map

Code of Conduct

@taoyameng taoyameng added the question Further information is requested label Feb 9, 2023
@taoyameng
Copy link
Contributor Author

taoyameng commented Feb 9, 2023 via email

@kinoxyz1
Copy link
Member

kinoxyz1 commented Feb 9, 2023

因为发到群里的sample有dataSubject,但是master分支上的http-x插件无法在with参数中配置dataSubject,所以我在HttpOptions和HttpDynamicTableFactory中添加了dataSubject
……
------------------ 原始邮件 ------------------ 发件人: "DTStack/chunjun" @.>; 发送时间: 2023年2月9日(星期四) 下午3:06 @.>; @.@.>; 主题: Re: [DTStack/chunjun] http-x插件问题 (Issue #1497) If you are the latest code of the master branch, there seems to be some problems in sql mode. Some parameters are missing in sql mode, such as 'dataSubject' = '${data}', which need to be completed in HttpDynamicTableFactory.class. After completion, the debug is as follows: If you have time, you can start to try to complete the code, if you don’t have time, I will mention pr to fix it。 — Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you authored the thread.Message ID: @.***>

image

image

The parameter types are inconsistent, the parameter is passed as a Map, and the method receives a String, you can try to modify and submit the pr

@taoyameng
Copy link
Contributor Author

taoyameng commented Feb 9, 2023 via email

@taoyameng
Copy link
Contributor Author

这个问题现在已经基本解决了,解决方式如下:
在HttpOptions类下加上如下配置
加完配置后就可以在sql脚本里面添加相应的with参数了

public static final ConfigOption<String> DATA_SUBJECT =
        ConfigOptions.key("dataSubject")
                .stringType()
                .defaultValue("${data}")
                .withDescription("数据的主体");

public static final ConfigOption<Long> CYCLES =
        ConfigOptions.key("cycles")
                .longType()
                .defaultValue(1L)
                .withDescription("循环请求http接口的次数,默认是-1,会无限制次数请求的");

然后在HttpDynamicTableFactory类加上上面的配置信息

@Override
public Set<ConfigOption<?>> optionalOptions() {
    Set<ConfigOption<?>> options = new HashSet<>();
    options.add(HttpOptions.DECODE);
    options.add(HttpOptions.METHOD);
    options.add(HttpOptions.HEADER);
    options.add(HttpOptions.BODY);
    options.add(HttpOptions.PARAMS);
    options.add(HttpOptions.INTERVALTIME);
    options.add(HttpOptions.COLUMN);
    options.add(HttpOptions.DELAY);
    // 下面两个是新加的
    options.add(HttpOptions.DATA_SUBJECT);
    options.add(HttpOptions.CYCLES);
    return options;
}

private HttpRestConfig getRestapiConf(ReadableConfig config) {
    Gson gson = GsonUtil.setTypeAdapter(new Gson());
    HttpRestConfig httpRestConfig = new HttpRestConfig();
    httpRestConfig.setIntervalTime(config.get(HttpOptions.INTERVALTIME));
    httpRestConfig.setUrl(config.get(HttpOptions.URL));
    httpRestConfig.setDecode(config.get(HttpOptions.DECODE));
    httpRestConfig.setRequestMode(config.get(HttpOptions.METHOD));
    // 下面两个是新加的
    httpRestConfig.setDataSubject(config.get(HttpOptions.DATA_SUBJECT));
    httpRestConfig.setCycles(config.get(HttpOptions.CYCLES));
    httpRestConfig.setParam(
            gson.fromJson(
                    config.get(HttpOptions.PARAMS),
                    new TypeToken<List<MetaParam>>() {
                    }.getType()));
    httpRestConfig.setHeader(
            gson.fromJson(
                    config.get(HttpOptions.HEADER),
                    new TypeToken<List<MetaParam>>() {
                    }.getType()));
    httpRestConfig.setBody(
            gson.fromJson(
                    config.get(HttpOptions.BODY),
                    new TypeToken<List<MetaParam>>() {
                    }.getType()));
    httpRestConfig.setColumn(
            gson.fromJson(
                    config.get(HttpOptions.COLUMN),
                    new TypeToken<List<FieldConf>>() {
                    }.getType()));
    return httpRestConfig;
}

最后也是最关键的就是修改HttpRowConverter类的toInternal方法和类的泛型
在JsonResponseParse类的next()方法调用toInternal的时候,传递的就是map类型参数,而HttpRowConverter类
原来类的泛型是String,toInternal方法参数类型相应的也是String,下面需要将泛型修改为Map<String,Object> 即可,同时toInternal方法的参数类型就是Map<String,Object>了,然后我们删除下面两行即可

    Map<String, Object> result =
    DefaultRestHandler.gson.fromJson(input, GsonUtil.gsonMapTypeToken);

完整的代码如下

@Override
public RowData toInternal(Map<String, Object> result) throws Exception {
    GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
    List<String> columns = rowType.getFieldNames();
    for (int pos = 0; pos < columns.size(); pos++) {
        Object value =
                MapUtil.getValueByKey(
                        result, columns.get(pos), httpRestConfig.getFieldDelimiter());
        if (value instanceof LinkedTreeMap) {
            value = value.toString();
        }
        genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(value));
    }
    return genericRowData;
}

@kinoxyz1
Copy link
Member

这个问题现在已经基本解决了,解决方式如下: 在HttpOptions类下加上如下配置 加完配置后就可以在sql脚本里面添加相应的with参数了

public static final ConfigOption<String> DATA_SUBJECT =
        ConfigOptions.key("dataSubject")
                .stringType()
                .defaultValue("${data}")
                .withDescription("数据的主体");

public static final ConfigOption<Long> CYCLES =
        ConfigOptions.key("cycles")
                .longType()
                .defaultValue(1L)
                .withDescription("循环请求http接口的次数,默认是-1,会无限制次数请求的");

然后在HttpDynamicTableFactory类加上上面的配置信息

@Override
public Set<ConfigOption<?>> optionalOptions() {
    Set<ConfigOption<?>> options = new HashSet<>();
    options.add(HttpOptions.DECODE);
    options.add(HttpOptions.METHOD);
    options.add(HttpOptions.HEADER);
    options.add(HttpOptions.BODY);
    options.add(HttpOptions.PARAMS);
    options.add(HttpOptions.INTERVALTIME);
    options.add(HttpOptions.COLUMN);
    options.add(HttpOptions.DELAY);
    // 下面两个是新加的
    options.add(HttpOptions.DATA_SUBJECT);
    options.add(HttpOptions.CYCLES);
    return options;
}

private HttpRestConfig getRestapiConf(ReadableConfig config) {
    Gson gson = GsonUtil.setTypeAdapter(new Gson());
    HttpRestConfig httpRestConfig = new HttpRestConfig();
    httpRestConfig.setIntervalTime(config.get(HttpOptions.INTERVALTIME));
    httpRestConfig.setUrl(config.get(HttpOptions.URL));
    httpRestConfig.setDecode(config.get(HttpOptions.DECODE));
    httpRestConfig.setRequestMode(config.get(HttpOptions.METHOD));
    // 下面两个是新加的
    httpRestConfig.setDataSubject(config.get(HttpOptions.DATA_SUBJECT));
    httpRestConfig.setCycles(config.get(HttpOptions.CYCLES));
    httpRestConfig.setParam(
            gson.fromJson(
                    config.get(HttpOptions.PARAMS),
                    new TypeToken<List<MetaParam>>() {
                    }.getType()));
    httpRestConfig.setHeader(
            gson.fromJson(
                    config.get(HttpOptions.HEADER),
                    new TypeToken<List<MetaParam>>() {
                    }.getType()));
    httpRestConfig.setBody(
            gson.fromJson(
                    config.get(HttpOptions.BODY),
                    new TypeToken<List<MetaParam>>() {
                    }.getType()));
    httpRestConfig.setColumn(
            gson.fromJson(
                    config.get(HttpOptions.COLUMN),
                    new TypeToken<List<FieldConf>>() {
                    }.getType()));
    return httpRestConfig;
}

最后也是最关键的就是修改HttpRowConverter类的toInternal方法和类的泛型 在JsonResponseParse类的next()方法调用toInternal的时候,传递的就是map类型参数,而HttpRowConverter类 原来类的泛型是String,toInternal方法参数类型相应的也是String,下面需要将泛型修改为Map<String,Object> 即可,同时toInternal方法的参数类型就是Map<String,Object>了,然后我们删除下面两行即可

    Map<String, Object> result =
    DefaultRestHandler.gson.fromJson(input, GsonUtil.gsonMapTypeToken);

完整的代码如下

@Override
public RowData toInternal(Map<String, Object> result) throws Exception {
    GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
    List<String> columns = rowType.getFieldNames();
    for (int pos = 0; pos < columns.size(); pos++) {
        Object value =
                MapUtil.getValueByKey(
                        result, columns.get(pos), httpRestConfig.getFieldDelimiter());
        if (value instanceof LinkedTreeMap) {
            value = value.toString();
        }
        genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(value));
    }
    return genericRowData;
}

can you create a pull request? chunjun doc(开发者指南@如何提交一个优秀的PR)

taoyameng pushed a commit to taoyameng/chunjun that referenced this issue Feb 11, 2023
FlechazoW pushed a commit that referenced this issue Feb 13, 2023
…lug-in cannot be used in sql mode (#1500)

Co-authored-by: 443321070@qq.com <taotao0226.?>
@taoyameng taoyameng changed the title http-x插件问题 http-x插件问题(已经解决,代码合并到了master分支了) Feb 13, 2023
@FlechazoW
Copy link
Member

Merged pr. Closed as #1500

ll076110 pushed a commit that referenced this issue Mar 3, 2023
…lug-in cannot be used in sql mode (#1500)

Co-authored-by: 443321070@qq.com <taotao0226.?>
(cherry picked from commit d65793b)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

3 participants