In [1]:

from pyspark.sql.functions import col, expr, when
from pyspark.sql.types import *

from pyspark.sql.functions import *

# Run_Status encoding produced by run_status_code:
#   Stopped               =  0  (status contains "0")
#   Running               =  1  (status contains "1")
#   Communication failure =  2  (status contains "2")
#   Standby               =  3  (status contains "Standby")
#   Missed                = -1  (status is None or empty)
#   Unrecognised          = -2  (non-empty status matching none of the above)
def run_status_code(status):
    """Map a raw Run_Status string (e.g. "1-RUNNING") to an integer code.

    Substrings are tested in the order "0", "1", "2", "Standby" and the first
    match wins, so a status containing several digits gets the earliest code
    in that order.

    Args:
        status: raw Run_Status value, or None.

    Returns:
        int: 0-3 for a recognised status, -1 when ``status`` is falsy
        (None or ""), -2 for any other non-empty value.
    """
    if not status:
        return -1
    for marker, code in (("0", 0), ("1", 1), ("2", 2), ("Standby", 3)):
        if marker in status:
            return code
    return -2

def sd_status_code_norm(value):
    """Normalise a raw SD_Status_Code value for lookups in ``failure_status``.

    Args:
        value: raw status value of any type (None becomes "NONE").

    Returns:
        str: ``str(value)`` with surrounding whitespace stripped, upper-cased.
        Internal whitespace is NOT collapsed, so "HI  SUCT PRS" keeps its
        double space.
    """
    return str(value).strip().upper()


# Shutdown codes that label a row as a failure when the unit is stopped
# (Run_Status contains "0" and the status is not manual/started/NO FAULT PRESENT, etc.).
# Lookups go through sd_status_code_norm, which does not collapse internal
# whitespace, so entries with double spaces only match values spaced the same way.
# TODO review: "HI STG1 SUCT PRS", "LO SUCT PRS", "LOCAL COMM FAIL", "SPARE D117 SD",
#   "SPARE DI-03 SD", "SPARE DI-13 SD", "SPARE SHUTDOWN", "RESET TIMER"
failure_status = set(["CAT FAIL TO STOP","CAT FAIL TO STRT","COOLER #1 VIBR","COOLER #2 VIBR","COOLER #3 VIBR","EICS SD","EICS SHUTDOWN","EMERGENCY SD","ENG OVERSPEED","ENG PANEL SD","ENG UNDERSPEED","ENGINE OVERSPD","ENGINE SHUTDOWN","ENGINE UNDERSPD","FAILURE TO CRANK","HI  SUCT PRS","HI 2 STG SCB LVL","HI CMP VIB","HI COMP OIL TMP","HI COMPOR VIB","HI COMPRESS  OIL","HI COOLER VIB","HI COOLER VIBR","HI CYL2 DISC TP","HI DISC PRESS","HI DISC PRS","HI DISC TP CYL 1","HI DISCH CYL 2 T","HI DISCH CYL 4 T","HI DISCHARG  PRS","HI ENG OIL TMP","HI ENG VIBR","HI ENG WTR TEMP","HI ENGINE VIB","HI ENGINE VIBR","HI FUEL SCRB LVL","HI FUEL SCRUB  L","HI INTERSTG1  PR","HI INTERSTG2 PRS","HI STAGE1 PRS","HI STAGE2 PRS","HI STG1 DISC PRS","HI STG1 DISC TMP","HI STG1 SCBR LVL","HI STG1 SCRB LVL","HI STG1 SCRUB  L","HI STG1 SUCT PRS","HI STG2 DIS PRS","HI STG2 DISC TMP","HI STG2 SCBR LVL","HI STG2 SCRB LVL","HI STG2 SCRUB  L","HI STG3 DIS PRS","HI STG3 DISC IMP","HI STG3 DISC PRS","HI STG3 SCBR LVL","HI SUCT PRESS","HI SUCT SCRB LVL","HI SUCT SCRUB  L","HI SUCT SD","HI SUCTION PRS","HI SUCTION TMP","HI TANK LVL","LB LUBE NO FLOW","LB LUBE NOFLOW","LO 1ST STG PRESS","LO 2ND STG PRESS","LO AUX WATER LVL","LO AUX WTR LVL","LO CMP OIL PRESS","LO COMP OIL LVL","LO COMP OIL PRS","LO COMPRESS OIL","LO DISC PRESS","LO DISC PRS","LO DISCHARG PRS","LO ENG COOL LVL","LO ENG JACKETWTR","LO ENG OIL LVL","LO ENG OIL PRESS","LO ENG WTR LVL","LO INTERSTG1 PRS","LO INTERSTG2 PRS","LO STAGE1 PRS","LO STAGE2 PRS","LO STG1 DIS PRS","LO STG1 DISC PRS","LO STG2 DIS PRS","LO STG2 DISC PRS","LO STG3 DIS PRS","LO STG3 DISC PRS","LO SUC PRS SD","LO SUCT PRESS","LO SUCT PRS","LO SUCTION PRS","LOSS OF RPM","LOST CMP OIL XMT","LOST COMP OIL PR","LOST DISC XMTR","LOST RPM SIGNAL","LOST RPM/STALL","LOST STG2 PRS XM","LOST STG2 XMTR","LUBE NO FLOW","OVERSPEED","PANEL ESD","RB LUBE NO FLOW","SPARE D117 SD","SPARE DI-03 SD","SPARE DI-13 SD","SPARE SHUTDOWN","UNDERSPEED","UNEXPECTED START"])


