In [2]:
import re
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

print(tf.__version__)

2.15.0


In [3]:
try:
  tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
  print('Device:', tpu.master())
  tf.config.experimental_connect_to_cluster(tpu)
  tf.tpu.experimental.initialize_tpu_system(tpu)
  strategy = tf.distribute.experimental.TPUStrategy(tpu)
except:
  strategy = tf.distribute.get_strategy()
print('Number of replicas:', strategy.num_replicas_in_sync)

BATCH_SIZE = 16 * strategy.num_replicas_in_sync
IMAGE_SIZE = [150, 150]
EPOCHS = 25


Number of replicas: 1


In [None]:
from google.colab import files
uploaded = files.upload()

In [None]:
filenames = tf.io.gfile.glob(str('chest_xray/train/*/*'))
filenames.extend(tf.io.gfile.glob(str('chest_xray/val/*/*')))
filenames2 = tf.io.gfile.glob(str('chest_xray/test/*/*'))

train_filenames, val_filenames = train_test_split(filenames, test_size=0.2)
test_filenames = filenames2

print("Training and validating images count: " + str(len(filenames)))
print("Testing images count: " + str(len(filenames2)))
print("Training images count: " + str(len(train_filenames)))
print("Validating images count: " + str(len(val_filenames)))
NormalCount = len([filename for filename in train_filenames if "NORMAL" in filename])
print("Normal images count in training set: " + str(NormalCount))

PneumoniaCount = len([filename for filename in train_filenames if "PNEUMONIA" in filename])
print("Pneumonia images count in training set: " + str(PneumoniaCount))


ValueError: With n_samples=0, test_size=0.2 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.

In [None]:
lables = np.array([str(tf.strings.split(item, os.path.sep)[-1].numpy())[2:-1]
  for item in tf.io.gfile.glob(str("chest_xray/train/*"))])
lables2 = np.array([str(tf.strings.split(item, os.path.sep)[-1].numpy())[2:-1]
  for item in tf.io.gfile.glob(str("chest_xray/val/*"))])
print(lables)
print(lables2)

def get_label(file_path):
  parts = tf.strings.split(file_path, os.path.sep)
  return parts[-2] == "PNEUMONIA"

def decode_img(img):
  img = tf.image.decode_jpeg(img, channels=3)
  img = tf.image.convert_image_dtype(img, tf.float32)
  return tf.image.resize(img, IMAGE_SIZE)

def process_path(file_path):
  label = get_label(file_path)
  # load the raw data from the file as a string
  img = tf.io.read_file(file_path)
  img = decode_img(img)
  return img, label


In [None]:
for image, label in train_ds.take(3):
  print("Image shape: ", image.numpy().shape)
  print("Label: ", label.numpy())

def prepare_for_training(ds, cache=True, shuffle_buffer_size=1000):
 	if cache:
    if isinstance(cache, str):
      ds = ds.cache(cache)
  else:
    ds = ds.cache()

  ds = ds.shuffle(buffer_size=shuffle_buffer_size)
  ds = ds.repeat()
  ds = ds.batch(BATCH_SIZE)
  ds = ds.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

  return ds


test_list_ds = tf.data.Dataset.list_files(str('chest_xray/test/*/*'))
test_ds = test_list_ds.map(process_path, num_parallel_calls=tf.data.experimental.AUTOTUNE)
test_ds = test_ds.batch(BATCH_SIZE)

train_ds = prepare_for_training(train_ds)
val_ds = prepare_for_training(val_ds)
image_batch, label_batch = next(iter(train_ds))


In [None]:
def show_batch(image_batch, label_batch):
      plt.figure(figsize=(10,10))
      for n in range(25):
          ax = plt.subplot(5,3,n+1)
          plt.imshow(image_batch[n])
          if label_batch[n]:
              plt.title("PNEUMONIA")
          else:
              plt.title("NORMAL")
          plt.axis("off")
show_batch(image_batch.numpy(), label_batch.numpy())


In [None]:
model = keras.Sequential([
            keras.layers.Flatten(input_shape=(150, 150, 3)),
            keras.layers.Dense(512, activation='relu'),
            keras.layers.Dropout(0.3),
            keras.layers.Dense(128, activation='relu'),
            keras.layers.Dropout(0.2),
            keras.layers.Dense(32, activation='relu'),
            keras.layers.Dense(2, activation='softmax'),
])

model.compile(optimizer = 'adam',
              loss = 'sparse_categorical_crossentropy',
              metrics = ['accuracy'])


In [None]:
history = model.fit(
    train_ds,
    epochs = 30,
    steps_per_epoch = len(train_filenames) // BATCH_SIZE,
    validation_data=val_ds,
    validation_steps= len(val_filenames) // BATCH_SIZE,
)


In [None]:
fig, ax = plt.subplots(1, 2, figsize= (20, 3))
ax = ax.ravel()
for i, met in enumerate(['accuracy', 'loss']):
  ax[i].plot(history.history[met])
  ax[i].plot(history.history['val_' + met])
  ax[i].set_title('Model {}'.format(met))
  ax[i].set_xlabel('epochs')
  ax[i].set_ylabel(met)
  ax[i].legend(['train', 'val'])


In [None]:
loss, accuracy = model.evaluate(test_ds)
print('Точность распознавания на проверочных данных:', accuracy)
print('Метрики потери:', loss)

In [None]:
model = keras.Sequential([
            keras.layers.Conv2D(32, (3,3), activation='relu', padding='same', input_shape=(150,150,3)),
            keras.layers.MaxPool2D((2,2), strides=2),
            keras.layers.Conv2D(64, (3,3), activation ='relu', padding='same'),
            keras.layers.MaxPool2D((2,2), strides=2),
            keras.layers.Flatten(),

            keras.layers.Dense(512, activation='relu'),
            keras.layers.Dropout(0.5),
            keras.layers.Dense(128, activation='relu'),
            keras.layers.Dropout(0.3),
            keras.layers.Dense(32, activation='relu'),
            keras.layers.Dropout(0.2),
            keras.layers.Dense(2, activation='softmax')
])


In [None]:
loss, accuracy = model.evaluate(test_ds)
print('Точность распознавания на проверочных данных:', accuracy)
print('Метрики потери:', loss)
