In [0]:
# pyspark functions
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt

# URL processing
import urllib

# Turn on Spark's REPL eager evaluation so that a DataFrame left as the last
# expression of a cell is rendered without an explicit .show() call.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [0]:
# List everything that has been uploaded to the DBFS tables folder
tables_listing = dbutils.fs.ls("/FileStore/tables")
display(tables_listing)

In [0]:
# Reader settings for the uploaded AWS credentials file:
# source format, whether the first row is a header, and the field separator.
file_type = 'csv'
first_row_is_header = "true"
delimiter = ","

# Load the credentials CSV into a Spark DataFrame.
# NOTE(review): the keys live in a plain CSV under /FileStore; a secret scope
# would avoid keeping credentials in a readable file -- confirm with the owner.
aws_keys_reader = spark.read.format(file_type).options(
    header=first_row_is_header,
    sep=delimiter,
)
aws_keys_df = aws_keys_reader.load("/FileStore/tables/CWND_accessKeys.csv")

In [0]:
# Get the AWS access key and secret key from the Spark DataFrame.
# Both columns are fetched with a single first() call (one Spark job instead of
# two), and an empty credentials file fails with an explicit message instead of
# an IndexError on collect()[0].
aws_keys_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if aws_keys_row is None:
    raise ValueError("No credential row found in /FileStore/tables/CWND_accessKeys.csv")

ACCESS_KEY = aws_keys_row['Access key ID']
SECRET_KEY = aws_keys_row['Secret access key']

# Percent-encode the secret key; safe="" means "/" is encoded too, so the key
# can be embedded in the mount URL.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
# AWS S3 bucket name
AWS_S3_BUCKET = "data603final"

# Mount name for the bucket
MOUNT_NAME = "/mnt/data603final"

# Source url (access key and URL-encoded secret key embedded in the URI)
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)

# Mount the drive only when the mount point does not exist yet, so the cell can
# be re-run (mounting an already-mounted directory raises an error).
existing_mount_points = [mount_info.mountPoint for mount_info in dbutils.fs.mounts()]
if MOUNT_NAME not in existing_mount_points:
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

In [0]:
# Check that the AWS S3 bucket was mounted successfully by listing its contents
mounted_listing = dbutils.fs.ls("/mnt/data603final/")
display(mounted_listing)

In [0]:
# Location and format of the first dataset
file_location = "/mnt/data603final/378_49A0.csv"
file_type = "csv"

# CSV parsing settings: infer column types, treat row one as header, comma-separated
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Apply all CSV options in one call, then load the file
csv_options = {
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
}
df_378 = spark.read.format(file_type).options(**csv_options).load(file_location)

# Take a look at the data
display(df_378)

In [0]:
# For the sake of simplicity, keep only the columns used in the analysis below
columnsToKeeps = [
    "negotiated_type", "negotiated_rate", "expiration_date",
    "billing_class", "negotiation_arrangement", "name",
    "billing_code_type", "billing_code_type_version",
    "billing_code", "description",
]

base378 = df_378.select(*columnsToKeeps)

# Mark the narrowed DataFrame for caching and run an action so the cache is
# populated before the analysis cells reuse it.
base378.cache()
base378.count()

display(base378)

In [0]:
# Total row count plus the number of distinct billing codes, descriptions and names
summary_counts = [
    count("*").alias("Total Rows"),
    countDistinct("billing_code").alias("number of billing codes"),
    countDistinct("description").alias("number of descriptions"),
    countDistinct("name").alias("number of names"),
]
base378.select(*summary_counts).show()

In [0]:
# Most frequent (billing_code, description) pairs, top 50.
# Sorting uses the numeric count; count_f is only the display-formatted copy.
billing_code_counts = base378.groupBy("billing_code", "description").count()
(billing_code_counts
 .withColumn("count_f", format_number("count", 0))
 .orderBy(col("count").desc())
 .select("billing_code", "description", "count_f")
 .show(n=50, truncate=True))

In [0]:
# Most frequent descriptions, top 50 (sorted on the numeric count, shown formatted)
description_counts = base378.groupBy("description").count()
(description_counts
 .withColumn("count_f", format_number("count", 0))
 .orderBy(col("count").desc())
 .select("description", "count_f")
 .show(n=50, truncate=True))

