In [1]:
import pandas as pd

In [2]:
# One transformed 10-minute WTG export per file date stamp, 20201004 .. 20201010.
data_csv = [
    f'../narrativewave/data/narrativewave_WTG_202010{day:02d}_wtg10min_transform.csv'
    for day in range(4, 11)
]

In [4]:
# Stack the daily long-format exports (columns: timestamp, tag, value).
# ignore_index=True gives every row a unique label. Without it each file's index
# restarts at 0, so any later label-based operation (e.g. df.drop(<labels>))
# would hit matching rows in every file at once.
df = pd.concat((pd.read_csv(f) for f in data_csv), ignore_index=True)

## Data Structure - Start

In [5]:
# First five rows of the stacked long-format data.
df.head(n=5)

Unnamed: 0,timestamp,tag,value
0,2020-10-03 05:10:00+00:00,/narrativewave/WTG01/TimeStamp,2020-10-03 05:10:00+00:00
1,2020-10-03 05:10:00+00:00,/narrativewave/WTG11/TimeStamp,2020-10-03 05:10:00+00:00
2,2020-10-03 05:10:00+00:00,/narrativewave/WTG10/TimeStamp,2020-10-03 05:10:00+00:00
3,2020-10-03 05:10:00+00:00,/narrativewave/WTG08/TimeStamp,2020-10-03 05:10:00+00:00
4,2020-10-03 05:10:00+00:00,/narrativewave/WTG07/TimeStamp,2020-10-03 05:10:00+00:00


In [5]:
# Summary of the raw columns. They are all text at this stage, so pandas reports
# count / unique / top / freq. `value` has fewer non-null entries than
# timestamp/tag (see output).
df.describe(percentiles=None, include=None, exclude=None)

Unnamed: 0,timestamp,tag,value
count,1061613,1061613,990941
unique,1010,1071,193600
top,2020-10-06 17:20:00+00:00,/narrativewave/WTG09/Gear_Bear_Temp_Avg,0
freq,1071,1010,108410


## Split the tag into separate "asset" and "column" columns

In [6]:
# Tags look like '/narrativewave/<asset>/<measurement>'. Segment 2 is the turbine
# id (e.g. 'WTG01'). The vectorised .str accessor replaces a per-row lambda.
df['asset'] = df['tag'].str.split('/').str[2]
# .str[2] yields NaN (instead of raising) for a tag with too few segments.
# Fail loudly here rather than letting NaN assets reach the pivot.
assert df['asset'].notna().all(), "tag without an asset segment"

In [7]:
# Segment 3 of the tag path is the measurement name. Lower-case it so it can be
# used as the wide-table column name (e.g. 'TimeStamp' -> 'timestamp').
df['column'] = df['tag'].str.split('/').str[3].str.lower()
# .str[3] yields NaN for a tag with too few segments. Surface that immediately.
assert df['column'].notna().all(), "tag without a measurement segment"

In [8]:
# Check the derived asset/column fields next to the original tag.
df.head(n=5)

Unnamed: 0,timestamp,tag,value,asset,column
0,2020-10-03 05:10:00+00:00,/narrativewave/WTG01/TimeStamp,2020-10-03 05:10:00+00:00,WTG01,timestamp
1,2020-10-03 05:10:00+00:00,/narrativewave/WTG11/TimeStamp,2020-10-03 05:10:00+00:00,WTG11,timestamp
2,2020-10-03 05:10:00+00:00,/narrativewave/WTG10/TimeStamp,2020-10-03 05:10:00+00:00,WTG10,timestamp
3,2020-10-03 05:10:00+00:00,/narrativewave/WTG08/TimeStamp,2020-10-03 05:10:00+00:00,WTG08,timestamp
4,2020-10-03 05:10:00+00:00,/narrativewave/WTG07/TimeStamp,2020-10-03 05:10:00+00:00,WTG07,timestamp


## Convert Timestamp to datetime

In [9]:
# Parse the ISO-8601 strings (e.g. '2020-10-03 05:10:00+00:00') into tz-aware
# datetimes. utc=True normalises every offset to UTC, so the column stays a single
# datetime64[ns, UTC] dtype even if a row ever carries a non-zero offset.
# Without it, pandas falls back to object dtype (or raises) on mixed offsets.
df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True)

## Drop the redundant TimeStamp tag rows (their value duplicates the timestamp column)

In [10]:
# Remove the per-asset "TimeStamp" tag rows. In the rows shown above, their value
# equals the `timestamp` column.
# Filter with a boolean mask instead of dropping by index label. The frame built
# by concatenating several CSVs can carry repeated labels (one 0..n range per
# file), and a label-based drop would also delete unrelated rows sharing them.
df = df[df['column'] != 'timestamp']

## Pivot to wide format (one column per measurement)

