In [None]:
from pyspark.sql import SQLContext, Row, DataFrame, HiveContext, SparkSession
from pyspark.sql.functions import udf, col, lit, when, min as sql_min
from pyspark.sql.types import *

sqlContext = SQLContext(sc)
sqlContext_H = HiveContext(sc)

import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

from datetime import datetime, date
import dateutil
import pandas as pd
import numpy as np
from collections import OrderedDict
from random import randint
from pyspark.sql.window import Window
import pyspark.sql.functions as func
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
%%time

# Raw access records: one parquet directory per month (July, August, September).
parquet_paths = [
    HOST + "/parquet/07/*",
    HOST + "/parquet/08/*",
    HOST + "/parquet/09/*",
]
data = sqlContext_H.read.parquet(*parquet_paths)

def convert_dttm(x):
    """Parse a 'YYYY-MM-DD HH:MM:SS' string into a naive ``datetime``.

    Used as the body of a Spark UDF, so it must not raise: missing values
    (``None``) and strings that do not match the format map to ``None``.

    :param x: timestamp string, or None.
    :returns: ``datetime`` or ``None``.
    """
    try:
        return datetime.strptime(x, '%Y-%m-%d %H:%M:%S')
    except (TypeError, ValueError):
        # TypeError: x is None / not a string; ValueError: format mismatch.
        # A bare ``except`` would also hide genuine bugs (e.g. NameError).
        return None
# Spark UDF around convert_dttm, producing TimestampType values.
convert_dttm_udf = udf(convert_dttm, TimestampType())

# Replace each string timestamp column, in turn, with its parsed version.
for dttm_column in ("ACCESS_DTTM", "ADMISSION_DTTM", "DISCHARGE_DTTM"):
    data = data.withColumn(dttm_column, convert_dttm_udf(col(dttm_column)))

In [None]:
# Preview the first 10 rows. NOTE(review): output includes user/patient IDs -- clear before sharing.
data.limit(10).toPandas()

---
### 1. Select fields

In [None]:
# Columns needed for the metric analysis below.
columns = ["ACCESS_DTTM", "USER_ID", "METRIC_NAME", "MODULE", "EPIC_PATIENT_ID", "REPORT_NAME", "METRIC_DESCRIPTION"]

# Release the cache from a previous run of this cell so that re-running it
# does not leave an orphaned persisted DataFrame occupying executor memory.
if 'data_analysis' in globals():
    data_analysis.unpersist()

data_analysis = data.select(columns).persist()
data_analysis.limit(3).toPandas()

In [None]:
# Expose the projected frame to the SQL queries below as table `data_analysis`.
sqlContext_H.registerDataFrameAsTable(data_analysis, "data_analysis")

In [None]:
def draw_plot(rdd, x, y, n, title=''):
    """Draw a bar chart of column ``y`` labelled by column ``x``.

    :param rdd: Spark DataFrame (despite the name) whose rows contain ``x`` and ``y``.
    :param x: column whose values label the bars.
    :param y: column whose values give the bar heights.
    :param n: number of leading rows to plot.
    :param title: figure title.
    """
    # Fetch the rows once so labels and heights come from the same rows; two
    # separate take() calls re-run the query and need not return the same rows.
    rows = rdd.take(n)
    X = [row[x] for row in rows]
    Y = [row[y] for row in rows]

    fig, ax = plt.subplots(figsize=(18, 6))
    fig.suptitle(title, fontsize=14, fontweight='bold')

    ind = np.arange(len(Y))
    # Center bars on their tick positions explicitly: matplotlib's default
    # bar alignment changed between versions, which shifted labels off bars.
    ax.bar(ind, Y, width=.35, align='center')
    ax.set_xticks(ind)
    ax.set_xticklabels(X)

    fig.autofmt_xdate()
    plt.show()

---
### 2. Number of unique modules and unique metrics

In [None]:
%%time
# Number of distinct MODULE values across the analysed records.
distinct_module_sql = (
    "SELECT COUNT(DISTINCT MODULE) AS DISTINCT_MODULE",
    "FROM data_analysis",
)
query = " ".join(distinct_module_sql)
u_mo = sqlContext_H.sql(query)
u_mo.show()

In [None]:
%%time
# Number of distinct METRIC_NAME values across the analysed records.
distinct_metric_sql = (
    "SELECT COUNT(DISTINCT METRIC_NAME) AS DISTINCT_METRIC_NAME",
    "FROM data_analysis",
)
query = " ".join(distinct_metric_sql)
u_me = sqlContext_H.sql(query)
u_me.show()

