Skip to content

Commit

Permalink
Fix indent
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Dec 2, 2016
1 parent afd5c0f commit 7ee4cf1
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -647,38 +647,38 @@ object StreamMetadata extends Logging {
/** Read the metadata from file if it exists */
def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = {
val fs = FileSystem.get(hadoopConf)
if (fs.exists(metadataFile)) {
var input: FSDataInputStream = null
try {
input = fs.open(metadataFile)
val reader = new InputStreamReader(input, StandardCharsets.UTF_8)
val metadata = Serialization.read[StreamMetadata](reader)
Some(metadata)
} catch {
case NonFatal(e) =>
logError(s"Error reading stream metadata from $metadataFile", e)
throw e
} finally {
IOUtils.closeQuietly(input)
}
} else None
if (fs.exists(metadataFile)) {
var input: FSDataInputStream = null
try {
input = fs.open(metadataFile)
val reader = new InputStreamReader(input, StandardCharsets.UTF_8)
val metadata = Serialization.read[StreamMetadata](reader)
Some(metadata)
} catch {
case NonFatal(e) =>
logError(s"Error reading stream metadata from $metadataFile", e)
throw e
} finally {
IOUtils.closeQuietly(input)
}
} else None
}

/** Write metadata to file, overwrite if it exists */
/** Write metadata to file */
def write(
metadata: StreamMetadata,
metadataFile: Path,
hadoopConf: Configuration): Unit = {
var output: FSDataOutputStream = null
try {
val fs = FileSystem.get(hadoopConf)
output = fs.create(metadataFile, true) // overwrite if exists
output = fs.create(metadataFile)
val writer = new OutputStreamWriter(output)
Serialization.write(metadata, writer)
writer.close()
} catch {
case NonFatal(e) =>
logError(s"Error writing stream metedata $metadata to $metadataFile", e)
logError(s"Error writing stream metadata $metadata to $metadataFile", e)
throw e
} finally {
IOUtils.closeQuietly(output)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class StateOperatorProgress private[sql](
*
* @param id An unique query id that persists across restarts. See `StreamingQuery.id()`.
* @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
* @param name User-specified name of the query.
* @param name User-specified name of the query, null if not specified.
* @param timestamp Timestamp (ms) of the beginning of the trigger.
* @param batchId A unique id for the current batch of data being processed. Note that in the
* case of retries after a failure a given batchId my be executed more than once.
Expand Down

0 comments on commit 7ee4cf1

Please sign in to comment.