## Load data

In [53]:
# Storage account and file system used to build the abfss:// paths below.
# Both currently carry the same name, so they are bound in one statement.
data_lake_account_name = file_system_name = 'synapseaikorcen'

In [12]:
%%pyspark

# Load each raw battery CSV from the data lake and persist it as the Spark
# table default.battery_orig_<name>, replacing any previous version.
for name in ["RW3", "RW4", "RW5", "RW6"]:
    print(f"start saving as table: {name}")
    source_path = (
        f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net"
        f"/battery/orig/{name}.csv"
    )
    raw_df = (
        spark.read.format("csv")
        # inferSchema=True: without it the CSV reader types every column as a
        # string, so later MIN/MAX/ORDER BY on _c0, time or capacity would
        # compare lexicographically ("10" < "9") instead of numerically.
        .load(source_path, sep=",", header=True, inferSchema=True)
    )
    raw_df.write.mode("overwrite").saveAsTable(f"default.battery_orig_{name}")
    print(f"finished saving as table: {name}")


In [26]:
%%sql
-- Preview up to 1000 rows of the RW3 table.
SELECT *
FROM battery_orig_rw3
LIMIT 1000


In [57]:
%%sql
-- Smallest row index (_c0) per unit at which capacity is at or below 1.6.
-- _c0 and capacity are cast explicitly so MIN() and the comparison work on
-- numeric values even if the table columns are stored as strings
-- (a string MIN() is lexicographic, e.g. '1000000' < '999').
-- NOTE(review): assumes _c0 holds integer row indices - confirm.
SELECT 'rw3' unit, MIN(CAST(_c0 AS BIGINT)) first_cycle FROM battery_orig_rw3 WHERE CAST(capacity AS DOUBLE) <= 1.6
UNION ALL
SELECT 'rw4' unit, MIN(CAST(_c0 AS BIGINT)) first_cycle FROM battery_orig_rw4 WHERE CAST(capacity AS DOUBLE) <= 1.6
UNION ALL
SELECT 'rw5' unit, MIN(CAST(_c0 AS BIGINT)) first_cycle FROM battery_orig_rw5 WHERE CAST(capacity AS DOUBLE) <= 1.6
UNION ALL
SELECT 'rw6' unit, MIN(CAST(_c0 AS BIGINT)) first_cycle FROM battery_orig_rw6 WHERE CAST(capacity AS DOUBLE) <= 1.6


In [25]:
%%sql
-- Inspect the raw RW3 rows whose index lies between 1260000 and 1261000 (inclusive).
SELECT *
FROM battery_orig_rw3
WHERE _c0 BETWEEN 1260000 AND 1261000

In [69]:
%%sql
-- Centered moving average of capacity (15 rows before, the current row,
-- 15 rows after) for RW3 rows with index between 1260000 and 1261000.
-- The window runs over the filtered rows only, so rows near either end of
-- the range are averaged over fewer than 31 rows.
SELECT
    'rw3' unit,
    _c0,
    AVG(capacity) OVER (
        ORDER BY _c0
        ROWS BETWEEN 15 PRECEDING AND 15 FOLLOWING
    ) MovAvg
FROM battery_orig_rw3
WHERE _c0 BETWEEN 1260000 AND 1261000

