# Part 1 Create Cluster 

In [59]:
from azure.common.credentials import ServicePrincipalCredentials
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.resource.resources.models import DeploymentMode

## Get and Set Azure Credentials 


In [None]:
!az login

In [6]:
!az ad sp create-for-rbac --sdk-auth > mycredentials.json

[33mRetrying role assignment creation: 1/36[0m


In [8]:
import os, json
with open('mycredentials.json') as data_file:    
    azure_session = json.load(data_file)
# delete credentials file
os.remove("mycredentials.json")

## Create Azure Resource Manager Client

In [60]:
credentials = ServicePrincipalCredentials(
    client_id=azure_session["clientId"],
    secret=azure_session["clientSecret"],
    tenant=azure_session["tenantId"]
)
client = ResourceManagementClient(credentials, azure_session["subscriptionId"])

## Resource Group Parameters

In [61]:
RESOURCE_GROUP_NAME = 'kustodeploymenttest' # Set resource group name here
AZURE_REGION = 'East US' # Set region here
location = ''.join(AZURE_REGION.split()).lower()

## Set Azure Data Explorer Cluster Parameters

In [62]:
CLUSTER_NAME = 'cdacluster'
CLUSTER_NODE_SIZE = 'D13_v2'
CLUSTER_NODE_TEIR = 'Standard'
CLUSTER_NODE_CAPACITY = 2

## Set Azure Data Explorer DB Deployment Parameters

In [63]:
DB_NAME = 'cdadb'
DB_SOFT_DELETION_PERIOD = 3650
DB_HOT_CACHE_PERIOD = 31

## Define Azure Data Explorer Deployment Template

In [64]:
URI = "https://{}.{}.kusto.windows.net:443".format(CLUSTER_NAME, location)
DATA_INGESTION_URI = "https://ingest-{}.{}.kusto.windows.net:443/".format(CLUSTER_NAME, location)
template = {\
    "$schema": "https://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
    "contentVersion": "1.0.0.0",
    "resources": [\
        {\
            "type": "Microsoft.Kusto/Clusters",
            "sku": {\
                "name": CLUSTER_NODE_SIZE,
                "tier": CLUSTER_NODE_TEIR,
                "capacity": CLUSTER_NODE_CAPACITY
            },
            "name": CLUSTER_NAME,
            "apiVersion": "2017-09-07-privatepreview",
            "location": AZURE_REGION,
            "properties": {\
                "trustedExternalTenants": [
                    {
                        "value": azure_session["tenantId"]
                    }
                ],                
            }
        },
        {\
            "type": "Microsoft.Kusto/Clusters/Databases",
            "name": "{}/{}".format(CLUSTER_NAME, DB_NAME),
            "apiVersion": "2017-09-07-privatepreview",
            "location": AZURE_REGION,
            "properties": {
                "softDeletePeriodInDays": DB_SOFT_DELETION_PERIOD,
                "hotCachePeriodInDays": DB_HOT_CACHE_PERIOD,
            },
            "dependsOn": [\
                "[resourceId('Microsoft.Kusto/Clusters', '{}')]".format(CLUSTER_NAME)
            ]
        }
    ]
}

In [65]:
deployment_properties = {
    'mode': DeploymentMode.incremental,
    'template': template,
}

## Create Resource Group and Deploy 

Note this could take 10-15 min

In [66]:
resource_group_params = {'location':location}
client.resource_groups.create_or_update('kustodeploymenttest', resource_group_params)

# https://docs.microsoft.com/en-us/python/api/azure-mgmt-resource/azure.mgmt.resource.resources.v2018_05_01.operations.deploymentsoperations?view=azure-python#create-or-update
deployment_async_operation = client.deployments.create_or_update(
    RESOURCE_GROUP_NAME,
    'azure-sample',
    deployment_properties
)
deployment_async_operation.wait()

# Part 2 Populate Cluster

In [None]:
# Install Perquisite Libraries
!pip install azure-kusto-data==0.0.13
!pip install azure-kusto-ingest==0.0.13

## Batch Ingestion

### Import Statements and Set Constants


In [67]:
from azure.kusto.data.request import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
import pandas as pd
import datetime

In [79]:
# Construct the connection string. In this example we are using key authentication, 
# alternate options are AAD Application Certificate, AAD Application Key, and AAD User Password. 
# More samples available @ # https://github.com/Azure/azure-kusto-python/blob/master/azure-kusto-data/tests/sample.py

KCSB_INGEST = KustoConnectionStringBuilder.with_aad_application_key_authentication(DATA_INGESTION_URI,
                                                                            azure_session["clientId"],
                                                                            azure_session["clientSecret"],
                                                                            azure_session["tenantId"])

KCSB_ENGINE = KustoConnectionStringBuilder.with_aad_application_key_authentication(URI,
                                                                            azure_session["clientId"],
                                                                            azure_session["clientSecret"],
                                                                            azure_session["tenantId"])

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

### Set Source File Information 

In this example, we are using sample file hosted on Azure Blob Storage

In [76]:
from azure.storage.blob import BlockBlobService
from azure.kusto.ingest import KustoIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamplefiles"
SAS_TOKEN = "?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D"
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + CONTAINER + "/" + FILE_PATH + SAS_TOKEN

In [77]:
KUSTO_CLIENT = KustoClient(KCSB_ENGINE)

### Create Table in Azure Data Explorer

Click [here](https://docs.microsoft.com/en-us/azure/kusto/management/tables#create-table) for more info on Table commands

In [78]:
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(DB_NAME, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])

Unnamed: 0,TableName,Schema,DatabaseName,Folder,DocString
0,StormEvents,"{""Name"":""StormEvents"",""OrderedColumns"":[{""Name...",cdadb,,


### Create Ingestion Mapping

Ingestion mapping is required for Azure Data Explorer to map the data in the input file with the table in Azure Data Explorer. 
Click [here](https://docs.microsoft.com/en-us/azure/kusto/management/mappings) for more info on Ingestion Mapping.

In [80]:
CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(DB_NAME, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Unnamed: 0,Name,Kind,Mapping,LastUpdatedOn,Database,Table
0,StormEvents_CSV_Mapping,CSV,"[{""Name"":""StartTime"",""DataType"":""datetime"",""Cs...",2018-11-17 21:01:53.854675400,cdadb,StormEvents


### Queue up a message for ingestion

In the below code we are queuing-up a message for Azure Data Explorer to pull data from the Blob storage and ingest the data into Azure Data Explorer.

In [81]:
INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)

### All ingestion properties are documented here: https://docs.microsoft.com/en-us/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES  = IngestionProperties(database=DB_NAME, table=DESTINATION_TABLE, dataFormat=DataFormat.csv, mappingReference=DESTINATION_TABLE_COLUMN_MAPPING, additionalProperties={'ignoreFirstRecord': 'true'})
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)  # 10 is the raw size of the data in bytes
INGESTION_CLIENT.ingest_from_blob(BLOB_DESCRIPTOR,ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Done queuing up ingestion with Azure Data Explorer


### Validate the data was ingested into the table

Wait for 5-10 min for the queued ingestion to schedule the ingest and load the data into Azure Data Explorer. Run the following command to get the count of records in the StormEvents table.

In [83]:
QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(DB_NAME, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Unnamed: 0,Count
0,59066


# Part 3 Query Cluster 

### Connect to Kusto and execute query

The following command will execute Kusto queries against Help cluster and store the output in Pandas DataFrame

In [89]:
KUSTO_QUERY  = "StormEvents | sort by StartTime desc | take 1000"

RESPONSE = KUSTO_CLIENT.execute(DB_NAME, KUSTO_QUERY)

In [90]:
# Explore data in DataFrame

df = dataframe_from_result_table(RESPONSE.primary_results[0])

df

Unnamed: 0,StartTime,EndTime,EpisodeId,EventId,State,EventType,InjuriesDirect,InjuriesIndirect,DeathsDirect,DeathsIndirect,...,Source,BeginLocation,EndLocation,BeginLat,BeginLon,EndLat,EndLon,EpisodeNarrative,EventNarrative,StormSummary
0,2007-12-31 23:53:00,2007-12-31 23:53:00,12037,65838,CALIFORNIA,High Wind,0,0,0,0,...,Mesonet,,,,,,,The Warm Springs RAWS sensor reported northerl...,"{\r\n ""TotalDamages"": 0,\r\n ""StartTime"": ""2...",
1,2007-12-31 23:53:00,2007-12-31 23:53:00,12037,65839,CALIFORNIA,High Wind,0,0,0,0,...,Mesonet,,,,,,,North to northeast winds gusting to around 58 ...,"{\r\n ""TotalDamages"": 0,\r\n ""StartTime"": ""2...",
2,2007-12-31 22:30:00,2007-12-31 23:59:00,12950,71590,MICHIGAN,Winter Storm,0,0,0,0,...,Trained Spotter,,,,,,,This heavy snow event continued into the early...,"{\r\n ""TotalDamages"": 0,\r\n ""StartTime"": ""2...",
3,2007-12-31 22:30:00,2007-12-31 23:59:00,12950,71589,MICHIGAN,Winter Storm,0,0,0,0,...,Trained Spotter,,,,,,,This heavy snow event continued into the early...,"{\r\n ""TotalDamages"": 0,\r\n ""StartTime"": ""2...",
4,2007-12-31 22:30:00,2007-12-31 23:59:00,12950,71588,MICHIGAN,Winter Storm,0,0,0,0,...,Trained Spotter,,,,,,,This heavy snow event continued into the early...,"{\r\n ""TotalDamages"": 0,\r\n ""StartTime"": ""2...",
5,2007-12-31 20:30:00,2007-12-31 23:59:00,12950,71587,MICHIGAN,Winter Storm,0,0,0,0,...,Trained Spotter,,,,,,,This heavy snow event continued into the early...,"{\r\n ""TotalDamages"": 0,\r\n ""StartTime"": ""2...",
6,2007-12-31 20:30:00,2007-12-31 23:59:00,12950,71586,MICHIGAN,Winter Storm,0,0,0,0,...,Trained Spotter,,,,,,,This heavy snow event continued into the early...,"{\r\n ""TotalDamages"": 0,\r\n ""StartTime"": ""2...",
7,2007-12-31 19:00:00,2007-12-31 23:59:00,12994,71917,ALASKA,High Wind,0,0,0,0,...,AWOS,,,,,,,Craig reported very windy with higher gusts fr...,"{\r\n ""TotalDamages"": 0,\r\n ""StartTime"": ""2...",
8,2007-12-31 19:00:00,2007-12-31 23:59:00,12994,71915,ALASKA,High Wind,0,0,0,0,...,ASOS,,,,,,,Sitka observed a measured gust of 48 MPH at 22...,"{\r\n ""TotalDamages"": 0,\r\n ""StartTime"": ""2...",
9,2007-12-31 18:30:00,2007-12-31 23:59:00,13007,71979,CALIFORNIA,High Wind,0,0,0,0,...,Mesonet,,,,,,,A 72 mph wind gusts was measured by the Tonner...,"{\r\n ""TotalDamages"": 0,\r\n ""StartTime"": ""2...",
