In [0]:
from pyspark.sql.functions import col, from_json, from_unixtime, date_trunc, avg, stddev_pop, percentile_approx, row_number, expr, max as spark_max
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, LongType
from pyspark.sql.window import Window
import dlt
import yaml

# Table configs (table_dict) and Confluent connection settings are loaded from confluent_config.yaml in the next cell.

In [0]:
# Load pipeline configuration from the YAML file next to this notebook.
# NOTE(review): the Confluent API key/secret are read from this file in plain text;
# prefer a Databricks secret scope (dbutils.secrets.get) so credentials never live in the repo.
with open('./confluent_config.yaml', 'r') as file:
    data = yaml.safe_load(file)

# Per-table ingest settings (topic, schema, comment, ...), iterated by name when the tables are registered.
table_dict = data['table_setup']

# Confluent Cloud connection settings.
confluent_setup = data['confluent_setup']
confluentBootstrapServers, confluentApiKey, confluentSecret = (
    confluent_setup['server'],
    confluent_setup['api_key'],
    confluent_setup['secret'],
)

In [0]:
# confluentBootstrapServers = "pkc-wzjyjg.us-east-2.aws.confluent.cloud:9092"

# # NOTE: Read the Confluent credentials from a Databricks secret scope instead of hardcoding them:
# # confluentApiKey = dbutils.secrets.get(scope = "confluentTest", key = "api-key")
# # confluentSecret = dbutils.secrets.get(scope = "confluentTest", key = "secret")
# # SECURITY: a real API key/secret was previously committed here; it has been redacted and should be rotated.
# confluentApiKey = '<REDACTED>'
# confluentSecret = '<REDACTED>'

# # confluentTopicName = "logit-equipment-json"



In [0]:
# Schema Definitions

# sensor_bronze_schema = """
# timestamp LONG,
# sensor_E DOUBLE,
# sensor_C DOUBLE,
# sensor_B DOUBLE,
# sensor_A DOUBLE,
# sensor_F DOUBLE,
# sensor_D DOUBLE,
# energy DOUBLE,
# turbine_id STRING,
# UPDATE_TS BIGINT
# """

# turbine_schema = """
# country STRING,
# lat STRING,
# location STRING,
# long STRING,
# model STRING,
# state STRING,
# turbine_id STRING,
# UPDATE_TS BIGINT
# """

# ship_meta_schema = """
# homeport STRING,
# lat STRING,
# long STRING,
# model STRING,
# ship STRING,
# turbine_id STRING,
# UPDATE_TS BIGINT
# """

# historical_turbine_status_schema = """
# abnormal_sensor STRING,
# end_time LONG,
# start_time LONG,
# turbine_id STRING,
# UPDATE_TS BIGINT
# """

# parts_schema = """
# NSN STRING,
# height LONG,
# production_time LONG,
# sensors ARRAY<STRING>,
# stock_available LONG,
# stock_location STRING,
# type STRING,
# weight LONG,
# width LONG,
# UPDATE_TS BIGINT
# """

# # Dictionary of table names and info; info contains schema, confluent topic, and comment
# table_dict = {
#     # 'sensor_bronze': {
#     #   'schema': sensor_bronze_schema,
#     #   'confluentTopic': 'ADD TOPIC',
#     #   'comment': 'ADD COMMENT'
#     #   },
#     'parts': {
#       'schema': parts_schema,
#       'confluentTopic': 'parts',
#       'comment': 'Turbine parts from our manufacturing system',
#       'expectation': 'ADD EXPECTATION'
#       },
#     'ship_meta': {
#       'schema': ship_meta_schema,
#       'confluentTopic': 'ship-meta',
#       'comment': 'Ship to turbine Meta_Data mapping'
#       },
#     'historical_turbine_status': {
#       'schema': historical_turbine_status_schema,
#       'confluentTopic': 'historical-turbine-status',
#       'comment': 'Turbine status to be used as label in our predictive maintenance model (to know which turbine is potentially faulty)'
#       },
#     'turbine': {
#       'schema': turbine_schema,
#       'confluentTopic': 'turbine',
#       'comment': 'Turbine details, with location, wind turbine model type etc'
#       }
# }