def is_failure_status(status_code, run_status):
    """Label a telemetry row as a failure (1) or not (0).

    A row is a failure when its Run_Status maps to the "stopped" code 0
    (see run_status_code) AND its normalised SD_Status_Code is one of
    ``failure_status``.

    Args:
        status_code: raw SD_Status_Code value (may be None).
        run_status: raw Run_Status value (may be None).

    Returns:
        int: 1 for a failure row, otherwise 0.
    """
    # Reuse the shared helpers so the label agrees with Run_Status_Code and
    # with sd_status_code_norm; `and` short-circuits exactly like the old inline test.
    is_stopped = run_status_code(run_status) == 0
    return 1 if is_stopped and sd_status_code_norm(status_code) in failure_status else 0


# Spark UDF wrappers. They are created only after every Python function they wrap
# is defined; registering sd_status_code_norm before its `def` raised NameError
# on a fresh kernel.
run_status_code_udf = udf(run_status_code, IntegerType())
sd_status_code_norm_udf = udf(sd_status_code_norm, StringType())
# is_failure_status returns an int label (0/1), hence IntegerType.
is_failure_status_udf = udf(is_failure_status, IntegerType())

# File location and type
file_location = "/mnt/compressors"
file_type = "parquet"

# Parquet carries its own schema, so no reader options are needed.
df = spark.read.format(file_type).load(file_location)

# toDF renames columns positionally: it needs exactly one name per source column,
# in source order. Facility_Desc, Asset_Name and Local_Timestamp are then dropped
# (Facility_ID and UTC_Milliseconds remain).
# TODO: consider also dropping Downtime_Hrs_Yest and Engine_Oil_Pressure - too many null values.
df = (
    df.toDF("Id","Asset_Name","Local_Timestamp","UTC_Milliseconds","Compressor_Oil_Pressure","Compressor_Oil_Temp","Compressor_Stages","Cylinder_1_Discharge_Temp","Cylinder_2_Discharge_Temp","Cylinder_3_Discharge_Temp","Cylinder_4_Discharge_Temp","Downtime_Hrs_Yest","Engine_Oil_Pressure","Engine_Oil_Temp","Facility_Desc","Facility_ID","Fuel_Pressure","Gas_Flow_Rate","Gas_Flow_Rate_RAW","Horsepower","Last_Successful_Comm_Time","Max_Discharge_Pressure","Max_Gas_Flowrate","Max_RPMs","Max_Suction_Pressure","Pct_Successful_Msgs_Today","RPM","Run_Status","Runtime_Hrs","SD_Status_Code","Stage_1_Discharge_Pressure","Stage_2_Discharge_Pressure","Stage_3_Discharge_Pressure","Successful_Msgs_Today","Suction_Pressure","Suction_Temp","TOW_Comp_Name","Unit_Size")
      .drop("Facility_Desc", "Asset_Name", "Local_Timestamp")
)