---
> * There are 39 unique MODULE values in total.
> * There are 689 unique METRIC_NAME values in total.

---

---
### 3. Percentage of records with undefined modules and metrics

In [None]:
%%time
# Row count of the projected data. DataFrame.count() runs in the JVM; going
# through .rdd would deserialize every row into Python just to count it.
total_rows = data_analysis.count()
print("Total rows in dataset: {}".format(total_rows))

In [None]:
%%time
# Records whose MODULE is undefined: empty string, the literal text 'None',
# or SQL NULL. NULL must be tested with IS NULL -- `MODULE = null` evaluates
# to NULL (never true), so NULL modules were silently excluded before.
query = " ".join([
    "SELECT *",
    "FROM data_analysis",
    "WHERE MODULE = ''",
    "OR MODULE = 'None'",
    "OR MODULE IS NULL"
])
u_mo = sqlContext_H.sql(query)
c_u_mo = u_mo.count()
c_u_mo

In [None]:
%%time
# Share of records with an undefined MODULE (0.0 for an empty dataset rather
# than a ZeroDivisionError).
per_mo = (c_u_mo * 100.0) / total_rows if total_rows else 0.0
print("Percentage of rows where MODULE is undefined: {:.2f}%".format(per_mo))

In [None]:
%%time
# Records whose METRIC_NAME is undefined: empty string, the literal text
# 'None', or SQL NULL. `METRIC_NAME = null` is never true in SQL, so NULLs
# must be matched with IS NULL.
query = " ".join([
    "SELECT *",
    "FROM data_analysis",
    "WHERE METRIC_NAME = ''",
    "OR METRIC_NAME = 'None'",
    "OR METRIC_NAME IS NULL"
])
u_me = sqlContext_H.sql(query)
c_u_me = u_me.count()
c_u_me

In [None]:
%%time
# Share of records with an undefined METRIC_NAME (0.0 for an empty dataset
# rather than a ZeroDivisionError).
per_me = (c_u_me * 100.0) / total_rows if total_rows else 0.0
print("Percentage of rows where METRIC_NAME is undefined: {:.2f}%".format(per_me))

---
> * About 4% of records have an undefined MODULE.
> * Every record has a METRIC_NAME.

---

# ...

---
### 8. Find all records related to printing (this information may also be in the METRIC_DESCRIPTION field) and investigate how they change over time

In [None]:
%%time
def f1(x):
    """Return the calendar date of timestamp ``x`` as a 'YYYY-MM-DD' string.

    ``x`` is None when ACCESS_DTTM could not be parsed; return None then
    instead of raising inside the Spark UDF (which would fail the job).
    """
    if x is None:
        return None
    return str(x.date())

def f2(x):
    """Return the hour (0-23) of timestamp ``x``, or None when ``x`` is None.

    Guarding None keeps the Spark UDF from raising on unparsed ACCESS_DTTM.
    """
    if x is None:
        return None
    return x.hour

def f3(c1, c2, c3):
    """Return 1 if any of the three text values mentions 'print' (any case), else 0.

    NULL columns (None) are treated as empty; concatenating None with a
    string raised TypeError, which fails the whole Spark job inside a UDF.
    """
    texts = (c1, c2, c3)
    if any('print' in text.lower() for text in texts if text is not None):
        return 1
    return 0

# Spark UDF wrappers for the helpers above. Despite its name, dow_udf yields
# the calendar date string produced by f1, not a day of the week.
dow_udf = udf(f1, StringType())
time_udf = udf(f2, IntegerType())
f_udf = udf(f3, IntegerType())

# Tag every record with the print flag, its access date and its access hour.
# The column name "CKECK" is kept as-is because later queries filter on it.
check_df = (
    data_analysis
    .withColumn("CKECK", f_udf(col("METRIC_DESCRIPTION"), col("MODULE"), col("METRIC_NAME")))
    .withColumn("DAY", dow_udf(col("ACCESS_DTTM")))
    .withColumn("TIME_OF_DAY", time_udf(col("ACCESS_DTTM")))
)
sqlContext_H.registerDataFrameAsTable(check_df, 'check_df')

In [None]:
%%time
# Print-related records: flagged by the keyword check or by a print report name.
with_print_sql = (
    "SELECT *",
    "FROM check_df",
    "WHERE CKECK = 1 OR REPORT_NAME = 'Print' OR REPORT_NAME = 'Printed/Sent by Order Transmittal'",
)
query = " ".join(with_print_sql)
with_print = sqlContext_H.sql(query)
sqlContext_H.registerDataFrameAsTable(with_print, 'with_print')

