**Sync With Drive**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Import necessary libraries

In [None]:
import numpy as np
import pandas as pd

import dask.dataframe as dd
import dask.array as da
import dask.bag as db

# **1. Import the main csv file sent by embedded team**

In [None]:
# ddf = dd.read_csv("/content/drive/MyDrive/Senzmate/RCA/jsonoutput3.csv", blocksize=25e6) # - deviceId ValueError("invalid literal for int() with base 10: 'peercore-F'")

ddf = dd.read_csv("/content/drive/MyDrive/Senzmate/RCA/jsonoutput3.csv", dtype={'deviceId': 'object'})



In [None]:
ddf.head()

Unnamed: 0.1,Unnamed: 0,_id,_class,deviceId,number,code,time,value,creationDate,modifiedDate
0,0,61cf998d2d3b872d3baa528e,com.magma.core.data.entity.Sensor,869170034808734,0,T,2022-01-01T00:00:13.521Z,21.14,2022-01-01T00:00:13.522Z,2022-01-01T00:00:13.522Z
1,1,61cf998d2d3b872d3baa528f,com.magma.core.data.entity.Sensor,869170034808734,1,H,2022-01-01T00:00:13.521Z,103.10,2022-01-01T00:00:13.523Z,2022-01-01T00:00:13.523Z
2,2,61cf998d2d3b872d3baa5290,com.magma.core.data.entity.Sensor,869170034808734,2,MEA4,2022-01-01T00:00:13.521Z,545.00/163.00,2022-01-01T00:00:13.524Z,2022-01-01T00:00:13.524Z
3,3,61cf998d2d3b872d3baa5291,com.magma.core.data.entity.Sensor,869170034808734,3,B,2022-01-01T00:00:13.521Z,266,2022-01-01T00:00:13.524Z,2022-01-01T00:00:13.524Z
4,4,61cf99902d3b872d3baa5297,com.magma.core.data.entity.Sensor,869170034809062,0,T,2022-01-01T00:00:16.445Z,13.44,2022-01-01T00:00:16.445Z,2022-01-01T00:00:16.445Z


# **Recreating a CSV by adding error flags directly to main csv**

Here didn't group the devices. If the dataponit is an error then the error flag i 1, otherwise 0.(If the datapoint is outlier => outlier flag =1)

**remove unwanted column**

In [None]:
ddf = ddf.drop(columns=['Unnamed: 0', '_id', '_class', 'number', 'creationDate', 'modifiedDate'])
ddf.head()

Unnamed: 0,deviceId,code,time,value
0,869170034808734,T,2022-01-01T00:00:13.521Z,21.14
1,869170034808734,H,2022-01-01T00:00:13.521Z,103.10
2,869170034808734,MEA4,2022-01-01T00:00:13.521Z,545.00/163.00
3,869170034808734,B,2022-01-01T00:00:13.521Z,266
4,869170034809062,T,2022-01-01T00:00:16.445Z,13.44


**Remove unwanted sensor data**

In [None]:
needed_values = ["B", "IT", "LIA1", "H", "IRO", "T", "ST", "SS"]
ddf2 = ddf.loc[ddf['code'].isin(needed_values)].reset_index(drop=True)

In [None]:
ddf2.head()

Unnamed: 0,deviceId,code,time,value
0,869170034808734,T,2022-01-01T00:00:13.521Z,21.14
1,869170034808734,H,2022-01-01T00:00:13.521Z,103.1
2,869170034808734,B,2022-01-01T00:00:13.521Z,266.0
3,869170034809062,T,2022-01-01T00:00:16.445Z,13.44
4,869170034809062,H,2022-01-01T00:00:16.445Z,98.55


**outlier detection**

In [None]:
VALUES_RANGE = {'B': {"min": 260, "max": 314},
                'IT': {"min": -55, "max": 125},
                'LIA1': {"min": 0, "max": 65535},
                'H': {"min": 0, "max": 110},
                'IRO': {"min": 0, "max": 200},
                'T': {"min": -40, "max": 125},
                'ST': {"min": -55, "max": 125},
                'SS': {"min": 2, "max": 30},
                }
def detect_outlier(row):
  try:
    if VALUES_RANGE[row.code]['min'] <= float(row.value) <= VALUES_RANGE[row.code]['max']:
      return 0
  except:
    pass
  return 1

In [None]:
ddf2['outlier_data_flag'] = ddf2.apply(lambda row: detect_outlier(row), axis=1, meta=pd.Series(dtype="int32"))

In [None]:
ddf2.head(5)

