In [None]:
import datetime
import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt
from tqdm import tqdm
import seaborn as sns
import pickle
import sys
import time

In [None]:
from tsfresh import extract_features
from tsfresh import select_features
from tsfresh.utilities.dataframe_functions import impute
from tsfresh.feature_extraction import ComprehensiveFCParameters, MinimalFCParameters
from tsfresh.utilities.dataframe_functions import roll_time_series

import dask.dataframe as dd

In [None]:
# Load the inverter measurements table.
inv_df = pd.read_csv('all_inverters.csv')

# Load the alarm log, keep only the error codes of interest, then keep the
# rows whose `hod` value lies in [6, 18] (the original range was 6-17).
target_codes = [7006, 3511, 7502, 7501, 3504, 6448, 1500, 7704]
alarm_df = pd.read_csv('all_alarms.csv')
alarm_df = alarm_df.loc[alarm_df["Error Code"].isin(target_codes)]
alarm_df = alarm_df.loc[alarm_df["hod"].between(6, 18)]
print(alarm_df.shape)

# Sorted, de-duplicated controller names that appear in the filtered alarms.
inverters = sorted(alarm_df["Controller Name"].unique().tolist())

In [None]:
# NOTE: pickle.load can execute arbitrary code while unpickling; only open
# label files that were produced by a trusted run of this project.
with open('inverter-labels.pkl', 'rb') as handle:
    inv_labels = pickle.load(handle)

# Per-inverter count of positive (label 1) and negative (label 0) samples.
label_df = {'inverters': [], 'positive': [], 'negative': []}
for inv, x in inv_labels.items():
    y = dict(x['label'].value_counts())
    label_df['inverters'].append(inv)
    # .get() keeps the summary working when an inverter has only one class;
    # indexing y[0] directly raised KeyError for inverters without negatives.
    label_df['positive'].append(y.get(1, 0))
    label_df['negative'].append(y.get(0, 0))
label_df = pd.DataFrame(label_df)
label_df

In [None]:
# Class ratio: percentage of positive samples, pooled over the inverters
# that have more than 10 positive labels.
mask = label_df['positive'] > 10
# .loc selects rows and columns in one step and .sum() is the direct column
# reduction (previously chained indexing followed by .apply(np.sum)).
total = label_df.loc[mask, ['positive', 'negative']].sum()
100 * total['positive'] / (total['positive'] + total['negative'])

## Create rolling windows manually and extract features using tsfresh
- Use Ray to parallelize the feature extraction across windows.

In [None]:
import ray

# ignore_reinit_error makes this cell safe to re-run in a live kernel; a
# plain ray.init() raises RuntimeError when Ray is already initialised.
ray.init(ignore_reinit_error=True)

In [None]:
# def create_tsfresh_features(df, colnames, ROLLING_WINDOWS):
#     start_step = 1
#     df["id"] = 1  # tsfresh requirement
#     for col in colnames:
#         for window in ROLLING_WINDOWS:
#             feats = tsfresh_features(df[[col, 'date', 'id']], window_size=window).reset_index(drop=True)
#             df = pd.concat([df, feats], axis=1)
#     return df    


@ray.remote(num_returns=1)
def extract_ts_features_(df, start, end, settings):
    """Ray task: run tsfresh ``extract_features`` on the rows ``df[start:end]``.

    Parameters
    ----------
    df : frame with the "id" and "date" columns referenced below.
    start, end : slice bounds of the window of rows to process.
    settings : feature-calculator settings, forwarded as
        ``default_fc_parameters``.

    Returns
    -------
    Whatever ``extract_features`` returns for that window.
    """
    window_rows = df[start:end].copy()
    extraction_options = dict(
        column_id="id",
        column_sort="date",
        default_fc_parameters=settings,
        disable_progressbar=True,
    )
    return extract_features(window_rows, **extraction_options)