In [0]:
def stream_and_parse_confluent(table, info):
  """Register a bronze/silver pair of DLT tables for one Confluent (Kafka) topic.

  - `{table}_bronze`: streams `info['confluentTopic']` from the earliest offset over
    SASL_SSL / PLAIN and casts the Kafka `value` column to string.
  - `{table}_silver`: parses the bronze `value` with `from_json` using `info['schema']`,
    flattens the parsed struct into top-level columns and drops `UPDATE_TS`.

  Args:
    table: base name; the registered tables are `{table}_bronze` and `{table}_silver`.
    info: per-table config mapping. `confluentTopic` and `schema` are required;
      `comment` is optional and becomes the silver table's comment.

  Returns:
    None. The tables are registered with DLT as a side effect of the decorators.
  """
  bronze_name = f"{table}_bronze"
  silver_name = f"{table}_silver"

  @dlt.table(name=bronze_name, comment=f"Raw streamed data from Confluent for table {table}")
  def confluent_stream(topic=info['confluentTopic']):
    # The topic is bound as a default argument so it is fixed when this table is registered.
    # SASL/PLAIN login module; the `kafkashaded.` prefix is the Kafka client bundled with Databricks.
    jaas_config = (
      "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
      "username='{}' password='{}';".format(confluentApiKey, confluentSecret)
    )
    return (
      spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", confluentBootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.jaas.config", jaas_config)
      .option("kafka.ssl.endpoint.identification.algorithm", "https")
      .option("kafka.sasl.mechanism", "PLAIN")
      .load()
      # Kafka delivers `value` as binary; cast so the silver layer can parse it as JSON.
      .withColumn("value", col("value").cast("string"))
    )

  # `comment` is optional in the table config: missing -> no table comment instead of a KeyError.
  @dlt.table(name=silver_name, comment=info.get('comment'))
  def confluent_stream_parsed(bronze_name=bronze_name, schema=info['schema']):
    # NOTE(review): this is a batch read of the streaming bronze table with no de-duplication;
    # UPDATE_TS is dropped, so repeated messages for the same entity all land in silver — confirm intended.
    return (
      spark
      .read
      .table(bronze_name)
      .withColumn("value_parsed", from_json("value", schema))
      .select(col("value_parsed.*"))
      .drop('UPDATE_TS')
    )

# Register one bronze/silver pair per configured table.
for table, info in table_dict.items():
  stream_and_parse_confluent(table, info)
  
  

In [0]:
# Unity Catalog location of the landing volume (and of other tables used below). Both keys must
# be set in the pipeline configuration: spark.conf.get without a default raises when a key is missing.
catalog = spark.conf.get("catalog")
db = spark.conf.get("db")

@dlt.table(
  comment="Raw sensor data coming from parquet files ingested incrementally with Auto Loader: vibration, energy produced etc. 1 point every X sec per sensor.",
  table_properties={
    "quality": "bronze"
  }
)
# Drop rows whose data Auto Loader could not fit into the schema (non-null _rescued_data).
@dlt.expect_or_drop("correct_schema", "_rescued_data IS NULL")
# Drop rows with a missing or non-positive energy reading.
@dlt.expect_or_drop("correct_energy", "energy IS NOT NULL and energy > 0")
def sensor_bronze():
  """Incrementally stream raw sensor parquet files from the landing volume with Auto Loader.

  Source path: `/Volumes/{catalog}/{db}/navy_raw_landing/incoming_data`. Rows failing either
  expectation above are dropped.
  """
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "parquet")
      # NOTE(review): inferColumnTypes matters for untyped formats (JSON/CSV); parquet already
      # carries column types, so this is likely a no-op here — confirm before relying on it.
      .option("cloudFiles.inferColumnTypes", "true")
      .load(f"/Volumes/{catalog}/{db}/navy_raw_landing/incoming_data")
  )

