Skip to content

Commit

Permalink
Merge pull request #29 from krishanjmistry/simplified-ingest-client
Browse files Browse the repository at this point in the history
  • Loading branch information
AsafMah committed Apr 30, 2024
2 parents 2dc2482 + 4774e29 commit e1dca0a
Show file tree
Hide file tree
Showing 17 changed files with 1,343 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[workspace]
members = ["azure-kusto-data"]
members = ["azure-kusto-data", "azure-kusto-ingest"]
resolver = "2"
26 changes: 26 additions & 0 deletions azure-kusto-ingest/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "azure-kusto-ingest"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
azure-kusto-data = { path = "../azure-kusto-data", default-features = false }
# Azure SDK for Rust crates versions must be kept in sync
azure_core = "0.19"
azure_storage = "0.19"
azure_storage_blobs = "0.19"
azure_storage_queues = "0.19"

async-lock = "3"
rand = "0.8"
serde = { version = "1", features = ["serde_derive"] }
serde_json = "1"
thiserror = "1"
time = { version = "0.3", features = ["serde-human-readable", "macros"] }
url = "2"
uuid = { version = "1", features = ["v4", "serde"] }

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
61 changes: 61 additions & 0 deletions azure-kusto-ingest/examples/ingest_from_blob.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::env;

use azure_kusto_data::prelude::{ConnectionString, KustoClient, KustoClientOptions};
use azure_kusto_ingest::data_format::DataFormat;
use azure_kusto_ingest::descriptors::{BlobAuth, BlobDescriptor};
use azure_kusto_ingest::ingestion_properties::IngestionProperties;
use azure_kusto_ingest::queued_ingest::QueuedIngestClient;

/// Example of ingesting data into Kusto from Azure Blob Storage using managed identities.
/// This example enforces that the Kusto cluster has a system assigned managed identity with access to the storage account
///
/// There are some steps that need to be taken to allow for managed identities to work:
/// - Permissions as the ingestor to initiate ingestion
/// https://learn.microsoft.com/en-us/azure/data-explorer/kusto/api/netfx/kusto-ingest-client-permissions
/// - Permissions for Kusto to access storage
/// https://learn.microsoft.com/en-us/azure/data-explorer/ingest-data-managed-identity
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cluster_ingest_uri = env::var("KUSTO_INGEST_URI").expect("Must define KUSTO_INGEST_URI");
let user_mi_object_id =
env::var("KUSTO_USER_MI_OBJECT_ID").expect("Must define KUSTO_USER_MI_OBJECT_ID");

// Create a Kusto client with managed identity authentication via the user assigned identity
let kusto_client = KustoClient::new(
ConnectionString::with_managed_identity_auth(cluster_ingest_uri, Some(user_mi_object_id)),
KustoClientOptions::default(),
)?;

// Create a queued ingest client
let queued_ingest_client = QueuedIngestClient::new(kusto_client);

// Define ingestion properties
let ingestion_properties = IngestionProperties {
database_name: env::var("KUSTO_DATABASE_NAME").expect("Must define KUSTO_DATABASE_NAME"),
table_name: env::var("KUSTO_TABLE_NAME").expect("Must define KUSTO_TABLE_NAME"),
// Don't delete the blob on successful ingestion
retain_blob_on_success: Some(true),
// File format of the blob is Parquet
data_format: DataFormat::Parquet,
// Assume the server side default for flush_immediately
flush_immediately: None,
};

// Define the blob to ingest from
let blob_uri = env::var("BLOB_URI").expect("Must define BLOB_URI");
// Define the size of the blob if known, this improves ingestion performance as Kusto does not need to access the blob to determine the size
let blob_size: Option<u64> = match env::var("BLOB_SIZE") {
Ok(blob_size) => Some(blob_size.parse().expect("BLOB_SIZE must be a valid u64")),
Err(_) => None,
};

// Create the blob descriptor, also specifying that the blob should be accessed using the system assigned managed identity of the Kusto cluster
let blob_descriptor = BlobDescriptor::new(blob_uri, blob_size, None)
.with_blob_auth(BlobAuth::SystemAssignedManagedIdentity);

