In [0]:
pip install azure-storage-blob xlrd==1.2.0

In [0]:
import io
import os
from itertools import chain
from pathlib import Path

import pandas as pd
import xlrd
from azure.storage.blob import BlobServiceClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, format_string, sequence, split, trim
from pyspark.sql.types import ArrayType, IntegerType, Row, StringType, StructField, StructType

In [0]:
# Read input parameters.
# "folder" narrows processing to blobs whose names start with raw_data/<folder>;
# the empty default leaves the prefix at raw_data/ itself.
folder_param, folder_default = "folder", ""
dbutils.widgets.text(folder_param, folder_default)
folder = dbutils.widgets.get(folder_param)

In [0]:
# Connect to blob storage through the wasbs:// driver using a SAS token.
# The token is read from the environment (e.g. a cluster env var backed by a
# Databricks secret) instead of being committed to the notebook; any token that
# was ever hardcoded here must be treated as leaked.
blob_account_name = "cahospitalstorage"
blob_container_name = "chargemasters"
blob_sas_token = os.environ.get("CHARGEMASTERS_SAS_TOKEN")
if not blob_sas_token:
    raise RuntimeError(
        "Set the CHARGEMASTERS_SAS_TOKEN environment variable to a SAS token "
        "for container %s" % blob_container_name
    )

wasbs_path = "wasbs://%s@%s.blob.core.windows.net/" % (blob_container_name, blob_account_name)
# Register the SAS token for this container so Spark can read/write wasbs_path.
spark.conf.set("fs.azure.sas.%s.%s.blob.core.windows.net" % (blob_container_name, blob_account_name), blob_sas_token)
print('Remote blob path: ' + wasbs_path)

In [0]:
# Get path of excel files.
# The connection string embeds the storage account key, so it is read from the
# environment rather than committed here; a key that was ever committed must be
# rotated in the storage account.
connection_string = os.environ.get("CHARGEMASTERS_CONNECTION_STRING")
if not connection_string:
    raise RuntimeError(
        "Set the CHARGEMASTERS_CONNECTION_STRING environment variable to the "
        "storage account connection string"
    )
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(blob_container_name)

# List on the driver and ship only the blob names (plain strings) to Spark,
# instead of pickling SDK BlobProperties objects.
files = container_client.list_blobs(name_starts_with='raw_data/' + folder)
files_rdd = spark.sparkContext.parallelize([blob.name for blob in files])

# Keep Excel workbooks (.xls, .xlsx, .xlsm, .xlsb) by their real extension,
# case-insensitively, instead of any ".xls" substring anywhere in the path.
excel_files_rdd = files_rdd.filter(
    lambda name: os.path.splitext(name)[1].lower().startswith('.xls')
)

In [0]:
def sheet_names(file):
    """List the worksheet names of one Excel blob.

    Args:
        file: blob name inside the container (e.g. ``raw_data/<folder>/x.xlsx``).

    Returns:
        ``(file, [sheet_name, ...])`` on success, or ``(None, None)`` when the
        blob cannot be downloaded or opened as a workbook.
    """
    try:
        blob_bytes = container_client.get_blob_client(file).download_blob().content_as_bytes()
        # ExcelFile only opens the workbook; read_excel(sheet_name=None) would
        # parse every sheet into a DataFrame just to discover the names.
        with pd.ExcelFile(io.BytesIO(blob_bytes)) as workbook:
            return (file, list(workbook.sheet_names))
    except Exception:
        # Best effort: an unreadable blob is reported as (None, None) so the
        # Spark job keeps going over the remaining files.
        return (None, None)

def read_files(row, match_string='Evaluation & Management Services (CPT Codes 99201-99499)'):
    """Check whether one worksheet contains ``match_string``.

    Args:
        row: a ``(file, sheet)`` pair, e.g. a Spark Row of (File, Sheetname).
        match_string: text searched for anywhere in the rendered sheet; defaults
            to the E&M services section header.

    Returns:
        ``(file, sheet)`` when the sheet contains ``match_string``. Returns
        ``None`` when it does not, or when the blob/sheet cannot be read.
        Failures deliberately return ``None`` rather than a ``(None, None)``
        tuple, so a single ``is not None`` filter drops both cases.
    """
    file = row[0]
    sheet = row[1]
    try:
        blob_bytes = container_client.get_blob_client(file).download_blob().content_as_bytes()
        df = pd.read_excel(io.BytesIO(blob_bytes), sheet_name=sheet)
        sheet_text = df.to_string()
    except Exception:
        return None
    return (file, sheet) if match_string in sheet_text else None

