# Classification for time series

## Libraries, functions and classes

### Import libraries

In [45]:
import numpy as np
import pandas as pd

# images and plots
import matplotlib.pyplot as plt
from skimage.io import imsave, imread
from osgeo import gdal
from google.colab import files # For download images and fiels

# for the lerning
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans

from sklearn.linear_model import LinearRegression
import statsmodels.api as sms

### Functions for filter

In [46]:
def Show_Image_List(image_list):
  for layer in image_list:
    plt.figure()
    plt.imshow(image_list[layer], 'gray')

In [47]:
def Calculate_Average(array):
    rows, cols = array.shape
    total = 0

    for i in range(rows):
        for j in range(cols):
            total += array[i][j]

    return total / (rows * cols)

In [48]:
def Calculate_Variance(array):
    rows, cols = array.shape
    total = 0
    total_squer = 0

    for i in range(rows):
        for j in range(cols):
            total += array[i][j]
            total_squer += array[i][j] * array[i][j]


    mean_squer = (total / (rows * cols)) * (total / (rows * cols))
    return total_squer / (rows * cols) - mean_squer

### Functions

In [49]:
# Receives an image after classification and a row and column number,
# if the pixel at this point is equal to zero, the function turns the zeros
# into ones and vice versa.
# By default the function takes the middle pixel

def Inverse_The_Classification_If_The_Pixel_Zero(_classify_image, _row = -1, _column = -1):
  if _row and _column < 0:
    num_of_row, num_of_column = _classify_image.shape
    _row = int( num_of_row / 2 )
    _column = int( num_of_column / 2 )

  if _classify_image[_row, _column] == 0:
    return ( _classify_image * (-1) + 1 )

  else:
    return _classify_image

In [50]:
def Count_Specific_Value_In_Image_List(image_list, val = 1):
  count_list = []
  for image in image_list:
    count_list.append(How_Many_Pixels_Are_Specific_Value(image, val))
  return count_list

In [51]:
def How_Many_Pixels_Are_Specific_Value(image, val = 1):
  pixels = 0
  rows, columns = image.shape

  for row in range(rows):
    for colum in range(columns):
      if image[row, colum] == val:
         pixels = pixels + 1

  return pixels

In [52]:
import numpy as np

def Filter(image, function, kernel_size = 3):

  # Creates a new array for the filter image
  rows_num, columns_num = image.shape
  filter_image = np.zeros((rows_num, columns_num))

  # Because this function ignores the edges it not go throw all indexes
  subtract = int(kernel_size / 2 - 0.5)
  for i in range(subtract, rows_num - subtract):
    for j in range(subtract, columns_num - subtract):
      filter_image[i, j] = function(image[i-subtract : i+subtract+1,  j-subtract :j+subtract+1])

  return filter_image

In [53]:
def Split_The_Feature_To_Components(feture):
   components = feture.split("_")

   if len(components) == 4:
      band = components[0] + '_' + components[1]
      func = components[2]
      size = int(components[3])

   if len(components) == 3:
      band = components[0]
      func = components[1]
      size = int(components[2])

   return (band, func, size)

In [54]:
def Converts_The_Function_Name_To_Function(func_name):

  if func_name == 'average':
    return Calculate_Average

  elif func_name == 'variance':
    return Calculate_Variance

  else:
    raise ValueError("Worng function name")

In [55]:
def Filter_By_Fitures(image, fetures_list):

  image_of_selected_features = Multispectral_Image()

  # this for saving the largest kernel size for cutting the edges of the filtered layers
  largest_kernel_size = 3

  for idx, feature in enumerate(fetures_list):

    # get the information to create the new feature
    band, function_name, size = Split_The_Feature_To_Components(feature)
    the_function = Converts_The_Function_Name_To_Function(function_name)

    # get the image of the band
    myImage = image.get_band(band)

    # creates the filtered image
    filter_image = Filter(myImage, function=the_function, kernel_size=size)

    # add the new layer
    new_layer_name = band + '_' + function_name + '_' + str(size)
    image_of_selected_features.add_band(filter_image, new_layer_name)

    # Update the largest kernel
    if  largest_kernel_size < size:
      largest_kernel_size = size

  # return the image cuting according to the largest kernel
  cut = int((largest_kernel_size - 1) / 2)
  image_of_selected_features.delete_rows_cols(cut)
  return image_of_selected_features

In [56]:
import numpy as np

def Convert_Image_Shape_For_Multispectral_Image(image_as_array):
  number_of_bands, height, width = image_as_array.shape
  rotated_image = np.zeros((height, width, number_of_bands))
  for i in range(number_of_bands):
    rotated_image[:, :, i] = image_as_array [i, :, :]
  return rotated_image


'''
This function get a year and try to read tif image with the name of the year.
Because the file name can have an index (for example file name can be "2022(1).tif")
the function try to read the image with meny inexes.
If the read was successful the function return image as Multispectral_Image object and True
If not it returns False and False
'''
from osgeo import gdal