let _ = queued_ingest_client
.ingest_from_blob(blob_descriptor, ingestion_properties)
.await?;

Ok(())
}
51 changes: 51 additions & 0 deletions azure-kusto-ingest/src/client_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use azure_core::ClientOptions;

/// Allows configurability of ClientOptions for the storage clients used within [QueuedIngestClient](crate::queued_ingest::QueuedIngestClient)
#[derive(Clone, Default)]
pub struct QueuedIngestClientOptions {
pub queue_service_options: ClientOptions,
pub blob_service_options: ClientOptions,
}

impl From<ClientOptions> for QueuedIngestClientOptions {
/// Creates a `QueuedIngestClientOptions` struct where the same [ClientOptions] are used for all services
fn from(client_options: ClientOptions) -> Self {
Self {
queue_service_options: client_options.clone(),
blob_service_options: client_options,
}
}
}

/// Builder for [QueuedIngestClientOptions], call `build()` to create the [QueuedIngestClientOptions]
#[derive(Clone, Default)]
pub struct QueuedIngestClientOptionsBuilder {
queue_service_options: ClientOptions,
blob_service_options: ClientOptions,
}

impl QueuedIngestClientOptionsBuilder {
pub fn new() -> Self {
Self {
queue_service_options: ClientOptions::default(),
blob_service_options: ClientOptions::default(),
}
}

pub fn with_queue_service_options(mut self, queue_service_options: ClientOptions) -> Self {
self.queue_service_options = queue_service_options;
self
}

pub fn with_blob_service_options(mut self, blob_service_options: ClientOptions) -> Self {
self.blob_service_options = blob_service_options;
self
}

pub fn build(self) -> QueuedIngestClientOptions {
QueuedIngestClientOptions {
queue_service_options: self.queue_service_options,
blob_service_options: self.blob_service_options,
}
}
}
37 changes: 37 additions & 0 deletions azure-kusto-ingest/src/data_format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use serde::Serialize;

/// All data formats supported by Kusto.
/// Default is [DataFormat::CSV]
#[derive(Serialize, Clone, Debug, Default, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum DataFormat {
ApacheAvro,
Avro,
#[default]
CSV,
JSON,
MultiJSON,
ORC,
Parquet,
PSV,
RAW,
SCSV,
SOHsv,
SingleJSON,
SStream,
TSV,
TSVe,
TXT,
W3CLOGFILE,
}

// Unit tests
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn data_format_default() {
assert_eq!(DataFormat::default(), DataFormat::CSV);
}
}
148 changes: 148 additions & 0 deletions azure-kusto-ingest/src/descriptors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use uuid::Uuid;

/// Encapsulates the information related to a blob that is required to ingest from a blob
#[derive(Debug, Clone)]
pub struct BlobDescriptor {
uri: String,
pub(crate) size: Option<u64>,
pub(crate) source_id: Uuid,
/// Authentication information for the blob; when [None], the uri is passed through as is
blob_auth: Option<BlobAuth>,
}

impl BlobDescriptor {
/// Create a new BlobDescriptor.
///
/// Parameters:
/// - `uri`: the uri of the blob to ingest from, note you can use the optional helper method `with_blob_auth` to add authentication information to the uri
/// - `size`: although the size is not required, providing it is recommended as it allows Kusto to better plan the ingestion process
/// - `source_id`: optional, useful if tracking ingestion status, if not provided, a random uuid will be generated
pub fn new(uri: impl Into<String>, size: Option<u64>, source_id: Option<Uuid>) -> Self {
let source_id = match source_id {
Some(source_id) => source_id,
None => Uuid::new_v4(),
};

Self {
uri: uri.into(),
size,
source_id,
blob_auth: None,
}
}

/// Mutator to modify the authentication information of the BlobDescriptor
pub fn with_blob_auth(mut self, blob_auth: BlobAuth) -> Self {
self.blob_auth = Some(blob_auth);
self
}

/// Returns the uri with the authentication information concatenated, ready to be serialized into the ingestion message
pub(crate) fn uri(&self) -> String {
match &self.blob_auth {
Some(BlobAuth::SASToken(sas_token)) => {
format!("{}?{}", self.uri, sas_token.as_str())
}
Some(BlobAuth::UserAssignedManagedIdentity(object_id)) => {
format!("{};managed_identity={}", self.uri, object_id)
}
Some(BlobAuth::SystemAssignedManagedIdentity) => {
format!("{};managed_identity=system", self.uri)
}
None => self.uri.to_string(),
}
}
}

