NodeJs client and sending AVRO contents to the eventhub #6

Closed
jtaubensee opened this Issue Jan 3, 2017 · 14 comments

From @ThomasAnnerel on April 19, 2016 20:13

Dear,

We are trying to use the Node.js client to send Avro content to the event hub. We use the avsc (https://github.com/mtth/avsc) library's BlockEncoder to create a byte[] containing the schema and the Avro object. But when we try to get sample data from the Event Hub with Stream Analytics, we always get the message that no events were found. If we configure the input as JSON, we get back a JSON object containing the buffer.

    // Encode the Avro record as an Object Container File and send the resulting bytes.
    var avroEncoder = new avro.streams.BlockEncoder(recordTypeBase);
    var buffers = [];
    avroEncoder.on('data', function(data) {
        console.log(data.length);
        buffers.push(data);
    });
    avroEncoder.on('finish', function() {
        console.log('Sending message: ');
        var systemProps = {};
        systemProps['x-opt-partition-key'] = 'partition1';
        var data = Buffer.concat(buffers);
        var evtData = new EventData(data, systemProps);
        tx.send(evtData); // send the EventData wrapping the raw Avro buffer
        done();           // mocha callback
    });
    avroEncoder.write(avroObj);
    avroEncoder.end();

Copied from original issue: Azure/azure-event-hubs#118

jtaubensee Jan 3, 2017

From @ryancrawcour on April 20, 2016 20:4

@djrosanova could you take a look? Is this an issue with the avsc lib, or something on EH?

jtaubensee Jan 3, 2017

From @ThomasAnnerel on April 21, 2016 10:16

@ryancrawcour and @djrosanova, if I write it to a file with the avsc lib I get a valid Avro file, readable by avro-tools (http://www.michael-noll.com/blog/2013/03/17/reading-and-writing-avro-files-from-the-command-line/).

jtaubensee Jan 3, 2017

From @ThomasAnnerel on April 26, 2016 19:45

Any update on this one, guys?

jtaubensee Jan 3, 2017

From @ryancrawcour on April 26, 2016 20:12

I'm trying to repro this locally. In theory EH doesn't even look at the body of the message. Garbage in, garbage out. In theory.

jtaubensee Jan 3, 2017

From @noodlefrenzy on April 26, 2016 21:18

Hey @ThomasAnnerel - under the covers this is sending data as AMQP Transfer frames using node-amqp10. I wonder if the issue lies there, as that does some "intelligent" work to try and ensure the types sent can be decoded by the .NET EH SDK on the other end (I say "intelligent" in quotes because I'm the one who did it).

Can you run again with the environment variable DEBUG set to amqp* and perhaps paste a Gist link to the output of one of your Transfer frames?
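
A minimal sketch of enabling that output, assuming node-amqp10 follows the usual debug-package convention implied by the DEBUG=amqp* hint (the file name send.js is only a placeholder):

    // Either export the variable before launching:  DEBUG=amqp* node send.js
    // ...or set it in code before the SDK (and node-amqp10) are loaded:
    process.env.DEBUG = 'amqp*';
    var EventHubClient = require('azure-event-hubs').Client;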

jtaubensee Jan 3, 2017

From @noodlefrenzy on April 28, 2016 17:39

Looking at the transfer frame:

Frame of length 5064:
  00
    number(type=53) 14
    c0 15 11
      0
      number(type=52) 01
      a0 1
        1
      0
      false
      false
      null
      null
      false
      false
      false
    00
      number(type=53) 72
      c1 34 2
        a3 19
          x-opt-partition-key
        a1 10
          partition1
      00
        number(type=53) 75
        b0 2465
          {"type":"Buffer","data":[79,98,...

It's quite clear that Sender is living up to its documentation: https://github.com/Azure/azure-event-hubs/blob/master/node/send_receive/lib/sender.js#L49 and JSON-encoding the Buffer before sending it.
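
As an illustration (not part of the SDK) of why the body takes that shape: JSON.stringify on a Node Buffer produces exactly the {"type":"Buffer","data":[...]} wrapper seen in the frame, and the leading bytes 79,98,... are simply the Avro container magic "Obj\x01".

    // Illustrative only: reproduce the body shape seen in the transfer frame above.
    var payload = Buffer.from([79, 98, 106, 1]); // 'O','b','j',1 - the Avro OCF magic bytes
    console.log(JSON.stringify(payload));
    // -> {"type":"Buffer","data":[79,98,106,1]}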

Unfortunately, this is due to the behavior of the Service Bus policy in node-amqp10: https://github.com/noodlefrenzy/node-amqp10/blob/master/lib/policies/service_bus_policy.js#L12

As I mentioned to @ryancrawcour, the best way to tackle this would be to allow AzureEventHubs clients to provide an amqp10 instance factory on construction so they can build the underlying client with whatever policies they desire (or even stub out their own amqp10 provider).

All that said, it does seem like a bug that the ServiceBus policy wouldn't just pass Buffers through when given one, so I've opened noodlefrenzy/node-amqp10#234. Unfortunately, AzureEventHubs is still on v2 of node-amqp10, so that fix is unlikely to back-propagate any time soon.
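
To illustrate the pass-through behavior being asked for (this is a rough sketch, not the actual node-amqp10 source; encodeBody is an illustrative name):

    // Sketch of the desired policy behavior: leave Buffers (and strings) untouched,
    // JSON-encode everything else. The v2 ServiceBus policy effectively JSON-encoded
    // even Buffers, which is what the frame above shows.
    function encodeBody(body) {
        if (Buffer.isBuffer(body) || typeof body === 'string') {
            return body;                // raw bytes go onto the wire as-is
        }
        return JSON.stringify(body);    // plain objects are serialized to JSON
    }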

jtaubensee Jan 3, 2017

From @ThomasAnnerel on May 2, 2016 21:16

After the fix, still no success sending Avro through Event Hubs to Stream Analytics. Thinking about switching to JSON.

jtaubensee Jan 3, 2017

From @ryancrawcour on May 2, 2016 21:19

Have you tried from Java or .NET (as per the example blog post)? Just want to confirm that your problem is confined to Node.js. Knowing this gives us a starting point for finding the issue.

jtaubensee Jan 3, 2017

From @ThomasAnnerel on May 3, 2016 4:46

I will try .NET today. Ryan, can you confirm that the sample data feature of Stream Analytics reads the Avro format?

With Service Bus Explorer we now see binary data on the event hub, so the buffer-to-JSON bug is gone.

jtaubensee Jan 3, 2017

From @ThomasAnnerel on May 3, 2016 20:33

Tried with the same Avro schema using .NET and it worked. See sample code:
https://gist.github.com/ThomasAnnerel/9e15aa8a9e844a1834ec08d530fd1a36

So I can confirm that it's not the schema but the actual Node.js implementation.

If we fetch the sample data, we notice that Stream Analytics converts it to JSON so it can be used in a Stream Analytics test.

When written to blob storage, we notice it's valid JSON.

jtaubensee Jan 3, 2017

From @ThomasAnnerel on May 3, 2016 21:40

Apparently my mocha tests are swallowing some exceptions from the SDK. I rewrote it as a simple Node.js console script.

Now I get this exception:
Unhandled rejection (<{"descriptor":{"buffer":{"type":"Buffe...>, no stack trace)

var avro = require('avsc');
var fs = require('fs');
var EventHubClient = require('azure-event-hubs').Client;

console.log("start logging");

// Parse the Avro schema file into an avsc type.
function parseAvroType(file) {
    var avroSchemaString = fs.readFileSync(file);
    var avroBaseSchema = JSON.parse(avroSchemaString);
    return avro.parse(avroBaseSchema);
}

function generateGuid() {
    function s4() {
        return Math.floor((1 + Math.random()) * 0x10000)
            .toString(16)
            .substring(1);
    }
    return s4() + s4() + '-' + s4() + '-' + s4() + '-' +
        s4() + '-' + s4() + s4() + s4();
}

var client = EventHubClient.fromConnectionString('', '');
client.createSender()
    .then(function(tx) {
        console.log('sender created');
        tx.on('errorReceived', function(err) {
            console.log(err);
        });

        var avroType = parseAvroType('schemas/webview.avsc');

        // Build a test record matching the schema.
        var i = 1;
        var timestamp = new Date().getTime();
        var testObj = {
            "cookieId": generateGuid(),
            "timestamp": timestamp,
            "viewId": generateGuid(),
            "sessionId": generateGuid(),
            "viewSequence": i,
            "brand": "ds",
            "pageUrl": "http://www.standaard.be/" + i
        };
        var avroObj = avroType.clone(testObj, { wrapUnions: true });
        console.log("Cloned avroObj: " + avroObj);

        // Encode the record as an Avro Object Container File and collect the chunks.
        var avroEncoder = new avro.streams.BlockEncoder(avroType);
        var buffers = [];
        avroEncoder.on('data', function(data) {
            console.log(data.length);
            buffers.push(data);
        });
        avroEncoder.on('finish', function() {
            // Send the concatenated Avro bytes as the event body.
            var data = Buffer.concat(buffers);
            tx.send(data);
        });
        avroEncoder.write(avroObj);
        avroEncoder.end();
    });
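
A side note on the "Unhandled rejection" output: assuming createSender() returns a standard promise, attaching a catch handler to the chain (not present in the script above) would print the rejection reason instead of letting it be swallowed. A minimal sketch:

    client.createSender()
        .then(function(tx) {
            // ... encode the record and call tx.send(...) as in the script above ...
        })
        .catch(function(err) {
            // Without this handler the failure only shows up as
            // "Unhandled rejection (...), no stack trace".
            console.log('send failed:', err);
        });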

jtaubensee Jan 3, 2017

From @ThomasAnnerel on May 9, 2016 21:34

Found another problem in the amqp10 lib for Node.js, again in the ServiceBus policy:

    maxMessageSize: 10000, // Arbitrary choice

If you look at https://azure.microsoft.com/en-us/documentation/articles/service-bus-quotas/, we notice this should be 256 KB.

Kind regards
Thomas Annerel
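
Related to that quota, a hedged illustration (not from the thread or the SDK): whatever the policy default, a single encoded event has to stay under the 256 KB message size limit, so a sender can guard the concatenated Avro buffer before calling send. MAX_EVENT_BYTES is an illustrative constant.

    // Illustrative guard, not part of the SDK: reject payloads over the 256 KB quota
    // before handing them to the sender.
    var MAX_EVENT_BYTES = 256 * 1024;
    var payload = Buffer.concat(buffers);
    if (payload.length > MAX_EVENT_BYTES) {
        throw new Error('Encoded event is ' + payload.length +
            ' bytes, which exceeds the 256 KB message size quota');
    }
    tx.send(payload);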

amarzavery Apr 24, 2018

Collaborator

@ThomasAnnerel @shpros @djrosanova - We have a new preview version "0.1.0" of the Event Hubs Node.js client. This client was rewritten from scratch. Please try the new version and let us know if you still see issues. You can find examples over here.

amarzavery closed this Apr 24, 2018
