In [1]:
import pandas as pd
import numpy as np
import requests
import json
import boto3
import io
from io import BytesIO, StringIO
from dags.common.credentials.secrets import *

def read_chunks(file_path, size):
    """
    Read staging table in chunks because it has many rows. Returns a concatenated dataframe.
    """
    data_chunks = []
    chunk_counter = 1
    print(f"Reading {file_path} in chunks...")
    for chunk in pd.read_csv(file_path, chunksize=size, low_memory=False):
        print(f"Reading chunk # {str(chunk_counter)}")
        data_chunks.append(chunk)
        chunk_counter += 1
    dataset = pd.concat(data_chunks)
    print('Read complete.')
    return dataset


def upload_dataframe_to_s3_csv(dataframe, bucket_name, s3_key):
    # Convert DataFrame to CSV string
    csv_buffer = StringIO()
    dataframe.to_csv(csv_buffer, index=False)
    s3 = boto3.client(
        's3',
        aws_access_key_id=s3_aws_access_key_id,
        aws_secret_access_key=s3_secret_access_key,
        region_name=aws_region)
    # Upload the CSV string as a file to S3
    s3.put_object(Body=csv_buffer.getvalue(), Bucket=bucket_name, Key=s3_key)

def get_s3_obj(bucket_name, file_path):
    s3 = boto3.client(
        's3',
        aws_access_key_id=s3_aws_access_key_id,
        aws_secret_access_key=s3_secret_access_key,
        region_name=aws_region)
    s3_object = s3.get_object(Bucket=bucket_name, Key=file_path)
    return s3_object


def read_chunks_bytes(s3_obj, size):
    """
    Read S3 Object in chunks because it has many rows. Returns a concatenated dataframe.
    """
    data_chunks = []
    chunk_counter = 1
    print(f"Reading S3 Object in chunks...")
    for chunk in pd.read_csv(io.BytesIO(s3_obj['Body'].read()), chunksize=size, low_memory=False):
        print(f"Reading byte chunk # {str(chunk_counter)}")
        data_chunks.append(chunk)
        chunk_counter += 1
    dataset = pd.concat(data_chunks)
    print('Read complete.')
    return dataset

In [2]:
# General Payments Ingest

# General Payments File Path
payments_2020_path = "https://download.cms.gov/openpayments/PGYR20_P012023/OP_DTL_GNRL_PGYR2020_P01202023.csv"
payments_2021_path ="https://download.cms.gov/openpayments/PGYR21_P012023/OP_DTL_GNRL_PGYR2021_P01202023.csv"

# Read Payments Data for 2020 and 2021
payments_2020_df = read_chunks(payments_2020_path, 10000)
payments_2021_df = read_chunks(payments_2021_path, 10000)

Reading https://download.cms.gov/openpayments/PGYR20_P012023/OP_DTL_GNRL_PGYR2020_P01202023.csv in chunks...
Reading chunk # 1
Reading chunk # 2
Reading chunk # 3
Reading chunk # 4
Reading chunk # 5
Reading chunk # 6
Reading chunk # 7
Reading chunk # 8
Reading chunk # 9
Reading chunk # 10
Reading chunk # 11
Reading chunk # 12
Reading chunk # 13
Reading chunk # 14
Reading chunk # 15
Reading chunk # 16
Reading chunk # 17
Reading chunk # 18
Reading chunk # 19
Reading chunk # 20
Reading chunk # 21
Reading chunk # 22
Reading chunk # 23
Reading chunk # 24
Reading chunk # 25
Reading chunk # 26
Reading chunk # 27
Reading chunk # 28
Reading chunk # 29
Reading chunk # 30
Reading chunk # 31
Reading chunk # 32
Reading chunk # 33
Reading chunk # 34
Reading chunk # 35
Reading chunk # 36
Reading chunk # 37
Reading chunk # 38
Reading chunk # 39
Reading chunk # 40
Reading chunk # 41
Reading chunk # 42
Reading chunk # 43
Reading chunk # 44
Reading chunk # 45
Reading chunk # 46
Reading chunk # 47
Reading

