## L calculations using the formula $L = n \cdot a_3$, where n are integers starting with 0 and $a_3$ is $a_3 = \frac{\lambda}{2(\sin \theta_2 - \sin \theta_1)}$, using the same values for theta limits

In [None]:
import pandas as pd
# Assume calculate_common_L_grid is in the same file or imported

def compute_common_L_grid(
    df: pd.DataFrame,
    lambda_nm: float = 0.01249, # wavelength value, should be changed for your case
    two_theta1_deg: float = 3.0, # lower limit of your xrd pattern
    two_theta2_deg: float = 7.5, # upper limit of your xrd pattern
    n_max: int = 60
) -> pd.DataFrame:
    """
    Computes a common L-grid and broadcasts the values to all rows of a DataFrame.

    This function acts as a wrapper around `calculate_common_L_grid`
    to apply its results to a pandas DataFrame.

    Args:
        df: The input DataFrame.
        lambda_nm: Wavelength in nanometers.
        two_theta1_deg: The first 2θ gate in degrees.
        two_theta2_deg: The second 2θ gate in degrees.
        n_max: The maximum integer 'n' for the grid.

    Adds to DataFrame
    -----------------
    - 'a3_nm' (float): The calculated 'a3' spacing, same for all rows.
    - 'L_common_nm' (list[float]): The L-grid list [n*a3], same for all rows.

    Returns:
        A copy of the input DataFrame with 'a3_nm' and 'L_common_nm' columns added.
    """
    # 1. Perform the core calculation
    a3_nm, L_list = calculate_common_L_grid(
        lambda_nm=lambda_nm,
        two_theta1_deg=two_theta1_deg,
        two_theta2_deg=two_theta2_deg,
        n_max=n_max
    )

    # 2. Apply to the DataFrame
    out = df.copy()
    out['a3_nm'] = a3_nm

    # Create a *new* list for each row to prevent mutation issues
    # (This matches your original, safe implementation)
    out['L_common_nm'] = [list(L_list) if L_list is not None else None for _ in range(len(out))]

    return out

## Calculating A(L)

In [None]:
import os
import glob
import numpy as np
import pandas as pd

# ---------------------- Helpers ----------------------

def _uniform_q_and_intensity(theta_rad, inten, theta1_rad, theta2_rad, thetaB_rad, lambda_nm):
    """
    Slice by [theta1, theta2], convert to k and recenter to q=k-kB.
    Return uniform q in a symmetric window [-Q, Q] and baseline-corrected, area-normalized intensity.
    """
    # 1) Slice the peak window in theta (radians)
    mask = (theta_rad >= theta1_rad) & (theta_rad <= theta2_rad)
    if not np.any(mask):
        return None, None

    th = theta_rad[mask]
    I = inten[mask]

    # 2) Convert to k (1/nm) and recenter to q = k - kB
    k  = 2.0 * np.sin(th) / lambda_nm
    kB = 2.0 * np.sin(thetaB_rad) / lambda_nm
    q = k - kB

    # 3) Symmetric window around q=0: [-Q, Q]
    Q = min(np.max(q), -np.min(q))
    if not np.isfinite(Q) or Q <= 0:
        return None, None
    keep = (q >= -Q) & (q <= Q)
    q = q[keep]
    I = I[keep]

    # 4) Interpolate to a *uniform* q-grid, enforce an odd length so 0 is included
    if q.size < 3:
        return None, None
    N = max(3, int(q.size) | 1)  # make odd
    q_uniform = np.linspace(-Q, Q, N)
    I_uniform = np.interp(q_uniform, q, I)

    # 5) Baseline: straight line through the two ends -> subtract -> clip ≥ 0
    I0, I1   = I_uniform[0], I_uniform[-1]
    baseline = I0 + (I1 - I0) * (q_uniform - q_uniform[0]) / (q_uniform[-1] - q_uniform[0])
    I_corr   = I_uniform - baseline
    I_corr[I_corr < 0] = 0.0

    # 6) Area-normalize so ∑ I dq = 1  → ensures A(0)=1
    dq   = q_uniform[1] - q_uniform[0]
    area = np.sum(I_corr) * dq
    if area <= 0 or not np.isfinite(area):
        return None, None
    I_norm = I_corr / area

    return q_uniform, I_norm


