Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: AVRO-3986: [csharp] - Plain JSON encoding for Apache Avro #2888

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 2 additions & 17 deletions lang/csharp/src/apache/main/Generic/GenericReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ public object Read(object reuse, Schema writerSchema, Schema readerSchema, Decod
case Schema.Type.Union:
return ReadUnion(reuse, (UnionSchema)writerSchema, readerSchema, d);
case Schema.Type.Logical:
return ReadLogical(reuse, (LogicalSchema)writerSchema, readerSchema, d);
LogicalSchema writerLogicalSchema = writerSchema as LogicalSchema;
return Read<object>(writerSchema.Tag, readerSchema, ()=>d.ReadLogicalTypeValue(writerLogicalSchema));
default:
throw new AvroException("Unknown schema type: " + writerSchema);
}
Expand Down Expand Up @@ -552,22 +553,6 @@ protected virtual object ReadUnion(object reuse, UnionSchema writerSchema, Schem
return Read(reuse, ws, readerSchema, d);
}

/// <summary>
/// Deserializes a value that was written with a logical schema. The base value is read first, resolving
/// the writer's base schema against the reader's base schema, and is then converted to its logical
/// representation by the writer schema's logical type.
/// </summary>
/// <param name="reuse">If appropriate, uses this object instead of creating a new one.</param>
/// <param name="writerSchema">The LogicalSchema that the writer used.</param>
/// <param name="readerSchema">The schema the reader uses. It is cast to a LogicalSchema.</param>
/// <param name="d">The decoder to read the base value from.</param>
/// <returns>The deserialized object, converted to its logical value.</returns>
protected virtual object ReadLogical(object reuse, LogicalSchema writerSchema, Schema readerSchema, Decoder d)
{
    var readerLogical = (LogicalSchema)readerSchema;

    // Read the underlying (base) representation before applying the logical conversion.
    object baseValue = Read(reuse, writerSchema.BaseSchema, readerLogical.BaseSchema, d);

    return writerSchema.LogicalType.ConvertToLogicalValue(baseValue, readerLogical);
}

/// <summary>
/// Deserializes a fixed object and returns the object. The default implementation uses CreateFixed()
/// and GetFixedBuffer() and returns what CreateFixed() returned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private ReadItem ResolveReader(Schema writerSchema, Schema readerSchema)
case Schema.Type.Union:
return ResolveUnion((UnionSchema)writerSchema, readerSchema);
case Schema.Type.Logical:
return ResolveLogical((LogicalSchema)writerSchema, (LogicalSchema)readerSchema);
return Read(d=>d.ReadLogicalTypeValue((LogicalSchema)writerSchema));
default:
throw new AvroException("Unknown schema type: " + writerSchema);
}
Expand Down Expand Up @@ -400,12 +400,6 @@ private object ReadArray(object reuse, Decoder decoder, ArrayAccess arrayAccess,
return array;
}

/// <summary>
/// Builds a reader for a logical schema: the base value is read with the reader resolved for the
/// two base schemas and then converted by the reader schema's logical type.
/// </summary>
/// <param name="writerSchema">The logical schema the writer used.</param>
/// <param name="readerSchema">The logical schema the reader uses.</param>
/// <returns>A delegate that reads the base value and returns its logical value.</returns>
private ReadItem ResolveLogical(LogicalSchema writerSchema, LogicalSchema readerSchema)
{
    ReadItem readBase = ResolveReader(writerSchema.BaseSchema, readerSchema.BaseSchema);

    return (reuse, decoder) =>
    {
        object baseValue = readBase(reuse, decoder);
        return readerSchema.LogicalType.ConvertToLogicalValue(baseValue, readerSchema);
    };
}

private ReadItem ResolveFixed(FixedSchema writerSchema, FixedSchema readerSchema)
{
if (readerSchema.Size != writerSchema.Size)
Expand Down
15 changes: 3 additions & 12 deletions lang/csharp/src/apache/main/Generic/PreresolvingDatumWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using Avro.Util;
using Encoder = Avro.IO.Encoder;

namespace Avro.Generic
Expand Down Expand Up @@ -100,7 +102,7 @@ private WriteItem ResolveWriter( Schema schema )
case Schema.Type.Union:
return ResolveUnion((UnionSchema)schema);
case Schema.Type.Logical:
return ResolveLogical((LogicalSchema)schema);
return (v,e) => Write<object>(v, schema.Tag, (w)=>e.WriteLogicalTypeValue(w, (LogicalSchema)schema));
default:
return (v, e) => Error(schema, v);
}
Expand Down Expand Up @@ -235,17 +237,6 @@ private void WriteArray(WriteItem itemWriter, object array, Encoder encoder)
encoder.WriteArrayEnd();
}

/// <summary>
/// Builds a writer for a logical schema. The returned writer converts the logical value to its
/// base value using the schema's logical type and hands it to the writer of the base schema.
/// </summary>
/// <param name="schema">The logical schema.</param>
/// <returns>A delegate that converts a logical value and writes its base value.</returns>
protected WriteItem ResolveLogical(LogicalSchema schema)
{
    WriteItem writeBase = ResolveWriter(schema.BaseSchema);

    return (value, encoder) =>
    {
        object baseValue = schema.LogicalType.ConvertToBaseValue(value, schema);
        writeBase(baseValue, encoder);
    };
}

