Skip to content

Commit

Permalink
Merge pull request #4769 from zeeshanakram3/Origo_fix_unhandled_QN_ex…
Browse files Browse the repository at this point in the history
…ception

Origo: fix unhandled QN exception in Argus
  • Loading branch information
mnaamani committed Jun 12, 2023
2 parents 0c9c506 + b234da0 commit 86a278a
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 24 deletions.
43 changes: 25 additions & 18 deletions distributor-node/src/services/content/ContentService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { ObjectStatus, ObjectStatusType, ReadonlyConfig } from '../../types'
import { StateCacheService } from '../cache/StateCacheService'
import { LoggingService } from '../logging'
import { Logger } from 'winston'
import { FileContinousReadStream, FileContinousReadStreamOptions } from './FileContinousReadStream'
import { FileContinuousReadStream, FileContinuousReadStreamOptions } from './FileContinuousReadStream'
import FileType from 'file-type'
import { Readable, pipeline } from 'stream'
import { NetworkingService } from '../networking'
Expand Down Expand Up @@ -45,25 +45,29 @@ export class ContentService {
}

public async cacheCleanup(): Promise<void> {
const supportedObjects = await this.networking.fetchSupportedDataObjects()
const cachedObjectsIds = this.stateCache.getCachedObjectsIds()
let droppedObjects = 0
try {
const supportedObjects = await this.networking.fetchSupportedDataObjects()
const cachedObjectsIds = this.stateCache.getCachedObjectsIds()
let droppedObjects = 0

this.logger.verbose('Performing cache cleanup...', {
supportedObjects: supportedObjects.size,
objectsInCache: cachedObjectsIds.length,
})
this.logger.verbose('Performing cache cleanup...', {
supportedObjects: supportedObjects.size,
objectsInCache: cachedObjectsIds.length,
})

for (const objectId of cachedObjectsIds) {
if (!supportedObjects.has(objectId)) {
this.drop(objectId, 'No longer supported')
++droppedObjects
for (const objectId of cachedObjectsIds) {
if (!supportedObjects.has(objectId)) {
this.drop(objectId, 'No longer supported')
++droppedObjects
}
}
}

this.logger.verbose('Cache cleanup finished', {
droppedObjects,
})
this.logger.verbose('Cache cleanup finished', {
droppedObjects,
})
} catch (err) {
this.logger.error('Failed to perform cache cleanup ', { err })
}
}

public async startupInit(): Promise<void> {
Expand Down Expand Up @@ -164,8 +168,11 @@ export class ContentService {
return fs.createWriteStream(this.path(objectId), { autoClose: true, emitClose: true })
}

public createContinousReadStream(objectId: string, options: FileContinousReadStreamOptions): FileContinousReadStream {
return new FileContinousReadStream(this.path(objectId), options)
public createContinuousReadStream(
objectId: string,
options: FileContinuousReadStreamOptions
): FileContinuousReadStream {
return new FileContinuousReadStream(this.path(objectId), options)
}

public async readFileChunk(path: string, bytes: number): Promise<Buffer> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { Readable } from 'stream'
import fs from 'fs'
import { Readable } from 'stream'

export interface FileContinousReadStreamOptions {
export interface FileContinuousReadStreamOptions {
end: number
start?: number
chunkSize?: number
missingDataRetryTime?: number
maxRetries?: number
}

export class FileContinousReadStream extends Readable {
export class FileContinuousReadStream extends Readable {
private fd: number
private position: number
private lastByte: number
Expand All @@ -18,7 +18,7 @@ export class FileContinousReadStream extends Readable {
private finished: boolean
private interval: NodeJS.Timeout | undefined

public constructor(path: string, options: FileContinousReadStreamOptions) {
public constructor(path: string, options: FileContinuousReadStreamOptions) {
super({
highWaterMark: options.chunkSize || 1 * 1024 * 1024, // default: 1 MB
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ export class PublicApiController {
range?: { start: number; end: number }
) {
this.logger.verbose(`Serving pending download asset from file`, { objectId, objectSize, range })
const stream = this.content.createContinousReadStream(objectId, {
const stream = this.content.createContinuousReadStream(objectId, {
start: range?.start,
end: range !== undefined ? range.end : objectSize - 1,
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ export class NetworkingService {
})
)
} catch (err) {
this.logger.error("Couldn't check active storage node endpooints", { err })
this.logger.error("Couldn't check active storage node endpoints", { err })
}
}

Expand Down

0 comments on commit 86a278a

Please sign in to comment.