In [1]:
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: C:\Users\ajl0618\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
import pandas as pd
import pyodbc
import numpy as np
from datetime import date

In [3]:
# Create the Spark session for this notebook, or reuse the one already running.
spark = (
    SparkSession.builder
    .appName("PBPPlanBuilder")
    .getOrCreate()
)

In [4]:
# System parameters.
# PBP_SOURCE_FOLDER: folder prefix prepended to the PBP input file names when loading.
# DR_TARGET_FOLDER: folder prefix prepended to every CSV file name read or written by the helpers.
# NOTE(review): both folder names embed "2025" independently of PLAN_YEAR - confirm they
# are meant to be updated together when the plan year changes.
PBP_SOURCE_FOLDER = 'PBP_Benefits_2025/'
DR_TARGET_FOLDER = 'PBP_Benefits_2025_Results/'

In [5]:
# Plan year parameter for CMS data. It is written into the PlanYear output column
# and embedded in the Medicare plan-compare URLs built by get_medicare_site_url.

PLAN_YEAR = 2025

In [6]:
# load all required files
def load_csv(csv_file_path):
    """Read a delimited text file into a Spark DataFrame.

    Despite the name, the file is parsed as tab-delimited. The first row is
    used as the header and column types are inferred by Spark.

    Args:
        csv_file_path: Path of the file to load.

    Returns:
        The DataFrame produced by the module-level ``spark`` session's reader.
    """
    reader = spark.read.format("csv")
    reader_options = (
        ("delimiter", "\t"),
        ("header", "true"),
        ("inferSchema", "true"),
    )
    # Apply the options one at a time, in the same order as a chained call would.
    for option_name, option_value in reader_options:
        reader = reader.option(option_name, option_value)
    return reader.load(csv_file_path)

# Drop any temporary views left over from an earlier run of this notebook.
# Only temporary entries are targeted; dropTempView cannot remove anything else.
for t in spark.catalog.listTables():
    if t.isTemporary:
        spark.catalog.dropTempView(t.name)

# Load the PBP extracts and register them for Spark SQL.
# createOrReplaceTempView (rather than createTempView) keeps this cell safe to
# re-run: createTempView raises if a view with the same name already exists.
df_pbp_section_A = load_csv(PBP_SOURCE_FOLDER + 'pbp_Section_A.txt')
df_pbp_section_A.createOrReplaceTempView('pbp_section_A')

df_pbp_mrx_tier = load_csv(PBP_SOURCE_FOLDER + 'pbp_mrx_tier.txt')
df_pbp_mrx_tier.createOrReplaceTempView('pbp_mrx_tier')

In [7]:
#List of utility functions
def write_to_csv_file(df, file_name):
    """Write a Spark DataFrame to ``DR_TARGET_FOLDER + file_name + '.csv'``.

    The frame is converted with ``toPandas()`` first, so the full result is
    materialised in memory before being written. The index is not written.

    Args:
        df: Object exposing ``toPandas()`` (a Spark DataFrame).
        file_name: Output file name without folder or extension.
    """
    target_path = DR_TARGET_FOLDER + file_name + '.csv'
    df.toPandas().to_csv(target_path, index=False)

def read_pd_from_csv_file(file_name):
    """Load ``DR_TARGET_FOLDER + file_name + '.csv'`` as a pandas DataFrame.

    Args:
        file_name: File name without folder or extension.

    Returns:
        The DataFrame returned by ``pd.read_csv`` with default options.
    """
    source_path = DR_TARGET_FOLDER + file_name + '.csv'
    frame = pd.read_csv(source_path)
    return frame

def write_pd_to_csv(df, file_name):
    """Write a pandas DataFrame to ``DR_TARGET_FOLDER + file_name + '.csv'``.

    The index is not written.

    Args:
        df: pandas DataFrame to persist.
        file_name: Output file name without folder or extension.
    """
    target_path = DR_TARGET_FOLDER + file_name + '.csv'
    df.to_csv(target_path, index=False)

def convert_to_int(field, null_value):
    """Convert ``field`` to ``int``, substituting ``null_value`` for ``None``.

    Only ``None`` is treated as missing; any other value is passed to
    ``int()`` and may raise if it cannot be converted.

    Args:
        field: Value to convert.
        null_value: Value returned unchanged when ``field`` is ``None``.

    Returns:
        ``null_value`` when ``field`` is ``None``, otherwise ``int(field)``.
    """
    return null_value if field is None else int(field)