In [0]:
# Count rows per billing code among rows whose description is "DRG - Description".
# The filter runs before any projection: the original filtered on `description`
# after select("billing_code") had already dropped that column, which only works
# because Spark quietly re-adds the missing column during analysis.
(base378
 .where(col("description") == "DRG - Description")
 .groupBy("billing_code")
 .count()
 .orderBy("count", ascending=False)
 .show(n=50, truncate=False))

In [0]:
# Register base378 as the temporary view "base378_view" so the following cells
# can query it with spark.sql(); an existing view of that name is replaced.

base378.createOrReplaceTempView("base378_view")

In [0]:
# Per-description row count with mean and standard deviation of negotiated_rate,
# most frequent descriptions first (count_desc is numeric, so the sort is numeric).
description_summary_sql = "SELECT description, count(*) as count_desc, FORMAT_NUMBER(AVG(negotiated_rate), 2) AS avg_rate, FORMAT_NUMBER(stddev(negotiated_rate), 4) AS std_dev_rate  \
          FROM base378_view \
          GROUP BY description \
          ORDER by count_desc DESC"
description_summary = spark.sql(description_summary_sql)
description_summary.show(n=100, truncate=True)

In [0]:
# Least variation: descriptions ordered by ascending standard deviation of
# negotiated_rate, skipping groups whose deviation is NULL or zero.
# HAVING and ORDER BY use the numeric stddev() itself. The std_dev_rate column is
# the FORMAT_NUMBER output, i.e. a string, so sorting or comparing on it would be
# lexicographic (e.g. "10.0000" sorts before "2.0000").

least_variation_sql = """
    SELECT description,
           count(*) AS count_desc,
           FORMAT_NUMBER(AVG(negotiated_rate), 2) AS avg_rate,
           FORMAT_NUMBER(stddev(negotiated_rate), 4) AS std_dev_rate
    FROM base378_view
    GROUP BY description
    HAVING stddev(negotiated_rate) IS NOT NULL AND stddev(negotiated_rate) != 0
    ORDER BY stddev(negotiated_rate)
"""
spark.sql(least_variation_sql).show(n=100, truncate=True)

In [0]:
# Most variation by description: std_dev_rate is left as the raw numeric
# stddev here, so the descending sort is a true numeric sort.

most_variation_sql = "SELECT description, \
                count(*) as count_desc, \
                FORMAT_NUMBER(AVG(negotiated_rate), 2) AS avg_rate, \
                stddev(negotiated_rate) AS std_dev_rate  \
          FROM base378_view \
          GROUP BY description \
          ORDER by std_dev_rate DESC"
most_variation = spark.sql(most_variation_sql)
most_variation.show(n=100, truncate=True)

In [0]:
# Most variation by name: count, mean, standard deviation, min and max of
# negotiated_rate per name, largest deviation first.
# The sort key is the numeric stddev(); std_dev_rate is a FORMAT_NUMBER string
# and would otherwise be ordered lexicographically.

variation_by_name_sql = """
    SELECT name,
           count(*) AS count_desc,
           FORMAT_NUMBER(AVG(negotiated_rate), 2) AS avg_rate,
           FORMAT_NUMBER(stddev(negotiated_rate), 4) AS std_dev_rate,
           FORMAT_NUMBER(min(negotiated_rate), 2) AS min_rate,
           FORMAT_NUMBER(max(negotiated_rate), 2) AS max_rate
    FROM base378_view
    GROUP BY name
    ORDER BY stddev(negotiated_rate) DESC
"""
spark.sql(variation_by_name_sql).show(n=100, truncate=False)

In [0]:
# Most variation by billing_code_type: count, mean, standard deviation, min and
# max of negotiated_rate per code type, largest deviation first.
# The sort key is the numeric stddev(); std_dev_rate is a FORMAT_NUMBER string
# and would otherwise be ordered lexicographically.

variation_by_code_type_sql = """
    SELECT billing_code_type,
           count(*) AS count_desc,
           FORMAT_NUMBER(AVG(negotiated_rate), 2) AS avg_rate,
           FORMAT_NUMBER(stddev(negotiated_rate), 4) AS std_dev_rate,
           FORMAT_NUMBER(min(negotiated_rate), 2) AS min_rate,
           FORMAT_NUMBER(max(negotiated_rate), 2) AS max_rate
    FROM base378_view
    GROUP BY billing_code_type
    ORDER BY stddev(negotiated_rate) DESC
"""
spark.sql(variation_by_code_type_sql).show(n=100, truncate=False)