Reading chunk # 411
Reading chunk # 412
Reading chunk # 413
Reading chunk # 414
Reading chunk # 415
Reading chunk # 416
Reading chunk # 417
Reading chunk # 418
Reading chunk # 419
Reading chunk # 420
Reading chunk # 421
Reading chunk # 422
Reading chunk # 423
Reading chunk # 424
Reading chunk # 425
Reading chunk # 426
Reading chunk # 427
Reading chunk # 428
Reading chunk # 429
Reading chunk # 430
Reading chunk # 431
Reading chunk # 432
Reading chunk # 433
Reading chunk # 434
Reading chunk # 435
Reading chunk # 436
Reading chunk # 437
Reading chunk # 438
Reading chunk # 439
Reading chunk # 440
Reading chunk # 441
Reading chunk # 442
Reading chunk # 443
Reading chunk # 444
Reading chunk # 445
Reading chunk # 446
Reading chunk # 447
Reading chunk # 448
Reading chunk # 449
Reading chunk # 450
Reading chunk # 451
Reading chunk # 452
Reading chunk # 453
Reading chunk # 454
Reading chunk # 455
Reading chunk # 456
Reading chunk # 457
Reading chunk # 458
Reading chunk # 459
Reading chunk # 460


Reading chunk # 237
Reading chunk # 238
Reading chunk # 239
Reading chunk # 240
Reading chunk # 241
Reading chunk # 242
Reading chunk # 243
Reading chunk # 244
Reading chunk # 245
Reading chunk # 246
Reading chunk # 247
Reading chunk # 248
Reading chunk # 249
Reading chunk # 250
Reading chunk # 251
Reading chunk # 252
Reading chunk # 253
Reading chunk # 254
Reading chunk # 255
Reading chunk # 256
Reading chunk # 257
Reading chunk # 258
Reading chunk # 259
Reading chunk # 260
Reading chunk # 261
Reading chunk # 262
Reading chunk # 263
Reading chunk # 264
Reading chunk # 265
Reading chunk # 266
Reading chunk # 267
Reading chunk # 268
Reading chunk # 269
Reading chunk # 270
Reading chunk # 271
Reading chunk # 272
Reading chunk # 273
Reading chunk # 274
Reading chunk # 275
Reading chunk # 276
Reading chunk # 277
Reading chunk # 278
Reading chunk # 279
Reading chunk # 280
Reading chunk # 281
Reading chunk # 282
Reading chunk # 283
Reading chunk # 284
Reading chunk # 285
Reading chunk # 286


Reading chunk # 647
Reading chunk # 648
Reading chunk # 649
Reading chunk # 650
Reading chunk # 651
Reading chunk # 652
Reading chunk # 653
Reading chunk # 654
Reading chunk # 655
Reading chunk # 656
Reading chunk # 657
Reading chunk # 658
Reading chunk # 659
Reading chunk # 660
Reading chunk # 661
Reading chunk # 662
Reading chunk # 663
Reading chunk # 664
Reading chunk # 665
Reading chunk # 666
Reading chunk # 667
Reading chunk # 668
Reading chunk # 669
Reading chunk # 670
Reading chunk # 671
Reading chunk # 672
Reading chunk # 673
Reading chunk # 674
Reading chunk # 675
Reading chunk # 676
Reading chunk # 677
Reading chunk # 678
Reading chunk # 679
Reading chunk # 680
Reading chunk # 681
Reading chunk # 682
Reading chunk # 683
Reading chunk # 684
Reading chunk # 685
Reading chunk # 686
Reading chunk # 687
Reading chunk # 688
Reading chunk # 689
Reading chunk # 690
Reading chunk # 691
Reading chunk # 692
Reading chunk # 693
Reading chunk # 694
Reading chunk # 695
Reading chunk # 696


