
[CARBONDATA-2919] Support ingest from Kafka in StreamSQL #2695

Closed
wants to merge 2 commits

Conversation

@jackylk (Contributor) commented Sep 5, 2018

In this PR, the following is changed:

  1. Support Kafka as a streaming source table in StreamSQL (see the usage sketch at the end of this description)
  2. Delete the lock file when DROP STREAM is triggered
  3. Fix the streaming segment data size shown in SHOW SEGMENTS
  • Any interfaces changed?
    NA

  • Any backward compatibility impacted?
    In the SHOW SEGMENTS command, the output data type of Load Start Time and Load End Time in the RDD is changed from timestamp to string.

  • Document update required?
    NA

  • Testing done
    Reran all existing tests.

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
    NA
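The sketch below illustrates how the feature added by item 1 is intended to be used from StreamSQL, issued through a Carbon-enabled SparkSession. It is a minimal, hypothetical example rather than code from this PR: the table names, broker address, topic, and the exact TBLPROPERTIES/STMPROPERTIES keys ('kafka.bootstrap.servers', 'subscribe', 'trigger', 'interval') are assumptions modeled on Spark's Kafka source options, and DDL details such as STORED AS carbondata may differ by CarbonData version.

import org.apache.spark.sql.SparkSession

object StreamSqlKafkaSketch {
  def main(args: Array[String]): Unit = {
    // Assumes a Carbon-enabled SparkSession (CarbonSession / Carbon extensions);
    // otherwise CREATE STREAM / DROP STREAM are not recognized.
    val spark = SparkSession.builder().appName("streamsql-kafka-sketch").getOrCreate()

    // Streaming source table backed by a Kafka topic (item 1 above); property keys are assumed
    spark.sql(
      """CREATE TABLE kafka_source(id INT, name STRING)
        |STORED AS carbondata
        |TBLPROPERTIES(
        |  'streaming'='source',
        |  'format'='kafka',
        |  'kafka.bootstrap.servers'='localhost:9092',
        |  'subscribe'='test_topic')""".stripMargin)

    // Streaming sink table
    spark.sql(
      """CREATE TABLE sink(id INT, name STRING)
        |STORED AS carbondata
        |TBLPROPERTIES('streaming'='sink')""".stripMargin)

    // Start the stream job; per item 2 above, DROP STREAM also cleans up the lock file
    spark.sql(
      """CREATE STREAM ingest_job ON TABLE sink
        |STMPROPERTIES('trigger'='ProcessingTime', 'interval'='10 seconds')
        |AS SELECT * FROM kafka_source""".stripMargin)
    spark.sql("DROP STREAM ingest_job")
  }
}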

@CarbonDataQA
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/100/

@CarbonDataQA
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/268/

@CarbonDataQA
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8338/

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
<version>2.2.1</version>
Contributor:

please use spark.version property

Contributor Author:

ok, fixed

System.out.println("streaming finished")
}

def showTableCount(spark: SparkSession, tableName: String): Thread = {
Contributor:

please remove unused code

@CarbonDataQA
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/111/

@CarbonDataQA
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/279/

@CarbonDataQA
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8349/

*/
public String getFormat() {
return getTableInfo().getFactTable().getTableProperties()
.get("format");
Contributor:

it seems this can be moved to the previous line

Contributor Author:

fixed

validateSinkTable(streamDf.schema, sinkTable)

// kafka source always have fixed schema, need to get actual schema
val isKafka = Option(sourceTable.getFormat).exists(_ == "kafka")
Contributor:

for string equality, better to use equals
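As context for the note above, a minimal sketch of the two comparison styles. The getFormat stub is hypothetical and stands in for CarbonTable.getFormat, which may return null when the table property is unset; in Scala, == on references delegates to equals after a null check, so both forms behave the same for non-null strings.

// Hypothetical stand-in for CarbonTable.getFormat
def getFormat: String = "kafka"

// Form in the PR: Option(...) guards against null, and == delegates to equals
val isKafka: Boolean = Option(getFormat).exists(_ == "kafka")

// Form suggested by the review comment
val isKafkaEquals: Boolean = Option(getFormat).exists(_.equals("kafka"))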

CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType))
}
if (!querySchema.equals(StructType(fields))) {
throw new MalformedCarbonCommandException(s"Schema of table ${ sink.getTableName } " +
Contributor:

you can move the first half to the next line so that we can avoid string concatenation here.
The same with line #58

Contributor Author:

fixed
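The hunk above checks that the streaming query's schema matches the sink table's schema. Below is a minimal, self-contained sketch of that idea; the helper name, column metadata, and exception type are illustrative (the PR throws MalformedCarbonCommandException), not the PR's actual code.

import org.apache.spark.sql.types.{DataType, IntegerType, StringType, StructField, StructType}

object SinkSchemaValidationSketch {
  // Build the sink table's expected schema from (name, dataType) pairs and reject the
  // streaming query if its schema does not match.
  def validateSinkSchema(querySchema: StructType, sinkColumns: Seq[(String, DataType)]): Unit = {
    val expected = StructType(sinkColumns.map { case (name, dt) => StructField(name, dt) })
    if (!querySchema.equals(expected)) {
      // Generic exception to keep the sketch self-contained
      throw new IllegalArgumentException(
        s"Schema of the streaming query does not match the sink table: $querySchema vs $expected")
    }
  }

  def main(args: Array[String]): Unit = {
    val querySchema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
    validateSinkSchema(querySchema, Seq("id" -> IntegerType, "name" -> StringType))
  }
}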

val streamReader = sparkSession.readStream
.schema(getSparkSchema(sourceTable))
.format(format)
val streamReader = if (format != "kafka") {
Contributor:

use 'equals' instead of '='
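For background on this hunk: Spark's Kafka source exposes a fixed schema (key, value, topic, partition, offset, timestamp, timestampType) and does not accept a user-defined schema, so only non-Kafka sources get .schema(...) applied. A minimal sketch of that branching follows; buildReader and its parameters are illustrative names, not the PR's code.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.DataStreamReader
import org.apache.spark.sql.types.StructType

// For Kafka the schema is fixed by the source and the actual columns are parsed
// from the `value` column later; other formats take the table's schema directly.
def buildReader(spark: SparkSession, format: String, userSchema: StructType): DataStreamReader = {
  val reader = spark.readStream.format(format)
  if (format.equals("kafka")) {
    reader
  } else {
    reader.schema(userSchema)
  }
}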

@@ -316,6 +316,10 @@ private void appendBlockletToDataFile() throws IOException {
}

public BlockletMinMaxIndex getBatchMinMaxIndex() {
if (output == null) {
return StreamSegment.mergeBlockletMinMax(
Contributor:

no need for multiple lines here

Contributor Author:

fixed

@@ -217,6 +217,12 @@
<version>${spark.version}</version>
<scope>${spark.deps.scope}</scope>
</dependency>
<dependency>
Contributor:

Why is it needed for whole project scope? I think it will only be used in streaming related modules

Contributor Author:

ok

@CarbonDataQA
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/162/

@CarbonDataQA
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/330/

@CarbonDataQA
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8401/

@chenliang613 (Contributor)

retest this please

@CarbonDataQA
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/164/

@CarbonDataQA
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/332/

@CarbonDataQA
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8403/

@CarbonDataQA
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8435/

@CarbonDataQA
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/197/

@CarbonDataQA
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/365/

@CarbonDataQA
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/371/

@CarbonDataQA
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8441/

@CarbonDataQA
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/203/

@CarbonDataQA
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8442/

@CarbonDataQA
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/372/

@jackylk (Contributor Author) commented Sep 11, 2018

retest this please

@CarbonDataQA
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/214/

@CarbonDataQA
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/383/

@CarbonDataQA
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8453/

@CarbonDataQA
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/238/

@CarbonDataQA
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/407/

@CarbonDataQA
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/241/

@CarbonDataQA
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8480/

@CarbonDataQA
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/410/

@brijoobopanna (Contributor)

retest this please

@CarbonDataQA
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/252/

@CarbonDataQA
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8491/

@CarbonDataQA
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/421/

@CarbonDataQA
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8492/

@CarbonDataQA
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/253/

@CarbonDataQA
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/256/

@CarbonDataQA
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8495/

@CarbonDataQA
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/422/

@CarbonDataQA
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/425/

fix compile

fix comment

fix test

fix spark2.3

@CarbonDataQA
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/263/

@CarbonDataQA
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8502/

@CarbonDataQA
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/267/

@CarbonDataQA
Build Failed with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/432/

@CarbonDataQA
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8506/

@CarbonDataQA
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/436/

@jackylk (Contributor Author) commented Sep 13, 2018

retest this please

@CarbonDataQA
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.1/268/

@CarbonDataQA
Build Failed with Spark 2.3.1, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder2.3/8507/

@CarbonDataQA
Build Success with Spark 2.2.1, Please check CI http://95.216.28.178:8080/job/ApacheCarbonPRBuilder1/437/

@asfgit closed this in 560bfbe on Sep 13, 2018