In [1]:
# https://gist.github.com/tomjemmett/c167376e5b6464ec1c00975be2d7864e

import numpy as np
from typing import List
from collections import namedtuple

def seven_point_one_side_mean(relative_to_mean: List[float]) -> List[bool]:
    """
    :param relative_to_mean: list of floats
    :return: list of booleans
    """
    # pad the vector with 6 zero's at the beginning
    vp = np.insert(relative_to_mean, 0, [0] * 6)
    
    return [
      np.all(vp[i + 6] == vp[i:(i + 6)]) # and (vp[i + 6] != 0)
      for i in range(len(relative_to_mean))
    ]

# Note that the code np.all(vp[i + 6] == vp[i:(i + 6)]) is not comparing the value at the 7th
# position with the mean of the values from the 1st to 6th position as the function name suggests,
# it is checking if the value at the 7th position is equal to all the values from the 1st to 6th 
# position.

def alt_seven_point_one_side_mean(relative_to_mean: List[float]) -> List[bool]:
    """
    :param relative_to_mean: list of floats
    :return: list of booleans
    """
    # pad the vector with 6 zero's at the beginning
    vp = [0]*6 + relative_to_mean
    res = []
    for i in range(len(relative_to_mean)):
        mean = sum(vp[i-1:6])/6
        if vp[i] == mean and vp[i] != 0:
            res.append(True)
        else:
            res.append(False)
    return res

def seven_point_trend(values: List[float]) -> List[int]:
    """
    Given a list of floats, this function checks for a trend in the last 7 values. 
    It returns a list of integers indicating the trend (-1 for decreasing trend, 
    1 for increasing trend, and 0 for no trend) after the 6th consecutive change.

    Parameters:
        values (List[float]): List of float numbers

    Returns:
        List[int]: List of integers indicating the trend (-1 for decreasing trend, 
        1 for increasing trend, and 0 for no trend) after the 6th consecutive change.
    """
    # edge case: len(values) < 7
    # check if the number of elements in the input list is less than 7
    if len(values) < 7:
        # return a list of zeroes of the same length as the input list
        return np.zeros(len(values), dtype=int).tolist()
    
    # calculate the difference between consecutive values and store them in the 
    # diff variable, with 6 zeros to indicate no change before the values begin.
    diff = np.insert(np.diff(values), 0, [0] * 6)

    # create an empty list to store the trend
    trend = []

    for i in range(len(diff)-5):
        # Check if all the differences in the last 6 elements are positive
        if all(x>0 for x in diff[i:i+6]):
            # Append 1 to the trend list, indicating an increasing trend
            trend.append(1)
        # Check if all the differences in the last 6 elements are negative
        elif all(x<0 for x in diff[i:i+6]):
            # Append -1 to the trend list, indicating a decreasing trend
            trend.append(-1)
        else:
            # Append 0 to the trend list, indicating no trend
            trend.append(0)
    return trend

def part_of_seven_trend(values):
    # pad the vector with 6 zero's at the end
    vp = np.insert(values, len(values), [0] * 6)

    return [
      np.any(np.abs(vp[i:(i + 7)]) == 1)
      for i in range(len(values))
    ]

def two_in_three(close_to_limits, relative_to_mean):
  if len(close_to_limits) == 0:
    return []
  # pad the vectors with two 0 at start, two 0 at end
  close_to_limits_pad = np.pad(close_to_limits, 2, "constant", constant_values=False)
  relative_to_mean_pad = np.pad(relative_to_mean, 2, "constant", constant_values=0) # relative to mean

  return [
      np.any([
          sum(close_to_limits_pad[j:(j+3)]) >= 2 and abs(sum(relative_to_mean_pad[j:(j+3)])) == 3
          for j in range(i, i+3)
      ])
      for i in range(len(close_to_limits))
  ]

def part_of_two_in_three(two_in_three, close_to_limits):
  return [
    i and j
    for i, j in zip(close_to_limits, two_in_three)
  ]

def special_cause_flag(values, outside_limits, close_to_limits, relative_to_mean):
    return (
        outside_limits |
        part_of_seven_trend(seven_point_one_side_mean(relative_to_mean)) |
        part_of_seven_trend(seven_point_trend(values)) |
        part_of_two_in_three(two_in_three(close_to_limits, relative_to_mean), close_to_limits)
    )

