# Reading Avro Blobs Into Parquet Data Sets

### Dependency Importing and Environment Variable Retrieval

In [65]:
import json
import os
import shutil
import string

import pandas as pd
import pyspark.sql.functions as F
from azure.storage.table import TableService
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType, StringType
from pyspark.storagelevel import StorageLevel

#### Read Environment Variables

In [66]:
# For development purposes only, until real environment variables get set.
# Settings are read from the "DataIngestion" section of a JSON config file.
from pathlib import Path

# Look for the config file in the home directory first; if it is not there,
# fall back to the same path under the /dbfs mount.
env_config_file_location = (str(Path.home())+"/NotebookEnvironmentVariablesConfig.json")
config_file = Path(env_config_file_location)
if not config_file.is_file():
    env_config_file_location = ("/dbfs"+str(Path.home())+"/NotebookEnvironmentVariablesConfig.json")

# Use a context manager so the file handle is closed as soon as it is parsed.
with open(env_config_file_location) as f:
    env_variables = json.load(f)["DataIngestion"]

# A missing key raises KeyError here, so a bad config fails fast in this cell
# rather than later in the pipeline.
STORAGE_ACCOUNT_SUFFIX = 'core.windows.net'
STORAGE_ACCOUNT_NAME = env_variables["STORAGE_ACCOUNT_NAME"]
STORAGE_ACCOUNT_KEY = env_variables["STORAGE_ACCOUNT_KEY"]
TELEMETRY_CONTAINER_NAME = env_variables["TELEMETRY_CONTAINER_NAME"]
LOG_TABLE_NAME = env_variables["LOG_TABLE_NAME"]
DATA_ROOT = env_variables["DATA_ROOT_FOLDER"]

### Setting up Drop Folder

In [67]:
# Drop folder that receives the parquet output of this notebook.
data_dir = DATA_ROOT + '/data'

# TODO: Convert data_dir into env variable
# Start from an empty drop folder on every run. Pure-Python calls are used
# instead of shell magics so the config-derived path is never interpolated
# into a shell command and the cell also runs outside IPython.
shutil.rmtree(data_dir, ignore_errors=True)  # like `rm -rf`: a missing folder is not an error
# Creates data_dir, data_dir/logs and any missing parent directories.
os.makedirs(data_dir + '/logs', exist_ok=True)

### Retrieving telemetry data (as spark dataframe)

In [68]:
# Glob over every blob seven path levels below the telemetry container root.
wasbTelemetryUrl = "wasb://{0}@{1}.blob.{2}/*/*/*/*/*/*/*".format(TELEMETRY_CONTAINER_NAME,
                                                                  STORAGE_ACCOUNT_NAME,
                                                                  STORAGE_ACCOUNT_SUFFIX)

# NOTE: despite its name, `sc` is a SparkSession (not a SparkContext).
sc = SparkSession.builder.getOrCreate()
hc = sc._jsc.hadoopConfiguration()
# Allow the Avro reader to pick up blobs that have no ".avro" extension.
hc.set("avro.mapred.ignore.inputs.without.extension", "false")

# Register the storage account key only when one is configured. The key is set
# exactly once: a second, unguarded call would defeat this check and hand an
# empty/None value to the Hadoop configuration.
if STORAGE_ACCOUNT_KEY:
    hc.set("fs.azure.account.key.{0}.blob.{1}".format(STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_SUFFIX),
           STORAGE_ACCOUNT_KEY)

# SQLContext.getOrCreate expects a SparkContext, so pass the session's context.
sql = SQLContext.getOrCreate(sc.sparkContext)
avroblob = sql.read.format("com.databricks.spark.avro").load(wasbTelemetryUrl)
avroblob.show()

