
Plugin system design (including the UDF and UDAF system) #19

Closed
garyelephant opened this issue Aug 27, 2017 · 1 comment

garyelephant commented Aug 27, 2017

The plugin system and its concrete implementation can draw on:

Cloudera Morphlines
Embulk
Fluent
Hangout
Logstash
Kinesis
Spark UDF
Hive UDF


Filter plugin requirements:
Distinct, sample


User requirements:
repartition, to increase or decrease parallelism and the number of output files.

@garyelephant garyelephant added this to the M1 milestone Aug 27, 2017
@garyelephant garyelephant changed the title from "Plugin system design (including the UDF and UDAF system); plugin development and testing" to "Plugin system design (including the UDF and UDAF system)" Aug 27, 2017

garyelephant commented Aug 30, 2017

Example data flow:

 The Kafka input receives one record: "a b c"
 The String RDD is converted to a DataFrame; the Row is structured as follows:
 {
    "raw_message": "a b c"
 }

 After passing through the split filter:
 {
    "raw_message": "a b c",
    "c1": "a",
    "c2": "b",
    "c3": "c"
 }
 Finally, the record is written out through the Kafka output.
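The split step above can be sketched in a spark-shell session using the built-in `split` function (column names `c1`..`c3` as in the example; the variable names are illustrative, not the actual plugin code):

```scala
import org.apache.spark.sql.functions.split
import spark.implicits._

// One record, as produced by the Kafka input stage
val df = Seq("a b c").toDF("raw_message")

// Split raw_message on whitespace and project the pieces into c1, c2, c3
val parts = split($"raw_message", " ")
val result = df
  .withColumn("c1", parts.getItem(0))
  .withColumn("c2", parts.getItem(1))
  .withColumn("c3", parts.getItem(2))

// result now has columns: raw_message, c1, c2, c3
```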

The Filter requirements for operating on DataFrames are summarized as follows:

 Q: How do we insert a row into a DataFrame? (filter plugin: clone)
 A: https://docs.databricks.com/spark/latest/faq/append-a-row-to-rdd-or-dataframe.html
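A minimal sketch of the approach from that link: DataFrames are immutable, so "inserting" a row means unioning a one-row DataFrame with the original (the data values here are made up for illustration):

```scala
import spark.implicits._

val df     = Seq("a b c").toDF("raw_message")
val newRow = Seq("d e f").toDF("raw_message")  // the row to "insert"

// union in Spark 2.x (unionAll in 1.x); schemas must match
val appended = df.union(newRow)
```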

 Q: How do we delete rows from a DataFrame? (filter plugin: drop)
 A: https://stackoverflow.com/questions/43515193/how-to-delete-rows-in-a-table-created-from-a-spark-dataframe
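As the linked answer explains, "deleting" rows from an immutable DataFrame means keeping everything that does not match. A sketch with hypothetical data:

```scala
import spark.implicits._

val df = Seq("a b c", "d e f").toDF("raw_message")

// Keep every row except the ones we want "deleted"
// (=!= in Spark 2.x; the !== operator in 1.x)
val remaining = df.filter($"raw_message" =!= "d e f")
```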

 Q: How do we add a specific column to a DataFrame? (filter plugin: date)
 A: Combine withColumn() with a UDF.
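A sketch of the withColumn() + UDF combination for a hypothetical date filter (the column name `event_time` and the format string are assumptions, not the plugin's actual config):

```scala
import org.apache.spark.sql.functions.udf
import java.text.SimpleDateFormat
import java.util.Date
import spark.implicits._

val df = Seq("a b c").toDF("raw_message")

// Zero-argument UDF that stamps each row with a formatted processing time
val nowStr = udf { () =>
  new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
}
val withDate = df.withColumn("event_time", nowStr())
```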

 Q: How do we delete a specific column from a DataFrame? (filter plugin: drop_field)
 A: Example: df.drop(df.col("raw.hourOfWeek"))

 Q: How do we update a specific column of a DataFrame? (filter plugin: )
 A: Again with withColumn.
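The update case relies on the fact that withColumn with an existing column name replaces that column. A sketch (the uppercase transform is just an illustrative stand-in for a real filter's logic):

```scala
import org.apache.spark.sql.functions.upper
import spark.implicits._

val df = Seq("a b c").toDF("raw_message")

// Reusing an existing column name overwrites the column in place
val updated = df.withColumn("raw_message", upper($"raw_message"))
```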

 Q: How do we add multiple columns at once? (filter plugin: grok, kv, split)
 A: There are two approaches: (1) val newDf = dataframe.flatMap(...), see:
 https://community.hortonworks.com/comments/73622/view.html
 (2) Register a UDF whose return type is a Struct; this is equivalent to withColumn("name", udf) and adds
 one column with a nested structure that contains multiple fields. See:
 https://stackoverflow.com/questions/32196207/derive-multiple-columns-from-a-single-column-in-a-spark-dataframe
 https://community.hortonworks.com/comments/73622/view.html
 See also #30 
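A sketch of approach (2): a UDF returning a case class yields a single struct column, whose fields can then be projected out as top-level columns (the `Parsed` case class and the 3-way split are assumptions for illustration):

```scala
import org.apache.spark.sql.functions.udf
import spark.implicits._

val df = Seq("a b c").toDF("raw_message")

// The case class becomes the struct type of the UDF's return column
case class Parsed(c1: String, c2: String, c3: String)

val parse = udf { (raw: String) =>
  val p = raw.split(" ", 3)
  Parsed(p(0), p(1), p(2))
}

val flattened = df
  .withColumn("parsed", parse($"raw_message"))
  // Flatten the nested struct into top-level columns
  .select($"raw_message", $"parsed.c1", $"parsed.c2", $"parsed.c3")
```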

 Q: How do we add a column enriched from dictionary information? (filter plugin: dict, geoip)
 A: It depends on the size of the dictionary. If it is small, a broadcast variable plus a UDF works directly; if it is
 large, it is better to register the dictionary as a DataFrame and join it with the data DataFrame (Spark should
 apply join optimizations automatically based on the dictionary DataFrame, so distinguishing by dictionary size
 may not actually be necessary). The dictionary should still be created in advance as a broadcast variable, to
 avoid rebuilding it for every batch.
 Other references:
 https://stackoverflow.com/questions/31816975/how-to-pass-whole-row-to-udf-spark-dataframe-filter
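A sketch of the DataFrame-join variant using Spark's broadcast hint (the IP/country columns are a made-up geoip-style example, not the plugin's actual schema):

```scala
import org.apache.spark.sql.functions.broadcast
import spark.implicits._

val events = Seq("1.2.3.4", "5.6.7.8").toDF("client_ip")          // event data
val dict   = Seq(("1.2.3.4", "CN")).toDF("client_ip", "country")  // small dictionary

// broadcast() hints Spark to ship the small dictionary to every executor,
// so the large event DataFrame is never shuffled for the join
val enriched = events.join(broadcast(dict), Seq("client_ip"), "left_outer")
```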

RickyHuo added a commit that referenced this issue Sep 2, 2017
ic4y pushed a commit to ic4y/incubator-seatunnel that referenced this issue May 22, 2023
[Sync] Business dev merge to 2.4.4-dev