/// Helper for adding authentication information to a blob path in the format expected by Kusto
#[derive(Clone)]
pub enum BlobAuth {
/// adds `?<sas_token>` to the blob path
SASToken(String),
/// adds `;managed_identity=<identity>` to the blob path
UserAssignedManagedIdentity(String),
/// adds `;managed_identity=system` to the blob path
SystemAssignedManagedIdentity,
}

/// Custom impl of Debug to avoid leaking sensitive information
impl std::fmt::Debug for BlobAuth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BlobAuth::SASToken(_) => f.debug_struct("SASToken").finish(),
BlobAuth::UserAssignedManagedIdentity(object_id) => f
.debug_struct("UserAssignedManagedIdentity")
.field("object_id", object_id)
.finish(),
BlobAuth::SystemAssignedManagedIdentity => {
f.debug_struct("SystemAssignedManagedIdentity").finish()
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn blob_descriptor_with_no_auth_modification() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let blob_descriptor = BlobDescriptor::new(uri, None, None);

assert_eq!(blob_descriptor.uri(), uri);
}

#[test]
fn blob_descriptor_with_sas_token() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let sas_token = "my_sas_token";
let blob_descriptor = BlobDescriptor::new(uri, None, None)
.with_blob_auth(BlobAuth::SASToken(sas_token.to_string()));

assert_eq!(blob_descriptor.uri(), format!("{uri}?{sas_token}"));
}

#[test]
fn blob_descriptor_with_user_assigned_managed_identity() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let object_id = "my_object_id";
let blob_descriptor = BlobDescriptor::new(uri, None, None)
.with_blob_auth(BlobAuth::UserAssignedManagedIdentity(object_id.to_string()));

assert_eq!(
blob_descriptor.uri(),
format!("{uri};managed_identity={object_id}")
);
}

#[test]
fn blob_descriptor_with_system_assigned_managed_identity() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let blob_descriptor = BlobDescriptor::new(uri, None, None)
.with_blob_auth(BlobAuth::SystemAssignedManagedIdentity);

assert_eq!(
blob_descriptor.uri(),
format!("{uri};managed_identity=system")
);
}

#[test]
fn blob_descriptor_with_size() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let size = 123;
let blob_descriptor = BlobDescriptor::new(uri, Some(size), None);

assert_eq!(blob_descriptor.size, Some(size));
}

#[test]
fn blob_descriptor_with_source_id() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let source_id = Uuid::new_v4();
let blob_descriptor = BlobDescriptor::new(uri, None, Some(source_id));

assert_eq!(blob_descriptor.source_id, source_id);
}
}
20 changes: 20 additions & 0 deletions azure-kusto-ingest/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//! Defines [Error] for representing failures in various operations.

/// Error type for kusto ingestion operations.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Error raised when failing to obtain ingestion resources.
#[error("Error obtaining ingestion resources: {0}")]
ResourceManagerError(#[from] super::resource_manager::ResourceManagerError),

/// Error relating to (de-)serialization of JSON data
#[error("Error in JSON serialization/deserialization: {0}")]
JsonError(#[from] serde_json::Error),

/// Error occurring within core azure crates
#[error("Error in azure-core: {0}")]
AzureError(#[from] azure_core::error::Error),
}

/// Result type for kusto ingest operations.
pub type Result<T> = std::result::Result<T, Error>;
Loading

0 comments on commit e1dca0a

Please sign in to comment.