## Instantiate Spark Session

In [1]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
c = pyspark.SparkConf().setAppName("test_app").setMaster("local")
sc = pyspark.SparkContext(conf = c)
spark = SparkSession(sc)

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import to_date
from pyspark.sql import Window


## Load Data

In [3]:
data_sk = spark.read.csv("nphies.csv",inferSchema = True , header = True)

In [None]:
#data_sk.printSchema() ## Checks DataType 

## Changed all Date Columns to Date

In [4]:
columns_date_convert = ['Provider Contract Start Date',
 'Provider Contract End Date',
 'Policy Issue Date',
 'Policy Start Date',
 'Policy Expiry Date',
 'Received Date',
 'Treatment Date']

 #['Item Date','Settlement Date'] contains Mixed datatype (mm/dd/yyyy & strings)
 #['Claim Approval Date'] contains mm/dd/yyyy hh:mm

In [5]:
# Define a lambda function to convert date to "mm/dd/yyyy" format
convert_to_date_format = lambda col_name: to_date(col(col_name), "MM/dd/yyyy").alias(col_name)

# Apply the conversion to selected columns
converted_df = data_sk.select(*[convert_to_date_format(col_name) if col_name in columns_date_convert else col(col_name) for col_name in data_sk.columns])

In [7]:
#converted_df.columns

In [8]:
#converted_df.count()

## Business Requirements

In [9]:
# Selected Columns as Business Requirements 
selected_columns = ['Provider Code','Provider Name','Provider Class','Provider City','Customer Name','Policy Number','Policy Start Date','Policy Expiry Date','Member Code','Member Name','Member Relationship','Member National Id','Member DOB',
                    'Marital Status','Gender','Nationality','Mobile',
                    'Diagnosis','Diagnosis Code','Claim Number','Treatment Date','Service Type',
                    'Item Name','Price','Quantity','Prv Net Amount']

In [10]:
#len(selected_columns)

In [11]:
# Create Datafrome of selected columns
selected_df= converted_df.select(*selected_columns)

In [12]:
# Checking Diagnosis column
# selected_df.select( 'Diagnosis','Diagnosis Code').show(100)

In [14]:
# Drop Duplicates
Selected_columns_cleaned = selected_df.dropDuplicates()

### Checking for total Duplicated Records 

In [15]:
# raw = selected_df.count()
# cleaned = Selected_columns_cleaned.count()
# total_no_duplicates = raw - cleaned

In [16]:
# total_no_duplicates # There are 132,049 Duplicate Records

In [17]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

#Selected_columns_cleaned.select('Member DOB')

In [18]:
# Create Age Column
Selected_columns_cleaned = Selected_columns_cleaned.withColumn('Member DOB', to_date(col('Member DOB'), 'MM/dd/yyyy')) \
       .withColumn('Age', (datediff(col('Treatment Date'), col('Member DOB')) / 365).cast('int'))

In [19]:
# Selected_columns_cleaned.select('Age').show(5)

In [20]:
# Selected_columns_cleaned.count() # There are 2796425 Cleaned Records 

In [81]:
Selected_columns_cleaned.createOrReplaceTempView("table")

## Medication Abuse

In [14]:
#Using Spark SQL
Member_Abuse = spark.sql("""
SELECT `Member Code`, `Member Name`, `Item Name`, `Treatment Date`, Duplicate_Treatment_Date,
       DATEDIFF(Duplicate_Treatment_Date, `Treatment Date`) AS Waiting_period
FROM (
    SELECT *,
           LEAD(`Treatment Date`) OVER (PARTITION BY `Member Code`, `Item Name` ORDER BY `Treatment Date`) AS Duplicate_Treatment_Date
    FROM table
    WHERE `Service Type` = 'Pharmacy'
) AS subquery
WHERE DATEDIFF(Duplicate_Treatment_Date, `Treatment Date`) BETWEEN 1 AND 25;
""")

In [19]:
# Member_Abuse.show() # There are 66,509 Claims with member Abuse

## Splitting Claims

In [15]:
split_claims = spark.sql("""
SELECT 
        `Member Code`,
        `Provider Code`,
        COUNT(*) AS Number_of_claims
FROM(
SELECT
    *,
    CASE
        WHEN Date_diff IS NULL THEN 0
        WHEN Date_diff < 1 OR Date_diff > 30 THEN 0
        ELSE 1
    END AS Date_Flag
FROM (
    SELECT
        `Member Code`,
        `Provider Code`,
        `Treatment Date`,
        LEAD(`Treatment Date`) OVER (PARTITION BY `Member Code` ORDER BY `Treatment Date`) AS Next_date,
        DATEDIFF(LEAD(`Treatment Date`) OVER (PARTITION BY `Member Code` ORDER BY `Treatment Date`), `Treatment Date`) AS Date_diff
    FROM
        Table
) AS subquery
WHERE (CASE
        WHEN Date_diff IS NULL THEN 0
        WHEN Date_diff < 1 OR Date_diff > 30 THEN 0
        ELSE 1
    END) = 1
)
GROUP BY `Member Code`, `Provider Code`
HAVING COUNT(*) > 1
""")

