In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os




In [None]:
import matplotlib.pyplot as plt
from skimage.io import imread
from skimage.transform import resize
from sklearn.model_selection import train_test_split

In [None]:
 
def resize_all(src, pklname, include, width=150, height=None):
    """Read the images in the selected sub-folders of ``src`` and resize them.

    Parameters
    ----------
    src : str
        Directory to scan. The name of each scanned sub-directory is stored
        as the label of the images found inside it.
    pklname : str
        Kept for backward compatibility only. This function does not write
        anything to disk, so the value is not used.
    include : container of str
        Names of the entries of ``src`` that should be read. Entries that are
        not directories are skipped.
    width : int, default 150
        Target width (number of columns) of the resized images.
    height : int, optional
        Target height (number of rows); defaults to ``width``.

    Returns
    -------
    dict
        ``'description'`` (str), plus three parallel lists: ``'label'``
        (sub-directory name), ``'filename'`` (file name without extension)
        and ``'data'`` (the value returned by ``resize`` for each image).
    """
    height = height if height is not None else width

    data = {
        'description': 'resized ({0}x{1})concrete images in rgb'.format(int(width), int(height)),
        'label': [],
        'filename': [],
        'data': [],
    }

    # Compared case-insensitively so that e.g. "IMG_1.JPG" is not silently dropped.
    image_extensions = {'.jpg', '.jpeg', '.png'}

    # Sorted so the sample order does not depend on the file system.
    for subdir in sorted(os.listdir(src)):
        current_path = os.path.join(src, subdir)
        # A plain file listed in `include` would make os.listdir below fail.
        if subdir not in include or not os.path.isdir(current_path):
            continue
        print(subdir)

        for file in sorted(os.listdir(current_path)):
            stem, extension = os.path.splitext(file)
            if extension.lower() not in image_extensions:
                continue
            im = imread(os.path.join(current_path, file))
            # skimage's output_shape is (rows, cols), i.e. (height, width).
            im = resize(im, (height, width))
            data['label'].append(subdir)
            data['filename'].append(stem)
            data['data'].append(im)
    return data
 


In [None]:
# Root directory of the dataset; the listing shows the top-level entries
# available under it.
data_path = '/kaggle/input/concrete-and-pavement-crack-images'
os.listdir(data_path)

In [None]:
base_name = 'concrete_crack'
width = 100

# Use every class folder of the dataset. Only directories are kept because
# resize_all lists the contents of each included entry, which fails on a
# plain file.
include = [
    entry for entry in os.listdir(data_path)
    if os.path.isdir(os.path.join(data_path, entry))
]

data = resize_all(src=data_path, pklname=base_name, width=width, include=include)

In [None]:
from collections import Counter

# Sanity report on the loaded dataset; the final expression shows the number
# of images per label.
image_list, label_list = data['data'], data['label']
print('number of samples: ', len(image_list))
print('keys: ', list(data))
print('description: ', data['description'])
print('image shape: ', image_list[0].shape)
print('labels:', np.unique(label_list))
print('filename:', np.unique(data['filename']))
Counter(label_list)

In [None]:
# Show the first image of every label side by side.
labels = np.unique(data['label'])

# squeeze=False keeps `axes` a 2-D array even when there is only one label;
# without it a single label yields a bare Axes object that cannot be zipped.
fig, axes = plt.subplots(1, len(labels), squeeze=False)
fig.set_size_inches(30, 8)

for ax, label in zip(axes.ravel(), labels):
    # list.index returns the position of the first sample carrying this label
    idx = data['label'].index(label)

    ax.imshow(data['data'][idx])
    ax.axis('off')
    ax.set_title(label)

# Run the layout after the axes have their titles so they are accounted for.
fig.tight_layout()


In [None]:
# Convert the image list and the label list into arrays for scikit-learn.
X, y = np.array(data['data']), np.array(data['label'])

In [None]:

# Hold out 20% of the samples for testing; stratifying on the labels keeps the
# class proportions the same in both parts.
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    shuffle=True,
    random_state=42,
    stratify=y,
)