Reading chunk # 1055
Reading chunk # 1056
Reading chunk # 1057
Reading chunk # 1058
Reading chunk # 1059
Reading chunk # 1060
Reading chunk # 1061
Reading chunk # 1062
Reading chunk # 1063
Reading chunk # 1064
Reading chunk # 1065
Reading chunk # 1066
Reading chunk # 1067
Reading chunk # 1068
Reading chunk # 1069
Reading chunk # 1070
Reading chunk # 1071
Reading chunk # 1072
Reading chunk # 1073
Reading chunk # 1074
Reading chunk # 1075
Reading chunk # 1076
Reading chunk # 1077
Reading chunk # 1078
Reading chunk # 1079
Reading chunk # 1080
Reading chunk # 1081
Reading chunk # 1082
Reading chunk # 1083
Reading chunk # 1084
Reading chunk # 1085
Reading chunk # 1086
Reading chunk # 1087
Reading chunk # 1088
Reading chunk # 1089
Reading chunk # 1090
Reading chunk # 1091
Reading chunk # 1092
Reading chunk # 1093
Reading chunk # 1094
Reading chunk # 1095
Reading chunk # 1096
Reading chunk # 1097
Reading chunk # 1098
Reading chunk # 1099
Reading chunk # 1100
Reading chunk # 1101
Reading chunk

In [3]:
keep_cols = ['Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID', 
                              'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Name',
                              'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_State',
                              'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Country',
                                'Change_Type','Record_ID','Covered_Recipient_NPI', 'Covered_Recipient_Profile_ID','Covered_Recipient_Type',
                          'Covered_Recipient_Primary_Type_1','Covered_Recipient_Specialty_1',
                          'Total_Amount_of_Payment_USDollars','Date_of_Payment','Program_Year','Payment_Publication_Date',
                          'Number_of_Payments_Included_in_Total_Amount',
                          'Form_of_Payment_or_Transfer_of_Value','Nature_of_Payment_or_Transfer_of_Value',
                          'Third_Party_Payment_Recipient_Indicator',
                           'Name_of_Third_Party_Entity_Receiving_Payment_or_Transfer_of_Value','Charity_Indicator', 
                          'Third_Party_Equals_Covered_Recipient_Indicator',
                          'Delay_in_Publication_Indicator', 'Related_Product_Indicator', 'City_of_Travel','State_of_Travel', 
                          'Country_of_Travel','Dispute_Status_for_Publication', 'Physician_Ownership_Indicator', 
                          'Teaching_Hospital_CCN','Teaching_Hospital_ID', 'Teaching_Hospital_Name']

payments_2020_df = payments_2020_df[keep_cols]
payments_2021_df = payments_2021_df[keep_cols]

# upload payments file to S3
payments_2020_file_name = f"{source_s3_key}{general_payments_2020}"
# upload_dataframe_to_s3_csv(payments_2020_df, bucket_name, payments_2020_file_name)

payments_2021_file_name = f"{source_s3_key}{general_payments_2021}"
# upload_dataframe_to_s3_csv(payments_2021_df, bucket_name, payments_2021_file_name)

In [4]:
###################################################################
# Function to transform general payment data to get multiple tables
###################################################################

# General Payment dim table:  

def transform_dim__gp_manufacturer_gpo(df):    
    # Creating a sub dataframe to contain values related to dimension table for 2020 general payments
    dim_gp_manufacturer_gpo = df[['Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID', 
                              'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Name',
                              'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_State',
                              'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Country']]

    # Check for nulls
    null_values = dim_gp_manufacturer_gpo.isnull().sum()
    null_values

    # Check for duplicates
    duplicate_rows=dim_gp_manufacturer_gpo[dim_gp_manufacturer_gpo.duplicated(subset=['Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID', 
                          'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Name',
                          'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_State',
                          'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Country'], keep=False)]
    duplicate_rows
    
    # remove duplicates
    dim_gp_manufacturer_gpo = dim_gp_manufacturer_gpo.drop_duplicates()
    
    # Check for duplicates again
    duplicate = dim_gp_manufacturer_gpo[dim_gp_manufacturer_gpo.duplicated()]
    duplicate
    
    # Renaming columns for consistency 
    dim_gp_manufacturer_gpo.rename(columns = {'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID':'Manufacturer_ID', 
                              'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Name': 'Manufacturer_Name',
                              'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_State':'Manufacturer_State',
                              'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_Country': 'Manufacturer_Country'}, inplace = True)

    # Check for data type errors
    dim_gp_manufacturer_gpo.info()
     
    # Handle data type errors
    
    return dim_gp_manufacturer_gpo

