In [None]:
# My Kaggle ID: Anish Panicker(ap2938)
import numpy as np
import pandas as pd
import os
import glob
import h5py
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping
# Fix the starting state of NumPy's and TensorFlow's global random generators
# with one shared seed value.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)



In [None]:

# Root folder of the competition dataset; each split lives two folder levels
# below it.
base_dir = '/kaggle/input/w9-itr1/'
labelled_dir, unlabelled_dir, test_dir = (
    os.path.join(base_dir, outer_folder, inner_folder)
    for outer_folder, inner_folder in (
        ('labelled', 'labelled_data'),
        ('unlabelled_data', 'unlabelled_data'),
        ('test', 'test'),
    )
)
def load_data_from_directory_dynamic(directory_path, labelled=True, max_samples=None):
    """Load and concatenate the datasets stored in a directory of HDF5 files.

    Every ``*.h5`` / ``*.hdf5`` file directly inside ``directory_path`` is
    opened read-only and its ``H_Re``, ``H_Im`` and ``SNR`` datasets (plus
    ``Pos`` for labelled data) are read and concatenated along axis 0.

    Files are visited in sorted path order. ``glob`` returns paths in
    arbitrary order, so without sorting the order of the concatenated samples
    (and any row index derived from it) could change between runs.

    Args:
        directory_path: Directory to scan (non-recursively) for HDF5 files.
        labelled: When True, ``Pos`` is required in every file and returned.
        max_samples: Optional cap on the total number of samples loaded
            across all files. ``None`` or any other falsy value loads
            everything.

    Returns:
        ``(H_Re, H_Im, SNR, Pos)`` when ``labelled`` is True, otherwise
        ``(H_Re, H_Im, SNR)``; each element is a concatenated NumPy array.

    Raises:
        ValueError: If the directory contains no HDF5 files, if no samples
            were loaded, or if ``labelled`` is True and a file has no ``Pos``
            dataset.
    """
    file_paths_h5 = sorted(
        glob.glob(os.path.join(directory_path, '*.h5'))
        + glob.glob(os.path.join(directory_path, '*.hdf5'))
    )
    if not file_paths_h5:
        raise ValueError(f"No supported data files found in directory '{directory_path}'.")

    H_Re_list, H_Im_list, SNR_list = [], [], []
    Pos_list = [] if labelled else None
    total_loaded = 0

    print("Loading data from HDF5 files...")
    for file_path in file_paths_h5:
        # Stop as soon as the requested number of samples has been collected.
        if max_samples and total_loaded >= max_samples:
            break

        with h5py.File(file_path, 'r') as f:
            # Fail loudly instead of skipping 'Pos' for this file: skipping
            # would leave the targets shorter than (and misaligned with) the
            # features after concatenation.
            if labelled and 'Pos' not in f.keys():
                raise ValueError(f"Labelled data is missing 'Pos' key in file '{file_path}'.")

            num_samples = f['H_Re'].shape[0]
            if max_samples:
                load_count = min(max_samples - total_loaded, num_samples)
            else:
                load_count = num_samples

            # Read only the leading `load_count` rows of each dataset.
            H_Re_list.append(f['H_Re'][:load_count])
            H_Im_list.append(f['H_Im'][:load_count])
            SNR_list.append(f['SNR'][:load_count])
            if labelled:
                Pos_list.append(f['Pos'][:load_count])

        total_loaded += load_count
        print(f"Loaded {load_count} samples from {file_path}. Total loaded: {total_loaded}")

    if not H_Re_list:
        raise ValueError(f"No valid data found in directory '{directory_path}'.")

    H_Re = np.concatenate(H_Re_list, axis=0)
    H_Im = np.concatenate(H_Im_list, axis=0)
    SNR = np.concatenate(SNR_list, axis=0)
    if labelled:
        Pos = np.concatenate(Pos_list, axis=0)
        return H_Re, H_Im, SNR, Pos
    return H_Re, H_Im, SNR
# Labelled split: load every sample together with its 'Pos' targets.
print("Loading labelled data...")
(
    H_Re_labelled,
    H_Im_labelled,
    SNR_labelled,
    Pos_labelled,
) = load_data_from_directory_dynamic(directory_path=labelled_dir, labelled=True)
print(f"Labelled data: H_Re={H_Re_labelled.shape}, H_Im={H_Im_labelled.shape}, SNR={SNR_labelled.shape}, Pos={Pos_labelled.shape}")

# Unlabelled split: only the first `unlabelled_sample_size` samples are read.
unlabelled_sample_size = 4000
print(f"\nLoading unlabelled data with up to {unlabelled_sample_size} samples...")
H_Re_unlabelled, H_Im_unlabelled, SNR_unlabelled = load_data_from_directory_dynamic(
    directory_path=unlabelled_dir,
    labelled=False,
    max_samples=unlabelled_sample_size,
)
print(f"Unlabelled data: H_Re={H_Re_unlabelled.shape}, H_Im={H_Im_unlabelled.shape}, SNR={SNR_unlabelled.shape}")