def tsfresh_features(df, window_length, otype='spark'):
    """Extract tsfresh features over a trailing window ending at every row of ``df``.

    For row ``ii`` the window covers the last ``window_length`` rows up to and
    including ``ii``; the first ``window_length - 1`` rows use shorter windows
    that start at row 0. Each window is submitted as a Ray task
    (``extract_ts_features_``) and all results are gathered at the end.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the ``id`` and ``date`` columns used by the extraction,
        plus the value column(s) to compute features from.
    window_length : int
        Number of rows in a full window.
    otype : str
        Unused; kept so that callers passing it keep working.

    Returns
    -------
    (list, list)
        The per-window feature frames in row order, and the ``date`` value of
        the last row of each window.
    """
    settings = ComprehensiveFCParameters()

    # Time one full window locally to get a feel for the per-window cost.
    tic = time.time()
    extract_features(df.head(window_length),
                     column_id="id",
                     column_sort="date",
                     default_fc_parameters=settings,
                     disable_progressbar=True,
                    )
    toc = time.time()
    print(toc - tic)

    n = df.shape[0]
    # The loop below submits one task per row (n in total), including the
    # shorter leading windows.
    print("Number of windows:", n - window_length)

    # Put the frame in Ray's object store once. Passing `df` itself to every
    # .remote() call would store the whole frame again for each window.
    df_ref = ray.put(df)

    pending = []
    for ii in tqdm(range(n)):
        start = max(0, ii - window_length + 1)
        pending.append(extract_ts_features_.remote(df_ref, start, ii + 1, settings))

    # One column read instead of a row lookup (df.iloc[ii]['date']) per window.
    dates = list(df['date'])

    new_features = ray.get(pending)
    return new_features, dates


In [None]:
# Timing experiment: build rolling tsfresh features (Ray-parallelised) for the
# first inverter that has more than 10 positive labels.
# windows = [x*12*24 for x in [1, 2, 3, 7, 14, 21, 30]]
windows = [x*12*24 for x in [1]]
TIMESTAMP_COL_NAME = 'date'
data = []
for inverter in inv_labels.keys():
    x = inv_labels[inverter]
    y = dict(x['label'].value_counts())
    if y.get(1, 0) <= 10:
        continue

    # Source column -> short name used in the rest of the notebook.
    column_names = {
        'IN.GMRX.CHAR.'+inverter+'.Active Power (kW)': 'power',
        'IN.GMRX.CHAR.WS-20 MW.Module Temperature (°C)': 'temp1',
        'IN.GMRX.CHAR.WS-20 MW.POA Irradiance (w/m²)': 'rad1',
        'IN.GMRX.CHAR.WS-5 MW.Module Temperature (°C)': 'temp2',
        'IN.GMRX.CHAR.WS-5 MW.POA Irradiance (w/m²)': 'rad2',
    }
    inv_df_i = inv_df[['date'] + list(column_names)].rename(columns=column_names)
    inv_df_i['date'] = pd.to_datetime(inv_df_i['date'])
    inv_df_i['hour'] = inv_df_i.date.dt.hour

    df = inv_df_i[['date', 'power']].copy()
    df['id'] = 1  # the id column is required by the tsfresh extraction calls
    print(df.columns)

    tic = time.time()
    extracted_features, dates = tsfresh_features(df, window_length=12*24)
    extracted_features = pd.concat(extracted_features, axis=0)
    extracted_features['dates'] = dates
    toc = time.time()
    print(toc - tic)

    # The cell used to call sys.exit(toc-tic) here, which raises SystemExit
    # inside the kernel and left the merge below referring to a `df_` that
    # this cell never assigned. Merge the extracted features instead.
    df_ = x.merge(extracted_features.rename(columns={'dates': 'date'}),
                  on='date', how='left')
    y = dict(df_['label'].value_counts())
    print(inverter, x.shape[0], df_.shape[0], y.get(1, 0), y.get(0, 0))
    data.append(df_)

    # Stop after the first qualifying inverter, as the former sys.exit did;
    # remove this break to process every inverter (expect a long run time).
    break
data = pd.concat(data, axis=0)
data.shape

In [None]:
import time
from tsfresh.feature_extraction.feature_calculators import (
    abs_energy, absolute_sum_of_changes, agg_autocorrelation, 
    agg_linear_trend, approximate_entropy, ar_coefficient, 
    augmented_dickey_fuller, autocorrelation, benford_correlation, 
    binned_entropy, c3, change_quantiles, cid_ce, count_above, 
    count_above_mean, count_below, count_below_mean, cwt_coefficients, 
    energy_ratio_by_chunks, fft_aggregated, fft_coefficient, first_location_of_maximum, 
    first_location_of_minimum, fourier_entropy, friedrich_coefficients
)

