# K Index Calculation

In [None]:
import datetime as dt
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pooch
from viresclient import SwarmRequest
import ipywidgets as widgets

from warnings import filterwarnings
filterwarnings(action="ignore")

# Data dependencies (pooch caches this in ~/.cache/pooch/)
esk_k_ind_file = pooch.retrieve(
    "https://raw.githubusercontent.com/MagneticEarth/IAGA_SummerSchool2019/master/data/external/k_inds/esk/2003.esk",
    known_hash="233246e167a212cd1afa33ff2fe130fbc308cd2ae7971c6c2afcd363c9775c18"
)

## Calculating K-indices for a single observatory

The K-index is a local geomagnetic activity index devised by Julius Bartels in 1938 to give a simple measure of the degree of geomagnetic disturbance during each 3-hour (UT) interval, as seen at a single observatory. It uses data from ground-based magnetometers to assign a number in the range 0-9 to each interval, with K=0 indicating very little geomagnetic activity and K=9 representing an extreme geomagnetic storm. The K-index was introduced at the time of photographic recording, when magnetograms recorded variations in the horizontal geomagnetic field elements declination (D) and horizontal intensity (H), and in the vertical intensity (Z). To derive a K-index, an observer would fit, by eye, a quiet-time curve to the records of D and H and measure the range (maximum minus minimum) of the deviation of the recording from the curve. The K-index was then assigned according to a conversion table, with the greater of the ranges in D and H 'winning'. The north component (X) may be used instead of H, and the east component (Y) instead of D (X and Y will be used in the examples below). The vertical component Z is not used because it is liable to contamination by local induced currents.

The conversion from range in nanoteslas to index is quasi-logarithmic. The conversion table varies with latitude in an attempt to normalise the K-index distribution for observatories at different latitudes. The table for Eskdalemuir is shown below.

| K | 0 | 1 |2|3|4|5|6|7|8|9|
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| Lower bound (nT) | 0 | 8 |15|30|60|105|180|300|500|750|

This means that, for instance, K=2 if the measured range is in the half-open interval \[15, 30\) nT: a range of exactly 30 nT is assigned K=3.
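As a minimal sketch of how the table can be applied, the snippet below maps a few made-up ranges to K values. It assumes each class includes its lower bound and excludes its upper bound (the convention `np.digitize` uses with `right=False`); the helper name `range_to_k` and the example ranges are illustrative only and are not part of any IAGA-approved code.

```python
import numpy as np

# Lower bounds (nT) of the K = 0..9 classes for Eskdalemuir, from the table
ESK_LOWER_BOUNDS_NT = np.array([0, 8, 15, 30, 60, 105, 180, 300, 500, 750])

def range_to_k(range_nt, lower_bounds=ESK_LOWER_BOUNDS_NT):
    """Map a range in nT to a K value (hypothetical helper for illustration)."""
    # np.digitize returns the 1-based index of the bin containing each value,
    # so subtracting 1 gives K directly
    return np.digitize(range_nt, bins=lower_bounds, right=False) - 1

# Made-up example ranges in nT
example_ranges = np.array([0.0, 7.9, 15.0, 29.9, 30.0, 120.0, 1000.0])
example_k = range_to_k(example_ranges)
print(example_k)  # [0 0 2 2 3 5 9]
```

Any range at or above 750 nT falls in the last class, so K never exceeds 9.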

There was a VERY long debate in IAGA Division V about algorithms that could adequately reproduce the K-indices that an experienced observer would assign. The algorithms and code approved by IAGA are available at the International Service for Geomagnetic Indices: <http://isgi.unistra.fr/softwares.php>. (Also <http://isgi.unistra.fr/what_are_kindices.php>)

### Example

In the following cells, we **illustrate** the process. A simple approach is to assume the so-called regular daily variation $S_R$ is made up of 24h, 12h and 8h signals (and, possibly, higher harmonics). A Fourier analysis can be used to investigate this. The functions in the cell below calculate Fourier coefficients from a data sample, and then synthesise a Fourier series using the coefficients.

On some days this simple approach seems to work well; on others it is obviously wrong. Think about another approach you might take.

We will then attempt to calculate K-indices for the selected day by computing the Fourier series up to the selected number of harmonics, subtracting the synthetic harmonic signal from the data, calculating the 3-hour ranges of the residuals, and converting these into the corresponding K-index. The functions to do this are also in the following cell.

In [None]:
def fourier(v, nhar):
    npts    = len(v)
    f       = 2.0/npts
    t       = np.linspace(0, npts, npts, endpoint=False)*2*np.pi/npts
    vmn     = np.mean(v)
    v       = v - vmn
    cofs    = [0]*(nhar+1)
    cofs[0] = (vmn,0)
    for i in range(1,nhar+1):
        c, s    = np.cos(i*t), np.sin(i*t)
        cofs[i] = (np.dot(v,c)*f, np.dot(v,s)*f)
    return (cofs)

def fourier_synth(cofs, npts):
    nt  = len(cofs)
    syn = np.zeros(npts)
    t  = np.linspace(0, npts, npts, endpoint=False)*2*np.pi/npts
    for n in range(1, nt):
        for j in range(npts):
            syn[j] += cofs[n][0]*np.cos(n*t[j]) + cofs[n][1]*np.sin(n*t[j])
    return (syn)

# Define K-index conversion table for ESK
K_conversions = {
    f"K{level}": level_bin 
    for level, level_bin in enumerate(
        (0, 8, 15, 30, 60, 105, 180, 300, 500, 750)
    )
}
# Define reverse mapping
nT_to_K = {v: k for k, v in K_conversions.items()}

def K_calc(d, synd, Kb=K_conversions):
    tmp = np.ptp((d-synd).reshape(8,180), axis=1)
    return(list(np.digitize(tmp, bins=list(Kb.values()), right=False)-1))

def load_official_K(filepath=esk_k_ind_file):
    df = pd.read_csv(filepath, skiprows=0, header=None, delim_whitespace=True,  
                     parse_dates=[[2,1,0]], index_col=0)
    df = df.drop(3, axis=1)
    df.index.name='Date'
    df.columns = ['00','03','06','09','12','15','18','21']
    return(df)

def load_ESK_2003():
    request = SwarmRequest()
    request.set_collection("SW_OPER_AUX_OBSM2_:ESK", verbose=False)
    request.set_products(measurements=["B_NEC", "IAGA_code"])
    data = request.get_between(
        dt.datetime(2003, 1, 1),
        dt.datetime(2004, 1, 1),
    )
    df = data.as_dataframe(expand=True).drop(
        columns=["Spacecraft"]
    )
    df = df.rename(columns={f"B_NEC_{i}": j for i, j in zip("NEC", "XYZ")})
    return df

First, load in (X, Y, Z) one-minute data from Eskdalemuir for 2003 into a pandas dataframe.

In [None]:
df_obs = load_ESK_2003()
df_obs.head()

Load the official K index data (available from <http://www.geomag.bgs.ac.uk/data_service/data/magnetic_indices/k_indices>) to compare with later.

In [None]:
df_K_official = load_official_K()
df_K_official.head()

Evaluate K indices for a given day:
- For each of $X$ and $Y$:
    - Perform a Fourier analysis on the data to find the regular daily variation, $S_R$
    - Over each 3-hour interval, find the range (maximum minus minimum) of the residuals from $S_R$
    - Convert from nT to $K$ using the conversion table for ESK
- Pick the greater of $K(X)$ and $K(Y)$ and compare with the official K index
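The steps above can also be tried on synthetic data, without downloading anything. The sketch below is only an illustration under stated assumptions: the one-minute "day" is made up (a 24 h and a 12 h signal, a short burst between 12 and 15 UT, and a little noise), and the regular variation is removed by zeroing FFT bins with `np.fft.rfft` rather than by the explicit sums used in this notebook. The helper names are hypothetical.

```python
import numpy as np

# ESK lower bounds (nT) for K = 0..9, taken from the conversion table
LOWER_BOUNDS_NT = [0, 8, 15, 30, 60, 105, 180, 300, 500, 750]

def remove_regular_variation(v, n_harmonics):
    """Subtract the mean and the first n_harmonics daily harmonics from v."""
    spec = np.fft.rfft(v)
    spec[n_harmonics + 1:] = 0          # keep only the mean and low harmonics
    smooth = np.fft.irfft(spec, n=len(v))
    return v - smooth

def k_from_residuals(resid):
    """Convert 3-hourly ranges of 1440 one-minute residuals to K values."""
    ranges = np.ptp(resid.reshape(8, 180), axis=1)
    return np.digitize(ranges, bins=LOWER_BOUNDS_NT, right=False) - 1

# Synthetic one-minute day (made-up values, not ESK data)
rng = np.random.default_rng(0)
t_hours = np.arange(1440) / 60.0
regular = 20*np.cos(2*np.pi*t_hours/24) + 8*np.sin(2*np.pi*t_hours/12)
burst = np.where((t_hours >= 12) & (t_hours < 15),
                 40*np.sin(2*np.pi*t_hours/0.5), 0.0)
x = regular + burst + rng.normal(0, 0.2, 1440)
y = 0.5*regular + rng.normal(0, 0.2, 1440)

# K from each component, then the greater of the two per interval
k_x = k_from_residuals(remove_regular_variation(x, 3))
k_y = k_from_residuals(remove_regular_variation(y, 3))
k_day = np.maximum(k_x, k_y)
print(k_day)
```

The burst is confined to the fifth 3-hour interval, so that interval should carry the largest K of the day.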

In [None]:
def analyse_day(day=dt.date(2003, 1, 1), n_harmonics=3, df=df_obs, df_K_official=df_K_official):
    """Generate figure illustrating the K index calculation for a given day"""
    # Select given day
    _df = df.loc[day.isoformat()]
    _df_K = df_K_official.loc[day.isoformat()]
    # Select X & Y data and remove daily mean
    x = (_df["X"] - _df["X"].mean()).values
    y = (_df["Y"] - _df["Y"].mean()).values
    # Perform Fourier analysis of X & Y separately
    xcofs = fourier(x, n_harmonics)
    synx  = fourier_synth(xcofs, len(x))
    ycofs = fourier(y, n_harmonics)
    syny  = fourier_synth(ycofs, len(y))

    # Build plot
    t = np.linspace(0, 1440, 1440, endpoint=False)/60
    fig, axes = plt.subplots(2, 1, figsize=(15, 10), sharex=True)
    # Plot X & Y data with approximated variation
    axes[0].plot(t, x, color="tab:blue", alpha=0.5)
    axes[0].plot(t, synx, color="tab:blue", label="X")
    axes[0].plot(t, y, color="tab:red", alpha=0.5)
    axes[0].plot(t, syny, color="tab:red", label="Y")
    # Plot the differences
    axes[1].plot(t, (x-synx), color="tab:blue")
    axes[1].plot(t, (y-syny), color="tab:red")

    # Find and plot min/max bounds over 3-hourly intervals
    minX = np.min((x-synx).reshape(8, 180), axis=1)
    maxX = np.max((x-synx).reshape(8, 180), axis=1)
    minY = np.min((y-syny).reshape(8, 180), axis=1)
    maxY = np.max((y-syny).reshape(8, 180), axis=1)
    t_3hours = np.linspace(0, 1440, 9, endpoint=True)/60
    axes[1].fill_between(t_3hours, list(minX)+[0], list(maxX)+[0], step="post", color="tab:blue", alpha=0.5)
    axes[1].fill_between(t_3hours, list(minY)+[0], list(maxY)+[0], step="post", color="tab:red", alpha=0.5)
    
    # Determine K index from each of X & Y
    K_X = np.digitize((maxX-minX), bins=list(K_conversions.values()), right=False) - 1
    K_Y = np.digitize((maxY-minY), bins=list(K_conversions.values()), right=False) - 1
    for i, (K_X_i, K_Y_i) in enumerate(zip(K_X, K_Y)):
        # Display determined K from X & Y
        px = i*3
        py = axes[1].get_ylim()[1]
        axes[1].annotate(
            f"K(X): {K_X_i}", (px, py), xytext=(30, 18),
            textcoords="offset pixels", color="tab:blue", size=12,
        )
        axes[1].annotate(
            f"K(Y): {K_Y_i}", (px, py), xytext=(30, 3),
            textcoords="offset pixels", color="tab:red", size=12,
        )
        # Display comparison with the official K index
        K_ours = max(K_X_i, K_Y_i)
        K_official = _df_K.iloc[i]  # positional lookup of the i-th 3-hour interval
        axes[1].annotate(
            f"{K_ours}\n{K_official}",
            (i*3, axes[1].get_ylim()[0]), xytext=(40, -70), textcoords="offset pixels"
        )
    axes[1].annotate(
        f"Determined K:\nOfficial K:",
        (0, axes[1].get_ylim()[0]), xytext=(-80, -70), textcoords="offset pixels"
    )

    # Finalise figure
    for ax in axes:
        ax.grid()
        ax.xaxis.set_ticks(np.arange(0, 27, 3))
    axes[1].set_ylabel("Residuals [nT]")
    axes[1].set_xlabel("UT [hour]")
    axes[0].set_ylabel("[nT]")
    axes[0].legend(loc="upper right")
    fig.suptitle(f"ESK: {day.isoformat()}", y=0.9)

    return fig, axes

def make_widgets_K_index_calc():
    day = widgets.SelectionSlider(
        options=[t.date() for t in pd.date_range(dt.date(2003, 1, 1), dt.date(2003, 12, 31))],
        description="Select day:", layout=widgets.Layout(width='700px')
    )
#     day = widgets.DatePicker(value=dt.date(2003, 1, 1), description="Select day:")
    n_harmonics = widgets.SelectionSlider(options=range(1, 11), value=3, description="# harmonics:")
    return widgets.VBox(
        [day,
         n_harmonics,
         widgets.interactive_output(
             analyse_day,
             {"day": day, "n_harmonics": n_harmonics}
         )],
    )

make_widgets_K_index_calc()

## Statistics of the K index

We will use the official K index from ESK to probe some statistics through the year 2003.

Histograms of the K indices for each 3-hour period:

In [None]:
axes = df_K_official.hist(
    figsize=(12, 12), bins=range(11), sharey=True, align="left", rwidth=0.8,
)
plt.suptitle('ESK 2003: Distribution of K-indices for each 3-hour interval')
axes[-1, 0].set_ylabel("Frequency")
axes[-1, 0].set_xlabel("K");

... plotted side by side:

In [None]:
plt.figure(figsize=(7,7))
plt.hist(df_K_official.values, bins=range(11), align='left')
plt.legend(df_K_official.columns)
plt.ylabel('Number of 3-hour intervals')
plt.xlabel('K');

... and stacked together:

In [None]:
plt.figure(figsize=(7,7))
plt.hist(df_K_official.values, bins=range(11), stacked=True, align='left', rwidth=0.8)
plt.legend(df_K_official.columns)
plt.ylabel('Number of 3-hour intervals')
plt.xlabel('K');

We also compute a daily sum of the K-indices for the 2003 file, and list days with high and low summed values. Note that this summation is not really appropriate because the K-index is quasi-logarithmic. However, it is a common simple measure of quiet and disturbed days. (These might be interesting days for you to look at.)

In [None]:
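# Exclude any existing 'Ksum' column so that re-running this cell does not double count
df_K_official['Ksum'] = df_K_official.drop(columns='Ksum', errors='ignore').sum(axis=1)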
Ksort = df_K_official.sort_values('Ksum')
print('Quiet days: \n\n', Ksort.head(10), '\n\n')
print('Disturbed days: \n\n', Ksort.tail(10))

## Note on the Fast Fourier Transform

In the examples above we computed Fourier coefficients in the 'traditional' way, so that if $F(t)$ is a Fourier series representation of $f(t)$, then,

$$
\begin{align}
F(t) &= A_o+\sum_{n=1}^N \left[A_n \cos\left(\frac{2\pi nt}{T}\right)+B_n \sin\left(\frac{2\pi nt}{T}\right)\right]
\end{align}
$$

where $T$ is the fundamental period of $F(t)$. The $A_n$ and $B_n$ are estimated by

$$
\begin{align}
A_o&=\frac{1}{T}\int_0^T f(t) dt\\
A_n&=\frac{2}{T}\int_0^T f(t)\cos\left(\frac{2\pi nt}{T}\right) dt\\
B_n&=\frac{2}{T}\int_0^T f(t)\sin\left(\frac{2\pi nt}{T}\right) dt
\end{align}
$$

With $N$ samples of digital data (here $N$ is the number of samples, not the number of harmonics in the series above), the integral for $A_n$ may be replaced by the summation

$$
\begin{align}
A_n&=\frac{2}{T}\sum_{j=0}^{N-1} f_j\cos\left(\frac{2\pi nj\Delta t}{T}\right) \Delta t\\
&=\frac{2}{N}\sum_{j=0}^{N-1} f_j\cos\left(\frac{2\pi nj}{N}\right)
\end{align}
$$

where the sampling interval $\Delta t$ is given by $T = N \Delta t$ and $f_j = f(j \Delta t)$. A similar expression applies for the $B_n$, and these are the coefficients returned by the function _fourier_ above.
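As a quick check of the summation formulas, the sketch below applies them to a synthetic signal whose coefficients are known in advance. The helper `discrete_fourier_coeffs` and the signal are assumptions made for illustration; they are not part of the notebook's own functions.

```python
import numpy as np

def discrete_fourier_coeffs(f, n):
    """Return (A_n, B_n) from the discrete summation formulas."""
    N = len(f)
    j = np.arange(N)
    A_n = 2.0/N * np.sum(f*np.cos(2*np.pi*n*j/N))
    B_n = 2.0/N * np.sum(f*np.sin(2*np.pi*n*j/N))
    return A_n, B_n

# Synthetic signal with known coefficients: A_1 = 5 and B_2 = -3
N = 1440
j = np.arange(N)
signal = 5.0*np.cos(2*np.pi*j/N) - 3.0*np.sin(2*np.pi*2*j/N)

A1, B1 = discrete_fourier_coeffs(signal, 1)
A2, B2 = discrete_fourier_coeffs(signal, 2)
print(A1, B1, A2, B2)
```

Up to floating-point rounding, the recovered values should match the amplitudes used to build the signal, with the cross terms close to zero.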

The fast Fourier transform (FFT) offers a computationally efficient means of finding the Fourier coefficients. The conventions for the FFT and its inverse (IFFT) vary from package to package. In the _scipy.fftpack_ package, the FFT of a sequence $x_n$ of length $N$ is defined as

$$
\begin{align}
y_k&=\sum_{n=0}^{N-1} x_n\exp\left(-\frac{2\pi i\thinspace kn}{N}\right)\\
&=\sum_{n=0}^{N-1} x_n\left(\cos\left(\frac{2\pi \thinspace kn}{N}\right)-i\sin\left(\frac{2\pi \thinspace kn}{N}\right)\right)
\end{align}
$$

with the inverse defined as,

$$
\begin{align}
x_n&=\frac{1}{N}\sum_{k=0}^{N-1} y_k\exp\left(\frac{2\pi i\thinspace kn}{N}\right)\\
\end{align}
$$

The _scipy_ documentation is a little inconsistent here because it explains the order of the $y_k$ as follows: $y_1, y_2, \dots, y_{N/2-1}$ correspond to increasing positive frequency, and $y_{N/2}, y_{N/2+1}, \dots, y_{N-1}$ are ordered by decreasing negative frequency, for $N$ even. (See: <https://docs.scipy.org/doc/scipy/reference/tutorial/fftpack.html>)

The interpretation is that if $y_k=a_k+ib_k$ then, for real input data and $N$ even, we will have $y_{N-k} = a_k-ib_k$, and so

$$
\begin{align}
a_k&=\frac{1}{2}\text{Re}\left(y_k+y_{N-k}\right)\\
b_k&=\frac{1}{2}\text{Im}\left(y_k-y_{N-k}\right)
\end{align}
$$


and so we expect the relationship to the digitised Fourier series coefficients returned by the function _fourier_ defined above to be,

$$
\begin{align}
A_k&=\phantom{-}\frac{1}{N}\text{Re}\left(y_k+y_{N-k}\right)\\
B_k&=-\frac{1}{N}\text{Im}\left(y_k-y_{N-k}\right)
\end{align}
$$
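Before applying this to observatory data, the relationship can be checked on a signal with known coefficients. The sketch below uses `np.fft.fft`, which follows the same forward-transform convention as the definition given above (negative exponent, no normalisation); the synthetic signal is an assumption made for illustration.

```python
import numpy as np

# Synthetic signal with known first-harmonic coefficients A_1 = 5, B_1 = -3
N = 1440
j = np.arange(N)
signal = 5.0*np.cos(2*np.pi*j/N) - 3.0*np.sin(2*np.pi*j/N)

y = np.fft.fft(signal)

# Combine the k-th and (N-k)-th FFT outputs to recover the series coefficients
k = 1
A_k = np.real(y[k] + y[N - k]) / N
B_k = -np.imag(y[k] - y[N - k]) / N
print(A_k, B_k)
```

The minus sign in `B_k` comes from the negative exponent in the forward transform.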

In [None]:
from scipy.fftpack import fft

# Compute the fourier series as before
_df = df_obs.loc["2003-01-01"]
x = (_df["X"] - _df["X"].mean()).values
xcofs = fourier(x, 3)
# Compute using scipy FFT
npts = len(x)
xfft = fft(x)

# Compare results for the 24-hour component
k    = 1
print('Fourier coefficients: \n', f'A1 = {xcofs[1][0]} \n', f'B1 = {xcofs[1][1]} \n')
print('scipy FFT outputs: \n', f'a1 = {np.real(xfft[k]+xfft[npts-k])/npts} \n', \
      f'b1 = {-np.imag(xfft[k]-xfft[npts-k])/npts} \n')

## References

Menvielle, M. et al. (1995) ‘Computer production of K indices: review and comparison of methods’, Geophysical Journal International. Oxford University Press, 123(3), pp. 866–886. doi: [10.1111/j.1365-246X.1995.tb06895.x](https://doi.org/10.1111/j.1365-246X.1995.tb06895.x).