feat: parallelization #352

base: develop
Changes from 16 commits
@@ -0,0 +1,10 @@
export const kafkaConfig = {
  clientId: 'maxun-scraper',
  brokers: ['localhost:29092'],
  topics: {
    SCRAPING_TASKS: 'scraping-tasks',
    SCRAPING_RESULTS: 'scraping-results',
    SCRAPING_DLQ: 'scraping-dlq'
  },
  consumerGroup: 'scraping-group'
};
Comment on lines +9 to +10

🛠️ Refactor suggestion

Add environment separation for consumer groups.

The consumer group should be environment-aware to prevent cross-environment message processing. Apply this diff:

-  consumerGroup: 'scraping-group'
+  consumerGroup: `${process.env.ENV || 'dev'}-scraping-group`
Comment on lines +1 to +10

💡 Codebase verification

Remove unused Kafka configuration.

The Kafka configuration appears to be unused in the codebase: there are no imports of this configuration, no Kafka client instantiations, and no other Kafka-related code present. Consider removing this unused configuration to maintain a clean codebase.

🔗 Analysis chain

Verify Kafka configuration in the codebase. Let's ensure the configuration is properly utilized across the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for proper usage of Kafka configuration and potential security issues

# Check for hardcoded Kafka configurations
echo "Checking for hardcoded Kafka configurations..."
rg -g '!*.{json,md}' -i 'localhost:29092'

# Check for proper import usage
echo "Checking kafka config imports..."
rg "from ['|\"].*config/kafka" -A 3

# Check for potential security-sensitive Kafka operations
echo "Checking for direct Kafka operations..."
ast-grep --pattern 'new Kafka($$$)'

Length of output: 451

Script:

#!/bin/bash
# Search for Kafka-related code and configurations

# Check for usage of kafkaConfig
echo "Checking for kafkaConfig usage..."
rg "kafkaConfig"

# Check for usage of topic names
echo "Checking for topic name usage..."
rg "scraping-tasks|scraping-results|scraping-dlq"

# Check for any Kafka-related imports or code
echo "Checking for Kafka-related code..."
rg -i "kafka" -g "!*.{json,md,txt}" -A 3

Length of output: 844
@@ -16,6 +16,8 @@ import Concurrency from './utils/concurrency';
 import Preprocessor from './preprocessor';
 import log, { Level } from './utils/logger';
+
+import os from 'os';

 /**
  * Extending the Window interface for custom scraping functions.
  */
@@ -451,7 +453,7 @@ export default class Interpreter extends EventEmitter {
       const scrapeResults: Record<string, any>[] = await page.evaluate((cfg) => window.scrapeList(cfg), config);
       await this.options.serializableCallback(scrapeResults);
     } else {
-      const scrapeResults: Record<string, any>[] = await this.handlePagination(page, config);
+      const scrapeResults: Record<string, any>[] = await this.handleParallelPagination(page, config);
       await this.options.serializableCallback(scrapeResults);
     }
   },
@@ -540,6 +542,131 @@ export default class Interpreter extends EventEmitter {
    }
  }

  private async handleParallelPagination(page: Page, config: any): Promise<any[]> {
    if (config.limit > 10000 && config.pagination.type === 'clickNext') {
      console.time('parallel-scraping');

      const numWorkers = Math.max(1, Math.min(os.cpus().length - 1, 4));
      const batchSize = Math.ceil(config.limit / numWorkers);
      const pageUrls: string[] = [];

      let workers: any = null;
      let availableSelectors = config.pagination.selector.split(',');
      let visitedUrls: string[] = [];

      const { itemsPerPage, estimatedPages } = await page.evaluate(
        ({ listSelector, limit }) => {
          const items = document.querySelectorAll(listSelector).length;
          return {
            itemsPerPage: items,
            estimatedPages: Math.ceil(limit / items)
          };
        },
        { listSelector: config.listSelector, limit: config.limit }
      );

      console.log(`Items per page: ${itemsPerPage}`);
      console.log(`Estimated pages needed: ${estimatedPages}`);

      try {
        while (true) {
          pageUrls.push(page.url());

          if (pageUrls.length >= estimatedPages) {
            console.log('Reached estimated number of pages. Stopping pagination.');
            break;
          }

          let checkButton = null;
          let workingSelector = null;

          for (let i = 0; i < availableSelectors.length; i++) {
            const selector = availableSelectors[i];
            try {
              // Wait for selector with a short timeout
              checkButton = await page.waitForSelector(selector, { state: 'attached' });
              if (checkButton) {
                workingSelector = selector;
                break;
              }
            } catch (error) {
              console.log(`Selector failed: ${selector}`);
            }
          }

          if (!workingSelector) {
            break;
          }

          const nextButton = await page.$(workingSelector);
          if (!nextButton) {
            break;
          }

          const selectorIndex = availableSelectors.indexOf(workingSelector!);
          availableSelectors = availableSelectors.slice(selectorIndex);

          const previousUrl = page.url();
          visitedUrls.push(previousUrl);

          try {
            // Try both click methods simultaneously
            await Promise.race([
              Promise.all([
                page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
                nextButton.click()
              ]),
              Promise.all([
                page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
                nextButton.dispatchEvent('click')
              ])
            ]);
          } catch (error) {
            // Verify if navigation actually succeeded
            const currentUrl = page.url();
            if (currentUrl === previousUrl) {
              console.log("Previous URL same as current URL. Navigation failed.");
            }
          }

          const currentUrl = page.url();
          if (visitedUrls.includes(currentUrl)) {
            console.log(`Detected navigation to a previously visited URL: ${currentUrl}`);

            // Extract the current page number from the URL
            const match = currentUrl.match(/\d+/);
            if (match) {
              const currentNumber = match[0];
              // Use visitedUrls.length + 1 as the next page number
              const nextNumber = visitedUrls.length + 1;

              // Create new URL by replacing the current number with the next number
              const nextUrl = currentUrl.replace(currentNumber, nextNumber.toString());

              console.log(`Navigating to constructed URL: ${nextUrl}`);

              // Navigate to the next page
              await Promise.all([
                page.waitForNavigation({ waitUntil: 'networkidle' }),
                page.goto(nextUrl)
              ]);
            }
          }

          await page.waitForTimeout(1000);
        }
Comment on lines +596 to +682

🛠️ Refactor suggestion

Modularize the pagination navigation logic.

The navigation logic is complex and could benefit from being split into smaller, focused functions for better maintainability and testing. Consider extracting the URL collection, next-button detection, and navigation handling into dedicated helpers.

Example refactor for URL collection:

private async collectPageUrls(page: Page, config: any): Promise<string[]> {
  const pageUrls: string[] = [];
  const visitedUrls: Set<string> = new Set();

  while (true) {
    const currentUrl = page.url();
    if (visitedUrls.has(currentUrl)) {
      break;
    }
    pageUrls.push(currentUrl);
    visitedUrls.add(currentUrl);

    if (!await this.navigateToNextPage(page)) {
      break;
    }
  }

  return pageUrls;
}
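The review leaves navigateToNextPage unspecified. A minimal sketch under the same assumptions as the PR's inline logic (probe each comma-separated selector, click, and treat an unchanged URL as failure); taking the selector list as an explicit parameter is an illustrative deviation from the one-argument call above:

private async navigateToNextPage(page: Page, selectors: string[]): Promise<boolean> {
  const previousUrl = page.url();

  for (const selector of selectors) {
    // Probe each candidate selector, as the inline loop in handleParallelPagination does
    const nextButton = await page.$(selector);
    if (!nextButton) continue;

    try {
      // Click and wait for the resulting navigation to settle
      await Promise.all([
        page.waitForNavigation({ waitUntil: 'networkidle', timeout: 30000 }),
        nextButton.click()
      ]);
    } catch {
      // Timeout or click failure; fall through to the URL check below
    }

    // Navigation succeeded only if the URL actually changed
    if (page.url() !== previousUrl) {
      return true;
    }
  }

  return false;
}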
      } catch (error) {
        console.error('Error collecting page URLs:', error);
      }

      console.log(`Collected ${pageUrls.length} unique page URLs`);
    }

    return this.handlePagination(page, config);
  }

  private async handlePagination(page: Page, config: { listSelector: string, fields: any, limit?: number, pagination: any }) {
    let allResults: Record<string, any>[] = [];
    let previousHeight = 0;

@@ -556,6 +683,7 @@ export default class Interpreter extends EventEmitter {
      await page.waitForTimeout(2000);

      const currentHeight = await page.evaluate(() => document.body.scrollHeight);
      console.log(`Current scroll height: ${currentHeight}`);
      if (currentHeight === previousHeight) {
        const finalResults = await page.evaluate((cfg) => window.scrapeList(cfg), config);
        allResults = allResults.concat(finalResults);
@@ -0,0 +1,59 @@
export interface WorkerConfig {
  workerIndex: number;
  startIndex: number;
  endIndex: number;
  batchSize: number;
  pageUrls: string[];
  listSelector: string;
  fields: any;
  pagination: {
    type: string;
    selector: string;
  };
}

export interface SharedState {
  totalScraped: number;
  results: any[];
}
Comment on lines +15 to +18

💡 Codebase verification

Thread safety and type safety concerns are valid issues that need addressing.

🔗 Analysis chain

Consider thread safety and type safety for SharedState.

The shared state between workers needs careful consideration: totalScraped and results are mutated concurrently with no synchronization, and results: any[] forfeits type checking on scraped items.

Consider using a thread-safe data structure or implementing proper synchronization mechanisms. Also, define a generic type parameter:

-export interface SharedState {
+export interface SharedState<T = unknown> {
   totalScraped: number;
-  results: any[];
+  results: T[];
 }

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Find all files importing or using SharedState
rg "SharedState" -l

# Look for actual usage patterns of SharedState
rg "SharedState" -A 5 -B 5

# Check for any thread synchronization patterns
rg -i "sync|mutex|lock|atomic" maxun-core/src/

# Look for type definitions related to worker results
ast-grep --pattern 'type $_ = $_' maxun-core/src/types/

Length of output: 12076
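For illustration only (not part of the PR): with Node.js worker_threads, one way to get a safe totalScraped counter is a SharedArrayBuffer updated via Atomics, while result arrays travel back over postMessage so only the parent mutates them. A minimal sketch, all names hypothetical:

import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';

// Shared counter backed by a SharedArrayBuffer: Atomics.add is safe to call
// from any number of worker threads without extra locking.
function createSharedCounter(): Int32Array {
  return new Int32Array(new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT));
}

if (isMainThread) {
  const counter = createSharedCounter();
  const results: unknown[] = [];

  const worker = new Worker(__filename, { workerData: { counter } });
  // Result objects arrive as messages, so only the main thread mutates `results`.
  worker.on('message', (items: unknown[]) => results.push(...items));
  worker.on('exit', () => {
    console.log(`total scraped: ${Atomics.load(counter, 0)}`);
  });
} else {
  const counter: Int32Array = workerData.counter;
  const scraped = [{ title: 'example item' }]; // stand-in for real scraping work
  Atomics.add(counter, 0, scraped.length);     // lock-free increment of totalScraped
  parentPort!.postMessage(scraped);
}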
export interface WorkerProgressData {
  percentage: number;
  currentUrl: string;
  scrapedItems: number;
  timeElapsed: number;
  estimatedTimeRemaining: number;
  failures: number;
  performance: PerformanceMetrics;
}

export interface PerformanceMetrics {
  startTime: number;
  endTime: number;
  duration: number;
  pagesProcessed: number;
  itemsScraped: number;
  failedPages: number;
  averageTimePerPage: number;
  memoryUsage: {
    heapUsed: number;
    heapTotal: number;
    external: number;
    rss: number;
  };
  cpuUsage: {
    user: number;
    system: number;
  };
}

export interface GlobalMetrics {
  totalPagesProcessed: number;
  totalItemsScraped: number;
  totalFailures: number;
  workersActive: number;
  averageSpeed: number;
  timeElapsed: number;
  memoryUsage: NodeJS.MemoryUsage;
  cpuUsage: NodeJS.CpuUsage;
}
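PerformanceMetrics lines up directly with Node's built-in process accounting. A sketch of how a worker might populate it from process.memoryUsage() and process.cpuUsage() (the helper name and counter arguments are assumptions, not part of the PR):

// Hypothetical helper: derives a PerformanceMetrics snapshot from Node's
// process accounting plus the worker's own counters.
function snapshotMetrics(
  startTime: number,
  pagesProcessed: number,
  itemsScraped: number,
  failedPages: number
): PerformanceMetrics {
  const endTime = Date.now();
  const duration = endTime - startTime;
  const mem = process.memoryUsage();
  const cpu = process.cpuUsage(); // microseconds since process start

  return {
    startTime,
    endTime,
    duration,
    pagesProcessed,
    itemsScraped,
    failedPages,
    averageTimePerPage: pagesProcessed > 0 ? duration / pagesProcessed : 0,
    memoryUsage: {
      heapUsed: mem.heapUsed,
      heapTotal: mem.heapTotal,
      external: mem.external,
      rss: mem.rss
    },
    cpuUsage: { user: cpu.user, system: cpu.system }
  };
}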
@@ -0,0 +1,66 @@
import { Kafka, Consumer, Producer } from 'kafkajs';
import { kafkaConfig } from '../config/kafka';
import { EventEmitter } from 'events';

export class KafkaManager extends EventEmitter {
  private kafka: Kafka;
  private producer: Producer;
  private consumer: Consumer;
  private metricsInterval: NodeJS.Timeout | null = null;

  constructor() {
    super();
    this.kafka = new Kafka({
      clientId: kafkaConfig.clientId,
      brokers: kafkaConfig.brokers
    });

    this.producer = this.kafka.producer();
    this.consumer = this.kafka.consumer({
      groupId: kafkaConfig.consumerGroup,
      sessionTimeout: 30000
    });
  }

  async initialize() {
    await this.producer.connect();
    await this.consumer.connect();
    await this.createTopics();
    this.startMetricsReporting();
  }

  private async createTopics() {
    const admin = this.kafka.admin();
    await admin.createTopics({
      topics: [
        { topic: kafkaConfig.topics.SCRAPING_TASKS, numPartitions: 10 },
        { topic: kafkaConfig.topics.SCRAPING_RESULTS, numPartitions: 10 },
        { topic: kafkaConfig.topics.SCRAPING_DLQ, numPartitions: 1 }
      ]
    });
    await admin.disconnect();
  }

  private startMetricsReporting() {
    this.metricsInterval = setInterval(async () => {
      const admin = this.kafka.admin();
      const metrics = await admin.fetchTopicMetadata({
        topics: [
          kafkaConfig.topics.SCRAPING_TASKS,
          kafkaConfig.topics.SCRAPING_RESULTS
        ]
      });

      this.emit('metrics', metrics);
      await admin.disconnect();
    }, 5000);
Comment on lines +45 to +56

🛠️ Refactor suggestion

Add error handling in the metrics reporting interval.

The asynchronous function inside setInterval has no error handling, so a failed metadata fetch surfaces as an unhandled rejection every five seconds. Apply this diff to implement error handling:

 this.metricsInterval = setInterval(async () => {
+  try {
     const admin = this.kafka.admin();
     const metrics = await admin.fetchTopicMetadata({
       topics: [
         kafkaConfig.topics.SCRAPING_TASKS,
         kafkaConfig.topics.SCRAPING_RESULTS
       ]
     });
     this.emit('metrics', metrics);
     await admin.disconnect();
+  } catch (error) {
+    // Handle errors, e.g., log the error
+    console.error('Error fetching Kafka metrics:', error);
+  }
 }, 5000);
  }

  async cleanup() {
    if (this.metricsInterval) {
      clearInterval(this.metricsInterval);
    }
    await this.producer.disconnect();
    await this.consumer.disconnect();
  }
}
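Nothing in this file shows how tasks and results would actually flow through the manager. A hedged sketch of send/subscribe helpers that could be added to KafkaManager (method names and payload shapes are assumptions; the PR keeps producer and consumer private and defines no such methods):

  // Hypothetical additions to KafkaManager; not part of the PR.
  async sendTask(task: WorkerConfig): Promise<void> {
    await this.producer.send({
      topic: kafkaConfig.topics.SCRAPING_TASKS,
      messages: [
        // Keying by worker index keeps one worker's tasks on one partition, in order
        { key: String(task.workerIndex), value: JSON.stringify(task) }
      ]
    });
  }

  async onResults(handler: (result: unknown) => Promise<void>): Promise<void> {
    await this.consumer.subscribe({
      topic: kafkaConfig.topics.SCRAPING_RESULTS,
      fromBeginning: false
    });
    await this.consumer.run({
      eachMessage: async ({ message }) => {
        if (message.value) {
          await handler(JSON.parse(message.value.toString()));
        }
      }
    });
  }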
Use environment variables for Kafka configuration.

The broker address and client ID are hardcoded, which limits deployment flexibility and poses security risks. Consider moving them into environment variables, and consider adding SSL/authentication configuration for production environments.
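A minimal sketch of what that could look like, assuming hypothetical KAFKA_* variable names and kafkajs's ssl/sasl options (not part of the PR):

// Hypothetical environment-driven variant of the Kafka config module;
// all env var names are illustrative.
export const kafkaConfig = {
  clientId: process.env.KAFKA_CLIENT_ID || 'maxun-scraper',
  brokers: (process.env.KAFKA_BROKERS || 'localhost:29092').split(','),
  ssl: process.env.KAFKA_SSL === 'true',
  sasl: process.env.KAFKA_USERNAME
    ? {
        mechanism: 'plain' as const,
        username: process.env.KAFKA_USERNAME,
        password: process.env.KAFKA_PASSWORD || ''
      }
    : undefined,
  topics: {
    SCRAPING_TASKS: 'scraping-tasks',
    SCRAPING_RESULTS: 'scraping-results',
    SCRAPING_DLQ: 'scraping-dlq'
  },
  consumerGroup: `${process.env.ENV || 'dev'}-scraping-group`
};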