diff --git a/README.md b/README.md index 359168e..68bd72d 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ This is particularly useful for data validation, migration testing, and ensuring ## Features -- **Multiple Data Sources:** Supports reading from different sources, starting with CSV and JSON-Lines (`.jsonl`). +- **Multiple Data Sources:** Supports reading from different sources, including CSV, JSON-Lines (`.jsonl`), and Protocol Buffers (Protobuf). - **Automatic Schema Detection:** - Infers the schema from a sample of the data. - Flattens nested JSON objects and arrays into a dot-notation format (e.g., `customer.address.city`). @@ -18,6 +18,11 @@ This is particularly useful for data validation, migration testing, and ensuring - **Advanced String Parsing:** - Can detect and recursively parse JSON strings embedded within other file formats (e.g., a CSV field containing a JSON object). - Identifies field patterns using a library of built-in regex matchers and supports custom matchers. +- **Protobuf Support:** + - Reads JSON-serialized protobuf messages (most common format for streaming data). + - Supports both `protobuf` and `proto` as source types for convenience. + - Handles nested protobuf messages with automatic field flattening. + - Compatible with protobuf messages exported to JSON format from various systems. - **Intelligent Date/Time Handling:** - Parses and compares `date`, `datetime`, and `timestamp` fields, even if their string formats differ between sources. - Supports timestamps with variable precision. @@ -35,7 +40,7 @@ The tool is configured using two YAML files, one for each data source. **Example `config.yaml`:** ```yaml source: - # Type of the data source. Supported: csv, json + # Type of the data source. Supported: csv, json, protobuf (or proto) type: csv # Path to the source file. path: path/to/your/data.csv @@ -50,6 +55,17 @@ source: # ... 
``` +**Example Protobuf config:** +```yaml +source: + type: protobuf # or "proto" for short + path: path/to/your/data.jsonpb + parser_config: + json_in_string: false # Usually not needed for protobuf JSON + sampler: + sample_size: 1000 # Number of records to sample for schema detection +``` + ## Usage To run a comparison, use the `compare` command and provide the paths to the two configuration files. diff --git a/go.mod b/go.mod index 1dbba1e..3dbc862 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,6 @@ module data-comparator go 1.25 -require gopkg.in/yaml.v3 v3.0.1 // indirect +require gopkg.in/yaml.v3 v3.0.1 + +require google.golang.org/protobuf v1.36.9 // indirect diff --git a/go.sum b/go.sum index 4bc0337..5539806 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,6 @@ +google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw= +google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/pkg/datareader/datareader.go b/internal/pkg/datareader/datareader.go index 5910bcc..bf406ab 100644 --- a/internal/pkg/datareader/datareader.go +++ b/internal/pkg/datareader/datareader.go @@ -24,6 +24,8 @@ func New(cfg config.Source) (DataReader, error) { return NewCSVReader(cfg) case "json": return NewJSONReader(cfg) + case "protobuf", "proto": + return NewProtobufReader(cfg) default: return nil, fmt.Errorf("unsupported source type: %s", cfg.Type) } diff --git a/internal/pkg/datareader/datareader_test.go b/internal/pkg/datareader/datareader_test.go index aa5df22..01626b9 100644 --- a/internal/pkg/datareader/datareader_test.go +++ 
b/internal/pkg/datareader/datareader_test.go
@@ -134,3 +134,118 @@ func TestReader_EOF(t *testing.T) {
 		t.Errorf("Expected io.EOF, got %v", err)
 	}
 }
