In [0]:
import pandas as pd
import numpy as np
import os
from pyspark.sql.types import StringType, FloatType, TimestampType, StructType, StructField

Read in data. In production, this would be handled by AutoLoader, or some other batch/streaming data ingestion alternative if Databricks is not the platform of choice.

In [0]:
# Locate the bundled sample file relative to the current working directory,
# then load it into a pandas DataFrame.
sample_path = f'file:{os.getcwd()}/data/sample.parquet'
raw_data = pd.read_parquet(sample_path)

If we were using AutoLoader, the following example would create a structured stream of the data with the accompanying schema and write it out to a raw table. After this, we would be able to import the latest data from the "bronze" (raw) table to continue our ETL process. Normally, I would put this in a separate notebook/script for increased modularity, but since we are simply using the sample parquet file for now, I'll leave it here for convenience. Consequently, the pandas transformations seen below would not necessarily apply in this structured streaming scenario.

In [0]:
# Illustrative only: this cell is intentionally commented out and is not executed.
# It sketches how the same parquet data could be ingested as a structured stream
# with an explicit schema and appended to a raw table.

# Explicit schema for the incoming records; every field is declared nullable.
# schema = StructType(
#     [
#         StructField('time', TimestampType(), True),
#         StructField('value', FloatType(), True),
#         StructField('field', StringType(), True),
#         StructField('robot_id', StringType(), True),
#         StructField('run_uuid', StringType(), True),
#         StructField('sensor_type', StringType(), True)
#     ]
# )

# Read the parquet source as a stream using the "cloudFiles" format.
# raw_stream = (
#     spark.readStream.format("cloudFiles")
#     .schema(schema)
#     .option("cloudFiles.format", "parquet")
#     .load(f'file:{os.getcwd()}/data/sample.parquet')
# )

# Write the stream to a Delta table in append mode and block until it finishes.
# NOTE(review): the checkpoint location below is the same data/ directory that holds
# the source file — confirm that sharing the directory is intended.
# (
#     raw_stream.writeStream
#     .format("delta")
#     .outputMode("append")
#     .trigger(availableNow=True)
#     .option("mergeSchema", "true")
#     .option("checkpointLocation", f'file:{os.getcwd()}/data/')
#     .table('activity_db.raw_telemetry')
#     .awaitTermination()
# )

Check Data Types

In [0]:
# Show the dtype pandas inferred for each column of the loaded frame.
raw_data.dtypes

Out[3]: time            object
value          float64
field           object
robot_id         int64
run_uuid       float64
sensor_type     object
dtype: object

Fix Data Types. Again, in production, schema enforcement would be handled by the explicit schema supplied to the ingestion stream (see the AutoLoader example above).

In [0]:
# Parse the timestamp column with pd.to_datetime. The unit-less cast
# .astype('datetime64') is rejected by pandas 2.x, which requires an explicit unit.
raw_data['time'] = pd.to_datetime(raw_data['time'])

# Identifiers are labels rather than quantities, so store them as strings.
# NOTE(review): run_uuid is loaded as float64, so its string form is the float repr
# (scientific notation) — confirm whether the source should supply an integer/string id.
raw_data['robot_id'] = raw_data['robot_id'].astype(str)
raw_data['run_uuid'] = raw_data['run_uuid'].astype(str)

Feature Engineering -- robot_id<->encoder variable

In [0]:
# Label every reading as "<field>_<robot_id>" so each sensor/robot pair
# can later become its own column.
field_prefix = raw_data['field'].add("_")
raw_data['combined'] = field_prefix.add(raw_data['robot_id'])

Convert from Long to Wide

In [0]:
# Reshape from long to wide: one row per timestamp, one column per 'combined' label.
# Readings that share a timestamp and label are averaged (pivot_table's default aggfunc).
wide_data = raw_data.pivot_table(
    values='value',
    index='time',
    columns='combined',
    aggfunc='mean',
)

Interpolate the Values Missing Between Timestamps

Given the fact that the encoder and load cell sensors run independently and aren't in sync, this is not the best method overall. We are sacrificing accuracy for expedience. In this case, matching timestamps between features would likely be the better route, but would require more time and resources to do well. There would still be a risk of losing data, depending on the precision of the time resolution measurements, as well as the latency between the two robots' sensors. Given the current constraints, interpolation was chosen because it is simpler and more straightforward.

In [0]:
# Fill the gaps between readings by interpolating along the timestamp index.
# The default method ('linear') ignores the index and treats rows as equally spaced,
# which skews the result when timestamps are irregular; method='time' weights each
# filled value by the actual elapsed time between neighbouring readings.
# Requires the index to be datetime-like (it is the pivoted 'time' column).
intpol_data = wide_data.interpolate(method='time')

Recreate the time column to then be able to engineer a once-differenced time column for use in the velocity calculations

In [0]:
# Copy the timestamp index into an ordinary column so it can be differenced.
intpol_data['time_col'] = intpol_data.index.to_series()

In [0]:
# Elapsed time between consecutive rows. The first row has no predecessor, so its
# entry is missing; column assignment aligns on the index, so that row stays in the
# frame with a missing value either way.
row_gap = intpol_data['time_col'].diff()
intpol_data['time_diff'] = row_gap