In [0]:
@dlt.table(
  comment="Hourly sensor stats, used to describe signal and detect anomalies",
  table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("turbine_id_valid", "turbine_id IS NOT NULL")
@dlt.expect_or_drop("timestamp_valid", "hourly_timestamp IS NOT NULL")
def sensor_hourly():
  """Aggregate `sensor_bronze` into one row per (hour, turbine).

  Output columns: `hourly_timestamp`, `turbine_id`, `avg_energy`, then
  `std_sensor_A`..`std_sensor_F` (population stddev) and
  `percentiles_sensor_A`..`percentiles_sensor_F` (approximate quantiles).
  Rows with a null `turbine_id` or `hourly_timestamp` are dropped by the expectations.
  """
  sensor_ids = ["A", "B", "C", "D", "E", "F"]
  quantiles = [0.1, 0.3, 0.6, 0.8, 0.95]

  # Epoch seconds -> timestamp, truncated to the start of the hour.
  hour_bucket = date_trunc('hour', from_unixtime(col("timestamp")))

  stddev_columns = [
    stddev_pop(f"sensor_{sid}").alias(f"std_sensor_{sid}") for sid in sensor_ids
  ]
  percentile_columns = [
    percentile_approx(f"sensor_{sid}", quantiles).alias(f"percentiles_sensor_{sid}")
    for sid in sensor_ids
  ]

  bronze = dlt.read("sensor_bronze")
  with_hour = bronze.withColumn("hourly_timestamp", hour_bucket)
  grouped = with_hour.groupBy("hourly_timestamp", "turbine_id")
  return grouped.agg(
    avg("energy").alias("avg_energy"),
    *stddev_columns,
    *percentile_columns,
  )

In [0]:
@dlt.table(
  comment="Hourly sensor stats joined with turbine details and historical turbine status, used as the training dataset for the predictive maintenance model"
)
def turbine_training_dataset():
    """Label hourly sensor stats with the turbine's historical status.

    Inner-joins `sensor_hourly` with `turbine_silver` on `turbine_id`, then with
    `historical_turbine_status_silver` on `turbine_id` where the hour lies strictly
    between the status `start_time` and `end_time` (epoch values converted with
    `from_unixtime`). Hours outside every status window are dropped by the inner join.
    """
    sensor_hourly = dlt.read("sensor_hourly")
    turbine = dlt.read("turbine_silver")
    historical_turbine_status = dlt.read("historical_turbine_status_silver")

    return (
        sensor_hourly.alias("m")
        .join(turbine.alias("t"), "turbine_id")
        .join(
            historical_turbine_status.alias("s"),
            (sensor_hourly.turbine_id == historical_turbine_status.turbine_id) &
            (from_unixtime(historical_turbine_status.start_time) < sensor_hourly.hourly_timestamp) &
            (from_unixtime(historical_turbine_status.end_time) > sensor_hourly.hourly_timestamp)
        )
        # Drop the duplicate key from the metrics side; s.turbine_id remains.
        # The silver tables carry no `_rescued_data` column (no ingest expectations), so only
        # m.turbine_id is excluded. If ingest expectations are added back, also exclude
        # t._rescued_data and s._rescued_data here.
        .selectExpr("* except(m.turbine_id)")
    )

In [0]:
@dlt.table(
  comment="Navy gas turbine last status based on model prediction"
)
def turbine_current_status():
    """Latest hourly metrics per turbine, scored with `predict_maintenance`.

    Joins `sensor_hourly` with `turbine_silver` on `turbine_id`, keeps only the most
    recent `hourly_timestamp` row for each turbine, and appends a `prediction` column.
    """
    # Partition by turbine only. Including hourly_timestamp in the partition made each
    # partition a single (turbine, hour), so the row_number = 1 filter kept every hour
    # instead of only the latest one per turbine.
    # NOTE(review): if turbine_silver holds several rows per turbine_id, which of them
    # survives the tie on hourly_timestamp is arbitrary — confirm turbine_silver is unique.
    latest_first = Window.partitionBy("turbine_id").orderBy(col("hourly_timestamp").desc())

    latest_metrics = (
        dlt.read("sensor_hourly")
        .join(dlt.read("turbine_silver"), on="turbine_id")
        .withColumn("row_number", row_number().over(latest_first))
    )

    # NOTE(review): predict_maintenance is not defined in this file; presumably a SQL UDF
    # registered elsewhere in the pipeline — confirm.
    return latest_metrics.filter("row_number = 1").selectExpr(
        "* EXCEPT(row_number)",
        "predict_maintenance(hourly_timestamp, avg_energy, std_sensor_A, std_sensor_B, std_sensor_C, std_sensor_D, std_sensor_E, std_sensor_F, percentiles_sensor_A, percentiles_sensor_B, percentiles_sensor_C, percentiles_sensor_D, percentiles_sensor_E, percentiles_sensor_F, location, model, state) as prediction"
    )

In [0]:
@dlt.table
def fleet_current_status_gold():
    """Fleet-wide snapshot of turbine predictions at the most recent hour.

    Takes `turbine_id`, `hourly_timestamp` and `prediction` from `turbine_current_status`,
    keeps only rows at the latest `hourly_timestamp` in that table, then inner-joins ship
    metadata on `turbine_id` and left-joins the `sensor_maintenance` reference table on
    `prediction == fault`. Output columns: the three status columns, the ship metadata
    columns (except `turbine_id`), and all `sensor_maintenance` columns.
    """
    turbine_current_status = (
        dlt.read("turbine_current_status")
        .select("turbine_id", "hourly_timestamp", "prediction")
    )
    ship_meta = dlt.read("ship_meta_silver")
    # Table outside the pipeline; `catalog` and `db` come from the pipeline configuration.
    sensor_maintenance = spark.table(f"{catalog}.{db}.sensor_maintenance")

    # Compute the latest hour lazily as a one-row aggregate instead of collect()-ing it.
    # DLT dataset functions must not run Spark actions, and an eager collect() here could read
    # turbine_current_status before this update refreshes it. If the table is empty, the max
    # is NULL and the equality join yields no rows, which matches the previous filter.
    latest_hour = turbine_current_status.agg(
        spark_max("hourly_timestamp").alias("_max_hourly_timestamp")
    )
    latest_status = (
        turbine_current_status
        .join(latest_hour, col("hourly_timestamp") == col("_max_hourly_timestamp"))
        .drop("_max_hourly_timestamp")
    )

    # TODO: the parts_silver join (array_contains(p.sensors, t.prediction), left) is disabled;
    # re-add dlt.read("parts_silver") together with that join when it is restored.
    return (
        latest_status
        .join(ship_meta, "turbine_id")
        .join(sensor_maintenance, latest_status.prediction == sensor_maintenance.fault, "left")
        .select(
            *[col(c) for c in latest_status.columns],
            *[col(c) for c in ship_meta.columns if c != "turbine_id"],
            *[col(c) for c in sensor_maintenance.columns],
        )
    )

# (turbine_current_status
#  .join(ship_meta, "turbine_id")
#  .join(sensor_maintenance, turbine_current_status.prediction == sensor_maintenance.fault, "left")
#  .filter(col("hourly_timestamp") == max_hourly_timestamp)
#  .select(
#    *[col(f"{c}") for c in ship_meta.columns if c != "turbine_id"],
#    *[col(f"{c}") for c in sensor_maintenance.columns]
#    )
# )