
## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

File #1 - OP_DTL_GNRL_PGYR2023_P01302025_{dateOfFilesExtraction-MMDDYYYY}.csv: 
This file contains the data set of General Payments reported for the 2023 program year. General Payments are defined as payments or other transfers of value made to a covered recipient (physician, non-physician practitioner or teaching hospital) that are not made in connection with a research agreement or research protocol. 

File #2 - OP_DTL_RSRCH_PGYR2023_P01302025_{dateOfFilesExtraction-MMDDYYYY}.csv:
This file contains the data set of Research Payments reported for the 2023 program year. Research Payments are defined as payments or other transfers of value made in connection with a research agreement or research protocol.

Covered Recipient Profile Supplement File

The Covered Recipient Profile Supplement file contains information about physicians and non-physician practitioners who have been indicated as recipients of payments, other transfers of value, or ownership and investment interest in payment records, as well as physicians and non-physician practitioners who have been identified as principal investigators associated with research payment records published by Open Payments.

This file contains only those physicians that have at least one published payment record in this cycle of the publication as of May 30, 2024. The criteria used by the Centers for Medicare and Medicaid Services (CMS) to determine which payment records are eligible for publication are available in the Open Payments Methodology and Data Dictionary Document. This document can be found on the Resources page of the Open Payments website (https://www.cms.gov/OpenPayments/Resources). The Methodology and Data Dictionary Document also includes information on the data collection and reporting methodology, the data fields included in the files, and any notes or special considerations that users should be aware of.


This analysis uses the General Payments file and the Research Payments file. Both are included because the payments recorded in the General Payments file, as well as the research costs recorded in the Research Payments file, count as eligible payments for this analysis.

The next cell runs in a Databricks notebook and uses shell commands to download and extract a ZIP file of Open Payments data from the CMS (Centers for Medicare & Medicaid Services) website. `curl` downloads the ZIP file and saves it as `openpayments_2023.zip` in the `/tmp` folder on the Databricks driver node; the `-L` option makes `curl` follow redirects, so the download still works if the URL redirects. `unzip` then extracts the contents of the ZIP file into a new folder, `/tmp/openpayments_2023`.

In [0]:
%%sh
# Download the CMS Open Payments ZIP file
curl -L https://download.cms.gov/openpayments/PGYR2023_P01302025_01212025.zip -o /tmp/openpayments_2023.zip

# Unzip the file to a directory
unzip /tmp/openpayments_2023.zip -d /tmp/openpayments_2023



Archive:  /tmp/openpayments_2023.zip
  inflating: /tmp/openpayments_2023/OP_PGYR2023_README_P01302025.txt  
  inflating: /tmp/openpayments_2023/OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025.csv  
  inflating: /tmp/openpayments_2023/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv  
  inflating: /tmp/openpayments_2023/OP_DTL_RSRCH_PGYR2023_P01302025_01212025.csv  
  inflating: /tmp/openpayments_2023/OP_REMOVED_DELETED_PGYR2023_P01302025_01212025.csv  
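
The same two steps can also be sketched in Python, which may be convenient where shell cells are not available. This is a minimal sketch using only the standard library; the helper names `download_file` and `extract_zip` are hypothetical and not part of the original notebook.

```python
import urllib.request
import zipfile
from pathlib import Path


def download_file(url, dest_path):
    """Download url to dest_path; urllib follows HTTP redirects by default."""
    urllib.request.urlretrieve(url, dest_path)
    return dest_path


def extract_zip(zip_path, dest_dir):
    """Extract every member of zip_path into dest_dir and return the member names."""
    # Create the target folder if it does not exist yet.
    Path(dest_dir).mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(dest_dir)
        return archive.namelist()
```

With the paths used in the shell cell, the calls would be `download_file("https://download.cms.gov/openpayments/PGYR2023_P01302025_01212025.zip", "/tmp/openpayments_2023.zip")` followed by `extract_zip("/tmp/openpayments_2023.zip", "/tmp/openpayments_2023")`.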


The line `import os` loads Python's built-in `os` module, which provides functions for working with files and directories.

The line `os.listdir("/tmp/openpayments_2023")` returns a list of all the entries in the `/tmp/openpayments_2023` folder.

In [0]:
import os

# List the extracted files
os.listdir("/tmp/openpayments_2023")

Out[2]: ['OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025.csv',
 'OP_DTL_RSRCH_PGYR2023_P01302025_01212025.csv',
 'OP_REMOVED_DELETED_PGYR2023_P01302025_01212025.csv',
 'OP_PGYR2023_README_P01302025.txt',
 'OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv']

In [0]:
# Reading the CSV file into a Spark DataFrame
df = spark.read.csv("file:/tmp/openpayments_2023/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv", header=True, inferSchema=True)

# Saving it as a Delta table
df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("General_2023_OpenPayments")

The query below checks that all rows from the file were loaded into the table.

In [0]:
%sql
Select count(*) 
 from General_2023_OpenPayments 

count(1)
14607607
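
A row count is most useful when it is compared with the number of records in the source file. The sketch below counts records with Python's `csv` module, which treats a quoted field containing line breaks as part of a single record. The helper name `count_csv_records` is hypothetical, and a pure-Python pass over a file of this size may be slow.

```python
import csv


def count_csv_records(path, encoding="utf-8"):
    """Count data records in a CSV file, excluding the header row."""
    # newline="" lets the csv module handle line breaks inside quoted fields.
    with open(path, newline="", encoding=encoding) as handle:
        reader = csv.reader(handle)
        next(reader, None)  # skip the header row
        # Empty lines are returned as empty lists and are not counted.
        return sum(1 for row in reader if row)
```

If `count_csv_records("/tmp/openpayments_2023/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv")` differs from the `count(*)` result, some records were probably split or merged during the load.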


The query below lists the distinct values of `Covered_Recipient_Type` in the table, since not every type may be eligible for reimbursement.

In [0]:
%sql
Select distinct Covered_Recipient_Type from General_2023_OpenPayments

Covered_Recipient_Type
Covered Recipient Teaching Hospital
Covered Recipient Non-Physician Practitioner
Covered Recipient Physician
"192.95"""
772.90
""
244.25
"736.91"""
040.29
MDR document review Round 2. Final copy review CER
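
The values after the first three rows (such as `772.90` and the free-text note) do not look like recipient types. One possible cause, which is an assumption and not something the notebook confirms, is that some quoted fields contain line breaks or embedded quotes that the default CSV reader splits into separate rows. A minimal sketch of a reader that enables Spark's `multiLine`, `quote`, and `escape` CSV options follows. The function name `read_quoted_csv` is hypothetical, and whether these options remove the stray values here is untested.

```python
def read_quoted_csv(spark, path):
    """Read a CSV file whose quoted fields may span lines or contain doubled quotes."""
    return (
        spark.read
        .option("header", True)        # first row holds the column names
        .option("inferSchema", True)   # same setting as the original read
        .option("multiLine", True)     # allow line breaks inside quoted fields
        .option("quote", '"')          # fields are wrapped in double quotes
        .option("escape", '"')         # a doubled quote is a literal quote
        .csv(path)
    )
```

`df = read_quoted_csv(spark, "file:/tmp/openpayments_2023/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv")` could then stand in for the `spark.read.csv` call, and rerunning the distinct query would show whether the stray values disappear.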


The code below loads the Research file into a DataFrame, which is then registered as a table.

In [0]:
# File location and type
file_location = "/FileStore/tables/OP_DTL_RSRCH_PGYR2023_P01302025_01212025-1.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

#display(df)

In [0]:
# Create a view or table

temp_table_name = "Research_table"

df.createOrReplaceTempView(temp_table_name)

The query below checks that all rows from the file were loaded into the table.

In [0]:
%sql
Select count(*) from Research_table;

count(1)
1028283


The code below loads the Recipient file into a DataFrame, which is then registered as a table.

In [0]:
# File location and type
file_location = "/FileStore/tables/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

#display(df)

In [0]:
# Create a view or table

temp_table_name = "Recepient_table"

df.createOrReplaceTempView(temp_table_name)

The query below checks that all rows from the file were loaded into the table.

In [0]:
%sql
Select count(*) from Recepient_table

count(1)
1530413


`General_2023_OpenPayments` is joined with `Recepient_table` on `Covered_Recipient_Profile_ID` to keep only the records that are eligible for reimbursement.

In [0]:

from pyspark.sql.functions import col

# Load the tables
General_Open_Payments_df = spark.table("General_2023_OpenPayments")
recepient_df = spark.table("Recepient_table")

# Cast the key column to string on both sides so the types match
General_Open_Payments_df = General_Open_Payments_df.withColumn("Covered_Recipient_Profile_ID", col("Covered_Recipient_Profile_ID").cast("string"))
recepient_df = recepient_df.withColumn("Covered_Recipient_Profile_ID", col("Covered_Recipient_Profile_ID").cast("string"))

# Filter General Payments with an inner join, keeping only General Payments columns
General_Open_Payments_df_filtered = General_Open_Payments_df.join(
    recepient_df.select("Covered_Recipient_Profile_ID").dropna().distinct(),
    on="Covered_Recipient_Profile_ID",
    how="inner"
).select(General_Open_Payments_df.columns)  # <- Only keeps General Payments columns

# Saving as new table
General_Open_Payments_df_filtered.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("General_Open_Payments_df_filtered")
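
Because only the payment columns are kept, the same filter can also be expressed as a left semi join, which returns the rows of the left table that have a match on the right without adding right-hand columns or duplicating left rows. This is a minimal sketch, assuming both key columns have already been cast to the same type as in the cell above; the function name `keep_known_recipients` is hypothetical, and the column order of the result may differ from the original table.

```python
def keep_known_recipients(payments_df, recipients_df,
                          key="Covered_Recipient_Profile_ID"):
    """Return the rows of payments_df whose key appears in recipients_df."""
    # Only the key column is needed from the right-hand side.
    recipient_keys = recipients_df.select(key)
    # left_semi keeps left-hand rows with at least one match;
    # rows with a null key never match and are dropped.
    return payments_df.join(recipient_keys, on=key, how="left_semi")
```

A call such as `keep_known_recipients(General_Open_Payments_df, recepient_df)` would take the place of the inner join, `distinct()`, and `select(...)` combination.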


In [0]:
%sql
Select distinct Covered_Recipient_Type
 from General_Open_Payments_df_filtered

The code below removes unwanted values of `Covered_Recipient_Type` to clean the dataset.

In [0]:
from pyspark.sql.functions import col

# Loading the table into a DataFrame
df = spark.table("General_Open_Payments_df_filtered")

# Defining allowed values
valid_types = [
    "Covered Recipient Physician",
    "Covered Recipient Non-Physician Practitioner",
    "Covered Recipient Physician/Covered Recipient Non-Physician Practitioner"
]

# Filtering the DataFrame
filtered_df = df.filter(col("Covered_Recipient_Type").isin(valid_types))

# Overwriting the table with filtered data
filtered_df.write.mode("overwrite").saveAsTable("General_Open_Payments_df_filtered")

In [0]:
%sql
Select distinct Covered_Recipient_Type
 from General_Open_Payments_df_filtered

Covered_Recipient_Type
Covered Recipient Non-Physician Practitioner
Covered Recipient Physician


`Research_table` is joined with `Recepient_table` on `Covered_Recipient_Profile_ID` to keep only the records that are eligible for reimbursement.

In [0]:
from pyspark.sql.functions import col

# Load tables
Research_table_df = spark.table("Research_table")
recepient_df = spark.table("Recepient_table")

# Ensure key column types match
Research_table_df = Research_table_df.withColumn("Covered_Recipient_Profile_ID", col("Covered_Recipient_Profile_ID").cast("string"))
recepient_df = recepient_df.withColumn("Covered_Recipient_Profile_ID", col("Covered_Recipient_Profile_ID").cast("string"))

# Filter Research using inner join, but only keep Research columns
Research_table_df_filtered = Research_table_df.join(
    recepient_df.select("Covered_Recipient_Profile_ID").dropna().distinct(),
    on="Covered_Recipient_Profile_ID",
    how="inner"
).select(Research_table_df.columns)  # <- Only keeps Research_csv columns

# Save as new table
Research_table_df_filtered.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("Research_table_df_filtered")


The query below counts the records in the Research table that are eligible for reimbursement.

In [0]:
%sql
Select count(*) from Research_table_df_filtered;

count(1)
26273


In [0]:
%sql
Select distinct Covered_Recipient_Type
 from Research_table_df_filtered

Covered_Recipient_Type
Covered Recipient Non-Physician Practitioner
Covered Recipient Physician


The next three code blocks build a table containing only the columns required for the assignment, which reduces execution time. The two tables are then combined with a union, which stacks their rows vertically, so that all records sit in one table from which the assignment questions can be answered.

In [0]:
from pyspark.sql.functions import col

# Select and rename the necessary columns; a union will be performed, so the column names must match
selected_df = Research_table_df_filtered.select(
    "Covered_Recipient_Profile_ID",
    "Covered_Recipient_NPI",
    "Covered_Recipient_First_Name",
    "Covered_Recipient_Middle_Name",
    "Covered_Recipient_Last_Name",
    "Covered_Recipient_Specialty_1",
    "Total_Amount_of_Payment_USDollars",
    col("Form_of_Payment_or_Transfer_of_Value").alias("Nature_of_Payment_or_Transfer_of_Value")
)

# Save as a new Delta table with renamed column
selected_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("Research_Payments_Modified")

In [0]:
# Select only the columns needed
selected_df = General_Open_Payments_df_filtered.select(
    "Covered_Recipient_Profile_ID",
    "Covered_Recipient_NPI",
    "Covered_Recipient_First_Name",
    "Covered_Recipient_Middle_Name",
    "Covered_Recipient_Last_Name",
    "Covered_Recipient_Specialty_1",
    "Total_Amount_of_Payment_USDollars",
    "Nature_of_Payment_or_Transfer_of_Value"
)

# Save the selected data as a new Delta table
selected_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("General_Open_Payments_Modified")

The code below performs the union of the two tables.

In [0]:
# Loading both tables
research_df = spark.table("Research_Payments_Modified")
general_df = spark.table("General_Open_Payments_Modified")

# Performing the union
union_df = research_df.unionByName(general_df)

# Showing the result
#union_df.show()

# Saving the union result as a new Delta table
union_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("Combined_Research_General_Payments")
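
`unionByName` matches columns by name, so once the two tables are stacked the combined table no longer records which file a row came from. If that distinction matters, a label column can be added before the union. This is a minimal sketch; the column name `Payment_Source` and the function name `union_with_source` are hypothetical and do not appear in the original notebook.

```python
def union_with_source(research_df, general_df):
    """Union the two DataFrames by column name, tagging each row with its origin."""
    # Add a constant label column to each side before combining them.
    research = research_df.selectExpr("*", "'Research' AS Payment_Source")
    general = general_df.selectExpr("*", "'General' AS Payment_Source")
    return research.unionByName(general)
```

Filtering on `Payment_Source` afterwards would separate Research rows, whose `Nature_of_Payment_or_Transfer_of_Value` was renamed from `Form_of_Payment_or_Transfer_of_Value`, from General rows.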

In [0]:
#%sql
#Select * from Combined_Research_General_Payments

Covered_Recipient_Profile_ID,Covered_Recipient_NPI,Covered_Recipient_First_Name,Covered_Recipient_Middle_Name,Covered_Recipient_Last_Name,Covered_Recipient_Specialty_1,Total_Amount_of_Payment_USDollars,Nature_of_Payment_or_Transfer_of_Value
104121,1306827142.0,JON,CHRISTOPHER,STRINGER,Allopathic & Osteopathic Physicians|Internal Medicine,212.28,In-kind items and services
127875,1811959232.0,JEANINE,,DOWNIE,Allopathic & Osteopathic Physicians|Dermatology,2.77,In-kind items and services
127875,1811959232.0,JEANINE,,DOWNIE,Allopathic & Osteopathic Physicians|Dermatology,142.67,In-kind items and services
127875,1811959232.0,JEANINE,,DOWNIE,Allopathic & Osteopathic Physicians|Dermatology,147.43,In-kind items and services
127875,1811959232.0,JEANINE,,DOWNIE,Allopathic & Osteopathic Physicians|Dermatology,27.79,In-kind items and services
127875,1811959232.0,JEANINE,,DOWNIE,Allopathic & Osteopathic Physicians|Dermatology,25.9,In-kind items and services
127875,1811959232.0,JEANINE,,DOWNIE,Allopathic & Osteopathic Physicians|Dermatology,3.02,In-kind items and services
127875,1811959232.0,JEANINE,,DOWNIE,Allopathic & Osteopathic Physicians|Dermatology,26.29,In-kind items and services
127875,1811959232.0,JEANINE,,DOWNIE,Allopathic & Osteopathic Physicians|Dermatology,24.59,In-kind items and services
127875,1811959232.0,JEANINE,,DOWNIE,Allopathic & Osteopathic Physicians|Dermatology,147.43,In-kind items and services


In [0]:
# 1. What is the Nature of Payments with reimbursement amounts greater than $1,000 ordered by count?

from pyspark.sql.functions import col

# Loading the unified payments table
df = spark.table("Combined_Research_General_Payments")

# Filtering payments greater than $1,000 and group by Nature of Payment
nature_over_1000 = df.filter(col("Total_Amount_of_Payment_USDollars") > 1000) \
    .groupBy("Nature_of_Payment_or_Transfer_of_Value") \
    .count() \
    .orderBy(col("count").desc())

# Showing the result
nature_over_1000.show()

+--------------------------------------+------+
|Nature_of_Payment_or_Transfer_of_Value| count|
+--------------------------------------+------+
|                  Compensation for ...|161476|
|                        Consulting Fee|104564|
|                    Travel and Lodging| 24711|
|                             Honoraria| 13741|
|                             Education| 12503|
|                    Royalty or License| 10573|
|                  Compensation for ...|  8655|
|                  Cash or cash equi...|  6156|
|                                 Grant|  2222|
|                  In-kind items and...|  2001|
|                  Long term medical...|   998|
|                     Food and Beverage|   951|
|                                  Gift|   578|
|                          Acquisitions|   562|
|                      Debt forgiveness|   438|
|                         Entertainment|    30|
|                  Charitable Contri...|    23|
+--------------------------------------+------+

The top category is "Compensation for services other than consulting" with 161,476 occurrences, followed by "Consulting Fee" with 104,564 occurrences. Other notable categories include "Travel and Lodging" with 24,711, "Honoraria" with 13,741, "Education" with 12,503, and "Royalty or License" with 10,573.

In [0]:
# 2. What are the top ten Nature of Payments by count?
from pyspark.sql.functions import col

# Load the unified payments table
df = spark.table("Combined_Research_General_Payments")

# Group by Nature of Payment and count, then get top 10
top_nature_by_count = df.groupBy("Nature_of_Payment_or_Transfer_of_Value") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(10)

# Show the result
top_nature_by_count.show()

+--------------------------------------+--------+
|Nature_of_Payment_or_Transfer_of_Value|   count|
+--------------------------------------+--------+
|                     Food and Beverage|13378081|
|                    Travel and Lodging|  545048|
|                  Compensation for ...|  230121|
|                        Consulting Fee|  169540|
|                             Education|  159397|
|                                  Gift|   31695|
|                             Honoraria|   20214|
|                    Royalty or License|   14007|
|                  In-kind items and...|   13639|
|                  Cash or cash equi...|   12646|
+--------------------------------------+--------+



The top ten natures of payment by count are led by Food and Beverage, by far the most frequent, with 13,378,081 instances. It is followed by Travel and Lodging, recorded 545,048 times, and Compensation for services other than consulting, with 230,121 instances. Consulting Fee comes next with 169,540, and Education follows closely with 159,397. The remaining categories are Gift with 31,695, Honoraria with 20,214, Royalty or License with 14,007, In-kind items and services with 13,639, and Cash or cash equivalent with 12,646. These figures suggest that the majority of payments relate to everyday interactions such as meals, travel, and professional services.

In [0]:
# 3. What are the top ten Nature of Payments by total amount?
from pyspark.sql.functions import col, sum as spark_sum

# Load the unified payments table
df = spark.table("Combined_Research_General_Payments")

# Group by Nature of Payment and sum total amount
top_nature_by_amount = df.groupBy("Nature_of_Payment_or_Transfer_of_Value") \
    .agg(spark_sum("Total_Amount_of_Payment_USDollars").alias("Total_Amount")) \
    .orderBy(col("Total_Amount").desc()) \
    .limit(10)

# Show the result
top_nature_by_amount.show()


+--------------------------------------+--------------------+
|Nature_of_Payment_or_Transfer_of_Value|        Total_Amount|
+--------------------------------------+--------------------+
|                    Royalty or License|      6.2421668879E8|
|                  Compensation for ...| 5.523960288000003E8|
|                        Consulting Fee|4.9882258721000195E8|
|                     Food and Beverage| 3.743433390699808E8|
|                    Travel and Lodging|1.7943128266999456E8|
|                          Acquisitions|       7.192577676E7|
|                  Cash or cash equi...| 6.185926724000004E7|
|                             Education| 5.541025349000245E7|
|                             Honoraria| 5.523104170999998E7|
|                  Compensation for ...|2.8615945760000005E7|
+--------------------------------------+--------------------+



The category with the highest total payment amount is Royalty or License (about $624.2 million), followed by Compensation for services other than consulting (about $552.4 million). Consulting Fee is third (about $498.8 million).
Next come Food and Beverage (about $374.3 million) and Travel and Lodging (about $179.4 million), then Acquisitions (about $71.9 million) and Cash or cash equivalent (about $61.9 million). Education (about $55.4 million) and Honoraria (about $55.2 million) follow closely together. Finally, another Compensation for services category (about $28.6 million) completes the top ten.
These results indicate that while categories such as Food and Beverage and Travel and Lodging occur more frequently, categories such as Royalty or License and Consulting Fee are associated with higher overall payment amounts.
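
Totals such as `5.523960288000003E8` in the output above carry trailing digits that are typical of binary floating-point addition. One way to avoid them is to cast the amount to a fixed-precision decimal before summing, as in the minimal sketch below. The first two helpers illustrate the effect in plain Python, and `total_by_group` applies the same idea to a Spark DataFrame. All three function names, the `amount` alias, and the `DECIMAL(18, 2)` precision are assumptions, not part of the original notebook.

```python
from decimal import Decimal


def float_total(amounts):
    """Add amounts (given as strings) one by one using binary floating point."""
    total = 0.0
    for amount in amounts:
        total += float(amount)
    return total


def decimal_total(amounts):
    """Add amounts (given as strings) exactly using Decimal arithmetic."""
    return sum((Decimal(amount) for amount in amounts), Decimal("0"))


def total_by_group(df, group_col, amount_col="Total_Amount_of_Payment_USDollars"):
    """Sum amount_col per group_col after casting it to DECIMAL(18, 2)."""
    exact = df.selectExpr(
        f"`{group_col}`",
        f"CAST(`{amount_col}` AS DECIMAL(18, 2)) AS amount",
    )
    return (
        exact.groupBy(group_col)
        .agg({"amount": "sum"})  # the result column is named "sum(amount)"
        .withColumnRenamed("sum(amount)", "Total_Amount")
    )
```

For example, `float_total(["0.1", "0.2"])` is not exactly `0.3`, while `decimal_total(["0.1", "0.2"])` equals `Decimal("0.3")`. On the combined table, `total_by_group(df, "Nature_of_Payment_or_Transfer_of_Value")` could be followed by the same `orderBy` and `limit` steps as above.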

In [0]:
# 4. What are the top ten physician specialties by total amount?

from pyspark.sql.functions import col, sum as spark_sum

# Load the unified payments table
df = spark.table("Combined_Research_General_Payments")

# Group by specialty and sum the total payment
top_specialties = df.groupBy("Covered_Recipient_Specialty_1") \
    .agg(spark_sum("Total_Amount_of_Payment_USDollars").alias("Total_Amount")) \
    .orderBy(col("Total_Amount").desc()) \
    .limit(10)

# Show the result
top_specialties.show(truncate=False)

+------------------------------------------------------------------------------------------------+--------------------+
|Covered_Recipient_Specialty_1                                                                   |Total_Amount        |
+------------------------------------------------------------------------------------------------+--------------------+
|Allopathic & Osteopathic Physicians|Orthopaedic Surgery                                         |4.0579177970999736E8|
|Allopathic & Osteopathic Physicians|Internal Medicine                                           |1.3559457535999954E8|
|Allopathic & Osteopathic Physicians|Psychiatry & Neurology|Neurology                            |9.472031309999979E7 |
|Allopathic & Osteopathic Physicians|Neurological Surgery                                        |8.697465682000007E7 |
|Allopathic & Osteopathic Physicians|Dermatology                                                 |8.618354141000035E7 |
|Allopathic & Osteopathic Physicians|Int

According to the results, the physician specialty that received the highest total payment amount is Orthopaedic Surgery, which falls under Allopathic & Osteopathic Physicians. The second highest is Internal Medicine, followed by Neurology, categorized under Psychiatry & Neurology.

Next is Neurological Surgery, followed by Dermatology. Specialists in Hematology & Oncology, a branch of Internal Medicine, come next, followed closely by Cardiovascular Disease, also under Internal Medicine.

Adult Reconstructive Orthopaedic Surgery, a subspecialty of Orthopaedic Surgery, ranks next, followed by Psychiatry, which falls under Psychiatry & Neurology. Finally, general Surgery specialists complete the top ten.

These results show that specialties in surgery, internal medicine, and neurology received the highest overall payment amounts.

In [0]:
# 5. Who are the top ten physicians by total amount?
from pyspark.sql.functions import col, sum as spark_sum, concat_ws

# Load the unified payments table
df = spark.table("Combined_Research_General_Payments")

# Create a full name column for readability
df = df.withColumn(
    "Physician_Full_Name",
    concat_ws(" ", col("Covered_Recipient_First_Name"),col("Covered_Recipient_Middle_Name"), col("Covered_Recipient_Last_Name"))
)

# Group by physician name and sum total payments
top_physicians = df.groupBy("Physician_Full_Name") \
    .agg(spark_sum("Total_Amount_of_Payment_USDollars").alias("Total_Amount")) \
    .orderBy(col("Total_Amount").desc()) \
    .limit(10)

# Show the result
top_physicians.show()

+-------------------+--------------------+
|Physician_Full_Name|        Total_Amount|
+-------------------+--------------------+
|   STEPHEN BURKHART|       3.392202493E7|
|     WILLIAM BINDER|       2.943410815E7|
|        KEVIN FOLEY|1.7305782900000002E7|
|        IVAN OSORIO|       1.606551551E7|
|     GEORGE MAXWELL|       1.160032024E7|
|       ROBERT BOOTH|          8459167.19|
|    NEAL ELATTRACHE|           7810628.2|
|    AARON ROSENBERG|   6883627.290000001|
|      ROGER JACKSON|          6615711.26|
|      PETER BONUTTI|   6385096.170000001|
+-------------------+--------------------+



According to the results, the physician who received the highest total payment amount is Stephen Burkhart. He is followed by William Binder, and in third place is Kevin Foley. Next is Ivan Osorio, followed by George Maxwell.

Robert Booth ranks sixth, followed by Neal ElAttrache in seventh place. Aaron Rosenberg is eighth on the list, while Roger Jackson is ninth. Finally, Peter Bonutti completes the top ten.
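
Grouping on the full name alone would merge different physicians who happen to share a name. A minimal sketch that also groups on `Covered_Recipient_Profile_ID`, so each total belongs to a single recipient profile, follows. The function name `top_recipients_by_total` is hypothetical, the sketch assumes the name fields are consistent within a profile ID, and its results may differ from the table above.

```python
def top_recipients_by_total(df, n=10):
    """Return the n recipient profiles with the largest summed payments."""
    amount_col = "Total_Amount_of_Payment_USDollars"
    return (
        df.groupBy(
            "Covered_Recipient_Profile_ID",   # distinguishes recipients with equal names
            "Covered_Recipient_First_Name",
            "Covered_Recipient_Last_Name",
        )
        .agg({amount_col: "sum"})             # result column: sum(<amount_col>)
        .withColumnRenamed(f"sum({amount_col})", "Total_Amount")
        .orderBy("Total_Amount", ascending=False)
        .limit(n)
    )
```

A call such as `top_recipients_by_total(spark.table("Combined_Research_General_Payments")).show(truncate=False)` would print the ranking with the profile ID next to each name.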