<a href="https://colab.research.google.com/github/AkiraNom/Automated_analysis/blob/main/Analysis_automation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive at /content/drive so the notebook can read files stored there.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# # install libraries (uncomment and run once on a fresh runtime)
# ! pip install chardet
# ! pip install dataframe_image
# ! pip install fpdf
# ! pip install kaleido

In [3]:
import time
import warnings
import chardet
from tqdm.notebook import tqdm
import glob

import matplotlib.pyplot as plt
import seaborn as sns
import scipy.stats as stats
import pandas as pd
import plotly.figure_factory as ff
# libraries for creating a pdf file
from fpdf import FPDF
import dataframe_image as dfi

# set figure style: doubled font scale, then the 'whitegrid' seaborn style
sns.set(font_scale=2)
sns.set_style('whitegrid')

# Suppress warnings
# NOTE(review): no category is given, so every warning is silenced for the rest
# of the session - including deprecation and numerical warnings raised by the
# libraries imported above.
warnings.filterwarnings('ignore')


In [4]:
# collect the per-experiment CSV files of a folder
def get_file_paths(folder_path):
  """
  Return the paths of all 'ExpID*.csv' files located in folder_path.

  folder_path may be given with or without a trailing path separator.
  The result is sorted so that the files are always processed in the same
  order (glob returns its matches in arbitrary order).
  """
  import os  # local import: os is not among the notebook-level imports

  return sorted(glob.glob(os.path.join(folder_path, 'ExpID*.csv')))

# check encoding type in csv
'''
CSV files exported by LabChart (ADInstruments) are encoded as 'ISO-8859-1'.
If the encoding option is not changed accordingly, a 'UnicodeDecodeError' will be raised.
To avoid this error, detect the encoding before reading a file with pandas.
'''
def get_csv_encoding(file_path):
  """
  Detect the text encoding of a file.

  The file is opened in binary mode, its complete content is handed to
  chardet.detect, and the 'encoding' entry of the detection result is returned.
  """
  with open(file_path, 'rb') as handle:
    raw_bytes = handle.read()
    detection = chardet.detect(raw_bytes)
  return detection['encoding']

# Read multiple CSV files and create a DataFrame
def read_csv_files(file_paths):
  """
  Read the first data row of every CSV file and stack the rows into one DataFrame.

  For each path only the header line and the first data line are parsed (all
  later lines are skipped) and only the first nine columns are requested. The
  first of those columns is used as the index while reading and is discarded
  when the frames are concatenated with a fresh index. The experiment id is
  taken from the four characters in front of the '.csv' extension and
  inserted as the first column, 'Exp_Id'.

  Files that raise UnicodeDecodeError while being read are reported and skipped.

  Returns:
    One DataFrame holding the rows of all successfully read files.

  Raises:
    ValueError: if the four characters in front of the extension are not an
      integer, or if no file could be read at all.
  """
  # rows and columns to import: header line + first data line, first nine columns
  import_rows = [0, 1]
  import_cols = list(range(9))

  dfs = []

  print('reading csv files...')
  for file_path in tqdm(file_paths):
    # experimental ID: the four characters preceding the 4-character extension
    myid = int(file_path[-8:-4])

    try:
      # check csv encoding type
      encoding = get_csv_encoding(file_path)

      df = pd.read_csv(file_path,
                       index_col=0,
                       usecols=import_cols,
                       skiprows=lambda x: x not in import_rows,
                       encoding=encoding
                       )
    except UnicodeDecodeError:
      print (f'Encording type error for id {myid}')
      continue

    # add experimental ID as the first column
    df.insert(loc=0, column='Exp_Id', value=myid)
    dfs.append(df)

  if not dfs:
    # pd.concat would fail with an unspecific "No objects to concatenate"
    raise ValueError(
        'No CSV file could be read: the list of file paths is empty '
        'or every file failed to decode.'
    )

  return pd.concat(dfs, axis=0, ignore_index=True)