In [0]:
# Variation by billing_class, excluding rows described as "DRG - Description".
# The sort key is the numeric stddev(); std_dev_rate is a FORMAT_NUMBER string
# and would otherwise be ordered lexicographically.

variation_without_drg_sql = """
    SELECT billing_class,
           FORMAT_NUMBER(AVG(negotiated_rate), 2) AS avg_rate,
           FORMAT_NUMBER(stddev(negotiated_rate), 4) AS std_dev_rate
    FROM base378_view
    WHERE description != 'DRG - Description'
    GROUP BY billing_class
    ORDER BY stddev(negotiated_rate) DESC
"""
spark.sql(variation_without_drg_sql).show(n=100, truncate=False)

In [0]:
# Variation by billing_class over ALL rows (this query has no WHERE clause, so
# "DRG - Description" rows are included -- the comparison case for the
# DRG-excluded query).
# The sort key is the numeric stddev(); std_dev_rate is a FORMAT_NUMBER string
# and would otherwise be ordered lexicographically.

variation_all_rows_sql = """
    SELECT billing_class,
           FORMAT_NUMBER(AVG(negotiated_rate), 2) AS avg_rate,
           FORMAT_NUMBER(stddev(negotiated_rate), 4) AS std_dev_rate
    FROM base378_view
    GROUP BY billing_class
    ORDER BY stddev(negotiated_rate) DESC
"""
spark.sql(variation_all_rows_sql).show(n=100, truncate=False)

In [0]:
# Generate the distinct list of billing codes that appear with the description
# "DRG - Description".
# The filter must run BEFORE select/distinct: once the rows are reduced to
# distinct billing_code values there is no per-row description left to test,
# so filtering afterwards can fail or drop codes depending on how Spark
# resolves the missing column.
df_drg_only = (base378
               .where(col("description") == "DRG - Description")
               .select("billing_code")
               .distinct())
df_drg_only.show(10, truncate=False)

In [0]:
# Inner-join the DRG billing codes back onto the full data so that every row
# sharing one of those codes is kept, whatever its description is.
df_drg_lim = base378.join(df_drg_only, on="billing_code", how="inner")

display(df_drg_lim)

In [0]:
# List every distinct description found among the DRG billing codes
drg_descriptions = df_drg_lim.select("description").distinct()
drg_descriptions.show(truncate=False)

In [0]:
# Row count per (billing_code, description) pair, sorted by description and then
# billing code -- shows that one billing code can map to several descriptions.
multi_bc = (
    df_drg_lim
    .groupBy("billing_code", "description")
    .count()
    .orderBy(col("description").asc(), col("billing_code").asc())
)

display(multi_bc)

In [0]:
# Number of distinct descriptions per billing code (multi_bc holds one row per
# code/description pair), codes with the most descriptions first.
descriptions_per_code = multi_bc.select("billing_code", "description").groupBy("billing_code").count()
multi_bc_lim = descriptions_per_code.orderBy(col("count").desc())

display(multi_bc_lim)

In [0]:
# Isolate the billing codes that have two or more descriptions.
# The count filter is applied before projecting down to billing_code; the
# original filtered on `count` after select() had already dropped that column.

multi_bc_ge2 = (multi_bc_lim
                .where(col("count") >= 2)
                .select("billing_code"))
display(multi_bc_ge2)

In [0]:
# Sanity check: keep only the code/description pairs whose billing code has
# two or more descriptions.
multi_bc_list = multi_bc.join(multi_bc_ge2, on="billing_code", how="inner")

display(multi_bc_list)

In [0]:
# Build the correction lookup: for each multi-description billing code keep its
# non-DRG description(s), renamed to description_corrected for the later merge.
# The filter runs before the rename; the original filtered on `description`
# after withColumnRenamed had already removed a column of that name.

df_drg_corct = (multi_bc_list
                .where(col("description") != "DRG - Description")
                .select("billing_code", "description")
                .distinct()
                .withColumnRenamed("description", "description_corrected"))