In [20]:
%%sql
-- Per-file range of time and capacity plus row count.
-- time and capacity are cast to DOUBLE so MIN()/MAX() compare numerically even
-- if the table columns are stored as strings (string MIN/MAX is lexicographic).
SELECT 'rw3' file, MIN(CAST(time AS DOUBLE)) min_time, MAX(CAST(time AS DOUBLE)) max_time, MIN(CAST(capacity AS DOUBLE)) min_capacity, MAX(CAST(capacity AS DOUBLE)) max_capacity, COUNT(*) cnt FROM battery_orig_rw3
UNION ALL
SELECT 'rw4' file, MIN(CAST(time AS DOUBLE)) min_time, MAX(CAST(time AS DOUBLE)) max_time, MIN(CAST(capacity AS DOUBLE)) min_capacity, MAX(CAST(capacity AS DOUBLE)) max_capacity, COUNT(*) cnt FROM battery_orig_rw4
UNION ALL
SELECT 'rw5' file, MIN(CAST(time AS DOUBLE)) min_time, MAX(CAST(time AS DOUBLE)) max_time, MIN(CAST(capacity AS DOUBLE)) min_capacity, MAX(CAST(capacity AS DOUBLE)) max_capacity, COUNT(*) cnt FROM battery_orig_rw5
UNION ALL
SELECT 'rw6' file, MIN(CAST(time AS DOUBLE)) min_time, MAX(CAST(time AS DOUBLE)) max_time, MIN(CAST(capacity AS DOUBLE)) min_capacity, MAX(CAST(capacity AS DOUBLE)) max_capacity, COUNT(*) cnt FROM battery_orig_rw6

## Explore and transform data

### Try visualizing moving average 

In [65]:
%%sql
-- Down-sampled 31-row centered moving average of capacity for each unit,
-- intended for plotting.
-- The window is ordered by the numeric value of _c0: ordering the raw column
-- would sort lexicographically if it is stored as a string ('10' before '9'),
-- which would average rows that are not actually neighbours.
-- NOTE(review): assumes _c0 holds integer row indices - confirm.
SELECT * FROM
(
    SELECT
        'rw3' unit,
        CAST(_c0 AS BIGINT) _c0,
        AVG(capacity) OVER (
            ORDER BY CAST(_c0 AS BIGINT)
            ROWS BETWEEN 15 PRECEDING AND 15 FOLLOWING
        ) MovAvg
    FROM battery_orig_rw3
    WHERE CAST(_c0 AS BIGINT) < 1300000

    UNION ALL
    SELECT
        'rw4' unit,
        CAST(_c0 AS BIGINT) _c0,
        AVG(capacity) OVER (
            ORDER BY CAST(_c0 AS BIGINT)
            ROWS BETWEEN 15 PRECEDING AND 15 FOLLOWING
        ) MovAvg
    FROM battery_orig_rw4
    WHERE CAST(_c0 AS BIGINT) < 1300000

    UNION ALL
    SELECT
        'rw5' unit,
        CAST(_c0 AS BIGINT) _c0,
        AVG(capacity) OVER (
            ORDER BY CAST(_c0 AS BIGINT)
            ROWS BETWEEN 15 PRECEDING AND 15 FOLLOWING
        ) MovAvg
    FROM battery_orig_rw5
    WHERE CAST(_c0 AS BIGINT) < 1300000

    UNION ALL
    SELECT
        'rw6' unit,
        CAST(_c0 AS BIGINT) _c0,
        AVG(capacity) OVER (
            ORDER BY CAST(_c0 AS BIGINT)
            ROWS BETWEEN 15 PRECEDING AND 15 FOLLOWING
        ) MovAvg
    FROM battery_orig_rw6
    WHERE CAST(_c0 AS BIGINT) < 1300000
) a
-- Keep every 10,000th row (the original expression was 10000000 / 1000).
WHERE _c0 % 10000 = 0


### Observation

- 31 Cycle 이동평균이 1.6 (기준치 2.0으로 봤을 때 80% 수준)에 도달하는 것은 128만 사이클보다 나중임.

- 다만 이동평균을 고려하지 않으면 위에서 capacity <= 1.6 인 min(_c0)에서 봤듯이 이보다 이전에 발생함.

## Baseline 확인


In [67]:
%%sql
-- Baseline: the first row (_c0 = 0) of each unit, with capacity shown up front.
-- Each branch reads its own unit's table; previously all four branches read
-- battery_orig_rw3, which made every unit trivially look identical.
SELECT 'rw3', capacity, * FROM battery_orig_rw3 WHERE _c0 = 0
UNION ALL
SELECT 'rw4', capacity, * FROM battery_orig_rw4 WHERE _c0 = 0
UNION ALL
SELECT 'rw5', capacity, * FROM battery_orig_rw5 WHERE _c0 = 0
UNION ALL
SELECT 'rw6', capacity, * FROM battery_orig_rw6 WHERE _c0 = 0


