# Connect to Azure Data Explorer

In [None]:
# Optional setup: uncomment and run once if Kqlmagic is not yet installed in this kernel's environment.
# %pip install Kqlmagic --no-cache-dir  --upgrade

In [None]:
# Load (or reload) the Kqlmagic IPython extension so the %kql / %%kql magics used below are available.
%reload_ext Kqlmagic

In [None]:
# Open a Kqlmagic connection to the 'mfgdb' database on the 'iiotmfgdev.westus2' Azure Data Explorer cluster.
# NOTE(review): the bare `code` token presumably selects interactive device-code sign-in — confirm against the Kqlmagic docs.
%kql AzureDataExplorer://tenant="Microsoft.com";code;cluster='iiotmfgdev.westus2';database='mfgdb'

# Load telemetry data from Data Explorer in tabular format

In [None]:
%%kql
opcua_raw
// Keep only rows whose payload contains the text "BatchNumber" and whose
// payload.SourceTimestamp (interpreted as Unix milliseconds) lies in the given datetime range.
| where payload contains "BatchNumber" and unixtime_milliseconds_todatetime(todouble(payload.SourceTimestamp)) between (datetime(2022-05-04T20:32:00.000Z).. datetime(2022-05-05T00:50:00.000Z))
// For each row, iterate over the payload elements: take the first key of every element and its
// value, fold all key/value pairs into one property bag `b`, then unpack `b` into one column per key.
// NOTE(review): assumes each payload element is a single-key bag, since only key [0] is read — confirm.
| mv-apply payload on (
    extend key = tostring(bag_keys(payload)[0])
    | extend value = payload[key]
    | summarize b = make_bag(pack(key, value))
) | evaluate bag_unpack(b)

In [10]:
import pandas as pd
# `_kql_raw_result_` is not assigned anywhere in this notebook; presumably Kqlmagic sets it to the
# result of the most recent %kql/%%kql query, so run this cell right after the query cell — TODO confirm.
telemetrydf = _kql_raw_result_.to_dataframe()

In [11]:
# Convert the SourceTimestamp column from epoch milliseconds to pandas datetimes
telemetrydf["SourceTimestamp"] = pd.to_datetime(
    telemetrydf["SourceTimestamp"],
    unit='ms',
)

# Report the frame's dimensions, then preview its first five rows
row_count, column_count = telemetrydf.shape
print("Rows => {0}".format(row_count))
print("Columns => {0}".format(column_count))
telemetrydf.head(5)

Rows => 7711
Columns => 43


Unnamed: 0,BatchNumber,S1,S10,S11,S12,S13,S14,S15,S16,S17,...,S39,S4,S40,S41,S5,S6,S7,S8,S9,SourceTimestamp
0,7423.0,4.998839,39.5,21.9,866.6161,2046.699,2089.468,943.1278,22.02698,25.83976,...,5773.366,,9501.005,7221.264648,4.999668,5.050548,23.5739,27.6,36.1,2022-05-05 00:40:20.077
1,7424.0,4.999668,39.4,21.8,893.7124,2052.43,2069.875,930.0323,22.02698,25.67179,...,5773.366,,9501.005,7213.02832,4.999668,5.056084,23.60346,27.7,35.9,2022-05-05 00:40:22.084
2,7425.0,4.999668,39.5,21.9,894.306,2060.232,2075.162,930.1899,22.36291,25.83976,...,5773.366,,9501.005,7229.500977,4.999668,5.048703,23.57944,28.1,35.9,2022-05-05 00:40:24.091
3,7426.0,4.999668,39.6,21.9,876.8541,2046.037,2137.928,943.1278,22.19494,25.67179,...,5773.366,,9501.005,7213.02832,4.999668,5.04778,23.60447,28.4,35.9,2022-05-05 00:40:26.098
4,7427.0,4.999668,39.6,22.1,895.3438,2033.45,2090.136,943.4471,22.02698,25.83976,...,5789.014,,9501.005,7229.500977,4.999668,5.045012,23.64226,28.8,36.1,2022-05-05 00:40:28.105


# Get Process Quality Data

In [12]:
import os
from azureml.core.workspace import Workspace
from azureml.core import Dataset, Datastore, Workspace

# Obtain the Azure ML workspace handle via Workspace.from_config()
ws = Workspace.from_config()

# Look up the datastore that holds the quality data
iiotmfgdatalakestore = Datastore.get(ws, "iiotmfgdatalakestore")  # provide the Datastore name

# Read the batch quality CSV from the datastore into a pandas DataFrame
quality_csv_path = [(iiotmfgdatalakestore, 'qualitydata/batch-quality-data.csv')]
quality_dataset = Dataset.Tabular.from_delimited_files(path=quality_csv_path)
qualitydf = quality_dataset.to_pandas_dataframe()