In [None]:
from numba import jit

In [None]:
# Candidate tsfresh feature calculators. Every entry is currently commented
# out, so this evaluates to an empty list; nothing else in this cell reads it.
tsfresh_functions = [
#                      abs_energy, 
#                      absolute_sum_of_changes, 
#                      agg_autocorrelation, 
#                      agg_linear_trend, 
#                      approximate_entropy, 
#                      augmented_dickey_fuller, 
#                      autocorrelation, 
#                      benford_correlation, 
#                      binned_entropy, 
#                      c3, 
#                      change_quantiles, 
#                      cid_ce, 
#                      count_above, count_below, energy_ratio_by_chunks,
#                      ar_coefficient, cwt_coefficients, fft_coefficient, friedrich_coefficients,
#                      count_above_mean, 
#                      count_below_mean, 
#                      fft_aggregated, 
#                      first_location_of_maximum, 
#                      first_location_of_minimum, 
#                      fourier_entropy, 
                    ]

# @jit(nopython=False)
def abs_energy(x):
    """Absolute energy of a series: the dot product of ``x`` with itself.

    Note: this definition replaces the ``abs_energy`` name imported from
    tsfresh earlier in the notebook.
    """
    series = x
    energy = np.dot(series, series)
    return energy

ts_funs = [abs_energy]
# np_funs = [np.mean, np.std, 'min', 'max', 'median', 'skew', 'kurt', 'sum']
# Aggregations are named by string throughout. pandas resolves np.mean/np.std
# passed to .agg() to its own 'mean'/'std' methods anyway (newer versions warn
# that the callable will be used directly in the future), so the strings keep
# today's results and output column names.
np_funs = ['mean', 'std', 'min', 'max', 'median']

def rolling_features(df, start_step, window_size, funcs):
    """Compute lagged rolling aggregates of ``df``.

    ``df`` is shifted by ``start_step`` rows, then ``funcs`` are aggregated
    over a rolling window of ``window_size`` rows; positions with fewer than
    ``window_size`` observations are left as NaN. Output columns are renamed to
    ``<column>_<func><k>d`` where ``k = int(window_size / 12 / 24)``.
    """
    lagged = df.shift(start_step)
    roller = lagged.rolling(window_size, min_periods=window_size)
    features = roller.agg(funcs)

    suffix = f"{int(window_size / 12 / 24)}d"
    features.columns = [f"{name[0]}_{name[1]}{suffix}" for name in features.columns]
    return features

def create_features(df, colnames, ROLLING_WINDOWS):
    """Add calendar columns and lagged rolling statistics to ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame; must contain the ``TIMESTAMP_COL_NAME`` column and every
        column listed in ``colnames``. The frame is not modified; a new frame
        is returned.
    colnames : list of str
        Columns to compute rolling statistics for.
    ROLLING_WINDOWS : iterable of int
        Window sizes in rows.

    Returns
    -------
    pandas.DataFrame
        The original columns, then ``day``, ``dayofweek``, ``weekofyear`` and
        ``month``, then one group of rolling-feature columns per
        (column, window) pair, as produced by ``rolling_features`` with the
        module-level ``np_funs`` aggregations.
    """
    # Vectorised calendar features (previously a Python lambda per row).
    # int64 matches the dtype the per-row version produced.
    timestamps = pd.to_datetime(df[TIMESTAMP_COL_NAME])
    df = df.assign(
        day=timestamps.dt.day.astype('int64'),
        dayofweek=timestamps.dt.dayofweek.astype('int64'),
        weekofyear=timestamps.dt.isocalendar().week.astype('int64'),
        month=timestamps.dt.month.astype('int64'),
    )

    # assume each record is the first day of forecast period so shift rolling calcs back by 1
    start_step = 1

    # Collect every feature block and concatenate once. The blocks keep the
    # index of `df`, so rows stay aligned even when that index is not 0..n-1.
    blocks = [df]
    for col in colnames:
        for window in ROLLING_WINDOWS:
            blocks.append(rolling_features(df[[col]],
                                           start_step=start_step,
                                           window_size=window,
                                           funcs=np_funs))
    return pd.concat(blocks, axis=1)

