In [4]:
# Attach Google Drive under /content/gdrive so the notebook can read and write
# files stored there; force_remount=True re-mounts even if it is already mounted.
from google.colab import drive

drive.mount(mountpoint='/content/gdrive', force_remount=True)


Mounted at /content/gdrive


In [5]:
import warnings
warnings.filterwarnings('ignore')

In [6]:
!pip install pyspark



In [7]:
# import libraries
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, desc, avg, mean

In [8]:
# Load the four worksheets of the exercise workbook into pandas DataFrames.
fp = "/content/gdrive/My Drive/Colab Notebooks/Tendo/Exercise Data.xlsx"
sheet_names = ['encounter_e1', 'lab_e1', 'medications_e1', 'patient_e1']

# Passing the list of sheet names makes pandas open and parse the workbook once
# and return a {sheet_name: DataFrame} dict, instead of re-reading the whole
# file for every sheet.
_sheets = pd.read_excel(fp, sheet_name=sheet_names)

# Unpack by name, in the same order as `sheet_names`.
encounter_e1, lab_e1, medications_e1, patient_e1 = (
    _sheets[name] for name in sheet_names
)

In [9]:
# Start (or reuse) a Spark session on the local master "local[1]".
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
)

# Convert each pandas DataFrame into a Spark DataFrame and print a preview of it.
sparkDF_encounter_e1 = spark.createDataFrame(data=encounter_e1)
sparkDF_encounter_e1.show()

sparkDF_lab_e1 = spark.createDataFrame(data=lab_e1)
sparkDF_lab_e1.show()

sparkDF_medications_e1 = spark.createDataFrame(data=medications_e1)
sparkDF_medications_e1.show()

sparkDF_patient_e1 = spark.createDataFrame(data=patient_e1)
sparkDF_patient_e1.show()

+------------+-----------+-------------------+-------------------+-------------------+--------------+-------------------------+-----------------------+-----------------------------+--------------------+-------------+-----+--------+---+---------------+--------------------+--------------+--------------------+---------------+--------------+---------------------+--------------+
|   patientid|encounterid|     encounter_date|         admit_date|     discharge_date|reporting_year|discharge_department_name|discharge_attending_npi|discharge_attending_specialty|billing_provider_npi|attending_npi|  drg|drg_type|mdc|admit_diagnosis|     financial_class|encounter_type|    admission_source|point_of_origin|admission_type|discharge_disposition|length_of_stay|
+------------+-----------+-------------------+-------------------+-------------------+--------------+-------------------------+-----------------------+-----------------------------+--------------------+-------------+-----+--------+---+-----------

In [10]:
# Join patients to their encounters (on patientid) and to their medications
# (on patientid + encounterid), then compute the average `minimum_dose` for each
# patient / demographics / provider / medication / dose unit / admit diagnosis
# combination, ordered by patient id descending.
# NOTE(review): the displayed result contains NaN averages (e.g. the row whose
# dose_unit is NaN) — confirm whether missing doses should be nulls so that
# avg() skips them.
df = (
    sparkDF_patient_e1
    .join(sparkDF_encounter_e1, on=['patientid'])
    .join(sparkDF_medications_e1, on=['patientid', 'encounterid'])
    .groupBy(
        sparkDF_patient_e1.patientid,
        'Sex',
        'Age',
        'primary_care_provider',
        'medication_simple_generic_name',
        'dose_unit',
        'admit_diagnosis',
    )
    .agg(avg('minimum_dose').alias('avg_minimum_dose'))
    .sort(sparkDF_patient_e1.patientid.desc())
)

In [11]:
# Print every row of the aggregated frame without truncating cell values:
# count() is evaluated first and its result is passed to show() as the row limit.
df.show(n=df.count(), truncate=False)

+------------+------+---+---------------------+------------------------------+---------------+---------------+------------------+
|patientid   |Sex   |Age|primary_care_provider|medication_simple_generic_name|dose_unit      |admit_diagnosis|avg_minimum_dose  |
+------------+------+---+---------------------+------------------------------+---------------+---------------+------------------+
|111013238188|Female|30 |1.659717221E9        |misoprostol                   |mcg            |O99.344        |25.0              |
|111013238188|Female|30 |1.659717221E9        |terbutaline sulfate           |mg             |O99.344        |0.25              |
|111013238188|Female|30 |1.659717221E9        |fentanyl/bupivacaine/NS/PF    |NaN            |O99.344        |NaN               |
|111013238188|Female|30 |1.659717221E9        |docusate sodium               |mg             |O99.344        |100.0             |
|111013238188|Female|30 |1.659717221E9        |prenatal vit no.130/iron/folic|tablet      

In [12]:
from datetime import datetime

# Export the aggregated result as a pipe-delimited UTF-8 text file whose name
# carries today's date (target_1_YYYYMMDD.txt).
filepath = "/content/gdrive/My Drive/Colab Notebooks/Tendo/"
filename = "target_1_" + datetime.now().strftime("%Y%m%d") + ".txt"

# mode='w' makes the export idempotent: with the previous mode='a', re-running
# the notebook on the same day appended a second header line and duplicate rows
# to the existing file.
# The former `line_terminator=""` argument is dropped: the keyword was renamed to
# `lineterminator` in pandas 1.5 and removed in 2.0 (TypeError there), and
# omitting it keeps the default line ending.
df.toPandas().to_csv(
    filepath + filename,
    header=True,
    index=False,
    sep='|',
    mode='w',
    encoding='utf-8',
    quotechar='"',
)