Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,14 @@ go.work.sum
# Editor/IDE
# .idea/
# .vscode/

# Generated reports and output files
final_report.yaml
*_report.yaml
periodic_reports/
time_based_*
stream-diff
*.log

# Temporary test files
/tmp/
66 changes: 63 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,73 @@ source:

## Usage

To run a comparison, use the `compare` command and provide the paths to the two configuration files.
### Command Line Interface

The tool can be run in two ways:

#### 1. Using a Run Configuration File (Recommended)

```bash
go run ./cmd/stream-diff -config runConfig.yaml -key user_id
```

#### 2. Using Command Line Parameters

```bash
# (Once implemented)
go run ./cmd/comparator compare ./config1.yaml ./config2.yaml
go run ./cmd/stream-diff \
-source1 path/to/source1.csv \
-source2 path/to/source2.csv \
-key user_id \
-enable-periodic \
-time-interval 30 \
-record-interval 1000 \
-output-dir ./reports
```

### Run Configuration File

Create a `runConfig.yaml` file to define your comparison settings:

```yaml
source1:
type: csv
path: data/source1.csv
parser_config:
json_in_string: false

source2:
type: csv
path: data/source2.csv
parser_config:
json_in_string: false

output:
final_report: final_report.yaml
periodic_reports: periodic_reports

periodic:
enabled: true
time_interval_seconds: 30
record_interval: 1000
```

### Periodic Diff Reporting

The tool supports periodic reporting of differences as data streams are processed. This is useful for:
- Monitoring long-running comparisons
- Early detection of data differences
- Progress tracking for large datasets

**Configuration Options:**
- `time_interval_seconds`: Generate reports every N seconds
- `record_interval`: Generate reports every N records processed
- Both options can be used together - reports trigger when either condition is met

**Output:**
- Periodic reports are saved to timestamped YAML files in the specified directory
- Console output shows real-time progress updates
- Final comprehensive report is generated at the end

## Testing

This project is developed using a test-driven approach. A comprehensive suite of test cases, including source data and expected outputs, can be found in the `testdata` directory. These tests cover all major features and edge cases.
178 changes: 178 additions & 0 deletions cmd/stream-diff/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package main

import (
"data-comparator/internal/pkg/comparator"
"data-comparator/internal/pkg/config"
"data-comparator/internal/pkg/datareader"
"flag"
"fmt"
"log"
"os"
"path/filepath"
"time"

"gopkg.in/yaml.v3"
)