# Build the rolling-statistics feature table for every inverter that has more
# than 10 positive labels, and stack the per-inverter tables into `data`.
windows = [x*12*24 for x in [1, 2, 3, 7, 14, 21, 30]]
TIMESTAMP_COL_NAME = 'date'
data = []
for inverter in inv_labels.keys():
    x = inv_labels[inverter]
    y = dict(x['label'].value_counts())
    if y.get(True, 0) <= 10:
        continue

    # Source column -> short name used by the feature code.
    column_names = {
        'IN.GMRX.CHAR.'+inverter+'.Active Power (kW)': 'power',
        'IN.GMRX.CHAR.WS-20 MW.Module Temperature (°C)': 'temp1',
        'IN.GMRX.CHAR.WS-20 MW.POA Irradiance (w/m²)': 'rad1',
        'IN.GMRX.CHAR.WS-5 MW.Module Temperature (°C)': 'temp2',
        'IN.GMRX.CHAR.WS-5 MW.POA Irradiance (w/m²)': 'rad2',
    }
    inv_df_i = inv_df[['date'] + list(column_names)].rename(columns=column_names)
    inv_df_i['date'] = pd.to_datetime(inv_df_i['date'])
    inv_df_i['hour'] = inv_df_i.date.dt.hour

    tic = time.time()
    df_ = create_features(inv_df_i, colnames=['power', 'temp1', 'rad1'], ROLLING_WINDOWS=windows)
    toc = time.time()

    df_ = x.merge(df_, on='date', how='left')
    df_['inverter'] = inverter
    # Count through a plain dict, like the filter above: y[1] / y[0] on the
    # value_counts Series can be read as a label or as a position depending
    # on the label dtype, and fails when one class is missing.
    y = dict(df_['label'].value_counts())
    print(inverter, x.shape[0], df_.shape[0], y.get(1, 0), y.get(0, 0), toc-tic)
    data.append(df_)
data = pd.concat(data, axis=0)
data.shape

In [None]:
import pickle
# Persist the stacked feature table to disk.
with open('inverter-data-v02.pkl', mode='wb') as sink:
    pickle.dump(data, sink)

In [None]:
# Size of `data` as reported by sys.getsizeof, in whole GiB (rounded down).
sys.getsizeof(data) // (1024 ** 3)

In [None]:
# Preview the last per-inverter table built by the loop; head() keeps the
# cell output small instead of rendering the whole frame.
df_.head()

In [None]:

# Benchmark tsfresh's built-in rolling on the first 80000 rows of the most
# recently loaded inverter: time the window construction first, then the
# feature extraction.
df = (
    inv_df_i[['power', 'date']]
    .copy()
    .assign(id=1)
    .head(80000)
    .reset_index(drop=False)
)

# try dask
# df = dd.from_pandas(df, npartitions=2)

tic = time.time()
window = 12*24
df_rolled = roll_time_series(
    df,
    column_id='id',
    column_sort="index",
    rolling_direction=1,
    min_timeshift=window - 1,
    max_timeshift=window - 1,
    disable_progressbar=False,
    n_jobs=5,
)
toc = time.time()
print(toc - tic)

# The helper 'index' column was only needed to order the rows while rolling.
df_rolled = df_rolled.drop(columns=['index'])

settings = MinimalFCParameters()
features = extract_features(
    df_rolled,
    column_id="id",
    column_sort="date",
    default_fc_parameters=settings,
    disable_progressbar=False,
)
tac = time.time()
print(tac - toc)

In [None]:
from pyspark.sql import SparkSession

# Get the active Spark session, or create one with the application name 'Ops'.
sparkl = (
    SparkSession.builder
    .appName('Ops')
    .getOrCreate()
)

In [None]:
# Reshape the power series into the long format (id, kind, date, value) and
# hand it to Spark through a temporary CSV file.
df = inv_df_i[['date', 'power']].copy()
df = df.rename(columns={'power': 'value'})
df['id'] = 1
df['kind'] = 'power'
df.to_csv('temp.csv', index=False)
# Without inferSchema Spark reads every CSV column as a string, so `value`
# would reach the feature calculators as text instead of numbers.
df = sparkl.read.csv('temp.csv', header=True, inferSchema=True)
df.printSchema()
df.show()