+
+// TestProtobufReader_JSONFormat reads the first record of the JSON-serialized
+// protobuf fixture and checks a scalar field, a nested field and an array field.
+func TestProtobufReader_JSONFormat(t *testing.T) {
+	cfg := config.Source{
+		Type: "protobuf",
+		Path: "../../../testdata/testcase4_protobuf/source1.jsonpb",
+	}
+	reader, err := New(cfg)
+	if err != nil {
+		t.Fatalf("New() error = %v", err)
+	}
+	defer reader.Close()
+
+	// Read first record
+	rec, err := reader.Read()
+	if err != nil {
+		t.Fatalf("Read() error = %v", err)
+	}
+
+	// Check basic field
+	if userID, ok := rec["user_id"].(string); ok {
+		if userID != "user-001" {
+			t.Errorf("user_id read incorrectly, got %s, want %s", userID, "user-001")
+		}
+	} else {
+		t.Error("Field 'user_id' is not a string")
+	}
+
+	// Check nested field
+	if profile, ok := rec["profile"].(map[string]interface{}); ok {
+		if email, ok := profile["email"].(string); ok {
+			if email != "alice@example.com" {
+				t.Errorf("Nested field email read incorrectly, got %s, want %s", email, "alice@example.com")
+			}
+		} else {
+			t.Error("Nested field 'email' is not a string")
+		}
+	} else {
+		t.Error("Field 'profile' is not a map")
+	}
+
+	// Check array field
+	if activity, ok := rec["activity"].(map[string]interface{}); ok {
+		if devices, ok := activity["devices"].([]interface{}); ok {
+			// Stop on a length mismatch: indexing devices[0] below would
+			// panic on an empty slice instead of reporting a test failure.
+			if len(devices) != 2 {
+				t.Fatalf("devices array length incorrect, got %d, want %d", len(devices), 2)
+			}
+			// Comma-ok assertion so a non-string element fails the comparison
+			// rather than panicking.
+			if first, _ := devices[0].(string); first != "mobile" {
+				t.Errorf("First device incorrect, got %v, want %s", devices[0], "mobile")
+			}
+		} else {
+			t.Error("Field 'devices' is not an array")
+		}
+	} else {
+		t.Error("Field 'activity' is not a map")
+	}
+}
+
+// TestProtobufReader_MultipleTypes checks that both accepted source type names
+// produce a reader that can read a record.
+func TestProtobufReader_MultipleTypes(t *testing.T) {
+	// Test both "protobuf" and "proto" as valid type names
+	for _, sourceType := range []string{"protobuf", "proto"} {
+		t.Run(sourceType, func(t *testing.T) {
+			cfg := config.Source{
+				Type: sourceType,
+				Path: "../../../testdata/testcase4_protobuf/source1.jsonpb",
+			}
+			reader, err := New(cfg)
+			if err != nil {
+				t.Fatalf("New() error for type %s = %v", sourceType, err)
+			}
+			defer reader.Close()
+
+			// Just verify we can read one record without error
+			_, err = reader.Read()
+			if err != nil {
+				t.Fatalf("Read() error for type %s = %v", sourceType, err)
+			}
+		})
+	}
+}
+
+// TestProtobufReader_EOF checks that Read returns io.EOF once every record in
+// the fixture has been consumed.
+func TestProtobufReader_EOF(t *testing.T) {
+	cfg := config.Source{
+		Type: "protobuf",
+		Path: "../../../testdata/testcase4_protobuf/source1.jsonpb",
+	}
+	reader, err := New(cfg)
+	if err != nil {
+		t.Fatalf("New() error = %v", err)
+	}
+	defer reader.Close()
+
+	// Read all 5 records
+	for i := 0; i < 5; i++ {
+		_, err := reader.Read()
+		if err != nil {
+			t.Fatalf("Read() error on record %d: %v", i+1, err)
+		}
+	}
+
+	// The next read should return io.EOF
+	_, err = reader.Read()
+	if err != io.EOF {
+		t.Errorf("Expected io.EOF, got %v", err)
+	}
+}
diff --git a/internal/pkg/datareader/protobuf_reader.go b/internal/pkg/datareader/protobuf_reader.go
new file mode 100644
index 0000000..0ccd2e2
--- /dev/null
+++ b/internal/pkg/datareader/protobuf_reader.go
@@ -0,0 +1,220 @@
+package datareader
+
+import (
+	"bufio"
+	"data-comparator/internal/pkg/config"
+	"encoding/json"
+	"fmt"
+	"io"
+	"os"
+)
+
+// ProtobufReader reads records from protobuf files.
+//
+// The format field selects one of three read paths, which are not equally
+// complete:
+//   - "json": one JSON-serialized protobuf message per line, decoded into a Record
+//   - "text": each non-empty line is returned undecoded under the "raw_text" key
+//   - "binary": not implemented; Read returns an error
+//
+// NewProtobufReader currently always selects "json".
+type ProtobufReader struct {
+	// file is the open input; Close closes it.
+	file *os.File
+
+	// scanner reads file line by line for the "json" and "text" read paths.
+	scanner *bufio.Scanner
+
+	// format is one of "binary", "json", "text".
+	format string
+
+	// parserConfig is a copy of the source's parser settings, or the zero
+	// value when the source has none.
+	parserConfig config.ParserConfig
+}
+
+// ProtobufParserConfig extends ParserConfig with protobuf-specific options.
+//
+// NOTE(review): nothing in this file reads this type; NewProtobufReader takes
+// its settings from config.Source only. Confirm whether it is wired up
+// elsewhere before relying on any of the fields below.
+type ProtobufParserConfig struct {
+	config.ParserConfig
+	// Format specifies the protobuf format: "binary", "json", or "text".
+	// The intended default is "json", which is most common for streaming data.
+	Format string `yaml:"format"`
+	// DescriptorPath is the path to the protobuf descriptor file (.desc).
+	// Intended to be required for binary format, optional for others.
+	DescriptorPath string `yaml:"descriptor_path"`
+	// MessageType is the name of the protobuf message type.
+	// Intended to be required when using a descriptor file.
+	MessageType string `yaml:"message_type"`
+	// MessageSeparator describes how binary messages are delimited:
+	// "length-prefixed" (the intended default), "newline", or "fixed-size".
+	MessageSeparator string `yaml:"message_separator"`
+}
+
+// NewProtobufReader creates a new reader for protobuf files.
+//
+// It opens cfg.Path and returns a reader that treats the file as
+// newline-delimited JSON, one serialized message per line. When
+// cfg.ParserConfig is set, a copy of it is kept on the reader. The caller owns
+// the returned reader and must call Close to release the file.
+//
+// An error is returned, wrapping the underlying cause, if the file cannot be
+// opened.
+func NewProtobufReader(cfg config.Source) (DataReader, error) {
+	// Upper bound on one serialized message. bufio.Scanner's default limit
+	// (bufio.MaxScanTokenSize, 64 KiB) makes Scan stop with bufio.ErrTooLong
+	// on longer lines, which large nested messages can exceed.
+	const maxLineSize = 16 * 1024 * 1024
+
+	file, err := os.Open(cfg.Path)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open protobuf file %s: %w", cfg.Path, err)
+	}
+
+	// JSON is the only format this constructor selects; nothing in cfg is
+	// consulted to choose a different one.
+	reader := &ProtobufReader{
+		file:   file,
+		format: "json",
+	}
+	if cfg.ParserConfig != nil {
+		reader.parserConfig = *cfg.ParserConfig
+	}
+
+	// The buffer starts at the default size and only grows, up to
+	// maxLineSize, when a longer line is encountered.
+	reader.scanner = bufio.NewScanner(file)
+	reader.scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineSize)
+
+	return reader, nil
+}
+
+// Read reads the next record from the protobuf file.
+//
+// It dispatches on the format fixed at construction time. The "json" and
+// "text" paths return io.EOF once the input is exhausted; "binary" and any
+// unknown format return an error.
+func (r *ProtobufReader) Read() (Record, error) {
+	switch r.format {
+	case "json":
+		return r.readJSONFormat()
+	case "text":
+		return r.readTextFormat()
+	case "binary":
+		return r.readBinaryFormat()
+	default:
+		return nil, fmt.Errorf("unsupported protobuf format: %s", r.format)
+	}
+}
+
+// nextLine returns the next non-empty line from the scanner. It returns the
+// scanner's error if scanning failed and io.EOF when no lines remain.
+//
+// Empty lines are skipped in a loop rather than by calling Read again, so a
+// long run of blank lines does not deepen the call stack.
+func (r *ProtobufReader) nextLine() (string, error) {
+	for r.scanner.Scan() {
+		if line := r.scanner.Text(); line != "" {
+			return line, nil
+		}
+	}
+	if err := r.scanner.Err(); err != nil {
+		return "", err
+	}
+	return "", io.EOF
+}
+
+// readJSONFormat reads JSON-serialized protobuf messages (most common for streaming).
+//
+// Each non-empty line is decoded as one JSON object. When the parser config
+// enables JSONInString, string values are additionally re-parsed as JSON. A
+// line that is not valid JSON yields a wrapped decode error.
+func (r *ProtobufReader) readJSONFormat() (Record, error) {
+	line, err := r.nextLine()
+	if err != nil {
+		return nil, err
+	}
+
+	var record Record
+	if err := json.Unmarshal([]byte(line), &record); err != nil {
+		return nil, fmt.Errorf("failed to parse JSON protobuf message: %w", err)
+	}
+
+	// Apply recursive JSON parsing if enabled
+	if r.parserConfig.JSONInString {
+		record = r.processJSONInString(record)
+	}
+
+	return record, nil
+}
+
+// readTextFormat reads text format protobuf messages.
+//
+// The line is not parsed: it is returned as a single-field record under the
+// "raw_text" key.
+func (r *ProtobufReader) readTextFormat() (Record, error) {
+	line, err := r.nextLine()
+	if err != nil {
+		return nil, err
+	}
+
+	// TODO: Implement proper text format parsing when needed
+	return Record{"raw_text": line}, nil
+}
+
+// readBinaryFormat reads binary protobuf messages.
+//
+// Decoding binary messages needs a message descriptor, which this reader does
+// not load, so the method always returns an error.
+func (r *ProtobufReader) readBinaryFormat() (Record, error) {
+	return nil, fmt.Errorf("binary protobuf format not yet implemented")
+}
+
+// processJSONInString applies recursive JSON parsing to string fields.
+// It returns a new Record and leaves data itself unmodified.
+func (r *ProtobufReader) processJSONInString(data Record) Record {
+	result := make(Record, len(data))
+	for k, v := range data {
+		result[k] = r.processValue(v)
+	}
+	return result
+}
+
+// processArray applies recursive JSON parsing to array elements.
+// It returns a new slice of the same length.
+func (r *ProtobufReader) processArray(arr []interface{}) []interface{} {
+	result := make([]interface{}, len(arr))
+	for i, v := range arr {
+		result[i] = r.processValue(v)
+	}
+	return result
+}
+
+// processValue applies JSON-in-string parsing to a single decoded value:
+// strings are re-parsed, objects and arrays are walked, and every other value
+// is returned unchanged.
+func (r *ProtobufReader) processValue(v interface{}) interface{} {
+	switch val := v.(type) {
+	case string:
+		return r.tryParseJSON(val)
+	case map[string]interface{}:
+		// Convert back to a plain map so nested objects keep the dynamic type
+		// that json.Unmarshal and tryParseJSON produce; consumers asserting
+		// map[string]interface{} would not match a value stored as Record.
+		return map[string]interface{}(r.processJSONInString(Record(val)))
+	case []interface{}:
+		return r.processArray(val)
+	default:
+		return val
+	}
+}
+
+// tryParseJSON attempts to recursively unmarshal a string as JSON.
+// This is similar to the CSV reader's implementation.
+//
+// The input is returned unchanged when it is empty or not valid JSON. Any
+// valid JSON value is accepted, so a string such as "123" or "true" comes back
+// as a number or a bool. A decoded string is parsed again; for a decoded
+// object only its direct string values are re-parsed, while arrays and deeper
+// levels are returned as decoded.
+func (r *ProtobufReader) tryParseJSON(s string) interface{} {
+	if s == "" {
+		return s
+	}
+
+	var decoded interface{}
+	if err := json.Unmarshal([]byte(s), &decoded); err != nil {
+		return s
+	}
+
+	switch val := decoded.(type) {
+	case string:
+		return r.tryParseJSON(val)
+	case map[string]interface{}:
+		for k, v := range val {
+			if inner, ok := v.(string); ok {
+				val[k] = r.tryParseJSON(inner)
+			}
+		}
+		return val
+	default:
+		return decoded
+	}
+}
+
+// Close closes the underlying file.
+func (r *ProtobufReader) Close() error {
+	return r.file.Close()
+}
diff --git a/internal/pkg/schema/schema_test.go b/internal/pkg/schema/schema_test.go
index 203237c..11bfa92 100644
--- a/internal/pkg/schema/schema_test.go
+++ b/internal/pkg/schema/schema_test.go
@@ -75,3 +75,79 @@ func TestCollectFieldValues(t *testing.T) {
 		t.Errorf("Expected %d fields, got %d. 
Keys: %v", len(expectedKeys), len(fieldValues), reflect.ValueOf(fieldValues).MapKeys())
 	}
 }
