In [0]:
#List Uploaded Files with dbutils
display(dbutils.fs.ls("FileStore/tables/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025-1.csv,OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025-1.csv,357419225,1742748308000
dbfs:/FileStore/tables/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv,OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv,357419225,1742661658000
dbfs:/FileStore/tables/OP_DTL_RSRCH_PGYR2023_P01302025_01212025.csv,OP_DTL_RSRCH_PGYR2023_P01302025_01212025.csv,992593986,1742750329000
dbfs:/FileStore/tables/PGYR2023_P01302025_01212025-1.zip,PGYR2023_P01302025_01212025-1.zip,789005271,1742755857000
dbfs:/FileStore/tables/PGYR2023_P01302025_01212025-2.zip,PGYR2023_P01302025_01212025-2.zip,789005271,1742765254000
dbfs:/FileStore/tables/PGYR2023_P01302025_01212025-3.zip,PGYR2023_P01302025_01212025-3.zip,789005271,1742765764000
dbfs:/FileStore/tables/PGYR2023_P01302025_01212025-4.zip,PGYR2023_P01302025_01212025-4.zip,789005271,1742769088000
dbfs:/FileStore/tables/PGYR2023_P01302025_01212025-5.zip,PGYR2023_P01302025_01212025-5.zip,789005271,1742779113000
dbfs:/FileStore/tables/PGYR2023_P01302025_01212025-6.zip,PGYR2023_P01302025_01212025-6.zip,789005271,1742780516000
dbfs:/FileStore/tables/PGYR2023_P01302025_01212025.zip,PGYR2023_P01302025_01212025.zip,789005271,1742749660000


In [0]:
# Copy the uploaded ZIP files from FileStore to /tmp/
dbutils.fs.cp("dbfs:/FileStore/tables/PGYR2023_P01302025_01212025-3.zip", "file:/tmp/general.zip")
dbutils.fs.cp("dbfs:/FileStore/tables/PHPRFL_P01302025_01212025-3.zip", "file:/tmp/recipient.zip")

Out[2]: True

In [0]:
#Extract the ZIP files from /tmp/
import zipfile

# Extract general payments data
with zipfile.ZipFile("/tmp/general.zip", 'r') as zip_ref:
    zip_ref.extractall("/tmp/openpayments_general")

# Extract recipient profiles data
with zipfile.ZipFile("/tmp/recipient.zip", 'r') as zip_ref:
    zip_ref.extractall("/tmp/openpayments_recipient")

In [0]:
#Confirm the CSV files were extracted
import os

print("General Payment Files:")
print(os.listdir("/tmp/openpayments_general"))

print("Recipient Files:")
print(os.listdir("/tmp/openpayments_recipient"))

General Payment Files:
['OP_REMOVED_DELETED_PGYR2023_P01302025_01212025.csv', 'OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv', 'OP_PGYR2023_README_P01302025.txt', 'OP_DTL_OWNRSHP_PGYR2023_P01302025_01212025.csv', 'OP_DTL_RSRCH_PGYR2023_P01302025_01212025.csv']
Recipient Files:
['OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv', 'OP_CVRD_RCPNT_PRFL_SPLMTL_README_P01302025.txt']


In [0]:
import shutil
import os

# Make sure destination exists
os.makedirs("/dbfs/FileStore/tables/", exist_ok=True)

# Copy general file to FileStore
shutil.copy("/tmp/openpayments_general/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv", "/dbfs/FileStore/tables/general.csv")

# Copy recipient file
shutil.copy("/tmp/openpayments_recipient/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv", "/dbfs/FileStore/tables/recipient.csv")

Out[5]: '/dbfs/FileStore/tables/recipient.csv'

In [0]:
#dbfs file list
import os

print("Files in /dbfs/FileStore/tables/:")
print(os.listdir("/dbfs/FileStore/tables/"))

Files in /dbfs/FileStore/tables/:
['recipient.csv', 'general.csv']


In [0]:
 # Load files using 'file'
general_df = spark.read.csv("file:/dbfs/FileStore/tables/general.csv", header=True, inferSchema=True)
recipient_df = spark.read.csv("file:/dbfs/FileStore/tables/recipient.csv", header=True, inferSchema=True)

# Show previews
print("General Payments Preview:")
general_df.show(5)

print("Recipient Profiles Preview:")
recipient_df.show(5)

General Payments Preview:
+-----------+----------------------+---------------------+--------------------+----------------------+----------------------------+---------------------+----------------------------+-----------------------------+---------------------------+-----------------------------+-----------------------------------------------+-----------------------------------------------+---------------+---------------+------------------+-----------------+------------------+---------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------------+-------------------------------------+-------------------------------------+------

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Ensure Spark session is created (if not already running)
spark = SparkSession.builder.appName("OpenPaymentsAnalysis").getOrCreate()

# 1. Nature of Payments with reimbursement amounts greater than $1,000 ordered by count
nature_high_reimb = (
    general_df.filter(F.col("Total_Amount_of_Payment_USDollars") > 1000)
    .groupby("Nature_of_Payment_or_Transfer_of_Value")
    .agg(F.count("*").alias("count"))
    .orderBy(F.col("count").desc())
)

print("Nature of Payments with reimbursement > $1,000 (ordered by count):")
nature_high_reimb.show(10)


Nature of Payments with reimbursement > $1,000 (ordered by count):
+--------------------------------------+------+
|Nature_of_Payment_or_Transfer_of_Value| count|
+--------------------------------------+------+
|                  Compensation for ...|164092|
|                        Consulting Fee|105228|
|                    Travel and Lodging| 24738|
|                             Honoraria| 13750|
|                             Education| 13376|
|                    Royalty or License| 11534|
|                  Compensation for ...|  8658|
|                                 Grant|  4922|
|                  Space rental or f...|  4917|
|                  Long term medical...|  2930|
+--------------------------------------+------+
only showing top 10 rows



In [0]:
#2. Top ten Nature of Payments by count
top_nature_by_count = (
    general_df.groupby("Nature_of_Payment_or_Transfer_of_Value")
    .count()
    .orderBy(F.col("count").desc())
    .limit(10)
)

print("Top 10 Nature of Payments by count:")
top_nature_by_count.show(10)

Top 10 Nature of Payments by count:
+--------------------------------------+--------+
|Nature_of_Payment_or_Transfer_of_Value|   count|
+--------------------------------------+--------+
|                     Food and Beverage|13378464|
|                    Travel and Lodging|  545086|
|                  Compensation for ...|  236628|
|                        Consulting Fee|  170630|
|                             Education|  161078|
|                                  Gift|   31786|
|                             Honoraria|   20232|
|                    Royalty or License|   15865|
|                  Compensation for ...|   12234|
|                         Entertainment|    7967|
+--------------------------------------+--------+



In [0]:
#3. Top ten Nature of Payments by total amount
top_nature_by_total = (
    general_df.groupby("Nature_of_Payment_or_Transfer_of_Value")
    .agg(F.sum("Total_Amount_of_Payment_USDollars").alias("Total_Payment"))
    .orderBy(F.col("Total_Payment").desc())
    .limit(10)
)

print("Top 10 Nature of Payments by total amount:")
top_nature_by_total.show(10)

Top 10 Nature of Payments by total amount:
+--------------------------------------+--------------------+
|Nature_of_Payment_or_Transfer_of_Value|       Total_Payment|
+--------------------------------------+--------------------+
|                    Royalty or License|     1.19217456302E9|
|                  Compensation for ...| 5.946326876500002E8|
|                        Consulting Fee| 5.148558758999996E8|
|                     Food and Beverage| 3.744878240099897E8|
|                    Travel and Lodging|1.7954842378000867E8|
|                                 Grant|      1.1188856182E8|
|                          Acquisitions| 7.192577675999999E7|
|                             Education| 6.469532594000257E7|
|                             Honoraria| 5.585182388999997E7|
|                  Long term medical...|       3.009879195E7|
+--------------------------------------+--------------------+



In [0]:
# 4. Top ten physician specialties by total amount
top_specialties_by_total = (
    general_df.join(
        recipient_df,
        general_df["Covered_Recipient_NPI"] == recipient_df["Covered_Recipient_NPI"],
        "left"
    )
    .groupby("Covered_Recipient_Profile_Primary_Specialty")
    .agg(F.sum("Total_Amount_of_Payment_USDollars").alias("Total_Payment"))
    .orderBy(F.col("Total_Payment").desc())
    .limit(10)
)

print("Top 10 Physician Specialties by Total Amount:")
top_specialties_by_total.show(10)

Top 10 Physician Specialties by Total Amount:
+-------------------------------------------+--------------------+
|Covered_Recipient_Profile_Primary_Specialty|       Total_Payment|
+-------------------------------------------+--------------------+
|                                       null| 8.712047840900062E8|
|                       Allopathic & Oste...|3.7775549672999775E8|
|                       Allopathic & Oste...| 8.680715036999996E7|
|                       Allopathic & Oste...| 8.587046903000017E7|
|                       Allopathic & Oste...| 8.076589144000049E7|
|                       Allopathic & Oste...| 7.277526617999996E7|
|                       Allopathic & Oste...| 6.576093498000079E7|
|                       Allopathic & Oste...| 6.569819551000166E7|
|                       Allopathic & Oste...|6.5481500610000595E7|
|                       Allopathic & Oste...| 6.305718157000022E7|
+-------------------------------------------+--------------------+



In [0]:
# 5. Top ten physicians by total amount
top_physicians_by_total = (
    general_df.groupby("Covered_Recipient_First_Name", "Covered_Recipient_Last_Name")
    .agg(F.sum("Total_Amount_of_Payment_USDollars").alias("Total_Payment"))
    .orderBy(F.col("Total_Payment").desc())
    .limit(10)
)

print("Top 10 Physicians by Total Amount:")
top_physicians_by_total.show(10)

Top 10 Physicians by Total Amount:
+----------------------------+---------------------------+-------------------+
|Covered_Recipient_First_Name|Covered_Recipient_Last_Name|      Total_Payment|
+----------------------------+---------------------------+-------------------+
|                        null|                       null|7.933900857900002E8|
|                     STEPHEN|                   BURKHART|      3.392202493E7|
|                     WILLIAM|                     BINDER|      2.943437497E7|
|                       KEVIN|                      FOLEY|       1.73059378E7|
|                        IVAN|                     OSORIO|      1.606551551E7|
|                      GEORGE|                    MAXWELL|      1.160032024E7|
|                      ROBERT|                      BOOTH|         8459167.19|
|                        NEAL|                 ELATTRACHE|          7810628.2|
|                       AARON|                  ROSENBERG|  6883627.290000001|
|                