In [2]:
import sys
sys.path.append("../")
from monitoring.data_collector import Online_Collector, Batch_Collector
from monitoring.data_drift import Drift_Analysis
import pandas as pd
import time
from azureml.core import Workspace
tenant_id = "72f988bf-86f1-41af-91ab-2d7cd011db47"
wsname = "ws01ent"
rg = "azureml"
subid = "0e9bace8-7a81-4922-83b5-d995ff706507"
from azureml.core.authentication import InteractiveLoginAuthentication

interactive_auth = InteractiveLoginAuthentication(tenant_id=tenant_id)

# ws = Workspace(workspace_name=wsname, resource_group=rg, subscription_id=subid, auth=interactive_auth)
ws = Workspace.from_config()

kv=ws.get_default_keyvault()


#### Prerequisite
1. Create a service principal and secret (SP)
2. Provision an ADX cluster and create a DB
3. Add the SP to be contributor of the cluster
4. pip install following packages: pip install --upgrade azure-mgmt-eventhub azure-eventhub azure-mgmt-kusto azure-kusto-ingest azure-kusto-data azure-identity azure-common


#### Provisioning resource


The following code provision target table at ADX, eventhub and ingestion procedure to ingest data automatically once data is available at eventhub

In [9]:
tenant_id = "72f988bf-86f1-41af-91ab-2d7cd011db47"
#Application ID
client_id = "af883abf-89dd-4889-bdb3-1ee84f68465e"
#Client Secret, set it at your WS' keyvault with key name same as your client_id
client_secret = kv.get_secret(client_id)
subscription_id = "0e9bace8-7a81-4922-83b5-d995ff706507"

cluster_uri = "https://adx02.westus2.kusto.windows.net" #URL of the ADX Cluster

db_name = "db01"
table_name = "isd_weather_test" #new dataset
kv=ws.get_default_keyvault()


sample_pd_data = pd.read_parquet("data/test_data.parquet")
sample_pd_data['timestamp'] = sample_pd_data['datetime']
sample_pd_data.drop(['datetime'], inplace=True, axis=1)
online_collector = Online_Collector(tenant_id, client_id,client_secret,cluster_uri,db_name,table_name, sample_pd_data)

.create table isd_weather_test (usaf: string, wban: string, latitude: real, longitude: real, elevation: real, windAngle: real, windSpeed: real, temperature: real, seaLvlPressure: real, cloudCoverage: string, presentWeatherIndicator: real, pastWeatherIndicator: real, precipTime: real, precipDepth: real, snowDepth: real, stationName: string, countryOrRegion: string, p_k: string, year: int, day: int, version: real, timestamp: datetime)


In [43]:
# time.sleep(120) #It takes about 2 minutes for stream_collect to start working 
# online_collector.stream_collect(sample_pd_data)
# online_collector.batch_collect(sample_pd_data)

In [11]:
analysis = Drift_Analysis(tenant_id, client_id, client_secret, cluster_uri,db_name)
df = analysis.query(f"""
{table_name}| take(10)
""")
df