# General Payment fact table:  
def transform_fact_general_payment(df):    
        
    # Creating a sub dataframe to contain values related to fact table for 2020 general payments

    fact_general_payment = df[['Change_Type','Record_ID','Covered_Recipient_NPI', 'Covered_Recipient_Profile_ID','Covered_Recipient_Type',
                          'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID',
                          'Covered_Recipient_Primary_Type_1','Covered_Recipient_Specialty_1',
                          'Total_Amount_of_Payment_USDollars','Date_of_Payment','Program_Year','Payment_Publication_Date',
                          'Number_of_Payments_Included_in_Total_Amount',
                          'Form_of_Payment_or_Transfer_of_Value','Nature_of_Payment_or_Transfer_of_Value',
                          'Third_Party_Payment_Recipient_Indicator', 
                          'Delay_in_Publication_Indicator', 'Related_Product_Indicator','Dispute_Status_for_Publication', 'Physician_Ownership_Indicator']]

    
    # Checking for data type errors   
    # Updating Date data types
    fact_general_payment['Date_of_Payment'] = pd.to_datetime(fact_general_payment["Date_of_Payment"])
    fact_general_payment['Payment_Publication_Date'] = pd.to_datetime(fact_general_payment["Payment_Publication_Date"])
    
    # Update any other data type error
    
    # Check for nulls
    null_values = fact_general_payment.isnull().sum()
    print(null_values)
    
    # drop null values/subset
    fact_general_payment.dropna(subset = ['Covered_Recipient_Profile_ID'], inplace=True)
    fact_general_payment.dropna(subset = ['Covered_Recipient_NPI'], inplace=True)
    
    # Replace Nulls with NULL
    fact_general_payment["Covered_Recipient_Primary_Type_1"].fillna("NULL", inplace = True)
    fact_general_payment["Covered_Recipient_Specialty_1"].fillna("NULL", inplace = True)

    # Check for duplicates
    duplicate_rows=fact_general_payment[fact_general_payment.duplicated(subset=['Change_Type','Record_ID','Covered_Recipient_NPI', 'Covered_Recipient_Profile_ID','Covered_Recipient_Type',
                          'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID',
                          'Covered_Recipient_Primary_Type_1','Covered_Recipient_Specialty_1',
                          'Total_Amount_of_Payment_USDollars','Date_of_Payment','Program_Year','Payment_Publication_Date',
                          'Number_of_Payments_Included_in_Total_Amount',
                          'Form_of_Payment_or_Transfer_of_Value','Nature_of_Payment_or_Transfer_of_Value',
                          'Third_Party_Payment_Recipient_Indicator', 
                          'Delay_in_Publication_Indicator', 'Related_Product_Indicator','Dispute_Status_for_Publication', 'Physician_Ownership_Indicator'], keep=False)]
    print(duplicate_rows)
    
    # remove duplicates
    fact_general_payment = fact_general_payment.drop_duplicates()
    
    # Check for duplicates again
    duplicate = fact_general_payment[fact_general_payment.duplicated()]
    duplicate
    
    # Renaming columns for consistency 
    fact_general_payment.rename(columns = {'Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID':'Manufacturer_ID'}, inplace = True)
    
    dim_gpo = transform_dim__gp_manufacturer_gpo(df)                                                      
                                       
    return fact_general_payment, dim_gpo


In [5]:
payments_2020_df, payments_gpo_2020_df = transform_fact_general_payment(payments_2020_df)
payments_2021_df, payments_gpo_2021_df = transform_fact_general_payment(payments_2021_df)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


