diff --git a/.gitignore b/.gitignore index aaadf73..a039965 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,14 @@ go.work.sum # Editor/IDE # .idea/ # .vscode/ + +# Generated reports and output files +final_report.yaml +*_report.yaml +periodic_reports/ +time_based_* +stream-diff +*.log + +# Temporary test files +/tmp/ diff --git a/README.md b/README.md index 359168e..a7ecdaf 100644 --- a/README.md +++ b/README.md @@ -52,13 +52,73 @@ source: ## Usage -To run a comparison, use the `compare` command and provide the paths to the two configuration files. +### Command Line Interface + +The tool can be run in two ways: + +#### 1. Using a Run Configuration File (Recommended) + +```bash +go run ./cmd/stream-diff -config runConfig.yaml -key user_id +``` + +#### 2. Using Command Line Parameters ```bash -# (Once implemented) -go run ./cmd/comparator compare ./config1.yaml ./config2.yaml +go run ./cmd/stream-diff \ + -source1 path/to/source1.csv \ + -source2 path/to/source2.csv \ + -key user_id \ + -enable-periodic \ + -time-interval 30 \ + -record-interval 1000 \ + -output-dir ./reports ``` +### Run Configuration File + +Create a `runConfig.yaml` file to define your comparison settings: + +```yaml +source1: + type: csv + path: data/source1.csv + parser_config: + json_in_string: false + +source2: + type: csv + path: data/source2.csv + parser_config: + json_in_string: false + +output: + final_report: final_report.yaml + periodic_reports: periodic_reports + +periodic: + enabled: true + time_interval_seconds: 30 + record_interval: 1000 +``` + +### Periodic Diff Reporting + +The tool supports periodic reporting of differences as data streams are processed. This is useful for: +- Monitoring long-running comparisons +- Early detection of data differences +- Progress tracking for large datasets + +**Configuration Options:** +- `time_interval_seconds`: Generate reports every N seconds +- `record_interval`: Generate reports every N records processed +- Both options can be used together - reports trigger when either condition is met + +**Output:** +- Periodic reports are saved to timestamped YAML files in the specified directory +- Console output shows real-time progress updates +- Final comprehensive report is generated at the end + ## Testing This project is developed using a test-driven approach. A comprehensive suite of test cases, including source data and expected outputs, can be found in the `testdata` directory. These tests cover all major features and edge cases. diff --git a/cmd/stream-diff/main.go b/cmd/stream-diff/main.go new file mode 100644 index 0000000..7e664a1 --- /dev/null +++ b/cmd/stream-diff/main.go @@ -0,0 +1,178 @@ +package main + +import ( + "data-comparator/internal/pkg/comparator" + "data-comparator/internal/pkg/config" + "data-comparator/internal/pkg/datareader" + "flag" + "fmt" + "log" + "os" + "path/filepath" + "time" + + "gopkg.in/yaml.v3" +) + +func main() { + var ( + runConfigPath = flag.String("config", "runConfig.yaml", "Path to run configuration file") + source1Path = flag.String("source1", "", "Path to first source configuration file") + source2Path = flag.String("source2", "", "Path to second source configuration file") + keyField = flag.String("key", "", "Key field for record comparison") + timeInterval = flag.Int("time-interval", 0, "Time interval for periodic reports (seconds)") + recordInterval = flag.Int("record-interval", 0, "Record interval for periodic reports") + enablePeriodic = flag.Bool("enable-periodic", false, "Enable periodic reporting") + outputDir = flag.String("output-dir", ".", "Directory for output files") + ) + flag.Parse() + + // Load configuration + var runConfig *config.RunConfig + var err error + + if *source1Path != "" && *source2Path != "" { + // Create run config from command line arguments + runConfig = &config.RunConfig{ + Source1: config.Source{Type: "csv", Path: *source1Path}, // Default to CSV + Source2: config.Source{Type: "csv", Path: *source2Path}, // Default to CSV + Output: config.OutputConfig{ + FinalReport: filepath.Join(*outputDir, "final_report.yaml"), + PeriodicReports: filepath.Join(*outputDir, "periodic_reports"), + }, + Periodic: config.PeriodicConfig{ + Enabled: *enablePeriodic, + TimeInterval: *timeInterval, + RecordInterval: *recordInterval, + }, + } + } else { + // Load from config file + runConfig, err = config.LoadRunConfig(*runConfigPath) + if err != nil { + log.Fatalf("Failed to load run config: %v", err) + } + } + + // Override key field if provided + if *keyField != "" { + // Key field will be passed to comparator + } else { + // Try to detect key field or use a default + *keyField = "id" // Default key field + } + + fmt.Printf("Starting stream comparison...\n") + fmt.Printf("Source 1: %s\n", runConfig.Source1.Path) + fmt.Printf("Source 2: %s\n", runConfig.Source2.Path) + fmt.Printf("Key field: %s\n", *keyField) + if runConfig.Periodic.Enabled { + fmt.Printf("Periodic reporting enabled - Time: %ds, Records: %d\n", + runConfig.Periodic.TimeInterval, runConfig.Periodic.RecordInterval) + } + + // Create data readers + reader1, err := datareader.New(runConfig.Source1) + if err != nil { + log.Fatalf("Failed to create reader for source1: %v", err) + } + defer reader1.Close() + + reader2, err := datareader.New(runConfig.Source2) + if err != nil { + log.Fatalf("Failed to create reader for source2: %v", err) + } + defer reader2.Close() + + // Create periodic reports directory if needed + if runConfig.Periodic.Enabled && runConfig.Output.PeriodicReports != "" { + if err := os.MkdirAll(runConfig.Output.PeriodicReports, 0755); err != nil { + log.Fatalf("Failed to create periodic reports directory: %v", err) + } + } + + // Create periodic diff callback + periodicDiffCallback := func(result comparator.ComparisonResult) error { + fmt.Printf("[PERIODIC] %s - Records: %d, Matching: %d, Identical: %d, Diffs: %d\n", + result.Timestamp.Format("15:04:05"), + result.RecordsProcessed, + result.MatchingKeys, + result.IdenticalRows, + len(result.ValueDiffs)) + + // Save periodic report if configured + if runConfig.Output.PeriodicReports != "" { + filename := fmt.Sprintf("periodic_report_%s.yaml", + result.Timestamp.Format("20060102_150405")) + filePath := filepath.Join(runConfig.Output.PeriodicReports, filename) + + if err := saveReportToFile(result, filePath); err != nil { + return fmt.Errorf("failed to save periodic report: %w", err) + } + fmt.Printf(" Saved periodic report: %s\n", filePath) + } + + return nil + } + + // Create stream comparator + sc := comparator.NewStreamComparator( + reader1, + reader2, + runConfig.Periodic, + *keyField, + periodicDiffCallback, + ) + + // Perform comparison + fmt.Printf("Starting comparison...\n") + startTime := time.Now() + + finalResult, err := sc.Compare() + if err != nil { + log.Fatalf("Comparison failed: %v", err) + } + + duration := time.Since(startTime) + fmt.Printf("\nComparison completed in %v\n", duration) + + // Print final summary + fmt.Printf("\nFinal Results:\n") + fmt.Printf(" Records processed: %d\n", finalResult.RecordsProcessed) + fmt.Printf(" Source1 records: %d\n", finalResult.Source1Records) + fmt.Printf(" Source2 records: %d\n", finalResult.Source2Records) + fmt.Printf(" Matching keys: %d\n", finalResult.MatchingKeys) + fmt.Printf(" Identical rows: %d\n", finalResult.IdenticalRows) + fmt.Printf(" Value differences: %d records\n", len(finalResult.ValueDiffs)) + fmt.Printf(" Keys only in source1: %d\n", len(finalResult.KeysOnlyInSource1)) + fmt.Printf(" Keys only in source2: %d\n", len(finalResult.KeysOnlyInSource2)) + + // Save final report + finalReportPath := runConfig.Output.FinalReport + if finalReportPath == "" { + finalReportPath = "final_report.yaml" + } + + if err := saveReportToFile(*finalResult, finalReportPath); err != nil { + log.Fatalf("Failed to save final report: %v", err) + } + fmt.Printf("\nFinal report saved to: %s\n", finalReportPath) +} + +// saveReportToFile saves a comparison result to a YAML file. +func saveReportToFile(result interface{}, filePath string) error { + // Create directory if it doesn't exist + dir := filepath.Dir(filePath) + if err := os.MkdirAll(dir, 0755); err != nil { + return err + } + + // Marshal to YAML + data, err := yaml.Marshal(result) + if err != nil { + return err + } + + // Write to file + return os.WriteFile(filePath, data, 0644) +} \ No newline at end of file diff --git a/example_runConfig.yaml b/example_runConfig.yaml new file mode 100644 index 0000000..7d66c43 --- /dev/null +++ b/example_runConfig.yaml @@ -0,0 +1,37 @@ +# Example runConfig.yaml for stream-diff +# This configuration file defines how to compare two data sources with periodic reporting + +# First data source configuration +source1: + type: csv # Supported types: csv, json + path: testdata/testcase1_simple_csv/source1.csv + parser_config: + json_in_string: false # Set to true if CSV contains JSON strings to parse + sampler: + sample_size: 1000 # Number of records to sample for schema generation + +# Second data source configuration +source2: + type: csv + path: testdata/testcase1_simple_csv/source2.csv + parser_config: + json_in_string: false + sampler: + sample_size: 1000 + +# Output configuration +output: + final_report: final_report.yaml # Path for the final comparison report + periodic_reports: periodic_reports # Directory for periodic reports + +# Periodic diff reporting configuration +periodic: + enabled: true # Enable/disable periodic reporting + time_interval_seconds: 30 # Trigger report every N seconds (0 to disable) + record_interval: 1000 # Trigger report every N records processed (0 to disable) + +# Note: If both time_interval_seconds and record_interval are specified, +# a report will be generated when either condition is met. +# If both are 0 when periodic.enabled is true, defaults will be applied: +# - time_interval_seconds: 30 +# - record_interval: 1000 \ No newline at end of file diff --git a/final_report.yaml b/final_report.yaml new file mode 100644 index 0000000..e588724 --- /dev/null +++ b/final_report.yaml @@ -0,0 +1,22 @@ +timestamp: 2025-09-15T01:26:50.985394537Z +records_processed: 10 +source1_records: 5 +source2_records: 5 +matching_keys: 4 +identical_rows: 3 +keys_only_in_source1: + - "5" +keys_only_in_source2: + - "6" +value_diffs_by_key: + "1": + - field: plan_type + source1_value: premium + source2_value: premium_plus + - field: last_login + source1_value: "2025-09-10T12:00:00Z" + source2_value: "2025-09-10T13:00:00Z" + - field: age + source1_value: "30" + source2_value: "31" +is_periodic_report: false diff --git a/internal/pkg/comparator/comparator.go b/internal/pkg/comparator/comparator.go new file mode 100644 index 0000000..34dceb7 --- /dev/null +++ b/internal/pkg/comparator/comparator.go @@ -0,0 +1,284 @@ +package comparator + +import ( + "data-comparator/internal/pkg/config" + "data-comparator/internal/pkg/datareader" + "fmt" + "io" + "time" +) + +// ComparisonResult holds the results of comparing two data streams. +type ComparisonResult struct { + Timestamp time.Time `yaml:"timestamp"` + RecordsProcessed int `yaml:"records_processed"` + Source1Records int `yaml:"source1_records"` + Source2Records int `yaml:"source2_records"` + MatchingKeys int `yaml:"matching_keys"` + IdenticalRows int `yaml:"identical_rows"` + KeysOnlyInSource1 []string `yaml:"keys_only_in_source1,omitempty"` + KeysOnlyInSource2 []string `yaml:"keys_only_in_source2,omitempty"` + ValueDiffs map[string][]FieldDiff `yaml:"value_diffs_by_key,omitempty"` + IsPeriodicReport bool `yaml:"is_periodic_report"` +} + +// FieldDiff represents a difference in a field between two records. +type FieldDiff struct { + Field string `yaml:"field"` + Source1Value interface{} `yaml:"source1_value"` + Source2Value interface{} `yaml:"source2_value"` +} + +// StreamComparator compares two data streams with periodic reporting. +type StreamComparator struct { + source1 datareader.DataReader + source2 datareader.DataReader + periodicConfig config.PeriodicConfig + onPeriodicDiff func(result ComparisonResult) error + keyField string +} + +// NewStreamComparator creates a new stream comparator. +func NewStreamComparator( + source1, source2 datareader.DataReader, + periodicConfig config.PeriodicConfig, + keyField string, + onPeriodicDiff func(result ComparisonResult) error, +) *StreamComparator { + return &StreamComparator{ + source1: source1, + source2: source2, + periodicConfig: periodicConfig, + onPeriodicDiff: onPeriodicDiff, + keyField: keyField, + } +} + +// Compare performs the stream comparison with periodic reporting. +func (sc *StreamComparator) Compare() (*ComparisonResult, error) { + startTime := time.Now() + lastReportTime := startTime + recordsProcessed := 0 + lastReportRecords := 0 + + // Maps to store records by key + source1Records := make(map[string]datareader.Record) + source2Records := make(map[string]datareader.Record) + + source1Count, source2Count := 0, 0 + + // Read all records from source1 + for { + record, err := sc.source1.Read() + if err != nil { + if err == io.EOF { + break + } + return nil, fmt.Errorf("error reading from source1: %w", err) + } + + source1Count++ + recordsProcessed++ + + key := sc.getRecordKey(record) + if key != "" { + source1Records[key] = record + } + + // Check for periodic reporting + if sc.shouldReportPeriodic(startTime, lastReportTime, recordsProcessed, lastReportRecords) { + result := sc.generatePeriodicResult(source1Records, source2Records, recordsProcessed, source1Count, source2Count) + if sc.onPeriodicDiff != nil { + if err := sc.onPeriodicDiff(result); err != nil { + return nil, fmt.Errorf("error in periodic diff callback: %w", err) + } + } + lastReportTime = time.Now() + lastReportRecords = recordsProcessed + } + } + + // Read all records from source2 + for { + record, err := sc.source2.Read() + if err != nil { + if err == io.EOF { + break + } + return nil, fmt.Errorf("error reading from source2: %w", err) + } + + source2Count++ + recordsProcessed++ + + key := sc.getRecordKey(record) + if key != "" { + source2Records[key] = record + } + + // Check for periodic reporting + if sc.shouldReportPeriodic(startTime, lastReportTime, recordsProcessed, lastReportRecords) { + result := sc.generatePeriodicResult(source1Records, source2Records, recordsProcessed, source1Count, source2Count) + if sc.onPeriodicDiff != nil { + if err := sc.onPeriodicDiff(result); err != nil { + return nil, fmt.Errorf("error in periodic diff callback: %w", err) + } + } + lastReportTime = time.Now() + lastReportRecords = recordsProcessed + } + } + + // Generate final result + return sc.generateFinalResult(source1Records, source2Records, recordsProcessed, source1Count, source2Count), nil +} + +// getRecordKey extracts the key field value from a record. +func (sc *StreamComparator) getRecordKey(record datareader.Record) string { + if sc.keyField == "" { + return "" + } + + value, exists := record[sc.keyField] + if !exists { + return "" + } + + return fmt.Sprintf("%v", value) +} + +// shouldReportPeriodic determines if a periodic report should be generated. +func (sc *StreamComparator) shouldReportPeriodic(startTime, lastReportTime time.Time, recordsProcessed, lastReportRecords int) bool { + if !sc.periodicConfig.Enabled { + return false + } + + // Check time interval + if sc.periodicConfig.TimeInterval > 0 { + if time.Since(lastReportTime).Seconds() >= float64(sc.periodicConfig.TimeInterval) { + return true + } + } + + // Check record interval + if sc.periodicConfig.RecordInterval > 0 { + if recordsProcessed-lastReportRecords >= sc.periodicConfig.RecordInterval { + return true + } + } + + return false +} + +// generatePeriodicResult creates a periodic comparison result. +func (sc *StreamComparator) generatePeriodicResult( + source1Records, source2Records map[string]datareader.Record, + recordsProcessed, source1Count, source2Count int, +) ComparisonResult { + return sc.generateResult(source1Records, source2Records, recordsProcessed, source1Count, source2Count, true) +} + +// generateFinalResult creates the final comparison result. +func (sc *StreamComparator) generateFinalResult( + source1Records, source2Records map[string]datareader.Record, + recordsProcessed, source1Count, source2Count int, +) *ComparisonResult { + result := sc.generateResult(source1Records, source2Records, recordsProcessed, source1Count, source2Count, false) + return &result +} + +// generateResult creates a comparison result. +func (sc *StreamComparator) generateResult( + source1Records, source2Records map[string]datareader.Record, + recordsProcessed, source1Count, source2Count int, + isPeriodicReport bool, +) ComparisonResult { + result := ComparisonResult{ + Timestamp: time.Now(), + RecordsProcessed: recordsProcessed, + Source1Records: source1Count, + Source2Records: source2Count, + IsPeriodicReport: isPeriodicReport, + ValueDiffs: make(map[string][]FieldDiff), + } + + // Find keys only in source1 + for key := range source1Records { + if _, exists := source2Records[key]; !exists { + result.KeysOnlyInSource1 = append(result.KeysOnlyInSource1, key) + } + } + + // Find keys only in source2 + for key := range source2Records { + if _, exists := source1Records[key]; !exists { + result.KeysOnlyInSource2 = append(result.KeysOnlyInSource2, key) + } + } + + // Compare records with matching keys + for key, record1 := range source1Records { + if record2, exists := source2Records[key]; exists { + result.MatchingKeys++ + + // Compare field values + diffs := sc.compareRecords(record1, record2) + if len(diffs) == 0 { + result.IdenticalRows++ + } else { + result.ValueDiffs[key] = diffs + } + } + } + + return result +} + +// compareRecords compares two records and returns field differences. +func (sc *StreamComparator) compareRecords(record1, record2 datareader.Record) []FieldDiff { + var diffs []FieldDiff + + // Get all unique field names + allFields := make(map[string]bool) + for field := range record1 { + allFields[field] = true + } + for field := range record2 { + allFields[field] = true + } + + // Compare each field + for field := range allFields { + value1, exists1 := record1[field] + value2, exists2 := record2[field] + + if !exists1 && !exists2 { + continue + } + + if !exists1 || !exists2 || !sc.valuesEqual(value1, value2) { + diffs = append(diffs, FieldDiff{ + Field: field, + Source1Value: value1, + Source2Value: value2, + }) + } + } + + return diffs +} + +// valuesEqual compares two values for equality. +func (sc *StreamComparator) valuesEqual(v1, v2 interface{}) bool { + if v1 == nil && v2 == nil { + return true + } + if v1 == nil || v2 == nil { + return false + } + + // Simple string comparison for now + str1 := fmt.Sprintf("%v", v1) + str2 := fmt.Sprintf("%v", v2) + return str1 == str2 +} \ No newline at end of file diff --git a/internal/pkg/comparator/comparator_test.go b/internal/pkg/comparator/comparator_test.go new file mode 100644 index 0000000..86bfcfa --- /dev/null +++ b/internal/pkg/comparator/comparator_test.go @@ -0,0 +1,181 @@ +package comparator + +import ( + "data-comparator/internal/pkg/config" + "data-comparator/internal/pkg/datareader" + "io" + "testing" + "time" +) + +// MockDataReader is a mock implementation of DataReader for testing +type MockDataReader struct { + records []datareader.Record + index int +} + +func NewMockDataReader(records []datareader.Record) *MockDataReader { + return &MockDataReader{ + records: records, + index: 0, + } +} + +func (m *MockDataReader) Read() (datareader.Record, error) { + if m.index >= len(m.records) { + return nil, io.EOF + } + record := m.records[m.index] + m.index++ + return record, nil +} + +func (m *MockDataReader) Close() error { + return nil +} + +func TestStreamComparator_Compare(t *testing.T) { + // Create test data + source1Records := []datareader.Record{ + {"id": "1", "name": "Alice", "age": "30"}, + {"id": "2", "name": "Bob", "age": "25"}, + {"id": "3", "name": "Charlie", "age": "35"}, + } + + source2Records := []datareader.Record{ + {"id": "1", "name": "Alice", "age": "31"}, // Age diff + {"id": "2", "name": "Bob", "age": "25"}, // Identical + {"id": "4", "name": "David", "age": "40"}, // Only in source2 + } + + // Create mock readers + reader1 := NewMockDataReader(source1Records) + reader2 := NewMockDataReader(source2Records) + + // Create comparator without periodic reporting + periodicConfig := config.PeriodicConfig{ + Enabled: false, + } + + sc := NewStreamComparator(reader1, reader2, periodicConfig, "id", nil) + + // Perform comparison + result, err := sc.Compare() + if err != nil { + t.Fatalf("Compare() error = %v", err) + } + + // Verify results + if result.RecordsProcessed != 6 { + t.Errorf("RecordsProcessed got = %v, want %v", result.RecordsProcessed, 6) + } + if result.Source1Records != 3 { + t.Errorf("Source1Records got = %v, want %v", result.Source1Records, 3) + } + if result.Source2Records != 3 { + t.Errorf("Source2Records got = %v, want %v", result.Source2Records, 3) + } + if result.MatchingKeys != 2 { + t.Errorf("MatchingKeys got = %v, want %v", result.MatchingKeys, 2) + } + if result.IdenticalRows != 1 { + t.Errorf("IdenticalRows got = %v, want %v", result.IdenticalRows, 1) + } + + // Check keys only in source1 + if len(result.KeysOnlyInSource1) != 1 || result.KeysOnlyInSource1[0] != "3" { + t.Errorf("KeysOnlyInSource1 got = %v, want %v", result.KeysOnlyInSource1, []string{"3"}) + } + + // Check keys only in source2 + if len(result.KeysOnlyInSource2) != 1 || result.KeysOnlyInSource2[0] != "4" { + t.Errorf("KeysOnlyInSource2 got = %v, want %v", result.KeysOnlyInSource2, []string{"4"}) + } + + // Check value diffs + if len(result.ValueDiffs) != 1 { + t.Errorf("ValueDiffs length got = %v, want %v", len(result.ValueDiffs), 1) + } + + if diffs, exists := result.ValueDiffs["1"]; exists { + if len(diffs) != 1 { + t.Errorf("ValueDiffs for key '1' length got = %v, want %v", len(diffs), 1) + } else if diffs[0].Field != "age" || diffs[0].Source1Value != "30" || diffs[0].Source2Value != "31" { + t.Errorf("ValueDiffs for key '1' got = %v, want age diff 30->31", diffs[0]) + } + } else { + t.Error("Expected value diff for key '1' not found") + } +} + +func TestStreamComparator_PeriodicReporting(t *testing.T) { + // Create test data + source1Records := []datareader.Record{ + {"id": "1", "name": "Alice"}, + {"id": "2", "name": "Bob"}, + } + + source2Records := []datareader.Record{ + {"id": "1", "name": "Alice"}, + } + + // Create mock readers + reader1 := NewMockDataReader(source1Records) + reader2 := NewMockDataReader(source2Records) + + // Track periodic reports + var periodicReports []ComparisonResult + periodicCallback := func(result ComparisonResult) error { + periodicReports = append(periodicReports, result) + return nil + } + + // Create comparator with periodic reporting every 1 record + periodicConfig := config.PeriodicConfig{ + Enabled: true, + RecordInterval: 1, + } + + sc := NewStreamComparator(reader1, reader2, periodicConfig, "id", periodicCallback) + + // Perform comparison + _, err := sc.Compare() + if err != nil { + t.Fatalf("Compare() error = %v", err) + } + + // Should have received periodic reports + if len(periodicReports) == 0 { + t.Error("Expected periodic reports but got none") + } + + // All periodic reports should be marked as such + for i, report := range periodicReports { + if !report.IsPeriodicReport { + t.Errorf("Periodic report %d not marked as periodic", i) + } + } +} + +func TestStreamComparator_TimeBasedPeriodic(t *testing.T) { + // This test is more challenging to write without actual time delays + // For now, we'll just test the configuration + periodicConfig := config.PeriodicConfig{ + Enabled: true, + TimeInterval: 1, // 1 second + } + + reader1 := NewMockDataReader([]datareader.Record{}) + reader2 := NewMockDataReader([]datareader.Record{}) + + sc := NewStreamComparator(reader1, reader2, periodicConfig, "id", nil) + + // Test the shouldReportPeriodic method + startTime := time.Now() + lastReportTime := startTime.Add(-2 * time.Second) // 2 seconds ago + + should := sc.shouldReportPeriodic(startTime, lastReportTime, 10, 5) + if !should { + t.Error("Should report periodic when time interval exceeded") + } +} \ No newline at end of file diff --git a/internal/pkg/comparator/slow_test.go b/internal/pkg/comparator/slow_test.go new file mode 100644 index 0000000..e06689e --- /dev/null +++ b/internal/pkg/comparator/slow_test.go @@ -0,0 +1,102 @@ +package comparator + +import ( + "data-comparator/internal/pkg/config" + "data-comparator/internal/pkg/datareader" + "fmt" + "io" + "time" +) + +// SlowMockDataReader simulates slow data reading for testing time-based triggers +type SlowMockDataReader struct { + records []datareader.Record + index int + delay time.Duration +} + +func NewSlowMockDataReader(records []datareader.Record, delay time.Duration) *SlowMockDataReader { + return &SlowMockDataReader{ + records: records, + index: 0, + delay: delay, + } +} + +func (m *SlowMockDataReader) Read() (datareader.Record, error) { + if m.index >= len(m.records) { + return nil, io.EOF + } + + // Add delay to simulate slow reading + if m.delay > 0 { + time.Sleep(m.delay) + } + + record := m.records[m.index] + m.index++ + return record, nil +} + +func (m *SlowMockDataReader) Close() error { + return nil +} + +// CreateSlowReaderTest creates a test that demonstrates time-based periodic reporting +func CreateSlowReaderTest() { + fmt.Println("Testing time-based periodic reporting...") + + // Create test data with enough records to span time intervals + source1Records := []datareader.Record{ + {"id": "1", "name": "Alice", "age": "30"}, + {"id": "2", "name": "Bob", "age": "25"}, + {"id": "3", "name": "Charlie", "age": "35"}, + {"id": "4", "name": "David", "age": "40"}, + {"id": "5", "name": "Eve", "age": "28"}, + } + + source2Records := []datareader.Record{ + {"id": "1", "name": "Alice", "age": "31"}, + {"id": "2", "name": "Bob", "age": "25"}, + {"id": "3", "name": "Charlie", "age": "36"}, + {"id": "6", "name": "Frank", "age": "45"}, + {"id": "7", "name": "Grace", "age": "32"}, + } + + // Create slow readers with 2-second delay per record + reader1 := NewSlowMockDataReader(source1Records, 2*time.Second) + reader2 := NewSlowMockDataReader(source2Records, 2*time.Second) + + // Configure for time-based periodic reporting every 3 seconds + periodicConfig := config.PeriodicConfig{ + Enabled: true, + TimeInterval: 3, // 3 seconds + RecordInterval: 0, // Disable record-based trigger + } + + // Track periodic reports + periodicCallback := func(result ComparisonResult) error { + fmt.Printf("[PERIODIC TIME-BASED] %s - Records: %d, Matching: %d\n", + result.Timestamp.Format("15:04:05"), + result.RecordsProcessed, + result.MatchingKeys) + return nil + } + + sc := NewStreamComparator(reader1, reader2, periodicConfig, "id", periodicCallback) + + // Perform comparison + fmt.Println("Starting slow comparison to demonstrate time-based triggers...") + start := time.Now() + + result, err := sc.Compare() + if err != nil { + fmt.Printf("Error: %v\n", err) + return + } + + duration := time.Since(start) + fmt.Printf("Completed in %v\n", duration) + fmt.Printf("Final: Records: %d, Matching: %d, Identical: %d\n", + result.RecordsProcessed, result.MatchingKeys, result.IdenticalRows) +} \ No newline at end of file diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index 9317fa5..98e3765 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -12,6 +12,27 @@ type Config struct { Source Source `yaml:"source"` } +// RunConfig defines the configuration for running stream comparisons with periodic reporting. +type RunConfig struct { + Source1 Source `yaml:"source1"` + Source2 Source `yaml:"source2"` + Output OutputConfig `yaml:"output,omitempty"` + Periodic PeriodicConfig `yaml:"periodic,omitempty"` +} + +// OutputConfig defines output settings for comparison results. +type OutputConfig struct { + FinalReport string `yaml:"final_report,omitempty"` + PeriodicReports string `yaml:"periodic_reports,omitempty"` // Directory for periodic reports +} + +// PeriodicConfig defines settings for periodic diff reporting. +type PeriodicConfig struct { + Enabled bool `yaml:"enabled"` + TimeInterval int `yaml:"time_interval_seconds,omitempty"` // Time interval in seconds + RecordInterval int `yaml:"record_interval,omitempty"` // Record count interval +} + // Source defines the data source configuration. type Source struct { Type string `yaml:"type"` @@ -45,3 +66,25 @@ func Load(filePath string) (*Config, error) { return &cfg, nil } + +// LoadRunConfig reads a YAML run configuration file from the given path and returns a RunConfig struct. +func LoadRunConfig(filePath string) (*RunConfig, error) { + data, err := os.ReadFile(filePath) + if err != nil { + return nil, fmt.Errorf("failed to read run config file %s: %w", filePath, err) + } + + var cfg RunConfig + err = yaml.Unmarshal(data, &cfg) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal yaml from %s: %w", filePath, err) + } + + // Set defaults for periodic configuration + if cfg.Periodic.TimeInterval == 0 && cfg.Periodic.RecordInterval == 0 { + cfg.Periodic.TimeInterval = 30 // Default to 30 seconds + cfg.Periodic.RecordInterval = 1000 // Default to 1000 records + } + + return &cfg, nil +} diff --git a/internal/pkg/config/run_config_test.go b/internal/pkg/config/run_config_test.go new file mode 100644 index 0000000..4623e43 --- /dev/null +++ b/internal/pkg/config/run_config_test.go @@ -0,0 +1,113 @@ +package config + +import ( + "os" + "testing" +) + +func TestLoadRunConfig(t *testing.T) { + // Create a test config file content + yamlContent := ` +source1: + type: csv + path: testdata/source1.csv +source2: + type: csv + path: testdata/source2.csv +output: + final_report: final_report.yaml + periodic_reports: periodic_reports +periodic: + enabled: true + time_interval_seconds: 30 + record_interval: 1000 +` + // Create temporary file + tmpFile := "/tmp/test_run_config.yaml" + err := writeStringToFile(tmpFile, yamlContent) + if err != nil { + t.Fatalf("Failed to create test file: %v", err) + } + + // Test loading + runConfig, err := LoadRunConfig(tmpFile) + if err != nil { + t.Fatalf("LoadRunConfig() error = %v", err) + } + + if runConfig == nil { + t.Fatal("LoadRunConfig() returned nil config") + } + + // Verify source1 + if runConfig.Source1.Type != "csv" { + t.Errorf("Source1.Type got = %v, want %v", runConfig.Source1.Type, "csv") + } + if runConfig.Source1.Path != "testdata/source1.csv" { + t.Errorf("Source1.Path got = %v, want %v", runConfig.Source1.Path, "testdata/source1.csv") + } + + // Verify source2 + if runConfig.Source2.Type != "csv" { + t.Errorf("Source2.Type got = %v, want %v", runConfig.Source2.Type, "csv") + } + if runConfig.Source2.Path != "testdata/source2.csv" { + t.Errorf("Source2.Path got = %v, want %v", runConfig.Source2.Path, "testdata/source2.csv") + } + + // Verify output config + if runConfig.Output.FinalReport != "final_report.yaml" { + t.Errorf("Output.FinalReport got = %v, want %v", runConfig.Output.FinalReport, "final_report.yaml") + } + if runConfig.Output.PeriodicReports != "periodic_reports" { + t.Errorf("Output.PeriodicReports got = %v, want %v", runConfig.Output.PeriodicReports, "periodic_reports") + } + + // Verify periodic config + if !runConfig.Periodic.Enabled { + t.Errorf("Periodic.Enabled got = %v, want %v", runConfig.Periodic.Enabled, true) + } + if runConfig.Periodic.TimeInterval != 30 { + t.Errorf("Periodic.TimeInterval got = %v, want %v", runConfig.Periodic.TimeInterval, 30) + } + if runConfig.Periodic.RecordInterval != 1000 { + t.Errorf("Periodic.RecordInterval got = %v, want %v", runConfig.Periodic.RecordInterval, 1000) + } +} + +func TestLoadRunConfigWithDefaults(t *testing.T) { + // Create a minimal test config file content (no periodic config) + yamlContent := ` +source1: + type: csv + path: testdata/source1.csv +source2: + type: json + path: testdata/source2.json +` + // Create temporary file + tmpFile := "/tmp/test_run_config_defaults.yaml" + err := writeStringToFile(tmpFile, yamlContent) + if err != nil { + t.Fatalf("Failed to create test file: %v", err) + } + + // Test loading + runConfig, err := LoadRunConfig(tmpFile) + if err != nil { + t.Fatalf("LoadRunConfig() error = %v", err) + } + + // Verify defaults are applied + if runConfig.Periodic.TimeInterval != 30 { + t.Errorf("Default Periodic.TimeInterval got = %v, want %v", runConfig.Periodic.TimeInterval, 30) + } + if runConfig.Periodic.RecordInterval != 1000 { + t.Errorf("Default Periodic.RecordInterval got = %v, want %v", runConfig.Periodic.RecordInterval, 1000) + } +} + +// Helper function to write string to file +func writeStringToFile(filename, content string) error { + return os.WriteFile(filename, []byte(content), 0644) +} \ No newline at end of file diff --git a/runConfig.yaml b/runConfig.yaml new file mode 100644 index 0000000..def523b --- /dev/null +++ b/runConfig.yaml @@ -0,0 +1,16 @@ +source1: + type: csv + path: testdata/testcase1_simple_csv/source1.csv + +source2: + type: csv + path: testdata/testcase1_simple_csv/source2.csv + +output: + final_report: final_report.yaml + periodic_reports: periodic_reports + +periodic: + enabled: true + time_interval_seconds: 10 + record_interval: 3 \ No newline at end of file diff --git a/stream-diff b/stream-diff new file mode 100755 index 0000000..1c764ba Binary files /dev/null and b/stream-diff differ diff --git a/time_test_config.yaml b/time_test_config.yaml new file mode 100644 index 0000000..80e6956 --- /dev/null +++ b/time_test_config.yaml @@ -0,0 +1,16 @@ +source1: + type: csv + path: testdata/testcase1_simple_csv/source1.csv + +source2: + type: csv + path: testdata/testcase1_simple_csv/source2.csv + +output: + final_report: time_based_final_report.yaml + periodic_reports: time_based_periodic_reports + +periodic: + enabled: true + time_interval_seconds: 3 + record_interval: 0 \ No newline at end of file