# Test split: load everything, without targets.
print("\nLoading test data...")
H_Re_test, H_Im_test, SNR_test = load_data_from_directory_dynamic(
    directory_path=test_dir,
    labelled=False,
)
print(f"Test data: H_Re={H_Re_test.shape}, H_Im={H_Im_test.shape}, SNR={SNR_test.shape}")
def preprocess_data_simplified(H_Re, H_Im, SNR):
    """Build a standardised two-channel feature array from raw channel data.

    Processing steps:
      1. Average ``H_Re``, ``H_Im`` and ``SNR`` over their last axis.
      2. Take the magnitude of the averaged complex channel
         ``H_Re + 1j * H_Im``.
      3. Standardise the averaged SNR and repeat it along axis 2 so it has
         the same shape as the magnitude.
      4. Stack magnitude and SNR along a new trailing channel axis, replace
         exact zeros with ``1e-6`` and standardise the whole array.

    All statistics are computed from the arrays passed in. When a standard
    deviation is zero (for example a constant SNR), the values are only
    centred instead of being divided by zero, so the result stays finite
    rather than becoming NaN everywhere.

    Args:
        H_Re: Real part of the channel. Assumed 4-D
            ``(samples, a, b, k)`` -- inferred from the indexing below.
        H_Im: Imaginary part of the channel, same shape as ``H_Re``.
        SNR: Signal-to-noise values. Assumed 3-D ``(samples, a, k)`` so that
            its mean tiles to the magnitude's shape -- TODO confirm.

    Returns:
        Array with the magnitude's shape plus a trailing axis of size 2
        (channel 0: magnitude, channel 1: tiled SNR), standardised globally.
    """
    def _safe_std(values):
        # A zero spread would make the division 0/0 = NaN; fall back to 1 so
        # the values are merely centred.
        spread = np.std(values)
        return spread if spread > 0 else 1.0

    H_Re_mean = np.mean(H_Re, axis=-1)
    H_Im_mean = np.mean(H_Im, axis=-1)
    SNR_mean = np.mean(SNR, axis=-1)

    H_mag = np.abs(H_Re_mean + 1j * H_Im_mean)

    SNR_norm = (SNR_mean - np.mean(SNR_mean)) / _safe_std(SNR_mean)
    # Repeat the per-(sample, a) SNR value along axis 2 to match H_mag.
    SNR_tiled = np.tile(np.expand_dims(SNR_norm, axis=-1), (1, 1, H_mag.shape[2]))

    # New trailing axis: channel 0 = magnitude, channel 1 = SNR.
    H_final = np.stack([H_mag, SNR_tiled], axis=-1)
    H_final[H_final == 0] = 1e-6
    H_final = (H_final - np.mean(H_final)) / _safe_std(H_final)

    return H_final

# Run the same preprocessing on the labelled, unlabelled and test splits,
# in that order.
print("Preprocessing data with simplified method...")
H_labelled, H_unlabelled, H_test = (
    preprocess_data_simplified(real_part, imag_part, snr_values)
    for real_part, imag_part, snr_values in (
        (H_Re_labelled, H_Im_labelled, SNR_labelled),
        (H_Re_unlabelled, H_Im_unlabelled, SNR_unlabelled),
        (H_Re_test, H_Im_test, SNR_test),
    )
)
print(f"Preprocessed Labelled Data Shape: {H_labelled.shape}")
print(f"Preprocessed Unlabelled Data Shape: {H_unlabelled.shape}")
print(f"Preprocessed Test Data Shape: {H_test.shape}")



In [None]:

