In [None]:
import numpy as np
import pandas as pd

from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.pipeline import make_pipeline
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.preprocessing import StandardScaler

In [None]:
def _load_train_band(csv_path: str, suffix: str) -> pd.DataFrame:
    """Read one training band CSV, drop "index"/"culture" and append ``suffix`` to every column name."""
    band = pd.read_csv(csv_path, encoding="windows-1251")
    return band.drop(columns=["index", "culture"]).add_suffix(suffix)


# The NDVI table keeps its original column names and is the one the target is taken from.
df_ndvi = pd.read_csv("data/train/NDVI.csv", encoding="windows-1251").drop(columns=["index"])

# Remaining bands: same cleaning, each with its own column suffix.
df_nir = _load_train_band("data/train/B8A.csv", "_nir")
df_swir = _load_train_band("data/train/B12.csv", "_swir")
df_red = _load_train_band("data/train/B04.csv", "_red")
df_VegRedEdge = _load_train_band("data/train/B05.csv", "_vegRedEdge")
df_blue = _load_train_band("data/train/B02.csv", "_blue")
df_green = _load_train_band("data/train/B03.csv", "_green")

# Split the target off the NDVI table so that only feature columns remain in it.
labels = df_ndvi["culture"]
df_ndvi = df_ndvi.drop(columns=["culture"])

# Column-wise concatenation of all tables (rows are matched on the DataFrame index).
data = pd.concat(
    [df_ndvi, df_nir, df_swir, df_red, df_VegRedEdge, df_blue, df_green],
    axis=1,
)

# Number of missing values in the "121" NDVI column (shown as the cell output).
data["121"].isna().sum()

In [None]:
class DataImputer(BaseEstimator, TransformerMixin):
    """Fill missing values of a DataFrame with one of a few simple strategies.

    Parameters
    ----------
    impute_type : str, default="mean"
        * ``"mean"`` - fill every column with the column mean learned in ``fit``.
        * ``"rolling_mean"`` - fill each gap with the mean of a trailing window
          made of the current row and the ``window_size`` preceding rows of the
          same column (gaps whose whole window is NaN stay NaN).
        * ``"ffil"`` (historical spelling) or ``"ffill"`` - forward-fill down
          each column; leading NaNs stay NaN.
    window_size : int, default=5
        Number of preceding rows used by ``"rolling_mean"``.

    Attributes
    ----------
    means_ : pd.Series
        Per-column means of the data passed to ``fit`` (only set for
        ``impute_type="mean"``).
    """

    _FFILL_ALIASES = ("ffil", "ffill")
    _IMPUTE_TYPES = ("mean", "rolling_mean") + _FFILL_ALIASES

    def __init__(self, impute_type: str = "mean", window_size: int = 5):
        # scikit-learn requires __init__ to store parameters unmodified:
        # storing ``window_size + 1`` here made ``clone`` fail and grew the
        # window on every re-construction. The "+ 1" now lives in ``transform``.
        self.impute_type = impute_type
        self.window_size = window_size

    def _check_impute_type(self) -> None:
        """Raise ``ValueError`` when ``impute_type`` is not a supported strategy."""
        if self.impute_type not in self._IMPUTE_TYPES:
            raise ValueError(
                f"Unknown impute_type {self.impute_type!r}; expected one of {self._IMPUTE_TYPES}"
            )

    def fit(self, X: pd.DataFrame, y=None):
        """Learn the per-column means used by the ``"mean"`` strategy.

        Returns
        -------
        self
        """
        self._check_impute_type()
        if self.impute_type == "mean":
            # Learned on the training data so that later batches (validation,
            # test) are filled with training statistics, not their own.
            self.means_ = X.mean()
        return self

    def transform(self, X: pd.DataFrame, y=None) -> pd.DataFrame:
        """Return a copy of ``X`` with missing values filled.

        Raises
        ------
        ValueError
            If ``impute_type`` is not a supported strategy (previously the
            method silently returned ``None``).
        """
        self._check_impute_type()
        data: pd.DataFrame = X.copy()

        if self.impute_type == "mean":
            means = getattr(self, "means_", None)
            if means is None:
                # Not fitted (e.g. transform called directly): fall back to the
                # means of the data at hand, which is the previous behaviour.
                means = data.mean()
            return data.fillna(means)

        if self.impute_type == "rolling_mean":
            # Current row + ``window_size`` preceding rows.
            window = self.window_size + 1
            for column in data.columns:
                rolling_mean = data[column].rolling(window=window, min_periods=1).mean()
                data[column] = data[column].fillna(rolling_mean)
            return data

        # Only the forward-fill aliases are left at this point.
        return data.ffill()