Because the time_diff column is in a Timedelta format, we need it as a numeric type to be able to do the calculations, so we convert it to the total duration in milliseconds

In [0]:
# Convert the Timedelta gaps to milliseconds as plain floats.
# Uses the vectorized .dt accessor instead of calling a Python lambda once per row;
# missing gaps (the first row) come through as NaN.
intpol_data['time_diff_ms'] = intpol_data['time_diff'].dt.total_seconds() * 1000

Compute the velocity and acceleration values by taking the change in the Cartesian coordinates over time, as well as the change in velocity over time.

In [0]:
# Per-axis velocity for each robot: change in position between consecutive rows
# divided by the elapsed time in milliseconds.
elapsed_ms = intpol_data['time_diff_ms']
for robot in ('1', '2'):
    for axis in ('x', 'y', 'z'):
        position = intpol_data[f'{axis}_{robot}']
        intpol_data[f'v{axis}_{robot}'] = position.diff() / elapsed_ms

In [0]:
# Per-axis acceleration for each robot: change in velocity between consecutive rows
# divided by the elapsed time in milliseconds.
elapsed_ms = intpol_data['time_diff_ms']
for robot in ('1', '2'):
    for axis in ('x', 'y', 'z'):
        velocity = intpol_data[f'v{axis}_{robot}']
        intpol_data[f'a{axis}_{robot}'] = velocity.diff() / elapsed_ms

Compute the total velocities, accelerations, and forces by taking the square root of the sum of the squares of the x, y, and z components of each velocity, acceleration, and force vector.

In [0]:
# Magnitude of each three-component vector per robot: sqrt(x^2 + y^2 + z^2).
# 'v' = velocity, 'a' = acceleration, 'f' = force; results land in v1, v2, a1, a2, f1, f2.
for quantity in ('v', 'a', 'f'):
    for robot in ('1', '2'):
        x_comp = intpol_data[f'{quantity}x_{robot}']
        y_comp = intpol_data[f'{quantity}y_{robot}']
        z_comp = intpol_data[f'{quantity}z_{robot}']
        intpol_data[f'{quantity}{robot}'] = np.sqrt(x_comp**2 + y_comp**2 + z_comp**2)

Compute the Runtime Statistics by run_uuid by first grouping the data, then calculating the min/max for start/stop time and corresponding total runtime.

In [0]:
# Aggregate the long-format readings per run.
grouped_df = raw_data.groupby('run_uuid')

# Earliest and latest timestamp seen for each run.
stats_df_start = grouped_df[['time']].min()
stats_df_stop = grouped_df[['time']].max()

# Runtime is the span between a run's first and last reading.
stats_df_stop['total_runtime'] = stats_df_stop['time'] - stats_df_start['time']

# NOTE(review): this sums 'value' over every row of the run, whatever its 'field' —
# confirm that this is the intended definition of distance.
stats_df_distance = grouped_df[['value']].sum()

Merging all the dataframes together and renaming their columns for the final runtime statistics dataframe

In [0]:
# Join start and stop statistics per run; the clashing 'time' columns receive the
# default merge suffixes and become 'time_x' (start) and 'time_y' (stop).
stats_df_both = stats_df_start.merge(stats_df_stop, on='run_uuid')

# Add the per-run sum and give the columns their final names. rename() returns a new
# frame, so the result must be assigned; otherwise final_stats_df keeps the
# 'time_x' / 'time_y' / 'value' names and only the displayed output looks renamed.
final_stats_df = (
    stats_df_both
    .merge(stats_df_distance, on='run_uuid')
    .rename(columns={'time_x': 'start_time', 'time_y': 'stop_time', 'value': 'total_distance'})
)

# Display the finished table as the cell output.
final_stats_df

Unnamed: 0_level_0,start_time,stop_time,total_runtime,total_distance
run_uuid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1.2405186538561671e+19,2022-11-23 20:40:00.003,2022-11-23 20:41:17.630,0 days 00:01:17.627000,-682965.0
6.176976534744076e+18,2022-11-23 20:40:00.007,2022-11-23 20:49:59.999,0 days 00:09:59.992000,185882100.0
7.58229308099147e+18,2022-11-23 20:40:00.001,2022-11-23 20:49:59.998,0 days 00:09:59.997000,240897100.0
8.910095844186657e+18,2022-11-23 20:40:00.005,2022-11-23 20:49:59.999,0 days 00:09:59.994000,120028300.0


Save the data into one database, but as 2 separate tables.

In [0]:
# Spark does not carry over the pandas index, and run_uuid lives in the index after the
# groupby/merge, so promote it to a regular column before converting. Without this the
# saved rows cannot be tied back to a run.
# NOTE(review): this adds a run_uuid column to the written rows; if
# activity_db.runtime_stats already exists without it, confirm the append is compatible
# with that table's schema.
spark_stats_df = spark.createDataFrame(final_stats_df.reset_index())
spark_stats_df.write.mode('append').saveAsTable('activity_db.runtime_stats')

In [0]:
# Convert the interpolated telemetry to a Spark DataFrame and append it to the
# sensor telemetry table.
spark_intpol_df = spark.createDataFrame(intpol_data)
telemetry_writer = spark_intpol_df.write.mode('append')
telemetry_writer.saveAsTable('activity_db.sensor_telemetry')