In [188]:
 # split_claims.count() There are 56745 of the occurences

56745

## Member Abuse

In [16]:
MemberAbuse = spark.sql("""
    SELECT 
        `Member Code`, 
        `Provider Code`, 
        `Treatment Date`,
        Claims_Count
    FROM (
        SELECT 
            `Member Code`, 
            `Provider Code`, 
            `Treatment Date`,
            Claims_Count,
            DATEDIFF(`Treatment Date`, LAG(`Treatment Date`, 1) OVER (PARTITION BY `Member Code`, `Service Type` ORDER BY `Treatment Date`)) AS Date_Diff
        FROM (
            SELECT 
                `Member Code`, 
                `Provider Code`, 
                `Service Type`, 
                `Treatment Date`,
                COUNT(*) OVER (PARTITION BY `Member Code`, `Service Type` ORDER BY `Treatment Date`) AS Claims_Count
            FROM 
                table
        ) AS subquery
    ) AS filtered_data
    WHERE 
        Date_Diff <= 30
""")

In [24]:
MemberAbuse.count() 

1912345

In [26]:
#MemberAbuse.show(50)

## Different Area or City Claims 

In [17]:
CityClaims = spark.sql("""
    WITH rod_with_diff AS (
        SELECT *,
            DATEDIFF(`Treatment Date`, 
                     LAG(`Treatment Date`, 1) OVER (PARTITION BY `Member Code`, `Member Name`, `Diagnosis` ORDER BY `Treatment Date`)) AS date_diff,
            LAG(`Provider City`, 1) OVER (PARTITION BY `Member Code`, `Member Name`, `Diagnosis` ORDER BY `Treatment Date`) AS prev_provider_city
        FROM table
    )
    SELECT 
         `Member Code`,
         `Member Name`,
         `Diagnosis`,
         date_diff,
         `Treatment Date`,
         `Provider City`
    FROM rod_with_diff
    WHERE date_diff <= 7 AND `Provider City` != prev_provider_city
""")

In [155]:
#result_df.show() 

## Chronic Disease at Young Age

In [18]:
Chronic_Age = spark.sql("""
SELECT
    `Member Code`,
    `Diagnosis`,
    `Age`,
    COUNT(*) AS `Claim Count`
FROM
    table
WHERE
    `Age` < 30
    AND
    (`Diagnosis` LIKE '%hypertension%' 
    OR `Diagnosis` LIKE '%dyslipidemia%' 
    OR `Diagnosis` LIKE '%ischemic heart disease%' 
    OR `Diagnosis` LIKE '%chronic obstructive pulmonary disease%')
GROUP BY
    `Member Code`,
    `Diagnosis`,
    `Age`
ORDER BY
    `Claim Count`
""")



In [163]:
#Chronic_Age.show()

## Charging Consultation within the follow-up Period

In [36]:
# This Table Capture data that groups member code and diagnosis 

con_2 = spark.sql("""
SELECT *
FROM (
    SELECT 
        *,
        LEAD(`Member Code`) OVER (ORDER BY `Treatment Date`) AS Next_member_code,
        LEAD(`Diagnosis code`) OVER (ORDER BY `Treatment Date`) AS Next_daignosis,
        LEAD(`Treatment date`) OVER (PARTITION BY `Member Code`, `Diagnosis code` ORDER BY `Treatment Date`) AS next_treatment_date,
        DATEDIFF(LEAD(`Treatment date`) OVER (PARTITION BY `Member Code`, `Diagnosis code` ORDER BY `Treatment Date`), `Treatment date`) AS date_diff
    FROM 
        table
) AS subquery
 
""")

In [64]:
# This table captures consultation from same member and diagnosis with 14 days period 

Cons_Df = con_2.select('Member Code','Next_member_code', 'Diagnosis code', 'Next_daignosis', 'next_treatment_date', 'date_diff', 'Service Type', 'Treatment date') \
    .filter((con_2['Service Type'] == 'Consultation') & (con_2['Member Code'] == con_2['Next_member_code']) & (con_2['Member Code'] == con_2['Next_member_code']) 
           & (col('date_diff').between(1, 14))) 