# convert obj to datetime
def convert_datetime(data):
  """
  Reduce the 'TimeDate' column to its date part, formatted as 'YYYY/MM/DD'.

  Every entry is cut at the first space, the leading part is parsed as
  month/day/year, and the column is overwritten in place with the
  reformatted date strings. The DataFrame that was passed in is returned.
  """
  date_part = data['TimeDate'].map(lambda stamp: stamp.split(' ')[0])
  parsed = pd.to_datetime(date_part, format='%m/%d/%Y')
  data['TimeDate'] = parsed.dt.strftime('%Y/%m/%d')

  return data

# Read meta data
def read_meta_file(folder_path):
  """
  Load the metadata CSV (a file matching 'meta*.csv') from folder_path.

  folder_path may be given with or without a trailing path separator. When
  several files match, the alphabetically first one is read.

  Returns:
    The file content as a DataFrame, or None - after printing a notice - when
    the folder holds no matching file or the file cannot be found when read.
  """
  import os  # local import: os is not among the notebook-level imports

  matches = sorted(glob.glob(os.path.join(folder_path, 'meta*.csv')))
  if not matches:
    # glob returns an empty list instead of raising, so the missing-file case
    # has to be handled here rather than in the except clause below
    print('Meta info is not present')
    return None

  try:
    return pd.read_csv(matches[0])

  except FileNotFoundError:
    print('Meta info is not present')
    return None

# Run normality test on each column
def run_normality_tests(data, cols_drop, group_column):
  """
  Run scipy's normaltest on every feature column, separately for each group.

  Args:
    data: DataFrame holding the group column and the feature columns.
    cols_drop: columns to leave out of the analysis.
    group_column: name of the column that defines the groups.

  Returns:
    dict mapping each feature column to
      True:  if no group has a p-value below 0.05
      False: if at least one group has a p-value below 0.05
  """
  features = data.drop(cols_drop, axis=1)  # drop returns a new frame; data is untouched
  groups = features[group_column].unique()

  normality_results = {}

  for column in features.columns:
    # skip the grouping column by name, wherever it sits in the frame
    if column == group_column:
      continue

    is_normal = True  # Assume normally distributed initially

    for group in groups:
      group_data = features.loc[features[group_column] == group, column]
      _, p_value = stats.normaltest(group_data)

      # one non-normal group is enough; no need to test the remaining groups
      if p_value < 0.05:
        is_normal = False
        break

    normality_results[column] = is_normal

  return normality_results

# Run appropriate statistical tests based on normality
def run_statistical_tests(data, cols_drop, group_column, normality_results=None):
  """
  Compare two groups feature by feature and flag the significant features.

  A feature marked as normally distributed is compared with an independent
  two-sample t-test, any other feature with a two-sided Mann-Whitney U test.

  Args:
    data: DataFrame holding the group column and the feature columns.
    cols_drop: columns to leave out of the analysis.
    group_column: name of the column that defines the groups.
    normality_results: optional dict mapping each feature column to True
      (normal) or False. When omitted it is computed with
      run_normality_tests(data, cols_drop, group_column).

  Returns:
    (sig_vals, sig_features): sig_vals holds '*' or '' for every feature in
    column order; sig_features lists the features with p < 0.05.
    None is returned when the group column does not hold exactly two groups;
    callers that unpack the result must check for this first.
  """
  features = data.drop(cols_drop, axis=1)  # drop returns a new frame; data is untouched
  groups = features[group_column].unique()

  if len(groups) != 2:
    return None  # for multi-group comparison as needed

  if normality_results is None:
    normality_results = run_normality_tests(data, cols_drop, group_column)

  group1 = features[features[group_column] == groups[0]]
  group2 = features[features[group_column] == groups[1]]

  p_values = {}
  for column in features.columns:
    if column == group_column:
      continue  # Skip the grouping column itself

    if normality_results[column]:
      _, p_value = stats.ttest_ind(group1[column], group2[column])
    else:
      # state the alternative explicitly: the default differs between SciPy versions
      _, p_value = stats.mannwhitneyu(group1[column], group2[column],
                                      alternative='two-sided')

    p_values[column] = p_value

  # collect the features that reach statistical significance
  sig_vals = []
  sig_features = []
  for column, p_val in p_values.items():

    print(f'Column: {column}')
    if p_val < 0.05:
      sig_vals.append('*')
      sig_features.append(column)
    else:
      sig_vals.append('')

  return sig_vals, sig_features


