
## Week 07 - Spark Application

Every year, CMS publishes complete datasets that consolidate the information submitted by reporting entities for active years of Open Payments data. Use 

Download the 2023 Open Payments file (https://download.cms.gov/openpayments/PGYR2023_P01302025_01212025.zipLinks to an external site.) and upload it to your Databricks cluster. 

Download the Covered Recipient File (https://download.cms.gov/openpayments/PHPRFL_P01302025_01212025.zipLinks to an external site.) and upload it to your Databricks cluster.

Use PySpark to perform the analyses specified below on the 2023 files.

In [0]:
import zipfile
import os


In [0]:
# Create a view or table

#temp_table_name = "PGYR2023_P01302025_01212025_zip"

#df.createOrReplaceTempView(temp_table_name)

In [0]:
%sh
unzip /tmp/PGYR2023_P01302025_01212025.zip -d /dbfs/FileStore/tables/PGYR2023_P01302025_01212025/

unzip /tmp/PHPRFL_P01302025_01212025.zip -d /dbfs/FileStore/tables/PHPRFL_P01302025_01212025/

unzip:  cannot find or open /tmp/PGYR2023_P01302025_01212025.zip, /tmp/PGYR2023_P01302025_01212025.zip.zip or /tmp/PGYR2023_P01302025_01212025.zip.ZIP.
unzip:  cannot find or open /tmp/PHPRFL_P01302025_01212025.zip, /tmp/PHPRFL_P01302025_01212025.zip.zip or /tmp/PHPRFL_P01302025_01212025.zip.ZIP.


In [0]:
# Copy the ZIP files from DBFS to the local file system (driver node)
dbutils.fs.cp("dbfs:/FileStore/tables/PGYR2023_P01302025_01212025.zip", "file:/tmp/PGYR2023_P01302025_01212025.zip")
dbutils.fs.cp("dbfs:/FileStore/tables/PHPRFL_P01302025_01212025.zip", "file:/tmp/PHPRFL_P01302025_01212025.zip")


Out[12]: True

In [0]:
# Define paths for the zip files and extraction folders
zip_file1 = '/dbfs/FileStore/tables/PGYR2023_P01302025_01212025.zip'
zip_file2 = '/dbfs/FileStore/tables/PHPRFL_P01302025_01212025.zip'
extract_dir1 = '/tmp/PGYR2023_P01302025_01212025'
extract_dir2 = '/tmp/PHPRFL_P01302025_01212025'

# Unzip the first file
with zipfile.ZipFile("/tmp/PGYR2023_P01302025_01212025.zip", 'r') as zip_ref:
    zip_ref.extractall(extract_dir1)

# Unzip the second file
with zipfile.ZipFile("/tmp/PHPRFL_P01302025_01212025.zip", 'r') as zip_ref:
    zip_ref.extractall(extract_dir2)

print("Unzipping completed.")


Unzipping completed.


In [0]:
%sh
ls /tmp/PGYR2023_P01302025_01212025


OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv
OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025.csv
OP_DTL_RSRCH_PGYR2023_P01302025_01212025.csv
OP_PGYR2023_README_P01302025.txt
OP_REMOVED_DELETED_PGYR2023_P01302025_01212025.csv


In [0]:
%sh
ls /tmp/PHPRFL_P01302025_01212025


OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv
OP_CVRD_RCPNT_PRFL_SPLMTL_README_P01302025.txt


In [0]:
# Reading the first CSV file
import os
print(os.path.exists("/tmp/PGYR2023_P01302025_01212025/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv"))

file_path1 = "file:/tmp/PGYR2023_P01302025_01212025/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv"
df1 = spark.read.csv(file_path1, header=True, inferSchema=True)

# Reading the second CSV file
file_path2 = "file:/tmp/PHPRFL_P01302025_01212025/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv"
df2 = spark.read.csv(file_path2, header=True, inferSchema=True)

# Show the first few rows of each dataframe
df1.show(5)
df2.show(5)


True
+-----------+----------------------+---------------------+--------------------+----------------------+----------------------------+---------------------+----------------------------+-----------------------------+---------------------------+-----------------------------+-----------------------------------------------+-----------------------------------------------+---------------+---------------+------------------+-----------------+------------------+---------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------------+-------------------------------------+-------------------------------------+---------------------------

## Answers

1. What is the Nature of Payments with reimbursement amounts greater than $1,000 ordered by count?

2. What are the top ten Nature of Payments by count?

3. What are the top ten Nature of Payments by total amount?

4. What are the top ten physician specialties by total amount?

5. Who are the top ten physicians by total amount?

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1. Nature of Payments with reimbursement amounts greater than $1000
high = df1.where(F.col("Total_Amount_of_Payment_USDollars") > 1000)
nat = (
    high
    .groupBy("Nature_of_Payment_or_Transfer_of_Value")
    .agg(F.count("*").alias("payment_count"))
    .sort(F.desc("payment_count"))
)
nat.show(10)

# 2. Top 10 payment types by frequency
topf = (
    df1
    .groupBy("Nature_of_Payment_or_Transfer_of_Value")
    .agg(F.count("*").alias("frequency"))
    .orderBy(F.desc("frequency"))
    .limit(10)
)
topf.show(10)

# 3. Top 10 payment types by dollar amount
hdpp = (
    df1
    .groupBy("Nature_of_Payment_or_Transfer_of_Value")
    .agg(F.sum(F.col("Total_Amount_of_Payment_USDollars")).alias("total_dollars"))
    .orderBy(F.desc("total_dollars"))
    .limit(10)
)
hdpp.show(10)

# 4. Top 10 physician specialties by total payment amount
spec_am = (
    df1.withColumn("Covered_Recipient_NPI", F.col("Covered_Recipient_NPI").cast("string"))
    .join(df2.withColumn("Covered_Recipient_NPI", F.col("Covered_Recipient_NPI").cast("string")), 
          "Covered_Recipient_NPI", 
          "left")
    .groupby("Covered_Recipient_Specialty_1")
    .agg(F.sum("Total_Amount_of_Payment_USDollars").alias("Total_Payment"))
    .orderBy(F.col("Total_Payment").desc())
    .limit(10)
)

spec_am.show(10)

# 5. Top 10 physicians by total payment amount
hppp = (
    df1
    .withColumn("full_name", F.concat_ws(" ", "Covered_Recipient_First_Name", "Covered_Recipient_Last_Name"))
    .groupBy("full_name")
    .agg(F.sum("Total_Amount_of_Payment_USDollars").alias("total_payment_amount"))
    .orderBy(F.desc("total_payment_amount"))
    .select(
        F.split("full_name", " ").getItem(0).alias("First_Name"),
        F.split("full_name", " ").getItem(1).alias("Last_Name"),
        "total_payment_amount"
    )
    .limit(10)
)
hppp.show(10)

+--------------------------------------+-------------+
|Nature_of_Payment_or_Transfer_of_Value|payment_count|
+--------------------------------------+-------------+
|                  Compensation for ...|       164092|
|                        Consulting Fee|       105228|
|                    Travel and Lodging|        24738|
|                             Honoraria|        13750|
|                             Education|        13376|
|                    Royalty or License|        11534|
|                  Compensation for ...|         8658|
|                                 Grant|         4922|
|                  Space rental or f...|         4917|
|                  Long term medical...|         2930|
+--------------------------------------+-------------+
only showing top 10 rows

+--------------------------------------+---------+
|Nature_of_Payment_or_Transfer_of_Value|frequency|
+--------------------------------------+---------+
|                     Food and Beverage| 13378464|
