# Week 07 - Spark Application

In [0]:
import zipfile
import os
import shutil
from pyspark.sql import functions as F


# File locations
file1 = "dbfs:/FileStore/tables/PGYR2023_P01302025_01212025.zip"
file2 = "dbfs:/FileStore/tables/PHPRFL_P01302025_01212025.zip"

In [0]:
display(dbutils.fs.ls("FileStore/tables/")) 


path,name,size,modificationTime
dbfs:/FileStore/tables/PGYR2023_P01302025_01212025.zip,PGYR2023_P01302025_01212025.zip,789005271,1742783148000
dbfs:/FileStore/tables/PHPRFL_P01302025_01212025.zip,PHPRFL_P01302025_01212025.zip,82966770,1742782606000


In [0]:
!mkdir -p /dbfs/FileStore/tables/PGYR2023_P01302025_01212025
!unzip /dbfs/FileStore/tables/PGYR2023_P01302025_01212025 -d /dbfs/FileStore/tables/PGYR2023_P01302025_01212025/

!mkdir -p /dbfs/FileStore/tables/PHPRFL_P01302025_01212025
!unzip /dbfs/FileStore/tables/PHPRFL_P01302025_01212025 -d /dbfs/FileStore/tables/PHPRFL_P01302025_01212025/


unzip:  cannot find or open /dbfs/FileStore/tables/PGYR2023_P01302025_01212025, /dbfs/FileStore/tables/PGYR2023_P01302025_01212025.zip or /dbfs/FileStore/tables/PGYR2023_P01302025_01212025.ZIP.
unzip:  cannot find or open /dbfs/FileStore/tables/PHPRFL_P01302025_01212025, /dbfs/FileStore/tables/PHPRFL_P01302025_01212025.zip or /dbfs/FileStore/tables/PHPRFL_P01302025_01212025.ZIP.


In [0]:
dbutils.fs.cp(file1, "file:/tmp/g.zip")
dbutils.fs.cp(file2, "file:/tmp/r.zip")

with zipfile.ZipFile("/tmp/g.zip", 'r') as zip_ref:
    zip_ref.extractall("/tmp/opG")

with zipfile.ZipFile("/tmp/r.zip", 'r') as zip_ref:
    zip_ref.extractall("/tmp/opR")


In [0]:
os.makedirs("/dbfs/FileStore/tables/", exist_ok=True)

shutil.copy("/tmp/opG/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv", "/dbfs/FileStore/tables/g.csv")

shutil.copy("/tmp/opR/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv", "/dbfs/FileStore/tables/r.csv")

gdf = spark.read.csv("file:/dbfs/FileStore/tables/g.csv", header=True, inferSchema=True)
df = spark.read.csv("file:/dbfs/FileStore/tables/r.csv", header=True, inferSchema=True)


## 1. What is the Nature of Payments with reimbursement amounts greater than $1,000 ordered by count?




In [0]:
# First, filter the records where the total payment amount is greater than 1,000 USD
FF = gdf.filter(F.col("Total_Amount_of_Payment_USDollars") > 1000)

# Next, group the filtered data by the "Nature_of_Payment_or_Transfer_of_Value" column
na = FF.groupby("Nature_of_Payment_or_Transfer_of_Value")

# Then, count the number of records for each group
co = na.count()

# After that, order the results by the "count" column in descending order
final1 = co.orderBy(F.col("count").desc())

# Finally, show 
final1.show(10)



+--------------------------------------+------+
|Nature_of_Payment_or_Transfer_of_Value| count|
+--------------------------------------+------+
|                  Compensation for ...|164092|
|                        Consulting Fee|105228|
|                    Travel and Lodging| 24738|
|                             Honoraria| 13750|
|                             Education| 13376|
|                    Royalty or License| 11534|
|                  Compensation for ...|  8658|
|                                 Grant|  4922|
|                  Space rental or f...|  4917|
|                  Long term medical...|  2930|
+--------------------------------------+------+
only showing top 10 rows



## 2. What are the top ten Nature of Payments by count?


In [0]:
# Group the data by "Nature_of_Payment_or_Transfer_of_Value"
nc = gdf.groupby("Nature_of_Payment_or_Transfer_of_Value")

# Count the number of records for each group
tc = nc.count()

# Order the counts in descending order
otc = tc.orderBy(F.col("count").desc())

# Limit the result to the top 10 entries
final2 = otc.limit(10)

# Show final
final2.show(10)



+--------------------------------------+--------+
|Nature_of_Payment_or_Transfer_of_Value|   count|
+--------------------------------------+--------+
|                     Food and Beverage|13378464|
|                    Travel and Lodging|  545086|
|                  Compensation for ...|  236628|
|                        Consulting Fee|  170630|
|                             Education|  161078|
|                                  Gift|   31786|
|                             Honoraria|   20232|
|                    Royalty or License|   15865|
|                  Compensation for ...|   12234|
|                         Entertainment|    7967|
+--------------------------------------+--------+