display(df_drg_corct)

In [0]:
# Audit (pre-merge): rows per billing code for one specific description,
# used as the baseline for the post-merge audit.
audit_description = "ANESTHESIA FOR PROCEDURES ON MAJOR LOWER ABDOMINAL VESSELS; NOT OTHERWISE SPECIFIED"

pre_merge_audit = (
    base378
    .select("billing_code", "description")
    .where(col("description") == audit_description)
    .groupBy("billing_code", "description")
    .count()
    .orderBy(col("count").desc())
)
pre_merge_audit.show(n=50, truncate=False)

In [0]:
# Total number of rows before the merge -- this must be unchanged after the join
pre_merge_total = base378.select(count("*").alias("Total Rows"))
pre_merge_total.show()

In [0]:
# Left-join the corrected descriptions onto the full data by billing code.
# Every base378 row is kept; rows without a correction get a NULL
# description_corrected.
# NOTE(review): a billing code with more than one corrected description would
# duplicate rows here -- the row-count check that follows is what guards that.
base378_corct = base378.join(df_drg_corct, on="billing_code", how="left")

display(base378_corct)

In [0]:
# Total number of rows after the merge.
# The pre- and post-merge counts should match: the join only adds a column,
# it should not add rows.
post_merge_total = base378_corct.select(count("*").alias("Total Rows"))
post_merge_total.show()

In [0]:
# success! pre and post total obs count are equal

In [0]:
# Overwrite description with description_corrected wherever a correction exists;
# coalesce() takes the first non-null value, so rows without a correction keep
# their original description.
corrected_or_original = coalesce(col("description_corrected"), col("description"))
base378_fix = base378_corct.withColumn("description", corrected_or_original)

In [0]:
# Audit (post-merge): rows per billing code for the same description as the
# pre-merge audit. The count is expected to be greater than the pre-merge
# count, because corrected rows now carry this description.
audit_description = "ANESTHESIA FOR PROCEDURES ON MAJOR LOWER ABDOMINAL VESSELS; NOT OTHERWISE SPECIFIED"

post_merge_audit = (
    base378_fix
    .select("billing_code", "description")
    .where(col("description") == audit_description)
    .groupBy("billing_code", "description")
    .count()
    .orderBy(col("count").desc())
)
post_merge_audit.show(n=50, truncate=False)

In [0]:
# success! the count rose from 35 (pre-merge) to 39 (post-merge)

In [0]:
# Drop the helper column description_corrected now that its values have been
# folded into description; base378a is the corrected dataset used below.

base378a = base378_fix.drop("description_corrected")

In [0]:
# Frequency of each negotiated_rate for one description in the CORRECTED data.
# The description filter runs first; the original filtered on `description`
# after select("negotiated_rate") had already dropped that column.
(base378a
 .where(col("description") == "ANESTHESIA FOR PROCEDURES ON MAJOR LOWER ABDOMINAL VESSELS; NOT OTHERWISE SPECIFIED")
 .groupBy("negotiated_rate")
 .count()
 .orderBy("count", ascending=False)
 .show(n=50, truncate=False))

In [0]:
# Frequency of each negotiated_rate for one description in the ORIGINAL data.
# The description filter runs first; the original filtered on `description`
# after select("negotiated_rate") had already dropped that column.
(base378
 .where(col("description") == "ANESTHESIA FOR PROCEDURES ON MAJOR LOWER ABDOMINAL VESSELS; NOT OTHERWISE SPECIFIED")
 .groupBy("negotiated_rate")
 .count()
 .orderBy("count", ascending=False)
 .show(n=50, truncate=False))

In [0]:
# Compare mean and standard deviation of negotiated_rate for one description
# before (base378) and after (base378a) the description correction.
# The description is deliberately NOT stored in a variable named `desc`: that
# name would overwrite pyspark.sql.functions.desc (star-imported at the top of
# the notebook) and break any later cell that calls desc(...).
target_description = "ANESTHESIA FOR PROCEDURES ON MAJOR LOWER ABDOMINAL VESSELS; NOT OTHERWISE SPECIFIED"

