In [1]:
import os
import json
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.applications import DenseNet121
from tensorflow.keras.applications.densenet import preprocess_input
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing import image

# --- 1. Parameters ---
IMAGE_SIZE = (224, 224)
BATCH_SIZE = 16
MAX_VOCAB = 10000
MAX_LEN = 50
EMBED_DIM = 256
LSTM_UNITS = 512
EPOCHS = 20
ANNOTATION_FILE = 'archive (5)/annotation.json'  # matches your original
IMAGE_BASE_PATH = 'archive (5)/images/images' 

In [2]:
with open(ANNOTATION_FILE, 'r') as f:
    data = json.load(f)['train']

image_paths = []
reports = []
for item in data:
    img_path = os.path.join(IMAGE_BASE_PATH, item['image_path'][0])
    if os.path.exists(img_path):
        image_paths.append(img_path)
        reports.append(item['report'])

In [13]:
tokenizer = Tokenizer(num_words=MAX_VOCAB, oov_token='<unk>')
tokenizer.fit_on_texts(reports)
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'

In [15]:
len(tokenizer.word_index)

1425

In [16]:

def add_special_tokens(seq):
    return [tokenizer.word_index.get('<start>', MAX_VOCAB-2)] + seq + [tokenizer.word_index.get('<end>', MAX_VOCAB-1)]

# convert reports to sequences
seqs = tokenizer.texts_to_sequences(reports)
seqs = [s[:MAX_LEN-2] for s in seqs]  # Reserve 2 spaces for <start>/<end>
seqs_in = pad_sequences([[tokenizer.word_index.get('<start>', MAX_VOCAB-2)] + s for s in seqs], maxlen=MAX_LEN, padding='post')
seqs_out = pad_sequences([s + [tokenizer.word_index.get('<end>', MAX_VOCAB-1)] for s in seqs], maxlen=MAX_LEN, padding='post')


In [17]:
def load_and_preprocess(img_path):
    img = image.load_img(img_path, target_size=IMAGE_SIZE)
    img_array = image.img_to_array(img)
    img_array = preprocess_input(img_array)
    return img_array

# Extract features using pre-trained DenseNet121 (imagenet weights, no top)
# This step *precomputes* features for all images for faster training.
from tensorflow.keras.models import load_model
base_cnn = load_model('densenet121_notop_imagenet.h5')
global_avg_pool = tf.keras.layers.GlobalAveragePooling2D()



In [20]:
import os
import numpy as np

print("Extracting image features...")

# Load and preprocess images
all_img_arrays = np.array([load_and_preprocess(p) for p in image_paths])

# Extract features directly (features are already (batch_size, 1024) because GlobalAveragePooling2D is inside base_cnn)
all_img_features = base_cnn.predict(all_img_arrays, batch_size=BATCH_SIZE, verbose=1)

print("Image features shape:", all_img_features.shape)  # Should print: (num_images, 1024)


Extracting image features...
Image features shape: (2069, 1024)


In [19]:
base_cnn.summary()