Unnamed: 0,deviceId,code,time,value,outlier_data_flag
0,869170034808734,T,2022-01-01T00:00:13.521Z,21.14,0
1,869170034808734,H,2022-01-01T00:00:13.521Z,103.1,0
2,869170034808734,B,2022-01-01T00:00:13.521Z,266.0,0
3,869170034809062,T,2022-01-01T00:00:16.445Z,13.44,0
4,869170034809062,H,2022-01-01T00:00:16.445Z,98.55,0


In [None]:
ddf2.tail(5)

Unnamed: 0,deviceId,code,time,value,outlier_data_flag
283293,8691700348087260,T,2022-07-01T00:00:00.000Z,18.15,0
283294,8691700348087260,H,2022-07-01T00:00:00.000Z,101.55,0
283295,8691700348087260,B,2022-07-01T00:00:00.000Z,273.0,0
283296,8691700348087260,IT,2022-07-01T00:00:00.000Z,43.0,0
283297,8691700348087260,ST,2022-07-01T00:00:00.000Z,0.0,0


**high volume check**

In [None]:
ddf2['high_volume'] = 1
ddf2.groupby(['deviceId','code', 'time']).high_volume.count().reset_index()

Unnamed: 0_level_0,deviceId,code,time,high_volume
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
,object,object,object,int64
,...,...,...,...


In [None]:
ddf2.head(5)

Unnamed: 0,deviceId,code,time,value,outlier_data_flag,high_volume
0,869170034808734,T,2022-01-01T00:00:13.521Z,21.14,0,1
1,869170034808734,H,2022-01-01T00:00:13.521Z,103.1,0,1
2,869170034808734,B,2022-01-01T00:00:13.521Z,266.0,0,1
3,869170034809062,T,2022-01-01T00:00:16.445Z,13.44,0,1
4,869170034809062,H,2022-01-01T00:00:16.445Z,98.55,0,1


In [None]:
# ddf2.to_csv('/content/drive/MyDrive/Senzmate/RCA/result_1/export-*.csv', index=False)  

In [None]:
ddf2['high_volume_flag'] = ddf2.apply(lambda row: 1 if row.high_volume > 1 else 0, axis=1, meta=pd.Series(dtype="int32"))
ddf2.head(5)

Unnamed: 0,deviceId,code,time,value,outlier_data_flag,high_volume,high_volume_flag
0,869170034808734,T,2022-01-01T00:00:13.521Z,21.14,0,1,0
1,869170034808734,H,2022-01-01T00:00:13.521Z,103.1,0,1,0
2,869170034808734,B,2022-01-01T00:00:13.521Z,266.0,0,1,0
3,869170034809062,T,2022-01-01T00:00:16.445Z,13.44,0,1,0
4,869170034809062,H,2022-01-01T00:00:16.445Z,98.55,0,1,0


In [None]:
# ddf2.to_csv('/content/drive/MyDrive/Senzmate/RCA/result_2/export-*.csv') 

In [None]:
# ddf2['high_volume'] = ddf2.groupby(['deviceId', 'code', 'time'])['value'].transform('size')
# ddf2['high_volume_flag'] = ddf2.apply(lambda row: 1 if row.high_volume > 1 else 0, axis=1, meta=pd.Series(dtype="int64"))

In [None]:
ddf2.head(5)

Unnamed: 0,deviceId,code,time,value,outlier_data_flag,high_volume,high_volume_flag
0,869170034808734,T,2022-01-01T00:00:13.521Z,21.14,0,1,0
1,869170034808734,H,2022-01-01T00:00:13.521Z,103.1,0,1,0
2,869170034808734,B,2022-01-01T00:00:13.521Z,266.0,0,1,0
3,869170034809062,T,2022-01-01T00:00:16.445Z,13.44,0,1,0
4,869170034809062,H,2022-01-01T00:00:16.445Z,98.55,0,1,0


In [1]:
ddf2['miss_data_flag'] = 0
ddf2.head(0)

NameError: ignored

In [None]:
grp_deviceId_time = ddf2.groupby(['deviceId', 'time'])

In [None]:
# ddf2.groupby(['deviceId', 'time']).apply(func)

In [None]:
l1 = []
def my_function(df):
  captured_values = df['code'].values
  res = [*set(captured_values)]
  for sensor in needed_values:
    if sensor not in res:
      new_row = {'deviceId':df['deviceId'].iat[0], 'code':sensor, 'time':df['time'].iat[0], 'value':0, 'outlier_data_flag':0, 'high_volume':0, 'high_volume_flag':0, 'miss_data_flag': 1}
      new_df = pd.DataFrame.from_dict(new_row)

      df.append(new_df)
  l1.append(df)
  return df

In [None]:
meta_df = pd.DataFrame(columns=['outlier_data_flag', 'high_volume', 'high_volume_flag', 'miss_data_flag'])

