In [54]:
from pathlib import Path

base_dir = Path().resolve()
default_zip_path = base_dir / "Lung_classification.zip"

if default_zip_path.is_file():
    zip_file_path = default_zip_path
else:
    user_input = input(
        "Zip file not found at default location:\n"
        f"  {default_zip_path}\n"
        "Enter full path to the ZIP file (or leave blank to cancel):\n"
    ).strip()

    if not user_input:
        raise FileNotFoundError(
            f"Zip file not found at default location {default_zip_path} and no alternative path was provided.",
        )

    zip_file_path = Path(user_input).expanduser()
    if not zip_file_path.is_file():
        raise FileNotFoundError(
            f"Zip file not found at {zip_file_path}. Please check the path and run this cell again.",
        )

print(f"Found zip file at: {zip_file_path.resolve()}")

Found zip file at: C:\Users\91823\Disease_predictor\Lung_classification.zip


In [None]:
import zipfile
from pathlib import Path

base_dir = Path().resolve()

try:
    zip_file_path
except NameError:
    zip_file_path = base_dir / "Lung_classification.zip"

zip_file_path = Path(zip_file_path)

if not zip_file_path.is_file():
    raise FileNotFoundError(
        f"Zip file not found at {zip_file_path}. Run the first cell to choose the correct path.",
    )

extract_folder_path = base_dir / "classification_folder"
extract_folder_path.mkdir(parents=True, exist_ok=True)

with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
    zip_ref.extractall(extract_folder_path)

print(f"Files extracted to {extract_folder_path.resolve()}")

Files extracted to C:\Users\91823\Disease_predictor\classification_folder


In [None]:
from pathlib import Path

base_dir = Path().resolve()
extract_folder_path = base_dir / "classification_folder"

if not extract_folder_path.is_dir():
    raise FileNotFoundError(
        f"Extraction folder not found at {extract_folder_path}. Run the extraction cell first.",
    )

extracted_files = [p.name for p in extract_folder_path.iterdir()]
print(extracted_files)

['Classification']


In [None]:
from pathlib import Path

base_dir = Path().resolve()
classification_folder = base_dir / "classification_folder" / "Classification"

if not classification_folder.is_dir():
    raise FileNotFoundError(
        f"Classification folder not found at {classification_folder}. Check the extracted folder structure.",
    )

files_in_classification = [p.name for p in classification_folder.iterdir()]
print(files_in_classification)

['test_classification_json', 'test_classification_wav', 'train_classification_json', 'train_classification_wav']


In [None]:
from pathlib import Path

base_dir = Path().resolve()
classification_folder = base_dir / "classification_folder" / "Classification"

train_json_dir = classification_folder / "train_classification_json"
test_json_dir = classification_folder / "test_classification_json"
train_wav_dir = classification_folder / "train_classification_wav"
test_wav_dir = classification_folder / "test_classification_wav"

train_json_files = [p.name for p in train_json_dir.rglob("*.json")]
test_json_files = [p.name for p in test_json_dir.rglob("*.json")]
train_wav_files = [p.name for p in train_wav_dir.rglob("*.wav")]
test_wav_files = [p.name for p in test_wav_dir.rglob("*.wav")]

print("Train JSON Files:", train_json_files)
print("Test JSON Files:", test_json_files)
print("Train WAV Files:", train_wav_files)
print("Test WAV Files:", test_wav_files)

