Skip to content

Commit

Permalink
fixed stream.push() after EOF error
Browse files Browse the repository at this point in the history
maybe
  • Loading branch information
404invalid-user committed Aug 23, 2024
1 parent 8f8b73c commit 85504da
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 22 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "redbased",
"version": "0.2.0",
"version": "0.2.3",
"description": "use reddis like a database with predefined schemas",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
Expand Down
51 changes: 32 additions & 19 deletions src/SchemaInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import pluralize from './helpers/pluralize';
import documentValidation from './helpers/documentValidation';
import constructDocument from './helpers/constructDocument';
import { Redis } from 'ioredis';
import { Console } from 'console';



Expand All @@ -27,7 +28,7 @@ export class SchemaInstance {
public async create(data: Document): Promise<Document> {
if (this.redisClient === null) throw new Error("no redis connection detected please first await successfull connection");
const documentData = await constructDocument(this.redisClient, this.name, data, this.fields);
const validateDoc = documentValidation(data, this.fields, false);
const validateDoc = documentValidation(documentData, this.fields, false);
if (validateDoc.pass === false) {
const err = new Error(`Failed to create ${this.name} ${validateDoc.msg}`);
Error.captureStackTrace(err);
Expand Down Expand Up @@ -79,12 +80,13 @@ export class SchemaInstance {
let foundDocumentsArr: DocumentInstance[] = [];
let processedDocuments: number = 0;


console.log("creating stream for " + this.name)
// Create a readable stream to emit the matching documents
const readableStream = new Readable({
objectMode: true,
read() { }
});
let streamEnded = false;


/** check if a document fits the filter */
Expand All @@ -111,24 +113,17 @@ export class SchemaInstance {
function constructDocInStream(id: string, value: string) {
const valueJson = JSON.parse(value);
if (valueJson.id != id) valueJson.id = id;
processedDocuments++;
return valueJson;
}
/** push and reset local array */
function pushAndResetArray() {
//NOTE - push an empy array before stream close incase there was no items
//if (foundDocumentsArr.length <= 0) return false;
readableStream.push(foundDocumentsArr);
foundDocumentsArr = [];
return true;
}



/** handle the redis stream */
const handleRedisStreamData = (results: string[]) => {
const handleRedisStreamData = async (results: string[]) => {
//return empyty arr and end stream if lenght 0
if (results.length <= 0) {
readableStream.push([]);
streamEnded = true;
return readableStream.push(null);;
}

Expand All @@ -141,19 +136,37 @@ export class SchemaInstance {
if (raw == false) foundDocumentsArr.push(new DocumentInstance(this.redisClient, this.name, doc, this.fields));
}

//check if we should emit the local array to stream
//check len of arr and see if matches limit or total count
if (foundDocumentsArr.length >= limit || foundDocumentsArr.length >= SchemaDocumentCount) {
pushAndResetArray();
//NOTE - push an empy array before stream close incase there was no items
if (!readableStream.readableEnded && !readableStream.destroyed) {
if (!streamEnded) {
readableStream.push(foundDocumentsArr);
}
} else {
console.error('Attempted to push data after the stream has ended or been destroyed.');
}
foundDocumentsArr = [];
}




processedDocuments++;
console.log(processedDocuments, SchemaDocumentCount)
//push rest of docs if any and end stream when finished processing all documents
if (processedDocuments >= SchemaDocumentCount) {
pushAndResetArray();
if (!streamEnded) {
if (foundDocumentsArr.length > 0) readableStream.push(foundDocumentsArr);
} else {
console.error('Attempted to push data after the stream has ended or been destroyed.');

console.log(foundDocumentsArr)
}
console.log('\n\n\n\npushing LAST docs for ' + this.name);
console.log(foundDocumentsArr)
console.log("pushing null")
readableStream.push(null);
streamEnded = true;
break;
}
console.log(i, results.length)
}
}
//redis stream
Expand Down
4 changes: 2 additions & 2 deletions src/helpers/documentValidation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ function documentValidation(doc: Document | null = null, validationFeilds: Field
//does not contain extra unknown keys
if (doc !== null) {
for (const key of Object.keys(doc)) {
if (!(key in validationFeilds)) {
if (!(key in validationFeilds) && key !== 'id') {
return {
pass: false,
msg: `Schema does not contain field '${key}'.`
Expand Down Expand Up @@ -53,7 +53,7 @@ function documentValidation(doc: Document | null = null, validationFeilds: Field
if (required === false && typeOfSupplyData !== "Undefined") {
return {
pass: false,
msg: `Field '${field}' must be of type '${typeStr}' ${required === false ? "or 'undefined'": ''}.`
msg: `Field '${field}' must be of type '${typeStr}' ${required === false ? "or 'undefined'" : ''}.`
}
}
}
Expand Down

0 comments on commit 85504da

Please sign in to comment.