In [None]:
%%time
# Preview a few print-related records. NOTE(review): includes user/patient IDs.
with_print.limit(5).toPandas()

In [None]:
%%time
# One-row summary (no GROUP BY): every distinct METRIC_NAME, MODULE and
# METRIC_DESCRIPTION among print-related records, each joined with ';'.
# NOTE(review): values that themselves contain ';' will be split apart later.
collect_sql = (
    "SELECT concat_ws(';', collect_set(METRIC_NAME)) AS METRIC_NAME,",
    "concat_ws(';', collect_set(MODULE)) AS MODULE,",
    "concat_ws(';', collect_set(METRIC_DESCRIPTION)) AS METRIC_DESCRIPTION",
    "FROM with_print",
)
query = " ".join(collect_sql)
collect_df = sqlContext_H.sql(query)

In [None]:
%%time
# collect_df is a single aggregate row (its query has no GROUP BY), so this shows that row.
collect_df.limit(10).toPandas()

In [None]:
# Bring the summary row to the driver as pandas for the list-building cells below.
collect_df_pd = collect_df.toPandas()

In [None]:
# Distinct METRIC_NAME values that themselves mention "print".
m_n_l = [i for i in collect_df_pd['METRIC_NAME'].values.tolist()[0].split(';') if 'print' in i.lower()]
# WHERE clause matching exactly those names. Backslashes and quotes are
# escaped so a value cannot break the SQL literal, and an empty list matches
# nothing instead of producing the invalid clause "WHERE ". String
# concatenation (not str.format) keeps non-ASCII unicode working on Python 2.
if m_n_l:
    filter_metric = "WHERE " + " OR ".join(
        "METRIC_NAME = '" + i.replace("\\", "\\\\").replace("'", "\\'") + "'" for i in m_n_l
    )
else:
    filter_metric = "WHERE 1 = 0"

In [None]:
# Display the generated METRIC_NAME WHERE clause.
filter_metric

In [None]:
# How many distinct print-related metric names were found.
print("Distinct values of METRIC_NAME: {}".format(len(m_n_l)))

In [None]:
# The print-related metric names themselves.
print(m_n_l)

In [None]:
# Distinct MODULE values that themselves mention "print".
mo_l = [i for i in collect_df_pd['MODULE'].values.tolist()[0].split(';') if 'print' in i.lower()]
# WHERE clause matching exactly those modules. Backslashes and quotes are
# escaped so a value cannot break the SQL literal, and an empty list matches
# nothing instead of producing the invalid clause "WHERE ". String
# concatenation (not str.format) keeps non-ASCII unicode working on Python 2.
if mo_l:
    filter_module = "WHERE " + " OR ".join(
        "MODULE = '" + i.replace("\\", "\\\\").replace("'", "\\'") + "'" for i in mo_l
    )
else:
    filter_module = "WHERE 1 = 0"

In [None]:
# How many distinct print-related modules were found.
print("Distinct values of MODULE: {}".format(len(mo_l)))

In [None]:
# The print-related module names themselves.
print(mo_l)

In [None]:
# Distinct METRIC_DESCRIPTION values that themselves mention "print".
me_d = [i for i in collect_df_pd['METRIC_DESCRIPTION'].values.tolist()[0].split(';') if 'print' in i.lower()]
# WHERE clause matching exactly those descriptions. Backslashes and quotes
# are escaped so a value cannot break the SQL literal, and an empty list
# matches nothing instead of producing the invalid clause "WHERE ". String
# concatenation (not str.format) keeps non-ASCII unicode working on Python 2.
if me_d:
    filter_metric_des = "WHERE " + " OR ".join(
        "METRIC_DESCRIPTION = '" + i.replace("\\", "\\\\").replace("'", "\\'") + "'" for i in me_d
    )
else:
    filter_metric_des = "WHERE 1 = 0"

In [None]:
# How many distinct print-related metric descriptions were found.
print("Distinct values of METRIC_DESCRIPTION: {}".format(len(me_d)))

In [None]:
# The print-related metric descriptions themselves.
print(me_d)

#### 8.1 How many users have these metrics / modules / metric descriptions

In [None]:
%%time
# Number of distinct users with at least one print-related record.
# DataFrame.count() avoids deserializing every row into Python via .rdd.
u_users_c = with_print.select("USER_ID").distinct().count()
print("Users has these metrics/modules/metric_descriptions: {}".format(u_users_c))