def Read_Year_Image(year, index_max = 5, file_end = 'tif', bands_names = np.array(['blue', 'green', 'red', 'NIR', 'SWIR_1', 'SWIR_2'])):

  # try read the image WITHOUT index

  image_file_name = str(year) + '.' + file_end
  try:
    image_file = gdal.Open(image_file_name)
    multi_layer_image = image_file.ReadAsArray()
    image = Multispectral_Image(Convert_Image_Shape_For_Multispectral_Image(multi_layer_image), bands_names)
    return image, True

  except:
    # try to read the image wite index
    for idx in range(1, index_max):
      image_file_name = str(year) + '(' + str(idx) + ')' + '.' + file_end
      try:
        image_file = gdal.Open(image_file_name)
        multi_layer_image = image_file.ReadAsArray()
        imege = Multispectral_Image(Convert_Image_Shape_For_Multispectral_Image(multi_layer_image), bands_names)
        return imege, True
      except:
        continue

    # if not succeed to read any image
    return False, False

In [57]:
import matplotlib.pyplot as plt

def Display_Images_Side_By_Side(images1, images2, titles=None, image_size=(4, 4)):
    num_images1 = len(images1)
    num_images2 = len(images2)
    if num_images1 != num_images2:
        raise ValueError("Number of images in both lists must be the same.")
    if titles is not None and len(titles) != num_images1:
        raise ValueError("Number of titles must match the number of images.")

    fig, axs = plt.subplots(nrows=num_images1, ncols=2, figsize=(2 * image_size[0], image_size[1] * num_images1))

    for i in range(num_images1):
        axs[i, 0].imshow(images1[i], 'gray')
        axs[i, 0].axis('off')
        if titles is not None:
            axs[i, 0].set_title(titles[i])

        axs[i, 1].imshow(images2[i],  'gray')
        axs[i, 1].axis('off')
        if titles is not None:
            axs[i, 1].set_title(titles[i])

    plt.tight_layout()
    plt.show()


### Multispectral Image

In [58]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

class Multispectral_Image:

    def __init__(self, data=None, band_names=None):
        if (data is None and band_names is not None) or (data is not None and band_names is None):
            raise ValueError("Both image data and band names must be provided or both should be None.")

        self.data = data    # a 3D numpy array with shape (height, width, num_bands)
        self.band_names = band_names    # a 1D numpy array of strings with length num_bands
        self.shape = None if data is None else data.shape[:2]   # a tuple containing the image height and width


    def get_band(self, band_name):
        band_idx = np.where(self.band_names == band_name)[0]
        if len(band_idx) == 0:
            raise ValueError(f"Band {band_name} not found")
        return self.data[:, :, band_idx[0]]


    def get_pixel(self, x, y):
        return self.data[y, x, :]


    def set_pixel(self, x, y, pixel_data):
        self.data[y, x, :] = pixel_data


    def add_band(self, new_band_data, band_name):
        if self.shape is None:
            self.data = new_band_data
            self.band_names = band_name
            self.shape = self.data.shape
        else:
            if new_band_data.shape != (self.shape[0], self.shape[1]):
                raise ValueError("New band data must have the same dimensions as existing image data")
            self.data = np.dstack((self.data, new_band_data))
            self.band_names = np.append(self.band_names, band_name)


    def add_band_from_1d_array(self, new_band_data, band_name):
        if new_band_data.shape[0] != self.shape[0] * self.shape[1]:
            raise ValueError("New band data must have the same number of pixels as existing image data")
        new_band_data = new_band_data.reshape((self.shape[0], self.shape[1]))
        self.add_band(new_band_data, band_name)


    def remove_band(self, band_name):
        band_idx = np.where(self.band_names == band_name)[0]
        if len(band_idx) == 0:
            raise ValueError(f"Band {band_name} not found")

        self.data = np.delete(self.data, band_idx, axis=2)
        self.band_names = np.delete(self.band_names, band_idx)


    def to_dataframe(self):
        df_data = {}
        for i in range(len(self.band_names)):
            df_data[self.band_names[i]] = self.data[:, :, i].ravel()
        return pd.DataFrame(df_data)


    def plot_band(self, band_name):
        band_idx = np.where(self.band_names == band_name)[0]
        if len(band_idx) == 0:
            raise ValueError(f"Band {band_name} not found")
        plt.imshow(self.data[:, :, band_idx[0]], cmap='gray')
        plt.title(band_name)
        plt.show()


    def plot_all_bands(self):
        fig, axs = plt.subplots(nrows=len(self.band_names), figsize=(3, 3 * len(self.band_names)))
        for i, band_name in enumerate(self.band_names):
            axs[i].imshow(self.get_band(band_name), cmap='gray')
            axs[i].set_title(band_name)
            axs[i].axis('off')
        plt.tight_layout()
        plt.show()


    def delete_rows_cols(self, x):
        if x <= 0:
            raise ValueError("Number of rows and columns to delete must be greater than zero")
        new_shape = (self.shape[0] - 2 * x, self.shape[1] - 2 * x)
        new_data = self.data[x:self.shape[0]-x, x:self.shape[1]-x, :]

        self.data = new_data
        self.shape = new_shape


    def crop_layers(self, start_col_idx, end_col_idx, start_row_idx, end_row_idx):
        if start_col_idx < 0 or end_col_idx >= self.shape[1]:
            raise ValueError(f"Invalid column index: {start_col_idx}-{end_col_idx}")
        if start_row_idx < 0 or end_row_idx >= self.shape[0]:
            raise ValueError(f"Invalid row index: {start_row_idx}-{end_row_idx}")
        new_shape = (end_row_idx - start_row_idx + 1, end_col_idx - start_col_idx + 1, self.shape[2])
        new_data = self.data[start_row_idx:end_row_idx+1, start_col_idx:end_col_idx+1, :]

        self.data = new_data
        self.shape = new_shape