def _A_on_L(q, I_norm, L_nm):
    """
    Compute A(L) = ∫ I(q) cos(2π L q) dq ≈ sum I(q) cos(2π L q) dq for L in nm, q in 1/nm.
    Returns (A_list, lnA_list) where lnA is NaN wherever A <= 0.
    """
    if q is None or I_norm is None or len(L_nm) == 0:
        return None, None

    q      = np.asarray(q, float)
    I_norm = np.asarray(I_norm, float)
    L_arr  = np.asarray(L_nm, float)
    dq     = q[1] - q[0]

    # Broadcast over L: cos(2π L q)
    cos_matrix = np.cos(2.0 * np.pi * np.outer(L_arr, q))
    A = cos_matrix @ (I_norm * dq)  # (len(L),)

    # Numerical safety: allow zeros, set lnA to NaN where A <= 0
    A = np.where(A < 0, 0.0, A)
    lnA = np.where(A > 0, np.log(A), np.nan)

    return A.tolist(), lnA.tolist()


# ---------------------- Main ----------------------

def compute_A_for_all_peaks_commonL(
    df: pd.DataFrame,
    dat_dir: str,
    lambda_nm: float = 0.01249,
    # If L_common_nm is missing, we will build it from these 2θ gates:
    two_theta1_deg: float = 3.0,
    two_theta2_deg: float = 7.5,
    n_max: int = 60,
    # Optional raw scan truncation (2θ ≤ tth_max_deg). Set to None to disable.
    tth_max_deg: float | None = 7.5,
) -> pd.DataFrame:
    """
    Compute A(L) and ln A(L) for peaks 1..4 on a single *shared* L-grid.

    Inputs
    ------
    df : DataFrame with one row per scan. Required columns:
      - x{i}                (2θ center in degrees) for i=1..4
      - Option A (preferred): per-peak windows in radians
          theta1_p{i}_rad, theta2_p{i}_rad
        OR
      - Option B (fallback): none of the above → we use common gates:
          2θ ∈ [two_theta1_deg, two_theta2_deg] for every peak.

      - Optional: L_common_nm (list of floats). If present, it is used directly.
        If absent, it is created from:
          a3 = λ / [2 (sin(θ2) - sin(θ1))],  with θ = (2θ)/2
          L_n = n * a3 for n=0..n_max

    dat_dir : folder with one .dat per row, sorted lexicographically and aligned
              to df order. Each .dat has two columns: "2θ(deg)  I".

    Outputs (added columns)
    -----------------------
      - 'a3_nm' (float, only if L_common_nm was created here)
      - 'L_common_nm' (list[float])  # the shared L-grid (same list in all rows)
      - 'A_p{i}_list', 'lnA_p{i}_list' for i=1..4 (lists evaluated on L_common_nm)
    """
    out = df.copy()

    # --- 0) Ensure or create the shared L-grid --------------------------------
    if 'L_common_nm' in out.columns and isinstance(out['L_common_nm'].iat[0], (list, np.ndarray)):
        L_common = np.asarray(out['L_common_nm'].iat[0], float)
    else:
        theta1_rad = np.deg2rad(two_theta1_deg / 2.0)
        theta2_rad = np.deg2rad(two_theta2_deg / 2.0)
        delta = np.sin(theta2_rad) - np.sin(theta1_rad)
        if delta == 0:
            raise ValueError("sin(theta2) - sin(theta1) == 0; cannot build common L-grid.")
        a3_nm = lambda_nm / (2.0 * abs(delta))
        n = np.arange(0, n_max + 1, dtype=int)
        L_common = n * a3_nm
        out['a3_nm'] = a3_nm
        out['L_common_nm'] = [L_common.tolist() for _ in range(len(out))]

    # --- 1) Collect and align .dat files --------------------------------------
    files = sorted(glob.glob(os.path.join(dat_dir, "*.dat")))
    if len(files) < len(out):
        raise ValueError(f"Found {len(files)} .dat files, but dataframe has {len(out)} rows.")
    files = files[:len(out)]

    # Prepare output columns
    for p in range(1, 5):
        out[f"A_p{p}_list"] = None
        out[f"lnA_p{p}_list"] = None

    # --- 2) Row-wise processing ------------------------------------------------
    for idx, path in enumerate(files):
        # Load dat file: first two columns -> tth_deg, I_raw
        dat = pd.read_csv(
            path, sep=r"\s+", header=None, comment="#",
            names=["tth_deg", "I_raw"], engine="python"
        )
        if tth_max_deg is not None:
            dat = dat[dat["tth_deg"] <= float(tth_max_deg)]
        if dat.empty:
            continue

        theta_rad_all = np.deg2rad(dat["tth_deg"].to_numpy(dtype=float) / 2.0)
        I_all = dat["I_raw"].to_numpy(dtype=float)

        for p in range(1, 5):
            # Peak center (2θ deg) -> θB (rad)
            if f"x{p}" not in out.columns:
                continue
            x2theta_deg = out.at[idx, f"x{p}"]
            if not np.isfinite(x2theta_deg):
                continue
            thetaB_rad = np.deg2rad(x2theta_deg / 2.0)

            # Peak window: prefer per-peak theta1/theta2 if present, else fallback to common gates
            if f"theta1_p{p}_rad" in out.columns and f"theta2_p{p}_rad" in out.columns:
                th1 = out.at[idx, f"theta1_p{p}_rad"]
                th2 = out.at[idx, f"theta2_p{p}_rad"]
                if not (np.isfinite(th1) and np.isfinite(th2)):
                    th1 = np.deg2rad(two_theta1_deg / 2.0)
                    th2 = np.deg2rad(two_theta2_deg / 2.0)
            else:
                th1 = np.deg2rad(two_theta1_deg / 2.0)
                th2 = np.deg2rad(two_theta2_deg / 2.0)

            q_uniform, I_norm = _uniform_q_and_intensity(
                theta_rad_all, I_all, th1, th2, thetaB_rad, lambda_nm
            )
            if q_uniform is None:
                continue

            A_list, lnA_list = _A_on_L(q_uniform, I_norm, L_common)
            out.at[idx, f"A_p{p}_list"] = A_list
            out.at[idx, f"lnA_p{p}_list"] = lnA_list

    return out