def spc_x_calc(values, fix_after_n_points = None):
    fix_values = values[:fix_after_n_points]
    # constant
    limit = 2.66

    mean = np.mean(fix_values)
    mr = np.abs(np.diff(fix_values))
    amr = np.mean(mr)

    # screen for outliers
    mr = mr[mr < 3.267 * amr]
    amr = np.mean(mr)

    lpl = mean - (limit * amr)
    upl = mean + (limit * amr)
    
    # identify near lower/upper process limits
    nlpl = mean - (limit * 2 / 3 * amr)
    nupl = mean + (limit * 2 / 3 * amr)

    # identify any points which are outside the upper or lower process limits
    outside_limits = (values < lpl) | (values > upl)
    # identify whether a point is above or below the mean
    relative_to_mean = np.sign(values - mean)

    # identify if a point is between the near process limits and process limits
    close_to_limits = ~outside_limits & ((values < nlpl) | (values > nupl))

    spc_return_type = namedtuple("spc_x", [
        "values",
        "mean",
        "lpl",
        "upl",
        "outside_limits",
        "relative_to_mean",
        "close_to_limits",
        "special_cause_flag"
    ])

    return spc_return_type(
        values,
        mean,
        lpl,
        upl,
        outside_limits,
        relative_to_mean,
        close_to_limits,
        special_cause_flag(values, outside_limits, close_to_limits, relative_to_mean)
    )

In [2]:
spc_x_calc([1,2,3,3,2,4,5,8])

spc_x(values=[1, 2, 3, 3, 2, 4, 5, 8], mean=3.5, lpl=0.07999999999999963, upl=6.92, outside_limits=array([False, False, False, False, False, False, False,  True]), relative_to_mean=array([-1., -1., -1., -1., -1.,  1.,  1.,  1.]), close_to_limits=array([ True, False, False, False, False, False, False, False]), special_cause_flag=array([False, False, False, False, False, False, False,  True]))

In [3]:
import pandas as pd
import numpy as np

# Create a pandas dataframe with a date column and a data column
# The data column will have random integers between 0 and 100

date_rng = pd.date_range(start='1/1/2020', end='1/10/2022', freq='D')
df = pd.DataFrame(date_rng, columns=['date'])
df['data'] = np.random.randint(0,100,size=(len(date_rng)))

print(df)

          date  data
0   2020-01-01    61
1   2020-01-02    69
2   2020-01-03    35
3   2020-01-04    59
4   2020-01-05    85
..         ...   ...
736 2022-01-06    28
737 2022-01-07    59
738 2022-01-08     0
739 2022-01-09    88
740 2022-01-10    35

[741 rows x 2 columns]


In [4]:
import pandas as pd
import numpy as np

def spc_x_calc_pandas(df, values_col, fix_after_n_points = None):
    values = df[values_col].values
    fix_values = values[:fix_after_n_points]
    # constant
    limit = 2.66

    mean = np.mean(fix_values)
    mr = np.abs(np.diff(fix_values))
    amr = np.mean(mr)

    # screen for outliers
    mr = mr[mr < 3.267 * amr]
    amr = np.mean(mr)

    lpl = mean - (limit * amr)
    upl = mean + (limit * amr)
    
    # identify near lower/upper process limits
    nlpl = mean - (limit * 2 / 3 * amr)
    nupl = mean + (limit * 2 / 3 * amr)

    # identify any points which are outside the upper or lower process limits
    outside_limits = (values < lpl) | (values > upl)
    # identify whether a point is above or below the mean
    relative_to_mean = np.sign(values - mean)

    # identify if a point is between the near process limits and process limits
    close_to_limits = ~outside_limits & ((values < nlpl) | (values > nupl))

    # create output pandas dataframe from numpy calculations
    output_df = df
    output_df['mean'] = mean
    output_df['lpl'] = lpl
    output_df['upl'] = upl
    output_df['outside_limits'] = outside_limits
    output_df['relative_to_mean'] = relative_to_mean
    output_df['close_to_limits'] = close_to_limits
    output_df['special_cause_flag'] = special_cause_flag(values, outside_limits, close_to_limits, relative_to_mean)

    return output_df