## Maternity Services After Delivery

In [99]:
Maternity_Delivery = spark.sql("""
WITH delivery_table AS (
    SELECT 
       `Treatment Date`,
       `Member Code`,
       `Item Name`,
       FIRST_VALUE(`Treatment Date`) OVER(PARTITION BY `Member Code`) AS delivery_date
    FROM table 
    WHERE `Item Name` RLIKE '(?i)delivery'
),
Claims_table AS (
    SELECT 
        *
    FROM TABLE
)
SELECT 
    d.`Member Code`,
    COUNT(c.`Claim Number`) AS claim_count,
    c.`Treatment Date`
FROM delivery_table AS d
INNER JOIN Claims_table AS c 
    ON d.`Member Code`= c.`Member Code`
WHERE c.`Treatment Date` BETWEEN d.delivery_date AND DATE_ADD(d.delivery_date, 60)
GROUP BY d.`Member Code`,c.`Treatment Date`
""")

In [111]:
#Maternity_Delivery.show()
#Maternity_Delivery.count() # There are 2555 claims that meets the criteria

## Optical and Dental Claims for ages Less than 1 year 

In [105]:
Opt_den_1 = spark.sql("""
SELECT 
  *
FROM table 
WHERE `Service Type` RLIKE '(?i)optical' AND `Age` < 1 OR `Service Type` RLIKE '(?i)dental' AND `Age` < 1
 
""")

In [110]:
#Opt_den_1.select("Member Code","Service Type","Age").show()
#Opt_den_1.count() # There 146 Claims with the criteria

## Dental Abuse 

In [22]:
Dental_abuse = spark.sql("""

SELECT 
      `Claim Number`,
      `Treatment Date`,
       COUNT(`Claim Number`) AS No_of_Claims
FROM 
     table
WHERE 
     `Service Type` RLIKE '(?i)dental' 
GROUP BY 
     `Claim Number`,`Treatment Date`
HAVING 
     No_of_Claims >= 3 

""")

In [24]:
Dental_abuse.count() # There are 5,723 claims with this abuse

5723

In [26]:
# Dental_abuse.show()

## Dental Limit

In [37]:
Dental_limit = spark.sql("""
SELECT *,
       Next_treatment,
       Last_treatment
FROM
    (SELECT *,
           LEAD(`Treatment Date`) OVER(PARTITION BY `Member Code` ORDER BY `Treatment Date`) AS Next_treatment,
           LAST_VALUE(`Treatment Date`) OVER(PARTITION BY `Member Code` ORDER BY `Treatment Date` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS Last_treatment
    FROM table
    WHERE `Service Type` RLIKE '(?i)dental' AND datediff(`Policy Expiry Date`, `Treatment Date`) < 14
    ) AS subquery
""")

In [41]:


# Subtract 'Treatment Date' from 'Policy Expiry Date' and create a new column
Dental_limit_select = Dental_limit_select.withColumn('Date_Difference', expr("datediff(`Policy Expiry Date`, `Treatment Date`)"))

# Select required columns
Dental_limit_select = Dental_limit_select.select('Policy Expiry Date', 'Treatment Date', 'Member Code', 'Date_Difference')


In [46]:
# There are some Inconsistency in the Policy Expiry Date . 
#|2021-07-19|2022-01-27|3241763|-192|

# Dental_limit_select.show()
# Dental_limit_select.count() #There are 2,113 records 

## Optical Limit

In [50]:
Optical_limit = spark.sql("""
SELECT *,
       Next_treatment,
       Last_treatment
FROM
    (SELECT *,
           LEAD(`Treatment Date`) OVER(PARTITION BY `Member Code` ORDER BY `Treatment Date`) AS Next_treatment,
           LAST_VALUE(`Treatment Date`) OVER(PARTITION BY `Member Code` ORDER BY `Treatment Date` ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS Last_treatment
    FROM table
    WHERE `Service Type` RLIKE '(?i)optical' AND datediff(`Policy Expiry Date`, `Treatment Date`) < 14
    ) AS subquery
""")

In [52]:


# Subtract 'Treatment Date' from 'Policy Expiry Date' and create a new column
Optical_limit_Selected = Optical_limit.withColumn('Date_Difference', expr("datediff(`Policy Expiry Date`, `Treatment Date`)"))

# Select required columns
Optical_limit = Optical_limit_Selected.select('Policy Expiry Date', 'Treatment Date', 'Member Code', 'Date_Difference')