In [None]:
class VegetationIndexAdder(BaseEstimator, TransformerMixin):
    """Append per-day spectral index columns derived from the band columns.

    For every day label in ``DAYS`` the transformer reads the columns
    ``{day}_nir``, ``{day}_red``, ``{day}_blue``, ``{day}_swir``,
    ``{day}_vegRedEdge`` and ``{day}_green`` and appends one
    ``{day}_<index>`` column per enabled index. All input columns are kept.

    Parameters
    ----------
    ndwi, arvi, sawi, gemi, ndre, gndwi, evi, msavi : bool, default=True
        Switches enabling the ``{day}_<name>`` output column of the same name.
    """

    # Constants are class attributes (they used to be assigned in __init__);
    # they are still reachable as ``instance.DAYS`` etc.
    N_DAYS = 26
    DAYS = ['121', '128', '135', '142', '149', '156', '163', '170', '177', '184', '191', '198', '205',
            '212', '219', '226', '233', '240', '247', '254', '261', '268', '275', '282', '289', '296']

    # Positional offsets of each band group; not used by this class itself.
    NDVI_START = 0
    NIR_START = 26
    SWIR_START = 52
    RED_START = 78
    VEG_REG_EDGE_START = 104
    BLUE_START = 130
    GREEN_START = 156

    def __init__(self, ndwi: bool = True, arvi: bool = True, sawi: bool = True, gemi: bool = True, ndre: bool = True, gndwi: bool = True, evi: bool = True, msavi: bool = True) -> None:
        super().__init__()

        self.ndwi = ndwi
        self.arvi = arvi
        self.sawi = sawi
        self.gemi = gemi
        self.ndre = ndre
        self.gndwi = gndwi
        self.evi = evi
        self.msavi = msavi

    def fit(self, X, y=None):
        """Stateless: nothing is learned from the data."""
        return self

    def transform(self, X: pd.DataFrame, y=None):
        """Return ``X`` plus the enabled index columns, appended day by day.

        Raises
        ------
        KeyError
            If one of the expected ``{day}_<band>`` columns is missing.
        """
        # New columns are collected first and attached with a single concat;
        # inserting ~200 columns one by one fragments the frame (which the old
        # code worked around with a full copy per day).
        new_columns = {}

        for day in self.DAYS:
            nir = X[f"{day}_nir"]
            red = X[f"{day}_red"]
            blue = X[f"{day}_blue"]
            swir = X[f"{day}_swir"]
            veg_red_edge = X[f"{day}_vegRedEdge"]
            green = X[f"{day}_green"]

            if self.ndwi:
                new_columns[f"{day}_ndwi"] = (nir - swir) / (nir + swir)
            if self.arvi:
                # ARVI red-blue term, Rb = red - gamma * (blue - red) with gamma = 1.
                # The previous ``red - (red - blue)`` collapsed to plain ``blue``.
                rb = red - (blue - red)
                new_columns[f"{day}_arvi"] = (nir - rb) / (nir + rb)
            if self.sawi:
                # NOTE(review): formula kept exactly as it was. It resembles SAVI
                # with a self-adjusting L, but standard SAVI divides by
                # ``nir + red + L`` and MSAVI's L uses ``nir - red`` inside the
                # square root - confirm which index is intended.
                discriminant = (2 * nir + 1) ** 2 - 8 * (nir * red)
                sqrt_value = np.sqrt(np.maximum(discriminant, 0))
                soil_factor = 1 - (2 * nir + 1 - sqrt_value) / 2
                new_columns[f"{day}_sawi"] = (nir - red) / (nir + red - soil_factor) * (1 + soil_factor)
            if self.gemi:
                eta = (2 * (nir ** 2 - red ** 2) + 1.5 * nir + 0.5 * red) / (nir + red + 0.5)
                new_columns[f"{day}_gemi"] = eta * (1 - 0.25 * eta) - ((red - 0.125) / (1 - red))
            if self.ndre:
                new_columns[f"{day}_ndre"] = (nir - veg_red_edge) / (nir + veg_red_edge)
            if self.gndwi:
                new_columns[f"{day}_gndwi"] = (nir - green) / (nir + green)
            if self.evi:
                new_columns[f"{day}_evi"] = 2.5 * (nir - red) / (nir + 6 * red - 7.5 * blue + 1)
            if self.msavi:
                # MSAVI2 = (2*NIR + 1 - sqrt((2*NIR + 1)^2 - 8*(NIR - Red))) / 2;
                # the final division by 2 was missing.
                msavi_expression = (2 * nir + 1) ** 2 - 8 * (nir - red)
                new_columns[f"{day}_msavi"] = (2 * nir + 1 - np.sqrt(np.abs(msavi_expression))) / 2

        if not new_columns:
            return X.copy()

        indices = pd.concat(new_columns, axis=1)
        # If X already carries a column with one of the output names, the newly
        # computed one replaces it (as assignment did) instead of duplicating it.
        overlapping = [name for name in indices.columns if name in X.columns]
        return pd.concat([X.drop(columns=overlapping), indices], axis=1)


