#### Creating Maintenance and Monitor Health Statistics

In [0]:
from pyspark.sql import functions as F

# Load the two silver-layer inputs: per-reading maintenance records and
# per-monitor information (joined below on monitorNum).
silverMaintenance = spark.table("gasmonitordata.silver.monitormaintenance")
silverInfo = spark.table("gasmonitordata.silver.monitorinformation")

In [0]:
from pyspark.sql.window import Window
# One window partition per monitor, rows in timeStamp order; lag() over it
# yields the value from the same monitor's previous reading (null for the
# monitor's first reading).
w = Window.partitionBy("monitorNum").orderBy("timeStamp")

# Previous reading's days-until-calibration and battery health, so later steps
# can detect transitions between consecutive readings.
maintenanceWithLag = silverMaintenance.withColumn(
    "prevDays", F.lag("daysUntilCalibration").over(w)
)
maintenanceWithLag = maintenanceWithLag.withColumn(
    "prevBattery", F.lag("batteryHealth").over(w)
)
# Calendar date of each reading.
maintenanceWithLag = maintenanceWithLag.withColumn(
    "readingDate", F.to_date("timeStamp")
)

In [0]:
# Daily per-monitor maintenance statistics, enriched with the monitor's group:
#   numMajorFailures / numMinorFailures / numOperational - readings per
#       technicalFailure value ('YES - MAJOR', 'YES - MINOR', 'NO').
#   numBatteryDepletions - readings where batteryHealth is 0 but the previous
#       reading (prevBattery) was > 0.
#   numCalibrations - readings where daysUntilCalibration is 0 but the previous
#       reading (prevDays) was > 0.
# Comparisons against a null prev* value (a monitor's first reading) fall to
# otherwise(0), so they are not counted.
# NOTE: this is an inner join, so maintenance rows whose monitorNum has no
# match in monitorinformation are dropped.
maintenanceStats = (
    maintenanceWithLag.alias("m")
    # Join through the aliases so the condition refers to the DataFrame being
    # joined (maintenanceWithLag) rather than its parent silverMaintenance.
    .join(silverInfo.alias("i"), F.col("m.monitorNum") == F.col("i.monitorNum"))
    # readingDate was already derived from timeStamp in maintenanceWithLag;
    # reuse it instead of recomputing it after the join.
    .groupBy(F.col("m.monitorNum"), F.col("i.groupNum"), F.col("m.readingDate"))
    .agg(
        F.sum(F.when(F.col("m.technicalFailure") == 'YES - MAJOR', 1).otherwise(0)).alias('numMajorFailures'),
        F.sum(F.when(F.col("m.technicalFailure") == 'YES - MINOR', 1).otherwise(0)).alias('numMinorFailures'),
        F.sum(F.when(F.col("m.technicalFailure") == 'NO', 1).otherwise(0)).alias('numOperational'),
        F.sum(F.when((F.col("m.batteryHealth") == 0) & (F.col("m.prevBattery") > 0), 1).otherwise(0)).alias('numBatteryDepletions'),
        F.sum(F.when((F.col("m.daysUntilCalibration") == 0) & (F.col("m.prevDays") > 0), 1).otherwise(0)).alias("numCalibrations"),
    )
    # After aggregation the grouping columns are plain monitorNum / groupNum /
    # readingDate, so unqualified names are unambiguous here.
    .orderBy("monitorNum", "groupNum", "readingDate")
    .select(
        "monitorNum", "groupNum", "readingDate",
        "numMajorFailures", "numMinorFailures", "numOperational",
        "numBatteryDepletions", "numCalibrations",
    )
)

In [0]:
# Quick sanity check: print the first row (None if the result is empty).
print(maintenanceStats.first())

In [0]:
# Replace the gold table's data, and its schema if the columns changed.
(
    maintenanceStats.write
    .mode('overwrite')
    .option("overwriteSchema", "true")
    .saveAsTable('gasmonitordata.gold.maintenancestats')
)