Unnamed: 0,usaf,wban,latitude,longitude,elevation,windAngle,windSpeed,temperature,seaLvlPressure,cloudCoverage,...,precipTime,precipDepth,snowDepth,stationName,countryOrRegion,p_k,year,day,version,timestamp
0,999999,53878,35.419,-82.557,641.0,,,4.4,,,...,,,,ASHEVILLE 13 S,US,999999-53878,2008,23,1.0,2008-01-23 00:15:00+00:00
1,999999,53878,35.419,-82.557,641.0,,,-1.6,,,...,,,,ASHEVILLE 13 S,US,999999-53878,2008,21,1.0,2008-01-21 22:55:00+00:00
2,999999,53877,35.495,-82.614,656.0,,,-2.3,,,...,,,,ASHEVILLE 8 SSW,US,999999-53877,2008,22,1.0,2008-01-22 04:55:00+00:00
3,999999,53877,35.495,-82.614,656.0,,,1.3,,,...,,,,ASHEVILLE 8 SSW,US,999999-53877,2008,19,1.0,2008-01-19 09:50:00+00:00
4,999999,53877,35.495,-82.614,656.0,,,-2.2,,,...,,,,ASHEVILLE 8 SSW,US,999999-53877,2008,22,1.0,2008-01-22 03:50:00+00:00
5,999999,53877,35.495,-82.614,656.0,,,-0.9,,,...,,,,ASHEVILLE 8 SSW,US,999999-53877,2008,21,1.0,2008-01-21 22:30:00+00:00
6,999999,53878,35.419,-82.557,641.0,,0.3,1.0,,,...,1.0,0.0,,ASHEVILLE 13 S,US,999999-53878,2008,6,1.0,2008-01-06 13:00:00+00:00
7,999999,53878,35.419,-82.557,641.0,,,-9.5,,,...,,,,ASHEVILLE 13 S,US,999999-53878,2008,25,1.0,2008-01-25 06:35:00+00:00
8,999999,53878,35.419,-82.557,641.0,,,-4.2,,,...,,,,ASHEVILLE 13 S,US,999999-53878,2008,16,1.0,2008-01-16 06:20:00+00:00
9,999999,53878,35.419,-82.557,641.0,,,4.0,,,...,,,,ASHEVILLE 13 S,US,999999-53878,2008,28,1.0,2008-01-28 14:50:00+00:00


In [12]:
import matplotlib.pyplot as plt

In [103]:
df = analysis.query(f"""
{table_name}| summarize avg(temperature) by bin(timestamp, 7d) | sort by timestamp asc
""")
df

Unnamed: 0,timestamp,avg_temperature
0,2007-12-31 00:00:00+00:00,1.0
1,2008-01-14 00:00:00+00:00,-1.45
2,2008-01-21 00:00:00+00:00,-2.016667
3,2008-01-28 00:00:00+00:00,4.0


In [102]:
df = analysis.query(f"""
{table_name}| extend day = floor(timestamp % 7d, 1d) |
summarize avg(temperature) by day
""")
df

Unnamed: 0,day,avg_temperature
0,2 days,0.1
1,0 days,0.5
2,1 days,-2.25
3,5 days,1.3
4,6 days,1.0
5,4 days,-9.5


In [136]:
df.values[0][0]

Timestamp('2008-01-02 07:05:00+0000', tz='UTC')

In [69]:
df.timestamp.min()

2008

In [100]:
%matplotlib widget
import ipywidgets as widgets
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import dates as mpl_dates
import datetime

In [124]:
output = widgets.Output()
 
def get_values():
    df = analysis.query(f"""
    {text_tblname.value}| take 10
    """)
    return list(df.columns)

def get_type():
    df = analysis.query(f"""
    {text_tblname.value}| take 10
    """)
    num_cols = list(df.select_dtypes('number').columns)
    cat_cols = list(df.select_dtypes(include=["object"]).columns)
    if column_selector.value in num_cols:
        return ["avg", "min", "max"]
    elif column_selector.value in cat_cols:
        return ["euclidian_distance", "unique_values"]

def get_timestamp():
    df = analysis.query(f"""
    {text_tblname.value}| getschema | where ColumnType == "datetime"
    """)
    return list(df["ColumnName"].values)
    
def get_dates():
    df = analysis.query(f"""
    {text_tblname.value}| summarize min(timestamp), max(timestamp)
    """)

    min_max = df.values.tolist()[0]

    w_options = pd.date_range(pd.offsets.MonthBegin().rollback(min_max[0]), pd.offsets.MonthBegin().rollback(min_max[1]),
              freq="MS", inclusive="both").strftime("%b/%Y").tolist()
    return w_options