## logA vs K^2C -> slope is -ML^2ln(R_e/L), intercept is logA_s(L)

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# ------------------------
# User inputs / constants
# ------------------------
# wavelength in nm (example: 99.3 keV -> ~0.012486 nm). Change to yours.
WAVELENGTH_NM = 0.012486  # [nm]

# Mean contrast factors per peak (order must match x1..x4 and A_p1_list..A_p4_list)
C_MEAN = np.array([0.141, 0.28485, 0.14105, 0.141], dtype=float)
"""
The contrast value C_MEAN is set to a fixed value, just to test the script. It should be calculated and fitted for temperature as shown in
modified Williamson-Hall method. Contrast factor values can be calculated from ANIZC website.
"""

# Column names (change if yours differ)
X_COLS = ('x1', 'x2', 'x3', 'x4')  # peak centers in 2θ degrees
A_COLS = ('A_p1_list', 'A_p2_list', 'A_p3_list', 'A_p4_list')  # lists of A(L) per peak
LCOL   = 'L_common_nm'  # shared L-grid column (list of floats, same for all rows)


# ------------------------
# Helpers
# ------------------------
def two_theta_deg_to_K(two_theta_deg, lambda_nm):
    """
    Convert 2θ (degrees) -> K [1/nm]
    K = 2 * sin(theta) / lambda, with theta = (2θ)/2 in radians.
    """
    theta_rad = 0.5 * np.deg2rad(two_theta_deg)
    return 2.0 * np.sin(theta_rad) / lambda_nm


