In [0]:
%run "./config/configurations-bw-kv-json"

In [0]:
#**********************************************************************************************************************
# RESET STREAMING (for testing and dev only)
# NOTE: while RESET_STREAMING is True, running this cell deletes the streaming checkpoint and every output folder,
#       so the next stream starts without a checkpoint and all staged / pivoted / cube output is rebuilt.
#       Set RESET_STREAMING = False to keep the checkpoint and previously written data.
#**********************************************************************************************************************

RESET_STREAMING = True

if RESET_STREAMING:
  # recursive delete of each location (second argument True = recurse)
  for reset_path in (checkpointPath, outputStagingPath, outputPivotPath, outputPathDir):
    dbutils.fs.rm(reset_path, True)

Out[2]: False

In [0]:
from pyspark.sql.functions import col, from_json, to_json, concat, lit, window, when, desc, concat_ws, regexp_replace, collect_set, transform, explode
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from pyspark import Row
from delta.tables import DeltaTable

#**********************************************************************************************************************
# HOUSE KEEPING
#**********************************************************************************************************************

# Number of shuffle partitions; intended to equal the total number of cores.
# NOTE(review): hard-coded, i.e. it assumes an 8-core cluster — adjust when the cluster size changes.
SHUFFLE_PARTITIONS = 8
spark.conf.set("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS)

# Configuration values used below (conf, paths, schemas, logic mappings) are not defined in this notebook;
# they presumably come from the %run cell at the top.


#**********************************************************************************************************************
# DATA INGESTION: read stream from Kafka topic
#**********************************************************************************************************************

# Kafka source options: SASL_SSL + PLAIN authentication with the credentials held in `conf`.
kafka_source_options = {
  "kafka.bootstrap.servers": conf['bootstrap.servers'],
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.jaas.config": (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    "username='{}' password='{}';"
  ).format(conf['sasl.username'], conf['sasl.password']),
  "kafka.ssl.endpoint.identification.algorithm": "https",
  "kafka.sasl.mechanism": "PLAIN",
  "subscribe": conf['topic'],
  "startingOffsets": conf['startingOffsets'],
  "failOnDataLoss": "false",
  # NOTE(review): caps every micro-batch at a single Kafka offset — looks like dev throttling; confirm before production
  "maxOffsetsPerTrigger": "1",
}

# Keep only the message payload/key (as strings) plus partition/offset for traceability.
input_df = (spark.readStream
            .format("kafka")
            .options(**kafka_source_options)
            .load()
            .select(col("value").cast("STRING"), col("key").cast("STRING"), "partition", "offset"))


#**********************************************************************************************************************
# TRANSFORM STEP: turn Kafka messages into sensor measurement dataframe
# NOTE: the ingress schema is obtained from 'configurations' notebook, but should be dynamically loaded in production
#
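# ASSUMPTIONS:
# 1. schema of the Kafka json string is validated to match the ingress data structure 
# 2. header names may contain '.', '(', ')', '{', '}' or spaces (which Spark does not like in column names);
#    clean_header() replaces each of these characters with '_'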
#**********************************************************************************************************************

def clean_header(column_name):
  """Return a Column in which every '.', '(', ')', '{', '}' and space of `column_name` is replaced by '_'.

  Applied via `transform` to the incoming header names so these characters never reach a Spark column name.
  """
  unsafe_chars = r'[\.\(\){} ]'
  return regexp_replace(column_name, unsafe_chars, '_')

formatted_df = (
  input_df
  # Kafka value -> ingress JSON; every element of `rows` becomes its own record (a map of header -> value)
  .select(from_json("value", ingress_schema).alias("ingress_json"))
  .select(explode("ingress_json.rows").alias("records"))
  # sanitize the header names, then rebuild the map from the (cleaned header, value) pairs
  .select(transform(f.map_keys("records"), clean_header).alias("headers"),
          f.map_values("records").alias("data"))
  .select(f.map_from_entries(f.arrays_zip("headers", "data")).alias("mapped_fields"))
  # round-trip through JSON to apply the typed sensor data schema
  .select(from_json(to_json("mapped_fields"), sensor_data_schema_ddl).alias("sensor_data_json"))  # MODIFY for dynamic loading of data schema info
  .select("sensor_data_json.*")
)

#formatted_df.printSchema()
#display(formatted_df)

In [0]:
#**********************************************************************************************************************
# TRANSFORM STEP: apply business logic mapping to incoming data columns
# NOTE: the logic mapping config json is grabbed from 'configurations' notebook in this prototype, but it should be dynamically loaded during production
#**********************************************************************************************************************

def _mapped_key_column(mapping_key):
  """Join the source columns listed under logic_mapping_json[mapping_key] with '_' and collapse runs of '_'."""
  return regexp_replace(concat_ws('_', *logic_mapping_json[mapping_key]), r'_+', '_')

# (logic_mapping_json key, output column name) for the identifying columns, in output order
_identity_columns = [
  ("line", "line"),
  ("station_config", "station"),
  ("sensor_config", "sensor"),
  ("part_number", "part_number"),
  ("serial_number", "serial_number"),
]

processed_df = (
  formatted_df
  # DISCUSSION: logic to drop rows with null values (.na.drop()) / NOTE: need to optimize for this step
  .select(*[_mapped_key_column(mapping_key).alias(name) for mapping_key, name in _identity_columns],
          col(logic_mapping_json["measured_time"]).alias("timestamp"),
          *logic_mapping_json["measurement"])
  .filter("serial_number!=''")     # DISCUSSION: how to handle records with missing serial numbers
  # attach the sensor_uuid of each sensor; unmatched sensors keep a NULL sensor_uuid
  .join(sensorDf.select("sensor", "sensor_uuid"), "sensor", "left")
)


#**********************************************************************************************************************
# TRANSFORM STEP: to prepare for pivot transformation of the data stream, turn table values into key-value pairs
#
# Each row becomes one JSON object {"part_number", "serial_number", "timestamp", "station",
# "<sensor_uuid>_<measurement>": value, ...} in which every value is rendered as a string.
# The object is built as a map and serialized with to_json, so quotes, backslashes and control characters in the
# data are escaped correctly (plain string concatenation would produce invalid JSON for such values).
# A pair whose key or value is NULL (e.g. a sensor without a matching sensor_uuid) is left out of the object.
#**********************************************************************************************************************

def _kv_entry(key, value):
  """Build one (key, value) map entry; the value is cast to string so every entry shares one map value type."""
  return f.struct(key.alias("key"), value.cast("string").alias("value"))

pivot_json_df = (processed_df
                 .select(f.array(_kv_entry(lit("part_number"), col("part_number")),
                                 _kv_entry(lit("serial_number"), col("serial_number")),
                                 _kv_entry(lit("timestamp"), col("timestamp")),
                                 _kv_entry(lit("station"), col("station")),
                                 *[_kv_entry(concat(col("sensor_uuid"), lit("_"), lit(m)), col(m))
                                   for m in logic_mapping_json["measurement"]])
                         .alias("entries"))
                 # map keys may not be NULL, and NULL values carry no information: drop such pairs first
                 .select(f.filter("entries", lambda e: e["key"].isNotNull() & e["value"].isNotNull()).alias("entries"))
                 .select(to_json(f.map_from_entries("entries")).alias("json"))
                )


#**********************************************************************************************************************
# TRANSFORM STEP: perform pivot transformation of the data stream, collapse records with same timestamp into a single record
#**********************************************************************************************************************

# NOTE(review): this streaming aggregation has no watermark, so the state of every
# (part_number, serial_number, timestamp) group is retained indefinitely — confirm this is acceptable for long runs.
pivot_df = (
  pivot_json_df
  .select(from_json("json", pivot_schema_ddl).alias("schema_json"))
  .select("schema_json.*")
  .withColumn("timestamp", f.to_timestamp("timestamp"))
  .groupBy("part_number", "serial_number", "timestamp")
  .agg(collect_set("station").alias("station_list"),
       *(f.max(feature).alias(feature) for feature in pivot_feature_cols))
)


#**********************************************************************************************************************
# WRITE PIVOT TABLE TO DELTA LAKE
# The pivoted results are written to disk; this completes stage 1 of 2 of the cube transformation.
# The results are read back for the subsequent steps because those transformations can only be applied to non-streaming data.
#**********************************************************************************************************************


def upsertPivotToDelta(df, batch_id):
  """Merge one micro-batch of pivoted aggregates into the Delta table at `outputPivotPath`.

  Used as the `foreachBatch` callback of the pivot stream. Rows are matched on the stream's groupBy key
  (part_number, serial_number, timestamp): matched rows are overwritten with the latest aggregate, all other
  rows are inserted.

  Args:
    df: the micro-batch DataFrame (schema of `pivot_df`).
    batch_id: micro-batch id supplied by Structured Streaming (not used).
  """
  # station_list is deliberately NOT part of the key. It is an aggregate (collect_set) that can grow between
  # micro-batches in update output mode, and its element order is not guaranteed, so matching on it would
  # insert a duplicate row instead of updating the existing one.
  # `<=>` (null-safe equality) keeps a group with a NULL key column (e.g. an unparsable timestamp) from being
  # re-inserted on every micro-batch.
  match_on_key = ("s.part_number <=> u.part_number"
                  " and s.serial_number <=> u.serial_number"
                  " and s.timestamp <=> u.timestamp")

  (DeltaTable
   .forPath(spark, outputPivotPath)
   .alias("s")
   .merge(df.alias("u"), match_on_key)
   .whenMatchedUpdateAll()
   .whenNotMatchedInsertAll()
   .execute()
  )
  
  
# DeltaTable.forPath in upsertPivotToDelta requires the table to exist, so create it (empty, with the pivot
# schema) before the stream starts; appending zero rows leaves an existing table's data untouched.
(spark
 .createDataFrame([], pivot_df.schema)
 .write
 .option("mergeSchema", "true")
 .format("delta")
 .mode("append")
 .save(outputPivotPath))

# The foreachBatch callback performs the actual write, so no sink format, path or mergeSchema option is set:
# with foreachBatch as the sink those settings are not used by the sink.
savePivotPrep = (pivot_df
                 .writeStream
                 .queryName("cube_pivot_to_delta_staging")
                 .trigger(processingTime="1 seconds")
                 .option("checkpointLocation", checkpointPath)
                 .foreachBatch(upsertPivotToDelta)
                 .outputMode("update")
                 .start()
                )

#display(input_df)

In [0]:
#display(pivot_df)

In [0]:
### For Dev and Testing, load back the table and view
# pivot_load_df = spark.read.format('delta').load(outputPivotPath)
# display(pivot_load_df)

In [0]:
#**********************************************************************************************************************
# STAGE 2 of the cube transformation. Some transformation steps in this stage only apply to non-streaming DataFrames.
#
# USAGE: for dev and testing, please run the above Stage 1 code for a prolonged period (> 5 min) to ensure adequate data are written to disk
#        before running the Stage 2 code below
#**********************************************************************************************************************
from pyspark.sql.functions import col, from_json, to_json, concat, lit, window, when, desc, concat_ws, regexp_replace, collect_set
import pyspark.sql.functions as f
from pyspark.sql.window import Window


#**********************************************************************************************************************
# READ STAGING RESULTS BACK INTO A STATIC DATAFRAME
#**********************************************************************************************************************

# batch (non-streaming) read of the pivot table written by stage 1
pivot_load_df = spark.read.load(outputPivotPath, format="delta")


#**********************************************************************************************************************
# DEFINE WINDOWS FOR AGGREGATION
#**********************************************************************************************************************

_unit_key = ["part_number", "serial_number"]

# w1: all measurements of one unit, oldest first
w1 = Window.partitionBy(*_unit_key).orderBy("timestamp")
# w2: measurements of one unit at one station, oldest first
w2 = Window.partitionBy(*_unit_key, "station").orderBy("timestamp")
# w3: one uninterrupted run of a unit at a station (same consecutive_idx), newest first
w3 = Window.partitionBy(*_unit_key, "station", "consecutive_idx").orderBy(col("timestamp").desc())
# w4: every row of one unit, frame spanning the whole partition (for unit-wide maxima)
w4 = Window.partitionBy(*_unit_key).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
# w5: the target-station rows of one unit, ordered by test date
w5 = Window.partitionBy(*_unit_key, "target_operation").orderBy("target_test_date")


#**********************************************************************************************************************
# TRANSFORM STEP: add derived columns to prepare for path extraction
#
# New column details:
# 0. station: first element of station_list. NOTE(review): station_list comes from collect_set, whose element
#    order is not guaranteed, so the pick is arbitrary when a timestamp has more than one station.
# 1. op_order: dense rank of the timestamp among all measurements of the unit, starting from 1 — the position of
#    this measurement in the unit's sequence of operations
# 2. op_rerun: dense rank of the timestamp among the unit's measurements at this station — i.e. this is the n-th
#    measurement of the unit at the station
# 3. consecutive_idx (TEMPORARY): op_order - op_rerun; constant over an uninterrupted run of measurements at the
#    same station (no other station in between), so it identifies each run ("gaps and islands")
# 4. is_last_measure: dense rank in descending time order within the run; 1 only for the run's last measurement
#    (earlier measurements of the run get 2, 3, ...)
#**********************************************************************************************************************

_run_order_cols = ["part_number", "serial_number", "station", "timestamp",
                   "op_order", "op_rerun", "is_last_measure", *pivot_feature_cols]

run_order_df = (
  pivot_load_df
  .withColumn("station", f.element_at("station_list", 1))
  .withColumn("op_order", f.dense_rank().over(w1))
  .withColumn("op_rerun", f.dense_rank().over(w2))
  .withColumn("consecutive_idx", col("op_order") - col("op_rerun"))
  .withColumn("is_last_measure", f.dense_rank().over(w3))
  .select(*_run_order_cols)
)


#**********************************************************************************************************************
# TRANSFORM STEP: identify 'path'(s) for each unit
#
# A 'path' ends at the last measurement (is_last_measure == 1) of an uninterrupted run of a unit at the
# `target` station (`target` is not defined in this notebook — presumably from the configuration notebook).
#
# OUTPUT:  a dataframe with one row for each 'path', identified by 
#          1) the part and serial number of the assembled unit (part_number, serial_number)
#          2) the target station and the corresponding test date (target_operation, target_test_date)
#          3) a status flag for the unit at the target station (pass_fail):
#               2 = this target measurement is the unit's most recent operation overall (op_order == max_op_order);
#                   presumably the unit has not moved on yet, so the outcome is still open — confirm
#               1 = the unit's last run at the target station, with later operations after it (treated as pass)
#               0 = an earlier run at the target station, i.e. the unit came back later (treated as fail)
#              -1 = fallback; not expected, since op_rerun can never exceed its own partition maximum
#          4) a run number indicating the number of attempts for the unit to reach the target station
#             (1 for the unit's earliest target_test_date)
#**********************************************************************************************************************

target_ops_df = (run_order_df.withColumn("max_op_order", f.max("op_order").over(w4))   # over ALL rows of the unit (before the filter)
                      .filter((col("station")==target) & (col("is_last_measure")==1))
                      # computed after the filter: highest op_rerun among the unit's remaining target-station rows
                      .withColumn("max_target_rerun", f.max("op_rerun").over(w4))
                      .withColumn("pass_fail", f.when(col("op_order")==col("max_op_order"), 2)
                                                 .when(col("op_rerun")==col("max_target_rerun"), 1)
                                                 .when(col("op_rerun")<col("max_target_rerun"), 0)
                                                 .otherwise(-1)
                                 )
                      .select("part_number", "serial_number", "station", "timestamp", "pass_fail")
                      .withColumnRenamed("timestamp", "target_test_date")
                      .withColumnRenamed("station", "target_operation")
                      # 1-based attempt counter per unit and target station, ordered by test date
                      .withColumn("run_number", f.row_number().over(w5))
                     )


#**********************************************************************************************************************
# TRANSFORM STEP: associate the pivoted sensor measurements to their corresponding path
#
# Every measurement of a unit taken at or before a path's target_test_date is attached to that path;
# op_rerun_in_path numbers the visits to each station within the path.
#**********************************************************************************************************************

def _quote_identifier(name):
  """Quote a column name as a Spark SQL identifier (wrapped in backticks, embedded backticks doubled)."""
  return "`" + name.replace("`", "``") + "`"

# createOrReplaceTempView replaces the deprecated registerTempTable (same replace-if-exists semantics)
target_ops_df.createOrReplaceTempView("target_ops")
run_order_df.createOrReplaceTempView("run_order")

# feature column names contain characters such as '-', so each one must be quoted
_cube_select_list = ", ".join(
  ["o.part_number as part_number", "o.serial_number as serial_number", "run_number",
   "pass_fail", "station", "timestamp"]
  + [_quote_identifier(c) for c in pivot_feature_cols]
)
join_target_with_ops_sql = f"""
select {_cube_select_list},
	dense_rank() over (
		partition by o.part_number, o.serial_number, target_test_date, station order by timestamp
	) as op_rerun_in_path
from target_ops o join run_order r 
on o.part_number=r.part_number and o.serial_number=r.serial_number and target_test_date>=timestamp 
"""
cube_run_df = spark.sql(join_target_with_ops_sql)


#**********************************************************************************************************************
# TRANSFORM STEP (optional, currently disabled): drop columns with all null values
#**********************************************************************************************************************

# counts_per_column = cube_run_df.agg(*[f.count(c).alias(c) for c in pivot_feature_cols]).first().asDict()
# null_cols = [c for c in counts_per_column if counts_per_column[c]==0]
# cube_run_df = cube_run_df.drop(*null_cols)


#**********************************************************************************************************************
# WRITE CUBE TO DISK
# Write cube dataframe to parquet on disk. This completes the cube transformation.
#**********************************************************************************************************************

# The cube is rebuilt in full from the pivot table on every run, so replace any previous output. With the default
# save mode ("errorifexists"), re-running this cell would fail whenever outputPathDir already exists.
# (DataFrameWriter.parquet returns None, so there is nothing useful to assign.)
cube_run_df.write.mode("overwrite").parquet(outputPathDir)

display(cube_run_df)

part_number,serial_number,run_number,pass_fail,station,timestamp,acc8bdf3-c872-4c77-bc1d-f8d3134aa77f_Ang__deg__,acc8bdf3-c872-4c77-bc1d-f8d3134aa77f_Torque,4fae6029-da79-4ac1-b03c-98f3e5f33b4a_Ang__deg__,4fae6029-da79-4ac1-b03c-98f3e5f33b4a_Torque,8515451f-c865-4cd5-9d55-279578065db5_Ang__deg__,8515451f-c865-4cd5-9d55-279578065db5_Torque,002f3aa6-fc1a-4030-b92f-d7ef66704225_Ang__deg__,002f3aa6-fc1a-4030-b92f-d7ef66704225_Torque,4ad3b2c5-5198-4c11-ac65-86aef08f3db3_Ang__deg__,4ad3b2c5-5198-4c11-ac65-86aef08f3db3_Torque,30963d81-db2e-467f-8a4c-28a37a15da8c_Ang__deg__,30963d81-db2e-467f-8a4c-28a37a15da8c_Torque,f269a193-9198-434b-bc4c-fb46f7cdb1f3_Ang__deg__,f269a193-9198-434b-bc4c-fb46f7cdb1f3_Torque,e6b8d50e-a042-4a6a-8d75-6519b3c02934_Ang__deg__,e6b8d50e-a042-4a6a-8d75-6519b3c02934_Torque,a65c94ea-4616-4ece-ac87-d919cc97d035_Ang__deg__,a65c94ea-4616-4ece-ac87-d919cc97d035_Torque,1f100034-a532-4a58-8b6c-f4444e074afe_Ang__deg__,1f100034-a532-4a58-8b6c-f4444e074afe_Torque,b8d46933-5220-4bad-8221-c54715a835fd_Ang__deg__,b8d46933-5220-4bad-8221-c54715a835fd_Torque,op_rerun_in_path
12639700098,482106351600004,1,2,L14_Core_03,2021-03-16T04:08:46.070+0000,,,,,,,,,,,,,,,,,90.0,143.05,,,,,1
12639700098,482106351600004,1,2,L14_Final_03,2021-03-16T04:15:22.973+0000,,,,,,,,,,,,,,,34.0,10.45,,,,,,,1
12639700098,482106351600004,1,2,L14_Final_03,2021-03-16T04:15:55.177+0000,,,,,,,,,,,,,,,49.0,10.48,,,,,,,2
12639700098,482106351600004,1,2,L14_Final_04,2021-03-16T04:17:06.337+0000,,,,,,,,,,,,,988.0,17.04,,,,,,,,,1
12639700098,482106351600004,1,2,L14_Final_04,2021-03-16T04:17:26.523+0000,,,,,,,,,,,56.0,13.69,,,,,,,,,,,2
12639700098,482106351600004,1,2,L14_Final_04,2021-03-16T04:17:31.570+0000,,,,,,,,,,,62.0,13.64,,,,,,,,,,,3
12639700098,482106351600004,1,2,L14_Final_04,2021-03-16T04:17:37.837+0000,,,,,,,,,,,80.0,13.73,,,,,,,,,,,4
12639700098,482106351600004,1,2,L14_Final_04,2021-03-16T04:17:45.057+0000,,,,,,,,,,,69.0,13.75,,,,,,,,,,,5
12639700098,482106452300007,1,2,L14_Final_04,2021-03-16T04:10:50.377+0000,,,,,,,,,,,,,992.0,17.08,,,,,,,,,1
12639700098,482106452300007,1,2,L14_Final_04,2021-03-16T04:11:11.290+0000,,,,,,,,,,,55.0,13.78,,,,,,,,,,,2