# Report the frame's dimensions, then preview its first rows
row_count, column_count = qualitydf.shape
print("Rows => {0}".format(row_count))
print("Columns => {0}".format(column_count))
qualitydf.head()

Rows => 9797
Columns => 2


Unnamed: 0,Quality,BatchNumber
0,1,1
1,1,2
2,1,3
3,1,4
4,0,5


# Join Telemetry and Quality Data

In [13]:
# Attach the quality label to every telemetry row that has a matching BatchNumber.
# how='inner' is the pandas default, spelled out here: rows without a match on either side are dropped.
finaldf = telemetrydf.merge(qualitydf, on='BatchNumber', how='inner')

# Report the frame's dimensions, then preview its first rows
row_count, column_count = finaldf.shape
print("Rows => {0}".format(row_count))
print("Columns => {0}".format(column_count))
finaldf.head()

Rows => 7711
Columns => 44


Unnamed: 0,BatchNumber,S1,S10,S11,S12,S13,S14,S15,S16,S17,...,S4,S40,S41,S5,S6,S7,S8,S9,SourceTimestamp,Quality
0,7423.0,4.998839,39.5,21.9,866.6161,2046.699,2089.468,943.1278,22.02698,25.83976,...,,9501.005,7221.264648,4.999668,5.050548,23.5739,27.6,36.1,2022-05-05 00:40:20.077,1
1,7424.0,4.999668,39.4,21.8,893.7124,2052.43,2069.875,930.0323,22.02698,25.67179,...,,9501.005,7213.02832,4.999668,5.056084,23.60346,27.7,35.9,2022-05-05 00:40:22.084,1
2,7425.0,4.999668,39.5,21.9,894.306,2060.232,2075.162,930.1899,22.36291,25.83976,...,,9501.005,7229.500977,4.999668,5.048703,23.57944,28.1,35.9,2022-05-05 00:40:24.091,1
3,7426.0,4.999668,39.6,21.9,876.8541,2046.037,2137.928,943.1278,22.19494,25.67179,...,,9501.005,7213.02832,4.999668,5.04778,23.60447,28.4,35.9,2022-05-05 00:40:26.098,1
4,7427.0,4.999668,39.6,22.1,895.3438,2033.45,2090.136,943.4471,22.02698,25.83976,...,,9501.005,7229.500977,4.999668,5.045012,23.64226,28.8,36.1,2022-05-05 00:40:28.105,1


# Upload the joined telemetry and quality data to the data lake

In [18]:
localDataDirectoryName = "data"
datasetFileName = "/iiot_quality_labeled_data.csv"  # the leading "/" doubles as the path separator below

# Make sure the local staging folder exists under the current working directory
data_folder = os.path.join(os.getcwd(), localDataDirectoryName)
os.makedirs(data_folder, exist_ok=True)

# Build the local CSV path once (folder + "/file.csv") and reuse it for both steps
local_dataset_path = "{0}{1}".format(data_folder, datasetFileName)

# Save dataframe to local file, leaving out the index column
finaldf.to_csv(local_dataset_path, index=False)

# Upload file to Datalake (overwrite=True is passed so a re-run can replace an earlier upload)
iiotmfgdatalakestore.upload_files(files=[local_dataset_path], overwrite=True)

Uploading an estimated of 1 files
Uploading /mnt/batch/tasks/shared/LS_root/mounts/clusters/jomitdev/code/Users/jovagh/data/iiot_quality_labeled_data.csv
Uploaded /mnt/batch/tasks/shared/LS_root/mounts/clusters/jomitdev/code/Users/jovagh/data/iiot_quality_labeled_data.csv, 1 files out of an estimated total of 1
Uploaded 1 files


$AZUREML_DATAREFERENCE_iiotmfgdatalakestore

# Register dataset in workspace

In [19]:
# Define a tabular dataset over the CSV that was uploaded to the datastore
labeled_data_path = [(iiotmfgdatalakestore, datasetFileName)]
iiot_quality_dataset = Dataset.Tabular.from_delimited_files(path=labeled_data_path)

# Register the dataset in the workspace and keep the handle returned by register()
iiot_quality_dataset = iiot_quality_dataset.register(
    workspace=ws,
    name="iiot_quality_labeled_data",
    description="raw telemetry data merged with quality data",
    create_new_version=True,
)

In [20]:
# Fetch the datasets known to the workspace and print them to confirm the registration above.
allds = Dataset.get_all(ws)
print(allds)

{ 'iiot_quality_labeled_data': DatasetRegistration(id='109d19cf-8980-4ef5-a0d5-3de63bfc6fa7', name='iiot_quality_labeled_data', version=1, description='raw telemetry data merged with quality data', tags={})}