spc = spc_x_calc_pandas(df, 'data')
spc

Unnamed: 0,date,data,mean,lpl,upl,outside_limits,relative_to_mean,close_to_limits,special_cause_flag
0,2020-01-01,61,51.236167,-41.97167,144.444005,False,1.0,False,False
1,2020-01-02,69,51.236167,-41.97167,144.444005,False,1.0,False,False
2,2020-01-03,35,51.236167,-41.97167,144.444005,False,-1.0,False,False
3,2020-01-04,59,51.236167,-41.97167,144.444005,False,1.0,False,False
4,2020-01-05,85,51.236167,-41.97167,144.444005,False,1.0,False,False
...,...,...,...,...,...,...,...,...,...
736,2022-01-06,28,51.236167,-41.97167,144.444005,False,-1.0,False,False
737,2022-01-07,59,51.236167,-41.97167,144.444005,False,1.0,False,False
738,2022-01-08,0,51.236167,-41.97167,144.444005,False,-1.0,False,False
739,2022-01-09,88,51.236167,-41.97167,144.444005,False,1.0,False,False


In [5]:
import plotly.graph_objects as go
from datetime import datetime
from dateutil.relativedelta import relativedelta

def get_status(row: pd.Series) -> str:
    """
    Given a row of a dataframe, returns a string depending on the values
    of the 'outside_limits', 'close_to_limits' and 'relative_to_mean'
    columns of that row.
    
    :param row: A row of a dataframe
    :type row: pd.Series
    :return: A string indicating the status
    :rtype: str
    """
    if row['outside_limits']:
        return 'Outside Limit'
    elif row['close_to_limits']:
        return 'Close to limit'
    elif row['relative_to_mean'] > 0:
        return 'Above mean'
    elif row['relative_to_mean'] < 0:
        return 'Below mean'
    else:
        return ''
    
def get_colour(row: pd.Series) -> str:
    """
    Given a row of a dataframe, returns a string depending on the values
    of the 'outside_limits', 'close_to_limits' columns of that row.
    
    :param row: A row of a dataframe
    :type row: pd.Series
    :return: A string indicating the colour
    :rtype: str
    """
    if row['outside_limits']:
        return 'red'
    elif row['close_to_limits']:
        return 'yellow'
    else:
        return 'rgb(22, 96, 167)'

remove = ['zoom2d','pan2d', 'select2d', 'lasso2d', 'zoomIn2d',
            'zoomOut2d', 'autoScale2d', 'resetScale2d', 'zoom',
            'pan', 'select', 'zoomIn', 'zoomOut', 'autoScale',
            'resetScale', 'toggleSpikelines', 'hoverClosestCartesian',
            'hoverCompareCartesian', 'toImage']

def plot_line_chart(df, values_col, date_col, plot_title, x_lab, y_lab):
    # Create a scatter plot of the data points
    scatter = go.Scatter(
        x=df[date_col],
        y=df[values_col],
        name = 'Performance',
        mode='lines+markers',
        marker=dict(
            color=df.apply(lambda row: get_colour(row), axis=1),
            size=10,
            symbol='circle'),
        line = dict(color = 'rgb(22, 96, 167)',
                          width = 3, dash = 'solid'),
        text = df.apply(lambda row: get_status(row), axis=1),
        hovertemplate = '%{text}: %{y:.0f}<extra></extra>',
    )
    # Create a line plot of the mean
    mean_line = go.Scatter(
        x=df[date_col],
        y=df['mean'],
        mode='lines',
        line = dict(color = 'rgba(174, 37, 115, 0.5)',
                    width = 2,
                    dash = 'dash'),
        name = "Mean",
        hovertemplate = 'mean: %{y:.0f}<extra></extra>',
    )
    # Create a shaded area for the lower and upper control limits
    lpl_area = go.Scatter(
        x=df[date_col],
        y=df['lpl'],
        mode='lines',
        line=dict(
            color='rgba(174, 37, 115, 0.1)',
            width=0,
        ),
        name = "lpl",
        hovertemplate = 'lpl: %{y:.0f}<extra></extra>',
    )
    upl_area = go.Scatter(
        x=df[date_col],
        y=df['upl'],
        mode='lines',
        line=dict(
            color='rgba(174, 37, 115, 0.1)',
            width=0,
        ),
        fill='tonexty',
        fillcolor='rgba(174, 37, 115, 0.1)',
        name = "upl",
        hovertemplate = 'upl: %{y:.0f}<extra></extra>',
    )
    min_xaxis = min(df[date_col])
    max_xaxis = max(df[date_col])
    max_yaxis = max(df[values_col])
    layout = go.Layout(title = plot_title,
                   font = dict(size = 12),
                   xaxis = dict(title = x_lab,
                                # add more time to x-axis to show plot circles
                                range = [min_xaxis - relativedelta(days=5),
                                         max_xaxis + relativedelta(days=5)]),
                   yaxis = dict(title = y_lab,
                                # fix y0 at 0 and add 10% to y1
                                range = [0, max_yaxis + (max_yaxis * 0.1)]),
                   showlegend = False,
                   hovermode = "x unified")
    config = {'displaylogo': False,
              'displayModeBar': True,
              'modeBarButtonsToRemove': remove}
    # Create the figure and show it
    fig = go.Figure(data=[scatter, mean_line, lpl_area, upl_area], layout=layout)
    fig.update_layout(template='plotly_white')
    fig.show(config=config)