In [11]:
# Long -> wide: one row per (asset, timestamp), one column per measurement name.
# `values` is passed as a list, so the result has two column levels
# ('value', <measurement>). pandas' pivot raises ValueError if an
# (asset, timestamp, column) combination occurs more than once.
df_wide = pd.pivot(df, index=['asset', 'timestamp'], columns='column', values=['value'])

In [12]:
# Flatten the two-level column index by discarding the outer 'value' level,
# leaving just the measurement names.
df_wide.columns = df_wide.columns.droplevel(0)

In [13]:
# Move asset and timestamp out of the index and back into ordinary columns.
df_wide = df_wide.reset_index()

In [14]:
# Last five rows of the wide table.
df_wide.tail(n=5)

column,asset,timestamp,amb_temp_avg,amb_winddir_abs_avg,amb_winddir_relative_avg,amb_windspeed_avg,amb_windspeed_std,blds_pitchangle_avg,cont_hub_temp_avg,cont_top_temp_avg,...,hyd_oil_temp_avg,nac_direction_avg,nac_direction_avg_proxy_neighbor_mean,nac_temp_avg,predictedpowertree,spin_temp_avg,sys_logs_firstactalarmno,sys_logs_firstactalarmpar1,sys_logs_firstactalarmpar2,sys_stats_trbstat
16846,WTG17,2020-10-10 04:40:00+00:00,13,192.8999939,0.0,8.899999619,1.399999976,-2.400000095,30,31,...,44,192.8999939,194.3999939,29,2120.640869,24,0,0,0,1
16847,WTG17,2020-10-10 04:50:00+00:00,13,189.1000061,-0.100000001,8.300000191,1.100000024,-3.099999905,30,31,...,45,189.1999969,191.0666656,30,1790.206421,24,0,0,0,1
16848,WTG17,2020-10-10 05:00:00+00:00,13,188.6000061,-1.299999952,7.900000095,1.200000048,-2.900000095,30,31,...,44,189.8000031,191.1666718,30,1575.293579,24,0,0,0,1
16849,WTG17,2020-10-10 05:10:00+00:00,13,189.1000061,0.200000003,8.5,1.200000048,-3.0,30,31,...,44,189.0,191.2333374,30,2009.526733,24,0,0,0,1
16850,WTG17,2020-10-10 05:20:00+00:00,13,191.1000061,0.400000006,8.899999619,1.399999976,-3.0,30,31,...,43,190.6999969,193.1333313,30,2120.640869,24,0,0,0,1


## Add year and month columns (used as partition keys)

In [15]:
# Add year and month columns for partition.
# Calendar year of each (UTC) timestamp, computed with the vectorised .dt
# accessor instead of a per-row lambda. Cast to int64 to keep the dtype the
# per-row version produced (recent pandas returns int32 from .dt.year).
df_wide["year"] = df_wide["timestamp"].dt.year.astype("int64")

In [16]:
# Month (1-12) of each timestamp, used as a partition key. Vectorised via .dt.
# The int64 cast keeps the dtype the per-row version produced.
df_wide["month"] = df_wide["timestamp"].dt.month.astype("int64")

In [18]:
# First five rows, now including the year/month partition columns.
df_wide.head(n=5)

column,asset,timestamp,amb_temp_avg,amb_winddir_abs_avg,amb_winddir_relative_avg,amb_windspeed_avg,amb_windspeed_std,blds_pitchangle_avg,cont_hub_temp_avg,cont_top_temp_avg,...,nac_direction_avg_proxy_neighbor_mean,nac_temp_avg,predictedpowertree,spin_temp_avg,sys_logs_firstactalarmno,sys_logs_firstactalarmpar1,sys_logs_firstactalarmpar2,sys_stats_trbstat,year,month
0,WTG01,2020-10-03 05:10:00+00:00,9,309.6000061,2.900000095,,0.300000012,-2.799999952,27,27,...,285.7333374,21,784.7553101,22,0,0,0,3,2020,10
1,WTG01,2020-10-03 05:20:00+00:00,9,315.2999878,5.0,,0.400000006,24.20000076,27,27,...,292.2000122,22,289.7566833,22,309,6,0,4,2020,10
2,WTG01,2020-10-03 05:30:00+00:00,9,318.7999878,-0.600000024,,0.300000012,78.0,27,28,...,292.2000122,23,-16.01966286,20,309,6,0,4,2020,10
3,WTG01,2020-10-03 05:40:00+00:00,9,315.8999939,-3.5,,0.300000012,78.0,27,28,...,293.4333496,24,-16.01966286,20,309,6,0,4,2020,10
4,WTG01,2020-10-03 05:50:00+00:00,9,314.1000061,-1.899999976,,0.400000006,78.0,27,28,...,297.9333191,25,-16.01966286,20,309,6,0,4,2020,10