### Observation

모두 같은 값이므로 지금은 고민하지 말고 상수 값으로 봄. 단, 이 결론은 위 쿼리가 유닛별로 서로 다른 테이블(rw3 ~ rw6)을 조회했을 때에만 유효함.

In [53]:
import pandas as pd
import numpy as np
# Centered moving average of capacity over the first rows (_c0 < 5000) of
# RW3 and RW4, pulled into pandas for display.
# - The window spans 100 rows before and after the current row (201 rows), so
#   the column is named MovAvg rather than the earlier, misleading
#   "FifteenCycleMovAvg".
# - The window is ordered by the numeric value of _c0; ordering the raw column
#   would be lexicographic if it is stored as a string ('10' before '9').
# NOTE(review): assumes _c0 holds integer row indices - confirm.
sql_statement = '''
SELECT
	'rw3' unit,
	CAST(_c0 AS BIGINT) _c0
	,AVG(capacity) OVER (
		ORDER BY CAST(_c0 AS BIGINT)
		ROWS BETWEEN 100 PRECEDING AND 100 FOLLOWING
	) MovAvg
FROM battery_orig_rw3
WHERE CAST(_c0 AS BIGINT) < 5000

union ALL
SELECT
	'rw4' unit,
	CAST(_c0 AS BIGINT) _c0
	,AVG(capacity) OVER (
		ORDER BY CAST(_c0 AS BIGINT)
		ROWS BETWEEN 100 PRECEDING AND 100 FOLLOWING
	) MovAvg
FROM battery_orig_rw4
WHERE CAST(_c0 AS BIGINT) < 5000

'''
df_data = spark.sql(sql_statement).toPandas()
display(df_data)


### Observation

- 1000 사이클 이전에는 줄어들었다가 다시 원복되는 과정이 있는데, 이 구간에서도 fail을 예측해야 하는지? 
- 패턴이 다른 구간과 다르므로 이 구간의 예측이 불필요하다면 1000 사이클 이전은 제외하고 나머지 데이터만으로 진행하는 것이 괜찮을지? 

In [91]:
# Select the RW3 measurement columns, convert the result to pandas and show
# pandas' summary statistics as the cell output.
sql_statement = '''
SELECT
	'rw3' unit, _c0 cycle, time, voltage, current, temperature, capacity
FROM battery_orig_rw3
'''
rw3_spark_df = spark.sql(sql_statement)
df_data = rw3_spark_df.toPandas()
df_data.describe()

In [97]:
# Column-wise minimum of the pandas frame built in the previous cell.
df_data.min(axis=0)

In [98]:
# Column-wise maximum of the pandas frame built earlier.
df_data.max(axis=0)

### time 컬럼의 속성 확인

In [77]:
%%sql 
-- Show time next to its integer part and the remaining fractional part.
SELECT
    time,
    int(time) int_time,
    time - int(time)
FROM battery_orig_rw3
LIMIT 100

In [78]:
%%sql 
-- Distinct fractional parts of time (at most 100 of them).
SELECT DISTINCT time - int(time)
FROM battery_orig_rw3
LIMIT 100

In [79]:
%%sql
-- Number of distinct whole-number time values.
SELECT count(DISTINCT int(time))
FROM battery_orig_rw3


In [80]:
%%sql
-- Number of distinct raw time values, to compare with the whole-number count.
SELECT count(DISTINCT time)
FROM battery_orig_rw3


In [85]:
%%sql
-- Whole-number time values that occur on more than one row (first 100 groups).
SELECT
    int_time,
    count(*)
FROM (
    SELECT time, int(time) int_time
    FROM battery_orig_rw3
) a
GROUP BY int_time
HAVING count(*) > 1
LIMIT 100

In [86]:
%%sql
-- All RW3 rows whose time truncates to 262183.
SELECT *
FROM battery_orig_rw3
WHERE int(time) = 262183

## Observation

