From ba2c893f394efab007035d28ad5b4e8ed4b3117a Mon Sep 17 00:00:00 2001 From: hhftechnologies Date: Mon, 19 May 2025 18:02:40 +0530 Subject: [PATCH 01/10] Update plugins.json --- plugin/plugins.json | 343 ++------------------------------------------ 1 file changed, 9 insertions(+), 334 deletions(-) diff --git a/plugin/plugins.json b/plugin/plugins.json index acece569e..c31a56a31 100644 --- a/plugin/plugins.json +++ b/plugin/plugins.json @@ -26,341 +26,16 @@ "docs": "https://plugins.traefik.io/plugins/623c53a712c0b093a60f441a/geo-block" }, { - "displayName": "Statiq", + "displayName": "Fail2Ban", "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", + "iconPath": ".assets/f2b.png", + "import": "github.com/tomMoulard/fail2ban", + "summary": "Fail2ban for Traefik", + "author": "tomMoulard", + "version": "v0.8.5", "tested_with": "Traefik v2.9+, v3.0+", - "stars": 2, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "BlockPath", - "type": "middleware", - "iconPath": ".assets/blockpath/icon.svg", - "import": "github.com/traefik/plugin-blockpath", - "summary": "Blocks requests based on path, including regex and case insensitivity.", - "author": "Traefik Labs", - "version": "v0.2.0", - "tested_with": "Traefik v2.x", - "stars": 50, - "homepage": "https://github.com/traefik/plugin-blockpath", - "docs": "https://plugins.traefik.io/plugins/623c53b312c0b093a60f45ac/block-path" - }, - { - "displayName": "RequestLogger", - "type": "middleware", - "iconPath": "", - "import": "github.com/acouvreur/traefik-plugin-request-logger", - "summary": "Logs incoming requests with configurable format.", - "author": "Antoine Couvreur", - "version": "v1.3.0", - "tested_with": "Traefik v2.x", - "stars": 70, - "homepage": "https://github.com/acouvreur/traefik-plugin-request-logger", - "docs": "https://plugins.traefik.io/plugins/623c53a712c0b093a60f440c/request-logger" - }, - { - "displayName": "Cloudflare Warp", - "type": "middleware", - "iconPath": ".assets/cloudflarewarp/icon.png", - "import": "github.com/BetterCorp/cloudflarewarp", - "summary": "Authenticate requests through Cloudflare Warp.", - "author": "BetterCorp", - "version": "v0.1.2", - "tested_with": "Traefik v2.x", - "homepage": "https://github.com/BetterCorp/cloudflarewarp", - "docs": "https://plugins.traefik.io/plugins/62830a3ea380546bcf5399e7/cloudflare-warp-authentication" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" - }, - { - "displayName": "Statiq", - "type": "middleware", - "iconPath": ".assets/statiq/icon.png", - "import": "github.com/hhftechnology/statiq", - "summary": "A feature-rich static file server inside Traefik.", - "author": "HHF Technology", - "version": "v0.3.0", - "tested_with": "Traefik v2.9+, v3.0+", - "stars": 150, - "homepage": "https://github.com/hhftechnology/statiq", - "docs": "https://github.com/hhftechnology/statiq#readme" + "stars": 225, + "homepage": "https://github.com/tomMoulard/fail2ban", + "docs": "https://github.com/tomMoulard/fail2ban#readme" } - ] From 04dc0e88ee852df5347938d838998ce7a1854935 Mon Sep 17 00:00:00 2001 From: hhftechnologies Date: Mon, 19 May 2025 21:38:38 +0530 Subject: [PATCH 02/10] Update config_generator.go --- services/config_generator.go | 64 ++++++++++++++++++++++++------------ 1 file changed, 43 insertions(+), 21 deletions(-) diff --git a/services/config_generator.go b/services/config_generator.go index 72a6d7b47..e46331362 100644 --- a/services/config_generator.go +++ b/services/config_generator.go @@ -369,11 +369,14 @@ func (cg *ConfigGenerator) processResourcesWithServices(config *TraefikConfig) e var serviceReference string if mapValueDataEntry.CustomServiceID.Valid && mapValueDataEntry.CustomServiceID.String != "" { - // If the custom service ID already contains a provider, preserve it - if strings.Contains(mapValueDataEntry.CustomServiceID.String, "@") { - serviceReference = mapValueDataEntry.CustomServiceID.String + customServiceID := mapValueDataEntry.CustomServiceID.String + // Check if custom service ID already has a provider suffix + if strings.Contains(customServiceID, "@") { + // Already has a provider, use as is + serviceReference = customServiceID } else { - serviceReference = fmt.Sprintf("%s@file", mapValueDataEntry.CustomServiceID.String) + // Add the file provider + serviceReference = fmt.Sprintf("%s@file", customServiceID) } } else { // For Docker environments when using Traefik API, prefer docker provider @@ -384,11 +387,17 @@ func (cg *ConfigGenerator) processResourcesWithServices(config *TraefikConfig) e providerSuffix = "http" } - // If service ID already has a provider suffix, preserve it - if strings.Contains(info.ServiceID, "@") { + // FIX: Check properly for existing suffixes + suffix := fmt.Sprintf("@%s", providerSuffix) + if strings.HasSuffix(info.ServiceID, suffix) { + // Already has the correct suffix, use as is + serviceReference = info.ServiceID + } else if strings.Contains(info.ServiceID, "@") { + // Has a different provider suffix, use as is serviceReference = info.ServiceID } else { - serviceReference = fmt.Sprintf("%s@%s", info.ServiceID, providerSuffix) + // No provider suffix, add one + serviceReference = fmt.Sprintf("%s%s", info.ServiceID, suffix) } } @@ -429,7 +438,6 @@ func (cg *ConfigGenerator) processResourcesWithServices(config *TraefikConfig) e return nil } - func (cg *ConfigGenerator) processTCPRouters(config *TraefikConfig) error { activeDSConfig, err := cg.configManager.GetActiveDataSourceConfig() if err != nil { @@ -476,27 +484,41 @@ func (cg *ConfigGenerator) processTCPRouters(config *TraefikConfig) error { var tcpServiceReference string if customServiceID.Valid && customServiceID.String != "" { - tcpServiceReference = fmt.Sprintf("%s@file", customServiceID.String) + customSvc := customServiceID.String + // Check if custom service ID already has a provider suffix + if strings.Contains(customSvc, "@") { + // Already has a provider, use as is + tcpServiceReference = customSvc + } else { + // Add the file provider + tcpServiceReference = fmt.Sprintf("%s@file", customSvc) + } } else { - providerSuffix := "http" // Default, implies the HTTP service definition might be used or Traefik handles internally + providerSuffix := "http" // Default provider + + // Use appropriate provider based on data source if activeDSConfig.Type == models.TraefikAPI { if models.DataSourceType(sourceType) == models.TraefikAPI { - // For TCP services linked to Docker, Traefik often resolves service by name from Docker provider - if !strings.Contains(serviceID, "@") { // Only add @docker if no provider specified - providerSuffix = "docker" - } else { - providerSuffix = "" // Keep existing provider - } + providerSuffix = "docker" } } - if providerSuffix != "" && !strings.Contains(serviceID, "@") { - tcpServiceReference = fmt.Sprintf("%s@%s", serviceID, providerSuffix) + + // FIX: Check properly for existing suffixes + suffix := fmt.Sprintf("@%s", providerSuffix) + if strings.HasSuffix(serviceID, suffix) { + // Already has the correct suffix, use as is + tcpServiceReference = serviceID + } else if strings.Contains(serviceID, "@") { + // Has a different provider suffix, use as is + tcpServiceReference = serviceID } else { - tcpServiceReference = serviceID // Use as-is + // No provider suffix, add one + tcpServiceReference = fmt.Sprintf("%s%s", serviceID, suffix) } } - log.Printf("Resource %s (TCP): Router service set to %s. (SourceType: %s, ActiveDS: %s, CustomSvc: %s)", id, tcpServiceReference, sourceType, activeDSConfig.Type, customServiceID.String) - + + log.Printf("Resource %s (TCP): Router service set to %s. (SourceType: %s, ActiveDS: %s, CustomSvc: %s)", + id, tcpServiceReference, sourceType, activeDSConfig.Type, customServiceID.String) tcpRouterID := fmt.Sprintf("%s-tcp", id) config.TCP.Routers[tcpRouterID] = map[string]interface{}{ From 301f266f7d515aa5912c02e6d5153b34d59a3aa3 Mon Sep 17 00:00:00 2001 From: hhftechnologies Date: Mon, 19 May 2025 22:10:46 +0530 Subject: [PATCH 03/10] Update config_generator.go --- services/config_generator.go | 569 +++++++++++++++++------------------ 1 file changed, 281 insertions(+), 288 deletions(-) diff --git a/services/config_generator.go b/services/config_generator.go index e46331362..fe13c0fa0 100644 --- a/services/config_generator.go +++ b/services/config_generator.go @@ -230,306 +230,299 @@ func (cg *ConfigGenerator) processServices(config *TraefikConfig) error { // In services/config_generator.go // processResourcesWithServices processes resources with their assigned services +// Helper function to extract the base name without provider suffixes +func extractBaseName(id string) string { + // If the ID contains @ character, extract the part before it + if idx := strings.Index(id, "@"); idx > 0 { + return id[:idx] + } + return id +} + // processResourcesWithServices processes resources with their assigned services func (cg *ConfigGenerator) processResourcesWithServices(config *TraefikConfig) error { - activeDSConfig, err := cg.configManager.GetActiveDataSourceConfig() - if err != nil { - log.Printf("Warning: Could not get active data source config in ConfigGenerator: %v. Defaulting to Pangolin logic.", err) - activeDSConfig.Type = models.PangolinAPI - } - - query := ` - SELECT r.id, r.host, r.service_id, r.entrypoints, r.tls_domains, - r.custom_headers, r.router_priority, r.source_type, - rm.middleware_id, rm.priority, - rs.service_id as custom_service_id - FROM resources r - LEFT JOIN resource_middlewares rm ON r.id = rm.resource_id - LEFT JOIN resource_services rs ON r.id = rs.resource_id - WHERE r.status = 'active' - ORDER BY r.id, rm.priority DESC - ` - rows, err := cg.db.Query(query) - if err != nil { - return fmt.Errorf("failed to fetch resources for HTTP routers: %w", err) - } - defer rows.Close() - - type resourceProcessedData struct { - Info models.Resource - Middlewares []MiddlewareWithPriority - CustomServiceID sql.NullString - } - resourceDataMap := make(map[string]resourceProcessedData) - - for rows.Next() { - var rID_db, host_db, serviceID_db, entrypoints_db, tlsDomains_db, customHeadersStr_db, sourceType_db string - var routerPriority_db sql.NullInt64 - var middlewareID_db sql.NullString - var middlewarePriority_db sql.NullInt64 - var customServiceID_db sql.NullString - - err := rows.Scan( - &rID_db, &host_db, &serviceID_db, &entrypoints_db, &tlsDomains_db, - &customHeadersStr_db, &routerPriority_db, &sourceType_db, - &middlewareID_db, &middlewarePriority_db, &customServiceID_db, - ) - if err != nil { - log.Printf("Failed to scan resource data for HTTP router: %v", err) - continue - } - - data, exists := resourceDataMap[rID_db] - if !exists { - data.Info = models.Resource{ - ID: rID_db, - Host: host_db, - ServiceID: serviceID_db, - Entrypoints: entrypoints_db, - TLSDomains: tlsDomains_db, - CustomHeaders: customHeadersStr_db, - SourceType: sourceType_db, - } - if routerPriority_db.Valid { - data.Info.RouterPriority = int(routerPriority_db.Int64) - } else { - data.Info.RouterPriority = 100 // Default - } - data.CustomServiceID = customServiceID_db - } - - if middlewareID_db.Valid { - mwPriority := 100 - if middlewarePriority_db.Valid { - mwPriority = int(middlewarePriority_db.Int64) - } - data.Middlewares = append(data.Middlewares, MiddlewareWithPriority{ - ID: middlewareID_db.String, - Priority: mwPriority, - }) - } - resourceDataMap[rID_db] = data - } - if err = rows.Err(); err != nil { - return fmt.Errorf("error iterating resource rows for HTTP: %w", err) - } - - for _, mapValueDataEntry := range resourceDataMap { - info := mapValueDataEntry.Info - assignedMiddlewares := mapValueDataEntry.Middlewares - - sort.SliceStable(assignedMiddlewares, func(i, j int) bool { - return assignedMiddlewares[i].Priority > assignedMiddlewares[j].Priority - }) + activeDSConfig, err := cg.configManager.GetActiveDataSourceConfig() + if err != nil { + log.Printf("Warning: Could not get active data source config in ConfigGenerator: %v. Defaulting to Pangolin logic.", err) + activeDSConfig.Type = models.PangolinAPI + } + + query := ` + SELECT r.id, r.host, r.service_id, r.entrypoints, r.tls_domains, + r.custom_headers, r.router_priority, r.source_type, + rm.middleware_id, rm.priority, + rs.service_id as custom_service_id + FROM resources r + LEFT JOIN resource_middlewares rm ON r.id = rm.resource_id + LEFT JOIN resource_services rs ON r.id = rs.resource_id + WHERE r.status = 'active' + ORDER BY r.id, rm.priority DESC + ` + rows, err := cg.db.Query(query) + if err != nil { + return fmt.Errorf("failed to fetch resources for HTTP routers: %w", err) + } + defer rows.Close() + + type resourceProcessedData struct { + Info models.Resource + Middlewares []MiddlewareWithPriority + CustomServiceID sql.NullString + } + resourceDataMap := make(map[string]resourceProcessedData) + + for rows.Next() { + var rID_db, host_db, serviceID_db, entrypoints_db, tlsDomains_db, customHeadersStr_db, sourceType_db string + var routerPriority_db sql.NullInt64 + var middlewareID_db sql.NullString + var middlewarePriority_db sql.NullInt64 + var customServiceID_db sql.NullString + + err := rows.Scan( + &rID_db, &host_db, &serviceID_db, &entrypoints_db, &tlsDomains_db, + &customHeadersStr_db, &routerPriority_db, &sourceType_db, + &middlewareID_db, &middlewarePriority_db, &customServiceID_db, + ) + if err != nil { + log.Printf("Failed to scan resource data for HTTP router: %v", err) + continue + } + + data, exists := resourceDataMap[rID_db] + if !exists { + data.Info = models.Resource{ + ID: rID_db, + Host: host_db, + ServiceID: serviceID_db, + Entrypoints: entrypoints_db, + TLSDomains: tlsDomains_db, + CustomHeaders: customHeadersStr_db, + SourceType: sourceType_db, + } + if routerPriority_db.Valid { + data.Info.RouterPriority = int(routerPriority_db.Int64) + } else { + data.Info.RouterPriority = 100 // Default + } + data.CustomServiceID = customServiceID_db + } - routerEntryPoints := strings.Split(strings.TrimSpace(info.Entrypoints), ",") - if len(routerEntryPoints) == 0 || (len(routerEntryPoints) == 1 && routerEntryPoints[0] == "") { - routerEntryPoints = []string{"websecure"} - } + if middlewareID_db.Valid { + mwPriority := 100 + if middlewarePriority_db.Valid { + mwPriority = int(middlewarePriority_db.Int64) + } + data.Middlewares = append(data.Middlewares, MiddlewareWithPriority{ + ID: middlewareID_db.String, + Priority: mwPriority, + }) + } + resourceDataMap[rID_db] = data + } + if err = rows.Err(); err != nil { + return fmt.Errorf("error iterating resource rows for HTTP: %w", err) + } + + for _, mapValueDataEntry := range resourceDataMap { + info := mapValueDataEntry.Info + assignedMiddlewares := mapValueDataEntry.Middlewares + + sort.SliceStable(assignedMiddlewares, func(i, j int) bool { + return assignedMiddlewares[i].Priority > assignedMiddlewares[j].Priority + }) + + routerEntryPoints := strings.Split(strings.TrimSpace(info.Entrypoints), ",") + if len(routerEntryPoints) == 0 || (len(routerEntryPoints) == 1 && routerEntryPoints[0] == "") { + routerEntryPoints = []string{"websecure"} + } - var customHeadersMiddlewareID string - if info.CustomHeaders != "" && info.CustomHeaders != "{}" && info.CustomHeaders != "null" { - var headersMap map[string]string - if err := json.Unmarshal([]byte(info.CustomHeaders), &headersMap); err == nil && len(headersMap) > 0 { - middlewareName := fmt.Sprintf("%s-customheaders", info.ID) - customRequestHeadersMap := make(map[string]string) - for k,v := range headersMap { - customRequestHeadersMap[k] = v - } - config.HTTP.Middlewares[middlewareName] = map[string]interface{}{ - "headers": map[string]interface{}{"customRequestHeaders": customRequestHeadersMap}, - } - customHeadersMiddlewareID = fmt.Sprintf("%s@file", middlewareName) - } else if err != nil { - log.Printf("Failed to parse custom headers for resource %s: %v. Headers: %s", info.ID, err, info.CustomHeaders) - } - } + var customHeadersMiddlewareID string + if info.CustomHeaders != "" && info.CustomHeaders != "{}" && info.CustomHeaders != "null" { + var headersMap map[string]string + if err := json.Unmarshal([]byte(info.CustomHeaders), &headersMap); err == nil && len(headersMap) > 0 { + middlewareName := fmt.Sprintf("%s-customheaders", info.ID) + customRequestHeadersMap := make(map[string]string) + for k,v := range headersMap { + customRequestHeadersMap[k] = v + } + config.HTTP.Middlewares[middlewareName] = map[string]interface{}{ + "headers": map[string]interface{}{"customRequestHeaders": customRequestHeadersMap}, + } + customHeadersMiddlewareID = fmt.Sprintf("%s@file", middlewareName) + } else if err != nil { + log.Printf("Failed to parse custom headers for resource %s: %v. Headers: %s", info.ID, err, info.CustomHeaders) + } + } - var finalMiddlewares []string - if customHeadersMiddlewareID != "" { - finalMiddlewares = append(finalMiddlewares, customHeadersMiddlewareID) - } - for _, mw := range assignedMiddlewares { - finalMiddlewares = append(finalMiddlewares, fmt.Sprintf("%s@file", mw.ID)) - } - - // Only add the badger middleware when using Pangolin data source - if activeDSConfig.Type == models.PangolinAPI { - isBadgerPresent := false - for _, m := range finalMiddlewares { - if m == "badger@http" { - isBadgerPresent = true - break - } - } - if !isBadgerPresent { - finalMiddlewares = append(finalMiddlewares, "badger@http") - } - } - - var serviceReference string - if mapValueDataEntry.CustomServiceID.Valid && mapValueDataEntry.CustomServiceID.String != "" { - customServiceID := mapValueDataEntry.CustomServiceID.String - // Check if custom service ID already has a provider suffix - if strings.Contains(customServiceID, "@") { - // Already has a provider, use as is - serviceReference = customServiceID - } else { - // Add the file provider - serviceReference = fmt.Sprintf("%s@file", customServiceID) - } - } else { - // For Docker environments when using Traefik API, prefer docker provider - providerSuffix := "docker" - - // If not using Traefik API as data source, use http provider - if activeDSConfig.Type != models.TraefikAPI { - providerSuffix = "http" - } - - // FIX: Check properly for existing suffixes - suffix := fmt.Sprintf("@%s", providerSuffix) - if strings.HasSuffix(info.ServiceID, suffix) { - // Already has the correct suffix, use as is - serviceReference = info.ServiceID - } else if strings.Contains(info.ServiceID, "@") { - // Has a different provider suffix, use as is - serviceReference = info.ServiceID - } else { - // No provider suffix, add one - serviceReference = fmt.Sprintf("%s%s", info.ServiceID, suffix) - } - } - - log.Printf("Resource %s (HTTP): Router service set to %s. (SourceType: %s, ActiveDS: %s, CustomSvc: %s)", - info.ID, - serviceReference, - info.SourceType, - activeDSConfig.Type, - mapValueDataEntry.CustomServiceID.String) - - routerIDForTraefik := fmt.Sprintf("%s-auth", info.ID) - routerConfig := map[string]interface{}{ - "rule": fmt.Sprintf("Host(`%s`)", info.Host), - "service": serviceReference, - "entryPoints": routerEntryPoints, - "priority": info.RouterPriority, - } - if len(finalMiddlewares) > 0 { - routerConfig["middlewares"] = finalMiddlewares - } + var finalMiddlewares []string + if customHeadersMiddlewareID != "" { + finalMiddlewares = append(finalMiddlewares, customHeadersMiddlewareID) + } + for _, mw := range assignedMiddlewares { + // Use extractBaseName here too for middleware IDs if needed + middlewareID := extractBaseName(mw.ID) + finalMiddlewares = append(finalMiddlewares, fmt.Sprintf("%s@file", middlewareID)) + } + + // Only add the badger middleware when using Pangolin data source + if activeDSConfig.Type == models.PangolinAPI { + isBadgerPresent := false + for _, m := range finalMiddlewares { + if m == "badger@http" { + isBadgerPresent = true + break + } + } + if !isBadgerPresent { + finalMiddlewares = append(finalMiddlewares, "badger@http") + } + } + + var serviceReference string + if mapValueDataEntry.CustomServiceID.Valid && mapValueDataEntry.CustomServiceID.String != "" { + // Extract base name without any suffixes + baseName := extractBaseName(mapValueDataEntry.CustomServiceID.String) + // Always add the file provider for custom services + serviceReference = fmt.Sprintf("%s@file", baseName) + } else { + // For Docker environments when using Traefik API, prefer docker provider + providerSuffix := "docker" + + // If not using Traefik API as data source, use http provider + if activeDSConfig.Type != models.TraefikAPI { + providerSuffix = "http" + } + + // Extract base name without any suffixes + baseName := extractBaseName(info.ServiceID) + // Add the appropriate provider suffix + serviceReference = fmt.Sprintf("%s@%s", baseName, providerSuffix) + } + + log.Printf("Resource %s (HTTP): Router service set to %s. (SourceType: %s, ActiveDS: %s, CustomSvc: %s)", + info.ID, + serviceReference, + info.SourceType, + activeDSConfig.Type, + mapValueDataEntry.CustomServiceID.String) + + // Make sure we don't have duplicated suffixes in router ID + routerIDBase := extractBaseName(info.ID) + routerIDForTraefik := fmt.Sprintf("%s-auth", routerIDBase) + + routerConfig := map[string]interface{}{ + "rule": fmt.Sprintf("Host(`%s`)", info.Host), + "service": serviceReference, + "entryPoints": routerEntryPoints, + "priority": info.RouterPriority, + } + if len(finalMiddlewares) > 0 { + routerConfig["middlewares"] = finalMiddlewares + } - tlsConfig := map[string]interface{}{"certResolver": "letsencrypt"} - if info.TLSDomains != "" { - sans := strings.Split(strings.TrimSpace(info.TLSDomains), ",") - var cleanSans []string - for _, s := range sans { - if trimmed := strings.TrimSpace(s); trimmed != "" { - cleanSans = append(cleanSans, trimmed) - } - } - if len(cleanSans) > 0 { - tlsConfig["domains"] = []map[string]interface{}{{"main": info.Host, "sans": cleanSans}} - } - } - routerConfig["tls"] = tlsConfig - config.HTTP.Routers[routerIDForTraefik] = routerConfig - } - return nil + tlsConfig := map[string]interface{}{"certResolver": "letsencrypt"} + if info.TLSDomains != "" { + sans := strings.Split(strings.TrimSpace(info.TLSDomains), ",") + var cleanSans []string + for _, s := range sans { + if trimmed := strings.TrimSpace(s); trimmed != "" { + cleanSans = append(cleanSans, trimmed) + } + } + if len(cleanSans) > 0 { + tlsConfig["domains"] = []map[string]interface{}{{"main": info.Host, "sans": cleanSans}} + } + } + routerConfig["tls"] = tlsConfig + config.HTTP.Routers[routerIDForTraefik] = routerConfig + } + return nil } +// processTCPRouters processes TCP router resources func (cg *ConfigGenerator) processTCPRouters(config *TraefikConfig) error { - activeDSConfig, err := cg.configManager.GetActiveDataSourceConfig() - if err != nil { - log.Printf("Warning: Could not get active data source config for TCP routers: %v. Defaulting to Pangolin logic.", err) - activeDSConfig.Type = models.PangolinAPI - } - - query := ` - SELECT r.id, r.host, r.service_id, r.tcp_entrypoints, r.tcp_sni_rule, r.router_priority, r.source_type, - rs.service_id as custom_service_id - FROM resources r - LEFT JOIN resource_services rs ON r.id = rs.resource_id - WHERE r.status = 'active' AND r.tcp_enabled = 1 - ` - rows, err := cg.db.Query(query) - if err != nil { - return fmt.Errorf("failed to fetch TCP resources: %w", err) - } - defer rows.Close() - - for rows.Next() { - var id, host, serviceID, tcpEntrypointsStr, tcpSNIRule, sourceType string - var routerPriority sql.NullInt64 - var customServiceID sql.NullString - if err := rows.Scan(&id, &host, &serviceID, &tcpEntrypointsStr, &tcpSNIRule, &routerPriority, &sourceType, &customServiceID); err != nil { - log.Printf("Failed to scan TCP resource: %v", err) - continue - } + activeDSConfig, err := cg.configManager.GetActiveDataSourceConfig() + if err != nil { + log.Printf("Warning: Could not get active data source config for TCP routers: %v. Defaulting to Pangolin logic.", err) + activeDSConfig.Type = models.PangolinAPI + } + + query := ` + SELECT r.id, r.host, r.service_id, r.tcp_entrypoints, r.tcp_sni_rule, r.router_priority, r.source_type, + rs.service_id as custom_service_id + FROM resources r + LEFT JOIN resource_services rs ON r.id = rs.resource_id + WHERE r.status = 'active' AND r.tcp_enabled = 1 + ` + rows, err := cg.db.Query(query) + if err != nil { + return fmt.Errorf("failed to fetch TCP resources: %w", err) + } + defer rows.Close() + + for rows.Next() { + var id, host, serviceID, tcpEntrypointsStr, tcpSNIRule, sourceType string + var routerPriority sql.NullInt64 + var customServiceID sql.NullString + if err := rows.Scan(&id, &host, &serviceID, &tcpEntrypointsStr, &tcpSNIRule, &routerPriority, &sourceType, &customServiceID); err != nil { + log.Printf("Failed to scan TCP resource: %v", err) + continue + } - priority := 100 - if routerPriority.Valid { - priority = int(routerPriority.Int64) - } + priority := 100 + if routerPriority.Valid { + priority = int(routerPriority.Int64) + } - entrypoints := strings.Split(strings.TrimSpace(tcpEntrypointsStr), ",") - if len(entrypoints) == 0 || entrypoints[0] == "" { - entrypoints = []string{"tcp"} // Default TCP entrypoint - } - - rule := tcpSNIRule - if rule == "" { // Default SNI rule if not specified - rule = fmt.Sprintf("HostSNI(`%s`)", host) - } + entrypoints := strings.Split(strings.TrimSpace(tcpEntrypointsStr), ",") + if len(entrypoints) == 0 || entrypoints[0] == "" { + entrypoints = []string{"tcp"} // Default TCP entrypoint + } + + rule := tcpSNIRule + if rule == "" { // Default SNI rule if not specified + rule = fmt.Sprintf("HostSNI(`%s`)", host) + } - var tcpServiceReference string - if customServiceID.Valid && customServiceID.String != "" { - customSvc := customServiceID.String - // Check if custom service ID already has a provider suffix - if strings.Contains(customSvc, "@") { - // Already has a provider, use as is - tcpServiceReference = customSvc - } else { - // Add the file provider - tcpServiceReference = fmt.Sprintf("%s@file", customSvc) - } - } else { - providerSuffix := "http" // Default provider - - // Use appropriate provider based on data source - if activeDSConfig.Type == models.TraefikAPI { - if models.DataSourceType(sourceType) == models.TraefikAPI { - providerSuffix = "docker" - } - } - - // FIX: Check properly for existing suffixes - suffix := fmt.Sprintf("@%s", providerSuffix) - if strings.HasSuffix(serviceID, suffix) { - // Already has the correct suffix, use as is - tcpServiceReference = serviceID - } else if strings.Contains(serviceID, "@") { - // Has a different provider suffix, use as is - tcpServiceReference = serviceID - } else { - // No provider suffix, add one - tcpServiceReference = fmt.Sprintf("%s%s", serviceID, suffix) - } - } - - log.Printf("Resource %s (TCP): Router service set to %s. (SourceType: %s, ActiveDS: %s, CustomSvc: %s)", - id, tcpServiceReference, sourceType, activeDSConfig.Type, customServiceID.String) - - tcpRouterID := fmt.Sprintf("%s-tcp", id) - config.TCP.Routers[tcpRouterID] = map[string]interface{}{ - "rule": rule, - "service": tcpServiceReference, - "entryPoints": entrypoints, - "priority": priority, - "tls": map[string]interface{}{}, // TCP routers with SNI usually involve TLS - } - } - return rows.Err() + var tcpServiceReference string + if customServiceID.Valid && customServiceID.String != "" { + // Extract base name without any suffixes + baseName := extractBaseName(customServiceID.String) + // Always add the file provider for custom services + tcpServiceReference = fmt.Sprintf("%s@file", baseName) + } else { + // Default provider suffix + providerSuffix := "http" + + // If using Traefik API, consider using docker for appropriate sources + if activeDSConfig.Type == models.TraefikAPI { + if models.DataSourceType(sourceType) == models.TraefikAPI { + providerSuffix = "docker" + } + } + + // Extract base name without any suffixes + baseName := extractBaseName(serviceID) + // Add the appropriate provider suffix + tcpServiceReference = fmt.Sprintf("%s@%s", baseName, providerSuffix) + } + + log.Printf("Resource %s (TCP): Router service set to %s. (SourceType: %s, ActiveDS: %s, CustomSvc: %s)", + id, tcpServiceReference, sourceType, activeDSConfig.Type, customServiceID.String) + + // Make sure we don't have duplicated suffixes in router ID + routerIDBase := extractBaseName(id) + tcpRouterID := fmt.Sprintf("%s-tcp", routerIDBase) + + config.TCP.Routers[tcpRouterID] = map[string]interface{}{ + "rule": rule, + "service": tcpServiceReference, + "entryPoints": entrypoints, + "priority": priority, + "tls": map[string]interface{}{}, // TCP routers with SNI usually involve TLS + } + } + return rows.Err() } From 90d0f79d736568db82ba518948c7f0a7774c5d3a Mon Sep 17 00:00:00 2001 From: hhftechnologies Date: Mon, 19 May 2025 23:16:23 +0530 Subject: [PATCH 04/10] update --- database/cleanup.go | 77 +++++++++++++++++++ main.go | 7 ++ services/config_generator.go | 145 ++++++++++++++++++++++++----------- 3 files changed, 185 insertions(+), 44 deletions(-) create mode 100644 database/cleanup.go diff --git a/database/cleanup.go b/database/cleanup.go new file mode 100644 index 000000000..fff224abf --- /dev/null +++ b/database/cleanup.go @@ -0,0 +1,77 @@ +package database + +import ( + "fmt" + "log" + "strings" +) + +// CleanupDuplicateServices removes service duplication from the database +func (db *DB) CleanupDuplicateServices() error { + // Get all services + rows, err := db.Query("SELECT id, name, type, config FROM services") + if err != nil { + return fmt.Errorf("failed to query services: %w", err) + } + defer rows.Close() + + // Map to track unique base names + type serviceInfo struct { + ID string + Config string + } + uniqueServices := make(map[string]serviceInfo) + + var servicesToDelete []string + + // Process each service + for rows.Next() { + var id, name, typ, configStr string + if err := rows.Scan(&id, &name, &typ, &configStr); err != nil { + return fmt.Errorf("failed to scan service: %w", err) + } + + // Get base name (without any provider suffix) + baseName := id + if idx := strings.Index(id, "@"); idx > 0 { + baseName = id[:idx] + } + + // If we've already seen this base name, mark this as duplicate + if existing, found := uniqueServices[baseName]; found { + // Keep the one with fewer @ symbols (more "canonical") + if strings.Count(existing.ID, "@") > strings.Count(id, "@") { + // The new one is better, update the map and mark the old one for deletion + servicesToDelete = append(servicesToDelete, existing.ID) + uniqueServices[baseName] = serviceInfo{id, configStr} + } else { + // The existing one is better, mark this one for deletion + servicesToDelete = append(servicesToDelete, id) + } + } else { + // First time seeing this base name + uniqueServices[baseName] = serviceInfo{id, configStr} + } + } + + // Delete duplicates in a transaction + tx, err := db.Begin() + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + + for _, id := range servicesToDelete { + log.Printf("Deleting duplicate service: %s", id) + if _, err := tx.Exec("DELETE FROM services WHERE id = ?", id); err != nil { + tx.Rollback() + return fmt.Errorf("failed to delete service %s: %w", id, err) + } + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + log.Printf("Cleanup complete. Removed %d duplicate services", len(servicesToDelete)) + return nil +} \ No newline at end of file diff --git a/main.go b/main.go index 31da01327..740d3ceb5 100644 --- a/main.go +++ b/main.go @@ -123,6 +123,13 @@ func main() { if err := config.LoadDefaultServiceTemplates(db); err != nil { log.Printf("Warning: Failed to load default service templates: %v", err) } + log.Println("Cleaning up duplicate services...") +if err := db.CleanupDuplicateServices(); err != nil { + log.Printf("Warning: Failed to clean up duplicate services: %v", err) +} else { + log.Println("Service cleanup completed successfully") +} + configManager, err := services.NewConfigManager(filepath.Join(configDir, "config.json")) if err != nil { diff --git a/services/config_generator.go b/services/config_generator.go index fe13c0fa0..9ab485ae4 100644 --- a/services/config_generator.go +++ b/services/config_generator.go @@ -12,6 +12,7 @@ import ( "strings" "sync" "time" + "net/http" "github.com/hhftechnology/middleware-manager/database" "github.com/hhftechnology/middleware-manager/models" // Correct import for your models @@ -97,7 +98,15 @@ func (cg *ConfigGenerator) Start(interval time.Duration) { } } } - +// Add this helper function at the top of the file with other utility functions +func normalizeServiceID(id string) string { + // Extract the base name (everything before the first @) + baseName := id + if idx := strings.Index(id, "@"); idx > 0 { + baseName = id[:idx] + } + return baseName +} // Stop stops the config generator func (cg *ConfigGenerator) Stop() { cg.mutex.Lock() @@ -378,26 +387,27 @@ func (cg *ConfigGenerator) processResourcesWithServices(config *TraefikConfig) e } } - var serviceReference string - if mapValueDataEntry.CustomServiceID.Valid && mapValueDataEntry.CustomServiceID.String != "" { - // Extract base name without any suffixes - baseName := extractBaseName(mapValueDataEntry.CustomServiceID.String) - // Always add the file provider for custom services - serviceReference = fmt.Sprintf("%s@file", baseName) - } else { - // For Docker environments when using Traefik API, prefer docker provider - providerSuffix := "docker" - - // If not using Traefik API as data source, use http provider - if activeDSConfig.Type != models.TraefikAPI { - providerSuffix = "http" - } - - // Extract base name without any suffixes - baseName := extractBaseName(info.ServiceID) - // Add the appropriate provider suffix - serviceReference = fmt.Sprintf("%s@%s", baseName, providerSuffix) - } +// Find the section where serviceReference is set +var serviceReference string +if mapValueDataEntry.CustomServiceID.Valid && mapValueDataEntry.CustomServiceID.String != "" { + // Extract base name without any suffixes + baseName := normalizeServiceID(mapValueDataEntry.CustomServiceID.String) + // Always add the file provider for custom services + serviceReference = fmt.Sprintf("%s@file", baseName) +} else { + // For Docker environments when using Traefik API, prefer docker provider + providerSuffix := "docker" + + // If not using Traefik API as data source, use http provider + if activeDSConfig.Type != models.TraefikAPI { + providerSuffix = "http" + } + + // Extract base name without any suffixes + baseName := normalizeServiceID(info.ServiceID) + // Add the appropriate provider suffix + serviceReference = fmt.Sprintf("%s@%s", baseName, providerSuffix) +} log.Printf("Resource %s (HTTP): Router service set to %s. (SourceType: %s, ActiveDS: %s, CustomSvc: %s)", info.ID, @@ -439,6 +449,54 @@ func (cg *ConfigGenerator) processResourcesWithServices(config *TraefikConfig) e return nil } +// Add to the imports if needed: +// import "encoding/json" + +// Helper to fetch service names from Traefik API +func (cg *ConfigGenerator) fetchTraefikServiceNames() map[string]string { + serviceMap := make(map[string]string) + client := &http.Client{Timeout: 5 * time.Second} + + // Get Traefik API URL from data source config + dsConfig, err := cg.configManager.GetActiveDataSourceConfig() + if err != nil { + log.Printf("Warning: Failed to get active data source config: %v", err) + return serviceMap + } + + apiURL := dsConfig.URL + + // Fetch HTTP services + resp, err := client.Get(apiURL + "/api/http/services") + if err != nil { + log.Printf("Warning: Failed to fetch services from Traefik API: %v", err) + return serviceMap + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Printf("Warning: Traefik API returned status %d", resp.StatusCode) + return serviceMap + } + + var services []struct { + Name string `json:"name"` + } + + if err := json.NewDecoder(resp.Body).Decode(&services); err != nil { + log.Printf("Warning: Failed to decode Traefik API response: %v", err) + return serviceMap + } + + // Build a map of base name -> full name with provider + for _, svc := range services { + baseName := normalizeServiceID(svc.Name) + serviceMap[baseName] = svc.Name + } + + return serviceMap +} + // processTCPRouters processes TCP router resources func (cg *ConfigGenerator) processTCPRouters(config *TraefikConfig) error { activeDSConfig, err := cg.configManager.GetActiveDataSourceConfig() @@ -484,29 +542,28 @@ func (cg *ConfigGenerator) processTCPRouters(config *TraefikConfig) error { rule = fmt.Sprintf("HostSNI(`%s`)", host) } - var tcpServiceReference string - if customServiceID.Valid && customServiceID.String != "" { - // Extract base name without any suffixes - baseName := extractBaseName(customServiceID.String) - // Always add the file provider for custom services - tcpServiceReference = fmt.Sprintf("%s@file", baseName) - } else { - // Default provider suffix - providerSuffix := "http" - - // If using Traefik API, consider using docker for appropriate sources - if activeDSConfig.Type == models.TraefikAPI { - if models.DataSourceType(sourceType) == models.TraefikAPI { - providerSuffix = "docker" - } - } - - // Extract base name without any suffixes - baseName := extractBaseName(serviceID) - // Add the appropriate provider suffix - tcpServiceReference = fmt.Sprintf("%s@%s", baseName, providerSuffix) - } - + var tcpServiceReference string + if customServiceID.Valid && customServiceID.String != "" { + // Extract base name without any suffixes + baseName := normalizeServiceID(customServiceID.String) + // Always add the file provider for custom services + tcpServiceReference = fmt.Sprintf("%s@file", baseName) + } else { + // Default provider suffix + providerSuffix := "http" + + // If using Traefik API, consider using docker for appropriate sources + if activeDSConfig.Type == models.TraefikAPI { + if models.DataSourceType(sourceType) == models.TraefikAPI { + providerSuffix = "docker" + } + } + + // Extract base name without any suffixes + baseName := normalizeServiceID(serviceID) + // Add the appropriate provider suffix + tcpServiceReference = fmt.Sprintf("%s@%s", baseName, providerSuffix) + } log.Printf("Resource %s (TCP): Router service set to %s. (SourceType: %s, ActiveDS: %s, CustomSvc: %s)", id, tcpServiceReference, sourceType, activeDSConfig.Type, customServiceID.String) From e2ddfdda1a8e995eab0ef430fca113a1bad41e07 Mon Sep 17 00:00:00 2001 From: hhftechnologies Date: Mon, 19 May 2025 23:48:13 +0530 Subject: [PATCH 05/10] Update service_watcher.go --- services/service_watcher.go | 38 +++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/services/service_watcher.go b/services/service_watcher.go index 9eebdf7cd..c8d0b82e2 100644 --- a/services/service_watcher.go +++ b/services/service_watcher.go @@ -181,17 +181,25 @@ func (sw *ServiceWatcher) checkServices() error { return nil } +// updateOrCreateService updates an existing service or creates a new one // updateOrCreateService updates an existing service or creates a new one func (sw *ServiceWatcher) updateOrCreateService(service models.Service) error { - // Check if service already exists + // Normalize service ID by removing additional provider suffixes + normalizedID := getNormalizedServiceID(service.ID) + + // Check if service already exists using both original and normalized IDs var exists int + var existingType, existingConfig string - err := sw.db.QueryRow("SELECT 1 FROM services WHERE id = ?", service.ID).Scan(&exists) + err := sw.db.QueryRow( + "SELECT 1, type, config FROM services WHERE id = ? OR id LIKE ?", + service.ID, normalizedID+"@%", + ).Scan(&exists, &existingType, &existingConfig) if err == nil { // Service exists, only update if it changed if shouldUpdateService(sw.db, service) { - log.Printf("Updating existing service: %s", service.ID) + log.Printf("Updating existing service: %s (normalized from %s)", normalizedID, service.ID) return sw.updateService(service) } // Service exists and hasn't changed, skip update @@ -201,10 +209,19 @@ func (sw *ServiceWatcher) updateOrCreateService(service models.Service) error { return fmt.Errorf("error checking if service exists: %w", err) } - // Service doesn't exist, create it + // Service doesn't exist, create it with normalized ID + service.ID = normalizedID return sw.createService(service) } +// getNormalizedServiceID removes any provider suffixes from service IDs +func getNormalizedServiceID(id string) string { + if idx := strings.Index(id, "@"); idx > 0 { + return id[:idx] + } + return id +} + // shouldUpdateService determines if an existing service needs to be updated func shouldUpdateService(db *database.DB, newService models.Service) bool { var existingType, existingConfig string @@ -334,6 +351,7 @@ func configsAreDifferent(config1, config2 map[string]interface{}) bool { return false } +// createService creates a new service in the database // createService creates a new service in the database func (sw *ServiceWatcher) createService(service models.Service) error { // Validate service type @@ -376,17 +394,25 @@ func (sw *ServiceWatcher) createService(service models.Service) error { service.Name = formatServiceName(service.ID) } + // Make sure we're not adding @file if the ID already has a provider + serviceID := service.ID + if !strings.Contains(serviceID, "@") { + serviceID = serviceID + "@file" // Only add @file if no provider exists + } + + log.Printf("Creating new service: %s (original ID: %s)", serviceID, service.ID) + // Insert the service _, err = sw.db.Exec( "INSERT INTO services (id, name, type, config, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)", - service.ID, service.Name, service.Type, string(configJSON), time.Now(), time.Now(), + serviceID, service.Name, service.Type, string(configJSON), time.Now(), time.Now(), ) if err != nil { return fmt.Errorf("failed to insert service %s: %w", service.ID, err) } - log.Printf("Created new service: %s (%s)", service.Name, service.ID) + log.Printf("Created new service: %s (%s)", service.Name, serviceID) return nil } From 0bd65d08819e7d9c5cfddc1cee75a42f570501ae Mon Sep 17 00:00:00 2001 From: hhftechnologies Date: Tue, 20 May 2025 00:22:00 +0530 Subject: [PATCH 06/10] Update resource_watcher.go --- services/resource_watcher.go | 99 ++++++++++++++++++++++++++++++++++-- 1 file changed, 94 insertions(+), 5 deletions(-) diff --git a/services/resource_watcher.go b/services/resource_watcher.go index fefb41204..caad0cdbe 100644 --- a/services/resource_watcher.go +++ b/services/resource_watcher.go @@ -204,34 +204,117 @@ func (rw *ResourceWatcher) checkResources() error { return nil } +// updateOrCreateResource updates an existing resource or creates a new one +// normalizeResourceID removes cascading auth suffixes and provider suffixes from resource IDs +func normalizeResourceID(id string) string { + // First, remove any provider suffix (if present) + baseName := id + if idx := strings.Index(baseName, "@"); idx > 0 { + baseName = baseName[:idx] + } + + // Check if this is a router resource + if !strings.Contains(baseName, "-router") { + return baseName // Not a router resource, return without suffix processing + } + + // Handle cascading auth patterns + // Extract the base router pattern (e.g., "1-router" from "1-router-auth-auth-auth...") + routerParts := strings.SplitN(baseName, "-router", 2) + if len(routerParts) != 2 { + return baseName // Unexpected format, return as is + } + + // Check if we have auth suffixes + suffixPart := routerParts[1] + if strings.Contains(suffixPart, "-auth") { + // Replace all cascading -auth suffixes with just one -auth + // First, handle the case of -router-auth pattern + if strings.HasPrefix(suffixPart, "-auth") { + return routerParts[0] + "-router-auth" + } + + // Handle cases like -router-redirect-auth with a single preserved redirect component + redirectParts := strings.SplitN(suffixPart, "-auth", 2) + if len(redirectParts) > 1 && redirectParts[0] != "" { + return routerParts[0] + "-router" + redirectParts[0] + "-auth" + } + } + + // If no auth suffixes or couldn't parse properly, return the original base name + return baseName +} + // updateOrCreateResource updates an existing resource or creates a new one func (rw *ResourceWatcher) updateOrCreateResource(resource models.Resource) error { - // Check if resource already exists + // Normalize the resource ID to handle cascading auth suffixes + normalizedID := normalizeResourceID(resource.ID) + + // For logging purposes, keep track if we normalized the ID + originalID := resource.ID + wasNormalized := normalizedID != originalID + + // Check if resource already exists with either the original or normalized ID var exists int var status string var entrypoints, tlsDomains, tcpEntrypoints, tcpSNIRule, customHeaders string var tcpEnabled int var routerPriority sql.NullInt64 + // First try exact match with the original ID err := rw.db.QueryRow(` SELECT 1, status, entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, custom_headers, router_priority FROM resources WHERE id = ? `, resource.ID).Scan(&exists, &status, &entrypoints, &tlsDomains, &tcpEnabled, - &tcpEntrypoints, &tcpSNIRule, &customHeaders, &routerPriority) + &tcpEntrypoints, &tcpSNIRule, &customHeaders, &routerPriority) + + // If exact match not found and ID was normalized, try with pattern matching + if err == sql.ErrNoRows && wasNormalized { + // Try to find any resource that matches the normalized pattern with potential suffixes + // Use LIKE query with escape for special characters in the ID + normalizedPattern := normalizedID + "%" + if strings.Contains(normalizedID, "-router") { + // For router resources, specifically match auth suffix pattern + normalizedPattern = strings.Replace(normalizedID, "-router", "-router%", 1) + } + + err = rw.db.QueryRow(` + SELECT 1, status, entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, + custom_headers, router_priority + FROM resources WHERE id LIKE ? LIMIT 1 + `, normalizedPattern).Scan(&exists, &status, &entrypoints, &tlsDomains, &tcpEnabled, + &tcpEntrypoints, &tcpSNIRule, &customHeaders, &routerPriority) + + // If found via pattern match, log it for debugging + if err == nil { + log.Printf("Found resource via pattern matching: %s matches normalized pattern %s", + resource.ID, normalizedPattern) + } + } if err == nil { // Resource exists, update essential fields but preserve custom configuration + // If we found a match via normalization, use the normalized ID for the update + updateID := resource.ID + if wasNormalized { + // Use the normalized ID for the update to prevent future duplication + updateID = normalizedID + log.Printf("Updating with normalized ID: %s (was %s)", normalizedID, originalID) + } + _, err = rw.db.Exec( - "UPDATE resources SET host = ?, service_id = ?, status = 'active', source_type = ?, updated_at = ? WHERE id = ?", - resource.Host, resource.ServiceID, resource.SourceType, time.Now(), resource.ID, + "UPDATE resources SET id = ?, host = ?, service_id = ?, status = 'active', source_type = ?, updated_at = ? WHERE id LIKE ?", + updateID, resource.Host, resource.ServiceID, resource.SourceType, time.Now(), + // Use pattern matching for the WHERE clause to catch all variations + strings.Replace(normalizedID, "-router", "-router%", 1), ) if err != nil { return fmt.Errorf("failed to update resource %s: %w", resource.ID, err) } if status == "disabled" { - log.Printf("Resource %s was disabled but is now active again", resource.ID) + log.Printf("Resource %s was disabled but is now active again", updateID) } return nil @@ -260,6 +343,12 @@ func (rw *ResourceWatcher) updateOrCreateResource(resource models.Resource) erro resource.RouterPriority = 100 // Default priority } + // For new resources, always use the normalized ID to prevent duplication + if wasNormalized { + log.Printf("Creating new resource with normalized ID: %s (was %s)", normalizedID, originalID) + resource.ID = normalizedID + } + // Create new resource with default configuration _, err = rw.db.Exec(` INSERT INTO resources ( From 54dee26deea9794c9649f2e9fed9b7542b99471c Mon Sep 17 00:00:00 2001 From: hhftechnologies Date: Tue, 20 May 2025 00:26:47 +0530 Subject: [PATCH 07/10] Update resource_watcher.go --- services/resource_watcher.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/resource_watcher.go b/services/resource_watcher.go index caad0cdbe..39676996f 100644 --- a/services/resource_watcher.go +++ b/services/resource_watcher.go @@ -245,6 +245,8 @@ func normalizeResourceID(id string) string { return baseName } +// updateOrCreateResource updates an existing resource or creates a new one + // updateOrCreateResource updates an existing resource or creates a new one func (rw *ResourceWatcher) updateOrCreateResource(resource models.Resource) error { // Normalize the resource ID to handle cascading auth suffixes @@ -368,7 +370,6 @@ func (rw *ResourceWatcher) updateOrCreateResource(resource models.Resource) erro log.Printf("Added new resource: %s (%s)", resource.Host, resource.ID) return nil } - // fetchTraefikConfig fetches the Traefik configuration from the data source // This method is kept for backward compatibility with the original implementation func (rw *ResourceWatcher) fetchTraefikConfig(ctx context.Context) (*models.PangolinTraefikConfig, error) { From a89dfa1c868786198b9f53201e84665b98fcf3db Mon Sep 17 00:00:00 2001 From: hhftechnologies Date: Tue, 20 May 2025 12:00:40 +0530 Subject: [PATCH 08/10] update --- services/resource_watcher.go | 244 +++++++++++++++++++++++++---------- services/service_watcher.go | 11 +- 2 files changed, 182 insertions(+), 73 deletions(-) diff --git a/services/resource_watcher.go b/services/resource_watcher.go index 39676996f..e362ba71a 100644 --- a/services/resource_watcher.go +++ b/services/resource_watcher.go @@ -206,47 +206,55 @@ func (rw *ResourceWatcher) checkResources() error { // updateOrCreateResource updates an existing resource or creates a new one // normalizeResourceID removes cascading auth suffixes and provider suffixes from resource IDs +// normalizeResourceID removes redundant cascading auth suffixes but preserves essential distinctions func normalizeResourceID(id string) string { - // First, remove any provider suffix (if present) + // Extract the base name (everything before the first @) baseName := id if idx := strings.Index(baseName, "@"); idx > 0 { baseName = baseName[:idx] } - // Check if this is a router resource + // If not a router resource, return as is if !strings.Contains(baseName, "-router") { - return baseName // Not a router resource, return without suffix processing + return baseName } - // Handle cascading auth patterns - // Extract the base router pattern (e.g., "1-router" from "1-router-auth-auth-auth...") - routerParts := strings.SplitN(baseName, "-router", 2) - if len(routerParts) != 2 { - return baseName // Unexpected format, return as is + // Handle cascading auth patterns more carefully + // Replace multiple consecutive -auth suffixes with just one -auth + // but preserve redirect and other important suffixes + parts := strings.Split(baseName, "-router") + if len(parts) != 2 { + return baseName // Unexpected format } - // Check if we have auth suffixes - suffixPart := routerParts[1] - if strings.Contains(suffixPart, "-auth") { - // Replace all cascading -auth suffixes with just one -auth - // First, handle the case of -router-auth pattern - if strings.HasPrefix(suffixPart, "-auth") { - return routerParts[0] + "-router-auth" + prefix := parts[0] // e.g., "api" + suffix := parts[1] // e.g., "-auth-auth-redirect" + + // Process the suffix to preserve distinctive parts + if strings.Contains(suffix, "-auth") { + // Preserve redirect component if present + redirectPart := "" + if strings.Contains(suffix, "-redirect") { + redirectParts := strings.Split(suffix, "-redirect") + suffix = redirectParts[0] + redirectPart = "-redirect" } - // Handle cases like -router-redirect-auth with a single preserved redirect component - redirectParts := strings.SplitN(suffixPart, "-auth", 2) - if len(redirectParts) > 1 && redirectParts[0] != "" { - return routerParts[0] + "-router" + redirectParts[0] + "-auth" + // Replace multiple auth suffixes with single auth + for strings.Contains(suffix, "-auth-auth") { + suffix = strings.Replace(suffix, "-auth-auth", "-auth", 1) } + + // Reconstruct with preserved components + return prefix + "-router" + suffix + redirectPart } - // If no auth suffixes or couldn't parse properly, return the original base name return baseName } // updateOrCreateResource updates an existing resource or creates a new one +// updateOrCreateResource updates an existing resource or creates a new one // updateOrCreateResource updates an existing resource or creates a new one func (rw *ResourceWatcher) updateOrCreateResource(resource models.Resource) error { // Normalize the resource ID to handle cascading auth suffixes @@ -258,6 +266,7 @@ func (rw *ResourceWatcher) updateOrCreateResource(resource models.Resource) erro // Check if resource already exists with either the original or normalized ID var exists int + var existingID string // Store the actual ID found in the database var status string var entrypoints, tlsDomains, tcpEntrypoints, tcpSNIRule, customHeaders string var tcpEnabled int @@ -265,58 +274,119 @@ func (rw *ResourceWatcher) updateOrCreateResource(resource models.Resource) erro // First try exact match with the original ID err := rw.db.QueryRow(` - SELECT 1, status, entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, + SELECT id, 1, status, entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, custom_headers, router_priority FROM resources WHERE id = ? - `, resource.ID).Scan(&exists, &status, &entrypoints, &tlsDomains, &tcpEnabled, + `, resource.ID).Scan(&existingID, &exists, &status, &entrypoints, &tlsDomains, &tcpEnabled, &tcpEntrypoints, &tcpSNIRule, &customHeaders, &routerPriority) - // If exact match not found and ID was normalized, try with pattern matching - if err == sql.ErrNoRows && wasNormalized { - // Try to find any resource that matches the normalized pattern with potential suffixes - // Use LIKE query with escape for special characters in the ID - normalizedPattern := normalizedID + "%" - if strings.Contains(normalizedID, "-router") { - // For router resources, specifically match auth suffix pattern - normalizedPattern = strings.Replace(normalizedID, "-router", "-router%", 1) - } - + // If not found by original ID, try normalized ID + if err == sql.ErrNoRows { err = rw.db.QueryRow(` - SELECT 1, status, entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, + SELECT id, 1, status, entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, custom_headers, router_priority - FROM resources WHERE id LIKE ? LIMIT 1 - `, normalizedPattern).Scan(&exists, &status, &entrypoints, &tlsDomains, &tcpEnabled, - &tcpEntrypoints, &tcpSNIRule, &customHeaders, &routerPriority) - - // If found via pattern match, log it for debugging - if err == nil { - log.Printf("Found resource via pattern matching: %s matches normalized pattern %s", - resource.ID, normalizedPattern) + FROM resources WHERE id = ? + `, normalizedID).Scan(&existingID, &exists, &status, &entrypoints, &tlsDomains, &tcpEnabled, + &tcpEntrypoints, &tcpSNIRule, &customHeaders, &routerPriority) + } + + // If still not found and ID was normalized, try with more precise pattern matching + if err == sql.ErrNoRows && wasNormalized && strings.Contains(normalizedID, "-router") { + parts := strings.Split(normalizedID, "-router") + if len(parts) == 2 { + // Create a more specific pattern that includes the prefix to avoid unrelated matches + prefix := parts[0] + suffix := parts[1] + + // If there's a specific suffix like "-redirect", create an even more precise pattern + var pattern string + if strings.Contains(suffix, "-redirect") { + pattern = prefix + "-router%-redirect%" + } else if strings.Contains(suffix, "-auth") { + pattern = prefix + "-router%-auth%" + } else { + pattern = prefix + "-router%" + } + + log.Printf("Trying to find resource with more specific pattern: %s", pattern) + err = rw.db.QueryRow(` + SELECT id, 1, status, entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, + custom_headers, router_priority + FROM resources WHERE id LIKE ? LIMIT 1 + `, pattern).Scan(&existingID, &exists, &status, &entrypoints, &tlsDomains, &tcpEnabled, + &tcpEntrypoints, &tcpSNIRule, &customHeaders, &routerPriority) + + // If found via pattern match, log it for debugging + if err == nil { + log.Printf("Found resource via precise pattern matching: %s matches pattern %s", + resource.ID, pattern) + } } } if err == nil { - // Resource exists, update essential fields but preserve custom configuration - // If we found a match via normalization, use the normalized ID for the update - updateID := resource.ID - if wasNormalized { - // Use the normalized ID for the update to prevent future duplication - updateID = normalizedID - log.Printf("Updating with normalized ID: %s (was %s)", normalizedID, originalID) + // Resource exists, update it using its existing ID to prevent conflicts + log.Printf("Updating resource %s using existing ID %s in database", resource.ID, existingID) + + // Preserve custom configuration if available from existing resource + if resource.Entrypoints == "" && entrypoints != "" { + resource.Entrypoints = entrypoints + } + if resource.TLSDomains == "" && tlsDomains != "" { + resource.TLSDomains = tlsDomains + } + if resource.TCPEntrypoints == "" && tcpEntrypoints != "" { + resource.TCPEntrypoints = tcpEntrypoints + } + if resource.TCPSNIRule == "" && tcpSNIRule != "" { + resource.TCPSNIRule = tcpSNIRule + } + if resource.CustomHeaders == "" && customHeaders != "" { + resource.CustomHeaders = customHeaders + } + if !resource.TCPEnabled && tcpEnabled > 0 { + resource.TCPEnabled = true + } + if resource.RouterPriority == 0 && routerPriority.Valid { + resource.RouterPriority = int(routerPriority.Int64) } - _, err = rw.db.Exec( - "UPDATE resources SET id = ?, host = ?, service_id = ?, status = 'active', source_type = ?, updated_at = ? WHERE id LIKE ?", - updateID, resource.Host, resource.ServiceID, resource.SourceType, time.Now(), - // Use pattern matching for the WHERE clause to catch all variations - strings.Replace(normalizedID, "-router", "-router%", 1), + // Use the existing ID for update to prevent conflicts + _, err = rw.db.Exec(` + UPDATE resources SET + host = ?, + service_id = ?, + status = 'active', + source_type = ?, + entrypoints = ?, + tls_domains = ?, + tcp_enabled = ?, + tcp_entrypoints = ?, + tcp_sni_rule = ?, + custom_headers = ?, + router_priority = ?, + updated_at = ? + WHERE id = ?`, + resource.Host, + resource.ServiceID, + resource.SourceType, + resource.Entrypoints, + resource.TLSDomains, + boolToInt(resource.TCPEnabled), + resource.TCPEntrypoints, + resource.TCPSNIRule, + resource.CustomHeaders, + resource.RouterPriority, + time.Now(), + existingID, ) + if err != nil { - return fmt.Errorf("failed to update resource %s: %w", resource.ID, err) + return fmt.Errorf("failed to update resource %s (using ID %s): %w", resource.ID, existingID, err) } if status == "disabled" { - log.Printf("Resource %s was disabled but is now active again", updateID) + log.Printf("Resource %s was disabled but is now active again", existingID) } return nil @@ -335,41 +405,73 @@ func (rw *ResourceWatcher) updateOrCreateResource(resource models.Resource) erro resource.SiteID = "unknown" } - tcpEnabledValue := 0 - if resource.TCPEnabled { - tcpEnabledValue = 1 - } - - // Use default router priority if not set if resource.RouterPriority == 0 { resource.RouterPriority = 100 // Default priority } - // For new resources, always use the normalized ID to prevent duplication - if wasNormalized { + // For new resources, choose an appropriate ID to prevent future duplications + createID := resource.ID + + // Only normalize in cases where we know there's redundancy + if strings.Contains(resource.ID, "@file@file") || + strings.Count(resource.ID, "-auth") > 1 || + (wasNormalized && !strings.Contains(resource.ID, "@")) { + createID = normalizedID log.Printf("Creating new resource with normalized ID: %s (was %s)", normalizedID, originalID) - resource.ID = normalizedID } - // Create new resource with default configuration + // Additional safeguard - check one more time if the ID we're about to use already exists + var finalIDExists int + err = rw.db.QueryRow("SELECT 1 FROM resources WHERE id = ?", createID).Scan(&finalIDExists) + if err == nil { + // The ID we were going to use already exists! Generate a unique variation + uniqueID := fmt.Sprintf("%s-%d", createID, time.Now().UnixNano() % 10000) + log.Printf("Warning: ID %s already exists, using unique ID %s instead", createID, uniqueID) + createID = uniqueID + } + + // Create new resource with chosen ID + tcpEnabledValue := boolToInt(resource.TCPEnabled) + _, err = rw.db.Exec(` INSERT INTO resources ( id, host, service_id, org_id, site_id, status, source_type, entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, custom_headers, router_priority, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, 'active', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `, resource.ID, resource.Host, resource.ServiceID, resource.OrgID, resource.SiteID, - resource.SourceType, resource.Entrypoints, resource.TLSDomains, tcpEnabledValue, - resource.TCPEntrypoints, resource.TCPSNIRule, resource.CustomHeaders, - resource.RouterPriority, time.Now(), time.Now()) + `, + createID, + resource.Host, + resource.ServiceID, + resource.OrgID, + resource.SiteID, + resource.SourceType, + resource.Entrypoints, + resource.TLSDomains, + tcpEnabledValue, + resource.TCPEntrypoints, + resource.TCPSNIRule, + resource.CustomHeaders, + resource.RouterPriority, + time.Now(), + time.Now(), + ) if err != nil { - return fmt.Errorf("failed to create resource %s: %w", resource.ID, err) + return fmt.Errorf("failed to create resource %s (with ID %s): %w", resource.ID, createID, err) } - log.Printf("Added new resource: %s (%s)", resource.Host, resource.ID) + log.Printf("Added new resource: %s (%s)", resource.Host, createID) return nil } + +// boolToInt converts a boolean to an integer (1 for true, 0 for false) +func boolToInt(b bool) int { + if b { + return 1 + } + return 0 +} // fetchTraefikConfig fetches the Traefik configuration from the data source // This method is kept for backward compatibility with the original implementation func (rw *ResourceWatcher) fetchTraefikConfig(ctx context.Context) (*models.PangolinTraefikConfig, error) { diff --git a/services/service_watcher.go b/services/service_watcher.go index c8d0b82e2..d29f283fb 100644 --- a/services/service_watcher.go +++ b/services/service_watcher.go @@ -214,9 +214,16 @@ func (sw *ServiceWatcher) updateOrCreateService(service models.Service) error { return sw.createService(service) } -// getNormalizedServiceID removes any provider suffixes from service IDs +// getNormalizedServiceID removes redundant provider suffixes from service IDs func getNormalizedServiceID(id string) string { - if idx := strings.Index(id, "@"); idx > 0 { + // Remove any provider suffix but only if it's duplicated + if strings.Contains(id, "@file@file") { + // Handle double @file suffix + if idx := strings.Index(id, "@file"); idx > 0 { + return id[:idx] + "@file" + } + } else if idx := strings.Index(id, "@"); idx > 0 { + // For other cases, just extract the base name return id[:idx] } return id From 3c8c1caec43dd6cca43ed380480d4763425f2109 Mon Sep 17 00:00:00 2001 From: hhftechnologies Date: Tue, 20 May 2025 13:13:23 +0530 Subject: [PATCH 09/10] update --- database/cleanup.go | 376 +++++++++++++++++++++++++++++--- database/transaction.go | 125 ++++++----- main.go | 222 +++++++++---------- services/resource_watcher.go | 400 +++++++++++++++-------------------- services/service_watcher.go | 197 +++++++++++------ util/id_normalizer.go | 82 +++++++ 6 files changed, 925 insertions(+), 477 deletions(-) create mode 100644 util/id_normalizer.go diff --git a/database/cleanup.go b/database/cleanup.go index fff224abf..eac7b3504 100644 --- a/database/cleanup.go +++ b/database/cleanup.go @@ -1,13 +1,41 @@ package database import ( + "database/sql" "fmt" "log" "strings" + "time" + + "github.com/hhftechnology/middleware-manager/util" ) +// CleanupOptions contains options for controlling cleanup operations +type CleanupOptions struct { + DryRun bool // If true, logs what would be done without making changes + LogLevel int // 0=errors only, 1=basic info, 2=verbose + MaxDeleteBatch int // Maximum number of items to delete in one batch + ReapDisabled bool // If true, physically delete disabled resources + RecoverCorrupted bool // If true, attempt to recover corrupted resources +} + +// DefaultCleanupOptions returns the default cleanup options +func DefaultCleanupOptions() CleanupOptions { + return CleanupOptions{ + DryRun: false, + LogLevel: 1, + MaxDeleteBatch: 100, + ReapDisabled: false, + RecoverCorrupted: true, + } +} + // CleanupDuplicateServices removes service duplication from the database -func (db *DB) CleanupDuplicateServices() error { +func (db *DB) CleanupDuplicateServices(opts CleanupOptions) error { + if opts.LogLevel >= 1 { + log.Println("Starting cleanup of duplicate services...") + } + // Get all services rows, err := db.Query("SELECT id, name, type, config FROM services") if err != nil { @@ -31,47 +59,347 @@ func (db *DB) CleanupDuplicateServices() error { return fmt.Errorf("failed to scan service: %w", err) } - // Get base name (without any provider suffix) - baseName := id - if idx := strings.Index(id, "@"); idx > 0 { - baseName = id[:idx] - } + // Get normalized ID + normalizedID := util.NormalizeID(id) - // If we've already seen this base name, mark this as duplicate - if existing, found := uniqueServices[baseName]; found { - // Keep the one with fewer @ symbols (more "canonical") - if strings.Count(existing.ID, "@") > strings.Count(id, "@") { - // The new one is better, update the map and mark the old one for deletion + // If we've already seen this normalized ID, check which one to keep + if existing, found := uniqueServices[normalizedID]; found { + // Determine which one to keep: + // 1. Prefer versions without provider suffixes or with @file suffix + // 2. If both have same suffix type, keep the one with simpler/shorter ID + keepNew := false + + existingHasSuffix := strings.Contains(existing.ID, "@") + newHasSuffix := strings.Contains(id, "@") + + if existingHasSuffix && !newHasSuffix { + // Keep the one without suffix + keepNew = true + } else if !existingHasSuffix && newHasSuffix { + // Keep existing without suffix + keepNew = false + } else if strings.HasSuffix(id, "@file") && !strings.HasSuffix(existing.ID, "@file") { + // Prefer @file suffix + keepNew = true + } else if !strings.HasSuffix(id, "@file") && strings.HasSuffix(existing.ID, "@file") { + // Keep existing with @file + keepNew = false + } else { + // Both have same suffix type, keep the one with simpler ID + if len(existing.ID) > len(id) { + keepNew = true + } + } + + if keepNew { + // The new one is better, mark the old one for deletion + if opts.LogLevel >= 2 { + log.Printf("Duplicate found: keeping %s, will delete %s", id, existing.ID) + } servicesToDelete = append(servicesToDelete, existing.ID) - uniqueServices[baseName] = serviceInfo{id, configStr} + uniqueServices[normalizedID] = serviceInfo{id, configStr} } else { // The existing one is better, mark this one for deletion + if opts.LogLevel >= 2 { + log.Printf("Duplicate found: keeping %s, will delete %s", existing.ID, id) + } servicesToDelete = append(servicesToDelete, id) } } else { - // First time seeing this base name - uniqueServices[baseName] = serviceInfo{id, configStr} + // First time seeing this normalized ID + uniqueServices[normalizedID] = serviceInfo{id, configStr} } } + if err := rows.Err(); err != nil { + return fmt.Errorf("error iterating services: %w", err) + } + + if len(servicesToDelete) == 0 { + if opts.LogLevel >= 1 { + log.Println("No duplicate services found.") + } + return nil + } + + if opts.DryRun { + log.Printf("DRY RUN: Would delete %d duplicate services", len(servicesToDelete)) + for _, id := range servicesToDelete { + log.Printf(" - %s", id) + } + return nil + } + // Delete duplicates in a transaction - tx, err := db.Begin() + return db.WithTransaction(func(tx *sql.Tx) error { + for _, id := range servicesToDelete { + if opts.LogLevel >= 1 { + log.Printf("Deleting duplicate service: %s", id) + } + + // First remove any resource_service references + if _, err := tx.Exec("DELETE FROM resource_services WHERE service_id = ?", id); err != nil { + return fmt.Errorf("failed to delete resource_service references for %s: %w", id, err) + } + + // Then delete the service + if _, err := tx.Exec("DELETE FROM services WHERE id = ?", id); err != nil { + return fmt.Errorf("failed to delete service %s: %w", id, err) + } + } + + if opts.LogLevel >= 1 { + log.Printf("Cleanup complete. Removed %d duplicate services", len(servicesToDelete)) + } + return nil + }) +} + +// CleanupDuplicateResources removes resource duplication from the database +func (db *DB) CleanupDuplicateResources(opts CleanupOptions) error { + if opts.LogLevel >= 1 { + log.Println("Starting cleanup of duplicate resources...") + } + + // Get all resources + rows, err := db.Query("SELECT id, host, service_id, status FROM resources") if err != nil { - return fmt.Errorf("failed to begin transaction: %w", err) + return fmt.Errorf("failed to query resources: %w", err) + } + defer rows.Close() + + // Map to track resources by normalized ID + type resourceInfo struct { + ID string + Host string + ServiceID string + Status string + } + + // Group by host to find multiple resources for the same host + hostMap := make(map[string][]resourceInfo) + + // Process each resource + for rows.Next() { + var id, host, serviceID, status string + if err := rows.Scan(&id, &host, &serviceID, &status); err != nil { + return fmt.Errorf("failed to scan resource: %w", err) + } + + // Add to host map + hostMap[host] = append(hostMap[host], resourceInfo{ + ID: id, + Host: host, + ServiceID: serviceID, + Status: status, + }) } - for _, id := range servicesToDelete { - log.Printf("Deleting duplicate service: %s", id) - if _, err := tx.Exec("DELETE FROM services WHERE id = ?", id); err != nil { - tx.Rollback() - return fmt.Errorf("failed to delete service %s: %w", id, err) + if err := rows.Err(); err != nil { + return fmt.Errorf("error iterating resources: %w", err) + } + + // Find hosts with multiple resources + var resourcesToDelete []string + var resourcesToActivate []string + + for host, resources := range hostMap { + if len(resources) <= 1 { + continue // No duplicates + } + + if opts.LogLevel >= 2 { + log.Printf("Found %d resources for host %s", len(resources), host) + } + + // Sort resources by status (active first) and then by ID complexity + // We'll keep the active one with the simplest ID + activeResources := make([]resourceInfo, 0) + disabledResources := make([]resourceInfo, 0) + + for _, res := range resources { + if res.Status == "active" { + activeResources = append(activeResources, res) + } else { + disabledResources = append(disabledResources, res) + } + } + + // If there are multiple active resources, disable extras + if len(activeResources) > 1 { + // Sort to find the one to keep (prioritize simpler IDs) + bestID := "" + bestIdx := 0 + + for i, res := range activeResources { + normalizedID := util.NormalizeID(res.ID) + + if bestID == "" { + bestID = normalizedID + bestIdx = i + } else { + // Prefer router-auth pattern for consistency + if strings.Contains(normalizedID, "-router-auth") && + !strings.Contains(bestID, "-router-auth") { + bestID = normalizedID + bestIdx = i + } else if !strings.Contains(normalizedID, "-router-auth") && + strings.Contains(bestID, "-router-auth") { + // Keep current best + } else if len(normalizedID) < len(bestID) { + // Prefer shorter/simpler IDs + bestID = normalizedID + bestIdx = i + } + } + } + + // Keep the best one, mark others for deletion + for i, res := range activeResources { + if i != bestIdx { + if opts.LogLevel >= 2 { + log.Printf(" - Will disable duplicate active resource: %s", res.ID) + } + resourcesToDelete = append(resourcesToDelete, res.ID) + } else if opts.LogLevel >= 2 { + log.Printf(" - Keeping active resource: %s", res.ID) + } + } + } else if len(activeResources) == 0 && len(disabledResources) > 0 && opts.RecoverCorrupted { + // No active resources, recover one + bestIdx := 0 + bestID := "" + + for i, res := range disabledResources { + normalizedID := util.NormalizeID(res.ID) + + if bestID == "" { + bestID = normalizedID + bestIdx = i + } else if len(normalizedID) < len(bestID) { + // Prefer shorter/simpler IDs + bestID = normalizedID + bestIdx = i + } + } + + // Activate the best one + if opts.LogLevel >= 2 { + log.Printf(" - Will activate resource: %s", disabledResources[bestIdx].ID) + } + resourcesToActivate = append(resourcesToActivate, disabledResources[bestIdx].ID) + + // If reaping disabled resources, delete the rest + if opts.ReapDisabled { + for i, res := range disabledResources { + if i != bestIdx { + if opts.LogLevel >= 2 { + log.Printf(" - Will delete disabled resource: %s", res.ID) + } + resourcesToDelete = append(resourcesToDelete, res.ID) + } + } + } + } else if opts.ReapDisabled { + // Delete all disabled resources if ReapDisabled is true + for _, res := range disabledResources { + if opts.LogLevel >= 2 { + log.Printf(" - Will delete disabled resource: %s", res.ID) + } + resourcesToDelete = append(resourcesToDelete, res.ID) + } } } - if err := tx.Commit(); err != nil { - return fmt.Errorf("failed to commit transaction: %w", err) + if len(resourcesToDelete) == 0 && len(resourcesToActivate) == 0 { + if opts.LogLevel >= 1 { + log.Println("No resources need cleanup.") + } + return nil + } + + if opts.DryRun { + log.Printf("DRY RUN: Would delete %d resources and activate %d resources", + len(resourcesToDelete), len(resourcesToActivate)) + return nil + } + + // Process changes in a transaction + return db.WithTransaction(func(tx *sql.Tx) error { + // Activate resources that need activation + for _, id := range resourcesToActivate { + if opts.LogLevel >= 1 { + log.Printf("Activating resource: %s", id) + } + + _, err := tx.Exec( + "UPDATE resources SET status = 'active', updated_at = ? WHERE id = ?", + time.Now(), id, + ) + + if err != nil { + return fmt.Errorf("failed to activate resource %s: %w", id, err) + } + } + + // Delete or disable resources + for _, id := range resourcesToDelete { + if opts.ReapDisabled { + // Physically delete the resource + if opts.LogLevel >= 1 { + log.Printf("Deleting resource: %s", id) + } + + // First delete any middleware relationships + if _, err := tx.Exec("DELETE FROM resource_middlewares WHERE resource_id = ?", id); err != nil { + return fmt.Errorf("failed to delete resource_middlewares for %s: %w", id, err) + } + + // Then delete any service relationships + if _, err := tx.Exec("DELETE FROM resource_services WHERE resource_id = ?", id); err != nil { + return fmt.Errorf("failed to delete resource_services for %s: %w", id, err) + } + + // Finally delete the resource + if _, err := tx.Exec("DELETE FROM resources WHERE id = ?", id); err != nil { + return fmt.Errorf("failed to delete resource %s: %w", id, err) + } + } else { + // Just mark as disabled + if opts.LogLevel >= 1 { + log.Printf("Disabling resource: %s", id) + } + + _, err := tx.Exec( + "UPDATE resources SET status = 'disabled', updated_at = ? WHERE id = ?", + time.Now(), id, + ) + + if err != nil { + return fmt.Errorf("failed to disable resource %s: %w", id, err) + } + } + } + + if opts.LogLevel >= 1 { + log.Printf("Resource cleanup complete. Deleted/disabled %d resources, activated %d resources", + len(resourcesToDelete), len(resourcesToActivate)) + } + return nil + }) +} + +// PerformFullCleanup runs a comprehensive cleanup of the database +func (db *DB) PerformFullCleanup(opts CleanupOptions) error { + // First clean up services + if err := db.CleanupDuplicateServices(opts); err != nil { + return fmt.Errorf("service cleanup failed: %w", err) + } + + // Then clean up resources + if err := db.CleanupDuplicateResources(opts); err != nil { + return fmt.Errorf("resource cleanup failed: %w", err) } - log.Printf("Cleanup complete. Removed %d duplicate services", len(servicesToDelete)) return nil } \ No newline at end of file diff --git a/database/transaction.go b/database/transaction.go index 907a991b8..bb21bd1fa 100644 --- a/database/transaction.go +++ b/database/transaction.go @@ -1,9 +1,11 @@ package database import ( + "context" "database/sql" "fmt" "log" + "time" ) // TxFn represents a function that uses a transaction @@ -42,58 +44,81 @@ func (db *DB) WithTransaction(fn TxFn) error { return nil } -// QueryRow executes a query that returns a single row and scans the result into the provided destination -func (db *DB) QueryRowSafe(query string, dest interface{}, args ...interface{}) error { - row := db.QueryRow(query, args...) - if err := row.Scan(dest); err != nil { - if err == sql.ErrNoRows { - return ErrNotFound - } - return fmt.Errorf("scan failed: %w", err) - } - return nil -} - -// ExecSafe executes a statement and returns the result summary -func (db *DB) ExecSafe(query string, args ...interface{}) (sql.Result, error) { - result, err := db.Exec(query, args...) - if err != nil { - return nil, fmt.Errorf("exec failed: %w", err) - } - return result, nil -} - -// CustomError types for database operations -var ( - ErrNotFound = fmt.Errorf("record not found") - ErrDuplicate = fmt.Errorf("duplicate record") - ErrConstraint = fmt.Errorf("constraint violation") -) - -// ExecTx executes a statement within a transaction and returns the result -func ExecTx(tx *sql.Tx, query string, args ...interface{}) (sql.Result, error) { - result, err := tx.Exec(query, args...) - if err != nil { - return nil, fmt.Errorf("exec in transaction failed: %w", err) - } - return result, nil -} - -// GetRowsAffected is a helper to get rows affected from a result -func GetRowsAffected(result sql.Result) (int64, error) { - affected, err := result.RowsAffected() - if err != nil { - return 0, fmt.Errorf("failed to get rows affected: %w", err) +// WithTimeoutTransaction wraps a function with a transaction that has a timeout +func (db *DB) WithTimeoutTransaction(ctx context.Context, timeout time.Duration, fn TxFn) error { + // Create a context with timeout + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + // Create a done channel to signal completion + done := make(chan error, 1) + + // Run the transaction in a goroutine + go func() { + done <- db.WithTransaction(fn) + }() + + // Wait for either context timeout or transaction completion + select { + case <-ctx.Done(): + // Context timed out + return fmt.Errorf("transaction timed out after %v: %w", timeout, ctx.Err()) + case err := <-done: + // Transaction completed + return err } - return affected, nil } -// GetLastInsertID is a helper to get last insert ID from a result -func GetLastInsertID(result sql.Result) (int64, error) { - id, err := result.LastInsertId() - if err != nil { - return 0, fmt.Errorf("failed to get last insert ID: %w", err) - } - return id, nil +// BatchTransaction executes multiple operations in a single transaction +// All operations must succeed or the transaction is rolled back +func (db *DB) BatchTransaction(operations []TxFn) error { + return db.WithTransaction(func(tx *sql.Tx) error { + for i, op := range operations { + if err := op(tx); err != nil { + return fmt.Errorf("operation %d failed: %w", i, err) + } + } + return nil + }) } +// UpdateInTransaction updates a record in a transaction +func (db *DB) UpdateInTransaction(table string, id string, updates map[string]interface{}) error { + return db.WithTransaction(func(tx *sql.Tx) error { + // Build the update statement + query := fmt.Sprintf("UPDATE %s SET ", table) + var params []interface{} + + i := 0 + for field, value := range updates { + if i > 0 { + query += ", " + } + query += field + " = ?" + params = append(params, value) + i++ + } + + // Add the WHERE clause and updated_at + query += ", updated_at = ? WHERE id = ?" + params = append(params, time.Now(), id) + + // Execute the update + result, err := tx.Exec(query, params...) + if err != nil { + return fmt.Errorf("update failed: %w", err) + } + + // Check if any rows were affected + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } + + if rowsAffected == 0 { + return fmt.Errorf("no rows affected, record with ID %s not found", id) + } + + return nil + }) +} \ No newline at end of file diff --git a/main.go b/main.go index 740d3ceb5..df46b4286 100644 --- a/main.go +++ b/main.go @@ -82,115 +82,119 @@ func DiscoverTraefikAPI() (string, error) { } func main() { - log.Println("Starting Middleware Manager...") - - var debug bool - flag.BoolVar(&debug, "debug", false, "Enable debug mode") - flag.Parse() - - cfg := loadConfiguration(debug) - - if os.Getenv("TRAEFIK_API_URL") == "" { - if discoveredURL, err := DiscoverTraefikAPI(); err == nil && discoveredURL != "" { - log.Printf("Auto-discovered Traefik API URL: %s", discoveredURL) - cfg.TraefikAPIURL = discoveredURL - } - } - - db, err := database.InitDB(cfg.DBPath) - if err != nil { - log.Fatalf("Failed to initialize database: %v", err) - } - defer db.Close() - - configDir := cfg.ConfigDir - if err := config.EnsureConfigDirectory(configDir); err != nil { - log.Printf("Warning: Failed to create config directory: %v", err) - } - - if err := config.SaveTemplateFile(configDir); err != nil { - log.Printf("Warning: Failed to save default middleware templates: %v", err) - } - - if err := config.LoadDefaultTemplates(db); err != nil { - log.Printf("Warning: Failed to load default middleware templates: %v", err) - } - - if err := config.SaveTemplateServicesFile(configDir); err != nil { - log.Printf("Warning: Failed to save default service templates: %v", err) - } - - if err := config.LoadDefaultServiceTemplates(db); err != nil { - log.Printf("Warning: Failed to load default service templates: %v", err) - } - log.Println("Cleaning up duplicate services...") -if err := db.CleanupDuplicateServices(); err != nil { - log.Printf("Warning: Failed to clean up duplicate services: %v", err) -} else { - log.Println("Service cleanup completed successfully") -} - - - configManager, err := services.NewConfigManager(filepath.Join(configDir, "config.json")) - if err != nil { - log.Fatalf("Failed to initialize config manager: %v", err) - } - - configManager.EnsureDefaultDataSources(cfg.PangolinAPIURL, cfg.TraefikAPIURL) - - stopChan := make(chan struct{}) - - resourceWatcher, err := services.NewResourceWatcher(db, configManager) - if err != nil { - log.Fatalf("Failed to create resource watcher: %v", err) - } - go resourceWatcher.Start(cfg.CheckInterval) - - configGenerator := services.NewConfigGenerator(db, cfg.TraefikConfDir, configManager) - go configGenerator.Start(cfg.GenerateInterval) - - serverConfig := api.ServerConfig{ - Port: cfg.Port, - UIPath: cfg.UIPath, - Debug: cfg.Debug, - AllowCORS: cfg.AllowCORS, - CORSOrigin: cfg.CORSOrigin, - } - - server := api.NewServer(db.DB, serverConfig, configManager, cfg.TraefikStaticConfigPath, cfg.PluginsJSONURL) - go func() { - if err := server.Start(); err != nil { - log.Printf("Server error: %v", err) - close(stopChan) - } - }() - - signalChan := make(chan os.Signal, 1) - signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM) - - serviceWatcher, err := services.NewServiceWatcher(db, configManager) - if err != nil { - log.Printf("Warning: Failed to create service watcher: %v", err) - serviceWatcher = nil - } else { - go serviceWatcher.Start(cfg.ServiceInterval) - } - - select { - case <-signalChan: - log.Println("Received shutdown signal") - case <-stopChan: - log.Println("Received stop signal from server") - } - - log.Println("Shutting down...") - resourceWatcher.Stop() - if serviceWatcher != nil { - serviceWatcher.Stop() - } - configGenerator.Stop() - server.Stop() - log.Println("Middleware Manager stopped") + log.Println("Starting Middleware Manager...") + + var debug bool + flag.BoolVar(&debug, "debug", false, "Enable debug mode") + flag.Parse() + + cfg := loadConfiguration(debug) + + if os.Getenv("TRAEFIK_API_URL") == "" { + if discoveredURL, err := DiscoverTraefikAPI(); err == nil && discoveredURL != "" { + log.Printf("Auto-discovered Traefik API URL: %s", discoveredURL) + cfg.TraefikAPIURL = discoveredURL + } + } + + db, err := database.InitDB(cfg.DBPath) + if err != nil { + log.Fatalf("Failed to initialize database: %v", err) + } + defer db.Close() + + configDir := cfg.ConfigDir + if err := config.EnsureConfigDirectory(configDir); err != nil { + log.Printf("Warning: Failed to create config directory: %v", err) + } + + if err := config.SaveTemplateFile(configDir); err != nil { + log.Printf("Warning: Failed to save default middleware templates: %v", err) + } + + if err := config.LoadDefaultTemplates(db); err != nil { + log.Printf("Warning: Failed to load default middleware templates: %v", err) + } + + if err := config.SaveTemplateServicesFile(configDir); err != nil { + log.Printf("Warning: Failed to save default service templates: %v", err) + } + + if err := config.LoadDefaultServiceTemplates(db); err != nil { + log.Printf("Warning: Failed to load default service templates: %v", err) + } + + // Run comprehensive database cleanup on startup + log.Println("Performing full database cleanup...") + cleanupOpts := database.DefaultCleanupOptions() + cleanupOpts.LogLevel = 2 // More verbose logging during startup + + if err := db.PerformFullCleanup(cleanupOpts); err != nil { + log.Printf("Warning: Database cleanup encountered issues: %v", err) + } else { + log.Println("Database cleanup completed successfully") + } + + configManager, err := services.NewConfigManager(filepath.Join(configDir, "config.json")) + if err != nil { + log.Fatalf("Failed to initialize config manager: %v", err) + } + + configManager.EnsureDefaultDataSources(cfg.PangolinAPIURL, cfg.TraefikAPIURL) + + stopChan := make(chan struct{}) + + resourceWatcher, err := services.NewResourceWatcher(db, configManager) + if err != nil { + log.Fatalf("Failed to create resource watcher: %v", err) + } + go resourceWatcher.Start(cfg.CheckInterval) + + configGenerator := services.NewConfigGenerator(db, cfg.TraefikConfDir, configManager) + go configGenerator.Start(cfg.GenerateInterval) + + serverConfig := api.ServerConfig{ + Port: cfg.Port, + UIPath: cfg.UIPath, + Debug: cfg.Debug, + AllowCORS: cfg.AllowCORS, + CORSOrigin: cfg.CORSOrigin, + } + + server := api.NewServer(db.DB, serverConfig, configManager, cfg.TraefikStaticConfigPath, cfg.PluginsJSONURL) + go func() { + if err := server.Start(); err != nil { + log.Printf("Server error: %v", err) + close(stopChan) + } + }() + + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM) + + serviceWatcher, err := services.NewServiceWatcher(db, configManager) + if err != nil { + log.Printf("Warning: Failed to create service watcher: %v", err) + serviceWatcher = nil + } else { + go serviceWatcher.Start(cfg.ServiceInterval) + } + + select { + case <-signalChan: + log.Println("Received shutdown signal") + case <-stopChan: + log.Println("Received stop signal from server") + } + + log.Println("Shutting down...") + resourceWatcher.Stop() + if serviceWatcher != nil { + serviceWatcher.Stop() + } + configGenerator.Stop() + server.Stop() + log.Println("Middleware Manager stopped") } func loadConfiguration(debug bool) Configuration { diff --git a/services/resource_watcher.go b/services/resource_watcher.go index e362ba71a..3fb957be6 100644 --- a/services/resource_watcher.go +++ b/services/resource_watcher.go @@ -14,6 +14,7 @@ import ( "github.com/hhftechnology/middleware-manager/database" "github.com/hhftechnology/middleware-manager/models" + "github.com/hhftechnology/middleware-manager/util" ) // ResourceWatcher watches for resources using configured data source @@ -169,6 +170,8 @@ func (rw *ResourceWatcher) checkResources() error { return nil } + // Build a map of normalized IDs to original resources + normalizedMap := make(map[string]models.Resource) // Process resources for _, resource := range resources.Resources { // Skip invalid resources @@ -176,6 +179,9 @@ func (rw *ResourceWatcher) checkResources() error { continue } + normalizedID := util.NormalizeID(resource.ID) + normalizedMap[normalizedID] = resource + // Process resource if err := rw.updateOrCreateResource(resource); err != nil { log.Printf("Error processing resource %s: %v", resource.ID, err) @@ -183,13 +189,14 @@ func (rw *ResourceWatcher) checkResources() error { continue } - // Mark this resource as found - foundResources[resource.ID] = true + // Mark this resource as found (using normalized ID) + foundResources[normalizedID] = true } // Mark resources as disabled if they no longer exist in the data source for _, resourceID := range existingResources { - if !foundResources[resourceID] { + normalizedID := util.NormalizeID(resourceID) + if !foundResources[normalizedID] { log.Printf("Resource %s no longer exists, marking as disabled", resourceID) _, err := rw.db.Exec( "UPDATE resources SET status = 'disabled', updated_at = ? WHERE id = ?", @@ -204,195 +211,109 @@ func (rw *ResourceWatcher) checkResources() error { return nil } -// updateOrCreateResource updates an existing resource or creates a new one -// normalizeResourceID removes cascading auth suffixes and provider suffixes from resource IDs -// normalizeResourceID removes redundant cascading auth suffixes but preserves essential distinctions -func normalizeResourceID(id string) string { - // Extract the base name (everything before the first @) - baseName := id - if idx := strings.Index(baseName, "@"); idx > 0 { - baseName = baseName[:idx] - } - - // If not a router resource, return as is - if !strings.Contains(baseName, "-router") { - return baseName - } - - // Handle cascading auth patterns more carefully - // Replace multiple consecutive -auth suffixes with just one -auth - // but preserve redirect and other important suffixes - parts := strings.Split(baseName, "-router") - if len(parts) != 2 { - return baseName // Unexpected format - } - - prefix := parts[0] // e.g., "api" - suffix := parts[1] // e.g., "-auth-auth-redirect" - - // Process the suffix to preserve distinctive parts - if strings.Contains(suffix, "-auth") { - // Preserve redirect component if present - redirectPart := "" - if strings.Contains(suffix, "-redirect") { - redirectParts := strings.Split(suffix, "-redirect") - suffix = redirectParts[0] - redirectPart = "-redirect" - } - - // Replace multiple auth suffixes with single auth - for strings.Contains(suffix, "-auth-auth") { - suffix = strings.Replace(suffix, "-auth-auth", "-auth", 1) - } - - // Reconstruct with preserved components - return prefix + "-router" + suffix + redirectPart - } - - return baseName -} - -// updateOrCreateResource updates an existing resource or creates a new one - -// updateOrCreateResource updates an existing resource or creates a new one // updateOrCreateResource updates an existing resource or creates a new one func (rw *ResourceWatcher) updateOrCreateResource(resource models.Resource) error { - // Normalize the resource ID to handle cascading auth suffixes - normalizedID := normalizeResourceID(resource.ID) + // Use our centralized normalization function + normalizedID := util.NormalizeID(resource.ID) // For logging purposes, keep track if we normalized the ID originalID := resource.ID wasNormalized := normalizedID != originalID - // Check if resource already exists with either the original or normalized ID + if wasNormalized { + log.Printf("Normalized resource ID from %s to %s", originalID, normalizedID) + } + + // First try exact match with the normalized ID var exists int - var existingID string // Store the actual ID found in the database var status string var entrypoints, tlsDomains, tcpEntrypoints, tcpSNIRule, customHeaders string var tcpEnabled int var routerPriority sql.NullInt64 - // First try exact match with the original ID err := rw.db.QueryRow(` - SELECT id, 1, status, entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, + SELECT 1, status, entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, custom_headers, router_priority FROM resources WHERE id = ? - `, resource.ID).Scan(&existingID, &exists, &status, &entrypoints, &tlsDomains, &tcpEnabled, + `, normalizedID).Scan(&exists, &status, &entrypoints, &tlsDomains, &tcpEnabled, &tcpEntrypoints, &tcpSNIRule, &customHeaders, &routerPriority) - // If not found by original ID, try normalized ID - if err == sql.ErrNoRows { + if err == nil { + // Resource exists with normalized ID, update it + return rw.updateExistingResource(normalizedID, resource, status) + } + + // If not found with normalized ID, try with original ID + if normalizedID != originalID { err = rw.db.QueryRow(` - SELECT id, 1, status, entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, + SELECT 1, status, entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, custom_headers, router_priority FROM resources WHERE id = ? - `, normalizedID).Scan(&existingID, &exists, &status, &entrypoints, &tlsDomains, &tcpEnabled, - &tcpEntrypoints, &tcpSNIRule, &customHeaders, &routerPriority) - } - - // If still not found and ID was normalized, try with more precise pattern matching - if err == sql.ErrNoRows && wasNormalized && strings.Contains(normalizedID, "-router") { - parts := strings.Split(normalizedID, "-router") - if len(parts) == 2 { - // Create a more specific pattern that includes the prefix to avoid unrelated matches - prefix := parts[0] - suffix := parts[1] - - // If there's a specific suffix like "-redirect", create an even more precise pattern - var pattern string - if strings.Contains(suffix, "-redirect") { - pattern = prefix + "-router%-redirect%" - } else if strings.Contains(suffix, "-auth") { - pattern = prefix + "-router%-auth%" - } else { - pattern = prefix + "-router%" - } - - log.Printf("Trying to find resource with more specific pattern: %s", pattern) - err = rw.db.QueryRow(` - SELECT id, 1, status, entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, - custom_headers, router_priority - FROM resources WHERE id LIKE ? LIMIT 1 - `, pattern).Scan(&existingID, &exists, &status, &entrypoints, &tlsDomains, &tcpEnabled, - &tcpEntrypoints, &tcpSNIRule, &customHeaders, &routerPriority) - - // If found via pattern match, log it for debugging - if err == nil { - log.Printf("Found resource via precise pattern matching: %s matches pattern %s", - resource.ID, pattern) - } + `, originalID).Scan(&exists, &status, &entrypoints, &tlsDomains, &tcpEnabled, + &tcpEntrypoints, &tcpSNIRule, &customHeaders, &routerPriority) + + if err == nil { + // Resource exists with original ID, update it + return rw.updateExistingResource(originalID, resource, status) } } + // If still not found, try to find a resource with a similar normalized pattern + var existingID string + err = rw.db.QueryRow(` + SELECT id FROM resources + WHERE id LIKE ? OR id LIKE ? + LIMIT 1 + `, normalizedID+"%", originalID+"%").Scan(&existingID) + if err == nil { - // Resource exists, update it using its existing ID to prevent conflicts - log.Printf("Updating resource %s using existing ID %s in database", resource.ID, existingID) + // Found a similar resource + log.Printf("Found resource via pattern matching: %s matches pattern %s", + existingID, normalizedID+"%") - // Preserve custom configuration if available from existing resource - if resource.Entrypoints == "" && entrypoints != "" { - resource.Entrypoints = entrypoints - } - if resource.TLSDomains == "" && tlsDomains != "" { - resource.TLSDomains = tlsDomains - } - if resource.TCPEntrypoints == "" && tcpEntrypoints != "" { - resource.TCPEntrypoints = tcpEntrypoints - } - if resource.TCPSNIRule == "" && tcpSNIRule != "" { - resource.TCPSNIRule = tcpSNIRule - } - if resource.CustomHeaders == "" && customHeaders != "" { - resource.CustomHeaders = customHeaders - } - if !resource.TCPEnabled && tcpEnabled > 0 { - resource.TCPEnabled = true - } - if resource.RouterPriority == 0 && routerPriority.Valid { - resource.RouterPriority = int(routerPriority.Int64) + // Get its status + err = rw.db.QueryRow("SELECT status FROM resources WHERE id = ?", + existingID).Scan(&status) + + if err == nil { + // Update the resource using the existing ID + return rw.updateExistingResource(existingID, resource, status) } + } + + // No existing resource found, create a new one + return rw.createNewResource(resource, normalizedID, wasNormalized) +} + +// updateExistingResource updates an existing resource by ID +func (rw *ResourceWatcher) updateExistingResource(id string, resource models.Resource, status string) error { + // Use a transaction for the update + return rw.db.WithTransaction(func(tx *sql.Tx) error { + log.Printf("Updating resource %s using existing ID %s in database", resource.ID, id) - // Use the existing ID for update to prevent conflicts - _, err = rw.db.Exec(` - UPDATE resources SET - host = ?, - service_id = ?, - status = 'active', - source_type = ?, - entrypoints = ?, - tls_domains = ?, - tcp_enabled = ?, - tcp_entrypoints = ?, - tcp_sni_rule = ?, - custom_headers = ?, - router_priority = ?, - updated_at = ? - WHERE id = ?`, - resource.Host, - resource.ServiceID, - resource.SourceType, - resource.Entrypoints, - resource.TLSDomains, - boolToInt(resource.TCPEnabled), - resource.TCPEntrypoints, - resource.TCPSNIRule, - resource.CustomHeaders, - resource.RouterPriority, - time.Now(), - existingID, - ) + // Update essential fields but preserve custom configuration + _, err := tx.Exec(` + UPDATE resources + SET host = ?, service_id = ?, status = 'active', + source_type = ?, updated_at = ? + WHERE id = ? + `, resource.Host, resource.ServiceID, resource.SourceType, time.Now(), id) if err != nil { - return fmt.Errorf("failed to update resource %s (using ID %s): %w", resource.ID, existingID, err) + return fmt.Errorf("failed to update resource %s: %w", id, err) } if status == "disabled" { - log.Printf("Resource %s was disabled but is now active again", existingID) + log.Printf("Resource %s was disabled but is now active again", id) } return nil - } + }) +} - // Handle default values for new resources +// createNewResource creates a new resource in the database +func (rw *ResourceWatcher) createNewResource(resource models.Resource, normalizedID string, wasNormalized bool) error { + // Set default values for new resources if resource.Entrypoints == "" { resource.Entrypoints = "websecure" } @@ -405,75 +326,78 @@ func (rw *ResourceWatcher) updateOrCreateResource(resource models.Resource) erro resource.SiteID = "unknown" } - if resource.RouterPriority == 0 { - resource.RouterPriority = 100 // Default priority + tcpEnabledValue := 0 + if resource.TCPEnabled { + tcpEnabledValue = 1 } - // For new resources, choose an appropriate ID to prevent future duplications - createID := resource.ID - - // Only normalize in cases where we know there's redundancy - if strings.Contains(resource.ID, "@file@file") || - strings.Count(resource.ID, "-auth") > 1 || - (wasNormalized && !strings.Contains(resource.ID, "@")) { - createID = normalizedID - log.Printf("Creating new resource with normalized ID: %s (was %s)", normalizedID, originalID) + // Use default router priority if not set + if resource.RouterPriority == 0 { + resource.RouterPriority = 100 // Default priority } - // Additional safeguard - check one more time if the ID we're about to use already exists - var finalIDExists int - err = rw.db.QueryRow("SELECT 1 FROM resources WHERE id = ?", createID).Scan(&finalIDExists) - if err == nil { - // The ID we were going to use already exists! Generate a unique variation - uniqueID := fmt.Sprintf("%s-%d", createID, time.Now().UnixNano() % 10000) - log.Printf("Warning: ID %s already exists, using unique ID %s instead", createID, uniqueID) - createID = uniqueID - } - - // Create new resource with chosen ID - tcpEnabledValue := boolToInt(resource.TCPEnabled) - - _, err = rw.db.Exec(` - INSERT INTO resources ( - id, host, service_id, org_id, site_id, status, source_type, - entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, - custom_headers, router_priority, created_at, updated_at - ) VALUES (?, ?, ?, ?, ?, 'active', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `, - createID, - resource.Host, - resource.ServiceID, - resource.OrgID, - resource.SiteID, - resource.SourceType, - resource.Entrypoints, - resource.TLSDomains, - tcpEnabledValue, - resource.TCPEntrypoints, - resource.TCPSNIRule, - resource.CustomHeaders, - resource.RouterPriority, - time.Now(), - time.Now(), - ) - - if err != nil { - return fmt.Errorf("failed to create resource %s (with ID %s): %w", resource.ID, createID, err) - } - - log.Printf("Added new resource: %s (%s)", resource.Host, createID) - return nil + // Use a transaction for the insert + return rw.db.WithTransaction(func(tx *sql.Tx) error { + // For new resources, always use the normalized ID to prevent duplication + resourceID := resource.ID + if wasNormalized { + log.Printf("Creating new resource with normalized ID: %s (was %s)", normalizedID, resource.ID) + resourceID = normalizedID + } + + // Try to create with the ideal ID first + log.Printf("Adding new resource: %s (%s)", resource.Host, resourceID) + + result, err := tx.Exec(` + INSERT INTO resources ( + id, host, service_id, org_id, site_id, status, source_type, + entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, + custom_headers, router_priority, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, 'active', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `, resourceID, resource.Host, resource.ServiceID, resource.OrgID, resource.SiteID, + resource.SourceType, resource.Entrypoints, resource.TLSDomains, tcpEnabledValue, + resource.TCPEntrypoints, resource.TCPSNIRule, resource.CustomHeaders, + resource.RouterPriority, time.Now(), time.Now()) + + if err != nil { + // Check if it's a duplicate key error + if strings.Contains(err.Error(), "UNIQUE constraint") { + // Try with a different ID format (append -auth if it's a router) + if strings.Contains(resourceID, "-router") && !strings.Contains(resourceID, "-auth") { + alternativeID := resourceID + "-auth" + log.Printf("Encountered duplicate, trying alternative ID: %s", alternativeID) + + result, err = tx.Exec(` + INSERT INTO resources ( + id, host, service_id, org_id, site_id, status, source_type, + entrypoints, tls_domains, tcp_enabled, tcp_entrypoints, tcp_sni_rule, + custom_headers, router_priority, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, 'active', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `, alternativeID, resource.Host, resource.ServiceID, resource.OrgID, resource.SiteID, + resource.SourceType, resource.Entrypoints, resource.TLSDomains, tcpEnabledValue, + resource.TCPEntrypoints, resource.TCPSNIRule, resource.CustomHeaders, + resource.RouterPriority, time.Now(), time.Now()) + + if err != nil { + return fmt.Errorf("failed to create resource with alternative ID %s: %w", alternativeID, err) + } + + log.Printf("Added new resource with alternative ID: %s (%s)", resource.Host, alternativeID) + return nil + } + + return fmt.Errorf("failed to create resource due to ID conflict: %w", err) + } + + return fmt.Errorf("failed to create resource %s: %w", resourceID, err) + } + + log.Printf("Added new resource: %s (%s)", resource.Host, resourceID) + return nil + }) } -// boolToInt converts a boolean to an integer (1 for true, 0 for false) -func boolToInt(b bool) int { - if b { - return 1 - } - return 0 -} // fetchTraefikConfig fetches the Traefik configuration from the data source -// This method is kept for backward compatibility with the original implementation func (rw *ResourceWatcher) fetchTraefikConfig(ctx context.Context) (*models.PangolinTraefikConfig, error) { // Get the active data source config dsConfig, err := rw.configManager.GetActiveDataSourceConfig() @@ -535,20 +459,44 @@ func (rw *ResourceWatcher) fetchTraefikConfig(ctx context.Context) (*models.Pang return &config, nil } -// isSystemRouterForResourceWatcher checks if a router is a system router (to be skipped) -// This is renamed to prevent collision with the function in pangolin_fetcher.go -func isSystemRouterForResourceWatcher(routerID string) bool { +// isSystemRouter checks if a router is a system router (to be skipped) +func isSystemRouter(routerID string) bool { systemPrefixes := []string{ - "api-router", - "next-router", - "ws-router", - "dashboard", "api@internal", - "acme-http", + "dashboard@internal", + "acme-http@internal", + "noop@internal", } + // Check exact internal system routers for _, prefix := range systemPrefixes { - if strings.Contains(routerID, prefix) { + if routerID == prefix { + return true + } + } + + // Allow user routers with these patterns + userPatterns := []string{ + "api-router@file", + "next-router@file", + "ws-router@file", + } + + for _, pattern := range userPatterns { + if strings.Contains(routerID, pattern) { + return false + } + } + + // Check other system prefixes + otherSystemPrefixes := []string{ + "api@", + "dashboard@", + "traefik@", + } + + for _, prefix := range otherSystemPrefixes { + if strings.HasPrefix(routerID, prefix) { return true } } diff --git a/services/service_watcher.go b/services/service_watcher.go index d29f283fb..4f222e5a1 100644 --- a/services/service_watcher.go +++ b/services/service_watcher.go @@ -11,6 +11,7 @@ import ( "github.com/hhftechnology/middleware-manager/database" "github.com/hhftechnology/middleware-manager/models" + "github.com/hhftechnology/middleware-manager/util" ) // ServiceWatcher watches for services using configured data source @@ -155,6 +156,14 @@ func (sw *ServiceWatcher) checkServices() error { continue } + // Get active data source for context + dsConfig, _ := sw.configManager.GetActiveDataSourceConfig() + + // Determine source type for tracking + if service.SourceType == "" { + service.SourceType = string(dsConfig.Type) + } + // Process service if err := sw.updateOrCreateService(service); err != nil { log.Printf("Error processing service %s: %v", service.ID, err) @@ -162,15 +171,17 @@ func (sw *ServiceWatcher) checkServices() error { continue } - // Mark this service as found - foundServices[service.ID] = true + // Mark normalized version of this service as found + normalizedID := util.NormalizeID(service.ID) + foundServices[normalizedID] = true } // Optionally, mark services as "inactive" if they no longer exist in the data source // This is commented out by default to avoid deleting user-created services /* for _, serviceID := range existingServices { - if !foundServices[serviceID] { + normalizedID := util.NormalizeID(serviceID) + if !foundServices[normalizedID] { log.Printf("Service %s no longer exists in data source, consider marking as inactive", serviceID) // Optional: You could update a status field if you add one to the services table // _, err := sw.db.Exec("UPDATE services SET status = 'inactive' WHERE id = ?", serviceID) @@ -181,26 +192,26 @@ func (sw *ServiceWatcher) checkServices() error { return nil } -// updateOrCreateService updates an existing service or creates a new one // updateOrCreateService updates an existing service or creates a new one func (sw *ServiceWatcher) updateOrCreateService(service models.Service) error { - // Normalize service ID by removing additional provider suffixes - normalizedID := getNormalizedServiceID(service.ID) + // Use our centralized normalization function + normalizedID := util.NormalizeID(service.ID) + originalID := service.ID - // Check if service already exists using both original and normalized IDs + // Check if service already exists using normalized ID var exists int var existingType, existingConfig string err := sw.db.QueryRow( - "SELECT 1, type, config FROM services WHERE id = ? OR id LIKE ?", - service.ID, normalizedID+"@%", + "SELECT 1, type, config FROM services WHERE id = ?", + normalizedID, ).Scan(&exists, &existingType, &existingConfig) if err == nil { // Service exists, only update if it changed - if shouldUpdateService(sw.db, service) { - log.Printf("Updating existing service: %s (normalized from %s)", normalizedID, service.ID) - return sw.updateService(service) + if shouldUpdateService(sw.db, service, normalizedID) { + log.Printf("Updating existing service: %s (normalized from %s)", normalizedID, originalID) + return sw.updateService(service, normalizedID) } // Service exists and hasn't changed, skip update return nil @@ -209,38 +220,50 @@ func (sw *ServiceWatcher) updateOrCreateService(service models.Service) error { return fmt.Errorf("error checking if service exists: %w", err) } - // Service doesn't exist, create it with normalized ID - service.ID = normalizedID - return sw.createService(service) -} - -// getNormalizedServiceID removes redundant provider suffixes from service IDs -func getNormalizedServiceID(id string) string { - // Remove any provider suffix but only if it's duplicated - if strings.Contains(id, "@file@file") { - // Handle double @file suffix - if idx := strings.Index(id, "@file"); idx > 0 { - return id[:idx] + "@file" + // Try checking if service exists with different provider suffixes + var found bool + err = sw.db.QueryRow( + "SELECT 1 FROM services WHERE id LIKE ?", + normalizedID+"%", + ).Scan(&exists) + + if err == nil { + // Found a service with this base name but different suffix + found = true + var altID string + err = sw.db.QueryRow( + "SELECT id FROM services WHERE id LIKE ? LIMIT 1", + normalizedID+"%", + ).Scan(&altID) + + if err == nil { + log.Printf("Found existing service with different suffix: %s - will update", altID) + return sw.updateService(service, altID) } - } else if idx := strings.Index(id, "@"); idx > 0 { - // For other cases, just extract the base name - return id[:idx] } - return id + + if !found { + // Service doesn't exist with any suffix, create it + service.ID = normalizedID + return sw.createService(service) + } + + // This shouldn't be reached, but just in case + return nil } // shouldUpdateService determines if an existing service needs to be updated -func shouldUpdateService(db *database.DB, newService models.Service) bool { +func shouldUpdateService(db *database.DB, newService models.Service, normalizedID string) bool { var existingType, existingConfig string err := db.QueryRow( "SELECT type, config FROM services WHERE id = ?", - newService.ID, + normalizedID, ).Scan(&existingType, &existingConfig) if err != nil { // If there's an error, assume we should update - log.Printf("Error checking existing service %s: %v", newService.ID, err) + log.Printf("Error checking existing service %s: %v", normalizedID, err) return true } @@ -255,12 +278,12 @@ func shouldUpdateService(db *database.DB, newService models.Service) bool { var newConfigMap map[string]interface{} if err := json.Unmarshal([]byte(existingConfig), &existingConfigMap); err != nil { - log.Printf("Error parsing existing config for %s: %v", newService.ID, err) + log.Printf("Error parsing existing config for %s: %v", normalizedID, err) return true } if err := json.Unmarshal([]byte(newService.Config), &newConfigMap); err != nil { - log.Printf("Error parsing new config for %s: %v", newService.ID, err) + log.Printf("Error parsing new config for %s: %v", normalizedID, err) return true } @@ -298,6 +321,10 @@ func configsAreDifferent(config1, config2 map[string]interface{}) bool { // Compare each server for i, server1 := range servers1 { + if i >= len(servers2) { + return true + } + server1Map, ok1 := server1.(map[string]interface{}) server2Map, ok2 := servers2[i].(map[string]interface{}) @@ -349,16 +376,12 @@ func configsAreDifferent(config1, config2 map[string]interface{}) bool { return true } } - - // We don't do deep comparison of nested structures like healthCheck - // If we need to be more precise, we could expand this function } } return false } -// createService creates a new service in the database // createService creates a new service in the database func (sw *ServiceWatcher) createService(service models.Service) error { // Validate service type @@ -401,36 +424,64 @@ func (sw *ServiceWatcher) createService(service models.Service) error { service.Name = formatServiceName(service.ID) } - // Make sure we're not adding @file if the ID already has a provider - serviceID := service.ID - if !strings.Contains(serviceID, "@") { - serviceID = serviceID + "@file" // Only add @file if no provider exists + // Get active data source to determine provider suffix + dsConfig, err := sw.configManager.GetActiveDataSourceConfig() + if err != nil { + log.Printf("Warning: Could not get active data source: %v. Using default file provider.", err) + dsConfig.Type = models.PangolinAPI } - log.Printf("Creating new service: %s (original ID: %s)", serviceID, service.ID) - - // Insert the service - _, err = sw.db.Exec( - "INSERT INTO services (id, name, type, config, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)", - serviceID, service.Name, service.Type, string(configJSON), time.Now(), time.Now(), - ) - - if err != nil { - return fmt.Errorf("failed to insert service %s: %w", service.ID, err) + // Determine the appropriate provider suffix based on context + providerSuffix := "@file" + if !strings.Contains(service.ID, "@") { + // Only add a suffix if one doesn't already exist + service.ID = service.ID + providerSuffix } - log.Printf("Created new service: %s (%s)", service.Name, serviceID) - return nil + // Use a database transaction for insert + return sw.db.WithTransaction(func(tx *sql.Tx) error { + log.Printf("Creating new service: %s", service.ID) + + // Check for existing service one more time within transaction + var exists int + err := tx.QueryRow("SELECT 1 FROM services WHERE id = ?", service.ID).Scan(&exists) + if err == nil { + // Service exists, silently skip + return nil + } else if err != sql.ErrNoRows { + // Unexpected error + return fmt.Errorf("error checking service existence in transaction: %w", err) + } + + // Insert the service + _, err = tx.Exec( + "INSERT INTO services (id, name, type, config, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)", + service.ID, service.Name, service.Type, string(configJSON), time.Now(), time.Now(), + ) + + if err != nil { + // Check if it's a duplicate key error + if strings.Contains(err.Error(), "UNIQUE constraint") { + // Log but don't return error to continue processing other services + log.Printf("Service %s already exists, skipping", service.ID) + return nil + } + return fmt.Errorf("failed to insert service %s: %w", service.ID, err) + } + + log.Printf("Created new service: %s", service.ID) + return nil + }) } // updateService updates an existing service in the database -func (sw *ServiceWatcher) updateService(service models.Service) error { +func (sw *ServiceWatcher) updateService(service models.Service, existingID string) error { // Get the existing service to preserve the name var existingName string - err := sw.db.QueryRow("SELECT name FROM services WHERE id = ?", service.ID).Scan(&existingName) + err := sw.db.QueryRow("SELECT name FROM services WHERE id = ?", existingID).Scan(&existingName) if err != nil { - log.Printf("Error fetching existing service name for %s: %v, using provided name", service.ID, err) + log.Printf("Error fetching existing service name for %s: %v, using provided name", existingID, err) } else if existingName != "" { // Preserve existing name unless the new name is meaningful if service.Name == service.ID || service.Name == "" { @@ -455,18 +506,28 @@ func (sw *ServiceWatcher) updateService(service models.Service) error { configJSON = []byte("{}") } - // Update the service - _, err = sw.db.Exec( - "UPDATE services SET name = ?, type = ?, config = ?, updated_at = ? WHERE id = ?", - service.Name, service.Type, string(configJSON), time.Now(), service.ID, - ) - - if err != nil { - return fmt.Errorf("failed to update service %s: %w", service.ID, err) - } - - log.Printf("Updated existing service: %s (%s)", service.Name, service.ID) - return nil + // Update the service using a transaction + return sw.db.WithTransaction(func(tx *sql.Tx) error { + // Update the service using the existing ID + result, err := tx.Exec( + "UPDATE services SET name = ?, type = ?, config = ?, updated_at = ? WHERE id = ?", + service.Name, service.Type, string(configJSON), time.Now(), existingID, + ) + + if err != nil { + return fmt.Errorf("failed to update service %s: %w", service.ID, err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + log.Printf("Error getting rows affected: %v", err) + } else if rowsAffected == 0 { + log.Printf("Warning: Update did not affect any rows for service %s", existingID) + } + + log.Printf("Updated existing service: %s", existingID) + return nil + }) } // formatServiceName creates a readable name from a service ID diff --git a/util/id_normalizer.go b/util/id_normalizer.go new file mode 100644 index 000000000..fc458e978 --- /dev/null +++ b/util/id_normalizer.go @@ -0,0 +1,82 @@ +package util + +import ( + "strings" + "regexp" +) + +var ( + // Regular expression to match cascading auth suffixes + authCascadeRegex = regexp.MustCompile(`(-auth)+$`) + + // Regular expression for router suffix with auth patterns + routerAuthRegex = regexp.MustCompile(`-router(-auth)*$`) +) + +// NormalizeID provides a standard way to normalize any ID across the application +// It removes provider suffixes and handles special cases like auth cascades +func NormalizeID(id string) string { + // First, remove any provider suffix (if present) + baseName := id + if idx := strings.Index(baseName, "@"); idx > 0 { + baseName = baseName[:idx] + } + + // Handle cascading auth patterns + baseName = authCascadeRegex.ReplaceAllString(baseName, "-auth") + + // Special handling for router resources + if strings.Contains(baseName, "-router") { + // For router-auth, router-auth-auth patterns, normalize to router-auth + baseName = routerAuthRegex.ReplaceAllString(baseName, "-router-auth") + + // Handle redirect suffixes in routers + if strings.Contains(baseName, "-redirect") { + // Normalize router-redirect-auth to router-redirect + if strings.HasSuffix(baseName, "-auth") { + baseName = strings.TrimSuffix(baseName, "-auth") + } + } + } + + return baseName +} + +// GetProviderSuffix extracts the provider suffix from an ID +func GetProviderSuffix(id string) string { + if idx := strings.Index(id, "@"); idx > 0 { + return id[idx:] + } + return "" +} + +// AddProviderSuffix adds a provider suffix if one doesn't exist +// If the ID already has a suffix, it returns the original ID +func AddProviderSuffix(id string, suffix string) string { + if suffix == "" || strings.Contains(id, "@") { + return id + } + + // Ensure suffix starts with @ + if !strings.HasPrefix(suffix, "@") { + suffix = "@" + suffix + } + + return id + suffix +} + +// DetermineProviderSuffix returns the appropriate provider suffix based on context +func DetermineProviderSuffix(sourceType string, activeDataSourceType string) string { + // Use file provider for custom services + if sourceType == "file" { + return "@file" + } + + // For Traefik API, prefer docker provider for matching source types + if activeDataSourceType == "traefik" && sourceType == "traefik" { + return "@docker" + } + + // Default to http provider + return "@http" +} \ No newline at end of file From 48667fd50070dae1ed8a4e0454d88d576f832f63 Mon Sep 17 00:00:00 2001 From: hhftechnologies Date: Tue, 20 May 2025 13:18:09 +0530 Subject: [PATCH 10/10] update --- services/resource_watcher.go | 7 ++++++- services/service_watcher.go | 8 -------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/services/resource_watcher.go b/services/resource_watcher.go index 3fb957be6..98e8ed73c 100644 --- a/services/resource_watcher.go +++ b/services/resource_watcher.go @@ -391,7 +391,12 @@ func (rw *ResourceWatcher) createNewResource(resource models.Resource, normalize return fmt.Errorf("failed to create resource %s: %w", resourceID, err) } - + rowsAffected, err := result.RowsAffected() +if err != nil { + log.Printf("Error getting rows affected: %v", err) +} else if rowsAffected > 0 { + log.Printf("Successfully updated/inserted %d rows", rowsAffected) +} log.Printf("Added new resource: %s (%s)", resource.Host, resourceID) return nil }) diff --git a/services/service_watcher.go b/services/service_watcher.go index 4f222e5a1..b2f4b41ec 100644 --- a/services/service_watcher.go +++ b/services/service_watcher.go @@ -156,14 +156,6 @@ func (sw *ServiceWatcher) checkServices() error { continue } - // Get active data source for context - dsConfig, _ := sw.configManager.GetActiveDataSourceConfig() - - // Determine source type for tracking - if service.SourceType == "" { - service.SourceType = string(dsConfig.Type) - } - // Process service if err := sw.updateOrCreateService(service); err != nil { log.Printf("Error processing service %s: %v", service.ID, err)