# Add the failure label and the integer run-status code. Only the labelled frame
# is cached; caching `df` as well would keep a second copy of the same rows in memory.
labled_df = (
    df.withColumn("is_failed", is_failure_status_udf(df["SD_Status_Code"], df["Run_Status"]))
      .withColumn("Run_Status_Code", run_status_code_udf(df["Run_Status"]))
      .cache()
)

display(labled_df)



Id,Asset_Name,Local_Timestamp,UTC_Milliseconds,Compressor_Oil_Pressure,Compressor_Oil_Temp,Compressor_Stages,Cylinder_1_Discharge_Temp,Cylinder_2_Discharge_Temp,Cylinder_3_Discharge_Temp,Cylinder_4_Discharge_Temp,Downtime_Hrs_Yest,Engine_Oil_Pressure,Engine_Oil_Temp,Facility_ID,Fuel_Pressure,Gas_Flow_Rate,Gas_Flow_Rate_RAW,Horsepower,Last_Successful_Comm_Time,Max_Discharge_Pressure,Max_Gas_Flowrate,Max_RPMs,Max_Suction_Pressure,Pct_Successful_Msgs_Today,RPM,Run_Status,Runtime_Hrs,SD_Status_Code,Stage_1_Discharge_Pressure,Stage_2_Discharge_Pressure,Stage_3_Discharge_Pressure,Successful_Msgs_Today,Suction_Pressure,Suction_Temp,TOW_Comp_Name,Unit_Size,is_failed,Run_Status_Code
461317,Bednorz B 8-10H CMP,8/23/2017 7:30:52.9040069 PM,1503534652904,48.3,178.0,3,218.2382,226.073,220.63,217.26,,,124.0,BEDNORZ_B8_10_CMP,,1452.18,1452.18,380,08/23 18:30:52,1000,2356,1800,85,100,1641,1-RUNNING,16723.8,NO FAULT PRESENT,181.0,420.0,983.0,3307.0,48.3,115.448,BEDNORZ B #8H,KTA19,0,1
461318,Bednorz B 8-10H CMP,8/23/2017 7:32:03.1050109 PM,1503534723105,48.2,178.3748,3,217.8052,225.849,220.3939,216.7877,,,123.8851,BEDNORZ_B8_10_CMP,,1441.008,1441.008,380,08/23 18:32:02,1000,2356,1800,85,100,1640,1-RUNNING,16723.82,NO FAULT PRESENT,180.6252,419.0,979.0,3313.0,48.2,115.3752,BEDNORZ B #8H,KTA19,0,1
461319,Bednorz B 8-10H CMP,8/23/2017 7:34:00.1830139 PM,1503534840183,47.94009,179.0,3,217.0,225.225,220.0,216.0,,,123.6934,BEDNORZ_B8_10_CMP,,1422.375,1422.375,380,08/23 18:33:59,1000,2356,1800,85,100,1613,1-RUNNING,16723.85,NO FAULT PRESENT,180.0,419.6498,990.0,3316.0,47.94009,115.2536,BEDNORZ B #8H,KTA19,0,1
461320,Bednorz B 8-10H CMP,8/23/2017 7:35:03.2860107 PM,1503534903286,47.8,179.0,3,216.6335,224.8393,218.6786,214.8967,,,123.5901,BEDNORZ_B8_10_CMP,,1412.863,1412.863,380,08/23 18:35:02,1000,2356,1800,85,100,1586,1-RUNNING,16723.87,NO FAULT PRESENT,180.3665,420.0,982.0,3322.0,47.8,115.1881,BEDNORZ B #8H,KTA19,0,1
461321,Bednorz B 8-10H CMP,8/23/2017 7:36:52.477005 PM,1503535012477,47.86058,178.0014,3,216.0014,224.0014,216.9986,214.3565,,,123.4113,BEDNORZ_B8_10_CMP,,1413.122,1413.122,380,08/23 18:36:51,1000,2356,1800,85,100,1582,1-RUNNING,16723.9,NO FAULT PRESENT,180.9999,421.0028,967.0,3325.0,47.86058,115.0748,BEDNORZ B #8H,KTA19,0,1
461322,Bednorz B 8-10H CMP,8/23/2017 7:38:03.5360107 PM,1503535083536,47.9,178.9861,3,216.9861,224.9861,216.0139,214.0049,,,123.295,BEDNORZ_B8_10_CMP,,1413.29,1413.29,380,08/23 18:38:03,1000,2356,1800,85,100,1579,1-RUNNING,16723.92,NO FAULT PRESENT,180.9021,423.0,992.0,3331.0,47.9,115.001,BEDNORZ B #8H,KTA19,0,1
461323,Bednorz B 8-10H CMP,8/23/2017 7:39:54.2200012 PM,1503535194220,48.0,177.1634,3,216.6185,224.3878,219.0,216.0,,,123.1138,BEDNORZ_B8_10_CMP,,1413.551,1413.551,380,08/23 18:39:52,1000,2356,1800,85,100,1603,1-RUNNING,16723.95,NO FAULT PRESENT,180.7497,420.0,985.0,3334.0,48.0,114.7849,BEDNORZ B #8H,KTA19,0,1
461324,Bednorz B 8-10H CMP,8/23/2017 7:41:03.7030029 PM,1503535263703,47.9,176.0,3,216.3769,224.0,218.0,215.0,,,123.0,BEDNORZ_B8_10_CMP,,1405.694,1405.694,380,08/23 18:41:02,1000,2356,1800,85,100,1576,1-RUNNING,16723.97,NO FAULT PRESENT,180.6541,420.2794,983.0,3340.0,47.9,114.6486,BEDNORZ B #8H,KTA19,0,1
461325,Bednorz B 8-10H CMP,8/23/2017 7:42:52.0750122 PM,1503535372075,47.7,177.0,3,216.0,225.0,217.2493,214.0,,,122.833,BEDNORZ_B8_10_CMP,,1392.304,1392.304,380,08/23 18:42:51,1000,2356,1800,85,100,1544,1-RUNNING,16724.0,NO FAULT PRESENT,180.5049,420.7153,967.0,3343.0,47.7,114.436,BEDNORZ B #8H,KTA19,0,1
461326,Bednorz B 8-10H CMP,8/23/2017 7:44:02.8680114 PM,1503535442868,47.9,178.0,3,215.6815,224.6075,216.759,213.6075,,,122.7239,BEDNORZ_B8_10_CMP,,1383.557,1383.557,380,08/23 18:44:02,1000,2356,1800,85,100,1572,1-RUNNING,16724.02,NO FAULT PRESENT,180.4075,421.0,996.0,3349.0,47.9,114.2971,BEDNORZ B #8H,KTA19,0,1


