In [57]:
# --- Test parameters -------------------------------------------------------
# Delta table to verify, and the file the failure report is written to.
input_table_location = '/datalake/normalized/streaming_device'
# NOTE(review): "ouptut" looks like a typo of "output" - confirm with whatever
# reads this file before changing the path.
test_output_path = '/test-ouptut/verify-delta/output.txt'

# Thresholds used by the verification cells of this notebook.
assert_latency_milliseconds = 15_000
assert_events_per_second = 900
assert_out_of_sequence_fraction = 0
assert_duplicate_fraction = 0

StatementMeta(sasesssparkpool, 454, 23, Finished, Available)

In [58]:
from pyspark.sql.functions import *
from pyspark.sql import Window

StatementMeta(sasesssparkpool, 454, 24, Finished, Available)

In [59]:
# Failure messages are appended to this list by the verification cells and
# written out at the end of the notebook.
assertions_failed = []

# Read the Delta table under test once and cache it, since it is scanned by
# several cells.
input_data = (
    spark.read
    .format('delta')
    .load(input_table_location)
    .cache()
)

StatementMeta(sasesssparkpool, 454, 25, Finished, Available)

In [60]:
# Per-minute ingestion statistics, keyed by the minute in which each event was stored.
#   storedAtMinute    - storedAt truncated to the start of its minute
#   latency           - storedAt minus enqueuedAt, both cast to double
#   events_per_second - events stored in that minute divided by 60
#   avg_latency_s     - mean latency of the events stored in that minute
stored_by_minute_stats = (
    input_data
    .withColumn(
        "storedAtMinute",
        (floor(unix_timestamp(col('storedAt')) / 60) * 60).cast("timestamp"),
    )
    .withColumn(
        "latency",
        col('storedAt').cast("double") - col('enqueuedAt').cast("double"),
    )
    .groupBy(col('storedAtMinute'))
    .agg(
        (count(col('eventId'))/lit(60)).alias("events_per_second"),
        avg(col('latency')).alias("avg_latency_s"),
    )
    .orderBy(col('storedAtMinute'))
    .cache()
)

StatementMeta(sasesssparkpool, 454, 26, Finished, Available)

In [61]:
# Collapse the per-minute statistics into a single summary row.
# `count`, `max` and `min` here are the pyspark.sql.functions column aggregates
# brought in by the star import, not the Python builtins.
stored_stats = stored_by_minute_stats.agg(
  count(col('storedAtMinute')).alias("minutesWithData"),
  max(col('events_per_second')).alias("maxThroughputEventsPerSecond"),
  min(col('avg_latency_s')).alias("minLatencySeconds")
).cache()

# A global aggregation returns one row; the max/min values may be None
# (e.g. when the table holds no rows), so both checks below handle None.
stored_stats_row = stored_stats.collect()[0]
max_throughput_events_per_second = stored_stats_row['maxThroughputEventsPerSecond']
min_latency_seconds = stored_stats_row['minLatencySeconds']

# Throughput check: the best minute must reach the required events/second.
# The threshold is compared against None (not truthiness) so that 0 is a valid
# threshold; a None or negative threshold disables the check.
if assert_events_per_second is not None and assert_events_per_second >= 0:
  expected = assert_events_per_second
  actual = max_throughput_events_per_second
  # Fail when there is no measurement at all or the measurement is too low.
  if actual is None or actual < expected:
    assertions_failed.append(f"min throughput per second: expected min {expected}, got {actual}")

# Latency check: the best per-minute average latency (seconds) must not exceed
# the threshold (milliseconds).
if assert_latency_milliseconds is not None and assert_latency_milliseconds >= 0:
  expected = assert_latency_milliseconds
  actual = min_latency_seconds
  # Fail when there is no measurement at all or the measurement is too high.
  if actual is None or (actual * 1000) > expected:
    assertions_failed.append(f"max latency in milliseconds: expected max {expected} milliseconds, got {actual} seconds")

StatementMeta(sasesssparkpool, 454, 27, Finished, Available)

In [62]:
# Number of distinct eventId values that occur more than once in the table.
duplicates = input_data \
    .groupBy(col('eventId')) \
    .agg(count(col('eventId')).alias("count")) \
    .where(col('count') > 1) \
    .count()

# Avoid ZeroDivisionError on an empty table: with no events there are no duplicates.
total_event_count = input_data.count()
duplicate_fraction = float(duplicates) / float(total_event_count) if total_event_count else 0.0

