
# Petrophysical Analysis from Well Logs ⛽📊

Clean, modular workflow to process LAS/CSV well logs, run QC, standardize curves, and compute key petrophysical properties:
- **Vshale (GR-based)**
- **Porosity** (density–neutron crossplot)
- **Water saturation** (Archie’s equation)

Includes composite log plots, crossplots, and CSV exports.


In [None]:

# === Dependencies ===
# Note: No internet here. Install locally if needed:
# pip install pandas numpy matplotlib lasio scipy

import math
import io
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

try:
    import lasio
except Exception:
    lasio = None  # Only needed for LAS input


## Configuration & Constants

In [None]:

# Standard curve aliases used to normalize varying vendor names
CURVE_ALIASES = {
    "DEPTH": ["DEPTH", "DEPT", "MD", "TDEP", "TDEPTH"],
    "GR":    ["GR","GAMMA","GAM","CGR","SGR"],
    "RHOB":  ["RHOB","RHOZ","DEN","DENS","DENSITY"],
    "NPHI":  ["NPHI","NPOR","PHIN","NPHI_C"],
    "DT":    ["DT","DTC","AC","SONIC"],
    "RT":    ["RT","RESD","RDEP","AT10","LLD","RT_HRLT"]
}

# Archie defaults (example values; adjust to your basin/formation)
ARCHIE = {
    "a": 1.0,
    "m": 2.0,
    "n": 2.0,
    "Rw": 0.035  # ohm·m
}

# Output folder
OUTPUT_DIR = Path("results")
OUTPUT_DIR.mkdir(exist_ok=True)


## Utility Functions

In [None]:

def _find_column(df: pd.DataFrame, keys):
    cols_lower = {c.lower(): c for c in df.columns}
    for k in keys:
        # strict match / dotted -> underscored match
        for c in df.columns:
            if c.upper() == k.upper() or c.replace('.', '_').upper() == k.upper():
                return c
        # relaxed: substring contains
        for low, orig in cols_lower.items():
            if k.lower() in low:
                return orig
    return None