private WriteItem ResolveMap(MapSchema mapSchema)
{
var itemWriter = ResolveWriter(mapSchema.ValueSchema);
Expand Down
23 changes: 23 additions & 0 deletions lang/csharp/src/apache/main/IO/BinaryDecoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,29 @@ public void ReadFixed(byte[] buffer, int start, int length)
Read(buffer, start, length);
}

/// <inheritdoc/>
/// <remarks>
/// Reads the base value according to the tag of the logical schema's base schema and passes it to
/// the logical type's <c>ConvertToLogicalValue</c>. A fixed base value is wrapped in a
/// <c>GenericFixed</c>, mirroring what <c>WriteLogicalTypeValue</c> accepts on the encoder side.
/// </remarks>
/// <exception cref="AvroException">The base schema type is not supported for logical values.</exception>
public object ReadLogicalTypeValue(LogicalSchema logicalSchema)
{
    Schema baseSchema = logicalSchema.BaseSchema;
    object baseValue;

    switch (baseSchema.Tag)
    {
        case Schema.Type.Int:
            baseValue = ReadInt();
            break;
        case Schema.Type.Long:
            baseValue = ReadLong();
            break;
        case Schema.Type.Float:
            // Kept symmetric with the encoder, which can write float base values.
            baseValue = ReadFloat();
            break;
        case Schema.Type.Double:
            // Kept symmetric with the encoder, which can write double base values.
            baseValue = ReadDouble();
            break;
        case Schema.Type.Bytes:
            baseValue = ReadBytes();
            break;
        case Schema.Type.String:
            baseValue = ReadString();
            break;
        case Schema.Type.Fixed:
            FixedSchema fixedSchema = (FixedSchema)baseSchema;
            byte[] fixedBytes = new byte[fixedSchema.Size];
            ReadFixed(fixedBytes);
            // Logical types receive fixed base values as GenericFixed (the encoder writes them
            // from GenericFixed as well), not as a raw byte array.
            baseValue = new Avro.Generic.GenericFixed(fixedSchema, fixedBytes);
            break;
        default:
            // Report the base type: logicalSchema.Tag is always Logical and carries no information.
            throw new AvroException($"Unsupported base type for logical type: {baseSchema.Tag}");
    }

    return logicalSchema.LogicalType.ConvertToLogicalValue(baseValue, logicalSchema);
}

/// <summary>
/// Skips over a null value.
/// </summary>
Expand Down
32 changes: 32 additions & 0 deletions lang/csharp/src/apache/main/IO/BinaryEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,38 @@ public void WriteFixed(byte[] data, int start, int len)
stream.Write(data, start, len);
}

/// <inheritdoc/>
/// <remarks>
/// Converts <paramref name="value"/> to its base value with the schema's logical type and writes it
/// with the primitive writer that matches the runtime type of the converted value.
/// </remarks>
/// <exception cref="AvroTypeException">
/// The converted base value is null or of a type that cannot be written.
/// </exception>
public void WriteLogicalTypeValue(object value, LogicalSchema schema)
{
    object baseValue = schema.LogicalType.ConvertToBaseValue(value, schema);
    switch (baseValue)
    {
        case null:
            // Without this case the default branch would dereference null in GetType().
            throw new AvroTypeException("Unsupported conversion from null");
        case int i:
            WriteInt(i);
            break;
        case long l:
            WriteLong(l);
            break;
        case float f:
            WriteFloat(f);
            break;
        case double d:
            WriteDouble(d);
            break;
        case byte[] bytes:
            WriteBytes(bytes);
            break;
        case string s:
            WriteString(s);
            break;
        case Avro.Generic.GenericFixed fixedValue:
            WriteFixed(fixedValue.Value);
            break;
        default:
            throw new AvroTypeException($"Unsupported conversion from {baseValue.GetType()}");
    }
}

private void writeBytes(byte[] bytes)
{
stream.Write(bytes, 0, bytes.Length);
Expand Down
8 changes: 8 additions & 0 deletions lang/csharp/src/apache/main/IO/Decoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ public interface Decoder
/// <param name="length">Number of bytes to read</param>
void ReadFixed(byte[] buffer, int start, int length);


/// <summary>
/// Reads a logical type value.
/// </summary>
/// <param name="schema">Schema of the logical type</param>
/// <returns>The value that was read, converted to its logical representation.</returns>
object ReadLogicalTypeValue(LogicalSchema schema);

/// <summary>
/// Skips a null Avro type on the stream.
/// </summary>
Expand Down
7 changes: 7 additions & 0 deletions lang/csharp/src/apache/main/IO/Encoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ public interface Encoder
/// <param name="len">Number of bytes to write.</param>
void WriteFixed(byte[] data, int start, int len);

/// <summary>
/// Writes a logical type value.
/// </summary>
/// <param name="value">Logical value to be written.</param>
/// <param name="schema">Logical type schema that describes the value.</param>
void WriteLogicalTypeValue(object value, LogicalSchema schema);

/// <summary>
/// Flushes the encoder.
/// </summary>
Expand Down
Loading
Loading