#### 8.2 Which of them are the most popular

In [None]:
# Per (METRIC_NAME, MODULE) among print-related metric names: record count and
# distinct users, most frequent first.
show_metric_sql = (
    "SELECT METRIC_NAME, MODULE, COUNT(METRIC_NAME) AS AMOUNT, COUNT(DISTINCT USER_ID) AS DISTINCT_USERS",
    "FROM with_print",
    filter_metric,
    "GROUP BY METRIC_NAME, MODULE",
    "ORDER BY AMOUNT DESC, DISTINCT_USERS DESC",
)
query = " ".join(show_metric_sql)
show_metric = sqlContext_H.sql(query)

In [None]:
%%time
# Top 20 (METRIC_NAME, MODULE) combinations by record count.
show_metric.limit(20).toPandas()

In [None]:
%%time
# Export as CSV. mode("overwrite") lets the notebook be re-run (the default
# mode fails when the output path already exists); header=true writes the
# column names so the file is self-describing.
show_metric.write.format("com.databricks.spark.csv").mode("overwrite").option("header", "true").save(HOST + "/csv/EDA_METRICS_10_2_METRIC_NAME_MODULE.csv")

In [None]:
# Per (MODULE, METRIC_NAME) among print-related modules: record count and
# distinct users, most frequent first.
show_module_sql = (
    "SELECT MODULE, METRIC_NAME, COUNT(MODULE) AS AMOUNT, COUNT(DISTINCT USER_ID) AS DISTINCT_USERS",
    "FROM with_print",
    filter_module,
    "GROUP BY MODULE, METRIC_NAME",
    "ORDER BY AMOUNT DESC, DISTINCT_USERS DESC",
)
query = " ".join(show_module_sql)
show_module = sqlContext_H.sql(query)

In [None]:
%%time
# Top 20 (MODULE, METRIC_NAME) combinations by record count.
show_module.limit(20).toPandas()

In [None]:
%%time
# Export as CSV. mode("overwrite") lets the notebook be re-run (the default
# mode fails when the output path already exists); header=true writes the
# column names so the file is self-describing.
show_module.write.format("com.databricks.spark.csv").mode("overwrite").option("header", "true").save(HOST + "/csv/EDA_METRICS_10_2_MODULE_METRIC_NAME.csv")

In [None]:
# Per (METRIC_DESCRIPTION, MODULE, METRIC_NAME) among print-related
# descriptions: record count and distinct users, most frequent first.
# Cached because it is both previewed and exported below.
show_metric_des_sql = (
    "SELECT METRIC_DESCRIPTION, MODULE, METRIC_NAME, COUNT(METRIC_DESCRIPTION) AS AMOUNT, COUNT(DISTINCT USER_ID) AS DISTINCT_USERS",
    "FROM with_print",
    filter_metric_des,
    "GROUP BY METRIC_DESCRIPTION, MODULE, METRIC_NAME",
    "ORDER BY AMOUNT DESC, DISTINCT_USERS DESC",
)
query = " ".join(show_metric_des_sql)
show_metric_des = sqlContext_H.sql(query).persist()

In [None]:
%%time
# Top 20 (METRIC_DESCRIPTION, MODULE, METRIC_NAME) combinations by record count.
show_metric_des.limit(20).toPandas()

In [None]:
%%time
# Export as CSV. mode("overwrite") lets the notebook be re-run (the default
# mode fails when the output path already exists); header=true writes the
# column names so the file is self-describing.
show_metric_des.write.format("com.databricks.spark.csv").mode("overwrite").option("header", "true").save(HOST + "/csv/EDA_METRICS_10_2_METRIC_DESCRIPTION.csv")

In [None]:
# Release the cached description summary now that it has been previewed and exported.
show_metric_des.unpersist()

#### 8.3 How many such records each user / patient has per day / week / hour