Change_Type                                                        0
Record_ID                                                          0
Covered_Recipient_NPI                                          35794
Covered_Recipient_Profile_ID                                   32406
Covered_Recipient_Type                                             0
Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID        0
Covered_Recipient_Primary_Type_1                               32406
Covered_Recipient_Specialty_1                                  32549
Total_Amount_of_Payment_USDollars                                  0
Date_of_Payment                                                    0
Program_Year                                                       0
Payment_Publication_Date                                           0
Number_of_Payments_Included_in_Total_Amount                        0
Form_of_Payment_or_Transfer_of_Value                               0
Nature_of_Payment_or_Transfer_of_V

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._update_inplace(new_data)


Empty DataFrame
Columns: [Change_Type, Record_ID, Covered_Recipient_NPI, Covered_Recipient_Profile_ID, Covered_Recipient_Type, Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID, Covered_Recipient_Primary_Type_1, Covered_Recipient_Specialty_1, Total_Amount_of_Payment_USDollars, Date_of_Payment, Program_Year, Payment_Publication_Date, Number_of_Payments_Included_in_Total_Amount, Form_of_Payment_or_Transfer_of_Value, Nature_of_Payment_or_Transfer_of_Value, Third_Party_Payment_Recipient_Indicator, Delay_in_Publication_Indicator, Related_Product_Indicator, Dispute_Status_for_Publication, Physician_Ownership_Indicator]
Index: []
<class 'pandas.core.frame.DataFrame'>
Int64Index: 1676 entries, 0 to 5822989
Data columns (total 4 columns):
 #   Column                Non-Null Count  Dtype 
---  ------                --------------  ----- 
 0   Manufacturer_ID       1676 non-null   int64 
 1   Manufacturer_Name     1676 non-null   object
 2   Manufacturer_State    1496 non-null   object


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


Change_Type                                                          0
Record_ID                                                            0
Covered_Recipient_NPI                                            35420
Covered_Recipient_Profile_ID                                     30604
Covered_Recipient_Type                                               0
Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID          0
Covered_Recipient_Primary_Type_1                                 30604
Covered_Recipient_Specialty_1                                    30787
Total_Amount_of_Payment_USDollars                                    0
Date_of_Payment                                                      0
Program_Year                                                         0
Payment_Publication_Date                                             0
Number_of_Payments_Included_in_Total_Amount                          0
Form_of_Payment_or_Transfer_of_Value                                 0
Nature

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  self._update_inplace(new_data)


Empty DataFrame
Columns: [Change_Type, Record_ID, Covered_Recipient_NPI, Covered_Recipient_Profile_ID, Covered_Recipient_Type, Applicable_Manufacturer_or_Applicable_GPO_Making_Payment_ID, Covered_Recipient_Primary_Type_1, Covered_Recipient_Specialty_1, Total_Amount_of_Payment_USDollars, Date_of_Payment, Program_Year, Payment_Publication_Date, Number_of_Payments_Included_in_Total_Amount, Form_of_Payment_or_Transfer_of_Value, Nature_of_Payment_or_Transfer_of_Value, Third_Party_Payment_Recipient_Indicator, Delay_in_Publication_Indicator, Related_Product_Indicator, Dispute_Status_for_Publication, Physician_Ownership_Indicator]
Index: []
<class 'pandas.core.frame.DataFrame'>
Int64Index: 1656 entries, 0 to 11407767
Data columns (total 4 columns):
 #   Column                Non-Null Count  Dtype 
---  ------                --------------  ----- 
 0   Manufacturer_ID       1656 non-null   int64 
 1   Manufacturer_Name     1656 non-null   object
 2   Manufacturer_State    1471 non-null   object

In [10]:
payments_all_df = pd.concat([payments_2020_df, payments_2021_df])
gpo_all_df = pd.concat([payments_gpo_2020_df, payments_gpo_2021_df])

payments_all_df.columns = payments_all_df.columns.str.lower()
gpo_all_df.columns = gpo_all_df.columns.str.lower()

In [13]:
# upload payments file to S3
payments_all_file_name = f"{transform_s3_key}fact_general_payments.csv"
upload_dataframe_to_s3_csv(payments_all_df, bucket_name, payments_all_file_name)

gp_gpo_file_name = f"{transform_s3_key}general_dim_manufacture_gpo.csv"
upload_dataframe_to_s3_csv(gpo_all_df, bucket_name, gp_gpo_file_name)