**Name:** Bhavyasai Chinchugalla

High Performance Computing - 07

**Week 07 - Spark Application**

**Banner ID:** 001321696

### Load Files into the Cluster

In [None]:
# List the files uploaded to the DBFS FileStore (same as `%fs ls /FileStore/tables/`).
display(dbutils.fs.ls("/FileStore/tables/"))

In [None]:
# Copy the general-payments ZIP from DBFS to the driver's local disk so that
# Python's `zipfile` module can open it by an ordinary filesystem path.
general_zip_src = "dbfs:/FileStore/tables/PGYR2023_P01302025_01212025-5.zip"
general_zip_dst = "file:/databricks/driver/data.zip"
dbutils.fs.cp(general_zip_src, general_zip_dst, True)

In [None]:
import zipfile

# Unpack the general-payments archive from the driver's local disk into the
# DBFS FUSE mount, printing the archive's member names first. A failure is
# reported but not raised, so the remaining cells still run.
zip_path = "/databricks/driver/data.zip"
extract_path = "/dbfs/FileStore/tables/unzipped/"

try:
    with zipfile.ZipFile(zip_path, 'r') as archive:
        members = archive.namelist()
        print("Contents of ZIP:")
        print(members)
        archive.extractall(extract_path)
    print("✅ Unzipping complete.")
except Exception as err:
    print("❌ Error while unzipping:", err)

In [None]:
# Re-list /FileStore/tables/ to check what is now present in DBFS
# (same as `%fs ls /FileStore/tables/`).
display(dbutils.fs.ls("/FileStore/tables/"))

In [None]:
# Copy the general-payments ZIP to the driver's /tmp so the `unzip` shell
# command in the next cell can read it.
dbutils.fs.cp(
    "dbfs:/FileStore/tables/PGYR2023_P01302025_01212025-5.zip",
    "file:/tmp/data.zip",
    True,
)

In [None]:
%sh
# Extract the archive into /tmp/unzipped on the driver; -o overwrites any
# existing files without prompting.
unzip -o -d /tmp/unzipped /tmp/data.zip

In [None]:
%sh
# Copy the extracted general-payments CSV into DBFS through the /dbfs FUSE mount.
csv_name=OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv
cp "/tmp/unzipped/${csv_name}" /dbfs/FileStore/tables/

In [None]:
%sh
# Show the extracted files with human-readable sizes.
ls -l -h /tmp/unzipped/

In [None]:
%sh
# Stop at the first failing command, so "Done!" is printed (and the cell exits 0)
# only when the sample file was actually written.
set -e
echo "Creating sample_general.csv..."
# 10001 lines = the header line plus the first 10,000 data lines of the CSV.
head -n 10001 /tmp/unzipped/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv > /dbfs/FileStore/tables/sample_general.csv
echo "Done!"

In [None]:
# Read the general-payments CSV from its DBFS copy (made by the `cp` cell above).
# A `file:/tmp/...` path exists only on the driver's local disk, so executors on a
# multi-node cluster cannot open it. The DBFS path is readable from every node.
# NOTE: inferSchema costs an extra pass over the whole file to infer column types.
# limit(10000) keeps 10,000 rows for the analyses below. Without an explicit
# ordering, Spark does not guarantee which rows are kept.
df_sample = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("dbfs:/FileStore/tables/OP_DTL_GNRL_PGYR2023_P01302025_01212025.csv")
    .limit(10000)
)
df_sample.cache()
display(df_sample)

### Load and Extract Covered Recipient File

In [None]:
# Copy the covered-recipient profile ZIP onto the driver's local disk.
recipient_zip_src = "dbfs:/FileStore/tables/PHPRFL_P01302025_01212025.zip"
dbutils.fs.cp(recipient_zip_src, "file:/databricks/driver/recipient.zip", True)

In [None]:
import zipfile, os

# Extract the covered-recipient profile archive on the driver, then copy each
# extracted entry to DBFS so Spark can read it in the recipient_df cell.
zip_path = "/databricks/driver/recipient.zip"
extract_path_local = "/databricks/driver/recipient_unzipped"
extract_path_dbfs = "/dbfs/FileStore/tables/recipient_unzipped"
# The same DBFS directory as extract_path_dbfs, written as a URI for dbutils.fs.
extract_uri_dbfs = "dbfs:/FileStore/tables/recipient_unzipped"
os.makedirs(extract_path_local, exist_ok=True)

try:
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.printdir()
        zip_ref.extractall(extract_path_local)
        print("\n✅ Extraction to local driver complete.")
    os.makedirs(extract_path_dbfs, exist_ok=True)
    for file_name in os.listdir(extract_path_local):
        src = os.path.join(extract_path_local, file_name)
        # recurse=True so that an extracted sub-directory is copied whole.
        dbutils.fs.cp(f"file:{src}", f"{extract_uri_dbfs}/{file_name}", True)
        print(f"✅ Copied: {file_name}")
except Exception as e:
    # Report the error, then re-raise: recipient_df below reads these files, so
    # carrying on after a failure would only move the error downstream.
    print("❌ Error while unzipping or copying:", e)
    raise

In [None]:
# Confirm the recipient files landed in DBFS
# (same as `%fs ls /FileStore/tables/recipient_unzipped/`).
display(dbutils.fs.ls("/FileStore/tables/recipient_unzipped/"))

In [None]:
# Load the covered-recipient profile supplement from DBFS. The first row supplies
# the column names, and Spark infers the column types.
recipient_path = "dbfs:/FileStore/tables/recipient_unzipped/OP_CVRD_RCPNT_PRFL_SPLMTL_P01302025_01212025.csv"
recipient_df = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv(recipient_path)
)
display(recipient_df)

In [None]:
# Attach recipient-profile columns to each sampled payment. The inner join keeps
# only payments whose Covered_Recipient_Profile_ID matches a row of recipient_df;
# rows with a null ID never match.
joined_df = df_sample.join(recipient_df, "Covered_Recipient_Profile_ID", "inner")
joined_df.cache()
display(joined_df.limit(10))

### 1. Nature of Payments over \$1,000, Ordered by Count

In [None]:
from pyspark.sql.functions import col, count as _count

# Count payments above $1,000 per nature of payment, most frequent first.
large_payments = joined_df.where(col("Total_Amount_of_Payment_USDollars") > 1000)
analysis_1 = (
    large_payments
    .groupBy("Nature_of_Payment_or_Transfer_of_Value")
    .agg(_count("*").alias("count"))
    .orderBy(col("count").desc())
)
display(analysis_1)

### 2. Top 10 Natures of Payment by Count

In [None]:
from pyspark.sql.functions import col, count as _count

# The ten most frequent payment natures across all joined rows.
# GroupedData.count() yields a per-group row count in a column named "count".
analysis_2 = (
    joined_df
    .groupBy("Nature_of_Payment_or_Transfer_of_Value")
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
)
display(analysis_2)

### 3. Top 10 Natures of Payment by Total Amount

In [None]:
from pyspark.sql.functions import sum as _sum, col

# The ten payment natures with the largest summed payment amount.
amount_by_nature = joined_df.groupBy("Nature_of_Payment_or_Transfer_of_Value").agg(
    _sum("Total_Amount_of_Payment_USDollars").alias("total_amount")
)
analysis_3 = amount_by_nature.orderBy(col("total_amount").desc()).limit(10)
display(analysis_3)

### 4. Top 10 Physician Specialties by Total Amount

In [None]:
from pyspark.sql.functions import sum as _sum, col

# The ten specialties with the largest summed payment amount.
# The import above makes this cell runnable on its own instead of relying on an
# earlier cell. Rows with no Covered_Recipient_Specialty_1 are excluded so that
# a null bucket cannot take one of the top-10 slots.
analysis_4 = (
    joined_df
    .filter(col("Covered_Recipient_Specialty_1").isNotNull())
    .groupBy("Covered_Recipient_Specialty_1")
    .agg(_sum("Total_Amount_of_Payment_USDollars").alias("total_amount"))
    .orderBy(col("total_amount").desc())
    .limit(10)
)
display(analysis_4)

### 5. Top 10 Physicians by Total Amount

In [None]:
from pyspark.sql.functions import sum as _sum, col, concat_ws

# Add a "First Last" display name. concat_ws skips null name parts.
# The result goes into a new DataFrame so the cached joined_df is left unchanged.
physicians_df = joined_df.withColumn(
    "Physician_Name",
    concat_ws(" ", col("Covered_Recipient_First_Name"), col("Covered_Recipient_Last_Name"))
)

# Group by the recipient's profile ID as well as the name, so two different
# physicians who share a first and last name are not summed into one row.
analysis_5 = (
    physicians_df.groupBy("Covered_Recipient_Profile_ID", "Physician_Name")
    .agg(_sum("Total_Amount_of_Payment_USDollars").alias("total_amount"))
    .orderBy(col("total_amount").desc())
    .limit(10)
)
display(analysis_5)

## Summary
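This notebook explores the 2023 CMS Open Payments general-payments data, joined with the covered-recipient profile supplement. It runs five analyses with PySpark:
- Nature of payments over \$1,000, ordered by count
- Top 10 payment natures by count and by total amount
- Top 10 specialties and physicians by total amount

All results come from a 10,000-row sample of the general-payments file, further reduced by an inner join on `Covered_Recipient_Profile_ID`. They therefore illustrate the workflow rather than describe the full dataset. The analysis offers insight into how medical professionals interact with industry payments.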