## 3. What are the top ten Nature of Payments by total amount?


In [0]:
# Group the data by "Nature_of_Payment_or_Transfer_of_Value" column
ng = gdf.groupby("Nature_of_Payment_or_Transfer_of_Value")

# Sum the "Total_Amount_of_Payment_USDollars" for each group
tp = ng.agg(F.sum("Total_Amount_of_Payment_USDollars").alias("Total_Payment"))

# Order the result by "Total_Payment" in descending order
on = tp.orderBy(F.col("Total_Payment").desc())

# Limit the result to the top 10 entries
final3 = on.limit(10)

# Show final
final3.show(10)



+--------------------------------------+--------------------+
|Nature_of_Payment_or_Transfer_of_Value|       Total_Payment|
+--------------------------------------+--------------------+
|                    Royalty or License|     1.19217456302E9|
|                  Compensation for ...| 5.946326876500002E8|
|                        Consulting Fee| 5.148558758999996E8|
|                     Food and Beverage| 3.744878240099897E8|
|                    Travel and Lodging|1.7954842378000867E8|
|                                 Grant|      1.1188856182E8|
|                          Acquisitions| 7.192577675999999E7|
|                             Education| 6.469532594000257E7|
|                             Honoraria| 5.585182388999997E7|
|                  Long term medical...|       3.009879195E7|
+--------------------------------------+--------------------+



## 4. What are the top ten physician specialties by total amount?

In [0]:
# Cast the Covered_Recipient_NPI columns to string in both dataframes to ensure the join works
gdf = gdf.withColumn("Covered_Recipient_NPI", F.col("Covered_Recipient_NPI").cast("string"))
df = df.withColumn("Covered_Recipient_NPI", F.col("Covered_Recipient_NPI").cast("string"))

# Perform the join between the two dataframes
jdf = gdf.join(df, gdf["Covered_Recipient_NPI"] == df["Covered_Recipient_NPI"], "left")

# Group by "Physician_Specialty" and aggregate the total amount of payment for each specialty
sg = jdf.groupby("Covered_Recipient_Specialty_1")  # Using "Covered_Recipient_Specialty_1" from df1

# Sum the "Total_Amount_of_Payment_USDollars" for each specialty
ss = sg.agg(F.sum("Total_Amount_of_Payment_USDollars").alias("Total_Payment"))

# Order the result by "Total_Payment" in descending order
so = ss.orderBy(F.col("Total_Payment").desc())

# Limit the result to the top 10 specialties
final4 = so.limit(10)

# Show the result
final4.show(10)


+-----------------------------+--------------------+
|Covered_Recipient_Specialty_1|       Total_Payment|
+-----------------------------+--------------------+
|                         null| 7.936674692299995E8|
|         Allopathic & Oste...|4.0345021308999807E8|
|         Allopathic & Oste...|1.3136300307000302E8|
|         Allopathic & Oste...| 8.979213626000012E7|
|         Allopathic & Oste...| 8.608847857000005E7|
|         Allopathic & Oste...|  8.32026477400006E7|
|         Allopathic & Oste...| 7.022084115000014E7|
|         Allopathic & Oste...|  6.94689412100008E7|
|         Allopathic & Oste...| 6.677283748999998E7|
|         Allopathic & Oste...| 6.329825407000052E7|
+-----------------------------+--------------------+



## 5. Who are the top ten physicians by total amount?

In [0]:
# Group the "general" dataframe by the physician's first and last names
ggp = gdf.groupby("Covered_Recipient_First_Name", "Covered_Recipient_Last_Name")

# Sum the "Total_Amount_of_Payment_USDollars" for each physician
pt = ggp.agg(F.sum("Total_Amount_of_Payment_USDollars").alias("Total_Payment"))

# Order the result by "Total_Payment" in descending order
op = pt.orderBy(F.col("Total_Payment").desc())

# Limit the result to the top 10 physicians
final5 = op.limit(10)

# Show final
final5.show(10)

+----------------------------+---------------------------+-------------------+
|Covered_Recipient_First_Name|Covered_Recipient_Last_Name|      Total_Payment|
+----------------------------+---------------------------+-------------------+
|                        null|                       null|7.933900857900002E8|
|                     STEPHEN|                   BURKHART|      3.392202493E7|
|                     WILLIAM|                     BINDER|      2.943437497E7|
|                       KEVIN|                      FOLEY|       1.73059378E7|
|                        IVAN|                     OSORIO|      1.606551551E7|
|                      GEORGE|                    MAXWELL|      1.160032024E7|
|                      ROBERT|                      BOOTH|         8459167.19|
|                        NEAL|                 ELATTRACHE|          7810628.2|
|                       AARON|                  ROSENBERG|  6883627.290000001|
|                       ROGER|                    JA