def alias_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Rename various vendor curve names to a compact standard set."""
    mapping = {}
    for canonical, aliases in CURVE_ALIASES.items():
        col = _find_column(df, aliases)
        if col:
            mapping[col] = canonical
    return df.rename(columns=mapping).copy()


def load_logs(path: str | Path) -> pd.DataFrame:
    """Load LAS or CSV into a DataFrame, ensure DEPTH is a regular column."""
    path = Path(path)
    if path.suffix.lower() == ".las":
        if lasio is None:
            raise RuntimeError("lasio not installed. Install it to read LAS files.")
        las = lasio.read(str(path))
        df = las.df().reset_index()
        # Ensure depth column is named DEPTH
        if "DEPT" in df.columns and "DEPTH" not in df.columns:
            df = df.rename(columns={"DEPT": "DEPTH"})
        elif df.columns[0].lower() not in ("depth", "dept"):
            df = df.rename(columns={df.columns[0]: "DEPTH"})
    else:
        df = pd.read_csv(path)
        # Try to locate depth column
        depth_col = _find_column(df, CURVE_ALIASES["DEPTH"])
        if depth_col and depth_col != "DEPTH":
            df = df.rename(columns={depth_col: "DEPTH"})
    return alias_columns(df)


def normalize_units(df: pd.DataFrame, depth_unit: str = "m") -> pd.DataFrame:
    """Heuristic unit normalization for depth, DT, RHOB, NPHI."""
    out = df.copy()
    if "DEPTH" in out.columns and depth_unit.lower().startswith("f"):
        out["DEPTH"] = out["DEPTH"].astype(float) * 0.3048  # ft → m

    if "DT" in out.columns:
        dt = out["DT"].astype(float)
        # Typical sandstone DT ~55 us/ft; if within [35,120], assume us/ft -> convert to us/m
        med = np.nanmedian(dt)
        if 35 < med < 120:
            out["DT"] = dt * 3.28084  # us/ft → us/m

    if "RHOB" in out.columns:
        rh = out["RHOB"].astype(float)
        # If absurdly high, likely kg/m3 -> convert to g/cc
        if np.nanmedian(rh) > 10:
            out["RHOB"] = rh / 1000.0

    if "NPHI" in out.columns:
        nphi = out["NPHI"].astype(float)
        # If in percent, convert to fraction
        if np.nanmax(nphi) > 1.5:
            out["NPHI"] = nphi / 100.0

    return out


def clip_outliers(df: pd.DataFrame, cols, lower_q=0.01, upper_q=0.99) -> pd.DataFrame:
    out = df.copy()
    for c in cols:
        if c in out.columns:
            s = out[c].astype(float)
            lo, hi = s.quantile(lower_q), s.quantile(upper_q)
            out[c] = s.clip(lo, hi)
    return out


def interpolate_to_step(df: pd.DataFrame, step_m: float = 0.1524) -> pd.DataFrame:
    """Reindex to uniform depth sampling (default 0.5 ft ~ 0.1524 m)."""
    out = df.copy().sort_values("DEPTH")
    depth = out["DEPTH"].astype(float).values
    if len(depth) < 2:
        return out
    new_depth = np.arange(depth.min(), depth.max() + step_m, step_m)
    out = out.set_index("DEPTH").reindex(new_depth)
    out.index.name = "DEPTH"
    out = out.interpolate(method="index").reset_index()
    return out


## Petrophysical Calculations

In [None]:

def calc_vsh_gr(df: pd.DataFrame, gr_clean=None, gr_shale=None) -> pd.DataFrame:
    """Linear GR-based Vsh; if bounds not provided, infer from percentiles."""
    out = df.copy()
    if "GR" not in out.columns:
        return out
    gr = out["GR"].astype(float)
    gmin = np.nanpercentile(gr, 5) if gr_clean is None else gr_clean
    gmax = np.nanpercentile(gr, 95) if gr_shale is None else gr_shale
    vsh = (gr - gmin) / (gmax - gmin + 1e-9)
    out["VSH"] = vsh.clip(0, 1)
    return out


def calc_porosity_density_neutron(df: pd.DataFrame, rhob_ma=2.65, rhob_fl=1.0) -> pd.DataFrame:
    """Combine density- and neutron-derived porosities.

    Density porosity: phi_d = (rho_ma - rhob)/(rho_ma - rho_fl)

    Neutron porosity already in fraction. Final phi = mean(phi_d, nphi).
    """
    out = df.copy()
    if "RHOB" in out.columns:
        rhob = out["RHOB"].astype(float)
        phi_d = (rhob_ma - rhob) / (rhob_ma - rhob_fl + 1e-9)
        out["PHI_D"] = phi_d.clip(0, 0.45)
    if "NPHI" in out.columns:
        out["PHI_N"] = out["NPHI"].astype(float).clip(0, 0.45)
    if "PHI_D" in out.columns and "PHI_N" in out.columns:
        out["PHI_T"] = np.nanmean(out[["PHI_D","PHI_N"]].values, axis=1)
    elif "PHI_D" in out.columns:
        out["PHI_T"] = out["PHI_D"]
    elif "PHI_N" in out.columns:
        out["PHI_T"] = out["PHI_N"]
    return out


def calc_sw_archie(df: pd.DataFrame, a=1.0, m=2.0, n=2.0, Rw=0.035) -> pd.DataFrame:
    """Archie water saturation: Sw^n = (a*Rw)/(phi^m * Rt)"""
    out = df.copy()
    if "PHI_T" not in out.columns or "RT" not in out.columns:
        return out
    phi = out["PHI_T"].astype(float).clip(1e-4, 1.0)
    Rt = out["RT"].astype(float).clip(1e-4, None)
    F = a / (phi ** m)  # formation factor
    Sw_n = (F * Rw) / (Rt + 1e-9)
    Sw = np.power(Sw_n.clip(1e-6, 1e6), 1.0 / n)
    out["SW"] = Sw.clip(0, 1)
    return out


## Plotting Functions

In [None]:

def plot_composite_logs(df: pd.DataFrame, title="Composite Logs"):
    required = [c for c in ["GR","RT","RHOB","NPHI","DT"] if c in df.columns]
    n = max(1, len(required))
    fig, axes = plt.subplots(1, n, figsize=(2.8*n + 1, 8), sharey=True)
    if n == 1:
        axes = [axes]
    y = df["DEPTH"]
    for ax, c in zip(axes, required):
        ax.plot(df[c], y)
        ax.set_xlabel(c)
        ax.invert_yaxis()
        ax.grid(True, ls=':')
    axes[0].set_ylabel("Depth (m)") if n else None
    fig.suptitle(title)
    plt.show()


def plot_crossplots(df: pd.DataFrame):
    if "PHI_T" in df.columns and "SW" in df.columns:
        plt.figure(figsize=(5,5))
        plt.scatter(df["PHI_T"], df["SW"], s=4, alpha=0.6)
        plt.xlabel("Total Porosity (fraction)")
        plt.ylabel("Water Saturation (fraction)")
        plt.grid(True, ls=':')
        plt.title("Porosity vs. Sw")
        plt.show()
    if "GR" in df.columns and "VSH" in df.columns:
        plt.figure(figsize=(5,5))
        plt.scatter(df["GR"], df["VSH"], s=4, alpha=0.6)
        plt.xlabel("Gamma Ray")
        plt.ylabel("Vsh")
        plt.grid(True, ls=':')
        plt.title("GR vs. Vsh")
        plt.show()


## End-to-End Workflow Example

In [None]:

# === Set your input file here ===
# Supports .las or .csv. Example:
# input_path = "data/my_well.las"
# input_path = "data/my_well.csv"
input_path = None  # <- replace with your file path

if input_path:
    df = load_logs(input_path)
    df = normalize_units(df, depth_unit="m")
    df = clip_outliers(df, cols=[c for c in ["GR","RT","RHOB","NPHI","DT"] if c in df.columns])
    df = interpolate_to_step(df, step_m=0.1524)

    # Calculations
    df = calc_vsh_gr(df)
    df = calc_porosity_density_neutron(df)
    df = calc_sw_archie(df, **ARCHIE)

    # Plotting
    plot_composite_logs(df, title=f"Composite Logs — {Path(input_path).name}")
    plot_crossplots(df)

    # Export summaries
    summary_cols = [c for c in ["DEPTH","GR","RT","RHOB","NPHI","DT","VSH","PHI_T","SW"] if c in df.columns]
    out_csv = OUTPUT_DIR / (Path(input_path).stem + "_petrophysics.csv")
    df[summary_cols].to_csv(out_csv, index=False)
    print(f"Saved: {out_csv}")


else:
    print("Set 'input_path' to your LAS/CSV path and re-run this cell.")


## Quick Statistics

In [None]:

def quick_stats(df: pd.DataFrame):
    cols = [c for c in ["VSH","PHI_T","SW"] if c in df.columns]
    if not cols:
        print("Run the workflow first.")
        return
    stats = df[cols].describe().T
    print(stats)

# Example:
# quick_stats(df)