In [None]:
# Hold out 30% of the rows for evaluation; the split is fixed by random_state.
x_train, x_test, y_train, y_test = train_test_split(
    data,
    labels,
    test_size=0.3,
    random_state=0,
)

# Imputation -> index features -> gradient boosting, chained in one estimator.
pipe = make_pipeline(
    DataImputer(impute_type="mean"),
    VegetationIndexAdder(),
    HistGradientBoostingClassifier(),
)

pipe.fit(x_train, y_train)

In [None]:
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV

# Candidate values for the boosting hyper-parameters.
param_grid = {
    "max_iter": [100, 200, 300],
    "l2_regularization": [0, 0.3, 0.5, 0.7, 1]
}

# The plain "f1" scorer is defined for binary targets only (positive label 1);
# the `culture` labels are presumably multiclass strings, for which it errors
# out and every candidate scores NaN. Macro-averaged F1 works for any number
# of classes. (If the target really is binary 0/1, "f1" can be restored.)
#
# NOTE(review): the search runs on a bare classifier fed with raw `x_train`,
# i.e. without the imputation and index features used by `pipe`, so the best
# parameters found here are not tuned for the actual pipeline.
grid_search = GridSearchCV(
    HistGradientBoostingClassifier(),
    param_grid=param_grid,
    cv=5,
    n_jobs=-1,
    scoring="f1_macro",
    verbose=4,
)
grid_search.fit(x_train, y_train)
print(grid_search.best_score_)
grid_search.best_params_

In [None]:
# Score the fitted pipeline on the held-out rows and print per-class metrics.
predictions = pipe.predict(x_test)
print(
    classification_report(
        y_true=y_test,
        y_pred=predictions,
        digits=5,
    )
)

In [None]:
import pickle
import dill
import joblib

# Bundle stored next to the model: the fitted pipeline, the suffix each source
# table's columns get before concatenation, and a reference score.
# NOTE(review): "f1_score" is a hard-coded number, not computed in this cell.
info = dict(
    model=pipe,
    column_suffixes={
        "ndvi": "",
        "B8A": "_nir",
        "B12": "_swir",
        "B04": "_red",
        "B05": "_vegRedEdge",
        "B02": "_blue",
        "B03": "_green",
    },
    f1_score=0.989,
)

# pickle and dill share the same file-object based API, so they go through one loop.
for serialize, target_path in (
    (pickle.dump, "models/pickle_model.pkl"),
    (dill.dump, "models/dill_model.pkl"),
):
    with open(target_path, "wb") as sink:
        serialize(info, sink)

# joblib takes the path directly.
# NOTE(review): "joblim" looks like a typo for "joblib"; the file name is kept
# unchanged so anything already reading this path keeps working.
joblib.dump(info, "models/joblim_model.pkl")

