In [0]:
%pip install mlflow cloudpickle databricks-automl-runtime holidays lz4 psutil category-encoders scikit-learn


In [0]:
# Import DLT and src/db_device_pipeline
import dlt
# import sys
# sys.path.append(spark.conf.get("bundle.sourcePath", "."))
# from pyspark.sql.functions import expr
# from db_device_pipeline import main

In [0]:

# Resolve the Auto Loader source directory from the pipeline setting
# "pipeline.volumeLocation". If reading it raises (e.g. the key is not set)
# or it comes back empty, fall back to the demo volume.
try:
  volume_location = spark.conf.get("pipeline.volumeLocation")
except Exception:
  # Best-effort lookup: any failure just means "use the default" below.
  volume_location = None
volume_location = volume_location or "/Volumes/alex_young/device_demo"

print(volume_location)




In [0]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, TimestampType
from pyspark.sql import functions as SF

@dlt.expect("time_greater_than_2024", "time > '2024-01-01'")
@dlt.expect("expect_value_less_that_55", "value <= 55")
@dlt.expect_or_drop("null device_id", "device_id IS NOT NULL")
@dlt.expect_or_drop("null attribute", "attribute IS NOT NULL")
@dlt.expect_or_drop('null value', "value IS NOT NULL")
# endpoint_id is declared non-nullable in the schema below, like the three
# columns guarded above. Without this guard, a record whose endpoint_id is
# missing (or becomes NULL after the cast) would violate the NOT NULL column
# and fail the update instead of being dropped.
@dlt.expect_or_drop("null endpoint_id", "endpoint_id IS NOT NULL")
@dlt.table(
  name="alex_young.device_demo.raw_device_data",
  comment="raw table for device data",
  schema = StructType([
    StructField('attribute', StringType(), False),
    StructField('device_id', LongType(), False),
    StructField('endpoint_id', LongType(), False),
    StructField('time', TimestampType(), True),
    StructField('unixtime', LongType(), True),
    StructField('value', DoubleType(), False),
    StructField('_rescued_data', StringType(), True)
  ]),
  cluster_by=['device_id', 'time']
)
def get_raw_device_data():
  """Incrementally ingest raw device JSON files from ``volume_location``.

  Uses Auto Loader (``cloudFiles``) with multiline JSON input. The columns
  time, device_id, endpoint_id, value and unixtime are cast to the types
  declared in the table schema.

  Rows with a NULL device_id, attribute, value or endpoint_id are dropped.
  The time/value range expectations are only recorded and do not drop rows.

  Returns:
    A streaming DataFrame matching the declared table schema.
  """
  raw_data = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option('multiline', 'true')
    .load(volume_location)
    # NOTE(review): these casts assume the inferred JSON columns arrive as
    # strings/untyped values - confirm against the source files.
    .withColumn("time", SF.col("time").cast("timestamp"))
    .withColumn("device_id", SF.col("device_id").cast("long"))
    .withColumn("endpoint_id", SF.col("endpoint_id").cast("long"))
    .withColumn("value", SF.col("value").cast("double"))
    .withColumn("unixtime", SF.col("unixtime").cast("long"))
  )
  return raw_data




In [0]:
@dlt.view(
  comment="raw view on top of device data",
  name="device_dimensions_view",
)
def get_device_dimensions_view():
  """Expose the device_dimensions table as a pipeline view.

  This is a batch (non-streaming) read of
  ``alex_young.device_demo.device_dimensions``.
  """
  dimensions_table = 'alex_young.device_demo.device_dimensions'
  dimensions = spark.read.table(dimensions_table)
  return dimensions


In [0]:
import mlflow.pyfunc
# Spark UDF wrapping the registered model version under the "main" alias.
predict_udf = mlflow.pyfunc.spark_udf(
  spark,
  model_uri="models:/alex_young.device_demo.device_error_voltage_and_flow@main",
  env_manager="local",
)
# Feature columns passed to the model, packed into a single struct argument.
input_cols = ['attribute', 'value']