+
+// TestGenerate_Protobuf builds a schema from the JSON-serialized protobuf
+// fixture and checks the flattened field names plus a few inferred types.
+func TestGenerate_Protobuf(t *testing.T) {
+	source := config.Source{
+		Type:    "protobuf",
+		Path:    "../../../testdata/testcase4_protobuf/source1.jsonpb",
+		Sampler: &config.Sampler{SampleSize: 10},
+	}
+
+	rd, err := datareader.New(source)
+	if err != nil {
+		t.Fatalf("Failed to create protobuf reader: %v", err)
+	}
+	defer rd.Close()
+
+	got, err := Generate(rd, source.Sampler)
+	if err != nil {
+		t.Fatalf("Generate() error = %v", err)
+	}
+	if got == nil {
+		t.Fatal("Schema is nil")
+	}
+
+	// Every flattened path expected from the fixture, parents included.
+	for _, name := range []string{
+		"user_id",
+		"profile",
+		"profile.email",
+		"profile.age",
+		"profile.preferences",
+		"profile.preferences.theme",
+		"profile.preferences.language",
+		"activity",
+		"activity.last_login",
+		"activity.login_count",
+		"activity.devices",
+		"activity.devices[]",
+		"subscription",
+		"subscription.plan_type",
+		"subscription.status",
+		"subscription.expires_at",
+	} {
+		if _, found := got.Fields[name]; !found {
+			t.Errorf("Expected field '%s' not found in protobuf schema", name)
+		}
+	}
+
+	// Spot-check inferred types. A missing field is already reported by the
+	// loop above, so these only fire when the field exists with another type.
+	if f, found := got.Fields["user_id"]; found && f.Type != "string" {
+		t.Errorf("user_id type: got %s, want string", f.Type)
+	}
+	if f, found := got.Fields["profile.age"]; found && f.Type != "numeric" {
+		t.Errorf("profile.age type: got %s, want numeric", f.Type)
+	}
+	if f, found := got.Fields["activity.last_login"]; found && f.Type != "datetime" {
+		t.Errorf("activity.last_login type: got %s, want datetime", f.Type)
+	}
+	if f, found := got.Fields["activity.devices"]; found && f.Type != "array" {
+		t.Errorf("activity.devices type: got %s, want array", f.Type)
+	}
+}
diff --git 
a/testdata/testcase4_protobuf/config1.yaml b/testdata/testcase4_protobuf/config1.yaml new file mode 100644 index 0000000..c7d01b3 --- /dev/null +++ b/testdata/testcase4_protobuf/config1.yaml @@ -0,0 +1,7 @@ +source: + type: protobuf + path: testdata/testcase4_protobuf/source1.jsonpb + parser_config: + json_in_string: false + sampler: + sample_size: 100 \ No newline at end of file diff --git a/testdata/testcase4_protobuf/config2.yaml b/testdata/testcase4_protobuf/config2.yaml new file mode 100644 index 0000000..023440b --- /dev/null +++ b/testdata/testcase4_protobuf/config2.yaml @@ -0,0 +1,7 @@ +source: + type: protobuf + path: testdata/testcase4_protobuf/source2.jsonpb + parser_config: + json_in_string: false + sampler: + sample_size: 100 \ No newline at end of file diff --git a/testdata/testcase4_protobuf/source1.jsonpb b/testdata/testcase4_protobuf/source1.jsonpb new file mode 100644 index 0000000..9c799fa --- /dev/null +++ b/testdata/testcase4_protobuf/source1.jsonpb @@ -0,0 +1,5 @@ +{"user_id": "user-001", "profile": {"email": "alice@example.com", "age": 30, "preferences": {"theme": "dark", "language": "en"}}, "activity": {"last_login": "2025-09-15T10:00:00Z", "login_count": 150, "devices": ["mobile", "desktop"]}, "subscription": {"plan_type": "premium", "status": "active", "expires_at": "2026-09-15T00:00:00Z"}} +{"user_id": "user-002", "profile": {"email": "bob@example.com", "age": 25, "preferences": {"theme": "light", "language": "es"}}, "activity": {"last_login": "2025-09-14T15:30:00Z", "login_count": 75, "devices": ["desktop"]}, "subscription": {"plan_type": "basic", "status": "active", "expires_at": "2025-12-01T00:00:00Z"}} +{"user_id": "user-003", "profile": {"email": "charlie@example.com", "age": 35, "preferences": {"theme": "dark", "language": "fr"}}, "activity": {"last_login": "2025-09-13T08:15:00Z", "login_count": 200, "devices": ["mobile", "tablet", "desktop"]}, "subscription": {"plan_type": "premium", "status": "active", "expires_at": 
"2026-06-01T00:00:00Z"}} +{"user_id": "user-004", "profile": {"email": "diana@example.com", "age": 28, "preferences": {"theme": "light", "language": "de"}}, "activity": {"last_login": "2025-09-12T12:45:00Z", "login_count": 50, "devices": ["mobile"]}, "subscription": {"plan_type": "basic", "status": "cancelled", "expires_at": "2025-10-01T00:00:00Z"}} +{"user_id": "user-005", "profile": {"email": "eve@example.com", "age": 32, "preferences": {"theme": "dark", "language": "en"}}, "activity": {"last_login": "2025-09-16T09:20:00Z", "login_count": 300, "devices": ["desktop", "mobile"]}, "subscription": {"plan_type": "premium", "status": "active", "expires_at": "2027-01-01T00:00:00Z"}} \ No newline at end of file diff --git a/testdata/testcase4_protobuf/source2.jsonpb b/testdata/testcase4_protobuf/source2.jsonpb new file mode 100644 index 0000000..ef99194 --- /dev/null +++ b/testdata/testcase4_protobuf/source2.jsonpb @@ -0,0 +1,5 @@ +{"user_id": "user-001", "profile": {"email": "alice@example.com", "age": 31, "preferences": {"theme": "light", "language": "en"}}, "activity": {"last_login": "2025-09-16T11:00:00Z", "login_count": 155, "devices": ["mobile", "desktop"]}, "subscription": {"plan_type": "premium", "status": "active", "expires_at": "2026-09-15T00:00:00Z"}} +{"user_id": "user-002", "profile": {"email": "bob@example.com", "age": 25, "preferences": {"theme": "light", "language": "es"}}, "activity": {"last_login": "2025-09-14T15:30:00Z", "login_count": 75, "devices": ["desktop"]}, "subscription": {"plan_type": "basic", "status": "active", "expires_at": "2025-12-01T00:00:00Z"}} +{"user_id": "user-006", "profile": {"email": "frank@example.com", "age": 29, "preferences": {"theme": "auto", "language": "it"}}, "activity": {"last_login": "2025-09-15T14:10:00Z", "login_count": 120, "devices": ["tablet"]}, "subscription": {"plan_type": "premium", "status": "active", "expires_at": "2026-03-01T00:00:00Z"}} +{"user_id": "user-004", "profile": {"email": "diana@example.com", "age": 
28, "preferences": {"theme": "light", "language": "de"}}, "activity": {"last_login": "2025-09-12T12:45:00Z", "login_count": 50, "devices": ["mobile"]}, "subscription": {"plan_type": "basic", "status": "cancelled", "expires_at": "2025-10-01T00:00:00Z"}} +{"user_id": "user-007", "profile": {"email": "grace@example.com", "age": 26, "preferences": {"theme": "dark", "language": "ja"}}, "activity": {"last_login": "2025-09-17T07:30:00Z", "login_count": 80, "devices": ["mobile", "desktop"]}, "subscription": {"plan_type": "basic", "status": "active", "expires_at": "2025-11-15T00:00:00Z"}} \ No newline at end of file