In [None]:
!pip install azure-kusto-data
!pip install azure-kusto-ingest
!pip install azure-storage-blob
!pip install avro

In [None]:
import json
import os

import avro.schema
from avro.datafile import DataFileReader
from avro.io import DatumReader

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient

In [None]:
# Locate the Avro blob whose schema we want to inspect.
FILE_NAME = "<FILE_PATH>"
CONTAINER_NAME = "<CONTAINER_NAME>"
# Storage connection string (portal: Storage account -> Access Keys). It is a
# secret, so prefer the environment variable and keep the placeholder only as a
# fallback for editing the template.
CONN_STR = os.environ.get("AZURE_STORAGE_CONNECTION_STRING", "<CONNECTION_STR>")

blob_service_client = BlobServiceClient.from_connection_string(CONN_STR)
container_client = blob_service_client.get_container_client(CONTAINER_NAME)
blob_client = container_client.get_blob_client(FILE_NAME)
# The download itself happens in the next cell; no download is started here.

In [None]:
# Copy the blob to a local file so the Avro reader can open it.
# readinto() streams the blob straight into the open file instead of first
# building the whole payload in memory with readall(). The file is only
# written, so "wb" (not "wb+") is sufficient.
with open("userdata", "wb") as my_file:
    blob_client.download_blob().readinto(my_file)

In [None]:
# Read the writer schema stored in the Avro container's metadata
# ('avro.schema', JSON-encoded bytes) and parse it into a dict.
reader: DataFileReader = DataFileReader(open("userdata", "rb"), DatumReader())
try:
    schema: dict = json.loads(reader.meta.get("avro.schema").decode("utf-8"))
finally:
    # close() also closes the file object passed in above. Without it the handle
    # stays open and deleting "userdata" afterwards fails on Windows.
    reader.close()

In [None]:
# The schema has been extracted, so the temporary local copy is no longer needed.
# Ignore a missing file so this cell can be re-run safely.
try:
    os.remove("userdata")
except FileNotFoundError:
    pass

In [None]:
len(schema["fields"])  # number of top-level fields declared in the Avro schema

In [None]:
def primary_avro_type(avro_type):
    """Return a single representative type for an Avro field type.

    Nullable fields are declared as unions (JSON lists) such as
    ["null", "string"]. The first non-"null" member is returned, regardless
    of its position in the union. A union containing only "null" yields
    "null". Non-union types are returned unchanged.
    """
    if isinstance(avro_type, list):
        non_null = [member for member in avro_type if member != "null"]
        return non_null[0] if non_null else "null"
    return avro_type


# Field name -> type, in schema order.
fields = {field["name"]: primary_avro_type(field["type"]) for field in schema["fields"]}

In [None]:
# One "name - type" line per field. An f-string is used because a field type can
# be a complex Avro schema (a dict after json.loads), which cannot be joined to a
# str with `+`.
for field_name, field_type in fields.items():
    print(f"{field_name} - {field_type}")

In [None]:
# Azure Data Explorer connection settings (fill in before running).
AAD_TENANT_ID = "<TENANT_ID>"            # Azure Active Directory tenant ID
KUSTO_URI = "<KUSTO_URI>"                # cluster URI (ADX overview page)
KUSTO_INGEST_URI = "<KUSTO_INGEST_URI>"  # data ingestion URI (ADX overview page)
KUSTO_DATABASE = "<DATABASE_NAME>"       # target database (ADX overview page)

In [None]:
# Connection-string builders for the ADX cluster; both authenticate with AAD
# device-code sign-in against AAD_TENANT_ID.
KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(
    KUSTO_INGEST_URI,
    AAD_TENANT_ID,
)  # ingestion endpoint (not referenced in the visible cells)
KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(
    KUSTO_URI,
    AAD_TENANT_ID,
)  # query / management endpoint

# Kusto table name used by this notebook.
DESTINATION_TABLE = "UserDataAvro"
# Optional ingestion column-mapping name, currently unused:
# DESTINATION_TABLE_COLUMN_MAPPING = ""

In [None]:
# Location of the Avro data that the external table will point at.
CONTAINER = "<CONTAINER_NAME>"
ACCOUNT_NAME = "<STORAGE_ACCOUNT_NAME>"
# SAS tokens are secrets: prefer the environment variable; the placeholder is
# only a fallback for editing the template.
SAS_TOKEN = os.environ.get("AZURE_STORAGE_SAS_TOKEN", "<SAS_Token>")
FILE_PATH = "<PATH>"
FILE_SIZE = 0    # in bytes


def build_blob_url(account, container, path, sas_token):
    """Build an https blob URL, optionally carrying a SAS token.

    Args:
        account: storage account name.
        container: container name.
        path: blob path inside the container; a leading "/" is ignored.
        sas_token: SAS query string, with or without its leading "?". It may
            be empty, in which case no query string is appended.

    Returns:
        "https://<account>.blob.core.windows.net/<container>/<path>[?<sas>]".
    """
    base = f"https://{account}.blob.core.windows.net/{container}/{path.lstrip('/')}"
    # Portal-generated tokens may or may not start with "?"; normalise so the
    # URL always has exactly one separator.
    query = sas_token.lstrip("?")
    return f"{base}?{query}" if query else base


BLOB_PATH = build_blob_url(ACCOUNT_NAME, CONTAINER, FILE_PATH, SAS_TOKEN)

In [None]:
KUSTO_CLIENT = KustoClient(KCSB_DATA)

# External-table columns as (name, Kusto type). This list is maintained by hand
# and should agree with the Avro field listing printed earlier — TODO confirm
# the Kusto types against that listing when the source data changes.
EXTERNAL_TABLE_COLUMNS = [
    ("registration_dttm", "string"),
    ("id", "long"),
    ("first_name", "string"),
    ("last_name", "string"),
    ("email", "string"),
    ("gender", "string"),
    ("ip_address", "string"),
    ("cc", "long"),
    ("country", "string"),
    ("birthdate", "string"),
    ("salary", "double"),
    ("title", "string"),
    ("comments", "string"),
]
COLUMNS_CSL = ", ".join(f"{name}: {kusto_type}" for name, kusto_type in EXTERNAL_TABLE_COLUMNS)

# `.create-or-alter` (instead of `.create`) lets this cell be re-run after the
# table already exists. The storage URL carries the SAS token, so it is passed
# as an obfuscated `h@'...'` literal.
CREATE_TABLE_COMMAND = (
    f".create-or-alter external table {DESTINATION_TABLE} ({COLUMNS_CSL})\n"
    "kind = storage\n"
    "dataformat = apacheavro\n"
    "(\n"
    f"    h@'{BLOB_PATH}'\n"
    ")"
)
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])

In [None]:
# Sanity check: read a handful of rows back through the external table.
# The table name comes from DESTINATION_TABLE so it stays in sync with the config cell.
QUERY = f"external_table('{DESTINATION_TABLE}') | take 10"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])