def build_optimized_autoencoder(input_shape):
    """Build a small convolutional autoencoder together with its encoder.

    The encoder is two ``Conv2D`` + 2x2 ``MaxPooling2D`` stages (8 then 16
    filters); the decoder mirrors it with ``Conv2D`` + ``UpSampling2D``
    stages and a final 3x3 convolution back to ``input_shape[-1]`` channels.

    The reconstruction layer is linear: the network is trained to reproduce
    standardised inputs (zero mean, unit variance), and a sigmoid output
    could never produce values outside (0, 1).

    Because both pooling layers use ``padding='same'``, a height or width
    that is not a multiple of 4 is rounded up by the encoder and the decoder
    output comes out larger than the input. The surplus rows/columns are
    cropped from the bottom/right so the output shape matches the input.

    Args:
        input_shape: ``(height, width, channels)`` of one sample.

    Returns:
        ``(autoencoder, encoder)``: the full input-to-reconstruction model
        and the model mapping the same input to the bottleneck feature map.
        Both share layers.
    """
    def _surplus(produced, wanted):
        # Unknown (None) dimensions cannot be compared statically; leave them.
        if produced is None or wanted is None:
            return 0
        return max(int(produced) - int(wanted), 0)

    input_layer = layers.Input(shape=input_shape)

    # Encoder
    x = layers.Conv2D(8, (3, 3), activation='relu', padding='same')(input_layer)
    x = layers.MaxPooling2D((2, 2), padding='same')(x)
    x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(x)
    encoded = layers.MaxPooling2D((2, 2), padding='same')(x)

    # Decoder
    x = layers.Conv2D(16, (3, 3), activation='relu', padding='same')(encoded)
    x = layers.UpSampling2D((2, 2))(x)
    x = layers.Conv2D(8, (3, 3), activation='relu', padding='same')(x)
    x = layers.UpSampling2D((2, 2))(x)
    decoded = layers.Conv2D(input_shape[-1], (3, 3), activation='linear', padding='same')(x)

    # Trim any overshoot introduced by the 'same'-padded pooling layers.
    extra_rows = _surplus(decoded.shape[1], input_shape[0])
    extra_cols = _surplus(decoded.shape[2], input_shape[1])
    if extra_rows or extra_cols:
        decoded = layers.Cropping2D(((0, extra_rows), (0, extra_cols)))(decoded)

    autoencoder = models.Model(input_layer, decoded)
    encoder = models.Model(input_layer, encoded)
    return autoencoder, encoder

# Shape of one sample: the array shape without its leading sample axis.
input_shape = H_unlabelled.shape[1:]
autoencoder, encoder = build_optimized_autoencoder(input_shape)
autoencoder.compile(loss='mse', optimizer='adam')

# Hold out 10% of the unlabelled samples to monitor reconstruction loss.
H_ua_train, H_ua_val = train_test_split(
    H_unlabelled,
    test_size=0.1,
    random_state=42,
)

print("\nTraining the autoencoder...")
autoencoder.fit(
    x=H_ua_train,
    y=H_ua_train,  # the autoencoder's target is its own input
    batch_size=32,
    epochs=20,
    shuffle=True,
    validation_data=(H_ua_val, H_ua_val),
    callbacks=[
        EarlyStopping(
            monitor='val_loss',
            patience=3,
            restore_best_weights=True,
        )
    ],
)

print("\nExtracting encoded features...")
encoded_labelled = encoder.predict(H_labelled, batch_size=32)
encoded_test = encoder.predict(H_test, batch_size=32)

# Flatten each encoded feature map into a single feature vector per sample.
encoded_labelled_flat = encoded_labelled.reshape(len(encoded_labelled), -1)
encoded_test_flat = encoded_test.reshape(len(encoded_test), -1)

X, y = encoded_labelled_flat, Pos_labelled

# 80/20 train/validation split of the labelled features and targets.
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)




In [None]:
def build_optimized_regressor(input_dim):
    """Build and compile a dense regressor with a 3-value output.

    The network is ``Dense(64) -> Dense(32) -> Dense(16)`` with ReLU
    activations followed by a linear ``Dense(3)`` output, compiled with the
    Adam optimizer, MSE loss and an MAE metric.

    The input size is declared with an explicit ``Input`` layer rather than
    the legacy ``input_dim=`` keyword on the first ``Dense`` layer.

    Args:
        input_dim: Number of features in each input vector.

    Returns:
        The compiled ``Sequential`` model.
    """
    model = models.Sequential([
        layers.Input(shape=(input_dim,)),
        layers.Dense(64, activation='relu'),
        layers.Dense(32, activation='relu'),
        layers.Dense(16, activation='relu'),
        layers.Dense(3),  # linear output, one unit per target component
    ])
    model.compile(optimizer='adam', loss='mse', metrics=['mae'])
    return model

# One input unit per flattened encoder feature.
regressor = build_optimized_regressor(input_dim=X_train.shape[1])
regressor.summary()

print("\nTraining the regressor...")
regressor.fit(
    x=X_train,
    y=y_train,
    batch_size=32,
    epochs=20,
    validation_data=(X_val, y_val),
    callbacks=[
        EarlyStopping(
            monitor='val_loss',
            patience=5,
            restore_best_weights=True,
        )
    ],
)

In [None]:
print("\nPredicting on test data...")
y_test_pred = regressor.predict(encoded_test_flat)

# One row per test sample: a running index followed by the three predicted
# components, taken column by column from the prediction array.
submission = pd.DataFrame({
    'id': np.arange(len(y_test_pred)),
    **{
        axis_name: y_test_pred[:, column]
        for column, axis_name in enumerate(('x', 'y', 'z'))
    },
})
submission.to_csv('submission1.csv', index=False)
print("Submission file 'submission1.csv' created successfully.")