Train JSON Files: ['40069321_15.3_0_p1_981.json', '40069321_15.3_0_p2_982.json', '40069321_15.3_0_p3_983.json', '40069321_15.3_0_p4_984.json', '40138127_14.7_0_p1_137.json', '40138127_14.7_0_p3_139.json', '40138127_14.7_0_p4_140.json', '40490865_8.4_1_p1_1884.json', '40490865_8.4_1_p2_1900.json', '40490865_8.4_1_p3_1916.json', '40490865_8.4_1_p4_1932.json', '40638274_9.7_1_p1_1753.json', '40638274_9.7_1_p1_1763.json', '40638274_9.7_1_p1_1789.json', '40638274_9.7_1_p1_1940.json', '40638274_9.7_1_p2_1684.json', '40638274_9.7_1_p2_1719.json', '40638274_9.7_1_p2_1801.json', '40638274_9.7_1_p2_1828.json', '40638274_9.7_1_p3_1708.json', '40638274_9.7_1_p3_1751.json', '40638274_9.7_1_p3_1765.json', '40638274_9.7_1_p3_1844.json', '40638274_9.7_1_p3_1924.json', '40638274_9.7_1_p3_1971.json', '40638274_9.7_1_p4_1718.json', '40638274_9.7_1_p4_1727.json', '40638274_9.7_1_p4_1739.json', '40638274_9.7_1_p4_1777.json', '40638274_9.7_1_p4_1814.json', '40638274_9.7_1_p4_1875.json', '40638274_9.7_1_p4_1

In [55]:
from pathlib import Path
import json

import librosa
import soundfile as sf

base_dir = Path().resolve()
classification_root = base_dir / "classification_folder" / "Classification"


def _find_pair_paths(target_id, split="train"):
    target_id = str(target_id)
    if split not in {"train", "test"}:
        raise ValueError("split must be 'train' or 'test'")
    json_root = classification_root / f"{split}_classification_json"
    wav_root = classification_root / f"{split}_classification_wav"
    if not json_root.exists() or not wav_root.exists():
        raise FileNotFoundError(f"Expected {json_root} and {wav_root} to exist.")
    json_matches = list(json_root.rglob(f"*_{target_id}.json"))
    wav_matches = list(wav_root.rglob(f"*_{target_id}.wav"))
    if not json_matches:
        raise FileNotFoundError(f"No JSON file found for id {target_id} in {json_root}.")
    if not wav_matches:
        raise FileNotFoundError(f"No WAV file found for id {target_id} in {wav_root}.")
    if len(json_matches) > 1:
        print(f"Warning: multiple JSON matches for id {target_id}, using the first one.")
    if len(wav_matches) > 1:
        print(f"Warning: multiple WAV matches for id {target_id}, using the first one.")
    return json_matches[0], wav_matches[0]


def _detect_time_unit(events, duration_sec, sr):
    if not events:
        return "samples"
    max_end = max(int(e["end"]) for e in events)
    sec_from_samples = max_end / sr
    sec_from_ms = max_end / 1000.0
    diff_samples = abs(sec_from_samples - duration_sec)
    diff_ms = abs(sec_from_ms - duration_sec)
    if diff_samples < diff_ms:
        return "samples"
    return "ms"


def clip_segments_for_id(target_id, split="train", output_root=None):
    json_path, wav_path = _find_pair_paths(target_id, split=split)
    y, sr = librosa.load(wav_path, sr=None)
    duration_sec = len(y) / sr
    with open(json_path, "r") as f:
        meta = json.load(f)
    events = meta.get("event_annotation", [])
    if not events:
        print(f"No events in {json_path.name}, nothing to clip.")
        return []
    unit = _detect_time_unit(events, duration_sec, sr)
    if output_root is None:
        output_root = base_dir / "clips"
    clips_info = []
    for idx, ev in enumerate(events, start=1):
        start_val = int(ev["start"])
        end_val = int(ev["end"])
        if unit == "samples":
            start_sample = start_val
            end_sample = end_val
        elif unit == "ms":
            start_sample = int(start_val / 1000.0 * sr)
            end_sample = int(end_val / 1000.0 * sr)
        else:
            start_sample = int(start_val * sr)
            end_sample = int(end_val * sr)
        start_sample = max(0, start_sample)
        end_sample = min(len(y), end_sample)
        if end_sample <= start_sample:
            continue
        label = ev.get("type") or meta.get("record_annotation") or "Unknown"
        label_dir_name = label.replace(" ", "_")
        out_dir = output_root / label_dir_name
        out_dir.mkdir(parents=True, exist_ok=True)
        out_name = f"{wav_path.stem}_seg{idx}_{label_dir_name}.wav"
        out_path = out_dir / out_name
        sf.write(out_path, y[start_sample:end_sample], sr)
        clips_info.append(
            {
                "id": str(target_id),
                "segment_index": idx,
                "label": label,
                "start_sample": start_sample,
                "end_sample": end_sample,
                "output_path": str(out_path),
            },
        )
    print(f"Created {len(clips_info)} clips for id {target_id} ({split}).")
    return clips_info


clips_1684 = clip_segments_for_id(1684, split="train")

Created 4 clips for id 1684 (train).


In [None]:
import numpy as np
import pandas as pd
import librosa

def compute_band_energy_for_clip(path, low_f=100.0, high_f=2000.0):
    y_clip, sr = librosa.load(path, sr=None)
    Y = np.fft.rfft(y_clip)
    freqs = np.fft.rfftfreq(len(y_clip), d=1.0 / sr)
    mag = np.abs(Y)
    band_mask = (freqs >= low_f) & (freqs <= high_f)
    band_energy = float(np.sum(mag[band_mask] ** 2))
    return band_energy, sr


def build_features_from_clips(clips, low_f=100.0, high_f=2000.0):
    rows = []
    for c in clips:
        energy, sr = compute_band_energy_for_clip(c["output_path"], low_f=low_f, high_f=high_f)
        rows.append({
            "id": c["id"],
            "segment_index": c["segment_index"],
            "label": c["label"],
            "low_f": low_f,
            "high_f": high_f,
            "band_energy": energy,
        })
    return pd.DataFrame(rows)


features_1684 = build_features_from_clips(clips_1684, low_f=100.0, high_f=2000.0)
features_1684.head()

Unnamed: 0,id,segment_index,label,low_f,high_f,band_energy
0,1684,1,Fine Crackle,100.0,2000.0,95.465996
1,1684,2,Fine Crackle,100.0,2000.0,292.063202
2,1684,3,Fine Crackle,100.0,2000.0,209.004105
3,1684,4,Fine Crackle,100.0,2000.0,158.049301


In [None]:
from pathlib import Path
import pandas as pd

base_dir = Path().resolve()
classification_root = base_dir / "classification_folder" / "Classification"
train_json_dir = classification_root / "train_classification_json"


def _extract_id_from_json_path(json_path):
    """Extract the numeric id from a JSON filename like ..._1684.json."""
    stem = json_path.stem
    parts = stem.split("_")
    if not parts:
        return None
    last = parts[-1]
    try:
        return int(last)
    except ValueError:
        return None


def build_features_for_many_ids(max_ids=50, low_f=100.0, high_f=2000.0):
    seen_ids = []
    all_clips = []
    for json_path in train_json_dir.rglob("*.json"):
        if len(seen_ids) >= max_ids:
            break
        tid = _extract_id_from_json_path(json_path)
        if tid is None or tid in seen_ids:
            continue
        try:
            clips = clip_segments_for_id(tid, split="train")
        except FileNotFoundError:

            continue
        if not clips:
            continue
        all_clips.extend(clips)
        seen_ids.append(tid)

    print(f"Built clips for {len(seen_ids)} ids, total segments: {len(all_clips)}")
    features = build_features_from_clips(all_clips, low_f=low_f, high_f=high_f)
    print("Label distribution in features_all:")
    print(features["label"].value_counts())
    return features


features_all = build_features_for_many_ids(max_ids=50, low_f=100.0, high_f=2000.0)
features_all.head()

No events in 40069321_15.3_0_p1_981.json, nothing to clip.
No events in 40069321_15.3_0_p2_982.json, nothing to clip.
No events in 40069321_15.3_0_p3_983.json, nothing to clip.
No events in 40069321_15.3_0_p4_984.json, nothing to clip.
No events in 40138127_14.7_0_p1_137.json, nothing to clip.
Created 1 clips for id 139 (train).
Created 1 clips for id 140 (train).
Created 4 clips for id 1884 (train).
Created 4 clips for id 1900 (train).
Created 3 clips for id 1916 (train).
Created 5 clips for id 1932 (train).
Created 1 clips for id 1753 (train).
No events in 40638274_9.7_1_p1_1763.json, nothing to clip.
Created 1 clips for id 1789 (train).
Created 2 clips for id 1940 (train).
Created 4 clips for id 1684 (train).
Created 6 clips for id 1719 (train).
Created 4 clips for id 1801 (train).
Created 1 clips for id 1828 (train).
Created 3 clips for id 1708 (train).
Created 1 clips for id 1751 (train).
Created 3 clips for id 1765 (train).
Created 3 clips for id 1844 (train).
Created 2 clips for

Unnamed: 0,id,segment_index,label,low_f,high_f,band_energy
0,139,1,Normal,100.0,2000.0,2961.624756
1,140,1,Normal,100.0,2000.0,21335.933594
2,1884,1,Normal,100.0,2000.0,240.56665
3,1884,2,Normal,100.0,2000.0,316.10614
4,1884,3,Normal,100.0,2000.0,203.682816


In [None]:
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix, classification_report

try:
    df = features_all.copy()
    print("Using features_all for training (multiple ids).")
except NameError:
    df = features_1684.copy()
    print("features_all not found; using features_1684 (single id).")

X = df[["band_energy"]].values
y = df["label"].values

unique_labels = np.unique(y)
print("Unique labels:", unique_labels)

if len(unique_labels) < 2:
    print("Only one class present in the data. Add clips/features from more ids and labels before training a real classifier.")
else:
    le = LabelEncoder()
    y_enc = le.fit_transform(y)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_enc, test_size=0.25, random_state=42, stratify=y_enc
    )
    clf = RandomForestClassifier(n_estimators=50, random_state=42)
    clf.fit(X_train, y_train)
    train_acc = clf.score(X_train, y_train)
    test_acc = clf.score(X_test, y_test)
    print("Train accuracy:", train_acc)
    print("Test accuracy:", test_acc)

    y_pred = clf.predict(X_test)
    print("\nConfusion matrix (rows=true, cols=predicted, encoded labels):")
    print(confusion_matrix(y_test, y_pred))
    print("\nLabel mapping (encoded index -> original label):")
    for idx, label in enumerate(le.classes_):
        print(f"  {idx}: {label}")
    print("\nClassification report:")
    print(classification_report(y_test, y_pred, target_names=le.classes_))