@dlt.table(
  name="alex_young.device_demo.device_enriched",
  comment='dimension_data_added_to_device_stream',
  cluster_by=['device_id', 'time']
)
def get_device_combined():
  """Score the raw device stream and attach device dimensions.

  Takes the streaming ``raw_device_data`` rows (without ``endpoint_id``) and
  adds a ``predicted_error`` column computed by ``predict_udf``. The result
  is left-joined on ``device_id`` to ``device_dimensions_view``, so readings
  with no matching dimension row are kept.
  """
  dimensions = spark.read.table('device_dimensions_view')
  readings = spark.readStream.table('alex_young.device_demo.raw_device_data')

  model_input = SF.struct(input_cols)
  scored = readings.drop('endpoint_id').withColumn('predicted_error', predict_udf(model_input))
  # NOTE(review): assumes the UDF returns an array/struct-like value whose
  # first element is the prediction - TODO confirm against the model signature.
  first_prediction = SF.col('predicted_error').getItem(0)
  scored = scored.withColumn('predicted_error', first_prediction)

  return scored.join(dimensions, on='device_id', how='left')

In [0]:
@dlt.view(
  comment="raw view on top of device data",
  name="device_flow",
)
def get_device_flows():
  """Batch view of device_enriched restricted to rows whose attribute is 'Flow'."""
  enriched = spark.read.table('alex_young.device_demo.device_enriched')
  is_flow = SF.col('attribute') == SF.lit('Flow')
  return enriched.filter(is_flow)

In [0]:
@dlt.view(
  comment="raw view on top of device data",
  name="device_voltage",
)
def get_device_voltage():
  """Batch view of device_enriched restricted to rows whose attribute is 'Voltage'."""
  enriched = spark.read.table('alex_young.device_demo.device_enriched')
  is_voltage = SF.col('attribute') == SF.lit('Voltage')
  return enriched.filter(is_voltage)

In [0]:
@dlt.table(
  name="alex_young.device_demo.device_agg_by_time",
  comment='aggregate_table_avg_value_by_10_mins',
  cluster_by=['device_id','start_datetime']
)
def get_device_aggregates():
  """Streaming 10-minute tumbling-window aggregates per device and attribute.

  Reads ``device_enriched`` as a stream and sets a 1-hour watermark on
  ``time``. Rows are grouped by ``device_id``, ``attribute`` and a 10-minute
  window on ``time``.

  Each group emits:
    - avg/min/max of ``value``;
    - the sum of ``predicted_error``;
    - the first location/site/longitude/latitude value seen in the window.

  The window bounds are flattened into ``start_datetime`` and
  ``end_datetime``.
  """
  enriched = (
    spark.readStream.table("alex_young.device_demo.device_enriched")
      # 'predicted_error' is the scoring column added in device_enriched. It
      # must be spelled exactly, or both this select and the sum below fail to
      # resolve. location/site/longitude/latitude presumably come from the
      # device_dimensions join - verify.
      .select('device_id', 'time', 'attribute', 'value', 'location', 'site',
              'predicted_error', 'longitude', 'latitude')
  )
  return (
    enriched
      # The watermark bounds how late an event may arrive for a window and
      # lets Spark discard state for windows older than it.
      .withWatermark("time", "1 hour")
      .groupBy(
        SF.col('device_id'),
        SF.col('attribute'),
        SF.window("time", "10 minutes").alias("time"),
      )
      .agg(
        SF.avg('value').alias('avg_value'),
        SF.min('value').alias('min_value'),
        SF.max('value').alias('max_value'),
        SF.first('location').alias('location'),
        SF.first('longitude').alias('longitude'),
        SF.first('latitude').alias('latitude'),
        SF.first('site').alias('site'),
        # NOTE(review): assumes predicted_error is numeric (e.g. a 0/1 flag),
        # so the sum counts predicted errors per window - TODO confirm.
        SF.sum('predicted_error').alias('predicted_errors'),
      )
      .withColumn('start_datetime', SF.col('time.start'))
      .withColumn('end_datetime', SF.col('time.end'))
      # Expose every aggregate computed above rather than silently dropping
      # the coordinates and the predicted-error count.
      .select('device_id', 'attribute', 'location', 'site', 'longitude', 'latitude',
              'avg_value', 'min_value', 'max_value', 'predicted_errors',
              'start_datetime', 'end_datetime')
  )