[SUPPORT] Expect job status failed in spark batch model #6679

Closed
KnightChess opened this issue Sep 15, 2022 · 6 comments
Labels
on-call-triaged priority:major degraded perf; unable to move forward; potential bugs spark Issues related to spark writer-core Issues relating to core transactions/write actions

Comments

@KnightChess
Contributor

KnightChess commented Sep 15, 2022

Describe the problem you faced

@Override
  public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
    ...
          fileWriter.writeAvro(record.getRecordKey(),
              rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));
        } else {
          fileWriter.writeAvroWithMetadata(record.getKey(), rewriteRecord((GenericRecord) avroRecord.get()));
        }
        ...
    } catch (Throwable t) {
      // Not throwing exception from here, since we don't want to fail the entire job
      // for a single record
      writeStatus.markFailure(record, t, recordMetadata);
      LOG.error("Error writing record " + record, t);
    }
  }

private def commitAndPerformPostOperations(spark: SparkSession,
                                             ...
                                            ): (Boolean, common.util.Option[java.lang.String], common.util.Option[java.lang.String]) = {
    if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() == 0) {
      log.info("Proceeding to commit the write.")
      ...
      (commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant)
    } else {
      log.error(s"${tableInstantInfo.operation} failed with errors")
      if (log.isTraceEnabled) {
        log.trace("Printing out the top 100 errors")
        writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors)
          .take(100)
          .foreach(ws => {
            log.trace("Global error :", ws.getGlobalError)
            if (ws.getErrors.size() > 0) {
              ws.getErrors.foreach(kt =>
                log.trace(s"Error for key: ${kt._1}", kt._2))
            }
          })
      }
      (false, common.util.Option.empty(), common.util.Option.empty())
    }
  }

I hit a SchemaCompatibilityException in the rewriteRecord method, but it was caught. The only sign of it is a log line on the driver: ERROR HoodieSparkSqlWriter$: UPSERT failed with errors. The Spark job status is still SUCCEEDED.

Some questions I need help with:

  • Is this behavior intended for streaming scenarios? If an exception were thrown instead, how would the job be restored?
  • In batch mode, the job status is often used as a trigger condition. How can this case be handled in the current version? Thanks.
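One way a batch job's status could be made to reflect record-level write errors is to inspect the collected statuses after the write and throw on the driver. The sketch below is a minimal illustration in plain Java, not Hudi code: RecordWriteStatus, writeAll and failIfErrors are hypothetical stand-ins, and the "bad" key prefix only simulates a record that fails schema rewriting. It shows the same shape as the handler above, where per-record exceptions are recorded rather than rethrown, so the caller has to check for them explicitly.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchWriteCheck {

    /** Hypothetical stand-in for a write status object; not a Hudi class. */
    static final class RecordWriteStatus {
        private final Map<String, Throwable> errors = new LinkedHashMap<>();

        void markFailure(String recordKey, Throwable t) {
            errors.put(recordKey, t);
        }

        boolean hasErrors() {
            return !errors.isEmpty();
        }

        Map<String, Throwable> getErrors() {
            return errors;
        }
    }

    /** Writes every record, recording failures instead of rethrowing them. */
    static RecordWriteStatus writeAll(List<String> recordKeys) {
        RecordWriteStatus status = new RecordWriteStatus();
        for (String key : recordKeys) {
            try {
                if (key.startsWith("bad")) {
                    // Simulates a record that does not match the table schema.
                    throw new IllegalStateException("schema mismatch for " + key);
                }
            } catch (Throwable t) {
                // Same shape as the handler in the issue: the error is swallowed here.
                status.markFailure(key, t);
            }
        }
        return status;
    }

    /** Turns recorded failures into an exception so the job itself fails. */
    static void failIfErrors(List<RecordWriteStatus> statuses) {
        long failed = statuses.stream().filter(RecordWriteStatus::hasErrors).count();
        if (failed > 0) {
            throw new IllegalStateException(failed + " write status(es) reported errors");
        }
    }

    public static void main(String[] args) {
        List<RecordWriteStatus> statuses = new ArrayList<>();
        statuses.add(writeAll(List.of("k1", "k2")));
        statuses.add(writeAll(List.of("k3", "bad-k4")));
        try {
            failIfErrors(statuses);
            System.out.println("write committed");
        } catch (IllegalStateException e) {
            System.out.println("job failed: " + e.getMessage());
        }
    }
}
```

In a real batch job the exception would be left uncaught so that the application exits with a failed status, which is what a scheduler keyed on job status needs.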
@KnightChess
Contributor Author

@vinothchandar @xushiyan @yihua Could you help answer this question?

@yihua yihua added spark Issues related to spark writer-core Issues relating to core transactions/write actions priority:major degraded perf; unable to move forward; potential bugs labels Sep 21, 2022
@yihua yihua added this to Awaiting Triage in GI Tracker Board via automation Sep 21, 2022
@nsivabalan
Contributor

Which version of Hudi are you using?
We have a config, https://hudi.apache.org/docs/configurations/#hoodiedatasourcewritestreamingignorefailedbatch: when set to true, it ignores failures and proceeds to the next batch.
The default value for this was true until 0.12.0.

Can you set this config to false and let us know if you are still facing issues?
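As a rough illustration of the suggestion, the setting could be collected into the option map handed to a streaming writer. The key below is the one behind the linked configuration anchor (hoodie.datasource.write.streaming.ignore.failed.batch); the class and the streamingWriteOptions helper are hypothetical names used only for this sketch, and the resulting map would still need to be passed to the actual writer.

```java
import java.util.HashMap;
import java.util.Map;

public class StreamingFailureOptions {
    // Config key as documented at the link above.
    static final String IGNORE_FAILED_BATCH_KEY =
        "hoodie.datasource.write.streaming.ignore.failed.batch";

    /** Hypothetical helper: builds writer options with the flag set explicitly. */
    static Map<String, String> streamingWriteOptions(boolean ignoreFailedBatch) {
        Map<String, String> options = new HashMap<>();
        options.put(IGNORE_FAILED_BATCH_KEY, Boolean.toString(ignoreFailedBatch));
        return options;
    }

    public static void main(String[] args) {
        // false: a failed micro batch should surface as a failure instead of being skipped.
        System.out.println(streamingWriteOptions(false));
    }
}
```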

@nsivabalan nsivabalan moved this from Awaiting Triage to Awaiting Ack Triaged in GI Tracker Board Oct 23, 2022
@KnightChess
Contributor Author

@nsivabalan Thanks for the reply. I am using version 0.11.0, but ours is a batch job, not a streaming job. Following the config you suggested into the code, I found the exception-handling logic for streaming mode. I will try to use it as a reference to implement the same logic in my batch job. Thanks.

      Try(
        HoodieSparkSqlWriter.write(
          sqlContext, mode, updatedOptions, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering))
      ) match {
        case Success((true, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
          log.info(s"Micro batch id=$batchId succeeded"
            + (commitOps.isPresent match {
            case true => s" for commit=${commitOps.get()}"
            case _ => s" with no new commits"
          }))
          writeClient = Some(client)
          hoodieTableConfig = Some(tableConfig)
          if (compactionInstantOps.isPresent) {
            asyncCompactorService.enqueuePendingAsyncServiceInstant(
              new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get()))
          }
          if (clusteringInstant.isPresent) {
            asyncClusteringService.enqueuePendingAsyncServiceInstant(new HoodieInstant(
              State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringInstant.get()
            ))
          }
          Success((true, commitOps, compactionInstantOps))
        case Failure(e) =>
          // clean up persist rdds in the write process
          data.sparkSession.sparkContext.getPersistentRDDs
            .foreach {
              case (id, rdd) =>
                try {
                  rdd.unpersist()
                } catch {
                  case t: Exception => log.warn("Got excepting trying to unpersist rdd", t)
                }
            }
          log.error(s"Micro batch id=$batchId threw following exception: ", e)
          if (ignoreFailedBatch) {
            log.info(s"Ignore the exception and move on streaming as per " +
              s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
            Success((true, None, None))
          } else {
            if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
            Failure(e)
          }
        case Success((false, commitOps, compactionInstantOps, clusteringInstant, client, tableConfig)) =>
          log.error(s"Micro batch id=$batchId ended up with errors"
            + (commitOps.isPresent match {
              case true =>  s" for commit=${commitOps.get()}"
              case _ => s""
            }))
          if (ignoreFailedBatch) {
            log.info(s"Ignore the errors and move on streaming as per " +
              s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key} configuration")
            Success((true, None, None))
          } else {
            if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
            Failure(new HoodieCorruptedDataException(s"Micro batch id=$batchId ended up with errors"))
          }
      }
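
The streaming branch above reduces to a small decision: a write that returns false or throws is either tolerated (when ignoreFailedBatch is set) or turned into a failure. A minimal Java sketch of that decision, which a batch job could wrap around its own write call, might look like the following. BatchOutcome and runBatch are hypothetical names, IllegalStateException stands in for whatever exception type the job prefers, and retries and RDD cleanup are left out.

```java
import java.util.concurrent.Callable;

public class BatchOutcome {

    /**
     * Runs a write that reports success as a boolean and decides whether the
     * surrounding job should fail. Returns true when the job may continue.
     */
    static boolean runBatch(Callable<Boolean> write, boolean ignoreFailedBatch) {
        boolean succeeded;
        Exception cause = null;
        try {
            succeeded = write.call();
        } catch (Exception e) {
            // A thrown exception is treated like a write that reported errors.
            succeeded = false;
            cause = e;
        }
        if (succeeded || ignoreFailedBatch) {
            return true;
        }
        // Propagate the failure so the job status reflects it.
        throw new IllegalStateException("Batch ended up with errors", cause);
    }

    public static void main(String[] args) {
        System.out.println(runBatch(() -> true, false));
        System.out.println(runBatch(() -> false, true));
        try {
            runBatch(() -> false, false);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```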

@xushiyan xushiyan moved this from Awaiting Ack Triaged to Awaiting Triage in GI Tracker Board Oct 24, 2022
@nsivabalan
Contributor

Thanks for reporting. I understand what you are looking for and have put up a fix here:
#7140
I have yet to write tests, but you can test it out and let us know if it looks good.

GI Tracker Board automation moved this from Awaiting Triage to Done Nov 4, 2022
@nsivabalan
Contributor

Feel free to reopen if the fix does not solve your use case.

@Zouxxyy
Contributor

Zouxxyy commented Nov 13, 2022

@KnightChess Hi, the problem has been fixed, but the test is missing. Can you provide a way to reproduce the problem so that I can add it to the unit tests?


4 participants