Kusto bindings for Azure Functions - .NET

Setup Function Project

These instructions guide you through creating a function project and adding the Kusto binding extension. This needs to be done only once per function project. If you already have a project, you can skip this step.

  1. Install Azure Functions Core Tools

  2. Create a function project for .NET:

    mkdir SampleApp
    cd SampleApp
    func init --worker-runtime dotnet
  3. Enable Kusto bindings on the function project.

    Install the extension.

    dotnet add package Microsoft.Azure.WebJobs.Extensions.Kusto --prerelease
  4. Provide the KustoConnectionString in local.settings.json:

    {
        "IsEncrypted": false,
        "Values": {
            "AzureWebJobsStorage": "UseDevelopmentStorage=true",
            "FUNCTIONS_WORKER_RUNTIME": "dotnet",
            "AzureWebJobsDashboard": "",
            "KustoConnectionString": "Data Source=https://<kusto-cluster>.kusto.windows.net;Database=<database>;Fed=True;AppClientId=<app-id>;AppKey=<app-key>;Authority Id=<tenant-id>"
        },
        "ConnectionStrings": {
            "rabbitMQConnectionAppSetting": "amqp://guest:guest@rabbitmq:5672"
        }
    }

    and configure host.json:

        {
            "version": "2.0",
            "logging": {
                "applicationInsights": {
                    "samplingSettings": {
                        "isEnabled": true,
                        "excludedTypes": "Request"
                    }
                }
            }
        }
  5. Refer to the set-up to create the sample tables, mappings, and functions required for the examples.

  6. For the advanced sample with dynamic bindings and time-based exports, a simple RabbitMQ Docker instance (with the management plugin) can be set up. A queue can then be declared using the CLI:

    rabbitmqadmin declare queue name=bindings.test.queue durable=false

Input Binding

See Input Binding Overview for general information about the Kusto Input binding.

KustoAttribute for Input Bindings

The KustoAttribute for input bindings takes a KQL query or KQL function to run (with optional parameters) and returns the output to the function. It accepts the following properties:

  • Database: The database against which the query has to be executed

  • ManagedServiceIdentity: A managed identity can be used to connect to Kusto. To use a system-assigned managed identity, set this to "system". Any other value is interpreted as the name of a user-assigned managed identity.

  • KqlCommand: The KQL command to execute. It can be a KQL query or a KQL function call.

  • KqlParameters: Parameters that act as predicate variables for the KqlCommand. For example, "@name={name},@Id={id}", where the parameters {name} and {id} are substituted at runtime with the actual values used in the predicates.

  • Connection: The name of the variable that holds the connection string, resolved through environment variables or through function app settings. Defaults to KustoConnectionString, which is looked up in the environment at runtime. The connection string format is documented under Kusto connection strings, e.g. "KustoConnectionString": "Data Source=https://<kusto-cluster>.kusto.windows.net;Database=<database>;Fed=True;AppClientId=<app-id>;AppKey=<app-key>;Authority Id=<tenant-id>". Note that the application ID must have at least viewer privileges on the table(s) or function(s) queried in the KqlCommand.
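The KqlParameters value described above is a comma-separated list of @name=value pairs. When the values are only known at runtime, as with dynamic bindings, a small helper can assemble that string. The following is a minimal sketch under stated assumptions: KqlParameterFormatter is a hypothetical name and not part of the extension, and the values are assumed to contain no commas.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration; not part of the Kusto extension.
public static class KqlParameterFormatter
{
    // Builds "@key1=value1,@key2=value2" from name/value pairs, preserving their order.
    // Assumption: values do not contain commas, since the comma separates the pairs.
    public static string Format(IEnumerable<KeyValuePair<string, string>> parameters)
    {
        if (parameters == null)
        {
            throw new ArgumentNullException(nameof(parameters));
        }
        return string.Join(",", parameters.Select(p => $"@{p.Key}={p.Value}"));
    }
}
```

For example, passing the pairs (name, Item) and (Id, 104) yields "@name=Item,@Id=104", which can then be assigned to KqlParameters.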

The following are valid binding types for the result of the query or function execution:

  • IEnumerable<T>: Each element is a row of the generic type T, where T is a user-defined POCO, or Plain Old C# Object. T should follow the structure of a row in the queried table. See the Query String section for an example of what T should look like.

  • IAsyncEnumerable<T>: Each element is again a row of the generic type T, but the rows are retrieved "lazily". A row of the result is only retrieved when MoveNextAsync is called on the enumerator. This is useful when the query and predicate return a large number of rows. See the IAsyncEnumerable section for an example.

  • String: A JSON string representation of the rows of the result. Note that, as a generic representation, the result is always a JSON array; when a single row is selected, it is an array with one element.

  • JArray: A JArray holding the rows of the result. As with String, the result is always an array; when a single row is selected, it is an array with one element.
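Because the String binding always yields a JSON array, consuming code can iterate over it without special-casing a single row. The following is a minimal sketch using System.Text.Json; ProductJsonReader is a hypothetical name, and the property name "Name" is an assumption based on the Product POCO used in the samples, not a documented output contract of the binding.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper for illustration; not part of the Kusto extension.
public static class ProductJsonReader
{
    // Reads the "Name" property of every row in a JSON array.
    // Assumption: each row is a JSON object with a string property called "Name".
    public static List<string> ReadNames(string json)
    {
        var names = new List<string>();
        using JsonDocument doc = JsonDocument.Parse(json);
        foreach (JsonElement row in doc.RootElement.EnumerateArray())
        {
            names.Add(row.GetProperty("Name").GetString());
        }
        return names;
    }
}
```

A single-row result such as [{"ProductID":104,"Name":"Prod-104","Cost":11.01}] is handled by the same loop and produces a list with one entry.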

Samples for Input Bindings

The repo contains examples of each of these binding types here.

Query String

The input binding executes the query declare query_parameters (productId:long,rmqPrefix:string);Products | where ProductID == productId and Name !has rmqPrefix, returning the result as an IEnumerable<Product>, where Product is a user-defined POCO. The KqlParameters argument passes the {productId} specified in the URL that triggers the function, getproducts/{productId}, as the value of the @productId parameter in the query, and the fixed value R-MQ as the value of @rmqPrefix.

ProductID  Name      Cost
104        Prod-104  11.01
104        Prod-104  11.11

The corresponding POCO for Product is as follows

    public class Product
    {
        [JsonProperty("ProductID")]
        public long ProductID { get; set; }

        [JsonProperty("Name")]
        public string Name { get; set; }

        [JsonProperty("Cost")]
        public double Cost { get; set; }
    }

The function below binds the query result to this POCO:

    [FunctionName("GetProductsList")]
    public static IActionResult Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "getproducts/{productId}")]
        HttpRequest req,
        [Kusto(Database:"functionsdb" ,
        KqlCommand = "declare query_parameters (productId:long,rmqPrefix:string);Products | where ProductID == productId and Name !has rmqPrefix" ,
        KqlParameters = "@productId={productId},@rmqPrefix=R-MQ", // Exclude any products whose name has this prefix
        Connection = "KustoConnectionString")]
        IEnumerable<Product> products)
    {
        // Nothing is awaited here, so the function is synchronous
        return new OkObjectResult(products);
    }

KQL Functions

GetProductsByName is the name of a KQL function in the database functionsdb. The value of the @name parameter passed to the function is taken from the {name} segment of the getproductsfn/{name} URL.

    [FunctionName("GetProductsFunction")]
    public static IActionResult Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "getproductsfn/{name}")]
        HttpRequest req,
        [Kusto(Database:"functionsdb" ,
        KqlCommand = "declare query_parameters (name:string);GetProductsByName(name)" ,
        KqlParameters = "@name={name}",
        Connection = "KustoConnectionString")]
        IEnumerable<Product> products)
    {
        return new OkObjectResult(products);
    }

IAsyncEnumerable

Using the IAsyncEnumerable binding generally requires that the Run function be async. It is also important to call DisposeAsync at the end of function execution to make sure all resources used by the enumerator are freed.

    [FunctionName("GetProductsAsyncEnumerable")]
    public static async Task<IActionResult> RunAsync(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "getproducts-ae?name={name}")]
        HttpRequest req,
        [Kusto(Database:"functionsdb" ,
        KqlCommand = "declare query_parameters (name:string);Products | where Name == name" ,
        KqlParameters = "@name={name}",
        Connection = "KustoConnectionString")]
        IAsyncEnumerable<Product> products)
    {
        IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
        var productList = new List<Product>();
        while (await enumerator.MoveNextAsync())
        {
            productList.Add(enumerator.Current);
        }
        await enumerator.DisposeAsync();
        return new OkObjectResult(productList);
    }
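As an alternative to driving the enumerator by hand, C# 8 and later offer await foreach, which calls MoveNextAsync for each element and disposes the enumerator when the loop ends, including when an exception is thrown. The following is a minimal sketch of that pattern: AsyncRows, FakeRows and CollectAsync are hypothetical names, and FakeRows merely stands in for the sequence the binding would supply.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of the Kusto extension.
public static class AsyncRows
{
    // Stand-in for the rows that an IAsyncEnumerable<T> binding would provide.
    public static async IAsyncEnumerable<string> FakeRows()
    {
        yield return "Prod-104";
        await Task.Yield();
        yield return "Prod-105";
    }

    // Drains an async sequence into a list.
    // await foreach disposes the underlying enumerator automatically.
    public static async Task<List<T>> CollectAsync<T>(IAsyncEnumerable<T> source)
    {
        var list = new List<T>();
        await foreach (T item in source)
        {
            list.Add(item);
        }
        return list;
    }
}
```

Inside a function, the body would reduce to something like var productList = await AsyncRows.CollectAsync(products); followed by the return statement.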

KustoAttribute for Output Bindings

See Output Binding Overview for general information about the Kusto Output binding.

The KustoAttribute for Output bindings takes the following arguments:

  • Database: The database that contains the table to ingest the data into

  • ManagedServiceIdentity: A managed identity can be used to connect to Kusto. To use a system-assigned managed identity, set this to "system". Any other value is interpreted as the name of a user-assigned managed identity.

  • TableName: The table to ingest the data into

  • MappingRef: Optional attribute to pass a mapping ref that is already defined in the ADX cluster

  • Connection: The name of the variable that holds the connection string, resolved through environment variables or through function app settings. Defaults to KustoConnectionString, which is looked up in the environment at runtime. The connection string format is documented under Kusto connection strings.

  • DataFormat: The default dataformat is multijson/json. This can be set to text formats supported in the datasource format enumeration. Samples are validated and provided for csv and JSON formats.

The following are valid binding types for the rows to be inserted into the table:

  • ICollector<T>/IAsyncCollector<T>: Each element is a row represented by T, where T is a user-defined POCO, or Plain Old C# Object. T should follow the structure of a row in the target table. See the Query String section for an example of what T should look like.

  • T: Used when just one row is to be inserted into the table.

  • T[]: Each element is again a row of the generic type T. This output binding type requires manual instantiation of the array in the function.

  • string: Used when the data to ingest is not a POCO but raw text, for example CSV rows such as:

    19222,prod2,220.22
    19223,prod2,221.22
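If the data starts out as objects but must be ingested as CSV, the rows can be rendered in the same layout as the sample above. The following is a minimal sketch under stated assumptions: CsvProduct and ProductCsv are hypothetical names, the column order (ProductID, Name, Cost) is taken from the sample rows, and no quoting is applied, so names containing commas or line breaks are not handled.

```csharp
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

// Hypothetical row type for illustration, mirroring the Product POCO used in the samples.
public class CsvProduct
{
    public long ProductID { get; set; }
    public string Name { get; set; }
    public double Cost { get; set; }
}

// Hypothetical helper for illustration; not part of the Kusto extension.
public static class ProductCsv
{
    // Renders one "ProductID,Name,Cost" line per product, separated by "\n".
    // InvariantCulture keeps "." as the decimal separator regardless of machine locale.
    public static string ToCsv(IEnumerable<CsvProduct> products)
    {
        return string.Join("\n", products.Select(p =>
            string.Format(CultureInfo.InvariantCulture, "{0},{1},{2}", p.ProductID, p.Name, p.Cost)));
    }
}
```

The resulting string can be assigned to an out string parameter bound with DataFormat = "csv", as in the CSV ingestion sample further down.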
    

The repo contains examples of each of these binding types here. A few examples are also included below.

Samples for Output Bindings

The following are some samples for the above collector types and options

ICollector<T>/IAsyncCollector<T>

When using an ICollector, it is not necessary to instantiate it. The function can add rows to the ICollector directly, and its contents are automatically inserted once the function exits.

    [FunctionName("AddProductsCollector")]
    public static IActionResult Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductscollector")]
        HttpRequest req, ILogger log,
        [Kusto(Database:SampleConstants.DatabaseName ,
        TableName =SampleConstants.ProductsTable ,
        Connection = "KustoConnectionString")] ICollector<Product> collector)
    {
        log.LogInformation("AddProductsCollector function started");
        string body = new StreamReader(req.Body).ReadToEnd();
        Product[] products = JsonConvert.DeserializeObject<Product[]>(body);
        // Validate before use: deserialization yields null for an empty or "null" body
        if (products == null)
        {
            return new BadRequestObjectResult("Please pass a well formed JSON Product array in the body");
        }
        foreach (Product p in products)
        {
            collector.Add(p);
        }
        return new ObjectResult(products) { StatusCode = StatusCodes.Status201Created };
    }

It is also possible to force an insert within the function by calling FlushAsync() on an IAsyncCollector. Both AddAsync and FlushAsync return tasks and should be awaited.

    [FunctionName("AddProductsAsyncCollector")]
    public static async Task<IActionResult> RunAsync(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductsasynccollector")]
        HttpRequest req, ILogger log,
        [Kusto(Database:SampleConstants.DatabaseName ,
        TableName =SampleConstants.ProductsTable ,
        Connection = "KustoConnectionString")] IAsyncCollector<Product> collector)
    {
        log.LogInformation("AddProductsAsyncCollector function started");
        string body = await new StreamReader(req.Body).ReadToEndAsync();
        Product[] products = JsonConvert.DeserializeObject<Product[]>(body);
        // Validate before use: deserialization yields null for an empty or "null" body
        if (products == null)
        {
            return new BadRequestObjectResult("Please pass a well formed JSON Product array in the body");
        }
        foreach (Product p in products)
        {
            await collector.AddAsync(p);
        }
        // Force the insert now instead of waiting for the function to exit
        await collector.FlushAsync();
        return new ObjectResult(products) { StatusCode = StatusCodes.Status201Created };
    }

Array

This output binding type requires explicit instantiation within the function body. Note also that the Product[] array must be prefixed by out when attached to the output binding.

    [FunctionName("AddProductsArray")]
    public static IActionResult Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductsarray")]
        HttpRequest req, ILogger log,
        [Kusto(Database:SampleConstants.DatabaseName ,
        TableName =SampleConstants.ProductsTable ,
        Connection = "KustoConnectionString")] out Product[] products)
    {
        log.LogInformation($"AddProducts function started");
        string body = new StreamReader(req.Body).ReadToEnd();
        products = JsonConvert.DeserializeObject<Product[]>(body);
        return products != null ? new ObjectResult(products) { StatusCode = StatusCodes.Status201Created } : new BadRequestObjectResult("Please pass a well formed JSON Product array in the body");
    }

Single Row

When binding to a single row, it is also necessary to prefix the row with out

    [FunctionName("AddProductUni")]
    public static IActionResult Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
        HttpRequest req, ILogger log,
        [Kusto(Database:SampleConstants.DatabaseName ,
        TableName =SampleConstants.ProductsTable ,
        Connection = "KustoConnectionString")] out Product product)
    {
        log.LogInformation($"AddProduct function started");
        string body = new StreamReader(req.Body).ReadToEnd();
        product = JsonConvert.DeserializeObject<Product>(body);
        string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
                    product.Name, product.ProductID, product.Cost);
        log.LogInformation("Ingested product {Product}", productString);
        return new CreatedResult($"/api/addproductuni", product);
    }

Ingest CSV / Multiline CSV

A csv row can be bound to an out string and processed as follows. Note the DataFormat element used in the binding

    [FunctionName("AddProductCsv")]
    public static IActionResult Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductcsv")]
        HttpRequest req, ILogger log,
        [Kusto(Database:SampleConstants.DatabaseName ,
        TableName =SampleConstants.ProductsTable ,
        DataFormat = "csv",
        Connection = "KustoConnectionString")] out string productCsv)
    {
        productCsv = new StreamReader(req.Body).ReadToEnd();
        string productString = string.Format(CultureInfo.InvariantCulture, "(Csv : {0})", productCsv);
        log.LogInformation("Ingested product CSV {ProductCsv}", productString);
        return new CreatedResult($"/api/addproductcsv", productString);
    }

Ingest with mappings

Suppose we have a POCO of type Item:

        #nullable enable
        public class Item
        {
            public long ItemID { get; set; }
            public string? ItemName { get; set; }
            public double ItemCost { get; set; }
        }

and we have to ingest it into the Products table, whose columns have different names. An ingestion mapping named item_to_product_json can be created in the database and referenced through MappingRef. For example, see the mapping defined in the database below:

    .show table Products ingestion mappings

Name                  Kind  Mapping
item_to_product_json  Json  [{"column":"ProductID","path":"$.ItemID","datatype":"","transform":null},{"column":"Name","path":"$.ItemName","datatype":"","transform":null},{"column":"Cost","path":"$.ItemCost","datatype":"","transform":null}]

        [FunctionName("AddProductsWithMapping")]
        public static IActionResult Run(
            [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductswithmapping")]
            HttpRequest req, ILogger log,
            [Kusto(Database:SampleConstants.DatabaseName ,
            TableName =SampleConstants.ProductsTable ,
            MappingRef = "item_to_product_json",
            Connection = "KustoConnectionString")] out Item item)
        {
            log.LogInformation($"AddProductsWithMapping function started");
            string body = new StreamReader(req.Body).ReadToEnd();
            item = JsonConvert.DeserializeObject<Item>(body);
            // Validate before use: deserialization yields null for an empty or "null" body
            if (item == null)
            {
                return new BadRequestObjectResult("Please pass a well formed JSON Item in the body");
            }
            string itemString = string.Format(CultureInfo.InvariantCulture, "(ItemName:{0} ItemID:{1} ItemCost:{2})",
                        item.ItemName, item.ItemID, item.ItemCost);
            log.LogInformation("Ingested item {Item}", itemString);
            return new ObjectResult(item) { StatusCode = StatusCodes.Status201Created };
        }
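The mapping works because the JSON property names of the serialized item match the paths in the mapping ($.ItemID, $.ItemName, $.ItemCost). The following sketch shows what such a payload looks like. It uses System.Text.Json only to stay dependency-free; which serializer the extension uses internally is not stated in this document, and MappedItem and MappedItemJson are hypothetical names.

```csharp
using System.Text.Json;

// Hypothetical type for illustration, mirroring the Item POCO above.
public class MappedItem
{
    public long ItemID { get; set; }
    public string ItemName { get; set; }
    public double ItemCost { get; set; }
}

// Hypothetical helper for illustration; not part of the Kusto extension.
public static class MappedItemJson
{
    // Serializes with default options, which keep the property names unchanged
    // so they line up with the "$.ItemID"-style paths in the ingestion mapping.
    public static string Serialize(MappedItem item)
    {
        return JsonSerializer.Serialize(item);
    }
}
```

For an item with ItemID 1001, ItemName "Widget" and ItemCost 2.5, this produces {"ItemID":1001,"ItemName":"Widget","ItemCost":2.5}, which the item_to_product_json mapping would route to the ProductID, Name and Cost columns.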

Advanced example - Dynamic bindings & delta exports

A slightly more advanced example combines a time-based export with dynamic bindings in the function (supported in C#). It assumes that a RabbitMQ server is running (see the setup section). No declarative syntax is used for the Kusto binding: the time values are derived from the timer and bound at runtime, and the data is queried, transformed and then exported to a destination (RabbitMQ in this case, for simplicity).

        [FunctionName("TimeBasedExport")]
        public static async Task Run(
            [TimerTrigger("*/5 * * * * *")] TimerInfo exportTimer,
            IBinder binder, ILogger log,
            [RabbitMQ(QueueName = "bindings.test.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] IAsyncCollector<Product> outputProducts)
        {
            DateTime? dateOfRun = exportTimer?.ScheduleStatus?.Last;
            // Fall back to the current time when the timer has no previous run recorded
            DateTime runTime = dateOfRun?.ToUniversalTime() ?? DateTime.UtcNow;
            string startTime = runTime.ToString("yyyy'-'MM'-'dd'T'HH':'mm':'ss'.'fff'Z'");
            // Runs every 5sec, so query this with 5sec delta
            string endTime = runTime.AddSeconds(5).ToString("yyyy'-'MM'-'dd'T'HH':'mm':'ss'.'fff'Z'");
            var kustoAttribute = new KustoAttribute(SampleConstants.DatabaseName)
            {
                Connection = "KustoConnectionString",
                KqlCommand = "declare query_parameters (name:string,startTime:string,endTime:string);Products | extend ig=ingestion_time() | where Name has name | where ig >= todatetime(startTime) and ig <= todatetime(endTime) | order by ig asc",
                KqlParameters = $"@name=Item,@startTime={startTime},@endTime={endTime}"
            };
            // List of ingested records
            var exportedRecords = (await binder.BindAsync<IEnumerable<Product>>(kustoAttribute)).ToList();
            // Count for logs
            log.LogInformation($"Querying data between {startTime} and {endTime} yielded {exportedRecords.Count} records");
            // Send them to a continuous export topic. Just transform the names in this case
            foreach (Product item in exportedRecords)
            {
                item.Name = $"R-MQ-{item.ProductID}"; // A simple transform!
                await outputProducts.AddAsync(item);
            }
        }
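The window computation in the function above can be pulled into a small pure helper so that it can be checked without a timer or a Kusto cluster. The following is a minimal sketch under stated assumptions: ExportWindow and its For method are hypothetical names, the format string is the one used in the sample, and the current time is passed in explicitly to keep the helper deterministic.

```csharp
using System;
using System.Globalization;

// Hypothetical helper for illustration; not part of the Kusto extension.
public static class ExportWindow
{
    // Same timestamp layout as in the TimeBasedExport sample.
    private const string TimeFormat = "yyyy'-'MM'-'dd'T'HH':'mm':'ss'.'fff'Z'";

    // Returns the formatted start and end of the export window.
    // The window starts at the last run (or at utcNow when there is none)
    // and spans one timer interval.
    public static (string Start, string End) For(DateTime? lastRun, TimeSpan interval, DateTime utcNow)
    {
        DateTime start = lastRun?.ToUniversalTime() ?? utcNow;
        DateTime end = start.Add(interval);
        return (start.ToString(TimeFormat, CultureInfo.InvariantCulture),
                end.ToString(TimeFormat, CultureInfo.InvariantCulture));
    }
}
```

With a last run of 2023-01-02 03:04:05 UTC and a 5 second interval, the helper returns "2023-01-02T03:04:05.000Z" and "2023-01-02T03:04:10.000Z", which can be interpolated into KqlParameters as @startTime and @endTime.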