In [None]:
def plot_bar(y, loc='left', relative=True, xlabel='equipment type'):
    """Draw a bar chart of the number of samples per label on the current figure.

    Calling it once with ``loc='left'`` and once with ``loc='right'`` puts the
    bars of two label sets next to each other around the same tick.

    Parameters
    ----------
    y : array-like
        Labels to count.
    loc : {'left', 'right'}, default 'left'
        Side of the tick position on which the bars are drawn.
    relative : bool, default True
        If True plot percentages of ``len(y)``, otherwise raw counts.
    xlabel : str, default 'equipment type'
        Text of the x-axis label.

    Raises
    ------
    ValueError
        If ``loc`` is neither ``'left'`` nor ``'right'``.
    """
    offsets = {'left': -0.5, 'right': 0.5}
    if loc not in offsets:
        # Previously an unknown value only surfaced later as an UnboundLocalError.
        raise ValueError(f"loc must be 'left' or 'right', got {loc!r}")
    n = offsets[loc]
    width = 0.35

    # np.unique returns the labels already sorted, with counts aligned to
    # them, so no extra sorting step is needed to fix the bar order.
    unique, counts = np.unique(y, return_counts=True)

    if relative:
        # plot as a percentage
        counts = 100*counts/len(y)
        ylabel_text = '% count'
    else:
        # plot counts
        ylabel_text = 'count'

    xtemp = np.arange(len(unique))

    plt.bar(xtemp + n*width, counts, align='center', alpha=.7, width=width)
    plt.xticks(xtemp, unique, rotation=45)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel_text)
    
 
# Compare the class distribution of the training and the test split.
plt.figure(figsize=(49, 15))
plt.suptitle('relative amount of photos per type')
for split_labels, side in ((y_train, 'left'), (y_test, 'right')):
    plot_bar(split_labels, loc=side)
legend_entries = [
    f'train ({len(y_train)} photos)',
    f'test ({len(y_test)} photos)',
]
plt.legend(legend_entries);

In [None]:
import joblib
from sklearn.base import BaseEstimator, TransformerMixin

from sklearn.linear_model import SGDClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import VotingClassifier

from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
import skimage

from skimage.feature import hog
from skimage.io import imread
from skimage.transform import rescale
 
class RGB2GrayTransformer(BaseEstimator, TransformerMixin):
    """Stateless transformer that applies ``skimage.color.rgb2gray`` to each image."""

    def __init__(self):
        pass

    def fit(self, X, y=None):
        """Nothing is learned from the data; returns the transformer itself."""
        return self

    def transform(self, X, y=None):
        """Return an array holding the grayscale version of every item in ``X``."""
        gray_images = [skimage.color.rgb2gray(image) for image in X]
        return np.array(gray_images)
     
 
class HogTransformer(BaseEstimator, TransformerMixin):
    """Stateless transformer that runs ``skimage.feature.hog`` on each image.

    Parameters
    ----------
    y : object, optional
        Stored on the instance but not used by ``fit`` or ``transform``.
    pixels_per_cell : tuple of int, default (8, 8)
        Forwarded to ``hog`` as ``pixels_per_cell``.
    cells_per_block : tuple of int, default (3, 3)
        Forwarded to ``hog`` as ``cells_per_block``.
    """

    def __init__(self, y=None, pixels_per_cell=(8, 8),
                 cells_per_block=(3, 3)):
        self.y = y
        self.pixels_per_cell = pixels_per_cell
        self.cells_per_block = cells_per_block

    def fit(self, X, y=None):
        """Nothing is learned from the data; returns the transformer itself."""
        return self

    def transform(self, X, y=None):
        """Return an array with one ``hog`` result per item of ``X``.

        Any exception raised by ``hog`` propagates to the caller. The former
        bare ``except`` only repeated the identical computation, so it could
        never recover from a failure.
        """
        return np.array([
            hog(img,
                pixels_per_cell=self.pixels_per_cell,
                cells_per_block=self.cells_per_block)
            for img in X
        ])