# One aggregation per DataFrame returns both statistics in a single Spark job
corrected_stats1 = (base378a
                    .filter(base378a.description == target_description)
                    .agg(avg("negotiated_rate").alias("avg_rate"),
                         stddev("negotiated_rate").alias("std_rate"))
                    .first())
avg_crctd1 = corrected_stats1["avg_rate"]
std_dev_corct1 = corrected_stats1["std_rate"]

original_stats1 = (base378
                   .filter(base378.description == target_description)
                   .agg(avg("negotiated_rate").alias("avg_rate"),
                        stddev("negotiated_rate").alias("std_rate"))
                   .first())
avg_orig1 = original_stats1["avg_rate"]
std_dev_orig1 = original_stats1["std_rate"]

# Print the result
print("The average negotiated_rate when the original desc is equal to '{}' is: {}".format(target_description, avg_orig1))
print("The average negotiated_rate when the corrected desc is equal to '{}' is: {}".format(target_description, avg_crctd1))

print("Standard Deviation of negotiated_rate when original description is", target_description, ":", std_dev_orig1)
print("Standard Deviation of negotiated_rate when corrected description is", target_description, ":", std_dev_corct1)

In [0]:
# Compare mean and standard deviation of negotiated_rate for one description
# before (base378) and after (base378a) the description correction.
# The description is deliberately NOT stored in a variable named `desc`: that
# name would overwrite pyspark.sql.functions.desc (star-imported at the top of
# the notebook) and break any later cell that calls desc(...).
target_description = "ANESTHESIA FOR PROCEDURES ON MAJOR LOWER ABDOMINAL VESSELS; INFERIOR VENA CAVA LIGATION"

# One aggregation per DataFrame returns both statistics in a single Spark job
corrected_stats2 = (base378a
                    .filter(base378a.description == target_description)
                    .agg(avg("negotiated_rate").alias("avg_rate"),
                         stddev("negotiated_rate").alias("std_rate"))
                    .first())
avg_crctd2 = corrected_stats2["avg_rate"]
std_dev_corct2 = corrected_stats2["std_rate"]

original_stats2 = (base378
                   .filter(base378.description == target_description)
                   .agg(avg("negotiated_rate").alias("avg_rate"),
                        stddev("negotiated_rate").alias("std_rate"))
                   .first())
avg_orig2 = original_stats2["avg_rate"]
std_dev_orig2 = original_stats2["std_rate"]

# Print the result
print("The average negotiated_rate when the original description is equal to '{}' is: {}".format(target_description, avg_orig2))
print("The average negotiated_rate when the corrected description is equal to '{}' is: {}".format(target_description, avg_crctd2))

print("Standard Deviation of negotiated_rate when original description is", target_description, ":", std_dev_orig2)
print("Standard Deviation of negotiated_rate when corrected description is", target_description, ":", std_dev_corct2)

In [0]:
# Compare mean and standard deviation of negotiated_rate for one description
# before (base378) and after (base378a) the description correction.
# The description is deliberately NOT stored in a variable named `desc`: that
# name would overwrite pyspark.sql.functions.desc (star-imported at the top of
# the notebook) and break any later cell that calls desc(...).
target_description = "ANESTHESIA FOR TRANSURETHRAL PROCEDURES (INCLUDING URETHROCYSTOSCOPY); WITH FRAGMENTATION, MANIPULATION AND/OR REMOVAL OF URETERAL CALCULUS"

# One aggregation per DataFrame returns both statistics in a single Spark job
corrected_stats3 = (base378a
                    .filter(base378a.description == target_description)
                    .agg(avg("negotiated_rate").alias("avg_rate"),
                         stddev("negotiated_rate").alias("std_rate"))
                    .first())
avg_crctd3 = corrected_stats3["avg_rate"]
std_dev_corct3 = corrected_stats3["std_rate"]

original_stats3 = (base378
                   .filter(base378.description == target_description)
                   .agg(avg("negotiated_rate").alias("avg_rate"),
                        stddev("negotiated_rate").alias("std_rate"))
                   .first())
avg_orig3 = original_stats3["avg_rate"]
std_dev_orig3 = original_stats3["std_rate"]

# Print the result
print("The average negotiated_rate when the original description is equal to '{}' is: {}".format(target_description, avg_orig3))
print("The average negotiated_rate when the corrected description is equal to '{}' is: {}".format(target_description, avg_crctd3))