def wa_inner_fits_for_row(
    row,
    wavelength_nm,
    c_mean,
    x_cols,
    a_cols,
    lcol=LCOL,
    # --- plotting controls ---
    save_plots=True,
    output_dir="wa_inner_plots",
    save_every_n=500,
    dpi=140
):
    """
    For a single row:
      - Read the shared L-grid from row[lcol].
      - Compute K_j and X_j = K_j^2 * C_j  (fixed across L)
      - For each L_k, fit log A_j(L_k) vs X_j across peaks j=1..4
          * quadratic polyfit (deg=2) -> use constant & linear terms (a0, a1)
          * strict linear fit (deg=1)
      - Save every `save_every_n`-th plot with both fits overlaid (if save_plots=True).
    Returns a dict of arrays aligned with the per-row L grid.
    """
    # Ensure output directory exists if plotting
    if save_plots:
        os.makedirs(output_dir, exist_ok=True)

    # --- 0) Get the shared L-grid from the row ---
    if lcol not in row or row[lcol] is None:
        raise ValueError(f"Missing shared L grid column '{lcol}' in row {row.name}.")
    L_grid = np.asarray(row[lcol], dtype=float)
    if L_grid.ndim != 1 or L_grid.size == 0:
        raise ValueError(f"Row {row.name}: '{lcol}' must be a non-empty 1D list/array.")

    # --- 1) Build X_j = K^2 * C_mean (j=1..4), fixed over L ---
    two_theta = np.array([row[col] for col in x_cols], dtype=float)  # [deg]
    if not np.all(np.isfinite(two_theta)):
        raise ValueError(f"Row {row.name}: non-finite 2θ center in {x_cols}.")
    if len(c_mean) != len(x_cols):
        raise ValueError("C_MEAN length must match number of peaks.")
    K = two_theta_deg_to_K(two_theta, wavelength_nm)                 # [1/nm]
    X = (K**2) * c_mean                                              # [1/nm^2] * (dimensionless)

    # --- 2) Get A_j(L) lists, validate lengths vs L_grid ---
    A_lists = []
    for idx_p, col in enumerate(a_cols, start=1):
        if col not in row or row[col] is None:
            raise ValueError(f"Row {row.name}: missing A(L) list in column '{col}'.")
        Aj = np.asarray(row[col], dtype=float)
        if Aj.ndim != 1 or Aj.size != L_grid.size:
            raise ValueError(
                f"Row {row.name}: A_p{idx_p}_list length ({Aj.size}) != L_common length ({L_grid.size})."
            )
        A_lists.append(Aj)

    nL = L_grid.size

    # --- 3) Prepare outputs ---
    m_poly2 = np.full(nL, np.nan, dtype=float)  # slope from quadratic fit (linear term)
    c_poly2 = np.full(nL, np.nan, dtype=float)  # intercept from quadratic fit (constant term)
    m_lin   = np.full(nL, np.nan, dtype=float)  # slope from strict linear fit
    c_lin   = np.full(nL, np.nan, dtype=float)  # intercept from strict linear fit

    # File-safe row ID
    row_id = str(row.name).replace(os.sep, "_").replace(" ", "_")

    # --- 4) Loop over L_k and fit across peaks (j) ---
    for k in range(nL):
        # y_j = log A_j(L_k); require all finite and > 0
        y_vals = np.array([Aj[k] for Aj in A_lists], dtype=float)
        if (y_vals <= 0).any() or (~np.isfinite(y_vals)).any():
            # invalid for log; leave NaNs
            continue
        y = np.log(y_vals)
        x = X

        # Quadratic polyfit: y ~ a2*x^2 + a1*x + a0
        coeff2 = np.polyfit(x, y, deg=2)
        a2, a1, a0 = coeff2
        m_poly2[k] = a1
        c_poly2[k] = a0

        # Strict linear fit: y ~ m*x + c
        m1, c1 = np.polyfit(x, y, deg=1)
        m_lin[k] = m1
        c_lin[k] = c1

        # ---- Save every Nth plot (overlay data + both fits) ----
        if save_plots and (k % save_every_n == 0):
            x_fit = np.linspace(x.min(), x.max(), 200)
            y_fit_lin = m1 * x_fit + c1
            y_fit_quad = (a2 * x_fit**2) + (a1 * x_fit) + a0

            plt.figure(figsize=(6.5, 4.5))
            plt.scatter(x, y, label=r'$\log A_j(L_k)$ data', zorder=3)
            plt.plot(x_fit, y_fit_lin, label='Linear fit', linewidth=1.6)
            plt.plot(x_fit, y_fit_quad, label='Quadratic fit', linewidth=1.6)
            plt.xlabel(r'$X = K^2 C$  [1/nm$^2$]')
            plt.ylabel(r'$\log A_j(L)$')
            plt.title(f'Row {row_id} | L[{k}] = {L_grid[k]:.4f} nm')
            plt.grid(True, linestyle='--', alpha=0.35)
            plt.legend()
            plt.tight_layout()

            fname = os.path.join(
                output_dir,
                f"row{row_id}_Lidx{k:05d}_Lnm{L_grid[k]:.4f}.png"
            )
            plt.savefig(fname, dpi=dpi)
            plt.close()

    return {
        'L_grid': np.array(L_grid, dtype=float),  # [nm]
        'm_poly2': m_poly2,  # slope at each L_k (quadratic fit's linear term)
        'c_poly2': c_poly2,  # intercept at each L_k (quadratic fit's constant)
        'm_lin': m_lin,      # slope at each L_k (strict linear fit)
        'c_lin': c_lin,      # intercept at each L_k (strict linear fit)
        'K_values': K,       # per-peak K [1/nm]
        'X_K2C': X,          # per-peak X = K^2 * C_mean
    }