def convert_to_currency(float_field):
    """Format a number as dollar text with thousands separators and two decimals.

    Args:
        float_field: Numeric value to format.

    Returns:
        A string such as ``'$1,234.50'``.
    """
    currency_template = '${:,.2f}'
    return currency_template.format(float_field)

def convert_to_currency_no_decimal(float_field):
    """Format a number as dollar text with thousands separators and no decimals.

    Args:
        float_field: Numeric value to format; it is rounded by the format spec.

    Returns:
        A string such as ``'$1,234'``.
    """
    currency_template = '${:,.0f}'
    return currency_template.format(float_field)

def drop_pbp_mrx_columns(df):
    """Return ``df`` without its raw ``pbp_*`` / ``mrx_*`` columns.

    A column is dropped when its name, lower-cased, starts with ``pbp_`` or
    ``mrx_``. The input frame is not modified; a new frame is returned.

    Args:
        df: pandas DataFrame whose column names are strings.

    Returns:
        A new DataFrame containing only the remaining columns.
    """
    pbp_mrx_columns = [
        column_name
        for column_name in df.columns
        # str.startswith accepts a tuple, so both prefixes are tested in one call.
        if column_name.lower().startswith(('pbp_', 'mrx_'))
    ]
    return df.drop(columns=pbp_mrx_columns)

def get_medicare_site_url(qid):
    """Build the medicare.gov plan-compare URL for a plan QID.

    The QID is split positionally: the first five characters are the contract
    id, the next three the plan id, and the remainder the segment id. The
    segment id is passed through ``int()``, so leading zeros are dropped and an
    empty or non-numeric segment raises ``ValueError``.

    Args:
        qid: Plan identifier string (contract id + plan id + segment id).

    Returns:
        The plan-details URL for ``PLAN_YEAR``.
    """
    contractid, planid, segmentid = qid[:5], qid[5:8], qid[8:]
    return f'https://www.medicare.gov/plan-compare/#/plan-details/{PLAN_YEAR}-{contractid}-{planid}-{int(segmentid)}?year={PLAN_YEAR}&lang=en#benefits'


In [9]:
# Spark SQL for the Rx tier benefit source rows.
# Joins pbp_mrx_tier to Section A on bid_id and selects the contract/plan/segment
# identifiers, constant COVERAGE_LEVEL (10) and DAYS_SUPPLY (30) values, the tier
# id and label, and six one-month copay/coinsurance columns (rstd, rsstd, rspfd).
# Rows are kept only when the plan identifier is below 800 and pbp_a_eghp_yn = 2.
# NOTE(review): presumably these filters exclude employer-group plans - confirm
# the meaning of the 800 cut-off and of pbp_a_eghp_yn = 2 against the PBP data dictionary.
query = '''
select 
    a.pbp_a_hnumber as ContractID,	
	a.pbp_a_plan_identifier as PlanID, 	
	a.segment_id as SegmentID, 
    10 as COVERAGE_LEVEL,
    30 as DAYS_SUPPLY,
    mrx_tier_id as TIER,
    mrx_tier_label_list as TIER_DESC,
    mrx_tier_rstd_copay_1m, 
    mrx_tier_rstd_coins_1m,
    mrx_tier_rsstd_copay_1m,
    mrx_tier_rsstd_coins_1m,
    mrx_tier_rspfd_copay_1m,
    mrx_tier_rspfd_coins_1m
    
	from pbp_mrx_tier m 
	inner join pbp_Section_A a on m.bid_id = a.bid_id
where CAST(a.pbp_a_plan_identifier AS INT) < 800 and cast(pbp_a_eghp_yn as int) = 2  
'''

# Run the query in Spark, persist the raw result as 'RxTierBenefits_DataSource.csv',
# then reload that CSV with pandas. From here on df_rx_tier_benefits is a pandas
# DataFrame, not a Spark DataFrame.
df_rx_tier_benefits = spark.sql(query)
write_to_csv_file(df_rx_tier_benefits, 'RxTierBenefits_DataSource')
df_rx_tier_benefits = read_pd_from_csv_file('RxTierBenefits_DataSource')
# Stamp every row with the configured plan year.
df_rx_tier_benefits['PlanYear'] = PLAN_YEAR
    
# NOTE(review): this import sits mid-notebook; consider moving it to the imports
# cell so the dependency is visible up front. Plan.get_QID is used to build the QID column.
from PBP_2025_Benefit_Text import  Plan # Logic implemented in Benefit Module

