In [None]:
import os
from datetime import datetime
from functools import reduce
from io import StringIO
from math import ceil
from pathlib import Path
from time import sleep

import bs4
import mailersend
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver import ActionChains
from selenium.webdriver.common.keys import Keys

### USER NOTE: CLEAR CREDENTIALS BEFORE COMMITTING

In [None]:
# Credentials are taken from the environment so they do not have to be typed
# into (and accidentally committed with) the notebook. When the variables are
# unset both values fall back to the empty string, exactly as before.
USERNAME = os.environ.get("MYCHART_USERNAME", "")
PASSWORD = os.environ.get("MYCHART_PASSWORD", "")

In [None]:
# Description of the login page consumed by `login`: the page URL, the ids of
# the username/password inputs and of the submit button, and the credentials
# to type in.
site = dict(
    login_url="https://my.mdanderson.org/MyChart/Authentication/Login?postloginurl=Clinical%2fTestResults",
    username_id="Login",
    password_id="Password",
    credentials=dict(
        username=USERNAME,
        password=PASSWORD,
    ),
    login_button_id="submit",
    # NOTE(review): no function visible in this notebook reads this key.
    login_confirm_css="menuicon heal",
)

# scraper

In [None]:
def scrape(site):
    """Log in, scrape every lab panel and return the merged results.

    Args:
        site: Site configuration dict, passed through to ``get_lab_entries``.

    Returns:
        The aggregated DataFrame (also written to ``test_output.csv``), or
        ``None`` when ``get_lab_entries`` hands back no browser.
    """
    browser, lab_entries = get_lab_entries(site)
    if browser is None:
        return None
    try:
        # Parse exactly once: the previous extra call discarded its result and
        # doubled the number of page loads.
        panel_dict = parse_lab_pages(browser, lab_entries)
    finally:
        # Always shut the driver down, even if parsing fails half-way.
        browser.quit()
    df = agg_df(panel_dict)
    df.to_csv("test_output.csv")
    return df


def get_lab_entries(site: dict):
    """Log in and collect the lab-result entries listed on the landing page.

    Args:
        site: Site configuration dict, passed through to ``login``.

    Returns:
        ``(browser, lab_entries)`` where ``lab_entries`` are the elements with
        class ``SingleResult`` found after ``load_labs`` has run, or
        ``(None, None)`` when ``has_updated`` reports nothing new (the browser
        is shut down in that case).
    """
    browser = login(site)
    # Fixed pause after login; presumably gives the result list time to render.
    sleep(3)
    # `find_elements_by_*` was removed in Selenium 4.3; use the By-based lookup.
    lab_entries = browser.find_elements(By.CLASS_NAME, "SingleResult")
    if not has_updated(lab_entries, "prev_entry.txt"):
        # quit() ends the whole driver session; close() only closed the window.
        browser.quit()
        return None, None
    load_labs(browser)
    # Look the entries up again now that `load_labs` has finished.
    lab_entries = browser.find_elements(By.CLASS_NAME, "SingleResult")
    print(f"labs detected: {len(lab_entries)}")
    return browser, lab_entries


def load_labs(browser, pause_time=0.5):
    """Click the ``loadmore`` element repeatedly until none is left.

    Args:
        browser: Selenium WebDriver positioned on the results list.
        pause_time: Seconds to sleep after each click.

    Returns:
        ``True`` once no ``loadmore`` element is found, ``False`` if a click
        raised (reported with a "loadmore done?" message).
    """
    while True:
        # `find_elements_by_*` was removed in Selenium 4.3; use the By-based lookup.
        load_more = browser.find_elements(By.CLASS_NAME, "loadmore")
        if not load_more:
            return True
        try:
            load_more.pop().click()
        except Exception:
            # Best effort: a failed click is treated as "nothing more to load".
            print("loadmore done?")
            return False
        sleep(pause_time)