## Classification of the images

In [29]:
# Initialize values

# pache name for saving the results
pach_name = 'round southeast to geometry'

# model
model = 'k-means'

# yaers
first_year = 1986
last_year = 2022

# wich layers to create
fetures = ['green_average_3', 'red_average_3', 'NIR_average_3', 'green_average_5',
           'red_average_5', 'NIR_average_7','blue_variance_7',
           'green_variance_7', 'red_variance_7', 'SWIR_1_variance_7']

# croping the images
start_row = 10
end_row = 100
start_colmun = 20
end_colmun = 105

# for duck- 10, 100, 20, 105
# for west to duck- 0, 120, 10, 105

In [59]:
classified_images = []
years_list = []
blue_bands = []

for year in range(first_year, last_year + 1):

  image, is_year_exists = Read_Year_Image(year)

  if is_year_exists == True:

    # filtering by the fitures and convert to DataFrame
    filter_image = Filter_By_Fitures(image, fetures)
    data = filter_image.to_dataframe()

    # feture scaling
    data = StandardScaler().fit_transform(data)

    # predict clssifection and add to "classified_images"
    classfier = KMeans(n_clusters=2, n_init=10).fit(data)
    classified_images.append(Inverse_The_Classification_If_The_Pixel_Zero(classfier.predict(data).reshape(filter_image.shape)))
    years_list.append(year)
    blue_bands.append(image.get_band('blue'))

## Show Results

In [None]:
# display the classified images next to one of the bands

Display_Images_Side_By_Side(classified_images, blue_bands, years_list)

In [None]:
# plot bare sand area in percentage from first year (1986)

bare_sand_pixels = Count_Specific_Value_In_Image_List(classified_images)
bare_sand_percentage = []
for number_of_pixels in bare_sand_pixels:
  bare_sand_percentage.append( number_of_pixels /  bare_sand_pixels[0] * 100)
plt.plot(years_list, bare_sand_percentage, marker = 'o', color = 'purple')
plt.title('Bare sand percentage')
plt.xlabel('Year')
plt.ylabel('Percentage from 1986')

In [None]:
# plot linear regression for percentage of bare sand vs. year

years_array = np.array(years_list).reshape(-1, 1)
bare_sand_percentage_array = np.array(bare_sand_percentage).reshape(-1, 1)

linear_regressor = LinearRegression().fit(years_array, bare_sand_percentage_array)
y_axis_for_fit_line = linear_regressor.predict(years_array)

r_square = linear_regressor.score(years_array, bare_sand_percentage_array)

graph = plt.figure()
ax = plt.axes()
ax.scatter(years_list, bare_sand_percentage, color = 'purple')
ax.plot(years_list, y_axis_for_fit_line.flatten().tolist(), '--')
ax.set_title('Bare Sand Percentage')
ax.set_ylabel('Percentage from 1986')
ax.set_xlabel('Year')

years_array_with_constant = sms.add_constant(years_array)
est = sms.OLS(bare_sand_percentage_array, years_array_with_constant)
est2 = est.fit()
print(est2.summary())

## Save Results

In [None]:
# save the linear regression plot

file_name = pach_name + '-bare sand change over time' + '.png'
graph.savefig(file_name)
files.download(file_name)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [66]:
# save the patches change over time

file_name = pach_name + ' change over time data-kmeans' + '.CSV'

the_classification_data = {'year': years_list,
                           'Bare Sand Percentage from 1986': pd.Series(bare_sand_percentage),
                           'coefficient': pd.Series(linear_regressor.coef_.ravel()),
                           'coefficient p-value': pd.Series(est2.pvalues[0]),
                           'intercept': pd.Series([linear_regressor.intercept_[0]]),
                           'intercept p-value': pd.Series([est2.pvalues[1]]),
                           'R^2': pd.Series(r_square)}

pd.DataFrame(the_classification_data).to_csv(file_name, index=False)
files.download(file_name)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
# save the classfy imaeges

for i, image in enumerate(classified_images):
  # change the image to 0 and 255
  image = np.where(image == 1, 255, image)
  image_name = str(years_list[i]) + '.tif'
  imsave(image_name, np.uint8(image))
  files.download(image_name)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>