11import type { FastifyBaseLogger } from 'fastify' ;
22import type { AnyDatabase } from '../db' ;
33import { JobQueueService } from './jobQueueService' ;
4+ import { mcpServers } from '../db/schema.sqlite' ;
5+ import { inArray } from 'drizzle-orm' ;
46
57/**
68 * Sync configuration options
@@ -64,18 +66,22 @@ export class RegistrySyncService {
6466 } , 'Starting MCP Registry sync via job queue' ) ;
6567
6668 try {
67- // Fetch all servers from official registry
68- const allServers = await this . fetchAllServersFromRegistry ( finalConfig . maxServers ) ;
69+ // Fetch servers from official registry with smart pagination
70+ // If skipExisting + maxServers, fetch page-by-page and filter until we have enough
71+ const serversToSync = await this . fetchNewServersFromRegistry (
72+ finalConfig . maxServers ,
73+ finalConfig . skipExisting
74+ ) ;
6975
7076 this . logger . info ( {
71- totalServers : allServers . length ,
77+ totalNewServers : serversToSync . length ,
7278 maxServers : finalConfig . maxServers ,
73- } , 'Fetched servers from official registry' ) ;
79+ } , 'Collected new servers from official registry' ) ;
7480
7581 // Create job batch for tracking
7682 const batch = await this . jobQueueService . createBatch (
7783 'mcp_registry_sync' ,
78- allServers . length ,
84+ serversToSync . length ,
7985 {
8086 syncedBy,
8187 config : finalConfig ,
@@ -85,8 +91,8 @@ export class RegistrySyncService {
8591
8692 // Create individual jobs for each server
8793 let jobsCreated = 0 ;
88- for ( let i = 0 ; i < allServers . length ; i ++ ) {
89- const server = allServers [ i ] ;
94+ for ( let i = 0 ; i < serversToSync . length ; i ++ ) {
95+ const server = serversToSync [ i ] ;
9096 const scheduledFor = new Date ( Date . now ( ) + ( i * finalConfig . rateLimitDelay * 1000 ) ) ;
9197
9298 await this . jobQueueService . createJob (
@@ -101,7 +107,7 @@ export class RegistrySyncService {
101107 batchInfo : {
102108 batchId : batch . id ,
103109 serverIndex : i + 1 ,
104- totalServers : allServers . length ,
110+ totalServers : serversToSync . length ,
105111 } ,
106112 } ,
107113 {
@@ -114,21 +120,21 @@ export class RegistrySyncService {
114120 }
115121
116122 const estimatedCompletion = new Date (
117- Date . now ( ) + ( allServers . length * finalConfig . rateLimitDelay * 1000 )
123+ Date . now ( ) + ( serversToSync . length * finalConfig . rateLimitDelay * 1000 )
118124 ) ;
119125
120126 this . logger . info ( {
121127 operation : 'mcp_registry_sync_jobs_created' ,
122128 batchId : batch . id ,
123- totalServers : allServers . length ,
129+ totalServers : serversToSync . length ,
124130 jobsCreated,
125131 rateLimitDelay : finalConfig . rateLimitDelay ,
126132 estimatedCompletion : estimatedCompletion . toISOString ( ) ,
127133 } , 'Created MCP server sync jobs' ) ;
128134
129135 return {
130136 batchId : batch . id ,
131- totalServers : allServers . length ,
137+ totalServers : serversToSync . length ,
132138 jobsCreated,
133139 startTime : new Date ( ) ,
134140 estimatedCompletion,
@@ -141,7 +147,183 @@ export class RegistrySyncService {
141147 }
142148
/**
 * Drop registry entries that are already stored in the database.
 *
 * The `name` of every entry is looked up against `mcpServers.official_name`
 * in a single query; only entries whose name has no match are returned.
 * Entries without a name are not returned once a lookup has taken place.
 *
 * The input array itself is handed back unchanged when it is empty, when
 * none of its entries carries a name, or when the database lookup throws
 * (the error is logged and filtering is skipped).
 *
 * @param servers - Registry server entries to check.
 * @returns Entries whose `name` is not present as an `official_name` row.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private async filterExistingServers(servers: any[]): Promise<any[]> {
  if (servers.length === 0) {
    return servers;
  }

  // `name` carries the official reverse-DNS identifier of a registry entry.
  const candidateNames = servers
    .map(entry => entry.name)
    .filter(name => Boolean(name));

  if (candidateNames.length === 0) {
    this.logger.warn('No official names found in servers, cannot filter existing servers');
    return servers;
  }

  this.logger.debug(
    {
      totalServers: servers.length,
      officialNamesToCheck: candidateNames.length,
    },
    'Checking for existing servers in database by official_name'
  );

  try {
    // One lookup restricted to the names we were given.
    const rows = await this.db
      .select({ official_name: mcpServers.official_name })
      .from(mcpServers)
      .where(inArray(mcpServers.official_name, candidateNames));

    // Collect the matched names in a Set for constant-time membership tests.
    const knownNames = new Set<string>();
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    rows.forEach((row: any) => {
      if (row.official_name) {
        knownNames.add(row.official_name);
      }
    });

    this.logger.debug(
      {
        existingCount: knownNames.size,
      },
      'Found existing servers in database'
    );

    // Keep only named entries that the database does not know yet.
    return servers.filter(entry => Boolean(entry.name) && !knownNames.has(entry.name));
  } catch (error) {
    // Best effort: a failed lookup must not abort the sync.
    this.logger.error({ error }, 'Failed to query existing servers, continuing without filtering');
    return servers;
  }
}
203+
/**
 * Collect servers from the official registry that are not yet in the
 * database, fetching one page at a time.
 *
 * When `skipExisting` is false this simply delegates to
 * `fetchAllServersFromRegistry`. Otherwise every page is:
 *   1. fetched through `fetchServersBatch` (50 entries per request),
 *   2. reduced to entries whose official registry metadata has
 *      `isLatest === true`,
 *   3. filtered against the database through `filterExistingServers`
 *      (so only that page's names are queried),
 *   4. appended to the accumulated result.
 *
 * Pagination ends as soon as `maxServers` new servers have been collected
 * or the registry returns no `next_cursor`. The rate-limit pause is only
 * taken when another page will actually be requested.
 *
 * @param maxServers - Upper bound on the number of servers returned;
 *   `null` (or any other falsy value such as `0`) means "no limit".
 * @param skipExisting - Whether servers already in the database are skipped.
 * @returns The new servers in the order the registry returned them,
 *   truncated to `maxServers` when a limit is set.
 * @throws Error if a page request reports `success === false`.
 */