def parse_lab_pages(browser, lab_entries):
    """Visit each distinct lab panel and collect its history table.

    Args:
        browser: Selenium WebDriver focused on the window that owns
            ``lab_entries``.
        lab_entries: Result-list elements understood by ``get_entry_data``.

    Returns:
        Dict mapping panel name to whatever ``get_table`` returned for it.
        Only the first entry seen for each name is visited.
    """
    home_handle = browser.current_window_handle

    # Resolve every name/URL while the home window still has focus: the entry
    # elements belong to that window and cannot be queried from another tab.
    lab_urls = dict()
    for entry in lab_entries:
        entry_data = get_entry_data(entry)
        name = entry_data["name"]
        if name in lab_urls:
            continue
        lab_urls[name] = entry_data["elem"].get_attribute("href")

    panel_dict = dict()
    if not lab_urls:
        # Nothing to visit; do not open (or close) any window.
        return panel_dict

    # Open a single scratch tab and reuse it for every panel instead of
    # leaking one new tab per panel.
    known_handles = set(browser.window_handles)
    browser.execute_script("window.open('');")
    scratch_handle = next(h for h in browser.window_handles if h not in known_handles)
    browser.switch_to.window(scratch_handle)
    try:
        for name, lab_url in lab_urls.items():
            browser.get(lab_url)
            tabs = browser.find_elements(By.CLASS_NAME, "membertab")
            panel_dict[name] = get_table(browser, tabs, name)
    finally:
        # Close only the scratch tab and hand focus back to the home window.
        browser.close()
        browser.switch_to.window(home_handle)
    return panel_dict


def get_table(browser, tabs, name):
    """Open a panel's "Past Results" tab and read its table into a DataFrame.

    Args:
        browser: Selenium WebDriver already showing the panel page.
        tabs: Candidate tab elements; the last one whose ``span`` text is
            "Past Results" is clicked.
        name: Panel name, used only in the "NO TABLE" message.

    Returns:
        The first parsed table, re-indexed by its first column and transposed,
        or the string ``"NO TABLE"`` when the tab or the table cannot be found.
    """
    try:
        results_tab = [
            tab for tab in tabs
            if tab.find_element(By.TAG_NAME, "span").text == "Past Results"
        ].pop()
    except Exception:
        # Covers both "no matching tab" (pop on empty list) and lookup errors.
        print(f"{name}: NO TABLE")
        return "NO TABLE"
    results_tab.click()
    start_date = browser.find_element(By.CLASS_NAME, "date")
    start_date.clear()
    start_date.send_keys("1/1/15")
    browser.find_element(By.CLASS_NAME, "otherbutton").click()
    soup = bs4.BeautifulSoup(browser.page_source, 'html.parser')
    wrappers = soup.select(".tableWrapper")
    table = next(iter(wrappers[0].children), None) if wrappers else None
    if table is None:
        # Same sentinel as the missing-tab case instead of an IndexError.
        print(f"{name}: NO TABLE")
        return "NO TABLE"
    # Wrap the markup in a file-like object: passing literal HTML strings to
    # read_html is deprecated in recent pandas releases.
    df = pd.read_html(StringIO(str(table)))[0]
    df = df.set_index(df.columns[0]).T
    return df


def agg_df(panel_dict):
    """Outer-join every DataFrame in ``panel_dict`` on its index.

    Values that are not DataFrames (such as the ``"NO TABLE"`` sentinel) are
    ignored.

    Args:
        panel_dict: Mapping of panel name to DataFrame or sentinel.

    Returns:
        The merged DataFrame, or an empty DataFrame when ``panel_dict`` holds
        no DataFrame at all (previously this raised ``TypeError`` from
        ``reduce``).
    """
    frames = [panel for panel in panel_dict.values() if isinstance(panel, pd.DataFrame)]
    if not frames:
        return pd.DataFrame()
    return reduce(
        lambda left, right: pd.merge(left, right, how="outer", left_index=True, right_index=True),
        frames,
    )


def login(site: dict):
    """Start Chrome, open the login page and submit the credentials.

    Args:
        site: Dict providing ``login_url``, ``username_id``, ``password_id``,
            ``login_button_id`` and ``credentials`` (``username``/``password``).

    Returns:
        The WebDriver instance, left on whatever page follows the login click.

    Raises:
        Any exception raised while driving the page; the browser is shut down
        before the exception propagates.
    """
    browser = webdriver.Chrome()
    try:
        browser.get(site["login_url"])
        # `find_element_by_*` was removed in Selenium 4.3; use the By-based lookup.
        username_box = browser.find_element(By.ID, site["username_id"])
        password_box = browser.find_element(By.ID, site["password_id"])
        username_box.send_keys(site["credentials"]["username"])
        password_box.send_keys(site["credentials"]["password"])
        browser.find_element(By.ID, site["login_button_id"]).click()
    except Exception:
        # Do not leave an orphaned browser behind when login cannot complete.
        browser.quit()
        raise
    return browser