In [None]:
def draw_scale(rdd, x, y, z, n_label_x, n_label_y, n_label_z, title=''):
    """Plot two series against ``x`` on twin y-axes (``y`` blue, ``z`` red).

    :param rdd: Spark DataFrame (despite the name) holding columns x, y and z.
    :param x: x-axis column. For 'TIME_OF_DAY' the axis is fixed to hours
        0-23 and hours absent from the data are plotted as 0; otherwise rows
        are plotted in the order collected, labelled by their x values.
    :param y: column plotted on the left (blue) axis.
    :param z: column plotted on the right (red) axis.
    :param n_label_x: x-axis label.
    :param n_label_y: left y-axis label.
    :param n_label_z: right y-axis label.
    :param title: figure title.
    """
    # Collect once so x, y and z come from the same rows in the same order
    # (separate collect() calls re-run the whole query for each column).
    rows = rdd.collect()
    t = [row[x] for row in rows]
    s1 = [row[y] for row in rows]
    s2 = [row[z] for row in rows]

    if x == 'TIME_OF_DAY':
        hours = list(range(24))
        # Keep the first value seen for each hour.
        first_s1 = {}
        first_s2 = {}
        for hour, v1, v2 in zip(t, s1, s2):
            first_s1.setdefault(hour, v1)
            first_s2.setdefault(hour, v2)
        d_t = hours
        # Plain lists (not dict views) so matplotlib gets real sequences on Py3.
        d_s1 = [first_s1.get(h, 0) for h in hours]
        d_s2 = [first_s2.get(h, 0) for h in hours]
        names = hours
    else:
        d_t = list(range(len(t)))
        d_s1 = s1
        d_s2 = s2
        names = t

    if not d_t:
        # Nothing to plot (e.g. the filter matched no rows).
        print('No rows to plot for: {}'.format(title))
        return

    fig, ax1 = plt.subplots(figsize=(18, 6))
    fig.suptitle(title, fontsize=14, fontweight='bold')

    ax1.plot(d_t, d_s1, 'b-')
    ax1.set_xlabel(n_label_x)
    # Left axis label and tick labels match the blue line.
    ax1.set_ylabel(n_label_y, color='b')
    ax1.tick_params(axis='y', labelcolor='b')

    # Second y-axis sharing the x-axis, for the red series.
    ax2 = ax1.twinx()
    ax2.plot(d_t, d_s2, 'r-')
    ax2.set_ylabel(n_label_z, color='r')
    ax2.tick_params(axis='y', labelcolor='r')

    # One tick per plotted position, labelled with the x values.
    ax1.set_xticks(d_t)
    ax1.set_xticklabels(names)

    plt.show()

##### By hour and user

In [None]:
%%time
# Per user and hour of day over print-related records:
#   inner query: TOTAL = records per (USER_ID, TIME_OF_DAY, exact ACCESS_DTTM);
#   outer query: SUM(TOTAL) = records per (user, hour), AVG_TOTAL = mean
#   records per distinct ACCESS_DTTM within that (user, hour).
# NOTE(review): averaging over exact timestamps rarely differs from 1; if the
# intent was "average per day", the inner GROUP BY key may need to be DAY -- confirm.
query = " ".join([
    "SELECT USER_ID, TIME_OF_DAY, AVG(TOTAL) AS AVG_TOTAL, SUM(TOTAL) AS TOTAL",
    "FROM (SELECT USER_ID, TIME_OF_DAY, ACCESS_DTTM, COUNT(USER_ID) AS TOTAL",
           "FROM with_print",
           "GROUP BY USER_ID, TIME_OF_DAY, ACCESS_DTTM) AS T",
    "GROUP BY TIME_OF_DAY, USER_ID",
    "ORDER BY AVG_TOTAL DESC"
])
# Cached: reused by the preview, the CSV export and the per-user plots.
c_u_h = sqlContext_H.sql(query).persist()

In [None]:
%%time
# Ten (user, hour) pairs with the highest AVG_TOTAL.
c_u_h.limit(10).toPandas()

In [None]:
%%time
# Export as CSV. mode("overwrite") lets the notebook be re-run (the default
# mode fails when the output path already exists); header=true writes the
# column names so the file is self-describing.
c_u_h.write.format("com.databricks.spark.csv").mode("overwrite").option("header", "true").save(HOST + "/csv/EDA_METRICS_10_3_hour_user.csv")

In [None]:
# Hour-of-day profile of print activity for selected users: one figure each.
# A loop replaces five copy-pasted cells that differed only in the user ID.
PROFILE_USER_IDS = ['313999', '105369', '706862', '205792', '10010']

for user_id in PROFILE_USER_IDS:
    draw_scale(c_u_h.filter("USER_ID = '{}'".format(user_id)), "TIME_OF_DAY", "AVG_TOTAL", "TOTAL",
               "Time of day", "AVG TOTAL", "TOTAL", "FOR USER {}".format(user_id))

In [None]:
# Release the cached per-user/hour summary now that the plots are done.
c_u_h.unpersist()