In [1]:
#all spark imports
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

#python imports
import pandas as pd 
import numpy as np

#create the Spark session for this notebook (getOrCreate reuses an existing one)
spark = (
    SparkSession.builder
    .appName("Cleaning")
    .getOrCreate()
)

#use 8 shuffle partitions instead of Spark's default of 200, matching the number of
#cpu cores to improve performance (NOTE(review): assumes an 8-core cluster)
spark.conf.set("spark.sql.shuffle.partitions", 8)

## Feature name encoding for ease of understanding during later model development
### Some of the i0x## features are renamed based on their characteristics, as follows:
##### - The trailing characters are the same as the ## in the original 'i0x##' feature name.
##### - 'B_' before the trailing ## means that the feature is binary.
##### - 'T_' before the trailing ## means that the feature is ternary.
##### - If a feature is prefixed with 'c_const', it remains constant throughout a single cycle.
##### - If a feature is prefixed with 'P_const', it remains constant throughout a single cell-protocol combination.
##### - If a feature is prefixed with just 'const_', it remains constant throughout the entire dataset.
##### - If a feature starts with 'i0x', it varies throughout the data and has none of the above characteristics.

In [3]:
#schema with the encoded feature names, one (name, Spark type) pair per csv column.
#The order must match the csv file: the names are applied positionally when the file is loaded.
#Every field is nullable.
_field_specs = [
    ("time", TimestampType()),
    ("ocv", DoubleType()),
    ("di", DoubleType()),
    ("c_const_B_2d", IntegerType()),
    ("c_const_T_c4", DoubleType()),
    ("i0x91", DoubleType()),
    ("c_const_B_81", IntegerType()),
    ("c_const_B_40", IntegerType()),
    ("c_const_T_32", DoubleType()),
    ("B_65", IntegerType()),
    ("i0x2", DoubleType()),
    ("c_const_T_bc", DoubleType()),
    ("P_const_30", IntegerType()),
    ("P_const_9f", IntegerType()),
    ("c_const_B_6b", IntegerType()),
    ("c_const_B_9", IntegerType()),
    ("const_8f", IntegerType()),
    ("c_const_B_3b", IntegerType()),
    ("c_const_B_c9", IntegerType()),
    ("c_const_B_b2", IntegerType()),
    ("c_const_B_14", IntegerType()),
    ("c_const_B_76", IntegerType()),
    ("c_const_B_29", IntegerType()),
    ("P_const_2c", IntegerType()),
    ("i0xcd", DoubleType()),
    ("i0x28", DoubleType()),
    ("i0xb1", DoubleType()),
    ("i0x83", DoubleType()),
    ("i0x8c", DoubleType()),
    ("i0x6", DoubleType()),
    ("T_5a", DoubleType()),
    ("B_78", IntegerType()),
    ("i0xa7", DoubleType()),
    ("i0x2a", DoubleType()),
    ("i0x8a", DoubleType()),
    ("T_b6", DoubleType()),
    ("c_const_T_5", DoubleType()),
    ("i0x94", DoubleType()),
    ("i0x73", DoubleType()),
    ("average_di", DoubleType()),
    ("charge_duration", DoubleType()),
    ("cell_no", IntegerType()),
    ("protocol", StringType()),
    ("cycle", IntegerType()),
]
myschema = StructType([StructField(name, dtype, True) for name, dtype in _field_specs])
del _field_specs  #only needed to build the schema

#the column names, in schema order, used to rename the loaded csv columns
column_names = myschema.fieldNames()

In [4]:
#read the csv file with all data; inferSchema makes Spark pick each column's type from the data
all_DF = spark.read.csv("/FileStore/tables/GBatteries_alldata.csv", inferSchema = True, header = True)
#cast time as Timestamp
all_DF = all_DF.withColumn("time", all_DF["time"].cast(TimestampType()))

#rename the columns positionally to the encoded names taken from the schema.
#toDF does this directly in the JVM and keeps the types inferred above; a
#DataFrame -> RDD -> createDataFrame round trip would push every row through
#Python and re-infer the types from a sample of rows (ints become longs, and
#leading nulls can make inference fail).
all_DF = all_DF.toDF(*column_names).repartition(8).cache()

#materialize the all_DF
all_DF.count()

In [5]:
#add charge_time to the schema: it is part of the output schema of clean_and_add_time below.
#StructType.add mutates myschema in place, so guard it to keep this cell idempotent;
#re-running an unguarded add would append a second charge_time field.
if 'charge_time' not in myschema.fieldNames():
    myschema.add('charge_time', DoubleType(), nullable=True)

myschema

In [6]:
def clean_and_add_time(x):
    """Trim one (cell_no, protocol, cycle) group to its charging part and add charge_time.

    Used with ``applyInPandas``. Rows are sorted by ``time``. The group is cut just before
    the first row whose ``ocv`` is lower than the previous row's. This removes the anomaly
    caused by discharge readings mixed into a charge cycle. Then ``charge_time`` is appended:
    the seconds elapsed since the first kept row.

    Parameters
    ----------
    x : pandas.DataFrame
        One group; must contain a datetime-like ``time`` column and a numeric ``ocv`` column.

    Returns
    -------
    pandas.DataFrame
        A new frame with the same columns as ``x`` plus a float ``charge_time`` column.
        ``x`` itself is not modified. An empty input is returned empty, with the column added.
    """
    if x.empty:
        # Nothing to trim; still return the column the output schema expects.
        return x.assign(charge_time=np.nan)

    x = x.sort_values(by=['time'])

    # Cubing is strictly monotonic, so successive differences of ocv**3 have the same sign
    # as those of ocv: a negative difference marks the first non-charging row.
    # The first row's difference is NaN, and NaN < 0 is False, so row 0 never matches.
    drops = np.flatnonzero((x['ocv'] ** 3).diff().lt(0).to_numpy())
    if drops.size:
        # Keep every row strictly before the first drop. .copy() makes an independent
        # frame, so adding charge_time below does not write into a slice
        # (no SettingWithCopyWarning).
        x = x.iloc[:drops[0]].copy()

    x['charge_time'] = (x['time'] - x['time'].iloc[0]) / np.timedelta64(1, 's')
    return x
    
#clean every (cell_no, protocol, cycle) group with clean_and_add_time; the pandas output
#is mapped onto myschema, which includes charge_time.
#time is dropped afterwards (charge_time replaces it) together with average_di and
#charge_duration (the latter is recomputed per cycle from charge_time in the aggregation below).
clean_DF = (
    all_DF
    .groupBy("cell_no", "protocol", "cycle")
    .applyInPandas(clean_and_add_time, schema=myschema)
    .drop('average_di', 'charge_duration', 'time')
    .cache()
)

clean_DF.count()

In [7]:
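#write the clean_DF as a csv file to use later
#NOTE: to get a single output file, sort first and coalesce last (.orderBy(...).coalesce(1));
#an orderBy placed after coalesce(1) can shuffle the data back into several partitions (and files)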

#clean_DF.coalesce(1) \
#.orderBy("cell_no","protocol", "cycle") \
#.write.format("com.databricks.spark.csv") \
#.option("header", "true") \
#.save("/FileStore/tables/dir_clean_data.csv")

### Downsample the data to make the series regular

In [9]:
#Down sample the data up to cycle level by aggregating appropriately:
# - every remaining feature is averaged over the cycle,
# - ocv is summarised by its min, max and range (rng_ocv),
# - charge_duration is the largest charge_time (seconds since the cycle's first kept reading),
# - dur_by_ocv is charge_duration per unit of ocv range.
#avg/min/max/col/when are the pyspark.sql.functions versions (star import above), not Python builtins.
cyc_agg_DF = (
    clean_DF.groupBy("cell_no", "protocol", "cycle")
    .agg(
        *[avg(c).alias(c) for c in clean_DF.columns
          if c not in {'cell_no', 'protocol', 'cycle', 'ocv', 'charge_time'}],
        min('ocv').alias('min_ocv'),
        max('ocv').alias('max_ocv'),
        (max('ocv') - min('ocv')).alias('rng_ocv'),
        max('charge_time').alias('charge_duration'),
    )
    # Guard the zero range explicitly. With ANSI mode (spark.sql.ansi.enabled, the default
    # from Spark 4.0) x / 0 raises instead of returning null. Cycles with rng_ocv == 0 keep
    # a null dur_by_ocv, which the anomaly filter later relies on.
    .withColumn('dur_by_ocv', when(col('rng_ocv') != 0, col('charge_duration') / col('rng_ocv')))
    .orderBy('cell_no', 'protocol', 'cycle')
)

### Feature i0x8f (const_8f) remains constant throughout the data
### Rearrange the columns and drop const_8f

In [11]:
#keys and ocv summaries first, then the features grouped by encoded-name prefix
#(i0x, B_, T_, c_, P_); const_8f matches none of these prefixes, so it is dropped here
cyc_agg_DF = cyc_agg_DF.select(
    'cell_no', 'protocol', 'cycle', 'di', 'dur_by_ocv',
    'min_ocv', 'max_ocv', 'rng_ocv', 'charge_duration',
    *[name
      for prefix in ('i0x', 'B_', 'T_', 'c_', 'P_')
      for name in cyc_agg_DF.columns
      if name.startswith(prefix)],
).cache()

### Anomaly due to cells not charging (rng_ocv < 70)

In [13]:
#show the cycles whose ocv range stays below 70, i.e. cells that barely charged
display(cyc_agg_DF.filter(col('rng_ocv') < 70))

cell_no,protocol,cycle,di,dur_by_ocv,min_ocv,max_ocv,rng_ocv,charge_duration,i0x91,i0x2,i0xcd,i0x28,i0xb1,i0x83,i0x8c,i0x6,i0xa7,i0x2a,i0x8a,i0x94,i0x73,B_65,B_78,T_5a,T_b6,c_const_B_2d,c_const_T_c4,c_const_B_81,c_const_B_40,c_const_T_32,c_const_T_bc,c_const_B_6b,c_const_B_9,c_const_B_3b,c_const_B_c9,c_const_B_b2,c_const_B_14,c_const_B_76,c_const_B_29,c_const_T_5,P_const_30,P_const_9f,P_const_2c
4,140f77741820c02177597651dfea9fe881c1a73d8e4002a87d0148967cc0f029,29,0.985781990521327,,3632.0,3632.0,0.0,0.0,0.4347826086956521,0.5999333222203701,0.0295147573786893,0.851063829787234,0.2497655517349172,0.64,0.4722222222222222,1.0,0.0847457627118644,1.0,0.2530612244897959,0.019673224408136,0.023609443777511,0.0,1.0,0.0,0.75,0.0,0.0,0.0,1.0,0.5,0.0909090909090909,1.0,0.0,1.0,1.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
5,09942314d31dd2553f1e7f827d9e57ce8d811a8b9b7d8fe75fd372c4910b06db,11,0.6184834123222749,0.5413181818181818,3439.0,3461.0,22.0,11.909,0.4347826086956521,0.3332222037006168,0.0045022511255627,0.9574468085106383,0.1872460143794936,0.652,0.4722222222222222,0.9473684210526316,0.0847457627118644,0.031719532554257,0.0285714285714285,0.0030010003334444,0.0036014405762304,1.0,1.0,0.0,0.75,0.0,0.0,0.0,1.0,0.5,0.0909090909090909,1.0,0.0,1.0,1.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
5,e4615c5798e4279178bd1cfde95118076e87e25239e39b43291a6356b351bc37,12,0.5995260663507109,0.4196551724137931,3365.0,3394.0,29.0,12.17,0.4347826086956521,0.2332055342557093,0.0007503751875937969,0.5319148936170213,0.2497655517349172,0.652,0.4722222222222222,0.9473684210526316,0.0847457627118644,0.031719532554257,0.0285714285714285,0.0005001667222407468,0.0006002400960384152,1.0,1.0,0.0,0.75,0.0,0.0,0.0,1.0,0.5,0.0909090909090909,1.0,0.0,1.0,1.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
5,f38daef78503f7c81cef066904fe29c4b2acf6acd96a0153230d80f21cd0905d,11,0.6232227488151658,0.37215625,3365.0,3397.0,32.0,11.909,0.4347826086956521,0.4999166527754626,0.017008504252126,0.5319148936170213,0.2497655517349172,0.652,0.4722222222222222,0.9473684210526316,0.0847457627118644,0.031719532554257,0.0285714285714285,0.0113371123707902,0.0136054421768707,1.0,1.0,0.0,0.75,0.0,0.0,0.0,1.0,0.5,0.0909090909090909,1.0,0.0,1.0,1.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
9,0ee15df0e1233198be555cea609b7726e1dc914dfa3664a9ed67eca4dd6fb625,1,0.5708746230073246,,3464.0,3464.0,0.0,91.549,0.2173913043478261,1.0,0.0,0.2127659574468085,0.1872460143794936,0.7999999999999999,0.4722222222222222,0.9521531100478468,0.0,0.031719532554257,0.0163265306122449,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.5,0.0,0.0,0.0
9,0ee15df0e1233198be555cea609b7726e1dc914dfa3664a9ed67eca4dd6fb625,2,0.5687203791469194,,3464.0,3464.0,0.0,0.0,0.217391304347826,0.1155192532088681,5.002501250625313e-05,0.2127659574468085,0.1872460143794936,0.8,0.4722222222222222,0.9521531100478468,0.0,0.031719532554257,0.0163265306122449,3.334444814938313e-05,4.001600640256102e-05,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.5,0.0,0.0,0.0
9,0ee15df0e1233198be555cea609b7726e1dc914dfa3664a9ed67eca4dd6fb625,3,0.5471300684570826,,3463.0,3463.0,0.0,577.22,0.2173913043478261,0.115519253208868,0.00015007503751875948,0.2127659574468086,0.1872460143794933,0.7999999999999992,0.4722222222222216,0.952153110047846,0.0,0.0317195325542571,0.0163265306122448,0.00010003334444814932,0.0001200480192076832,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.5,0.0,0.0,0.0
9,0ee15df0e1233198be555cea609b7726e1dc914dfa3664a9ed67eca4dd6fb625,4,0.5492007390151816,193.38899966666668,3463.0,3466.0,3.0,580.166999,0.2173913043478261,0.115519253208868,0.00025012506253126534,0.2127659574468086,0.1872460143794933,0.7999999999999994,0.4722222222222216,0.952153110047846,0.0,0.0317195325542571,0.0163265306122448,0.0001667222407469155,0.0002000800320128052,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.5,0.0,0.0,0.0
9,0ee15df0e1233198be555cea609b7726e1dc914dfa3664a9ed67eca4dd6fb625,5,0.5563981042654029,37.055,3463.0,3464.0,1.0,37.055,0.217391304347826,0.1155192532088681,0.0003501750875437718,0.2127659574468085,0.1872460143794936,0.8,0.4722222222222222,0.9521531100478468,0.0,0.031719532554257,0.0163265306122449,0.00023341113704568192,0.00028011204481792715,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.5,0.0,0.0,0.0
9,0ee15df0e1233198be555cea609b7726e1dc914dfa3664a9ed67eca4dd6fb625,6,0.5558877141815529,146.45,3464.0,3468.0,4.0,585.8,0.2173913043478261,0.115519253208868,0.0005002501250625307,0.2127659574468086,0.1872460143794933,0.7999999999999992,0.4722222222222216,0.952153110047846,0.0,0.0317195325542571,0.0163265306122448,0.0003334444814938309,0.0004001600640256105,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.5,0.0,0.0,0.0


### After inspecting the raw csv files for the cells above, only the observations from cells 4 and 9 were found to be corrupt. The filter below captures those 66 observations.

In [15]:
#a cycle is treated as corrupt when its ocv range is below 70 and its dur_by_ocv is either
#null (e.g. a zero ocv range) or above 9 charge-seconds per unit of ocv range
low_range = cyc_agg_DF['rng_ocv'] < 70
bad_rate = cyc_agg_DF['dur_by_ocv'].isNull() | (cyc_agg_DF['dur_by_ocv'] > 9)
fltr = low_range & bad_rate
del low_range, bad_rate
cyc_agg_DF = cyc_agg_DF.filter(~fltr)

cyc_agg_DF.count()

In [16]:
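#write the cyc_agg_DF as a csv file to use later
#NOTE: to get a single output file, sort first and coalesce last (.orderBy(...).coalesce(1));
#an orderBy placed after coalesce(1) can shuffle the data back into several partitions (and files)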

#cyc_agg_DF.coalesce(1) \
#.orderBy("cell_no","protocol", "cycle") \
#.write.format("com.databricks.spark.csv") \
#.option("header", "true") \
#.save("/FileStore/tables/dir_agg_clean_DF.csv")