def has_updated(lab_entries: list, prev_entry_path: str):
    """Tell whether the newest lab entry differs from the recorded baseline.

    The baseline file is expected to hold exactly two lines: the name and the
    date of the previously seen newest entry.

    Args:
        lab_entries: Result-list elements; only the first one is inspected.
        prev_entry_path: Path of the baseline file.

    Returns:
        ``False`` when there are no entries, or when the first entry's name
        and date both match the baseline. ``True`` when they differ, and also
        when the baseline file is missing or malformed (without a usable
        baseline "nothing changed" cannot be established).
    """
    if not lab_entries:
        # Nothing listed, so there is nothing new to scrape.
        return False
    path = Path(prev_entry_path)
    if not path.exists():
        # NOTE(review): nothing visible in this notebook writes this file;
        # returning False here meant a fresh checkout could never scrape.
        return True
    with open(path, "r") as f:
        lines = f.read().splitlines()
    if len(lines) != 2:
        return True
    prev_name, prev_date = lines
    entry_data = get_entry_data(lab_entries[0])
    return entry_data["name"] != prev_name or entry_data["date"] != prev_date


def get_entry_data(entry):
    """Pull the link element, name and date out of one result-list entry.

    Args:
        entry: Selenium element containing a ``ResultName`` element (with an
            ``a`` holding a ``span``) and a ``DateWrapper`` element (with a
            ``span``).

    Returns:
        Dict with ``elem`` (the ``a`` element), ``name`` (text of its span)
        and ``date`` (text of the date span).
    """
    # `find_element_by_*` was removed in Selenium 4.3; use the By-based lookup.
    link = entry.find_element(By.CLASS_NAME, "ResultName").find_element(By.TAG_NAME, "a")
    date_span = entry.find_element(By.CLASS_NAME, "DateWrapper").find_element(By.TAG_NAME, "span")
    return {
        "elem": link,
        "name": link.find_element(By.TAG_NAME, "span").text,
        "date": date_span.text,
    }

# clean and plot

In [None]:
# Line colour used by `plot_numeric_df` for the "min" and "max" limit lines.
EXTREMA_COLORS = dict(min="orange", max="red")
# Substrings that make `get_metrics_df` discard a reference-range string
# (they are OR-ed together into a single regex there).
BAD_METRICS = (
    "Not",
    "NEG",
    "TRACE",
    "Non",
    "NOT",
    "Negative",
    "Normal",
    "OCC",
)

def clean_plot_df(df, start_date="2021-1-1", min_col_entries=5, n_cols=3):
    """Turn the scraped frame's index into timestamps, then clean and plot it.

    Rows whose label contains "Unnamed" are dropped. Each remaining label is
    split on "."; the first piece is parsed as a date and the second piece
    (0 when absent) shifts that date by 4 hours per unit. The ".N" suffix is
    presumably the marker pandas adds to repeated labels - TODO confirm.

    Args:
        df: Scraped results with date-like string index labels.
        start_date: Forwarded to ``get_df_numeric``.
        min_col_entries: Forwarded to ``get_df_numeric``.
        n_cols: Forwarded to ``plot_numeric_df``.
    """
    keep_rows = ~df.index.str.contains("Unnamed")
    df = df.loc[keep_rows]
    label_parts = df.index.str.split(".")
    repeat_no = label_parts.map(lambda pieces: list_val_default(pieces))
    day_labels = label_parts.map(lambda pieces: pieces[0])
    offsets = repeat_no.map(lambda count: pd.Timedelta(hours=4 * int(count)))
    df.index = pd.to_datetime(day_labels) + offsets
    # get_metrics_df also renames df's columns in place, so it must run
    # before get_df_numeric.
    dfm = get_metrics_df(df)
    dfn = get_df_numeric(df, start_date, min_col_entries)
    plot_numeric_df(dfn, dfm, n_cols)


