In [1]:
import copy

import ltn
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader, random_split

KeyboardInterrupt: 

## Data Preparation

In [None]:
# Load the raw measurements. Forward slashes are used because they resolve on
# Windows, Linux and macOS alike; the previous backslash-separated literal
# depended on invalid string escapes and only worked on Windows.
s_data = pd.read_csv('src/data/Stud_E-mobility_data_staticLimit.csv')

In [None]:
# Keep only the timestamp and the measurements used in this notebook.
# The explicit .copy() makes the selection an independent frame, so adding a
# column to it afterwards cannot trigger pandas' SettingWithCopyWarning.
s_data = s_data[[
    '_time', 'GARAGE_EXTERNAL_POWER', 'DEMAND_LIMIT',
    # 'DEMAND_LIMIT_INDICATOR',
    'BATTERY_SOC', 'BATTERY_DISCHARGE_POWER',
    'BATTERY_CHARGED_ENERGY', 'BATTERY_DISCHARGED_ENERGY', 'PV_POWER',
    'PV_ENERGY',
]].copy()

In [None]:
def label_charging(row):
    """Classify where the charging power is drawn from for one record.

    Args:
        row: Mapping-like record (for example a DataFrame row) that provides
            the keys ``BATTERY_SOC``, ``GARAGE_EXTERNAL_POWER`` and
            ``DEMAND_LIMIT``.

    Returns:
        str: One of
            * ``"Fully Covered by Local Battery"`` when the SOC is above 80,
            * ``"Battery Discharge Stopped due to Battery Health"`` when the
              SOC is below 15,
            * ``"Partially Covered by Local Battery"`` when the SOC is within
              [15, 80] and the external power exceeds the demand limit,
            * ``"Battery Charged from Grid"`` when the SOC is within [15, 80]
              and the external power does not exceed the demand limit,
            * ``"Unknown"`` when the SOC cannot be ordered (e.g. NaN).
    """
    soc = row["BATTERY_SOC"]

    if soc > 80:
        return "Fully Covered by Local Battery"

    if soc < 15:
        return "Battery Discharge Stopped due to Battery Health"

    # One closed band replaces the two former bands (40..80 and 15..40), which
    # had identical outcomes but left a gap at exactly 80 that fell through to
    # "Unknown".
    if 15 <= soc <= 80:
        if row["GARAGE_EXTERNAL_POWER"] > row["DEMAND_LIMIT"]:
            return "Partially Covered by Local Battery"
        return "Battery Charged from Grid"

    # Only reachable when every comparison above is False (e.g. a NaN SOC);
    # print the offending values so such rows can be inspected.
    print(row["BATTERY_SOC"])
    print(row["GARAGE_EXTERNAL_POWER"])
    return "Unknown"

# Label every record: axis=1 hands label_charging one row at a time, and the
# returned strings are stored in the new column "DRAWN_FROM".
s_data["DRAWN_FROM"] = s_data.apply(label_charging, axis=1)

In [None]:
# Model inputs are whatever remains after removing the timestamp, the label
# column and the measurements listed below; the label column is the target.
features = s_data.drop(
    columns=[
        '_time',
        'DRAWN_FROM',
        'BATTERY_DISCHARGE_POWER',
        'BATTERY_CHARGED_ENERGY',
        'BATTERY_DISCHARGED_ENERGY',
        'GARAGE_EXTERNAL_POWER',
    ]
)
target = s_data['DRAWN_FROM']

## LTN Functions

In [None]:

class Dataset:
    """Mini-batch view over paired samples and labels.

    ``dataset[i]`` returns the i-th batch as ``(samples, labels)``; the last
    batch may be shorter than ``batch_size``. ``len(dataset)`` is the number
    of batches. Indexing outside that range raises ``IndexError``, which is
    what lets a plain ``for batch in dataset`` loop terminate.

    Args:
        samples: Array-like exposing ``shape[0]`` and supporting indexing
            with an integer index array (e.g. a NumPy array or a tensor).
        labels: Array-like supporting the same integer-array indexing; its
            first dimension is expected to match ``samples`` (not checked).
        batch_size: Number of items per batch; must be positive.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """

    def __init__(self, samples, labels, batch_size = 32):
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")

        self.samples = samples
        self.labels = labels

        self.batch_size = batch_size

        # Number of batches, counting a trailing partial batch.
        self.length = int(np.ceil(samples.shape[0]/batch_size))

        # Order in which items are served; shuffle() permutes this in place
        # of touching the data itself.
        self.indices = np.arange(samples.shape[0])

    def __getitem__(self, i):
        """Return batch ``i`` as ``(samples, labels)``; negative ``i`` counts from the end."""
        if i < 0:
            i += self.length
        # Without this check out-of-range requests silently produced empty
        # batches and iteration over the dataset never stopped.
        if not 0 <= i < self.length:
            raise IndexError("batch index out of range")

        i0 = i*self.batch_size
        i1 = min((i + 1)*self.batch_size, self.samples.shape[0])

        index = self.indices[i0:i1]

        return self.samples[index], self.labels[index]

    def __len__(self):
        """Return the number of batches."""
        return self.length

    def shuffle(self):
        """Randomly permute the serving order using NumPy's global RNG."""
        self.indices = np.random.permutation(self.indices)