In [None]:
from tsfresh.convenience.bindings import spark_feature_extraction_on_chunk

# from pyspark import SparkContext
# sc = SparkContext("local", "First App")

#Create PySpark SparkSession
# spark = SparkSession.builder \
#     .master("local[1]") \
#     .appName("SparkByExamples.com") \
#     .getOrCreate()
#Create PySpark DataFrame from Pandas
# sparkDF=sparkl.createDataFrame(df) 
# Group the long-format Spark frame by (id, kind) and run tsfresh's Spark
# binding with the minimal feature set on each group.
df_grouped = df.groupby(["id", "kind"])
features = spark_feature_extraction_on_chunk(
    df_grouped,
    column_id="id",
    column_kind="kind",
    column_sort="date",
    column_value="value",
    default_fc_parameters=MinimalFCParameters(),
)
features.printSchema()

In [None]:
# features.show()
# features = features.groupby("id").pivot("variable").sum("value")
# Overwrite mode lets this cell be re-run; with the default save mode Spark
# raises an error when the output path already exists.
features.write.mode("overwrite").csv("temp_output.csv")

In [None]:
# Dask experiment: write the long-format power series to parquet, read it
# back as a Dask DataFrame and try tsfresh's rolling on it.
df = inv_df_i[['date', 'power']].copy()
df = df.reset_index(drop=False)
df = df.rename(columns={'power': 'value'})
df['id'] = 1
df['kind'] = 'power'
df.to_parquet('temp.pqt', index=False)
# `header` is a CSV-reader option and has no meaning for parquet, so it is
# no longer passed here.
df = dd.read_parquet('temp.pqt')

window = 12*24
# NOTE(review): confirm that roll_time_series accepts a Dask DataFrame; the
# other cells in this notebook only pass it pandas frames.
df_rolled = roll_time_series(df, 
                             column_id='id', 
                             column_sort="index", 
                             rolling_direction=1, 
                             min_timeshift = window - 1, 
                             max_timeshift = window - 1,
                             disable_progressbar=False,
                             n_jobs = 5,
                            )


# from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk
# df_grouped = df.groupby(["id", "kind"])

# features = dask_feature_extraction_on_chunk(df_grouped,
#                                             column_id="id",
#                                             column_kind="kind",
#                                             column_sort="date",
#                                             column_value="value")
# features = features.categorize(columns=["variable"])
# features = features.reset_index(drop=True) \
#             .pivot_table(index="id", columns="variable", values="value", aggfunc="mean")
# features.to_csv("temp_output.csv")

In [None]:
# Shape of `extracted_features`, which is assigned in the earlier
# Ray/tsfresh timing cell; that cell has to run before this one.
extracted_features.shape

In [None]:
# Preview the rolled frame; head() avoids rendering the whole table.
df_rolled.head()

In [None]:
# Number of rows per rolled window id.
df_rolled["id"].value_counts()

In [None]:
# Rows of the rolled window whose id is the tuple (1, 288).
df_rolled.loc[df_rolled["id"] == (1, 288)]

In [None]:
# Small-scale check of roll_time_series: first 100 rows, fixed windows of
# 10 rows (min_timeshift = max_timeshift = 9).
df = (
    inv_df_i[['power', 'date']]
    .copy()
    .assign(id=1)
    .head(100)
    .reset_index(drop=False)
)

df_rolled = roll_time_series(
    df,
    column_id='id',
    column_sort="index",
    rolling_direction=1,
    min_timeshift=9,
    max_timeshift=9,
)

In [None]:
# Rows of the rolled window whose id is the tuple (1, 10).
df_rolled.loc[df_rolled["id"] == (1, 10)]

In [None]:
# Number of rows per rolled window id for the 100-row experiment.
df_rolled["id"].value_counts()

In [None]:
# First 20 rows of the rolled frame.
df_rolled.iloc[:20]

In [None]:
# df_rolled.drop(columns=['index'], inplace=True)
# Minimal tsfresh feature set for every rolled window, ordering rows by date.
settings = MinimalFCParameters()
features = extract_features(
    df_rolled,
    column_id="id",
    column_sort="date",
    default_fc_parameters=settings,
    disable_progressbar=False,
)

In [None]:
# Preview the extracted features; head() avoids rendering the whole table.
features.head()