def plot_numeric_df(dfn, dfm, n_cols):
    """Plot every column of ``dfn`` in its own subplot with its limit lines.

    The figure is written to ``ehr_<today>.html`` and then shown.

    Args:
        dfn: Numeric frame; one subplot is drawn per column against the index.
        dfm: Per-metric frame indexed by column name; its ``units`` value is
            appended to the subplot title and its ``min``/``max`` values are
            drawn as horizontal lines when they are not NaN.
        n_cols: Number of subplot columns in the grid.
    """
    if len(dfn.columns) == 0:
        # make_subplots cannot build a grid with zero rows.
        print("no numeric columns to plot")
        return
    rows = ceil(len(dfn.columns) / n_cols)

    def subplot_title(col):
        # Append the units only when they are known; a missing value used to
        # show up as a literal "nan" in the title.
        title = col
        if col in dfm.index and "units" in dfm.columns:
            units = dfm.loc[col, "units"]
            if not pd.isna(units):
                title = col + " " + str(units)
        return title.replace("_x", "")

    titles = [subplot_title(col) for col in dfn.columns]
    fig = make_subplots(
        rows=rows, cols=n_cols,
        shared_xaxes=True,
        vertical_spacing=0.01,
        subplot_titles=titles,
    )
    for position, col in enumerate(dfn.columns):
        # 1-based grid coordinates, filled row by row.
        grid_row = position // n_cols + 1
        grid_col = position % n_cols + 1
        fig.add_trace(
            go.Scatter(x=dfn.index, y=dfn[col], line_shape='linear'),
            row=grid_row,
            col=grid_col
        )
        if col in dfm.index:
            for extrema in ["min", "max"]:
                if extrema not in dfm.columns:
                    continue
                lim = dfm[extrema][col]
                if not pd.isna(lim):
                    fig.add_hline(y=lim, line_width=3, line_color=EXTREMA_COLORS[extrema],
                                  row=grid_row, col=grid_col)
    fig.update_layout(
        height=300 * rows,
        width=500 * n_cols,
        title_text="Stacked Subplots with Shared X-Axes"
    )
    fig.update_xaxes(matches='x')
    date = str(datetime.now().date())
    fig.write_html(f"ehr_{date}.html")
    fig.show()


def get_metrics_df(df):
    """Derive per-metric limits and units from the column labels of ``df``.

    Every column label is split on a double space: the first piece is the
    metric name, the second (when present) its reference-range text. Range
    texts that are missing or contain a ``BAD_METRICS`` substring are skipped;
    the rest are tokenised on single spaces and handed to
    ``parse_metric_range``.

    Side effect: ``df.columns`` is replaced in place by the metric names,
    lower-cased with spaces turned into underscores. The returned index uses
    the same normalisation so the two can be matched later.

    Args:
        df: Frame whose column labels are strings.

    Returns:
        DataFrame indexed by normalised metric name with float ``min`` and
        ``max`` columns (NaN when unknown) and a ``units`` column.
    """
    split_cols = df.columns.str.split("  ")
    names = [parts[0] for parts in split_cols]
    ranges = pd.Series(
        [list_val_default(parts, np.nan) for parts in split_cols],
        index=names,
        dtype=object,
    )
    # Keep the first range seen for each metric name.
    ranges = ranges[~ranges.index.duplicated()]
    # na=True drops labels without a range, as the previous NaN->bool cast did.
    is_bad = ranges.str.contains('|'.join(BAD_METRICS), regex=True, na=True)
    ranges = ranges[~is_bad.astype(bool)]
    range_tokens = ranges.dropna().str.split(" ").dropna()

    parsed = {name: parse_metric_range(tokens) for name, tokens in range_tokens.items()}
    dfm = pd.DataFrame(parsed).T
    # Guarantee the columns callers index into, even if no range produced them.
    for column in ("min", "max", "units"):
        if column not in dfm.columns:
            dfm[column] = np.nan

    for bound in ("min", "max"):
        # parse_metric_range yields floats for one-sided limits and strings
        # for "a - b" ranges. Going through str first keeps the floats, which
        # `.str.replace` on the mixed column used to turn into NaN.
        as_text = dfm[bound].astype(str).str.replace('|'.join((",", "_x")), "", regex=True)
        dfm[bound] = pd.to_numeric(as_text, errors="coerce").astype(float)

    def normalise(label):
        return label.lower().replace(" ", "_")

    df.columns = [normalise(name) for name in names]
    dfm.index = [normalise(name) for name in dfm.index]
    return dfm