In [2]:
from pyspark.sql.functions import col



# Facilities with at least one failure-labelled row (is_failed == 1).
# Grouped by Facility_ID only: Asset_Name is dropped at load time, so grouping
# on it raised an AnalysisException.
failed_comp = (
    labled_df.groupBy("Facility_ID", "is_failed")
    .count()
    .where("is_failed == 1")
)

# Facilities that never produced a failure row (subtract returns distinct rows).
display(labled_df.select("Facility_ID").subtract(failed_comp.select("Facility_ID")))


Facility_ID
BROWN_A_4_5_CMP
KRAUSE_C_1_CMP
BUTLER_W_A5_7_CMP
OLIVER_D3_5_CMP
CANTU_B6_9_CMP


In [3]:
# Inspect one facility that has no failure-labelled rows.
display(labled_df.where(col("Facility_ID") == "BUTLER_W_A5_7_CMP"))

Id,Asset_Name,Local_Timestamp,UTC_Milliseconds,Compressor_Oil_Pressure,Compressor_Oil_Temp,Compressor_Stages,Cylinder_1_Discharge_Temp,Cylinder_2_Discharge_Temp,Cylinder_3_Discharge_Temp,Cylinder_4_Discharge_Temp,Downtime_Hrs_Yest,Engine_Oil_Pressure,Engine_Oil_Temp,Facility_ID,Fuel_Pressure,Gas_Flow_Rate,Gas_Flow_Rate_RAW,Horsepower,Last_Successful_Comm_Time,Max_Discharge_Pressure,Max_Gas_Flowrate,Max_RPMs,Max_Suction_Pressure,Pct_Successful_Msgs_Today,RPM,Run_Status,Runtime_Hrs,SD_Status_Code,Stage_1_Discharge_Pressure,Stage_2_Discharge_Pressure,Stage_3_Discharge_Pressure,Successful_Msgs_Today,Suction_Pressure,Suction_Temp,TOW_Comp_Name,Unit_Size,is_failed,Run_Status_Code
2122520,"Butler, W. A 5-11 CMP",1/1/2017 12:01:07.4670104 AM,1483250467467,53.0,164.0,3,273.0,235.0,233.0,71.0,,53,,BUTLER_W_A5_7_CMP,,,,690,01/01 00:01:07,1000,3200,1400,74,100,1409,1-RUNNING,1193046.0,NO FAULT PRESENT,202.0,302.0,890.0,205,48.0,64.0,W. BUTLER A #5H,3508ULB,0,1
2122521,"Butler, W. A 5-11 CMP",1/1/2017 12:13:37.9620056 AM,1483251217962,53.0,164.0,3,273.0,233.0,227.0,70.0,,53,,BUTLER_W_A5_7_CMP,,,,690,01/01 00:13:38,1000,3200,1400,74,100,1403,1-RUNNING,1193046.0,NO FAULT PRESENT,210.0,311.0,888.0,18,51.0,64.0,W. BUTLER A #5H,3508ULB,0,1
2122522,"Butler, W. A 5-11 CMP",1/1/2017 12:18:36.4869995 AM,1483251516486,53.0,165.0,3,272.0,233.0,229.0,70.0,,53,,BUTLER_W_A5_7_CMP,,,,690,01/01 00:18:36,1000,3200,1400,74,100,1404,1-RUNNING,1193046.0,NO FAULT PRESENT,211.0,312.0,891.0,24,51.0,65.0,W. BUTLER A #5H,3508ULB,0,1
2122523,"Butler, W. A 5-11 CMP",1/1/2017 12:23:36.4950103 AM,1483251816495,53.0,164.0,3,272.0,234.0,228.0,70.0,,53,,BUTLER_W_A5_7_CMP,,,,690,01/01 00:18:36,1000,3200,1400,74,100,1405,1-RUNNING,1193046.0,NO FAULT PRESENT,210.0,309.0,878.0,30,51.0,65.0,W. BUTLER A #5H,3508ULB,0,1
2122524,"Butler, W. A 5-11 CMP",1/1/2017 12:28:40.1210021 AM,1483252120121,53.0,164.0,3,271.0,234.0,229.0,70.0,,53,,BUTLER_W_A5_7_CMP,,,,690,01/01 00:26:07,1000,3200,1400,74,100,1398,1-RUNNING,1193046.0,NO FAULT PRESENT,208.0,308.0,887.0,36,51.0,65.0,W. BUTLER A #5H,3508ULB,0,1
2122525,"Butler, W. A 5-11 CMP",1/1/2017 12:33:37.1800079 AM,1483252417180,53.0,164.0,3,272.0,234.0,230.0,70.0,,53,,BUTLER_W_A5_7_CMP,,,,690,01/01 00:31:07,1000,3200,1400,74,100,1410,1-RUNNING,1193046.0,NO FAULT PRESENT,207.0,311.0,890.0,42,50.0,65.0,W. BUTLER A #5H,3508ULB,0,1
2122526,"Butler, W. A 5-11 CMP",1/1/2017 12:41:07.3860015 AM,1483252867386,53.0,165.0,3,272.0,235.0,230.0,70.0,,53,,BUTLER_W_A5_7_CMP,,,,690,01/01 00:41:07,1000,3200,1400,74,100,1405,1-RUNNING,1193046.0,NO FAULT PRESENT,205.0,305.0,885.0,50,49.0,65.0,W. BUTLER A #5H,3508ULB,0,1
2122527,"Butler, W. A 5-11 CMP",1/1/2017 12:43:52.4680023 AM,1483253032468,53.0,166.0,3,272.0,235.0,230.0,70.0,,53,,BUTLER_W_A5_7_CMP,,,,690,01/01 00:41:07,1000,3200,1400,74,100,1408,1-RUNNING,1193046.0,NO FAULT PRESENT,205.0,307.0,880.0,54,50.0,65.0,W. BUTLER A #5H,3508ULB,0,1
2122528,"Butler, W. A 5-11 CMP",1/1/2017 12:51:06.4190063 AM,1483253466419,53.0,165.0,3,272.0,234.0,229.0,70.0,,53,,BUTLER_W_A5_7_CMP,,,,690,01/01 00:51:06,1000,3200,1400,74,100,1402,1-RUNNING,1193046.0,NO FAULT PRESENT,211.0,311.0,874.0,62,51.0,66.0,W. BUTLER A #5H,3508ULB,0,1
2122529,"Butler, W. A 5-11 CMP",1/1/2017 1:03:38.2170104 AM,1483254218217,53.0,165.0,3,273.0,234.0,229.0,71.0,,53,,BUTLER_W_A5_7_CMP,,,,690,01/01 01:03:38,1000,3200,1400,74,100,1409,1-RUNNING,1193046.0,NO FAULT PRESENT,211.0,316.0,884.0,78,51.0,64.0,W. BUTLER A #5H,3508ULB,0,1