In [28]:
# Full-width view of the first rows as dicts. Note the measurement values are
# still strings after the pivot (e.g. '9') and missing readings are NaN.
df_wide.head(5).to_dict("records")

[{'asset': 'WTG01',
  'timestamp': Timestamp('2020-10-03 05:10:00+0000', tz='UTC'),
  'amb_temp_avg': '9',
  'amb_winddir_abs_avg': '309.6000061',
  'amb_winddir_relative_avg': '2.900000095',
  'amb_windspeed_avg': nan,
  'amb_windspeed_std': '0.300000012',
  'blds_pitchangle_avg': '-2.799999952',
  'cont_hub_temp_avg': '27',
  'cont_top_temp_avg': '27',
  'cont_vcp_temp_avg': '42',
  'gear_bear_temp_avg': '67',
  'gear_bear_temp_avg_prediction': '63.71371841',
  'gear_bear_tempb_avg': '61',
  'gear_bear_tempb_avg_prediction': '56.20113373',
  'gear_bear_tempc_avg': '68',
  'gear_bear_tempc_avg_prediction': '63.09113312',
  'gear_filter_afterinlinepressureoil_avg': '2.099999905',
  'gear_filter_beforeinlinepressureoil_avg': '2.200000048',
  'gear_mainbear_tempnre_avg_prediction': '0.001664484',
  'gear_mainbear_tempre_avg': '36',
  'gear_mainbear_tempre_avg_prediction': '37.17858887',
  'gear_oil_tempinlet_avg': '54',
  'gen_bear2_temp_avg': '37',
  'gen_bear2_temp_avg_prediction': '37

In [37]:
# Rows up to and including 2020-10-04 00:00 UTC.
# Use an explicit ISO, tz-aware bound instead of the locale-style string
# '4-Oct-20', whose day/year fields are left to the date parser to guess.
cutoff = pd.Timestamp('2020-10-04', tz='UTC')
df_wide.loc[df_wide['timestamp'] <= cutoff]

column,asset,timestamp,amb_temp_avg,amb_winddir_abs_avg,amb_winddir_relative_avg,amb_windspeed_avg,amb_windspeed_std,blds_pitchangle_avg,cont_hub_temp_avg,cont_top_temp_avg,...,nac_direction_avg_proxy_neighbor_mean,nac_temp_avg,predictedpowertree,spin_temp_avg,sys_logs_firstactalarmno,sys_logs_firstactalarmpar1,sys_logs_firstactalarmpar2,sys_stats_trbstat,year,month
0,WTG01,2020-10-03 05:10:00+00:00,9,309.6000061,2.900000095,,0.300000012,-2.799999952,27,27,...,285.7333374,21,784.7553101,22,0,0,0,3,2020,10
1,WTG01,2020-10-03 05:20:00+00:00,9,315.2999878,5,,0.400000006,24.20000076,27,27,...,292.2000122,22,289.7566833,22,309,6,0,4,2020,10
2,WTG01,2020-10-03 05:30:00+00:00,9,318.7999878,-0.600000024,,0.300000012,78,27,28,...,292.2000122,23,-16.01966286,20,309,6,0,4,2020,10
3,WTG01,2020-10-03 05:40:00+00:00,9,315.8999939,-3.5,,0.300000012,78,27,28,...,293.4333496,24,-16.01966286,20,309,6,0,4,2020,10
4,WTG01,2020-10-03 05:50:00+00:00,9,314.1000061,-1.899999976,,0.400000006,78,27,28,...,297.9333191,25,-16.01966286,20,309,6,0,4,2020,10
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
15954,WTG17,2020-10-03 23:20:00+00:00,12,340.2000122,0.300000012,4.599999905,0.200000003,78,28,31,...,342.8333435,25,-16.01966286,22,309,6,0,4,2020,10
15955,WTG17,2020-10-03 23:30:00+00:00,12,343.5,4.900000095,4.699999809,0.200000003,78,28,28,...,342.8333435,25,-16.01966286,22,309,6,0,4,2020,10
15956,WTG17,2020-10-03 23:40:00+00:00,12,345.1000061,-0.899999976,4.599999905,0.200000003,78,28,24,...,342.8333435,27,-16.01966286,22,309,6,0,4,2020,10
15957,WTG17,2020-10-03 23:50:00+00:00,12,346.5,-0.899999976,4.300000191,0.200000003,78,27,24,...,342.8333435,27,-16.01966286,22,309,6,0,4,2020,10


# Spark

In [19]:
from pyspark.sql import SparkSession

# Attach to the running Spark session if there is one; otherwise build a new
# session with default settings.
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

In [22]:
# Handle to the session's underlying SparkContext. It is not referenced by the
# later cells visible in this notebook.
sc = spark.sparkContext

In [23]:
# Convert the pandas wide frame to a Spark DataFrame. Spark infers the schema
# from the pandas columns; the measurement values are still strings here (see
# the Row output below).
sdf = spark.createDataFrame(data=df_wide)

In [30]:
# Peek at the first Spark row. head(n) returns a list of Row objects, like take(n).
# NOTE(review): the Row shows 01:10 for the 05:10 UTC reading. Collected
# timestamps are presumably rendered as naive local-time datetimes. Confirm
# before comparing against the pandas output.
sdf.head(1)

[Row(asset='WTG01', timestamp=datetime.datetime(2020, 10, 3, 1, 10), amb_temp_avg='9', amb_winddir_abs_avg='309.6000061', amb_winddir_relative_avg='2.900000095', amb_windspeed_avg=None, amb_windspeed_std='0.300000012', blds_pitchangle_avg='-2.799999952', cont_hub_temp_avg='27', cont_top_temp_avg='27', cont_vcp_temp_avg='42', gear_bear_temp_avg='67', gear_bear_temp_avg_prediction='63.71371841', gear_bear_tempb_avg='61', gear_bear_tempb_avg_prediction='56.20113373', gear_bear_tempc_avg='68', gear_bear_tempc_avg_prediction='63.09113312', gear_filter_afterinlinepressureoil_avg='2.099999905', gear_filter_beforeinlinepressureoil_avg='2.200000048', gear_mainbear_tempnre_avg_prediction='0.001664484', gear_mainbear_tempre_avg='36', gear_mainbear_tempre_avg_prediction='37.17858887', gear_oil_tempinlet_avg='54', gen_bear2_temp_avg='37', gen_bear2_temp_avg_prediction='37.10264206', gen_bear_temp_avg='47', gen_bear_temp_avg_prediction='45.04673004', gen_phase1_temp_avg='62', gen_phase1_temp_avg_pre

In [31]:
# Write the wide table as Parquet, partitioned by asset/year/month (the year and
# month columns were added for partitioning, and the pandas export uses the same
# keys). The former option("header", True) was dropped: it is a CSV writer
# option and has no effect on Parquet output.
(
    sdf.write
    .partitionBy("asset", "year", "month")
    .mode("overwrite")
    .parquet("data/parquet")
)

## Write a partitioned Parquet dataset with pandas

In [25]:
# Write the wide frame as a partitioned Parquet dataset (asset=.../year=.../month=...
# sub-directories). The directory name is spelled "paraquet"; readers must use
# the same path.
# NOTE(review): with the pyarrow engine, re-running this cell presumably adds new
# files next to existing ones instead of replacing them. Clear the directory
# first if a clean rewrite is needed.
df_wide.to_parquet(
    path='../narrativewave/data/paraquet',
    partition_cols=['asset', 'year', 'month'],
)

## Read the partitioned dataset back

In [26]:
# Read back the partitioned dataset written by the pandas to_parquet cell. The
# path must match that writer's directory exactly ('.../paraquet'); any other
# spelling points at a different directory and fails on a fresh run.
# The partition keys (asset, year, month) are rebuilt from the directory names
# and, as the output shows, are not returned at the front of the frame.
test = pd.read_parquet('../narrativewave/data/paraquet')

In [27]:
# First two round-tripped rows as dicts, to compare with the pre-write preview.
test.iloc[:2].to_dict(orient='records')

[{'timestamp': Timestamp('2020-10-03 05:10:00+0000', tz='UTC'),
  'amb_temp_avg': '9',
  'amb_winddir_abs_avg': '309.6000061',
  'amb_winddir_relative_avg': '2.900000095',
  'amb_windspeed_avg': None,
  'amb_windspeed_std': '0.300000012',
  'blds_pitchangle_avg': '-2.799999952',
  'cont_hub_temp_avg': '27',
  'cont_top_temp_avg': '27',
  'cont_vcp_temp_avg': '42',
  'gear_bear_temp_avg': '67',
  'gear_bear_temp_avg_prediction': '63.71371841',
  'gear_bear_tempb_avg': '61',
  'gear_bear_tempb_avg_prediction': '56.20113373',
  'gear_bear_tempc_avg': '68',
  'gear_bear_tempc_avg_prediction': '63.09113312',
  'gear_filter_afterinlinepressureoil_avg': '2.099999905',
  'gear_filter_beforeinlinepressureoil_avg': '2.200000048',
  'gear_mainbear_tempnre_avg_prediction': '0.001664484',
  'gear_mainbear_tempre_avg': '36',
  'gear_mainbear_tempre_avg_prediction': '37.17858887',
  'gear_oil_tempinlet_avg': '54',
  'gen_bear2_temp_avg': '37',
  'gen_bear2_temp_avg_prediction': '37.10264206',
  'gen_