# ------------------------
# Apply to a DataFrame `df`
# (Assumes `df` has columns: x1..x4, A_p1_list..A_p4_list, and L_common_nm)
# ------------------------
def compute_wa_inner_fits(
    df: pd.DataFrame,
    save_plots=True,
    output_dir="wa_inner_plots",
    save_every_n=500,
    dpi=140
) -> pd.DataFrame:
    """
    Adds these columns per row:
      - 'L_grid' : np.ndarray of L values [nm] (copied from 'L_common_nm')
      - 'm_poly2', 'c_poly2', 'm_lin', 'c_lin' : np.ndarray per row, aligned with L_grid
      - 'K_values', 'X_K2C' : diagnostic arrays per row
    Also saves every `save_every_n`th plot with both fits, if save_plots=True.
    """
    # Quick pre-checks
    missing = [col for col in (*X_COLS, *A_COLS, LCOL) if col not in df.columns]
    if missing:
        raise KeyError(f"Missing required columns: {missing}")

    results = df.apply(
        lambda row: pd.Series(
            wa_inner_fits_for_row(
                row,
                wavelength_nm=WAVELENGTH_NM,
                c_mean=C_MEAN,
                x_cols=X_COLS,
                a_cols=A_COLS,
                lcol=LCOL,
                save_plots=save_plots,
                output_dir=output_dir,
                save_every_n=save_every_n,
                dpi=dpi
            )
        ),
        axis=1
    )

    # Concatenate results back
    for col in ['L_grid', 'm_poly2', 'c_poly2', 'm_lin', 'c_lin', 'K_values', 'X_K2C']:
        df[col] = results[col]
    return df


## Cut-off radius ($R_e$) and dislcoation density ($\rho$) calculation

In [None]:
import numpy as np
import pandas as pd
from scipy.optimize import curve_fit

# -----------------------------
# WA outer fit on Y(L) vs L
#   Y(L) = -m(L)/L^2
#   Model: Y(L) = M * ln(Re / L)
#   Then:  rho = 2M / (pi*b^2)
# Keep L in nm, b in nm -> rho in nm^-2 (×1e18 for m^-2)
# -----------------------------

B_NM_DEFAULT = 0.248  # Burgers vector (nm)

def _model_Y(L, M, Re):
    return M * np.log(Re / L)

