feat: parallelization #352

Open · RohitR311 wants to merge 22 commits into base: develop
Changes from 16 commits
Commits (22)
2599712
feat: handle parallel pagination for scrape list
RohitR311 Jan 14, 2025
0c655fc
feat: add support to handle parallel pagination
RohitR311 Jan 15, 2025
641e182
feat: add parallel scraping logic for click next pagination
RohitR311 Jan 15, 2025
352447b
feat: add parallel scraping support for scroll pagination types
RohitR311 Jan 15, 2025
4ffbcba
feat: rm parallelism for paginations except click next
RohitR311 Jan 15, 2025
4c0eb30
feat: estimate number of items per page
RohitR311 Jan 16, 2025
bd25950
chore: include all files from src dir
RohitR311 Jan 17, 2025
8360d3d
feat: add worker pool to support parallelism for click next
RohitR311 Jan 17, 2025
38539d5
feat: add worker pool for parallelization
RohitR311 Jan 17, 2025
e0b52c1
feat: add worker types
RohitR311 Jan 17, 2025
b2e8332
feat: rm worker pool logic
RohitR311 Jan 17, 2025
7d0339a
feat: rm worker pool logic
RohitR311 Jan 17, 2025
c4c77e6
Merge branch 'parallelization' of https://github.com/RohitR311/maxun …
RohitR311 Jan 17, 2025
5c6f478
feat: add kafka config
RohitR311 Jan 18, 2025
5becc84
feat: add kafka manager to create topics
RohitR311 Jan 19, 2025
5e8a6d1
feat: add scraper to scrape data and store in kafka
RohitR311 Jan 19, 2025
73c2cc3
feat: add kafka util to consume task data and produce messages
RohitR311 Jan 20, 2025
6984401
feat: add parallel scraping support using kafka
RohitR311 Jan 20, 2025
7105749
feat: add initial kafka setup script
RohitR311 Jan 20, 2025
a931a13
feat: add start consumer kafka script
RohitR311 Jan 20, 2025
5411484
feat: add limit in task config
RohitR311 Jan 20, 2025
7650794
chore: add kafka services
RohitR311 Jan 20, 2025
10 changes: 10 additions & 0 deletions maxun-core/src/config/kafka.ts
@@ -0,0 +1,10 @@
export const kafkaConfig = {
clientId: 'maxun-scraper',
brokers: ['localhost:29092'],
topics: {
Comment on lines +1 to +4

⚠️ Potential issue

Use environment variables for Kafka configuration.

The broker address and client ID are hardcoded, which limits deployment flexibility and poses security risks. Consider using environment variables for configuration.

Apply this diff to make the configuration more flexible:

+import { config } from 'dotenv';
+
+config();
+
 export const kafkaConfig = {
-    clientId: 'maxun-scraper',
-    brokers: ['localhost:29092'],
+    clientId: process.env.KAFKA_CLIENT_ID || 'maxun-scraper',
+    brokers: (process.env.KAFKA_BROKERS || 'localhost:29092').split(','),

Also, consider adding SSL/authentication configuration for production environments:

    // Add these properties to kafkaConfig
    ssl: process.env.KAFKA_SSL === 'true',
    sasl: process.env.KAFKA_SASL === 'true' ? {
      mechanism: process.env.KAFKA_SASL_MECHANISM,
      username: process.env.KAFKA_USERNAME,
      password: process.env.KAFKA_PASSWORD,
    } : undefined,

SCRAPING_TASKS: 'scraping-tasks',
SCRAPING_RESULTS: 'scraping-results',
SCRAPING_DLQ: 'scraping-dlq'
},
consumerGroup: 'scraping-group'
};
Comment on lines +9 to +10

🛠️ Refactor suggestion

Add environment separation for consumer groups.

The consumer group should be environment-aware to prevent cross-environment message processing.

Apply this diff:

-    consumerGroup: 'scraping-group'
+    consumerGroup: `${process.env.ENV || 'dev'}-scraping-group`
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    consumerGroup: 'scraping-group'
+    consumerGroup: `${process.env.ENV || 'dev'}-scraping-group`
 };

Comment on lines +1 to +10

💡 Codebase verification

Remove unused Kafka configuration

The Kafka configuration appears to be unused in the codebase. There are no imports of this configuration, no Kafka client instantiations, and no other Kafka-related code present. Consider removing this unused configuration to maintain a clean codebase.

🔗 Analysis chain

Verify Kafka configuration in the codebase.

Let's ensure the configuration is properly utilized across the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for proper usage of Kafka configuration and potential security issues

# Check for hardcoded Kafka configurations
echo "Checking for hardcoded Kafka configurations..."
rg -g '!*.{json,md}' -i 'localhost:29092'

# Check for proper import usage
echo "Checking kafka config imports..."
rg "from ['|\"].*config/kafka" -A 3

# Check for potential security-sensitive Kafka operations
echo "Checking for direct Kafka operations..."
ast-grep --pattern 'new Kafka($$$)'

Length of output: 451


Script:

#!/bin/bash
# Search for Kafka-related code and configurations

# Check for usage of kafkaConfig
echo "Checking for kafkaConfig usage..."
rg "kafkaConfig"

# Check for usage of topic names
echo "Checking for topic name usage..."
rg "scraping-tasks|scraping-results|scraping-dlq"

# Check for any Kafka-related imports or code
echo "Checking for Kafka-related code..."
rg -i "kafka" -g "!*.{json,md,txt}" -A 3

Length of output: 844
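
For reference, a minimal sketch of what `maxun-core/src/config/kafka.ts` could look like once the environment-variable, SSL/SASL, and consumer-group suggestions above are applied. The `KAFKA_*` and `ENV` variable names are assumptions carried over from the review suggestions, not something this PR defines:

import { config } from 'dotenv';

config();

// Sketch only: the KAFKA_* and ENV variable names are assumptions, not defined by this PR.
export const kafkaConfig = {
  clientId: process.env.KAFKA_CLIENT_ID || 'maxun-scraper',
  brokers: (process.env.KAFKA_BROKERS || 'localhost:29092').split(','),
  ssl: process.env.KAFKA_SSL === 'true',
  sasl: process.env.KAFKA_SASL === 'true'
    ? {
        mechanism: process.env.KAFKA_SASL_MECHANISM || 'plain',
        username: process.env.KAFKA_USERNAME || '',
        password: process.env.KAFKA_PASSWORD || '',
      }
    : undefined,
  topics: {
    SCRAPING_TASKS: 'scraping-tasks',
    SCRAPING_RESULTS: 'scraping-results',
    SCRAPING_DLQ: 'scraping-dlq',
  },
  consumerGroup: `${process.env.ENV || 'dev'}-scraping-group`,
};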

130 changes: 129 additions & 1 deletion maxun-core/src/interpret.ts
@@ -16,6 +16,8 @@ import Concurrency from './utils/concurrency';
import Preprocessor from './preprocessor';
import log, { Level } from './utils/logger';

import os from 'os';

/**
* Extending the Window interface for custom scraping functions.
*/
@@ -451,7 +453,7 @@ export default class Interpreter extends EventEmitter {
const scrapeResults: Record<string, any>[] = await page.evaluate((cfg) => window.scrapeList(cfg), config);
await this.options.serializableCallback(scrapeResults);
} else {
const scrapeResults: Record<string, any>[] = await this.handlePagination(page, config);
const scrapeResults: Record<string, any>[] = await this.handleParallelPagination(page, config);
await this.options.serializableCallback(scrapeResults);
}
},
@@ -540,6 +542,131 @@
}
}

private async handleParallelPagination(page: Page, config: any): Promise<any[]> {
if (config.limit > 10000 && config.pagination.type === 'clickNext') {
console.time('parallel-scraping');

const numWorkers = Math.max(1, Math.min(os.cpus().length - 1, 4));
const batchSize = Math.ceil(config.limit / numWorkers);
const pageUrls: string[] = [];

let workers: any = null;
let availableSelectors = config.pagination.selector.split(',');
let visitedUrls: string[] = [];

const { itemsPerPage, estimatedPages } = await page.evaluate(
({ listSelector, limit }) => {
const items = document.querySelectorAll(listSelector).length;
return {
itemsPerPage: items,
estimatedPages: Math.ceil(limit / items)
};
},
{ listSelector: config.listSelector, limit: config.limit }
);

console.log(`Items per page: ${itemsPerPage}`);
console.log(`Estimated pages needed: ${estimatedPages}`);

try {
while (true) {
pageUrls.push(page.url())

if (pageUrls.length >= estimatedPages) {
console.log('Reached estimated number of pages. Stopping pagination.');
break;
}

let checkButton = null;
let workingSelector = null;

for (let i = 0; i < availableSelectors.length; i++) {
const selector = availableSelectors[i];
try {
// Wait for selector with a short timeout
checkButton = await page.waitForSelector(selector, { state: 'attached' });
if (checkButton) {
workingSelector = selector;
break;
}
} catch (error) {
console.log(`Selector failed: ${selector}`);
}
}

if(!workingSelector) {
break;
}

const nextButton = await page.$(workingSelector);
if (!nextButton) {
break;
}

const selectorIndex = availableSelectors.indexOf(workingSelector!);
availableSelectors = availableSelectors.slice(selectorIndex);

const previousUrl = page.url();
visitedUrls.push(previousUrl);

try {
// Try both click methods simultaneously
await Promise.race([
Promise.all([
page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
nextButton.click()
]),
Promise.all([
page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
nextButton.dispatchEvent('click')
])
]);
} catch (error) {
// Verify if navigation actually succeeded
const currentUrl = page.url();
if (currentUrl === previousUrl) {
console.log("Previous URL same as current URL. Navigation failed.");
}
}

const currentUrl = page.url();
if (visitedUrls.includes(currentUrl)) {
console.log(`Detected navigation to a previously visited URL: ${currentUrl}`);

// Extract the current page number from the URL
const match = currentUrl.match(/\d+/);
if (match) {
const currentNumber = match[0];
// Use visitedUrls.length + 1 as the next page number
const nextNumber = visitedUrls.length + 1;

// Create new URL by replacing the current number with the next number
const nextUrl = currentUrl.replace(currentNumber, nextNumber.toString());

console.log(`Navigating to constructed URL: ${nextUrl}`);

// Navigate to the next page
await Promise.all([
page.waitForNavigation({ waitUntil: 'networkidle' }),
page.goto(nextUrl)
]);
}
}

await page.waitForTimeout(1000);
}
Comment on lines +596 to +682

🛠️ Refactor suggestion

Modularize the pagination navigation logic.

The navigation logic is complex and could benefit from being split into smaller, focused functions for better maintainability and testing.

Consider extracting these functionalities:

  1. URL collection logic
  2. Navigation handling
  3. URL construction for pagination

Example refactor for URL collection:

private async collectPageUrls(page: Page, config: any): Promise<string[]> {
  const pageUrls: string[] = [];
  const visitedUrls: Set<string> = new Set();
  
  while (true) {
    const currentUrl = page.url();
    if (visitedUrls.has(currentUrl)) {
      break;
    }
    
    pageUrls.push(currentUrl);
    visitedUrls.add(currentUrl);
    
    if (!await this.navigateToNextPage(page)) {
      break;
    }
  }
  
  return pageUrls;
}
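
The example above calls a `navigateToNextPage` helper that is not defined in the PR. One possible shape for it, reusing the click-and-wait pattern already present in `handleParallelPagination`, could be the following sketch (illustrative only; here the working selector is passed in by the caller rather than resolved internally):

import { Page } from 'playwright';

// Sketch only: "selector" is assumed to be the working "next" selector already
// resolved by the caller. Returns true if the page URL changed.
async function navigateToNextPage(page: Page, selector: string): Promise<boolean> {
  const nextButton = await page.$(selector);
  if (!nextButton) {
    return false;
  }

  const previousUrl = page.url();
  try {
    await Promise.all([
      page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
      nextButton.click(),
    ]);
  } catch {
    // Navigation may still have happened via client-side routing; fall through to the URL check.
  }

  return page.url() !== previousUrl;
}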

} catch (error) {
console.error('Error collecting page URLs:', error);
}

console.log(`Collected ${pageUrls.length} unique page URLs`);


}

return this.handlePagination(page, config);
}

private async handlePagination(page: Page, config: { listSelector: string, fields: any, limit?: number, pagination: any }) {
let allResults: Record<string, any>[] = [];
let previousHeight = 0;
@@ -556,6 +683,7 @@
await page.waitForTimeout(2000);

const currentHeight = await page.evaluate(() => document.body.scrollHeight);
console.log(`Current scroll height: ${currentHeight}`);
if (currentHeight === previousHeight) {
const finalResults = await page.evaluate((cfg) => window.scrapeList(cfg), config);
allResults = allResults.concat(finalResults);
59 changes: 59 additions & 0 deletions maxun-core/src/types/worker.ts
@@ -0,0 +1,59 @@
export interface WorkerConfig {
workerIndex: number;
startIndex: number;
endIndex: number;
batchSize: number;
pageUrls: string[];
listSelector: string;
fields: any;
pagination: {
type: string;
selector: string;
};
}

export interface SharedState {
totalScraped: number;
results: any[];
}
Comment on lines +15 to +18

💡 Codebase verification

Thread safety and type safety concerns are valid issues that need addressing

The codebase shows:

  • Multiple workers concurrently access and modify SharedState without synchronization
  • results: any[] is used consistently, lacking type safety
  • No thread-safe mechanisms are implemented for shared state access

Suggested improvements:

  1. Implement proper synchronization using worker_threads features like SharedArrayBuffer or message-based state updates
  2. Add generic type parameter as suggested in the original review
  3. Consider using an intermediary class to manage thread-safe state access
🔗 Analysis chain

Consider thread safety and type safety for SharedState.

The shared state between workers needs careful consideration:

  1. The results array should have a more specific type than any[]
  2. Thread safety should be ensured when multiple workers access this state

Consider using a thread-safe data structure or implementing proper synchronization mechanisms. Also, define a generic type parameter:

-export interface SharedState {
+export interface SharedState<T = unknown> {
     totalScraped: number;
-    results: any[];
+    results: T[];
 }
🏁 Scripts executed

The following scripts were executed for the analysis:

Error: No shell scripts found


Script:

#!/bin/bash
# Find all files importing or using SharedState
rg "SharedState" -l

# Look for actual usage patterns of SharedState
rg "SharedState" -A 5 -B 5

# Check for any thread synchronization patterns
rg -i "sync|mutex|lock|atomic" maxun-core/src/

# Look for type definitions related to worker results
ast-grep --pattern 'type $_ = $_' maxun-core/src/types/

Length of output: 12076
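
One way to realize the suggested intermediary class is to keep all mutation in the parent thread and have workers report results via `worker_threads` messages. A sketch under that assumption (the class name and message shape are illustrative, not part of the PR):

import { Worker } from 'worker_threads';

// Illustrative only: workers post { items } messages and never touch shared memory,
// so only the parent thread mutates the accumulated state and no locking is needed.
class SharedStateAccumulator<T = unknown> {
  private state = { totalScraped: 0, results: [] as T[] };

  attach(worker: Worker): void {
    worker.on('message', (msg: { items: T[] }) => {
      this.state.results.push(...msg.items);
      this.state.totalScraped += msg.items.length;
    });
  }

  snapshot(): { totalScraped: number; results: T[] } {
    return { totalScraped: this.state.totalScraped, results: [...this.state.results] };
  }
}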


export interface WorkerProgressData {
percentage: number;
currentUrl: string;
scrapedItems: number;
timeElapsed: number;
estimatedTimeRemaining: number;
failures: number;
performance: PerformanceMetrics;
}

export interface PerformanceMetrics {
startTime: number;
endTime: number;
duration: number;
pagesProcessed: number;
itemsScraped: number;
failedPages: number;
averageTimePerPage: number;
memoryUsage: {
heapUsed: number;
heapTotal: number;
external: number;
rss: number;
};
cpuUsage: {
user: number;
system: number;
};
}

export interface GlobalMetrics {
totalPagesProcessed: number;
totalItemsScraped: number;
totalFailures: number;
workersActive: number;
averageSpeed: number;
timeElapsed: number;
memoryUsage: NodeJS.MemoryUsage;
cpuUsage: NodeJS.CpuUsage;
}
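
For orientation, a sketch of how `WorkerConfig` objects might be assembled from the batching math used in `handleParallelPagination`. The `buildWorkerConfigs` helper is illustrative and not part of the PR:

import { WorkerConfig } from '../types/worker';

// Illustrative only: split the collected page URLs into per-worker batches using the
// same Math.ceil batching as handleParallelPagination.
function buildWorkerConfigs(pageUrls: string[], config: any, numWorkers: number): WorkerConfig[] {
  const batchSize = Math.ceil(pageUrls.length / numWorkers);
  return Array.from({ length: numWorkers }, (_, workerIndex) => ({
    workerIndex,
    startIndex: workerIndex * batchSize,
    endIndex: Math.min((workerIndex + 1) * batchSize, pageUrls.length),
    batchSize,
    pageUrls: pageUrls.slice(workerIndex * batchSize, (workerIndex + 1) * batchSize),
    listSelector: config.listSelector,
    fields: config.fields,
    pagination: config.pagination,
  }));
}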
66 changes: 66 additions & 0 deletions maxun-core/src/utils/kafka-manager.ts
@@ -0,0 +1,66 @@
import { Kafka, Consumer, Producer } from 'kafkajs';
import { kafkaConfig } from '../config/kafka';
import { EventEmitter } from 'events';

export class KafkaManager extends EventEmitter {
private kafka: Kafka;
private producer: Producer;
private consumer: Consumer;
private metricsInterval: NodeJS.Timeout | null = null;

constructor() {
super();
this.kafka = new Kafka({
clientId: kafkaConfig.clientId,
brokers: kafkaConfig.brokers
});

this.producer = this.kafka.producer();
this.consumer = this.kafka.consumer({
groupId: kafkaConfig.consumerGroup,
sessionTimeout: 30000
});
}

async initialize() {
await this.producer.connect();
await this.consumer.connect();
await this.createTopics();
this.startMetricsReporting();
}

private async createTopics() {
const admin = this.kafka.admin();
await admin.createTopics({
topics: [
{ topic: kafkaConfig.topics.SCRAPING_TASKS, numPartitions: 10 },
{ topic: kafkaConfig.topics.SCRAPING_RESULTS, numPartitions: 10 },
{ topic: kafkaConfig.topics.SCRAPING_DLQ, numPartitions: 1 }
]
});
await admin.disconnect();
}

private startMetricsReporting() {
this.metricsInterval = setInterval(async () => {
const admin = this.kafka.admin();
const metrics = await admin.fetchTopicMetadata({
topics: [
kafkaConfig.topics.SCRAPING_TASKS,
kafkaConfig.topics.SCRAPING_RESULTS
]
});

this.emit('metrics', metrics);
await admin.disconnect();
}, 5000);
Comment on lines +45 to +56

🛠️ Refactor suggestion

Add error handling in startMetricsReporting

The asynchronous function inside setInterval may throw exceptions (e.g., network errors during fetchTopicMetadata). Unhandled exceptions can cause the interval to stop executing. Wrap the interval's logic in a try/catch block to ensure continuous metrics reporting.

Apply this diff to implement error handling:

     this.metricsInterval = setInterval(async () => {
+      try {
         const admin = this.kafka.admin();
         const metrics = await admin.fetchTopicMetadata({
           topics: [
             kafkaConfig.topics.SCRAPING_TASKS,
             kafkaConfig.topics.SCRAPING_RESULTS
           ]
         });
         this.emit('metrics', metrics);
+      } catch (error) {
+        // Handle errors, e.g., log the error
+        console.error('Error fetching Kafka metrics:', error);
+      }
       }, 5000);
     }

Committable suggestion skipped: line range outside the PR's diff.

}

async cleanup() {
if (this.metricsInterval) {
clearInterval(this.metricsInterval);
}
await this.producer.disconnect();
await this.consumer.disconnect();
}
}
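
A hypothetical usage sketch for `KafkaManager` (not part of the PR): initialize it, listen for the `metrics` events it emits every 5 seconds, and clean up on shutdown:

import { KafkaManager } from './utils/kafka-manager';

// Hypothetical wiring: connect producer/consumer, log emitted metrics, then disconnect.
async function run() {
  const manager = new KafkaManager();
  manager.on('metrics', (metadata) => {
    console.log('Kafka topic metadata:', JSON.stringify(metadata));
  });

  await manager.initialize();
  // ... produce scraping tasks / consume results here ...
  await manager.cleanup();
}

run().catch(console.error);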