def get_pref_cost_type(x):
    """Return the cost-type code for the ``rspfd`` (PreferredRetail) columns of a row.

    Args:
        x: Row object exposing ``mrx_tier_rspfd_copay_1m`` and
            ``mrx_tier_rspfd_coins_1m`` as attributes.

    Returns:
        10 when the copay is present, otherwise 20 when the coinsurance is
        present, otherwise 0.
    """
    if pd.notna(x.mrx_tier_rspfd_copay_1m):
        return 10
    # Coinsurance is only consulted when no copay is set.
    return 20 if pd.notna(x.mrx_tier_rspfd_coins_1m) else 0

def get_pref_copay_amt(x):
    """Return the cost amount for the ``rspfd`` (PreferredRetail) columns of a row.

    Args:
        x: Row object exposing ``mrx_tier_rspfd_copay_1m`` and
            ``mrx_tier_rspfd_coins_1m`` as attributes.

    Returns:
        The copay value when present; otherwise the coinsurance value divided
        by 100 when present; otherwise 0.
    """
    copay = x.mrx_tier_rspfd_copay_1m
    if pd.notna(copay):
        return copay
    coinsurance = x.mrx_tier_rspfd_coins_1m
    if pd.notna(coinsurance):
        # Coinsurance is scaled from a percentage figure to a fraction.
        return coinsurance / 100
    return 0

def get_std_cost_type(x):
    """Return the cost-type code for the ``rstd``/``rsstd`` (Retail) columns of a row.

    Args:
        x: Row object exposing the ``mrx_tier_rstd_*_1m`` and
            ``mrx_tier_rsstd_*_1m`` copay and coinsurance attributes.

    Returns:
        10 when either copay is present, otherwise 20 when either coinsurance
        is present, otherwise 0.
    """
    no_copay = pd.isna(x.mrx_tier_rstd_copay_1m) and pd.isna(x.mrx_tier_rsstd_copay_1m)
    if not no_copay:
        return 10
    no_coinsurance = pd.isna(x.mrx_tier_rstd_coins_1m) and pd.isna(x.mrx_tier_rsstd_coins_1m)
    if not no_coinsurance:
        return 20
    return 0

def get_std_copay_amt(x):
    """Return the cost amount for the ``rstd``/``rsstd`` (Retail) columns of a row.

    The columns are checked in a fixed order: ``rstd`` copay, ``rsstd`` copay,
    ``rstd`` coinsurance, ``rsstd`` coinsurance. The first one that is present
    determines the result.

    Args:
        x: Row object exposing the four attributes named above.

    Returns:
        The first present copay as-is, or the first present coinsurance
        divided by 100, or 0 when all four are missing.
    """
    # Copays take precedence and are returned unchanged.
    for copay_column in ('mrx_tier_rstd_copay_1m', 'mrx_tier_rsstd_copay_1m'):
        copay = getattr(x, copay_column)
        if pd.notna(copay):
            return copay
    # Coinsurance is scaled from a percentage figure to a fraction.
    for coins_column in ('mrx_tier_rstd_coins_1m', 'mrx_tier_rsstd_coins_1m'):
        coinsurance = getattr(x, coins_column)
        if pd.notna(coinsurance):
            return coinsurance / 100
    return 0

# Build the plan QID for every row (logic lives in the Benefit module's Plan class).
df_rx_tier_benefits['QID'] = df_rx_tier_benefits.apply(Plan.get_QID, axis=1)

# Derive the row-wise cost type / amount pairs for preferred and standard retail.
df_rx_tier_benefits['COST_TYPE_PreferredRetail'] = df_rx_tier_benefits.apply(get_pref_cost_type, axis=1)
df_rx_tier_benefits['COST_AMT_PreferredRetail'] = df_rx_tier_benefits.apply(get_pref_copay_amt, axis=1)
df_rx_tier_benefits['COST_TYPE_Retail'] = df_rx_tier_benefits.apply(get_std_cost_type, axis=1)
df_rx_tier_benefits['COST_AMT_Retail'] = df_rx_tier_benefits.apply(get_std_copay_amt, axis=1)

# Remove the raw pbp_*/mrx_* source columns, then write the final output file.
df_rx_tier_benefits = drop_pbp_mrx_columns(df_rx_tier_benefits)
write_pd_to_csv(df_rx_tier_benefits, 'RxTierBenefits')

: 