In [6]:
# File path of the CSV file
file_path = '.././nhspy_plotthedots/data/ae_attendances.csv'

# Read the CSV file and store it in a DataFrame
df = pd.read_csv(file_path)

df

from datetime import datetime

# Convert 'period' column to datetime format
df['period'] = pd.to_datetime(df['period'])

# Create a subset of the DataFrame based on certain conditions
sub_set = df[(df['org_code'] == "RQM") & (df['type'] == "1") & (df['period'] < datetime(2018, 4, 1))]
sub_set = sub_set.sort_values(by='period').reset_index(drop=True)

sub_set

Unnamed: 0,period,org_code,type,attendances,breaches,admissions
0,2016-04-01,RQM,1,15154,1199,3415
1,2016-05-01,RQM,1,16705,929,3590
2,2016-06-01,RQM,1,16021,970,3398
3,2016-07-01,RQM,1,16761,1178,3321
4,2016-08-01,RQM,1,15084,1110,3198
5,2016-09-01,RQM,1,15918,1388,3260
6,2016-10-01,RQM,1,16564,2061,3563
7,2016-11-01,RQM,1,16478,1985,3399
8,2016-12-01,RQM,1,16931,2675,3671
9,2017-01-01,RQM,1,16920,3141,3339


In [7]:
spc = spc_x_calc_pandas(sub_set, 'breaches')
spc

Unnamed: 0,period,org_code,type,attendances,breaches,admissions,mean,lpl,upl,outside_limits,relative_to_mean,close_to_limits,special_cause_flag
0,2016-04-01,RQM,1,15154,1199,3415,1545.166667,740.395758,2349.937576,False,-1.0,False,False
1,2016-05-01,RQM,1,16705,929,3590,1545.166667,740.395758,2349.937576,False,-1.0,True,True
2,2016-06-01,RQM,1,16021,970,3398,1545.166667,740.395758,2349.937576,False,-1.0,True,True
3,2016-07-01,RQM,1,16761,1178,3321,1545.166667,740.395758,2349.937576,False,-1.0,False,False
4,2016-08-01,RQM,1,15084,1110,3198,1545.166667,740.395758,2349.937576,False,-1.0,False,False
5,2016-09-01,RQM,1,15918,1388,3260,1545.166667,740.395758,2349.937576,False,-1.0,False,False
6,2016-10-01,RQM,1,16564,2061,3563,1545.166667,740.395758,2349.937576,False,1.0,False,False
7,2016-11-01,RQM,1,16478,1985,3399,1545.166667,740.395758,2349.937576,False,1.0,False,False
8,2016-12-01,RQM,1,16931,2675,3671,1545.166667,740.395758,2349.937576,True,1.0,False,True
9,2017-01-01,RQM,1,16920,3141,3339,1545.166667,740.395758,2349.937576,True,1.0,False,True


In [8]:
plot_line_chart(spc, 'breaches', 'period', plot_title = 'Chelsea & Westminster Hospital NHS Foundation Trust (RQM)', x_lab = 'Month of attendance', y_lab = 'Number of 4-Hour Target Breaches')