ddf2.groupby(['deviceId', 'time']).apply(my_function, meta=meta_df).compute()

ValueError: ignored

In [None]:
result = dd.concat(l1)

In [None]:
from joblib import Parallel, delayed
import multiprocessing

needed_values = ["B", "IT", "LIA1", "H", "IRO", "T", "ST", "SS"]

def tmpFunc(df):
  captured_values = ddf['code'].values
  res = [*set(captured_values)]
  for sensor in needed_values:
    if sensor not in res:
      new_row = {'deviceId':id[0], 'code':sensor, 'time':id[1], 'value':0, 'outlier_data_flag':0, 'high_volume':0, 'high_volume_flag':0, 'miss_data_flag': 1}
      new_df = pd.DataFrame.from_dict(new_row)

      df.append(new_df)
  return df

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

result = ddf2.groupby(['deviceId', 'time']).apply(tmpFunc)

ValueError: ignored

In [None]:
needed_values = ["B", "IT", "LIA1", "H", "IRO", "T", "ST", "SS"]
frames = []
i = 0
for id, ddf in grp_deviceId_time:
  if i > 3:
    break
  captured_values = ddf['code'].values
  res = [*set(captured_values)]
  for sensor in needed_values:
    if sensor not in res:
      new_row = {'deviceId':id[0], 'code':sensor, 'time':id[1], 'value':0, 'outlier_data_flag':0, 'high_volume':0, 'high_volume_flag':0, 'miss_data_flag': 1}
      new_df = pd.DataFrame.from_dict(new_row)

      ddf.append(new_df)
  i = i + 1
  frames.append(ddf)
  # result = pd.concat([result, df], axis=1)
result = dd.concat(frames)
result['miss_data_flag'] = result['miss_data_flag'].fillna(0).astype('Int32')
result.head()

NotImplementedError: ignored

In [None]:
result.to_csv('/content/drive/MyDrive/RCA/result_20.csv')

In [None]:
needed_values = ["B", "IT", "LIA1", "H", "IRO", "T", "ST", "SS"]
new_index = Index(needed_values, name="code")
# result = pd.DataFrame()
frames = []
for id,df in grp_deviceId_time:
  df['miss_data_flag'] = 0
  df.set_index("code").reindex(new_index).reset_index()
  df['deviceId'] = df['deviceId'].fillna(id[0])
  df['time'] = df['time'].fillna(id[1])
  df['miss_data_flag'] = df['miss_data_flag'].fillna(0).astype('Int32')

  frames.append(df)

result = pd.concat(frames, axis=1)
result['miss_data_flag'] = result['miss_data_flag'].fillna(0).astype('Int32')
result.head(5)

NameError: ignored

In [None]:
result.to_csv('/content/drive/MyDrive/Senzmate/RCA/result_2.csv')

# **=====> creating datasets for each devices and add error flag columnsin table and create json file for each devices contain their errors**

# **2. Group the dataframe for each device**

*   "deviceId" is used to group

In [None]:
grp_deviceId = df.groupby('deviceId')

In [None]:
VALUES_RANGE = {'B': {"min": 260, "max": 314},
                'IT': {"min": -55, "max": 125},
                'LIA1': {"min": 0, "max": 65535},
                'H': {"min": 0, "max": 110},
                'IRO': {"min": 0, "max": 200},
                'T': {"min": -40, "max": 125},
                'ST': {"min": -55, "max": 125},
                'SS': {"min": 2, "max": 30},
                }
def detect_outlier(value_array, sensor_name):
  min_value = VALUES_RANGE[sensor_name]['min']
  max_value = VALUES_RANGE[sensor_name]['max']
  try:
    value_array_float = value_array.astype(np.float)
    detect = np.any((value_array_float <= max_value)|(value_array_float >= min_value ))
    if detect: 
      return 0
  except:
    pass
    # print("sensor_name ", sensor_name, " value_array ", value_array)
  return 1


# **3. Create new dataframe for each devices with error flags**

In [None]:
needed_values = ["B", "IT", "LIA1", "H", "IRO", "T", "ST", "SS"]