# Create barplots
def create_barplots(data, group_column, file_name, features=None):
  """
  Draw one bar plot (mean with SD error bar, overlaid with the individual
  points) per feature and save all panels to a single image file.

  Args:
    data: DataFrame holding the group column and the feature columns.
    group_column: column plotted on the x axis; the categories are drawn in
      the fixed order 'Control', 'Treatment'.
    file_name: path of the image file to write (saved at 300 dpi).
    features: columns to plot, one panel each. When omitted, the module-level
      name features_list is used, as before.
  """
  if features is None:
    features = features_list  # backward-compatible fallback to the notebook global

  n_cols = 2
  # ceil division; never fewer than 3 rows so that up to six features keep
  # the original 3x2 layout and 15x12 figure size
  n_rows = max(3, -(-len(features) // n_cols))

  palette = ['#808080', '#1E90FF']

  fig = plt.figure(figsize=(15, 4 * n_rows))
  fig.subplots_adjust(hspace=0.5)

  try:
    for i, target_column in enumerate(features):

      ax = fig.add_subplot(n_rows, n_cols, i + 1)

      sns.barplot(
          x=group_column,
          y=target_column,
          data=data,
          order=['Control', 'Treatment'],
          errorbar='sd',
          palette=palette,
          width=0.5,
          edgecolor='black',
          errcolor='black',
          errwidth=2.5,
          capsize = 0.3,
          ax=ax,
      )

      sns.swarmplot(
          x=group_column,
          y=target_column,
          data=data,
          order=['Control', 'Treatment'],
          palette='dark:black',
          size=10,
          ax=ax
          )

      ax.set_xlabel('')
      ax.set_title(f'{target_column}')

      # remove plot boundary
      ax.spines['top'].set_color('None')
      ax.spines['right'].set_color('None')

    # save the plot
    fig.savefig(file_name, dpi=300)

  finally:
    # close the figure even when plotting or saving fails, so it does not stay open
    plt.close(fig)

# restructure the dataframe to make a figure of a dataframe table
def restructure_dataframe(data, cols_drop, sig_vals):

  df_analysis = data.copy().drop(cols_drop,axis=1)
  n_treat, n_control = data.groupby('Condition')['Exp_Id'].count().values
  result=df_analysis.groupby('Condition').agg(lambda x: f'{x.mean().round(2)}±{x.std().round(2)}')
  result.loc['p<0.05'] = sig_vals
  result.insert(0, 'Sample (n)',[n_treat, n_control, ''])
  result=result.T

  return result

# create a table png from dataframe
def create_table_fig(data, cols_drop, sig_vals):
  """
  Render the summary table as an image and write it to 'table.png'.

  The summary comes from restructure_dataframe; its index is turned into a
  regular column with an empty header before the table figure is built.
  """
  summary = restructure_dataframe(data, cols_drop, sig_vals)

  # move the row labels into an ordinary column whose header is blank
  table_ready = summary.reset_index()
  table_ready = table_ready.rename(columns={'index':''})

  table_fig = ff.create_table(table_ready)
  table_fig.update_layout(autosize=False, width=500, height=200)

  table_fig.write_image('table.png', scale=2)


In [14]:
# report document: logo header and page-number footer on every page except the title page
class ReportPDF(FPDF):
    """
    FPDF document used for the analysis report.

    The first page is the title page: it gets neither the logo header nor
    the page-number footer. Every later page gets both. Page numbers are taken
    from FPDF's own page_no(), which holds the number of the page that is
    being rendered when header() and footer() are invoked.
    """

    def __init__(self):
        super().__init__()
        # Starts at 1 and grows by one with every add_page() call. Kept for
        # backward compatibility only; rendering relies on page_no().
        self.page_number = 1

    # in case you need to add company logo in the header
    def header(self):
        if self.page_no() > 1:  # except for the title page
            self.image("company_logo.png", 10, 10, 30)  # adjust coordinates
            self.set_font('Arial', 'B', 12)

    def footer(self):
        # The footer of a page is rendered when the NEXT page is added (or the
        # document is closed), so a counter bumped in add_page() is already one
        # ahead at this point; page_no() still refers to the page being closed.
        if self.page_no() > 1:  # Exclude the title page
            self.set_y(-15)
            self.set_font('Arial', 'B', 12)
            self.cell(0, 10, f'Page {self.page_no()}', 0, 0, 'C')

    def add_page(self, *args, **kwargs):
        # forward any arguments so the override accepts whatever the base class accepts
        super().add_page(*args, **kwargs)
        self.page_number += 1


def create_summary_and_table(pdf, n_treat, n_control, first_date, last_date, sig_features):
    """
    Add the page holding the experiment summary, the result table and the
    description of the analysis methods.

    Args:
        pdf: document object the page is added to.
        n_treat, n_control: group sizes; only their sum is reported.
        first_date, last_date: first and last recording date shown in the text.
        sig_features: names of the features that differ significantly;
            an empty sequence yields the "no significant difference" sentence.

    The table is read from 'table.png', which must already exist.
    """
    # Add summary and table section on the same page
    pdf.add_page()
    pdf.ln(10)
    pdf.set_font('Arial', 'B', 16)
    pdf.cell(0, 10, 'Experiment Summary and Table', ln=True, align='C')
    pdf.ln(10)

    summary_text = (
        f'The experiments were conducted in {n_treat + n_control} animals between'
        f' {first_date} and {last_date}. '
    )

    if sig_features:
        # join the names explicitly; slicing the list's repr cut off the last
        # name's closing quote
        feature_names = ', '.join(str(feature) for feature in sig_features)
        verb = 'is' if len(sig_features) == 1 else 'are'
        summary_text += f'{feature_names} {verb} significantly different between conditions.'
    else:
        summary_text += 'There is no significant difference between conditions.'

    pdf.set_font('Arial', '', 12)
    pdf.multi_cell(0, 10, summary_text)
    pdf.ln(10)

    pdf.image('table.png', w=170)

    # Analysis Methods Text
    pdf.ln(10)
    pdf.set_font('Arial', 'B', 14)
    pdf.cell(0, 10, 'Analysis Methods:', ln=True, align='L')
    pdf.set_font('Arial', '', 12)
    analysis_methods = (
        'Normality test is first performed using the scipy.stats package. If the data is normally distributed,'
        ' T-test is used. Otherwise, Mann-Whitney U test is used for hypothesis testing.'
        '\nP < 0.05 is considered as significance.'
    )
    pdf.multi_cell(0, 10, analysis_methods)


def create_data_distribution(pdf, width):
    """
    Append a page titled 'Data Distribution' that shows 'barplots.png'.

    The image is placed at x=10, y=35 and drawn 10 units narrower than width.
    """
    # start the section on a page of its own
    pdf.add_page()

    # centred section heading
    pdf.set_font('Arial', 'B', 16)
    pdf.cell(0, 10, 'Data Distribution', ln=True, align='C')

    image_width = width - 10
    pdf.image('barplots.png', 10, 35, w=image_width)

def export_pdf(data, sig_features, author='John Doe', pdf_file_name='Analysis_report.pdf'):
    """
    Assemble the report (title page, summary/table page, data-distribution
    page) and write it to a PDF file.

    Args:
        data: DataFrame with 'TimeDate', 'Condition' and 'Exp_Id' columns.
        sig_features: names of the significantly different features.
        author: name printed on the title page.
        pdf_file_name: path of the PDF file to write.

    'company_logo.png', 'table.png' and 'barplots.png' are read from the
    current working directory and must exist before this is called.
    """
    # set A4 size(210 by 297 mm)
    width = 210

    # Create pdf object
    pdf = ReportPDF()
    pdf.set_auto_page_break(auto=True, margin=15)

    # variables for report
    first_date, last_date = data.TimeDate.min(), data.TimeDate.max()

    # Only the sum of the two counts is reported, so any number of conditions
    # can be supported by passing the total when there are not exactly two.
    group_counts = data.groupby('Condition')['Exp_Id'].count()
    if len(group_counts) == 2:
        n_first_group, n_second_group = group_counts.values
    else:
        n_first_group, n_second_group = group_counts.sum(), 0

    # Title and introductory page
    pdf.add_page()
    # add company logo
    pdf.image("company_logo.png", 10, 10, 30) # adjust coordinates
    pdf.set_font('Arial', 'B', 24)
    pdf.cell(0, 50, 'ECG Record Report', ln=True, align='C')

    # Add author of analysis
    pdf.set_font('Arial', 'I', 14)
    pdf.cell(0, 10, f'Author: {author}', ln=True, align='C')

    # date on which the report is generated
    pdf.set_font('Arial', '', 14)
    pdf.cell(0, 10, time.strftime('%Y/%m/%d'), ln=True, align='C')
    pdf.ln(20)

    # Create Summary and Table on the same page
    create_summary_and_table(pdf, n_first_group, n_second_group, first_date, last_date, sig_features)

    # Create Data Distribution on a new page
    create_data_distribution(pdf, width)

    # Save the PDF
    pdf.output(pdf_file_name)

In [15]:

# Driver: read the recordings, run the statistics, and build the figures and the PDF report.
if __name__ == "__main__":
  # Get a folder path
  # (relative path: it is resolved against the current working directory)
  folder_path = './drive/MyDrive/ECG_files/'
  file_paths = get_file_paths(folder_path)

  # Read CSV files and create a DataFrame
  df = read_csv_files(file_paths)

  # reduce the TimeDate column to 'YYYY/MM/DD' date strings
  df = convert_datetime(df)

  # NOTE(review): read_meta_file can return None after printing its notice;
  # the merge below fails in that case.
  meta = read_meta_file(folder_path)
  # left join: every row of meta is kept; no `on=` is given, so pandas joins
  # on the columns the two frames have in common
  df = meta.merge(df, how='left')

  # Perform statistical tests based on normality
  group_column = 'Condition'
  # trim two unnecessary cols
  cols_drop = ['Exp_Id','TimeDate']

  # Run normality tests on each column
  # NOTE: keep the module-level name `normality_results` - functions defined
  # above may look it up as a global instead of receiving it as an argument.
  normality_results = run_normality_tests(df, cols_drop, group_column)

  # NOTE(review): run_statistical_tests returns None unless there are exactly
  # two groups, which would make this unpacking fail.
  sig_vals, sig_features = run_statistical_tests(df, cols_drop, group_column)

  # features list for plots
  # NOTE: keep the module-level name `features_list` - create_barplots may look
  # it up as a global. 'QT Interval (ms)' appears in the test output below but
  # is not listed here, so it gets no panel.
  features_list = ['RR Interval (ms)', 'Heart Rate (BPM)', 'PR Interval (ms)',
                  'P Duration (ms)', 'QRS Interval (ms)', 'QTc (ms)'
                  ]

  # Create and save barplots
  create_barplots(data=df,group_column=group_column, file_name='barplots.png')

  # create a table fig for a report
  create_table_fig(df, cols_drop, sig_vals)

  # export a pdf report
  export_pdf(df, sig_features)


reading csv files...


  0%|          | 0/42 [00:00<?, ?it/s]

Column: RR Interval (ms)
Column: Heart Rate (BPM)
Column: PR Interval (ms)
Column: P Duration (ms)
Column: QRS Interval (ms)
Column: QT Interval (ms)
Column: QTc (ms)