Model: "densenet121"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 224, 224, 3)]        0         []                            
                                                                                                  
 zero_padding2d (ZeroPaddin  (None, 230, 230, 3)          0         ['input_1[0][0]']             
 g2D)                                                                                             
                                                                                                  
 conv1/conv (Conv2D)         (None, 112, 112, 64)         9408      ['zero_padding2d[0][0]']      
                                                                                                  
 conv1/bn (BatchNormalizati  (None, 112, 112, 64)         256       ['conv1/conv[0][0]']

In [21]:
def tf_map_fn(features, seq_in, seq_out):
    # Features are already extracted, just return them.
    return (features, seq_in), seq_out

dataset = tf.data.Dataset.from_tensor_slices((all_img_features, seqs_in, seqs_out))
dataset = (dataset.shuffle(1024)
                  .map(tf_map_fn, num_parallel_calls=tf.data.AUTOTUNE)
                  .batch(BATCH_SIZE)
                  .prefetch(tf.data.AUTOTUNE))


In [24]:
dataset

<_PrefetchDataset element_spec=((TensorSpec(shape=(None, 1024), dtype=tf.float32, name=None), TensorSpec(shape=(None, 50), dtype=tf.int32, name=None)), TensorSpec(shape=(None, 50), dtype=tf.int32, name=None))>

In [25]:
from tensorflow.keras import layers, models

# --- 6. Model definition ---

# 1️⃣ Encoder Input: DenseNet feature vector (shape: (batch_size, 1024))
encoder_input = layers.Input(shape=(all_img_features.shape[1],), name='encoder_input')  # (None, 1024)
encoder_dense = layers.Dense(EMBED_DIM, activation='relu', name='encoder_dense')(encoder_input)  # (None, EMBED_DIM)

# 2️⃣ Decoder Input: Token sequences (shape: (batch_size, MAX_LEN))
decoder_input = layers.Input(shape=(MAX_LEN,), name='decoder_input')  # (None, MAX_LEN)

# 3️⃣ Embedding Layer: For text tokens
embedding = layers.Embedding(input_dim=MAX_VOCAB, output_dim=EMBED_DIM, mask_zero=True, name='decoder_embedding')(decoder_input)

# 4️⃣ LSTM Decoder:
# To use image features as initial state, we project encoder_dense to both initial hidden and cell states
decoder_lstm = layers.LSTM(LSTM_UNITS, return_sequences=True, return_state=True, name='decoder_lstm')

# Project encoder_dense to match LSTM hidden and cell states (both of shape: (batch_size, LSTM_UNITS))
state_h = layers.Dense(LSTM_UNITS, activation='tanh', name='init_state_h')(encoder_dense)
state_c = layers.Dense(LSTM_UNITS, activation='tanh', name='init_state_c')(encoder_dense)

decoder_output_seq, _, _ = decoder_lstm(embedding, initial_state=[state_h, state_c])

# 5️⃣ Output Layer: Project to vocabulary size
decoder_dense_output = layers.Dense(MAX_VOCAB, activation='softmax', name='decoder_output')(decoder_output_seq)

# 6️⃣ Full Model
model = models.Model(inputs=[encoder_input, decoder_input], outputs=decoder_dense_output)

# 7️⃣ Compile
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# 8️⃣ Summary
model.summary()


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 encoder_input (InputLayer)  [(None, 1024)]               0         []                            
                                                                                                  
 decoder_input (InputLayer)  [(None, 50)]                 0         []                            
                                                                                                  
 encoder_dense (Dense)       (None, 256)                  262400    ['encoder_input[0][0]']       
                                                                                                  
 decoder_embedding (Embeddi  (None, 50, 256)              2560000   ['decoder_input[0][0]']       
 ng)                                                                                          

In [26]:
history = model.fit(
    dataset,
    epochs=EPOCHS,
    verbose=1  # You can set verbose=2 for more compact logs
)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [27]:
history = model.fit(
    dataset,
    epochs=5,
    verbose=1  
)
#extra 10 epochs

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [37]:
history = model.fit(
    dataset,
    epochs=5,
    verbose=1  
)
#extra 10 epochs

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


In [44]:
model.save('cxr_report_generator_model.h5')

  saving_api.save_model(


In [39]:
def generate_report(img_path):
    # 1️⃣ Load and preprocess the image
    img_array = np.expand_dims(load_and_preprocess(img_path), axis=0)  # shape: (1, 224, 224, 3)
    
    # 2️⃣ Extract pre-trained DenseNet features (already (1, 1024) because of pooling inside base_cnn)
    img_feat = base_cnn.predict(img_array, verbose=0)  # shape: (1, 1024)
    
    # 3️⃣ Pass through encoder Dense layer to get encoder feature (1, EMBED_DIM)
    encoder_dense_layer = model.get_layer('encoder_dense')
    enc_feat = encoder_dense_layer(img_feat)  # shape: (1, EMBED_DIM)
    
    # 4️⃣ Map encoder output to initial LSTM hidden and cell states
    state_h_layer = model.get_layer('init_state_h')
    state_c_layer = model.get_layer('init_state_c')
    state_h = state_h_layer(enc_feat)  # shape: (1, LSTM_UNITS)
    state_c = state_c_layer(enc_feat)  # shape: (1, LSTM_UNITS)
    
    # 5️⃣ Start sequence generation
    report = ['<start>']
    
    for _ in range(MAX_LEN):
        # Convert current report words to sequence of token ids
        seq = tokenizer.texts_to_sequences([report])
        seq = pad_sequences(seq, maxlen=MAX_LEN, padding='post')
        
        # Predict next word probabilities
        preds = model.predict([img_feat, seq], verbose=0)[0]  # (MAX_LEN, VOCAB)
        
        # Pick the next word based on the last time step
        next_id = np.argmax(preds[len(report) - 1])  # last non-padded token
        next_word = tokenizer.index_word.get(next_id, '<unk>')
        
        report.append(next_word)
        
        # Stop if end token generated
        if next_word == '<end>':
            break
    
    # 6️⃣ Return report without special tokens
    return ' '.join(report[1:-1])


In [40]:
print("Predicted",generate_report(image_paths[2]))
print("Actual=",reports[2])

Predicted there is a calcified granuloma in the right lower lobe there is no pneumothorax or pleural effusion the cardiac and mediastinal contours are within normal limits there is no focal air space opacity to suggest a pneumonia there is a calcified granuloma at the left upper lobe there are
Actual= Left lower lobe calcified granuloma. Heart size normal. No pleural effusion or pneumothorax. Mild medial right atelectasis. Mild emphysema.


In [43]:
import numpy as np
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# Make sure to install required libraries if not done yet:
# pip install rouge-score nltk

smooth_fn = SmoothingFunction().method4

# Example: Evaluate on first N images (you can choose N based on your dataset size)
N = 100  # Evaluate on first 100 samples
bleu_scores = []
rouge_scores = []

for i in range(N):
    img_path = image_paths[i]
    reference_report = reports[i].lower()  # Ground truth text
    generated_report = generate_report(img_path).lower()  # Model prediction
    
    # Tokenize reference and generated reports
    reference_tokens = [reference_report.split()]
    generated_tokens = generated_report.split()
    
    # 1️⃣ BLEU Score (unigram to 4-gram average)
    bleu = sentence_bleu(reference_tokens, generated_tokens, smoothing_function=smooth_fn)
    bleu_scores.append(bleu)
    


# --- Final Evaluation Metrics ---
avg_bleu = np.mean(bleu_scores)
#
print(f"✅ Average BLEU Score over {N} samples: {avg_bleu:.4f}")
#


KeyboardInterrupt: 