- 위에서 볼 수 있듯이 한 사이클에 여러 값이 매우 가까운 time 값으로 존재할 수 있음
- current값이 0인 경우도 있는데 평균으로 요약하면 왜곡이 될지??
- 0이 누락된 값인지 실측이 0으로 된 건지 판단이 필요. 누락이라면 imputation이 필요할 수 있으나, 다른 레코드에서 음수도 있는 것을 보아 정상값일 수도 있음
- 또는 이 4건을 1건으로 요약하지 말고 그대로 사용할 수도 있는지 봐야 함. 다만 이후 AutoML에서는 time이 키 값이 되어야 하므로 단수차이가 별도 값으로 표현되는지 봐야 함

In [99]:
%%sql
-- All RW3 rows whose time truncates to 792764.
SELECT *
FROM battery_orig_rw3
WHERE int(time) = 792764

이 내용을 보면 int(time) 기준으로 평균으로 구해도 될 것 같아 보이기도 하나, current 0 자체가 의미 있는 데이터로 보여 요약 하지 않고 그대로 진행. 

## Prepare data

Automated ML Forecasting은 현재 Datetime 데이터를 기준으로 해야 하므로, time을 Base datetime 기준으로 변형하도록 함. 향후 결과 해석시에도 역으로 변형 필요함.
time의 단위는 second로 봄 (NASA https://c3.nasa.gov/dashlink/resources/133/ 참고: 확인 필요)

In [49]:
import pandas as pd
import numpy as np
# Probe: add one second to the 2020-01-01 base date and render it via
# from_unixtime, to check the offset-to-datetime conversion.
sql_statement = '''
SELECT from_unixtime(unix_timestamp(to_date("2020-01-01")) + 1)
'''

probe_result = spark.sql(sql_statement)
df_data = probe_result
display(df_data)

In [50]:
import pandas as pd
import numpy as np
# Preview the RW3 rows with _c0 < 200, adding a datetime column computed as
# the 2020-01-01 base date plus the row's time value.
sql_statement = '''
SELECT
	'rw3' unit, _c0 cycle, time, voltage, current, temperature, capacity, from_unixtime(unix_timestamp(to_date("2020-01-01")) + time) datetime
FROM battery_orig_rw3
WHERE _c0 < 200
'''

preview_df = spark.sql(sql_statement)
df_data = preview_df
display(df_data)

# with Pandas:

# df_data = spark.sql(sql_statement).toPandas()

# # print(df_data['time'])

# df_data['datetime'] = df_data['time'].apply(cycle_to_datetime)
# # print(df_data['datetime'])

# # print(df_data['datetime'].apply(datetime_to_cycle))




# # with Spark DataFrame
# from pyspark.sql.functions import col


# cycle_to_datetime = spark.udf.register("cycle_to_datetime", cycle_to_datetime)

# new_df = df_data.select(cycle_to_datetime(col("time")))
# display(new_df)



# df_data = spark.sql(sql_statement)
# df_data.withColumn('datetime', cycle_to_datetime(col('time')))

# display(df_data)





In [51]:
# Compute summary statistics for the current df_data and print them as a table.
summary_df = df_data.describe()
summary_df.show()

대용량이면 PySpark Dataframe으로 하는 것이 Pandas Dataframe보다 성능상 나음.

In [None]:
# display(df_data['time'].apply(cycle_to_datetime))
# display(df_data)


# df_data['homeOwnership'] = df_data['homeOwnership'].replace('nan', np.nan).fillna(0)
# df_data['isJointApplication'] = df_data['isJointApplication'].replace('nan', np.nan).fillna(0)


# df_data['dtiRatio'] = df_data['dtiRatio'].astype(np.float32)
# df_data['annualIncome'] = df_data['annualIncome'].astype(np.float32)
# df_data['lengthCreditHistory'] = df_data['lengthCreditHistory'].astype(np.float32)
# df_data['numTotalCreditLines'] = df_data['numTotalCreditLines'].astype(np.float32)
# df_data['numOpenCreditLines'] = df_data['numOpenCreditLines'].astype(np.float32)
# df_data['numOpenCreditLines1Year'] = df_data['numOpenCreditLines1Year'].astype(np.float32)
# df_data['revolvingBalance'] = df_data['revolvingBalance'].astype(np.float32)
# df_data['revolvingUtilizationRate'] = df_data['revolvingUtilizationRate'].astype(np.float32)
# df_data['numDerogatoryRec'] = df_data['numDerogatoryRec'].astype(np.float32)
# df_data['numDelinquency2Years'] = df_data['numDelinquency2Years'].astype(np.float32)
# df_data['numChargeoff1year'] = df_data['numChargeoff1year'].astype(np.float32)
# df_data['numInquiries6Mon'] = df_data['numInquiries6Mon'].astype(np.float32)
# df_data['loanAmount'] = df_data['loanAmount'].astype(np.float32)
# df_data['interestRate'] = df_data['interestRate'].astype(np.float32)
# df_data['monthlyPayment'] = df_data['monthlyPayment'].astype(np.float32)

# df_data['incomeVerified'] = df_data['incomeVerified'].astype(np.int64)
# df_data['isJointApplication'] = df_data['isJointApplication'].astype(np.int64)

# #df_data.columns

# df_data = df_data.replace(float('nan'), None)

# df_data_sp = spark.createDataFrame(df_data)
# df_data_sp.write.mode("overwrite").saveAsTable("default.creditrisk_data")

# # data_lake_account_name = 'ncsynapsews11acc'
# # file_system_name = 'ncsynapsews11fs'
# df_data_sp.coalesce(1).write.option('header', 'true').mode('overwrite').csv(f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/creditrisk/automlprepareddata/')

# drop_cols = ['memberId', 'loanId', 'date','grade']
# df_data = df_data.drop(drop_cols, axis=1)

## Data Quality check

### Duplicate Index

In [56]:
import pandas as pd
import numpy as np
# Full RW3 table with the derived datetime column (2020-01-01 base date plus
# time), kept as a Spark DataFrame for the duplicate check that follows.
sql_statement = '''
SELECT 'rw3' unit, _c0 cycle, time, voltage, current, temperature, capacity, from_unixtime(unix_timestamp(to_date("2020-01-01")) + time) datetime FROM battery_orig_rw3
'''
rw3_datetime_df = spark.sql(sql_statement)
df_data = rw3_datetime_df

In [60]:
# List the datetime values that appear on more than one row.
df1 = (
    df_data
    .groupBy("datetime")
    .count()
    .where("count > 1")
)
df1.show()

In [83]:
import pandas as pd
import numpy as np
# Probe: add a fractional offset (2.6) to the unix timestamp of the base
# timestamp and display the resulting value.
sql_statement = '''
SELECT unix_timestamp(to_timestamp("2020-01-01 00:00:00.000000")) + 2.6 
'''
fraction_probe_df = spark.sql(sql_statement)
df_data = fraction_probe_df
display(df_data)

# SELECT from_unixtime(unix_timestamp(to_timestamp("2020-01-01 00:00:00.000000")) + 2.6, "yyyy-MM-dd HH:mm:ss.SSSSSSS") 
# 

### Observation

second 단위 이하로는 표현이 안되는 것 같음. 우선은 초 단위로 중복이 없도록 요약을 해야 하겠음.

In [66]:
import pandas as pd
import numpy as np
# Show the RW3 rows whose derived datetime falls between
# 2020-02-04 04:49:06 and 2020-02-04 04:49:07.
sql_statement = '''
SELECT 'rw3' unit, _c0 cycle, time, voltage, current, temperature, capacity, from_unixtime(unix_timestamp(to_date("2020-01-01")) + time) datetime FROM battery_orig_rw3
WHERE from_unixtime(unix_timestamp(to_date("2020-01-01")) + time) BETWEEN to_timestamp('2020-02-04 04:49:06') AND to_timestamp('2020-02-04 04:49:07')
'''
window_rows_df = spark.sql(sql_statement)
df_data = window_rows_df
display(df_data)

### Remove Duplicate

In [98]:
import pandas as pd
import numpy as np
# Collapse RW3 rows that map to the same derived datetime into one row:
# the smallest cycle index plus the averages of the measurements.
# cycle is cast to BIGINT so MIN() picks the numerically smallest index even
# if _c0 is stored as a string (a string MIN() is lexicographic, so '100000'
# would beat '99999').
# NOTE(review): assumes _c0 holds integer row indices - confirm.
sql_statement = '''

SELECT unit, datetime, MIN(CAST(cycle AS BIGINT)) min_cycle, AVG(voltage) avg_voltage, AVG(current) avg_current, AVG(temperature) avg_temperature, AVG(capacity) avg_capacity
FROM (
    SELECT 'rw3' unit, _c0 cycle, time, voltage, current, temperature, capacity, from_unixtime(unix_timestamp(to_date("2020-01-01")) + time) datetime
    FROM battery_orig_rw3
    /* WHERE from_unixtime(unix_timestamp(to_date("2020-01-01")) + time) BETWEEN to_timestamp('2020-02-04 04:49:06') AND to_timestamp('2020-02-04 04:49:07') */
) a
GROUP BY unit, datetime
ORDER BY 1, 2
'''
df_data = spark.sql(sql_statement)
display(df_data)

In [100]:
# After aggregation, confirm that no (unit, datetime) pair occurs more than once.
df1 = (
    df_data
    .groupBy("unit", "datetime")
    .count()
    .where("count > 1")
)
df1.show()

## Save transformed data


In [104]:
import pandas as pd
import numpy as np
# Build one de-duplicated dataset covering all four units and persist it as
# parquet under battery/transformed/dataset_all.
# The per-unit query is generated from a template instead of being pasted
# four times, so the units cannot drift apart.
units = ["rw3", "rw4", "rw5", "rw6"]

# For one unit: derive datetime (2020-01-01 base date plus time) and collapse
# rows sharing a datetime into the smallest cycle index and the measurement
# averages. cycle is cast to BIGINT so MIN() is numeric even if _c0 is stored
# as a string (a string MIN() is lexicographic).
# NOTE(review): assumes _c0 holds integer row indices - confirm.
per_unit_sql = '''
SELECT unit, datetime, MIN(CAST(cycle AS BIGINT)) min_cycle, AVG(voltage) avg_voltage, AVG(current) avg_current, AVG(temperature) avg_temperature, AVG(capacity) avg_capacity
FROM (
    SELECT '{unit}' unit, _c0 cycle, time, voltage, current, temperature, capacity, from_unixtime(unix_timestamp(to_date("2020-01-01")) + time) datetime
    FROM battery_orig_{unit}
) a
GROUP BY unit, datetime
'''

# The trailing ORDER BY applies to the whole UNION ALL result (unit, datetime).
sql_statement = (
    "\nUNION ALL\n".join(per_unit_sql.format(unit=unit) for unit in units)
    + "ORDER BY 1, 2\n"
)
df_data = spark.sql(sql_statement)
df_data.write.format("parquet").mode("overwrite").save(f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/battery/transformed/dataset_all")


In [101]:
import pandas as pd
import numpy as np
# Persist one de-duplicated parquet dataset per unit under
# battery/transformed/dataset_<unit>, using a single loop instead of four
# pasted copies of the same query.
for unit in ["rw3", "rw4", "rw5", "rw6"]:
    # Collapse rows sharing a derived datetime into the smallest cycle index
    # and the measurement averages. cycle is cast to BIGINT so MIN() is
    # numeric even if _c0 is stored as a string (a string MIN() is
    # lexicographic).
    # NOTE(review): assumes _c0 holds integer row indices - confirm.
    sql_statement = f'''
SELECT unit, datetime, MIN(CAST(cycle AS BIGINT)) min_cycle, AVG(voltage) avg_voltage, AVG(current) avg_current, AVG(temperature) avg_temperature, AVG(capacity) avg_capacity
FROM (
    SELECT '{unit}' unit, _c0 cycle, time, voltage, current, temperature, capacity, from_unixtime(unix_timestamp(to_date("2020-01-01")) + time) datetime
    FROM battery_orig_{unit}
) a
GROUP BY unit, datetime
ORDER BY 1, 2
'''
    output_path = f"abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/battery/transformed/dataset_{unit}"
    # The result of the write chain is deliberately not assigned: the original
    # bound it to df_data, which replaced the working DataFrame with None.
    spark.sql(sql_statement).write.format("parquet").mode("overwrite").save(output_path)


In [97]:
import pandas as pd
import numpy as np
# Display the de-duplicated RW3 dataset: one row per derived datetime with
# the smallest cycle index and the measurement averages.
# cycle is cast to BIGINT so MIN() is numeric even if _c0 is stored as a
# string (a string MIN() is lexicographic).
# NOTE(review): assumes _c0 holds integer row indices - confirm.
sql_statement = '''
SELECT unit, datetime, MIN(CAST(cycle AS BIGINT)) min_cycle, AVG(voltage) avg_voltage, AVG(current) avg_current, AVG(temperature) avg_temperature, AVG(capacity) avg_capacity
FROM (
    SELECT 'rw3' unit, _c0 cycle, time, voltage, current, temperature, capacity, from_unixtime(unix_timestamp(to_date("2020-01-01")) + time) datetime
    FROM battery_orig_rw3
) a
GROUP BY unit, datetime
ORDER BY 1, 2
'''
df_data = spark.sql(sql_statement)
display(df_data)

## 마지막 datetime 확인

In [None]:
import pandas as pd
import numpy as np
# First and last derived datetime of the de-duplicated RW3 data.
sql_statement = '''
SELECT unit, MIN(datetime), MAX(datetime)
FROM (

    SELECT unit, datetime, MIN(cycle) min_cycle, AVG(voltage) avg_voltage, AVG(current) avg_current, AVG(temperature) avg_temperature, AVG(capacity) avg_capacity
    FROM (
        SELECT 'rw3' unit, _c0 cycle, time, voltage, current, temperature, capacity, from_unixtime(unix_timestamp(to_date("2020-01-01")) + time) datetime
        FROM battery_orig_rw3
    ) a
    GROUP BY unit, datetime
) a
GROUP BY unit
ORDER BY 1
'''
datetime_range_df = spark.sql(sql_statement)
df_data = datetime_range_df
display(df_data)

In [None]:
import pandas as pd
import numpy as np
# Extract the de-duplicated RW3 rows whose derived datetime lies between
# 2020-05-31 02:00:00 and 2020-05-31 02:05:00, store them as the table
# sample_for_test, and display them.
# cycle is cast to BIGINT so MIN() is numeric even if _c0 is stored as a
# string (a string MIN() is lexicographic).
# NOTE(review): assumes _c0 holds integer row indices - confirm.
sql_statement = '''
SELECT unit, datetime, MIN(CAST(cycle AS BIGINT)) min_cycle, AVG(voltage) avg_voltage, AVG(current) avg_current, AVG(temperature) avg_temperature, AVG(capacity) avg_capacity
FROM (
    SELECT 'rw3' unit, _c0 cycle, time, voltage, current, temperature, capacity, from_unixtime(unix_timestamp(to_date("2020-01-01")) + time) datetime
    FROM battery_orig_rw3

    WHERE from_unixtime(unix_timestamp(to_date("2020-01-01")) + time) between '2020-05-31 02:00:00' and '2020-05-31 02:05:00'
) a
GROUP BY unit, datetime

'''
df_data = spark.sql(sql_statement)
df_data.write.mode("overwrite").saveAsTable("sample_for_test")
display(df_data)


처음에 test 용 데이터를 별도로 떼두는 것이 맞음. 지금은 임시로 마지막 구간에서 일부 데이터 가져와서 시간을 이후로 조정해서 테스트에 활용

## Tips

- check missing values (10 sec based)

- play with rolling windows of 10 sec

- add a calculated column, difference_capacity
  - try to predict the above value
  - accumulate prediction