print("Standard Deviation of negotiated_rate when original description is", target_description, ":", std_dev_orig3)

print("Standard Deviation of negotiated_rate when corrected description is", target_description, ":", std_dev_corct3)

In [0]:
# Histogram of negotiated_rate for one description in the ORIGINAL data (base378)
hist_description = "ANESTHESIA FOR PROCEDURES ON MAJOR LOWER ABDOMINAL VESSELS; NOT OTHERWISE SPECIFIED"
filtered_df = base378.filter(col("description") == hist_description)

# Pull the single selected column back to the driver as a plain Python list
rates = [row[0] for row in filtered_df.select("negotiated_rate").collect()]

# Draw the distribution on an explicit axes object
fig, ax = plt.subplots()
ax.hist(rates, bins=20, edgecolor='black')
ax.set_xlabel("Negotiated Rate")
ax.set_ylabel("Frequency")
ax.set_title("Distribution of Negotiated Rate")
plt.show()

In [0]:
# Histogram of negotiated_rate for one description in the CORRECTED data (base378a)
hist_description = "ANESTHESIA FOR PROCEDURES ON MAJOR LOWER ABDOMINAL VESSELS; NOT OTHERWISE SPECIFIED"
filtered_df1 = base378a.filter(col("description") == hist_description)

# Pull the single selected column back to the driver as a plain Python list
rates1 = [row[0] for row in filtered_df1.select("negotiated_rate").collect()]

# Draw the distribution on an explicit axes object
fig, ax = plt.subplots()
ax.hist(rates1, bins=20, edgecolor='black')
ax.set_xlabel("Negotiated Rate")
ax.set_ylabel("Frequency")
ax.set_title("Distribution of Negotiated Rate")
plt.show()

In [0]:
# Scatter of the individual negotiated_rate values for one description in the
# ORIGINAL data (base378); the x axis is just the row position.
scatter_description = "ANESTHESIA FOR PROCEDURES ON MAJOR LOWER ABDOMINAL VESSELS; NOT OTHERWISE SPECIFIED"
filtered_df = base378.filter(col("description") == scatter_description)

# Bring the rate column to the driver as a pandas Series
rates_pdf = filtered_df.select("negotiated_rate").toPandas()
negotiated_rates = rates_pdf["negotiated_rate"]

# Plot the data points on an explicit axes object
fig, ax = plt.subplots()
ax.scatter(range(len(negotiated_rates)), negotiated_rates)
ax.set_xlabel("Data Points")
ax.set_ylabel("Negotiated Rate")
ax.set_title("Individual Data Points for 'ANESTHESIA FOR PROCEDURES ON MAJOR LOWER ABDOMINAL VESSELS'")

plt.show()

In [0]:
# Scatter of the individual negotiated_rate values for one description in the
# CORRECTED data (base378a); the x axis is just the row position.
scatter_description = "ANESTHESIA FOR PROCEDURES ON MAJOR LOWER ABDOMINAL VESSELS; NOT OTHERWISE SPECIFIED"
filtered_df = base378a.filter(col("description") == scatter_description)

# Bring the rate column to the driver as a pandas Series
rates_pdf = filtered_df.select("negotiated_rate").toPandas()
negotiated_rates = rates_pdf["negotiated_rate"]

# Plot the data points on an explicit axes object
fig, ax = plt.subplots()
ax.scatter(range(len(negotiated_rates)), negotiated_rates)
ax.set_xlabel("Data Points")
ax.set_ylabel("Negotiated Rate")
ax.set_title("Individual Data Points for 'ANESTHESIA FOR PROCEDURES ON MAJOR LOWER ABDOMINAL VESSELS'")

plt.show()

In [0]:
# Top 50 descriptions by row count (untruncated output)
description_freq = base378.groupBy("description").count()
description_freq.orderBy(col("count").desc()).show(n=50, truncate=False)

In [0]:
# Top 50 (description, name) pairs by row count
description_name_freq = base378.groupBy("description", "name").count()
description_name_freq.orderBy(col("count").desc()).show(n=50, truncate=False)