private async fetchNewServersFromRegistry(
  maxServers: number | null,
  skipExisting: boolean
// eslint-disable-next-line @typescript-eslint/no-explicit-any
): Promise<any[]> {
  // Nothing to filter page-by-page: fetch everything the plain way.
  if (!skipExisting) {
    return this.fetchAllServersFromRegistry(maxServers);
  }

  const pageSize = 50;
  const pageDelayMs = 1000;
  const officialMetaKey = 'io.modelcontextprotocol.registry/official';

  // Falsy limits (null, 0) are treated as "unlimited".
  const limit = maxServers ? maxServers : null;

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const accumulatedNewServers: any[] = [];
  const limitReached = (): boolean =>
    limit !== null && accumulatedNewServers.length >= limit;

  let cursor: string | undefined;
  let hasMore = true;
  // Counts pages that were actually requested, so the summary log is exact.
  let pagesFetched = 0;

  this.logger.debug({
    maxServers,
    skipExisting,
    operation: 'smart_pagination_start'
  }, 'Starting smart page-by-page fetch with filtering');

  while (hasMore) {
    // 1-based number of the page this iteration is about to request.
    const pageNumber = pagesFetched + 1;

    if (limitReached()) {
      this.logger.debug({
        pageNumber,
        accumulatedCount: accumulatedNewServers.length,
        maxServers,
        operation: 'smart_pagination_complete'
      }, 'Collected enough new servers, stopping pagination');
      break;
    }

    const batchResult = await this.fetchServersBatch(cursor, pageSize);

    if (!batchResult.success) {
      throw new Error(`Failed to fetch servers: ${batchResult.error}`);
    }
    pagesFetched++;

    const { servers, metadata } = batchResult.data;

    // Only the latest version of each server is synced. `_meta` is copied
    // onto the flattened server object so it stays available downstream.
    const serverData = servers
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      .filter((item: any) => item._meta?.[officialMetaKey]?.isLatest === true)
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      .map((item: any) => {
        const server = item.server || item;
        return {
          ...server,
          _meta: item._meta
        };
      });

    this.logger.debug({
      pageNumber,
      rawBatchSize: servers.length,
      afterIsLatestFilter: serverData.length,
      cursor,
    }, 'Fetched and filtered page from official registry');

    // Check just this page's names against the database.
    const newServersInPage = await this.filterExistingServers(serverData);

    this.logger.debug({
      pageNumber,
      serversInPage: serverData.length,
      newInPage: newServersInPage.length,
      existingInPage: serverData.length - newServersInPage.length,
      accumulatedSoFar: accumulatedNewServers.length,
    }, 'Filtered page against database');

    accumulatedNewServers.push(...newServersInPage);

    cursor = metadata.next_cursor;
    hasMore = Boolean(cursor);

    // Pause for the registry API only if another request will follow.
    if (hasMore && !limitReached()) {
      await new Promise(resolve => setTimeout(resolve, pageDelayMs));
    }
  }

  // The last page may have pushed us past the limit; trim the overshoot.
  const result = limit !== null
    ? accumulatedNewServers.slice(0, limit)
    : accumulatedNewServers;

  this.logger.info({
    totalPagesChecked: pagesFetched,
    totalNewServersFound: accumulatedNewServers.length,
    returningCount: result.length,
    maxServers,
    operation: 'smart_pagination_result'
  }, 'Smart pagination completed');

  return result;
}
324+
325+ /**
326+ * Fetch all servers from the official registry (fallback method)
145327 */
146328 // eslint-disable-next-line @typescript-eslint/no-explicit-any
147329 private async fetchAllServersFromRegistry ( maxServers : number | null ) : Promise < any [ ] > {
@@ -171,10 +353,12 @@ export class RegistrySyncService {
171353 // We need to attach _meta to the server object so it's available for metadata extraction
172354 // FILTER: Only include servers where _meta.io.modelcontextprotocol.registry/official.isLatest === true
173355 const serverData = servers
356+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
174357 . filter ( ( item : any ) => {
175358 const meta = item . _meta ?. [ 'io.modelcontextprotocol.registry/official' ] ;
176359 return meta ?. isLatest === true ;
177360 } )
361+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
178362 . map ( ( item : any ) => {
179363 const server = item . server || item ;
180364 // Attach _meta to the server object for later extraction
0 commit comments