# Discover every sheet of every Excel file. Unreadable workbooks are dropped
# first, and an explicit schema is used so toDF never has to infer types from
# null rows or from an empty RDD.
sheetnames_schema = StructType([
    StructField('File', StringType(), True),
    StructField('Sheetname', ArrayType(StringType()), True),
])
sheetnames = (
    excel_files_rdd
    .map(sheet_names)
    .filter(lambda r: r[0] is not None)
    .toDF(sheetnames_schema)
)
sheetnames_expand = sheetnames.withColumn('Sheetname', explode('Sheetname'))

# Keep only (file, sheet) pairs that contain the target section. Also reject a
# (None, None) failure sentinel so it can never reach the download loop.
forms = (
    sheetnames_expand.rdd
    .map(read_files)
    .filter(lambda x: x is not None and x[0] is not None)
)
forms_list = forms.collect()

In [0]:
# Read each target excel sheet and concatenate all rows with a CPT code
schema = StructType([
    StructField('Description', StringType(), True),
    StructField('CPT', StringType(), True),
    StructField('Charge', StringType(), False)
])
df_all = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

for form in forms_list:
    filepath = form[0]
    sheet = form[1]
    try:
        # Download into memory rather than a shared temp file in the working directory.
        buffer = io.BytesIO()
        container_client.get_blob_client(filepath).download_blob().readinto(buffer)
        buffer.seek(0)
        # Only the first three columns are used, renamed to the schema's fields.
        pdf = pd.read_excel(buffer, sheet_name=sheet,
                            usecols=[0, 1, 2],
                            names=['Description', 'CPT', 'Charge'])

        # NOTE(review): the schema declares every column as a string; numeric cells
        # (e.g. CPT/Charge read as int/float) may be rejected by createDataFrame,
        # which skips the whole sheet below -- confirm against the source files.
        df_tmp = spark.createDataFrame(pdf, schema=schema)
        # Keep rows whose CPT cell contains at least one run of 5 digits.
        df_tmp = df_tmp.filter(df_tmp.CPT.rlike(r"\d{5}"))
        df_all = df_all.union(df_tmp)
    except Exception as e:
        # Best effort: report the cause and skip sheets that cannot be read or converted.
        print("Cannot read sheet %r of excel file %s: %s" % (sheet, filepath, e))

In [0]:
# Clean up df_all: normalise the CPT column to one code per row.
## Ex. ranges => CPT Code 97161-97163 -> 97161, 97162, 97163
## Ex. splits => CPT Code 81002 or 81003 -> 81002, 81003
# NOTE(review): a CPT value containing both "-" and "/" (or "or") lands in both
# df_ranges and df_split below -- confirm whether such values occur.
df_ranges = df_all.filter(df_all.CPT.contains("-"))
df_split  = df_all.filter(df_all.CPT.rlike("/|or"))
df_int  = df_all.filter(~df_all.CPT.rlike("-|/|or"))

# df_ranges: expand "start-end" into one row per code in the inclusive range.
df_array = df_ranges.withColumn('CPT_array', split('CPT', '-').cast(ArrayType(IntegerType())))
df_array = df_array.withColumn('CPT', explode(sequence(col('CPT_array')[0], col('CPT_array')[1])))
# Back to a zero-padded string. This restores leading zeros dropped by the
# integer cast (e.g. 00100) and gives CPT the same string type as the other
# branches of the union.
df_array = df_array.withColumn('CPT', format_string('%05d', col('CPT')))
df_array = df_array.select(['Description', 'CPT', 'Charge'])

# df_split: one row per alternative, stripped of the whitespace around "/" or
# "or"; empty tokens (e.g. from a leading "or") are discarded.
df_split = df_split.withColumn('CPT', explode(split('CPT', "/|or")))
df_split = df_split.withColumn('CPT', trim(col('CPT')))
df_split = df_split.filter(col('CPT') != '')

# combine
df_new_rows = df_array.union(df_split)
df_final = df_new_rows.union(df_int)

In [0]:
# Export df_final to blob storage: write it as parquet to DBFS, then copy every
# file of that output directory into the container at <output>/<folder>/<name>.
# Upload errors are printed and the remaining files are still attempted.
output = "clean_data_chargemasters"
df_final.write.option("header", "true").mode("overwrite").parquet(output)
local_dir = "/dbfs/" + output
for part_name in os.listdir(local_dir):
    try:
        local_path = "%s/%s" % (local_dir, part_name)
        target_blob = "/".join([output, folder, part_name])
        with open(local_path, "rb") as stream:
            container_client.upload_blob(name=target_blob, data=stream, overwrite=True)
    except Exception as err:
        print(err)

In [0]:
# Hand "<container>/<output>/<folder>" back as this notebook's exit value.
dbutils.notebook.exit("/".join((blob_container_name, output, folder)))