# Data Ingestor for IoT Telemetry and Failure Data

This notebook ingests and preprocesses IoT device telemetry stored in Azure Blob Storage and IoT device failure logs stored in an Azure Storage table, for use in Feature Engineering and Model Training.

This imitates a production scenario in which telemetry is collected continuously over a period of time, whereas failure and maintenance logs are populated manually with new data.


### Importing Dependencies and Retrieving Environment Variables

In [9]:
%pip install azure-cosmosdb-table

Collecting azure-cosmosdb-table
  Downloading azure_cosmosdb_table-1.0.6-py2.py3-none-any.whl (125 kB)
Collecting azure-cosmosdb-nspkg>=2.0.0
  Downloading azure_cosmosdb_nspkg-2.0.2-py2.py3-none-any.whl (2.9 kB)
Installing collected packages: azure-cosmosdb-nspkg, azure-cosmosdb-table
Successfully installed azure-cosmosdb-nspkg-2.0.2 azure-cosmosdb-table-1.0.6
Note: you may need to restart the kernel to use updated packages.


In [8]:
import os
import string
import json
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType, StringType
from pyspark.storagelevel import StorageLevel
from azure.cosmosdb.table.tableservice import TableService

#### Read Environment Variables

In [10]:
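# Read connection settings from environment variables instead of hard-coding them.
# Never commit a storage account key to a notebook.
STORAGE_ACCOUNT_SUFFIX = os.environ.get('STORAGE_ACCOUNT_SUFFIX', 'core.windows.net')
STORAGE_ACCOUNT_NAME = os.environ.get('STORAGE_ACCOUNT_NAME', 'ws1237100807342')
STORAGE_ACCOUNT_KEY = os.environ['STORAGE_ACCOUNT_KEY']
TELEMETRY_CONTAINER_NAME = os.environ.get('TELEMETRY_CONTAINER_NAME',
                                          'azureml-blobstore-0ff7b38e-1ad8-4df4-ba02-5135c243b83f')
LOG_TABLE_NAME = os.environ.get('LOG_TABLE_NAME', 'logs')

# Root folder for the prepared data written by this notebook.
DATA_ROOT = os.path.join(os.getcwd(), "data")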
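One way to make the "read environment variables" step fail early is to collect every required setting at once and report all missing names together. The sketch below assumes the environment variable names mirror the Python constants used in this notebook; those names and the `load_settings` helper are illustrative, not part of the original solution.

```python
import os
from typing import Dict, Mapping

# Hypothetical variable names: assumed to mirror the constants used in this notebook.
REQUIRED = ("STORAGE_ACCOUNT_NAME", "STORAGE_ACCOUNT_KEY", "TELEMETRY_CONTAINER_NAME")
DEFAULTS = {"STORAGE_ACCOUNT_SUFFIX": "core.windows.net", "LOG_TABLE_NAME": "logs"}


def load_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return all connection settings, raising if any required one is missing or empty."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    settings = {name: env[name] for name in REQUIRED}
    # Optional settings fall back to the values this notebook uses.
    for name, default in DEFAULTS.items():
        settings[name] = env.get(name, default)
    return settings


# Usage with an explicit mapping, so no real credentials are involved.
example = load_settings({"STORAGE_ACCOUNT_NAME": "myaccount",
                         "STORAGE_ACCOUNT_KEY": "not-a-real-key",
                         "TELEMETRY_CONTAINER_NAME": "telemetry"})
print(example["LOG_TABLE_NAME"])  # logs
```

Passing a plain dictionary instead of `os.environ` keeps the helper easy to test without touching the process environment.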

### Setting up Ingested Data Drop Folder
This folder stores the prepared IoT data for use in the notebooks that follow.

In [3]:
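import shutil

data_dir = os.path.join(DATA_ROOT, 'data')

# TODO: Convert data_dir into an environment variable
# Start from an empty drop folder so output from earlier runs is not reused.
shutil.rmtree(data_dir, ignore_errors=True)
# makedirs also creates data_dir itself.
os.makedirs(os.path.join(data_dir, 'logs'))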

### Retrieving telemetry data
The raw data retrieved from the predictive maintenance (PdM) solution's storage holds all of the IoT telemetry in the "Body" column of the DataFrame as a byte array. Each value must be decoded into a JSON string and then expanded into a separate DataFrame for use in Feature Engineering and Model Training.
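To make the decode-and-expand step concrete without a Spark cluster, the plain-Python sketch below decodes each UTF-8 byte payload, parses it as JSON, and keeps only the columns that Feature Engineering and Model Training use. The sample payload is invented for illustration; only the column names come from this notebook, and `expand_bodies` is a hypothetical helper.

```python
import json
from typing import Dict, Iterable, List

# Columns selected later in this notebook for Feature Engineering and Model Training.
COLUMNS = ["timestamp", "ambient_pressure", "ambient_temperature", "machineID",
           "pressure", "speed", "speed_desired", "temperature"]


def expand_bodies(bodies: Iterable[bytes]) -> List[Dict[str, object]]:
    """Decode each byte-array body to a JSON string and keep only the needed fields."""
    records = []
    for body in bodies:
        message = json.loads(bytes(body).decode("utf-8"))
        # Missing fields become None, much as Spark's JSON reader yields nulls.
        records.append({col: message.get(col) for col in COLUMNS})
    return records


# Invented sample message, shaped like one telemetry "Body" value.
sample = b'{"timestamp": "2018-01-01T06:00:00", "machineID": "M_0001", "speed": 1000.0}'
rows = expand_bodies([bytearray(sample)])
print(rows[0]["machineID"], rows[0]["pressure"])  # M_0001 None
```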

In [6]:
# The wildcards match the nested folder layout the telemetry blobs are written into.
wasbTelemetryUrl = "wasb://{0}@{1}.blob.{2}/*/*/*/*/*/*/*".format(TELEMETRY_CONTAINER_NAME,
                                                                  STORAGE_ACCOUNT_NAME,
                                                                  STORAGE_ACCOUNT_SUFFIX)
print(wasbTelemetryUrl)

spark = SparkSession.builder.getOrCreate()
hc = spark.sparkContext._jsc.hadoopConfiguration()
# Read Avro blobs even if their names lack the .avro extension.
hc.set("avro.mapred.ignore.inputs.without.extension", "false")
# Authenticate the WASB connector with the storage account key.
if STORAGE_ACCOUNT_KEY:
    hc.set("fs.azure.account.key.{0}.blob.{1}".format(STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_SUFFIX),
           STORAGE_ACCOUNT_KEY)
sql = SQLContext.getOrCreate(spark.sparkContext)
avroblob = sql.read.format("com.databricks.spark.avro").load(wasbTelemetryUrl)
avroblob.show()

wasb://azureml-blobstore-0ff7b38e-1ad8-4df4-ba02-5135c243b83f@ws1237100807342.blob.core.windows.net/*/*/*/*/*/*/*
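The connector depends on two strings built from the same settings: the `wasb://` glob URL and the Hadoop property that carries the account key. The hypothetical helper below only formats those strings; the glob depth of seven `*` segments is copied from the notebook and assumes every telemetry blob path has exactly seven segments below the container root.

```python
from typing import Tuple


def wasb_settings(container: str, account: str, suffix: str = "core.windows.net",
                  depth: int = 7) -> Tuple[str, str]:
    """Return (glob URL for the telemetry blobs, Hadoop property name for the account key)."""
    glob = "/".join(["*"] * depth)
    url = "wasb://{0}@{1}.blob.{2}/{3}".format(container, account, suffix, glob)
    key_property = "fs.azure.account.key.{0}.blob.{1}".format(account, suffix)
    return url, key_property


url, key_property = wasb_settings("telemetry", "myaccount")
print(url)           # wasb://telemetry@myaccount.blob.core.windows.net/*/*/*/*/*/*/*
print(key_property)  # fs.azure.account.key.myaccount.blob.core.windows.net
```

Deriving both strings from the same account name and suffix keeps the URL and the credential property from drifting apart.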

### Convert the byte-formatted "Body" of the raw blob data into JSON and expand the result into a new PySpark DataFrame
The output shows the schema of the raw blob data, including the decoded `BodyString` column, followed by a preview of the telemetry restricted to the columns needed for Feature Engineering and Model Training.

In [4]:
# Cast the binary "Body" column to a UTF-8 string holding one JSON document per message.
avroblob = avroblob.withColumn("BodyString", F.col("Body").cast("string"))
avroblob.printSchema()

# Parse the JSON strings into a new DataFrame; Spark infers the schema across all messages.
telemetry_df = sql.read.json(avroblob.select("BodyString").rdd.map(lambda r: r.BodyString))
subsetted_df = telemetry_df.select(["timestamp", "ambient_pressure", "ambient_temperature", "machineID",
                                    "pressure", "speed", "speed_desired", "temperature"])
subsetted_df.show()

In [None]:
# Cast the ISO 8601 timestamp strings (e.g. YYYY-MM-DDTHH:MM:SS.ffffff) to Spark's TimestampType.
reformatted_time_df = subsetted_df.withColumn("timestamp", F.col("timestamp").cast("timestamp"))
reformatted_time_df.printSchema()

### Write the DataFrame to Parquet

In [None]:
reformatted_time_df.write.parquet(data_dir+"/telemetry", mode="overwrite")

## Get Logs

In [13]:
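# Query every entity in the log table; further result pages are fetched as the results are iterated.
table_service = TableService(account_name=STORAGE_ACCOUNT_NAME, account_key=STORAGE_ACCOUNT_KEY,
                             endpoint_suffix=STORAGE_ACCOUNT_SUFFIX)
tblob = table_service.query_entities(LOG_TABLE_NAME)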

### Process log table data into Pandas DataFrame

In [14]:
# Convert each table entity into a dict, skipping DEBUG-level entries.
rows = []
for entity in tblob:
    if entity.get("Level") == "DEBUG":
        continue
    row_dict = dict(entity)
    # Strip the timezone so the column holds naive pandas Timestamps.
    if "Timestamp" in row_dict:
        row_dict["Timestamp"] = pd.Timestamp(row_dict["Timestamp"].replace(tzinfo=None))
    rows.append(row_dict)

# Build the DataFrame in one step; DataFrame.append was removed in pandas 2.0.
# Columns are the union of all entity properties, not just those of the first entity.
log_df = pd.DataFrame(rows)
log_df.head()
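Azure Table storage is schemaless, so two entities in the same table can carry different properties, and deriving the columns from the first entity alone can silently drop fields that only appear later. The sketch below uses plain dictionaries in place of real table entities and collects the union of property names in first-seen order; building a `pd.DataFrame` from a list of dicts performs a similar union. The entity values and the `union_columns` helper are illustrative.

```python
from typing import Dict, Iterable, List


def union_columns(entities: Iterable[Dict[str, object]]) -> List[str]:
    """Return every property name seen across the entities, in first-seen order."""
    seen: Dict[str, None] = {}  # dict keeps insertion order and de-duplicates names
    for entity in entities:
        for name in entity:
            seen.setdefault(name, None)
    return list(seen)


# Illustrative entities: only the second one has a "Code" property.
entities = [
    {"PartitionKey": "M_0001", "Level": "INFO", "Message": "start"},
    {"PartitionKey": "M_0001", "Level": "CRITICAL", "Message": "failure", "Code": "F01"},
]
print(union_columns(entities))  # ['PartitionKey', 'Level', 'Message', 'Code']
```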

### Number of Run-To-Failure Sequences
The number of Run-To-Failure sequences is especially important for Feature Engineering and Model Training, because these logged failures are what the predictive model is trained on. If no failure sequences are logged, training a predictive model is pointless: the model has no examples of what a failure looks like. Do not proceed with the notebooks if there are no Run-To-Failure sequences logged.
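The check itself can be written without pandas: count the log messages equal to `failure` and stop if there are none. This sketch mirrors the notebook's rule (each message of exactly `failure` counts as one run-to-failure sequence) and tolerates entries without a `Message` property; the sample entries and the `count_run_to_failures` helper are invented for illustration.

```python
from collections import Counter
from typing import Dict, Iterable


def count_run_to_failures(entries: Iterable[Dict[str, object]]) -> int:
    """Return the number of 'failure' messages, raising if there are none."""
    counts = Counter(entry.get("Message") for entry in entries)
    failures = counts["failure"]  # Counter returns 0 for keys it has never seen
    if failures == 0:
        raise ValueError("Run to failure count is 0. Do not proceed.")
    return failures


logs = [{"Message": "maintenance"}, {"Message": "failure"}, {"Level": "INFO"}, {"Message": "failure"}]
print("Number of Run-to-Failures:", count_run_to_failures(logs))  # Number of Run-to-Failures: 2
```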

In [15]:
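# Each log entry whose Message is "failure" counts as one run-to-failure sequence.
if 'Message' not in log_df.columns:
    raise ValueError('Log table has no "Message" column. Do not proceed.')
message_counts = log_df['Message'].value_counts()
if 'failure' in message_counts:
    print("Number of Run-to-Failures:", message_counts['failure'])
else:
    raise ValueError('Run to failure count is 0. Do not proceed.')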

### Select necessary attributes

In [None]:
# Keep only the fields used downstream, as strings; PartitionKey holds the machine ID.
log_df = log_df[["Timestamp", "Code", "Level", "PartitionKey"]].astype(str)
log_df.columns = ["timestamp", "code", "level", "machineID"]
log_df.index = log_df['timestamp']
log_df.head()

### Write logs to system storage

In [None]:
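# Convert the pandas DataFrame to Spark (the pandas index is not carried over) and write it as Parquet.
logs_sdf = sql.createDataFrame(log_df)
logs_sdf.write.parquet(data_dir + "/logs", mode="overwrite")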