func main() {
var (
runConfigPath = flag.String("config", "runConfig.yaml", "Path to run configuration file")
source1Path = flag.String("source1", "", "Path to first source configuration file")
source2Path = flag.String("source2", "", "Path to second source configuration file")
keyField = flag.String("key", "", "Key field for record comparison")
timeInterval = flag.Int("time-interval", 0, "Time interval for periodic reports (seconds)")
recordInterval = flag.Int("record-interval", 0, "Record interval for periodic reports")
enablePeriodic = flag.Bool("enable-periodic", false, "Enable periodic reporting")
outputDir = flag.String("output-dir", ".", "Directory for output files")
)
flag.Parse()

// Load configuration
var runConfig *config.RunConfig
var err error

if *source1Path != "" && *source2Path != "" {
// Create run config from command line arguments
runConfig = &config.RunConfig{
Source1: config.Source{Type: "csv", Path: *source1Path}, // Default to CSV
Source2: config.Source{Type: "csv", Path: *source2Path}, // Default to CSV
Output: config.OutputConfig{
FinalReport: filepath.Join(*outputDir, "final_report.yaml"),
PeriodicReports: filepath.Join(*outputDir, "periodic_reports"),
},
Periodic: config.PeriodicConfig{
Enabled: *enablePeriodic,
TimeInterval: *timeInterval,
RecordInterval: *recordInterval,
},
}
} else {
// Load from config file
runConfig, err = config.LoadRunConfig(*runConfigPath)
if err != nil {
log.Fatalf("Failed to load run config: %v", err)
}
}

// Override key field if provided
if *keyField != "" {
// Key field will be passed to comparator
} else {
// Try to detect key field or use a default
*keyField = "id" // Default key field
}

fmt.Printf("Starting stream comparison...\n")
fmt.Printf("Source 1: %s\n", runConfig.Source1.Path)
fmt.Printf("Source 2: %s\n", runConfig.Source2.Path)
fmt.Printf("Key field: %s\n", *keyField)
if runConfig.Periodic.Enabled {
fmt.Printf("Periodic reporting enabled - Time: %ds, Records: %d\n",
runConfig.Periodic.TimeInterval, runConfig.Periodic.RecordInterval)
}

// Create data readers
reader1, err := datareader.New(runConfig.Source1)
if err != nil {
log.Fatalf("Failed to create reader for source1: %v", err)
}
defer reader1.Close()

reader2, err := datareader.New(runConfig.Source2)
if err != nil {
log.Fatalf("Failed to create reader for source2: %v", err)
}
defer reader2.Close()

// Create periodic reports directory if needed
if runConfig.Periodic.Enabled && runConfig.Output.PeriodicReports != "" {
if err := os.MkdirAll(runConfig.Output.PeriodicReports, 0755); err != nil {
log.Fatalf("Failed to create periodic reports directory: %v", err)
}
}

// Create periodic diff callback
periodicDiffCallback := func(result comparator.ComparisonResult) error {
fmt.Printf("[PERIODIC] %s - Records: %d, Matching: %d, Identical: %d, Diffs: %d\n",
result.Timestamp.Format("15:04:05"),
result.RecordsProcessed,
result.MatchingKeys,
result.IdenticalRows,
len(result.ValueDiffs))

// Save periodic report if configured
if runConfig.Output.PeriodicReports != "" {
filename := fmt.Sprintf("periodic_report_%s.yaml",
result.Timestamp.Format("20060102_150405"))
filePath := filepath.Join(runConfig.Output.PeriodicReports, filename)

if err := saveReportToFile(result, filePath); err != nil {
return fmt.Errorf("failed to save periodic report: %w", err)
}
fmt.Printf(" Saved periodic report: %s\n", filePath)
}

return nil
}

// Create stream comparator
sc := comparator.NewStreamComparator(
reader1,
reader2,
runConfig.Periodic,
*keyField,
periodicDiffCallback,
)

// Perform comparison
fmt.Printf("Starting comparison...\n")
startTime := time.Now()

finalResult, err := sc.Compare()
if err != nil {
log.Fatalf("Comparison failed: %v", err)
}

duration := time.Since(startTime)
fmt.Printf("\nComparison completed in %v\n", duration)

// Print final summary
fmt.Printf("\nFinal Results:\n")
fmt.Printf(" Records processed: %d\n", finalResult.RecordsProcessed)
fmt.Printf(" Source1 records: %d\n", finalResult.Source1Records)
fmt.Printf(" Source2 records: %d\n", finalResult.Source2Records)
fmt.Printf(" Matching keys: %d\n", finalResult.MatchingKeys)
fmt.Printf(" Identical rows: %d\n", finalResult.IdenticalRows)
fmt.Printf(" Value differences: %d records\n", len(finalResult.ValueDiffs))
fmt.Printf(" Keys only in source1: %d\n", len(finalResult.KeysOnlyInSource1))
fmt.Printf(" Keys only in source2: %d\n", len(finalResult.KeysOnlyInSource2))

// Save final report
finalReportPath := runConfig.Output.FinalReport
if finalReportPath == "" {
finalReportPath = "final_report.yaml"
}

if err := saveReportToFile(*finalResult, finalReportPath); err != nil {
log.Fatalf("Failed to save final report: %v", err)
}
fmt.Printf("\nFinal report saved to: %s\n", finalReportPath)
}

// saveReportToFile saves a comparison result to a YAML file.
func saveReportToFile(result interface{}, filePath string) error {
// Create directory if it doesn't exist
dir := filepath.Dir(filePath)
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}

// Marshal to YAML
data, err := yaml.Marshal(result)
if err != nil {
return err
}

// Write to file
return os.WriteFile(filePath, data, 0644)
}
37 changes: 37 additions & 0 deletions example_runConfig.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Example runConfig.yaml for stream-diff
# This configuration file defines how to compare two data sources with periodic reporting

# First data source configuration
source1:
type: csv # Supported types: csv, json
path: testdata/testcase1_simple_csv/source1.csv
parser_config:
json_in_string: false # Set to true if CSV contains JSON strings to parse
sampler:
sample_size: 1000 # Number of records to sample for schema generation

# Second data source configuration
source2:
type: csv
path: testdata/testcase1_simple_csv/source2.csv
parser_config:
json_in_string: false
sampler:
sample_size: 1000

# Output configuration
output:
final_report: final_report.yaml # Path for the final comparison report
periodic_reports: periodic_reports # Directory for periodic reports

# Periodic diff reporting configuration
periodic:
enabled: true # Enable/disable periodic reporting
time_interval_seconds: 30 # Trigger report every N seconds (0 to disable)
record_interval: 1000 # Trigger report every N records processed (0 to disable)

# Note: If both time_interval_seconds and record_interval are specified,
# a report will be generated when either condition is met.
# If both are 0 when periodic.enabled is true, defaults will be applied:
# - time_interval_seconds: 30
# - record_interval: 1000
22 changes: 22 additions & 0 deletions final_report.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
timestamp: 2025-09-15T01:26:50.985394537Z
records_processed: 10
source1_records: 5
source2_records: 5
matching_keys: 4
identical_rows: 3
keys_only_in_source1:
- "5"
keys_only_in_source2:
- "6"
value_diffs_by_key:
"1":
- field: plan_type
source1_value: premium
source2_value: premium_plus
- field: last_login
source1_value: "2025-09-10T12:00:00Z"
source2_value: "2025-09-10T13:00:00Z"
- field: age
source1_value: "30"
source2_value: "31"
is_periodic_report: false
Loading