def parse_metric_range(elem_list):
    """Interpret the space-separated tokens of a reference-range string.

    Recognised shapes:
        * ``[units]``                      -> units only
        * ``[<cmp><number>, units]``       -> one-sided limit (float) + units,
          where ``<cmp>`` is one of ``<=``, ``>=``, ``<``, ``>``
        * ``[low, "-", high, *units]``     -> ``min``/``max`` as the raw
          token strings, units joined back with spaces when present
        * any other list of 3+ tokens is re-parsed as
          ``[first, "rest joined by spaces"]``

    Args:
        elem_list: List of string tokens.

    Returns:
        Dict with any of the keys ``min``, ``max`` and ``units``.
    """
    metric_dict = dict()
    n_tokens = len(elem_list)
    if n_tokens == 1:
        metric_dict["units"] = elem_list[-1]
    elif n_tokens == 2:
        metric_dict["units"] = elem_list[-1]
        val = elem_list[0]
        # Two-character comparators must be tried before their one-character prefixes.
        for comparator, col in (("<=", "max"), (">=", "min"), ("<", "max"), (">", "min")):
            if val.startswith(comparator):
                try:
                    # Tolerate thousands separators such as "<1,000".
                    metric_dict[col] = float(val[len(comparator):].replace(",", ""))
                except ValueError:
                    # Threshold is not a number (e.g. a bare "<"): keep the
                    # units and record no limit rather than abort the parse.
                    pass
                break
    elif n_tokens >= 3:
        if elem_list[1] != "-":
            # Not a "low - high" range: treat everything after the first token
            # as one units string.
            return parse_metric_range([elem_list[0], " ".join(elem_list[1:])])
        metric_dict["min"] = elem_list[0]
        metric_dict["max"] = elem_list[2]
        if n_tokens > 3:
            # Units may themselves contain spaces; keep all trailing tokens.
            metric_dict["units"] = " ".join(elem_list[3:])
    return metric_dict


def get_df_numeric(df, start_date, min_col_entries):
    """Reduce ``df`` to well-populated float columns after ``start_date``.

    ``df`` is first written to ``output_cleaned.csv``. Then only float64
    columns and rows with an index later than ``start_date`` are kept,
    repeated column labels are dropped (first one wins), rows are put in
    index order, columns with fewer than ``min_col_entries`` non-null values
    are removed, and interior gaps are linearly interpolated.

    Args:
        df: Frame with a datetime index.
        start_date: Lower bound (exclusive) compared against the index.
        min_col_entries: Minimum number of non-null values a column needs.

    Returns:
        The filtered, sorted and interpolated frame.
    """
    df.to_csv("output_cleaned.csv")
    dfn = df.select_dtypes(np.float64)
    dfn = dfn[dfn.index > start_date]
    dfn = dfn.loc[:, ~dfn.columns.duplicated()]
    # Sort before interpolating so gaps are filled from their chronological
    # neighbours rather than from whatever rows happened to be adjacent.
    dfn = dfn.sort_index()
    # Build the mask in column order. The previous mask was sorted by null
    # count and then applied positionally, which kept the wrong columns.
    has_enough = dfn.notna().sum(axis=0) >= min_col_entries
    dfn = dfn.loc[:, has_enough.to_numpy()]
    dfn = dfn.interpolate(limit_area="inside")
    return dfn


def list_val_default(list_, default=0):
    """Return the second item of ``list_``, or ``default`` if it has fewer than two."""
    if len(list_) <= 1:
        return default
    return list_[1]


In [None]:
# Run the scraper; `scrape` returns None when there is nothing to process,
# otherwise the merged results are cleaned and plotted.
df = scrape(site)
if df is not None:
    clean_plot_df(df)
else:
    print("NO UPDATES")

In [None]:
# mailer = mailersend.NewApiClient()

# subject = "Subject"
# text = "This is the text content"
# html = "<p>This is the HTML content</p>"

# my_mail = "owner@verified_domain.com"
# subscriber_list = [ 'pamela@dundermifflin.com',
# 'dwight@dunderfmifflin.com', 'jim@dundermifflin.com']

# mailer.send(my_mail, subscriber_list, subject, html, text)

In [None]:
# Show the object-dtype columns with all-empty rows removed. `df` is None
# when the scrape found no updates, so guard against that instead of raising
# AttributeError on a full notebook run.
df.select_dtypes("O").dropna(how="all") if df is not None else None

In [None]:
# df = pd.read_csv("test_output.csv", index_col=0)