# create some control elements
text_tblname = widgets.Text(value="isd_weather_test", description="table name", continuous_update=False)
text_tblnamecomp = widgets.Text(value="", description="table name", continuous_update=False)
clusteruri_name = widgets.Text(value="https://adx02.westus2.kusto.windows.net", description="cluster URI", continuous_update=False)
database_name = widgets.Text(value="db01", description="Database Name", continuous_update=False)
column_selector = widgets.Dropdown(value=get_values()[0], options=get_values(), description="column name")
datecol_selector = widgets.Dropdown(value=get_timestamp()[0], options=get_timestamp(), description="Date Column")
metric_selector = widgets.Dropdown(value=get_type()[0], options=get_type(), description="metric")
freq_selector = widgets.Dropdown(value="daily", options=["daily", "weekly", "monthly"], description="frequency")
date_slider = widgets.SelectionRangeSlider(
    options=get_dates(),
    index=(0,len(get_dates())-1),
    disabled=False,
    description="date range")
test_button = widgets.Button(description="Plot")
 

# callback functions
def update_tbl(change):
    df = analysis.query(f"""
    {text_tblname.value}| take 10
    """)
    column_selector.options= list(df.columns)

def update_col(change):
    new_cols = get_type()
    metric_selector.options = new_cols

def get_data():
    freq_dict = {"daily": "1d",
                "weekly": "7d"}
    metric = metric_selector.value
    col = column_selector.value
    start_date = datetime.datetime.strptime(date_slider.value[0], "%b/%Y").strftime("%Y-%m-%d")
    end_date = datetime.datetime.strptime(date_slider.value[1], "%b/%Y").strftime("%Y-%m-%d")
    date_col = datecol_selector.value
    freq = freq_dict[freq_selector.value]
    if metric in ["avg", "min", "max"]:
        df = analysis.query(f"""
        {table_name}
        | summarize {metric}({col}) by bin({date_col}, {freq}) | sort by {date_col} asc
        """)
        # | where timestamp >= datetime({start_date}) and timestamp <= datetime({end_date})
        return df
    else:
        return None
def run_button(b):
    output.clear_output(wait=True)
    plt.close('all')
    datafr = get_data()

    x_data = list(datafr.timestamp.values)
    y_data = list(datafr.iloc[:,-1].values)
    with output:
        fig, ax = plt.subplots(1, 1, figsize=(6, 4)) # constrained_layout=True,
        
        # move the toolbar to the bottom
        fig.canvas.toolbar_position = 'bottom'
        # ax.grid(True)    
        line, = ax.plot(x_data, y_data, linestyle='solid')
        ax.set_xlim(left=x_data[0], right=x_data[-1])
        fig.autofmt_xdate()
        date_format = mpl_dates.DateFormatter('%m-%d-%Y')
        ax.xaxis.set_major_formatter(date_format)
        ax.set_ylim(bottom=0, top=max(y_data)*1.1)
        ax.set_title(f"{metric_selector.value} {freq_selector.value} {column_selector.value}")
        fig.suptitle("")
        # fig.canvas.draw()
        fig.show()


# connect callbacks and traits
text_tblname.observe(update_tbl, 'value')
column_selector.observe(update_col, 'value')
test_button.on_click(run_button)

controls1 = widgets.VBox([clusteruri_name, database_name])
controls2_1 = widgets.Accordion(children=[text_tblnamecomp])
controls2_1.set_title(0, "Optional")
controls2 = widgets.VBox([text_tblname, column_selector, datecol_selector, metric_selector, freq_selector, date_slider, controls2_1, test_button]) # color_picker, text_xlabel, text_ylabel, int_slider
controls3 = widgets.VBox([])

tab2 = widgets.VBox([controls2, output])
tab = widgets.Tab([controls1, tab2, controls3, controls3])

tab.set_title(0, "Login")
tab.set_title(1, "Feature Details")
tab.set_title(2, "Data Drift")
tab.set_title(3, "Model Monitoring")
widgets.VBox([tab])

VBox(children=(Tab(children=(VBox(children=(Text(value='https://adx02.westus2.kusto.windows.net', continuous_u…

In [49]:
plt.close('all')