Using features_all for training (multiple ids).
Unique labels: ['Fine Crackle' 'Normal' 'Wheeze']
Train accuracy: 1.0
Test accuracy: 0.9142857142857143

Confusion matrix (rows=true, cols=predicted, encoded labels):
[[ 2  2  0]
 [ 1 29  0]
 [ 0  0  1]]

Label mapping (encoded index -> original label):
  0: Fine Crackle
  1: Normal
  2: Wheeze

Classification report:
              precision    recall  f1-score   support

Fine Crackle       0.67      0.50      0.57         4
      Normal       0.94      0.97      0.95        30
      Wheeze       1.00      1.00      1.00         1

    accuracy                           0.91        35
   macro avg       0.87      0.82      0.84        35
weighted avg       0.91      0.91      0.91        35



In [None]:
from pathlib import Path
import json

base_dir = Path().resolve()
classification_root = base_dir / "classification_folder" / "Classification"
train_json_dir = classification_root / "train_classification_json"

def inspect_label_distribution(max_files=200):
    label_counts = {}
    checked = 0
    for json_path in train_json_dir.rglob("*.json"):
        if checked >= max_files:
            break
        with open(json_path, "r") as f:
            meta = json.load(f)
        rec_label = meta.get("record_annotation")
        if rec_label:
            label_counts[rec_label] = label_counts.get(rec_label, 0) + 1
        for ev in meta.get("event_annotation", []):
            ev_label = ev.get("type")
            if ev_label:
                label_counts[ev_label] = label_counts.get(ev_label, 0) + 1
        checked += 1
    print(f"Scanned {checked} JSON files.")
    print("Label counts in these files:")
    for lbl, cnt in label_counts.items():
        print(f"  {lbl}: {cnt}")
    return label_counts

label_counts_sample = inspect_label_distribution(max_files=200)

Scanned 200 JSON files.
Label counts in these files:
  Poor Quality: 17
  Normal: 628
  DAS: 31
  Fine Crackle: 83
  CAS & DAS: 4
  Wheeze: 39
  CAS: 12
  Coarse Crackle: 2
  Wheeze+Crackle: 1


In [None]:
import joblib
artifact = {
    "model": clf,
    "label_encoder": le,
}
joblib.dump(artifact, "lung_model.joblib")
print("Saved model to lung_model.joblib")

Saved model to lung_model.joblib