In [4]:
# 80/20 train/test split with a fixed seed so the split is reproducible.
# randomSplit is a DataFrame method; the bare global call raised NameError.
# NOTE(review): rows are sampled independently. describe() shows a mean is_failed
# of ~0.07%, and the rows are timestamped telemetry, so consider a stratified
# (sampleBy) or time-based split instead.
train_df, test_df = labled_df.randomSplit([0.8, 0.2], seed=27564)

In [5]:
# describe() builds a new DataFrame of per-column summary statistics.
# NOTE(review): as a bare expression this presumably renders only the DataFrame
# repr, not the rows; wrap it in display(...) to see the statistics.
labled_df.describe()

In [6]:
# Summary statistics (count / mean / stddev / min / max) for every column.
# Recomputed from labled_df instead of reading Out[37], whose index depends on
# kernel execution history and breaks Restart & Run All.
display(labled_df.describe())

summary,Id,Asset_Name,Local_Timestamp,UTC_Milliseconds,Compressor_Oil_Pressure,Compressor_Oil_Temp,Compressor_Stages,Cylinder_1_Discharge_Temp,Cylinder_2_Discharge_Temp,Cylinder_3_Discharge_Temp,Cylinder_4_Discharge_Temp,Downtime_Hrs_Yest,Engine_Oil_Pressure,Engine_Oil_Temp,Facility_ID,Fuel_Pressure,Gas_Flow_Rate,Gas_Flow_Rate_RAW,Horsepower,Last_Successful_Comm_Time,Max_Discharge_Pressure,Max_Gas_Flowrate,Max_RPMs,Max_Suction_Pressure,Pct_Successful_Msgs_Today,RPM,Run_Status,Runtime_Hrs,SD_Status_Code,Stage_1_Discharge_Pressure,Stage_2_Discharge_Pressure,Stage_3_Discharge_Pressure,Successful_Msgs_Today,Suction_Pressure,Suction_Temp,TOW_Comp_Name,Unit_Size,is_failed,Run_Status_Code
count,16044830.0,16044830,16044830,16044830.0,15420532.0,15434446.0,16044830.0,15408689.0,15400608.0,15402591.0,9558365.0,7836685.0,984638.0,8426859.0,16044830,0.0,14226306.0,14226306.0,16044830.0,16042500,16044830.0,16044830.0,16044830.0,16044830.0,16017811.0,16042812.0,15438570,15952646.0,15662896,15392137.0,15395787.0,15396306.0,16042184.0,15383217.0,9629097.0,16044377,15772116,16044830.0,16044830.0
mean,8022415.5,,,1504644707619.838,69.09302580539607,276.249840040907,3.0,300.2422866896765,310.0705315451295,275.56722746795003,809.7024919118788,1.6371860305149009,8734.33492613529,924.8303478531632,,,1431.1248798577128,1510.8198672597211,390.2403706365228,,958.5609320884048,2010.9349877187851,1661.4415983217025,62.84008986072149,99.16332269122168,1553.8371049913194,0.0,35829.78497039214,756.4485946732299,309.6892969625179,476.46721812432855,939.8720639759366,1973.846066782428,186.9301096647048,205.03226132610132,,3508.0,0.0006513001384246515,0.9254803572241028
stddev,4631743.604138449,,,7061341835.184146,484.298098947696,1984.721277784983,0.0,1313.9979151585303,1614.5381666210617,1266.389775519761,5386.861719586803,5.095416361282485,19736.441301656992,6553.001116609749,,,967.8550792660928,981.9951443105992,216.2466772974747,,187.2852154160555,1266.9528934359444,281.1714401737059,22.14958339602919,3.766782000957744,2327.9694908285283,0.0,143090.09443322063,6286.372257178314,2239.4823113589623,1700.787995054039,1338.7825504572595,1250.9954128855552,1443.4357183107252,2210.585875306401,,0.0,0.0255122713046179,0.3909709653523413
min,1.0,"Baker, A.J. A 2 CMP",1/1/2017 10:01:03.2160034 PM,1483250464737.0,0.0,0.0,3.0,0.0,0.0,0.0,0.0,-0.9516219,0.0,0.0,BAKER_AJ_2H_CMP,,0.0,0.0,0.0,01/01 00:01:04,0.0,0.0,0.0,0.0,0.0,0.0,0,0.0,100,0.0,0.0,0.0,0.0,0.0,0.0,A.J. BAKER A #2H,3304NA,0.0,-1.0
max,16044830.0,"Witte, S. A 1H CMP",9/9/2017 9:59:17.0220031 PM,1514786359984.0,61696.0,65521.0,3.0,65466.0,65472.0,65496.0,65459.0,24.0,64274.0,65459.0,WITTE_S_A_1_CMP,,12510.94,12510.94,690.0,Unknown,1200.0,4179.0,1800.0,110.0,100.0,65516.0,Standby,1193046.0,Unexpected Start,65390.0,65459.0,65459.0,4374.0,61696.0,65531.0,WILLEKE A #1H,KTA19,1.0,3.0


In [7]:
# Spot-check the sparsely populated columns on a reproducible 5% sample.
display(
    labled_df
    .sample(fraction=0.05, seed=6534)
    .select(["Downtime_Hrs_Yest", "Engine_Oil_Pressure", "Engine_Oil_Temp", "Facility_ID"])
)

Downtime_Hrs_Yest,Engine_Oil_Pressure,Engine_Oil_Temp,Facility_ID
,,123.0,BEDNORZ_B8_10_CMP
,,115.8837,BEDNORZ_B8_10_CMP
,,114.9206,BEDNORZ_B8_10_CMP
,,114.0,BEDNORZ_B8_10_CMP
,,109.6681,BEDNORZ_B8_10_CMP
,,108.2412,BEDNORZ_B8_10_CMP
,,108.0,BEDNORZ_B8_10_CMP
,,106.6593,BEDNORZ_B8_10_CMP
,,104.9264,BEDNORZ_B8_10_CMP
,,103.328,BEDNORZ_B8_10_CMP


In [8]:
import mmlspark.CleanMissingData



In [9]:
import matplotlib.pyplot as plt

import plotly.plotly as py