for id,df in grp_deviceId:
  df = df[['time','code','value']]
  df_columns = list(df.columns)
  df = df.groupby(['time', 'code'])['value'].apply(list).reset_index()        # group the values for a same instant of a sensor(Find missing and high volume flags)
  pivot_df = df.reset_index().pivot(values='value', index='time', columns='code') # unmelt the sensor category

  for column in pivot_df.columns:
    if column in needed_values:
      data_miss_flag = '{}_data_miss_flag'.format(column)
      high_volume = '{}_high_volumn'.format(column)
      high_volume_flag = '{}_high_volumn_flag'.format(column)

      outlier_flag = '{}_outlier_flag'.format(column)

      pivot_df[data_miss_flag] = pivot_df.apply(lambda row: 1 if np.array(pd.isnull(row[column])).any() else 0, axis=1)
      pivot_df[high_volume] = pivot_df.apply(lambda row: len(row[column]) if row[data_miss_flag] == 0 else 0, axis=1)
      pivot_df[high_volume_flag] = pivot_df.apply(lambda row: 1 if row[high_volume] > 1 else 0, axis=1)
      pivot_df[outlier_flag] = pivot_df.apply(lambda row: detect_outlier(np.array(row[column]), column) if row[data_miss_flag] == 0 else 0, axis=1)

      pivot_df = pivot_df.drop(high_volume, axis=1)
    else:
      pivot_df = pivot_df.drop(column, axis=1)

  pivot_df.to_csv('/content/drive/MyDrive/Senzmate/RCA/Analyzed_dataframes_for_each_devices/{}.csv'.format(id))

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  


In [None]:
{'CN', 'SS', 'CS', 'RF', 'N', 'B', 'MEA3', 'X', 'M', 'H', 'MEA2', 'ST', 'WD', 'P', 'WS', 'LIA1', 'IT', 'MEA4', 'T', 'MEA', 'CT', 'Y', 'IRO', 'MEA1'}


# **4. Create json files for each device to record its faults**

In [None]:
path = '/content/drive/MyDrive/Senzmate/RCA/Analyzed_dataframes_for_each_devices/'
os.chdir(path)
csv_files = glob.glob('*.csv')
print(csv_files)

['0033201246.csv', '0033208845.csv', '1031324235.csv', '1032721006.csv', '1033201220.csv', '1033915738.csv', '1036217273.csv', '1136917432.csv', '2030564842.csv', '1036917432.csv', '1044444444.csv', '2035371870.csv', '2038599606.csv', '29032431423.csv', '3032721006.csv', '3601225.csv', '4031324235.csv', '4035663649.csv', '64034160230.csv', '62035787778.csv', '42220711.csv', '67023401394.csv', '56030210204.csv', '67023401626.csv', '67023402079.csv', '70033201204.csv', '70033894495.csv', '70033894792.csv', '7023401402.csv', '70034808528.csv', '70034810839.csv', '7028360827.csv', '72035398465.csv', '74032175024.csv', '74033087103.csv', '861311002933124.csv', '861311003410015.csv', '861311003410023.csv', '861311003411179.csv', '861311003412789.csv', '861311003414587.csv', '861311003420956.csv', '861311003609699.csv', '861311004363833.csv', '8635840320262950.csv', '8643690373854100.csv', '865067023394920.csv', '8647640306331980.csv', '8650670233952730.csv', '865067023395711.csv', '865067023

In [None]:
needed_values = ["B", "IT", "LIA1", "H", "IRO", "T", "ST", "SS"]

for id_csv_file in csv_files:
  id = Path(id_csv_file).stem
  # print("id ", id)
  save_data = {}
  fault_detected = False
  pivot_df = pd.read_csv("/content/drive/MyDrive/Senzmate/RCA/Analyzed_dataframes_for_each_devices/{}".format(id_csv_file),  on_bad_lines='skip')
  for column in pivot_df.columns:
    if column in needed_values:
      save_data[column] = {}
      grp_by_data_miss_flag = pivot_df.groupby('{}_data_miss_flag'.format(column))
      if 1 in grp_by_data_miss_flag.groups:
        miss_data_df = grp_by_data_miss_flag.get_group(1)
        save_data[column]["miss_data_error"] = list(miss_data_df['time'])
        fault_detected = True

      grp_by_data_high_volumn_flag = pivot_df.groupby('{}_high_volumn_flag'.format(column))
      if 1 in grp_by_data_high_volumn_flag.groups:
        high_volumn_data_df = grp_by_data_high_volumn_flag.get_group(1)
        save_data[column]["high_volumn_error"] = list(high_volumn_data_df['time'])
        fault_detected = True
      
      grp_by_outlier_flag = pivot_df.groupby('{}_outlier_flag'.format(column))
      if 1 in grp_by_outlier_flag.groups:
        outlier_data_df = grp_by_outlier_flag.get_group(1)
        save_data[column]["outlier_data_error"] = list(outlier_data_df['time'])
        fault_detected = True
  
  if fault_detected:
    with open("/content/drive/MyDrive/Senzmate/RCA/Analyzed_dataframes_for_each_devices/{}.json".format(id), "w") as outfile:
      json.dump(save_data, outfile)