In [0]:
#List Uploaded Files with dbutils
display(dbutils.fs.ls("FileStore/tables/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025-1.csv,OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025-1.csv,357419225,1742753006000
dbfs:/FileStore/tables/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025-2.csv,OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025-2.csv,357419225,1742753143000
dbfs:/FileStore/tables/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv,OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv,357419225,1742354821000
dbfs:/FileStore/tables/OP_CVRD_RCPNT_PRFL_SPLMTL_README_P01302025-1.txt,OP_CVRD_RCPNT_PRFL_SPLMTL_README_P01302025-1.txt,3674,1742752969000
dbfs:/FileStore/tables/OP_CVRD_RCPNT_PRFL_SPLMTL_README_P01302025-2.txt,OP_CVRD_RCPNT_PRFL_SPLMTL_README_P01302025-2.txt,3674,1742753146000
dbfs:/FileStore/tables/OP_CVRD_RCPNT_PRFL_SPLMTL_README_P01302025.txt,OP_CVRD_RCPNT_PRFL_SPLMTL_README_P01302025.txt,3674,1742354799000
dbfs:/FileStore/tables/OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025-1.csv,OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025-1.csv,1827342,1742753009000
dbfs:/FileStore/tables/OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025-2.csv,OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025-2.csv,1827342,1742753036000
dbfs:/FileStore/tables/OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025.csv,OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025.csv,1827342,1742354824000
dbfs:/FileStore/tables/OP_DTL_RSRCH_PGYR2023_P01302025_01212025-1.csv,OP_DTL_RSRCH_PGYR2023_P01302025_01212025-1.csv,992593986,1742753177000


In [0]:
# Copy the uploaded ZIP files from FileStore to /tmp/
dbutils.fs.cp("dbfs:/FileStore/tables/PGYR2023_P01302025_01212025-3.zip", "file:/tmp/general.zip")
dbutils.fs.cp("dbfs:/FileStore/tables/PHPRFL_P01302025_01212025-3.zip", "file:/tmp/recipient.zip")

Out[22]: True

In [0]:
#Extract the ZIP files from /tmp/
import zipfile

# Extract general payments data
with zipfile.ZipFile("/tmp/general.zip", 'r') as zip_ref:
    zip_ref.extractall("/tmp/openpayments_general")

# Extract recipient profiles data
with zipfile.ZipFile("/tmp/recipient.zip", 'r') as zip_ref:
    zip_ref.extractall("/tmp/openpayments_recipient")

In [0]:
#Confirm the CSV files were extracted
import os

print("General Payment Files:")
print(os.listdir("/tmp/openpayments_general"))

print("Recipient Files:")
print(os.listdir("/tmp/openpayments_recipient"))


General Payment Files:
['OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025.csv', 'OP_DTL_RSRCH_PGYR2023_P01302025_01212025.csv', 'OP_REMOVED_DELETED_PGYR2023_P01302025_01212025.csv', 'OP_PGYR2023_README_P01302025.txt', 'OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv']
Recipient Files:
['OP_CVRD_RCPNT_PRFL_SPLMTL_README_P01302025.txt', 'OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv']


In [0]:
import shutil
import os

# Make sure destination exists
os.makedirs("/dbfs/FileStore/tables/", exist_ok=True)

# Copy general file to FileStore
shutil.copy("/tmp/openpayments_general/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv", "/dbfs/FileStore/tables/general.csv")

# Copy recipient file
shutil.copy("/tmp/openpayments_recipient/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv", "/dbfs/FileStore/tables/recipient.csv")


Out[7]: '/dbfs/FileStore/tables/recipient.csv'

In [0]:
#dbfs file list
import os

print("✅ Files in /dbfs/FileStore/tables/:")
print(os.listdir("/dbfs/FileStore/tables/"))


✅ Files in /dbfs/FileStore/tables/:
['recipient.csv', 'general.csv']


In [0]:
# Load files using 'file'
general_df = spark.read.csv("file:/dbfs/FileStore/tables/general.csv", header=True, inferSchema=True)
recipient_df = spark.read.csv("file:/dbfs/FileStore/tables/recipient.csv", header=True, inferSchema=True)

# Show previews
print("✅ General Payments Preview:")
general_df.show(5)

print("✅ Recipient Profiles Preview:")
recipient_df.show(5)


✅ General Payments Preview:
+-----------+----------------------+---------------------+--------------------+----------------------+----------------------------+---------------------+----------------------------+-----------------------------+---------------------------+-----------------------------+-----------------------------------------------+-----------------------------------------------+---------------+---------------+------------------+-----------------+------------------+---------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------------+-------------------------------------+-------------------------------------+----

In [0]:
#1. What is the Nature of Payments with reimbursement amounts > $1,000, ordered by count?
from pyspark.sql.functions import col, count

general_df.filter(col("Total_Amount_of_Payment_USDollars") > 1000) \
    .groupBy("Nature_of_Payment_or_Transfer_of_Value") \
    .agg(count("*").alias("Count")) \
    .orderBy(col("Count").desc()) \
    .show(truncate=False)


+--------------------------------------------------------------------------------------------------------------------------------------------------+------+
|Nature_of_Payment_or_Transfer_of_Value                                                                                                            |Count |
+--------------------------------------------------------------------------------------------------------------------------------------------------+------+
|Compensation for services other than consulting, including serving as faculty or as a speaker at a venue other than a continuing education program|164092|
|Consulting Fee                                                                                                                                    |105228|
|Travel and Lodging                                                                                                                                |24738 |
|Honoraria                                                      

In [0]:
#2. What are the Top 10 Nature of Payments by count?
general_df.groupBy("Nature_of_Payment_or_Transfer_of_Value") \
    .agg(count("*").alias("Count")) \
    .orderBy(col("Count").desc()) \
    .show(10, truncate=False)


+--------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|Nature_of_Payment_or_Transfer_of_Value                                                                                                            |Count   |
+--------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|Food and Beverage                                                                                                                                 |13378464|
|Travel and Lodging                                                                                                                                |545086  |
|Compensation for services other than consulting, including serving as faculty or as a speaker at a venue other than a continuing education program|236628  |
|Consulting Fee                                     

In [0]:
#3. What are the Top 10 Nature of Payments by total amount?
from pyspark.sql.functions import sum as spark_sum

general_df.groupBy("Nature_of_Payment_or_Transfer_of_Value") \
    .agg(spark_sum("Total_Amount_of_Payment_USDollars").alias("Total_Payment")) \
    .orderBy(col("Total_Payment").desc()) \
    .show(10, truncate=False)


+--------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+
|Nature_of_Payment_or_Transfer_of_Value                                                                                                            |Total_Payment       |
+--------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+
|Royalty or License                                                                                                                                |1.19217456302E9     |
|Compensation for services other than consulting, including serving as faculty or as a speaker at a venue other than a continuing education program|5.946326876500002E8 |
|Consulting Fee                                                                                                                                    |5.

In [0]:
# get column names 
print("🔎 Columns in general_df:")
for col_name in general_df.columns:
    print(col_name)


🔎 Columns in general_df:
Change_Type
Covered_Recipient_Type
Teaching_Hospital_CCN
Teaching_Hospital_ID
Teaching_Hospital_Name
Covered_Recipient_Profile_ID
Covered_Recipient_NPI
Covered_Recipient_First_Name
Covered_Recipient_Middle_Name
Covered_Recipient_Last_Name
Covered_Recipient_Name_Suffix
Recipient_Primary_Business_Street_Address_Line1
Recipient_Primary_Business_Street_Address_Line2
Recipient_City
Recipient_State
Recipient_Zip_Code
Recipient_Country
Recipient_Province
Recipient_Postal_Code
Covered_Recipient_Primary_Type_1
Covered_Recipient_Primary_Type_2
Covered_Recipient_Primary_Type_3
Covered_Recipient_Primary_Type_4
Covered_Recipient_Primary_Type_5
Covered_Recipient_Primary_Type_6
Covered_Recipient_Specialty_1
Covered_Recipient_Specialty_2
Covered_Recipient_Specialty_3
Covered_Recipient_Specialty_4
Covered_Recipient_Specialty_5
Covered_Recipient_Specialty_6
Covered_Recipient_License_State_code1
Covered_Recipient_License_State_code2
Covered_Recipient_License_State_code3
Covered_R

In [0]:
#4. What are the Top 10 Physician Specialties by total amount?
from pyspark.sql.functions import sum as spark_sum, col

general_df.groupBy("Covered_Recipient_Specialty_1") \
    .agg(spark_sum("Total_Amount_of_Payment_USDollars").alias("Total_Payment")) \
    .orderBy(col("Total_Payment").desc()) \
    .show(10, truncate=False)



+------------------------------------------------------------------------------------------------+--------------------+
|Covered_Recipient_Specialty_1                                                                   |Total_Payment       |
+------------------------------------------------------------------------------------------------+--------------------+
|null                                                                                            |7.936674692300001E8 |
|Allopathic & Osteopathic Physicians|Orthopaedic Surgery                                         |4.0345021308999825E8|
|Allopathic & Osteopathic Physicians|Internal Medicine                                           |1.3136300307000063E8|
|Allopathic & Osteopathic Physicians|Psychiatry & Neurology|Neurology                            |8.979213626000014E7 |
|Allopathic & Osteopathic Physicians|Neurological Surgery                                        |8.608847857000013E7 |
|Allopathic & Osteopathic Physicians|Der

In [0]:
# 5. Who are the Top 10 Physicians by total amount?
from pyspark.sql.functions import concat_ws, col, sum as spark_sum

# Combine first and last name into one column
general_df_with_names = general_df.withColumn(
    "Physician_Name",
    concat_ws(" ", col("Covered_Recipient_First_Name"), col("Covered_Recipient_Last_Name"))
)

# Group by Physician_Name and get total amount
general_df_with_names.groupBy("Physician_Name") \
    .agg(spark_sum("Total_Amount_of_Payment_USDollars").alias("Total_Payment")) \
    .orderBy(col("Total_Payment").desc()) \
    .show(10, truncate=False)

+----------------+-------------------+
|Physician_Name  |Total_Payment      |
+----------------+-------------------+
|                |7.933900857900002E8|
|STEPHEN BURKHART|3.392202493E7      |
|WILLIAM BINDER  |2.943437497E7      |
|KEVIN FOLEY     |1.73059378E7       |
|IVAN OSORIO     |1.606551551E7      |
|GEORGE MAXWELL  |1.160032024E7      |
|ROBERT BOOTH    |8459167.19         |
|NEAL ELATTRACHE |7810628.2          |
|AARON ROSENBERG |6883627.290000001  |
|ROGER JACKSON   |6615711.26         |
+----------------+-------------------+
only showing top 10 rows