+--------------------+----------+--------------------+--------------------+
|     EnqueuedTimeUtc|Properties|    SystemProperties|                Body|
+--------------------+----------+--------------------+--------------------+
|2018-06-28T19:51:...|     Map()|Map(connectionAut...|[7B 22 6D 61 63 6...|
|2018-06-28T19:51:...|     Map()|Map(connectionAut...|[7B 22 6D 61 63 6...|
|2018-06-28T19:51:...|     Map()|Map(connectionAut...|[7B 22 6D 61 63 6...|
|2018-06-28T19:51:...|     Map()|Map(connectionAut...|[7B 22 6D 61 63 6...|
|2018-06-28T19:51:...|     Map()|Map(connectionAut...|[7B 22 6D 61 63 6...|
|2018-06-28T19:51:...|     Map()|Map(connectionAut...|[7B 22 6D 61 63 6...|
|2018-06-28T19:51:...|     Map()|Map(connectionAut...|[7B 22 6D 61 63 6...|
|2018-06-28T19:51:...|     Map()|Map(connectionAut...|[7B 22 6D 61 63 6...|
|2018-06-28T19:51:...|     Map()|Map(connectionAut...|[7B 22 6D 61 63 6...|
|2018-06-28T19:51:...|     Map()|Map(connectionAut...|[7B 22 6D 61 63 6...|
|2018-06-28T

### Convert the byte-formatted "Body" of the raw blob data into JSON, then expand the result into a new PySpark DataFrame

In [69]:
# Convert the binary "Body" column to a UTF-8 string column in the PySpark DataFrame.
from json import loads as Loads  # NOTE(review): not referenced in the visible cells
column = avroblob['Body']
# "Body" is nullable, so the UDF must pass nulls through instead of calling
# .decode on None (which would fail the whole Spark job).
string_udf = udf(lambda x: x.decode("utf-8") if x is not None else None, StringType())
avroblob = avroblob.withColumn("BodyString", string_udf(column))
avroblob.printSchema()

# Parse each JSON body into a row of a new DataFrame. Rows without a body are
# dropped first so they are not handed to the JSON reader as non-JSON text.
body_strings = (avroblob
                .where(F.col("BodyString").isNotNull())
                .select("BodyString")
                .rdd.map(lambda r: r.BodyString))
telemetry_df = sql.read.json(body_strings)
telemetry_df.show()

root
 |-- EnqueuedTimeUtc: string (nullable = true)
 |-- Properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- SystemProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- Body: binary (nullable = true)
 |-- BodyString: string (nullable = true)

+----------------+-------------------+-----------+--------+-------+-------------+-----------+--------------------+---------+
|ambient_pressure|ambient_temperature|  machineID|pressure|  speed|speed_desired|temperature|           timestamp|vibration|
+----------------+-------------------+-----------+--------+-------+-------------+-----------+--------------------+---------+
|          101.09|              19.92|machine-009|  988.07|1185.79|         1000|      159.3|2018-06-28T19:51:...|     null|
|          100.91|              19.94|Machine-003| 1792.65|1180.34|         1000|     131.88|2018-06-28T19:51:...|     null|
|          100

In [70]:
# Keep only the columns listed below; every other column of telemetry_df
# is left out of subsetted_df.
retained_columns = [
    "timestamp",
    "ambient_pressure",
    "ambient_temperature",
    "machineID",
    "pressure",
    "speed",
    "speed_desired",
    "temperature",
]
subsetted_df = telemetry_df.select(retained_columns)


In [71]:
import datetime  # NOTE(review): not referenced in the visible cells
# NOTE(review): this format string is not used by the cast below.
e = '%Y-%m-%dT%H:%M:%S.%f'

# Replace the string "timestamp" column with a proper Spark timestamp column.
timestamp_column = F.col("timestamp").cast("timestamp")
reformatted_time_df = subsetted_df.withColumn("timestamp", timestamp_column)

reformatted_time_df.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- ambient_pressure: double (nullable = true)
 |-- ambient_temperature: double (nullable = true)
 |-- machineID: string (nullable = true)
 |-- pressure: double (nullable = true)
 |-- speed: double (nullable = true)
 |-- speed_desired: long (nullable = true)
 |-- temperature: double (nullable = true)



### Write dataframe to Parquet in system storage

In [72]:
# Persist the telemetry frame as parquet under the drop folder, replacing any
# output left there by a previous run.
reformatted_time_df.write.mode("overwrite").parquet(data_dir+"/telemetry")

## Get Logs

In [73]:
# Connect to the storage account's table service and query the log table.
table_service = TableService(
    account_name=STORAGE_ACCOUNT_NAME,
    account_key=STORAGE_ACCOUNT_KEY,
)
tblob = table_service.query_entities(LOG_TABLE_NAME)

### Process log table data into Pandas DataFrame

In [74]:
# Build a pandas DataFrame from the log table, skipping DEBUG-level entries.
#
# The table is walked once and the rows are collected in a plain list; the
# DataFrame is created a single time at the end. Growing a DataFrame row by
# row (DataFrame.append) copies the frame on every iteration and is no longer
# available in pandas 2.x.
attributes = list()   # keys of the first entity, whatever its level
log_columns = []      # column order: first entity's keys, then any keys first seen later
log_records = []      # one dict per retained (non-DEBUG) entity
for row in tblob:
    if not attributes:
        attributes.extend(row)
        log_columns.extend(attributes)
    if row["Level"] == "DEBUG":
        continue
    row_dict = {}
    for attribute in row:
        value = row[attribute]
        if attribute == "Timestamp":
            # Strip the timezone so the column holds naive timestamps.
            value = pd.Timestamp(value.replace(tzinfo=None), tz=None)
        row_dict[attribute] = value
        if attribute not in log_columns:
            log_columns.append(attribute)
    log_records.append(row_dict)

# dtype=object keeps each value as the Python object it was read as (the same
# as appending rows to an initially empty frame), so later string conversion
# formats timestamps element by element.
log_df = pd.DataFrame(log_records, columns=log_columns, dtype=object)
log_df.head()

Unnamed: 0,Level,_Driver,Code,etag,Message,PartitionKey,RowKey,Timestamp
0,INFO,8621bba7-55e2-485f-b191-c7365d9f5847,,"W/""datetime'2018-06-28T18%3A09%3A07.0277527Z'""",Simulation started.,Machine-000,43a1504005d2467694a8f7b8b73888fb,2018-06-28 18:09:07.027752
1,INFO,c465fc84-4710-4ac3-ae77-5648037b3cfe,,"W/""datetime'2018-06-28T20%3A41%3A23.2824814Z'""",Simulation started.,Machine-001,105126d47a784dabb10064019c14cc63,2018-06-28 20:41:23.282481
2,INFO,acebdf79-34a8-4866-9399-a9bad449b725,,"W/""datetime'2018-06-28T20%3A31%3A26.0426356Z'""",Simulation started.,Machine-001,2446526a166143f89803ff1e7fe3ec8a,2018-06-28 20:31:26.042635
3,INFO,7ae0038c-3c64-47bb-8dba-6789980ef560,,"W/""datetime'2018-06-28T18%3A09%3A07.0027293Z'""",Simulation started.,Machine-001,6270bed934a244bab651ca74279271c7,2018-06-28 18:09:07.002729
4,INFO,9c8b76a2-d097-4301-ab3a-3f716bf9f1ac,,"W/""datetime'2018-06-28T20%3A04%3A24.180131Z'""",Simulation started.,Machine-001,73709a650c044fb998dc1b36a5295de3,2018-06-28 20:04:24.180131


### Number of Run-To-Failure Sequences

In [75]:
# Count how often each log message occurs and stop the notebook if no
# 'failure' message was logged at all.
message_counts = log_df['Message'].value_counts()
if 'failure' not in message_counts.index:
    raise ValueError('Run to failure count is 0. Do not proceed.')
print("Number of Run-to-Failures:", message_counts['failure'])

Number of Run-to-Failures: 2


### Select necessary attributes

In [76]:
# Keep the four attributes of interest as strings, rename them to the
# lower-case names shown below, and index the frame by timestamp while also
# keeping "timestamp" as a regular column.
log_df = (
    log_df[["Timestamp", "Code", "Level", "PartitionKey"]]
    .astype(str)
    .rename(columns={
        "Timestamp": "timestamp",
        "Code": "code",
        "Level": "level",
        "PartitionKey": "machineID",
    })
    .set_index("timestamp", drop=False)
)
log_df.head()

Unnamed: 0_level_0,timestamp,code,level,machineID
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2018-06-28 18:09:07.027752,2018-06-28 18:09:07.027752,,INFO,Machine-000
2018-06-28 20:41:23.282481,2018-06-28 20:41:23.282481,,INFO,Machine-001
2018-06-28 20:31:26.042635,2018-06-28 20:31:26.042635,,INFO,Machine-001
2018-06-28 18:09:07.002729,2018-06-28 18:09:07.002729,,INFO,Machine-001
2018-06-28 20:04:24.180131,2018-06-28 20:04:24.180131,,INFO,Machine-001


### Write logs to system storage

In [77]:
# Convert the pandas log table into a Spark DataFrame and persist it as
# parquet under the drop folder, replacing output from any previous run.
# Uses `sql`, the SQLContext created in the telemetry-loading cell, rather
# than a `sqlContext` global that only exists when the hosting platform
# injects it.
log_df = sql.createDataFrame(log_df)
log_df.write.parquet(data_dir+"/logs", mode="overwrite")