In [53]:
# There are some Inconsistency in the Policy Expiry Date . 
#|2022-02-03|2022-03-08|5807601|-33|
Optical_limit.show()
Optical_limit.count() # There are 1008 Record


+------------------+--------------+-----------+---------------+
|Policy Expiry Date|Treatment Date|Member Code|Date_Difference|
+------------------+--------------+-----------+---------------+
|        2022-01-19|    2022-01-11|    5784637|              8|
|        2022-02-13|    2022-02-08|    5136010|              5|
|        2022-02-09|    2022-02-03|    6390210|              6|
|        2022-02-16|    2022-02-10|    4107798|              6|
|        2022-01-22|    2022-01-12|    5786529|             10|
|        2022-01-22|    2022-01-11|    5786529|             11|
|        2022-02-16|    2022-02-10|    4107798|              6|
|        2022-02-06|    2022-02-02|    5819871|              4|
|        2022-02-06|    2022-02-06|    5820318|              0|
|        2022-02-03|    2022-01-30|    5807598|              4|
|        2022-01-25|    2022-01-22|    5162306|              3|
|        2022-01-31|    2022-01-18|    3903088|             13|
|        2022-02-06|    2022-02-03|    5

## Unbundle services

In [56]:
# Here I used Regexp to get all records containing each words.
Unbundle_services = spark.sql("""
SELECT 
       *
FROM
      table
WHERE 
     `Item Name` RLIKE '(?i)(Lipid\s*Profile|LDL|HDL|Cholesterol|Triglyceride)' AND `Service Type` = 'Lab' 
     OR
     `Item Name` RLIKE '(?i)(Renal\s*Profile|Sodium|Na|Potassium|K|Chloride|Urea|Creatine|Calcium|Bicarbonate)' AND `Service Type` = 'Lab'
     OR
     `Item Name` RLIKE '(?i)(Cardiac\s*Enzyme|troponin|CK|Ck-MB)' AND `Service Type` = 'Lab'
     OR
     `Item Name` RLIKE '(?i)(Electrolyte|sodium|NA|Potassium|K|chloride|CL|magnesium|MG|calcium|CA|Phosphate|PO4|bicarbonates)' AND `Service Type` = 'Lab'
""")


In [63]:
# Unbundle_services.count() #There are 137,136

In [58]:
# For comparism , I checked using regexp with just package name

Unbundle_services_2 = spark.sql("""
SELECT 
       *
FROM
      table
WHERE 
     `Item Name` RLIKE '(?i)(Lipid\s*Profile)' AND `Service Type` = 'Lab' 
     OR
     `Item Name` RLIKE '(?i)(Renal\s*Profile)' AND `Service Type` = 'Lab'
     OR
     `Item Name` RLIKE '(?i)(Cardiac\s*Enzyme)' AND `Service Type` = 'Lab'
     OR
     `Item Name` RLIKE '(?i)(Electrolyte)' AND `Service Type` = 'Lab'
""")


In [64]:
# Unbundle_services_2.select('Item Name','Claim Number','Service Type').show()
# Unbundle_services_2.count() # There are 1,737 Records 


## Work Related Claims with Relation self 

In [65]:
Work_related_claims = spark.sql("""
SELECT 
       *
FROM
      table
WHERE 
     `Diagnosis Code` RLIKE '(?i)(s)' AND `Member Relationship` = 'Self' 

""")

In [68]:
# Work_related_claims.select("Member Relationship","Diagnosis Code").show()
# Work_related_claims.count() #  There are 27,313 records 

## Infertility Claims 

In [69]:
Infertility_claims = spark.sql("""
SELECT 
       *
FROM
      table
WHERE 
     `Item Name` RLIKE '(?i)(Luteinizing\s*hormone|Lh|Follicle\s*stimulating\s*hormone|FSH)' AND `Age` BETWEEN 15 AND 50 
""")

In [71]:
Infertility_claims.count()

775

## Herbal Medication 

In [76]:
# Import Herbal Datasheet 

Herbal = spark.read.csv('Herbs.csv',header=True)

In [80]:
Herbal.createOrReplaceTempView("Herbs") # Create a Table

In [85]:
#Attempt to join both Tables Using Spark SQL 
# INNER JOIN Return ALL Matching records on Item name / Ingredient column
# There might be a better way to find these records 
Herbal_Medication = spark.sql("""
SELECT 
    *
FROM 
    table T
INNER JOIN 
    Herbs H ON T.`Item Name` = H.`Ingredient`
""")

In [86]:
Herbal_Medication.count() #There are 19,471 records 

19471

In [91]:
# Herbal_Medication.select('H.Ingredient','T.Item Name').show(100,False)