def _initial_guess_Y(L, Y):
    Lmax = float(np.max(L))
    Re0 = 5.0 * Lmax  # start with Re well above data range
    denom = np.log(Re0 / L)
    denom[~np.isfinite(denom) | (denom <= 0)] = np.nan
    M0 = np.nanmedian(Y / denom)
    if not np.isfinite(M0) or M0 <= 0:
        M0 = 1e-4
    return M0, Re0

def _fit_Y_vs_L(L, m_series, b_nm=B_NM_DEFAULT,
                require_positive_Y=True, L_min=None, L_max=None):
    """
    Fits Y(L) = -m(L)/L^2 to Y = M * ln(Re/L).
    Returns (rho_nm^-2, Re_nm, R2).
    """
    L = np.asarray(L, dtype=float)
    m = np.asarray(m_series, dtype=float)

    mask = np.isfinite(L) & np.isfinite(m) & (L > 0)
    if L_min is not None:
        mask &= (L >= float(L_min))
    if L_max is not None:
        mask &= (L <= float(L_max))
    if mask.sum() < 4:
        return np.nan, np.nan, np.nan

    L_use = L[mask]
    Y = -m[mask] / (L_use**2)

    if require_positive_Y:
        keep = np.isfinite(Y) & (Y > 0)
    else:
        keep = np.isfinite(Y)
    L_use, Y = L_use[keep], Y[keep]
    if L_use.size < 4:
        return np.nan, np.nan, np.nan

    # Initial guesses & bounds
    M0, Re0 = _initial_guess_Y(L_use, Y)
    lower = [0.0, 1.01 * float(np.max(L_use))]     # M>=0, Re > max L
    upper = [np.inf, 1e6 * float(np.max(L_use))]

    try:
        popt, _ = curve_fit(
            _model_Y, L_use, Y, p0=[M0, Re0],
            bounds=(lower, upper), maxfev=20000
        )
        M, Re = popt
        Yfit = _model_Y(L_use, M, Re)
        ss_res = np.sum((Y - Yfit)**2)
        ss_tot = np.sum((Y - np.mean(Y))**2)
        R2 = 1.0 - ss_res/ss_tot if ss_tot > 0 else np.nan

        rho_nm2 = (2.0 * M) / (np.pi * (b_nm**2))
        return rho_nm2, float(Re), R2
    except Exception:
        return np.nan, np.nan, np.nan

def wa_outer_fit_Y_vs_L_YvsLcols(df, b_nm=B_NM_DEFAULT,
                                 require_positive_Y=True, L_min=None, L_max=None):
    """
    Applies the Y vs L (nonlinear) WA outer fit to both m_lin and m_poly2.
    Saves to NEW columns that make it explicit this fit uses L (not ln L):
      - 'rho_mlin_nm^-2_YvsL',   'Re_mlin_nm_YvsL',   'r2_mlin_YvsL'
      - 'rho_mpoly2_nm^-2_YvsL', 'Re_mpoly2_nm_YvsL', 'r2_mpoly2_YvsL'
    """
    def _row_fit(row):
        out = {}

        rho1, Re1, R21 = _fit_Y_vs_L(
            row['L_grid'], row['m_lin'], b_nm=b_nm,
            require_positive_Y=require_positive_Y, L_min=L_min, L_max=L_max
        )
        out['rho_mlin_nm^-2_YvsL'] = rho1
        out['Re_mlin_nm_YvsL']     = Re1
        out['r2_mlin_YvsL']        = R21

        rho2, Re2, R22 = _fit_Y_vs_L(
            row['L_grid'], row['m_poly2'], b_nm=b_nm,
            require_positive_Y=require_positive_Y, L_min=L_min, L_max=L_max
        )
        out['rho_mpoly2_nm^-2_YvsL'] = rho2
        out['Re_mpoly2_nm_YvsL']     = Re2
        out['r2_mpoly2_YvsL']        = R22

        return pd.Series(out)

    results = df.apply(_row_fit, axis=1)
    return pd.concat([df, results], axis=1)