# Compare the threshold against None (not truthiness) so that a threshold of 0
# ("no duplicates allowed") actually enables the check; a None or negative
# threshold disables it.
if assert_duplicate_fraction is not None and assert_duplicate_fraction >= 0:
  expected = assert_duplicate_fraction
  if duplicate_fraction > expected:
    assertions_failed.append(f"fraction of duplicate events: expected max {expected}, got {duplicate_fraction}")

StatementMeta(sasesssparkpool, 454, 28, Finished, Available)

In [63]:
# Per device, order events by the time they were stored (sequence number breaks ties).
time_sequence = Window.partitionBy("deviceId").orderBy(col('storedAt'), col('deviceSequenceNumber'))

# Count events whose sequence number jumps by more than 1 relative to the
# previously stored event of the same device. The first event of each device
# has a null delta and is dropped by the filter.
# NOTE(review): only forward jumps (delta > 1) are counted; an event stored
# after a higher sequence number yields a negative delta and is not counted
# here - confirm this is the intended definition of "out of sequence".
out_of_sequence = input_data \
  .withColumn("deviceSequenceNumberDelta", col('deviceSequenceNumber') - lag(col('deviceSequenceNumber'), 1).over(time_sequence)) \
  .filter(col('deviceSequenceNumberDelta') > 1) \
  .count()

# Avoid ZeroDivisionError on an empty table.
total_event_count = input_data.count()
out_of_sequence_fraction = float(out_of_sequence) / float(total_event_count) if total_event_count else 0.0

# Compare the threshold against None (not truthiness) so that a threshold of 0
# enables the check; a None or negative threshold disables it.
if assert_out_of_sequence_fraction is not None and assert_out_of_sequence_fraction >= 0:
  # Uses the names actually defined in this notebook; the previous spellings
  # (assert_outofsequence_fraction / assertionsFailed) were undefined.
  expected = assert_out_of_sequence_fraction
  if out_of_sequence_fraction > expected:
    assertions_failed.append(f"fraction of out-of-sequence events: expected max {expected}, got {out_of_sequence_fraction}")

StatementMeta(sasesssparkpool, 454, 29, Finished, Available)

In [64]:
# Per device: events ordered by sequence number, and the unordered partition
# used to count all events of that device.
device_sequence = Window.partitionBy("deviceId").orderBy(col('deviceSequenceNumber'))
device_partition = Window.partitionBy("deviceId")

# Count gaps in each device's sequence numbers, looking only at the events whose
# position within the device falls between 10% and 90% of its event count
# (the first and last events of each device are excluded by the fraction filter).
# A delta greater than 1 between consecutive retained events is counted as a gap.
missing_events = input_data \
  .withColumn("orderInDevice", row_number().over(device_sequence)) \
  .withColumn("countForDevice", count("*").over(device_partition)) \
  .withColumn("fractionInOrder", col('orderInDevice').cast("double") / col('countForDevice')) \
  .filter((col('fractionInOrder') >= lit(0.1)) & (col('fractionInOrder') <= lit(0.9))) \
  .withColumn("deviceSequenceNumberDelta", col('deviceSequenceNumber') - lag(col('deviceSequenceNumber'), 1).over(device_sequence)) \
  .filter(col('deviceSequenceNumberDelta') > lit(1)) \
  .count()

# Avoid ZeroDivisionError on an empty table.
total_event_count = input_data.count()
missing_fraction = float(missing_events) / float(total_event_count) if total_event_count else 0.0

# This check has no notebook parameter: no gaps are tolerated.
assert_missing_fraction = 0.0
expected = assert_missing_fraction
if missing_fraction > expected:
    # The list is named assertions_failed; the previous spelling
    # (assertionsFailed) was undefined and raised NameError on failure.
    assertions_failed.append(f"fraction of missing events: expected max {expected}, got {missing_fraction}")

StatementMeta(sasesssparkpool, 454, 30, Finished, Available)

In [67]:
# Write the collected failure messages to the report file, one per line.
# An empty list produces an empty string.
# NOTE(review): the third positional argument is presumably the overwrite
# flag of mssparkutils.fs.put - confirm against the mssparkutils docs.
assertions_failed_str = "\n".join(assertions_failed)
mssparkutils.fs.put(
    test_output_path,
    assertions_failed_str,
    True,
)

StatementMeta(sasesssparkpool, 454, 33, Finished, Available)

True