In [0]:
# Top 50 billing codes by row count
billing_code_freq = base378.groupBy("billing_code").count()
billing_code_freq.orderBy(col("count").desc()).show(n=50, truncate=False)

In [0]:
# Top 50 names by row count
name_freq = base378.groupBy("name").count()
name_freq.orderBy(col("count").desc()).show(n=50, truncate=False)

In [0]:
# Row count per negotiated_type, most frequent first
negotiated_type_freq = base378.groupBy("negotiated_type").count()
negotiated_type_freq.orderBy(col("count").desc()).show(truncate=False)

In [0]:
# Row count per billing_class, most frequent first.
# Sort on the numeric count BEFORE formatting it: format_number returns a
# string, so sorting afterwards would order the values lexicographically
# (e.g. "9" above "1,000").
(base378
 .select("billing_class")
 .groupBy("billing_class")
 .count()
 .orderBy("count", ascending=False)
 .withColumn("count", format_number("count", 0))
 .show(truncate=False))

In [0]:
# Row count per negotiation_arrangement, most frequent first
arrangement_freq = base378.groupBy("negotiation_arrangement").count()
arrangement_freq.orderBy(col("count").desc()).show(truncate=False)

In [0]:
# Total number of rows, used as the denominator for the percentage column
total_count = base378.count()

# Row count and share of total per billing_code_type, most frequent first.
# The sort uses col("count").desc() rather than the desc() helper: earlier
# cells assign a string to the name `desc`, which makes desc("count") fail
# with "'str' object is not callable" when the notebook is run top to bottom.
(base378
 .select("billing_code_type")
 .groupBy("billing_code_type")
 .count()
 .withColumn("count_f", format_number("count", 0))
 .withColumn("percentage", format_number(col("count") / total_count * 100, 2))
 .orderBy(col("count").desc())
 .select("billing_code_type", "count_f", "percentage")
 .show(truncate=False))

In [0]:
# Row count per (billing_class, billing_code_type) pair, most frequent first;
# count_formatted is a display-friendly copy of the numeric count.
class_code_type_freq = base378.groupBy("billing_class", "billing_code_type").count()
(class_code_type_freq
 .withColumn("count_formatted", format_number("count", 0))
 .orderBy(col("count").desc())
 .show(truncate=False))

In [0]:
# Sum of negotiated_rate per billing code (first 10 groups shown).
# `sum` here is the star-imported pyspark.sql.functions.sum, not Python's builtin.
rate_total = sum("negotiated_rate").alias("sum_negotiated_rate")
base378.groupBy("billing_code").agg(rate_total).show(n=10)

In [0]:
# Second dataset: loaded here, but the analysis above was not repeated on it
# (the author ran out of time).

# Location and format of the second dataset
file_location = "/mnt/data603final/443_69B0.csv"
file_type = "csv"

# CSV parsing settings: infer column types, treat row one as header, comma-separated
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Apply all CSV options in one call, then load the file
csv_options = {
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
}
df_443 = spark.read.format(file_type).options(**csv_options).load(file_location)

# Take a look at the data
display(df_443)

In [0]:
# Allow creating a managed table even when its target location is not empty
spark.conf.set("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation","True")

# Persist both datasets as parquet-backed tables.
# NOTE(review): no save mode is given, so this fails if the tables already exist.
# NOTE(review): the second table is named ..._49a0 although its source file is
# 443_69B0.csv -- possibly a copy/paste slip; confirm before renaming.
base378.write.saveAsTable('bcbsla_378_49a0', format="parquet")
df_443.write.saveAsTable('bcbsla_443_49a0', format="parquet")

In [0]:
# Write base378 back to the mounted S3 bucket as CSV.
# If an earlier run already wrote this directory, remove it first with
# dbutils.fs.rm(<output path>, True); the write has no overwrite mode.
# NOTE(review): this saves base378, not the corrected base378a -- confirm intent.
csv_output_path = '/mnt/data603final/378_49A0_New'
base378.write.format('csv').save(csv_output_path)

# Check that the output directory was written
display(dbutils.fs.ls("/mnt/data603final/378_49A0_New"))

In [0]:
# Unmount the S3 bucket now that all reads and writes are done; any cell that
# uses /mnt/data603final needs the mount cell to be run again after this.
dbutils.fs.unmount("/mnt/data603final")