In [None]:
# Read the pickle bundle back. Unpickling executes code embedded in the file,
# so this is only appropriate for files produced by this notebook itself.
with open("models/pickle_model.pkl", "rb") as bundle_file:
    unpickle_info = pickle.load(bundle_file)

print(unpickle_info["column_suffixes"])

# The restored pipeline; shown as the cell output.
unpickle_model = unpickle_info["model"]
unpickle_model

In [None]:
def _read_public_table(csv_path: str) -> pd.DataFrame:
    """Read one semicolon-separated public-test CSV and drop its "index" column."""
    table = pd.read_csv(csv_path, sep=";", encoding="windows-1251")
    return table.drop(columns=["index"])


# NDVI keeps its column names; every other band gets the suffix used in training.
open_df_ndvi = _read_public_table("data/test/test_public/NDVI.csv")
open_df_nir = _read_public_table("data/test/test_public/B8A.csv").add_suffix("_nir")
open_df_swir = _read_public_table("data/test/test_public/B12.csv").add_suffix("_swir")
open_df_red = _read_public_table("data/test/test_public/B04.csv").add_suffix("_red")
open_df_VegRedEdge = _read_public_table("data/test/test_public/B05.csv").add_suffix("_vegRedEdge")
open_df_blue = _read_public_table("data/test/test_public/B02.csv").add_suffix("_blue")
open_df_green = _read_public_table("data/test/test_public/B03.csv").add_suffix("_green")

# Same table order as the training frame.
open_data = pd.concat(
    [open_df_ndvi, open_df_nir, open_df_swir, open_df_red, open_df_VegRedEdge, open_df_blue, open_df_green],
    axis=1,
)

In [None]:
import csv

# Predict the public test set with the reloaded pipeline.
predictions: list[str] = unpickle_model.predict(open_data).tolist()

# One-column CSV: a "culture" header followed by one prediction per row.
with open("answers/classification_openset.csv", mode="w", newline="", encoding="windows-1251") as file:
    writer = csv.writer(file)
    writer.writerow(["culture"])
    writer.writerows([item] for item in predictions)

In [9]:
import csv

# NOTE(review): this cell repeats the preceding one verbatim (same predictions,
# same output file); running it simply rewrites the file.
predictions: list[str] = unpickle_model.predict(open_data).tolist()

with open("answers/classification_openset.csv", mode="w", newline="", encoding="windows-1251") as file:
    writer = csv.writer(file)
    writer.writerow(["culture"])
    writer.writerows([item] for item in predictions)

In [10]:
def _read_closed_table(csv_path: str) -> pd.DataFrame:
    """Read one semicolon-separated closed-test CSV and drop its "index" column."""
    table = pd.read_csv(csv_path, sep=";", encoding="windows-1251")
    return table.drop(columns=["index"])


# NDVI keeps its column names; every other band gets the suffix used in training.
close_df_ndvi = _read_closed_table("data/test/test_closed/NDVI.csv")
close_df_nir = _read_closed_table("data/test/test_closed/B8A.csv").add_suffix("_nir")
close_df_swir = _read_closed_table("data/test/test_closed/B12.csv").add_suffix("_swir")
close_df_red = _read_closed_table("data/test/test_closed/B04.csv").add_suffix("_red")
close_df_VegRedEdge = _read_closed_table("data/test/test_closed/B05.csv").add_suffix("_vegRedEdge")
close_df_blue = _read_closed_table("data/test/test_closed/B02.csv").add_suffix("_blue")
close_df_green = _read_closed_table("data/test/test_closed/B03.csv").add_suffix("_green")

# Same table order as the training frame.
close_data = pd.concat(
    [close_df_ndvi, close_df_nir, close_df_swir, close_df_red, close_df_VegRedEdge, close_df_blue, close_df_green],
    axis=1,
)


In [11]:
# Predict the closed test set with the reloaded pipeline.
predictions: list[str] = unpickle_model.predict(close_data).tolist()

# One-column CSV: a "culture" header followed by one prediction per row.
# (`csv` is imported by an earlier cell.)
with open("answers/classification_closedset.csv", mode="w", newline="", encoding="windows-1251") as file:
    writer = csv.writer(file)
    writer.writerow(["culture"])
    writer.writerows([item] for item in predictions)