In [None]:

class SubNetworkTF(nn.Module):
    """Building block: 2-D convolution, then 3x3/stride-2 max-pooling, then ReLU.

    Args:
        in_channels: Number of channels of the incoming feature map.
        out_channels: Number of channels produced by the convolution.
        kernel_size: Side length of the square convolution kernel.
    """

    def __init__(self, in_channels, out_channels, kernel_size=3):
        super().__init__()
        # Unpadded, stride-1 convolution with a square kernel.
        convolution = nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=(kernel_size, kernel_size),
            stride=1,
            padding=0,
        )
        pooling = nn.MaxPool2d(kernel_size=3, stride=2)
        activation = nn.ReLU()
        self.f = nn.Sequential(convolution, pooling, activation)

    def forward(self, x):
        """Apply convolution, pooling and ReLU to ``x`` in that order."""
        return self.f(x)

class NetworkTF(nn.Module):
    """Convolutional network: two SubNetworkTF blocks, a 4x4 convolution, then flatten.

    The first block expects a single input channel and the final convolution
    produces 10 channels, which are flattened into the output vector.
    """

    def __init__(self):
        super().__init__()

        stages = [
            # 1 input channel -> 16 feature maps
            SubNetworkTF(in_channels=1, out_channels=16),
            # 16 feature maps -> 64 feature maps
            SubNetworkTF(in_channels=16, out_channels=64),
            # 4x4 convolution reducing the 64 feature maps to 10 channels
            nn.Conv2d(in_channels=64, out_channels=10, kernel_size=(4, 4), stride=1, padding=0),
            # Collapse the output of the last convolution into a flat vector per sample
            nn.Flatten(),
        ]
        self.f = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` through all stages and return the flattened result."""
        return self.f(x)



In [None]:

def fit(model, number_of_epochs, train_data, train_labels, val_data, val_labels, lr=0.1):
    """Train ``model`` with full-batch SGD and keep the best validation snapshot.

    Each epoch performs a single optimisation step on the whole training set
    and then evaluates on the whole validation set using cross-entropy loss.

    Args:
        model: ``nn.Module`` to train; it is updated in place.
        number_of_epochs: Number of optimisation steps to run.
        train_data: Input tensor passed to ``model`` for training.
        train_labels: Targets accepted by ``nn.CrossEntropyLoss`` for
            ``train_data``.
        val_data: Input tensor passed to ``model`` for validation.
        val_labels: Targets accepted by ``nn.CrossEntropyLoss`` for
            ``val_data``.
        lr: SGD learning rate.

    Returns:
        tuple: ``(best_model, training_losses, validation_losses)`` where
        ``best_model`` is an independent copy of the model taken at the epoch
        with the lowest validation loss (``None`` if no epoch improved on
        infinity, e.g. when ``number_of_epochs`` is 0), and the two lists
        hold one float per epoch. The training loss of an epoch is measured
        before that epoch's update, the validation loss after it.
    """
    # Define the CrossEntropyLoss and SGD optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=lr)

    # Lists to store training and validation losses
    training_losses = []
    validation_losses = []

    best_model = None
    best_val_loss = float('inf')  # Initialize with a large value

    for epoch in range(number_of_epochs):
        # Set the model to training mode
        model.train()

        # Forward pass
        train_outputs = model(train_data)
        train_loss = criterion(train_outputs, train_labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        train_loss.backward()
        optimizer.step()

        # Set the model to evaluation mode
        model.eval()

        # Forward pass for validation
        with torch.no_grad():
            val_outputs = model(val_data)
            val_loss = criterion(val_outputs, val_labels)

        # Work with plain floats from here on so no tensors are kept alive
        # in the bookkeeping below.
        train_loss_value = train_loss.item()
        val_loss_value = val_loss.item()

        # Save training and validation losses
        training_losses.append(train_loss_value)
        validation_losses.append(val_loss_value)

        # Update best model if current validation loss is lower. A deep copy
        # is required: keeping a reference to `model` would silently track
        # every later update and always end up with the final weights.
        if val_loss_value < best_val_loss:
            best_val_loss = val_loss_value
            best_model = copy.deepcopy(model)

        print(f'Epoch [{epoch + 1}/{number_of_epochs}], '
              f'Training Loss: {train_loss_value:.4f}, '
              f'Validation Loss: {val_loss_value:.4f}')

    return best_model, training_losses, validation_losses

In [None]:
# Helpers that convert encoded labels and the feature frame into tensors
def one_hot_encode(labels, num_classes):
    """Return the one-hot encoding of an integer label tensor.

    Args:
        labels: Integer (long) tensor of class indices.
        num_classes: Width of the one-hot vectors; every index in ``labels``
            must be smaller than this value.

    Returns:
        Tensor with one extra trailing dimension of size ``num_classes``.
    """
    return F.one_hot(labels, num_classes=num_classes)

def preprocess_data(samples, labels, num_classes=3):
    """Convert a feature frame and integer labels into tensors.

    Args:
        samples: Object exposing ``.values`` (e.g. a pandas DataFrame) with
            numeric content.
        labels: Sequence or array of integer class indices.
        num_classes: Number of classes used for the one-hot encoding.
            Defaults to 3; pass the real class count (for instance
            ``len(encoder.classes_)``) when more classes are present,
            otherwise the encoding raises for indices >= ``num_classes``.

    Returns:
        tuple: ``(features, labels_one_hot)`` where ``features`` is a float
        tensor built from ``samples.values`` and ``labels_one_hot`` is the
        one-hot encoded label tensor.
    """
    labels = torch.Tensor(labels)  # Convert labels to PyTorch Tensor
    labels_one_hot = one_hot_encode(labels.long(), num_classes=num_classes)
    return torch.Tensor(samples.values), labels_one_hot  # Convert DataFrame to numpy array before converting to Tensor

In [None]:
# Split the data into training and validation sets: test_size=0.2 holds out
# 20% of the rows for validation, and the fixed random_state keeps the split
# reproducible between runs.
train_samples, val_samples, train_labels, val_labels = train_test_split(features, target, test_size=0.2, random_state=42)

In [None]:
from sklearn.preprocessing import LabelEncoder

# Instantiate the encoder that maps the label strings to integer class indices
encoder = LabelEncoder()

# Learn the mapping from the training labels only, then reuse exactly the
# same mapping for the validation labels.
train_labels = encoder.fit_transform(train_labels)
val_labels = encoder.transform(val_labels)

# Convert features and encoded labels to tensors (labels become one-hot).
# NOTE(review): preprocess_data one-hot encodes into 3 classes, while
# label_charging can emit up to five distinct labels — confirm the class
# count matches len(encoder.classes_).
# NOTE(review): this cell rebinds train_samples/train_labels/val_samples/
# val_labels, so it is presumably not safe to run twice without re-running
# the split cell first.
train_samples, train_labels = preprocess_data(train_samples, train_labels)
val_samples, val_labels = preprocess_data(val_samples, val_labels)

In [None]:
# Wrap each (features, one-hot labels) pair so both tensors are indexed together.
train_dataset = TensorDataset(train_samples, train_labels)
val_dataset = TensorDataset(val_samples, val_labels)

In [None]:
tf_model = NetworkTF()
# Indexing a TensorDataset with 0 yields only the first (sample, label) pair.
# NOTE(review): the commented-out argmax(dim=1) lines below and the disabled
# fit() call look like they expect whole batches — confirm whether
# train_dataset.tensors / val_dataset.tensors was intended here.
x,y = train_dataset[0]
vx,vy = val_dataset[0]
# y = y.argmax(dim=1)
# vy = vy.argmax(dim=1)


In [None]:
# num_epochs = 10
# best_pytorch_model, pytorch_train_losses, pytorch_val_losses = fit(tf_model, num_epochs, x, y, vx, vy)


# Logic Tensor Networks
