
feat: parallelization #352


Open

wants to merge 22 commits into base: develop

Changes from 20 commits (22 commits total)
2599712
feat: handle parallel pagination for scrape list
RohitR311 Jan 14, 2025
0c655fc
feat: add support to handle parallel pagination
RohitR311 Jan 15, 2025
641e182
feat: add parallel scraping logic for click next pagination
RohitR311 Jan 15, 2025
352447b
feat: add parallel scraping support for scroll pagination types
RohitR311 Jan 15, 2025
4ffbcba
feat: rm parallelism for paginations except click next
RohitR311 Jan 15, 2025
4c0eb30
feat: estimate number of items per page
RohitR311 Jan 16, 2025
bd25950
chore: include all files from src dir
RohitR311 Jan 17, 2025
8360d3d
feat: add worker pool to support parallelism for click next
RohitR311 Jan 17, 2025
38539d5
feat: add worker pool for parallelization
RohitR311 Jan 17, 2025
e0b52c1
feat: add worker types
RohitR311 Jan 17, 2025
b2e8332
feat: rm worker pool logic
RohitR311 Jan 17, 2025
7d0339a
feat: rm worker pool logic
RohitR311 Jan 17, 2025
c4c77e6
Merge branch 'parallelization' of https://github.com/RohitR311/maxun …
RohitR311 Jan 17, 2025
5c6f478
feat: add kafka config
RohitR311 Jan 18, 2025
5becc84
feat: add kafka manager to create topics
RohitR311 Jan 19, 2025
5e8a6d1
feat: add scraper to scrape data and store in kafka
RohitR311 Jan 19, 2025
73c2cc3
feat: add kafka util to consume task data and produce messages
RohitR311 Jan 20, 2025
6984401
feat: add parallel scraping support using kafka
RohitR311 Jan 20, 2025
7105749
feat: add initial kafka setup script
RohitR311 Jan 20, 2025
a931a13
feat: add start consumer kafka script
RohitR311 Jan 20, 2025
5411484
feat: add limit in task config
RohitR311 Jan 20, 2025
7650794
chore: add kafka services
RohitR311 Jan 20, 2025
10 changes: 10 additions & 0 deletions maxun-core/src/config/kafka.ts
@@ -0,0 +1,10 @@
export const kafkaConfig = {
clientId: 'maxun-scraper',
brokers: ['localhost:29092'],
topics: {
Comment on lines +1 to +4
⚠️ Potential issue

Use environment variables for Kafka configuration.

The broker address and client ID are hardcoded, which limits deployment flexibility and poses security risks. Consider using environment variables for configuration.

Apply this diff to make the configuration more flexible:

+import { config } from 'dotenv';
+
+config();
+
 export const kafkaConfig = {
-    clientId: 'maxun-scraper',
-    brokers: ['localhost:29092'],
+    clientId: process.env.KAFKA_CLIENT_ID || 'maxun-scraper',
+    brokers: (process.env.KAFKA_BROKERS || 'localhost:29092').split(','),

Also, consider adding SSL/authentication configuration for production environments:

    // Add these properties to kafkaConfig
    ssl: process.env.KAFKA_SSL === 'true',
    sasl: process.env.KAFKA_SASL === 'true' ? {
      mechanism: process.env.KAFKA_SASL_MECHANISM,
      username: process.env.KAFKA_USERNAME,
      password: process.env.KAFKA_PASSWORD,
    } : undefined,
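
For reference, a minimal sketch of how the env-driven config would be consumed by the kafkajs client already used in interpret.ts; the ssl and sasl fields assume the optional additions suggested above and are not part of this PR:

import { Kafka } from 'kafkajs';
import { kafkaConfig } from './config/kafka';

// Sketch only: ssl/sasl exist on kafkaConfig only if the optional additions above are applied.
const kafka = new Kafka({
  clientId: kafkaConfig.clientId,
  brokers: kafkaConfig.brokers,
  ssl: kafkaConfig.ssl,
  sasl: kafkaConfig.sasl,
});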

SCRAPING_TASKS: 'scraping-tasks',
SCRAPING_RESULTS: 'scraping-results',
SCRAPING_DLQ: 'scraping-dlq'
},
consumerGroup: 'scraping-group'
};
Comment on lines +9 to +10
🛠️ Refactor suggestion

Add environment separation for consumer groups.

The consumer group should be environment-aware to prevent cross-environment message processing.

Apply this diff:

-    consumerGroup: 'scraping-group'
+    consumerGroup: `${process.env.ENV || 'dev'}-scraping-group`

Comment on lines +1 to +10
💡 Codebase verification

Remove unused Kafka configuration

The Kafka configuration appears to be unused in the codebase. There are no imports of this configuration, no Kafka client instantiations, and no other Kafka-related code present. Consider removing this unused configuration to maintain a clean codebase.

🔗 Analysis chain

Verify Kafka configuration in the codebase.

Let's ensure the configuration is properly utilized across the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for proper usage of Kafka configuration and potential security issues

# Check for hardcoded Kafka configurations
echo "Checking for hardcoded Kafka configurations..."
rg -g '!*.{json,md}' -i 'localhost:29092'

# Check for proper import usage
echo "Checking kafka config imports..."
rg "from ['|\"].*config/kafka" -A 3

# Check for potential security-sensitive Kafka operations
echo "Checking for direct Kafka operations..."
ast-grep --pattern 'new Kafka($$$)'

Length of output: 451


Script:

#!/bin/bash
# Search for Kafka-related code and configurations

# Check for usage of kafkaConfig
echo "Checking for kafkaConfig usage..."
rg "kafkaConfig"

# Check for usage of topic names
echo "Checking for topic name usage..."
rg "scraping-tasks|scraping-results|scraping-dlq"

# Check for any Kafka-related imports or code
echo "Checking for Kafka-related code..."
rg -i "kafka" -g "!*.{json,md,txt}" -A 3

Length of output: 844

297 changes: 296 additions & 1 deletion maxun-core/src/interpret.ts
@@ -16,6 +16,11 @@ import Concurrency from './utils/concurrency';
import Preprocessor from './preprocessor';
import log, { Level } from './utils/logger';

import { Kafka } from 'kafkajs';
import { kafkaConfig } from './config/kafka';

import os from 'os';

/**
* Extending the Window interface for custom scraping functions.
*/
@@ -39,6 +44,7 @@ declare global {
interface InterpreterOptions {
maxRepeats: number;
maxConcurrency: number;
maxWorkers: number;
⚠️ Potential issue

Add missing property to InterpreterOptions interface

The serializableCallback property is used in the code but not defined in the provided InterpreterOptions interface.

Ensure that serializableCallback is correctly defined in the interface to maintain type safety.

serializableCallback: (output: any) => (void | Promise<void>);
binaryCallback: (output: any, mimeType: string) => (void | Promise<void>);
debug: boolean;
@@ -68,13 +74,31 @@ export default class Interpreter extends EventEmitter {

private cumulativeResults: Record<string, any>[] = [];

private kafka: Kafka;

private producer: any;

private async initializeKafka() {
this.producer = this.kafka.producer({
allowAutoTopicCreation: true,
idempotent: true
});
await this.producer.connect();
}

Comment on lines +77 to +88
🛠️ Refactor suggestion

Improve Kafka producer initialization.

The current implementation has several issues:

  1. Hardcoded producer options
  2. Missing error handling
  3. No cleanup handling

Apply this diff to improve the implementation:

 private kafka: Kafka;
 private producer: any;
+private readonly producerConfig = {
+    allowAutoTopicCreation: true,
+    idempotent: true,
+    // Add other configurable options
+};
 
 private async initializeKafka() {
-    this.producer = this.kafka.producer({
-      allowAutoTopicCreation: true,
-      idempotent: true
-    });
-    await this.producer.connect();
+    try {
+        this.producer = this.kafka.producer(this.producerConfig);
+        await this.producer.connect();
+        console.log('Kafka producer connected successfully');
+    } catch (error) {
+        console.error('Failed to initialize Kafka producer:', error);
+        throw error;
+    }
 }
+
+private async cleanup() {
+    if (this.producer) {
+        try {
+            await this.producer.disconnect();
+            console.log('Kafka producer disconnected successfully');
+        } catch (error) {
+            console.error('Error disconnecting Kafka producer:', error);
+        }
+    }
+}

constructor(workflow: WorkflowFile, options?: Partial<InterpreterOptions>) {
super();
this.workflow = workflow.workflow;
this.initializedWorkflow = null;
this.kafka = new Kafka({
clientId: kafkaConfig.clientId,
brokers: kafkaConfig.brokers
});
this.initializeKafka();
this.options = {
maxRepeats: 5,
maxConcurrency: 5,
maxWorkers: Math.max(1, Math.min(os.cpus().length - 1, 4)),
Comment on lines +93 to +101
🛠️ Refactor suggestion

Improve constructor initialization.

The current implementation has several issues:

  1. Kafka initialization is not awaited
  2. maxWorkers calculation logic is embedded in the constructor
  3. Missing error handling for Kafka initialization

Apply this diff to improve the implementation:

+private calculateMaxWorkers(): number {
+    return Math.max(1, Math.min(os.cpus().length - 1, 4));
+}
+
 constructor(workflow: WorkflowFile, options?: Partial<InterpreterOptions>) {
     super();
     this.workflow = workflow.workflow;
     this.initializedWorkflow = null;
     this.kafka = new Kafka({
         clientId: kafkaConfig.clientId,
         brokers: kafkaConfig.brokers
     });
-    this.initializeKafka();
+    // Initialize Kafka asynchronously
+    this.initializeKafka().catch(error => {
+        console.error('Failed to initialize Kafka:', error);
+        throw error;
+    });
     this.options = {
         maxRepeats: 5,
         maxConcurrency: 5,
-        maxWorkers: Math.max(1, Math.min(os.cpus().length - 1, 4)),
+        maxWorkers: this.calculateMaxWorkers(),
         serializableCallback: (data) => { 
             log(JSON.stringify(data), Level.WARN);
         },

Committable suggestion skipped: line range outside the PR's diff.
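
A complementary sketch (not part of this PR's diff) that avoids fire-and-forget initialization altogether is a static async factory, so callers can await Kafka connectivity before running a workflow:

// Hypothetical factory; assumes the constructor above no longer calls initializeKafka() itself.
static async create(workflow: WorkflowFile, options?: Partial<InterpreterOptions>): Promise<Interpreter> {
  const interpreter = new Interpreter(workflow, options);
  await interpreter.initializeKafka();
  return interpreter;
}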

serializableCallback: (data) => {
log(JSON.stringify(data), Level.WARN);
},
@@ -451,7 +475,7 @@ export default class Interpreter extends EventEmitter {
const scrapeResults: Record<string, any>[] = await page.evaluate((cfg) => window.scrapeList(cfg), config);
await this.options.serializableCallback(scrapeResults);
} else {
const scrapeResults: Record<string, any>[] = await this.handlePagination(page, config);
const scrapeResults: Record<string, any>[] = await this.handleParallelPagination(page, config);
await this.options.serializableCallback(scrapeResults);
}
},
@@ -540,6 +564,276 @@ export default class Interpreter extends EventEmitter {
}
}

private async handleParallelPagination(page: Page, config: any): Promise<any[]> {
if (config.limit > 10000 && config.pagination.type === 'clickNext') {
console.time('parallel-scraping');

const workflowId = `workflow-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
console.log(`Starting workflow with ID: ${workflowId}`);

const numWorkers = Math.max(1, Math.min(os.cpus().length - 1, 4));
const batchSize = Math.ceil(config.limit / numWorkers);
const tasks = [];
const pageUrls: string[] = [];

let availableSelectors = config.pagination.selector.split(',');
let visitedUrls: string[] = [];

const { itemsPerPage, estimatedPages } = await page.evaluate(
({ listSelector, limit }) => {
const items = document.querySelectorAll(listSelector).length;
return {
itemsPerPage: items,
estimatedPages: Math.ceil(limit / items)
};
},
{ listSelector: config.listSelector, limit: config.limit }
);

console.log(`Items per page: ${itemsPerPage}`);
console.log(`Estimated pages needed: ${estimatedPages}`);

try {
while (true) {
pageUrls.push(page.url())

if (pageUrls.length >= estimatedPages) {
console.log('Reached estimated number of pages. Stopping pagination.');
break;
}

let checkButton = null;
let workingSelector = null;

for (let i = 0; i < availableSelectors.length; i++) {
const selector = availableSelectors[i];
try {
// Wait for selector with a short timeout
checkButton = await page.waitForSelector(selector, { state: 'attached' });
if (checkButton) {
workingSelector = selector;
break;
}
} catch (error) {
console.log(`Selector failed: ${selector}`);
}
}

if(!workingSelector) {
break;
}

const nextButton = await page.$(workingSelector);
if (!nextButton) {
break;
}

const selectorIndex = availableSelectors.indexOf(workingSelector!);
availableSelectors = availableSelectors.slice(selectorIndex);

const previousUrl = page.url();
visitedUrls.push(previousUrl);

try {
// Try both click methods simultaneously
await Promise.race([
Promise.all([
page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
nextButton.click()
]),
Promise.all([
page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
nextButton.dispatchEvent('click')
])
]);
} catch (error) {
// Verify if navigation actually succeeded
const currentUrl = page.url();
if (currentUrl === previousUrl) {
console.log("Previous URL same as current URL. Navigation failed.");
}
}

const currentUrl = page.url();
if (visitedUrls.includes(currentUrl)) {
console.log(`Detected navigation to a previously visited URL: ${currentUrl}`);

// Extract the current page number from the URL
const match = currentUrl.match(/\d+/);
if (match) {
const currentNumber = match[0];
// Use visitedUrls.length + 1 as the next page number
const nextNumber = visitedUrls.length + 1;

// Create new URL by replacing the current number with the next number
const nextUrl = currentUrl.replace(currentNumber, nextNumber.toString());

console.log(`Navigating to constructed URL: ${nextUrl}`);

// Navigate to the next page
await Promise.all([
page.waitForNavigation({ waitUntil: 'networkidle' }),
page.goto(nextUrl)
]);
}
}

await page.waitForTimeout(1000);
}
Comment on lines +596 to +682
🛠️ Refactor suggestion

Modularize the pagination navigation logic.

The navigation logic is complex and could benefit from being split into smaller, focused functions for better maintainability and testing.

Consider extracting these functionalities:

  1. URL collection logic
  2. Navigation handling
  3. URL construction for pagination

Example refactor for URL collection:

private async collectPageUrls(page: Page, config: any): Promise<string[]> {
  const pageUrls: string[] = [];
  const visitedUrls: Set<string> = new Set();
  
  while (true) {
    const currentUrl = page.url();
    if (visitedUrls.has(currentUrl)) {
      break;
    }
    
    pageUrls.push(currentUrl);
    visitedUrls.add(currentUrl);
    
    if (!await this.navigateToNextPage(page)) {
      break;
    }
  }
  
  return pageUrls;
}
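
The navigateToNextPage helper called above is not defined in the suggestion; a rough sketch (hypothetical, reusing the click-and-verify logic from the PR's loop, with this.paginationSelector as an assumed field holding the working selector) might look like:

// Hypothetical companion helper for the refactor above; not part of the PR.
private async navigateToNextPage(page: Page): Promise<boolean> {
  const previousUrl = page.url();
  const nextButton = await page.$(this.paginationSelector);
  if (!nextButton) {
    return false;
  }
  try {
    await Promise.all([
      page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
      nextButton.click()
    ]);
  } catch (error) {
    // Fall through and let the URL comparison decide whether navigation happened.
  }
  return page.url() !== previousUrl;
}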

} catch (error) {
console.error('Error collecting page URLs:', error);
}

console.log(`Collected ${pageUrls.length} unique page URLs`);

for (let i = 0; i < numWorkers; i++) {
const startIndex = i * batchSize;
const endIndex = Math.min((i + 1) * batchSize, config.limit);
const workerUrls = pageUrls.slice(
i * Math.ceil(pageUrls.length / numWorkers),
(i + 1) * Math.ceil(pageUrls.length / numWorkers)
);

const task = {
taskId: `${workflowId}-task-${i}`,
workflowId,
urls: workerUrls,
config: {
listSelector: config.listSelector,
fields: config.fields,
pagination: config.pagination,
batchSize: endIndex - startIndex,
startIndex,
endIndex
}
};

await this.producer.send({
topic: kafkaConfig.topics.SCRAPING_TASKS,
messages: [{
key: task.taskId,
value: JSON.stringify(task),
headers: {
'workflow-id': workflowId,
'retry-count': '0',
'total-tasks': numWorkers.toString()
}
}]
});

tasks.push(task);
}

console.log("TASKS SENT TO KAFKA (Not stringified)", tasks);

// Wait for results from Kafka
const results = await this.waitForScrapingResults(tasks);
console.timeEnd('parallel-scraping');
return results;
}

return this.handlePagination(page, config);
}
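
For a concrete sense of the task split above, a worked example with illustrative numbers (not taken from the PR):

// Illustrative arithmetic only, mirroring the batching logic above.
const limit = 20000;                             // config.limit
const numWorkers = 4;                            // Math.max(1, Math.min(os.cpus().length - 1, 4))
const batchSize = Math.ceil(limit / numWorkers); // 5000 items per task
// If 400 page URLs were collected, each worker receives Math.ceil(400 / 4) = 100 consecutive URLs,
// and task i covers startIndex = i * 5000 through endIndex = Math.min((i + 1) * 5000, limit).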

private async waitForScrapingResults(tasks: any[]): Promise<any[]> {
// Create a map to store our workflow's results
const resultsMap = new Map<string, any[]>();

// Extract the workflow ID from the first task - all tasks in this batch will share the same workflow ID
const workflowId = tasks[0].workflowId;
console.log(`Waiting for results from workflow: ${workflowId}`);

// Create a Set of task IDs for quick lookup - these are the only tasks we care about
const expectedTaskIds = new Set(tasks.map(task => task.taskId));

// Create a consumer specifically for this workflow
const resultConsumer = this.kafka.consumer({
groupId: `scraping-group-results-${workflowId}`,
maxWaitTimeInMs: 1000,
maxBytesPerPartition: 2097152 // 2MB
});

try {
await resultConsumer.connect();
console.log('Result consumer connected successfully');

await resultConsumer.subscribe({
topic: kafkaConfig.topics.SCRAPING_RESULTS,
fromBeginning: true
});
console.log('Result consumer subscribed to topic successfully');

return new Promise((resolve, reject) => {
let isRunning = true;

resultConsumer.run({
eachMessage: async ({ topic, partition, message }) => {
if (!isRunning) return;

try {
const result = JSON.parse(message.value!.toString());

// Verify both task ID and workflow ID match
if (result.workflowId === workflowId && expectedTaskIds.has(result.taskId)) {
// Store this task's results
if (!resultsMap.has(result.taskId)) {
resultsMap.set(result.taskId, result.data);
console.log(`Received results for task ${result.taskId}. ` +
`Got ${resultsMap.size} of ${tasks.length} tasks from workflow ${workflowId}`);
}

// Check if we have all our workflow's results
if (resultsMap.size === tasks.length) {
isRunning = false;

// Sort tasks by their numeric index (extract number from task ID)
const sortedTasks = [...tasks].sort((a, b) => {
const aIndex = parseInt(a.taskId.split('-').pop() || '0');
const bIndex = parseInt(b.taskId.split('-').pop() || '0');
return aIndex - bIndex;
});

// Combine results in the sorted task order
const allResults = sortedTasks
.map(task => {
const taskResults = resultsMap.get(task.taskId);
if (!taskResults) {
console.warn(`Missing results for task ${task.taskId} in workflow ${workflowId}`);
return [];
}
return taskResults;
})
.flat();

console.log(`Successfully collected all results from workflow ${workflowId}`);

resolve(allResults);
}
}
} catch (error) {
console.error(`Error processing message in workflow ${workflowId}:`, error);
reject(error);
}
}
});

// // Add a timeout to prevent hanging
// const timeout = setTimeout(() => {
// if (isRunning) {
// isRunning = false;
// console.error(`Timeout waiting for results from workflow ${workflowId}. ` +
// `Received ${resultsMap.size} of ${tasks.length} expected results.`);
// reject(new Error(`Timeout waiting for results from workflow ${workflowId}`));
// }
// }, 30000); // 30 second timeout
});

} catch (error) {
console.error(`Fatal error in waitForScrapingResults for workflow ${workflowId}:`, error);
throw error;
}
}

Comment on lines +738 to +836
⚠️ Potential issue

Improve result collection reliability.

The current implementation has several issues:

  1. Commented-out timeout code
  2. Missing proper consumer cleanup
  3. No handling of partial results

Apply this diff to improve the implementation:

+interface ScrapingResult {
+    workflowId: string;
+    taskId: string;
+    data: any[];
+}
+
+const RESULT_COLLECTION_TIMEOUT_MS = 30000;
+
 private async waitForScrapingResults(tasks: any[]): Promise<any[]> {
     const resultsMap = new Map<string, any[]>();
     const workflowId = tasks[0].workflowId;
     console.log(`Waiting for results from workflow: ${workflowId}`);
     
     const expectedTaskIds = new Set(tasks.map(task => task.taskId));
     
     const resultConsumer = this.kafka.consumer({ 
         groupId: `scraping-group-results-${workflowId}`,
         maxWaitTimeInMs: 1000,
         maxBytesPerPartition: 2097152
     });
 
     try {
         await resultConsumer.connect();
         console.log('Result consumer connected successfully');
         
         await resultConsumer.subscribe({ 
             topic: kafkaConfig.topics.SCRAPING_RESULTS,
             fromBeginning: true 
         });
         console.log('Result consumer subscribed to topic successfully');
 
         return new Promise((resolve, reject) => {
             let isRunning = true;
+            const timeout = setTimeout(() => {
+                if (isRunning) {
+                    isRunning = false;
+                    const error = new Error(
+                        `Timeout waiting for results from workflow ${workflowId}. ` +
+                        `Received ${resultsMap.size} of ${tasks.length} expected results.`
+                    );
+                    reject(error);
+                }
+            }, RESULT_COLLECTION_TIMEOUT_MS);
 
             resultConsumer.run({
                 eachMessage: async ({ topic, partition, message }) => {
                     if (!isRunning) return;
                     
                     try {
-                        const result = JSON.parse(message.value!.toString());
+                        const result = JSON.parse(message.value!.toString()) as ScrapingResult;
                         
                         if (result.workflowId === workflowId && expectedTaskIds.has(result.taskId)) {
                             if (!resultsMap.has(result.taskId)) {
                                 resultsMap.set(result.taskId, result.data);
                                 console.log(
                                     `Received results for task ${result.taskId}. ` +
                                     `Got ${resultsMap.size} of ${tasks.length} tasks from workflow ${workflowId}`
                                 );
                             }
 
                             if (resultsMap.size === tasks.length) {
                                 isRunning = false;
+                                clearTimeout(timeout);
                                 
                                 const sortedTasks = [...tasks].sort((a, b) => {
                                     const aIndex = parseInt(a.taskId.split('-').pop() || '0');
                                     const bIndex = parseInt(b.taskId.split('-').pop() || '0');
                                     return aIndex - bIndex;
                                 });
 
                                 const allResults = sortedTasks
                                     .map(task => resultsMap.get(task.taskId) || [])
                                     .flat();
 
                                 console.log(`Successfully collected all results from workflow ${workflowId}`);
                                 
+                                await resultConsumer.disconnect();
                                 resolve(allResults);
                             }
                         }
                     } catch (error) {
                         console.error(`Error processing message in workflow ${workflowId}:`, error);
+                        clearTimeout(timeout);
+                        await resultConsumer.disconnect();
                         reject(error);
                     }
                 }
             });
         });
     } catch (error) {
         console.error(`Fatal error in waitForScrapingResults for workflow ${workflowId}:`, error);
+        await resultConsumer.disconnect();
         throw error;
     }
 }

private async handlePagination(page: Page, config: { listSelector: string, fields: any, limit?: number, pagination: any }) {
let allResults: Record<string, any>[] = [];
let previousHeight = 0;
@@ -556,6 +850,7 @@ export default class Interpreter extends EventEmitter {
await page.waitForTimeout(2000);

const currentHeight = await page.evaluate(() => document.body.scrollHeight);
console.log(`Current scroll height: ${currentHeight}`);
if (currentHeight === previousHeight) {
const finalResults = await page.evaluate((cfg) => window.scrapeList(cfg), config);
allResults = allResults.concat(finalResults);
23 changes: 23 additions & 0 deletions maxun-core/src/scripts/setup-kafka.ts
@@ -0,0 +1,23 @@
import { KafkaManager } from '../utils/kafka-manager';

async function setupKafka() {
const manager = new KafkaManager();

try {
console.log('Initializing Kafka manager...');
await manager.initialize();
console.log('Kafka setup completed successfully');

// Keep monitoring for a while to verify setup
setTimeout(async () => {
await manager.cleanup();
process.exit(0);
}, 10000);

} catch (error) {
console.error('Failed to setup Kafka:', error);
process.exit(1);
}
}
Comment on lines +3 to +21
🛠️ Refactor suggestion

Improve Kafka setup monitoring.

The current implementation has several issues:

  1. Hardcoded timeout duration
  2. Forced cleanup after timeout regardless of setup status
  3. Missing configuration validation

Apply this diff to improve the implementation:

+const KAFKA_SETUP_TIMEOUT_MS = 10000; // Move to config
+
 async function setupKafka() {
     const manager = new KafkaManager();
     
     try {
         console.log('Initializing Kafka manager...');
+        // Validate Kafka configuration
+        if (!manager.validateConfig()) {
+            throw new Error('Invalid Kafka configuration');
+        }
+
         await manager.initialize();
         console.log('Kafka setup completed successfully');
         
-        // Keep monitoring for a while to verify setup
-        setTimeout(async () => {
-            await manager.cleanup();
-            process.exit(0);
-        }, 10000);
+        // Monitor setup status
+        const setupTimeout = new Promise((_, reject) => {
+            setTimeout(() => reject(new Error('Kafka setup verification timeout')), 
+                KAFKA_SETUP_TIMEOUT_MS);
+        });
+
+        try {
+            await Promise.race([
+                manager.verifySetup(),
+                setupTimeout
+            ]);
+            console.log('Kafka setup verified successfully');
+            await manager.cleanup();
+            process.exit(0);
+        } catch (error) {
+            throw new Error(`Kafka setup verification failed: ${error.message}`);
+        }
     } catch (error) {
         console.error('Failed to setup Kafka:', error);
         process.exit(1);
     }
 }

Committable suggestion skipped: line range outside the PR's diff.
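
Note that validateConfig() and verifySetup() are not defined on KafkaManager in this PR; a rough sketch of what such helpers could check (hypothetical, assuming the manager keeps a kafkajs Admin client after initialize() and imports kafkaConfig) is:

// Hypothetical KafkaManager additions implied by the suggestion above; not part of the PR.
public validateConfig(): boolean {
  // A broker list and the expected topic names are the minimum needed to initialize.
  return kafkaConfig.brokers.length > 0 && Object.keys(kafkaConfig.topics).length > 0;
}

public async verifySetup(): Promise<void> {
  const existingTopics = await this.admin.listTopics();
  const expectedTopics = Object.values(kafkaConfig.topics);
  const missing = expectedTopics.filter(topic => !existingTopics.includes(topic));
  if (missing.length > 0) {
    throw new Error(`Missing Kafka topics: ${missing.join(', ')}`);
  }
}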


setupKafka().catch(console.error);