In [None]:
# Preprocessing: grayscale conversion -> HOG features -> standardisation.
full_pipeline = Pipeline(steps=[
    ("rgb_to_gray", RGB2GrayTransformer()),
    ("Hog_transformer", HogTransformer(pixels_per_cell=(8, 8), cells_per_block=(3, 3))),
    ("StandardScaler", StandardScaler()),
])

# Preprocessing followed by an SGD classifier, trained on the training split
# and evaluated on the held-out images.
full_pipeline_with_predevtor = Pipeline(steps=[
    ("full_pipeline", full_pipeline),
    ("sgd", SGDClassifier(max_iter=1000, tol=1e-3, random_state=42)),
])
y_pred = full_pipeline_with_predevtor.fit(X_train, y_train).predict(X_test)

In [None]:
# Element-wise comparison of predictions and ground truth: show the first 25
# outcomes, then the share of correct predictions in percent.
is_correct = np.asarray(y_pred == y_test)
print(is_correct[:25])
print('')
print('Percentage correct: ', 100*np.sum(is_correct)/len(y_test))

In [None]:
# Classifiers that are evaluated in the following cells.
linear_model = LogisticRegression()
svc_model = SVC()
ranf_model = RandomForestClassifier(
    n_estimators=500,
    max_leaf_nodes=16,
    n_jobs=-1,
    random_state=42,
)

In [None]:
# Fit the preprocessing pipeline on the training images, keep the resulting
# feature matrix and store a copy of it on disk.
trans_data = full_pipeline.fit_transform(X_train, y=y_train)
joblib.dump(value=trans_data, filename='trans_data_concrete_crack.pkl')

In [None]:
# Preprocessing followed by logistic regression, evaluated on the test split.
full_pipeline_with_predevtor = Pipeline(steps=[
    ("full_pipeline", full_pipeline),
    ("linear_model", linear_model),
])
y_pred = full_pipeline_with_predevtor.fit(X_train, y_train).predict(X_test)

hits = np.asarray(y_pred == y_test)
print(hits[:25])
print('')
print('Percentage correct: ', 100*np.sum(hits)/len(y_test))

In [None]:
# Preprocessing followed by a support vector classifier, evaluated on the test split.
full_pipeline_with_predevtor = Pipeline(steps=[
    ("full_pipeline", full_pipeline),
    ("svc_model", svc_model),
])
y_pred = full_pipeline_with_predevtor.fit(X_train, y_train).predict(X_test)

hits = np.asarray(y_pred == y_test)
print(hits[:25])
print('')
print('Percentage correct: ', 100*np.sum(hits)/len(y_test))

In [None]:

# The forest is trained on the preprocessed features, so the test images have
# to pass through the same fitted preprocessing pipeline before prediction.
# Feeding the raw image array to predict() does not match the training input.
ranf_model.fit(trans_data, y_train)
X_test_trans = full_pipeline.transform(X_test)
y_pred = ranf_model.predict(X_test_trans)

print(np.array(y_pred == y_test)[:25])
print('')
print('Percentage correct: ', 100*np.sum(y_pred == y_test)/len(y_test))

In [None]:
# Hard-voting ensemble of the SGD, SVC and random-forest classifiers,
# trained on the preprocessed training features.
sgd = SGDClassifier(random_state=42, max_iter=1000, tol=1e-3)
voting_cfr = VotingClassifier(
    estimators=[('sgd', sgd), ('svc_model', svc_model), ('ranf_model', ranf_model)],
    voting='hard',
)
voting_cfr.fit(trans_data, y_train)

# Evaluate the ensemble itself (not the stand-alone forest), using test
# images preprocessed the same way as the training data.
y_pred = voting_cfr.predict(full_pipeline.transform(X_test))

print(np.array(y_pred == y_test)[:25])
print('')
print('Percentage correct: ', 100*np.sum(y_pred == y_test)/len(y_test))