In [1]:
import pandas as pd
import os
from datetime import datetime
import zipfile
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from functools import reduce

In [2]:
path =os.path.join(os.path.join(os.path.abspath('')),'Files')

In [3]:
spark = SparkSession.builder.appName("edp_analysis").master("local").config("spark.driver.memory", "15g").getOrCreate()

### Transforming Data into Parquet

In [7]:
for file in os.listdir(path):
    print(file)
    df = spark.read.option("header", "true").csv(os.path.join(path,file))
    file_name = os.path.join(path,file.split('.')[0]+'.parquet')
    df.write.parquet(file_name)

EPD_202206.csv
EPD_202207.csv
EPD_202208.csv


### Read Parquet Files

In [4]:
os.listdir(path)

['EPD_202206.parquet', 'EPD_202207.parquet', 'EPD_202208.parquet']

In [5]:
df_list = []
for file in os.listdir(path):
    df_ = spark.read.option("header","true").option("mergeSchema", "true").load('C:\\Users\\Olist\\OneDrive\\Ambiente de Trabalho\\Projects\\interview\\Files\\EPD_202206.parquet')
    print((df_.count(), len(df_.columns)))
    df_list.append(df_)
df = reduce(DataFrame.unionAll, df_list)
print((df.count(), len(df.columns)))

(17603900, 26)
(17603900, 26)
(17603900, 26)
(52811700, 26)


In [6]:
df.columns

['YEAR_MONTH',
 'REGIONAL_OFFICE_NAME',
 'REGIONAL_OFFICE_CODE',
 'ICB_NAME',
 'ICB_CODE',
 'PCO_NAME',
 'PCO_CODE',
 'PRACTICE_NAME',
 'PRACTICE_CODE',
 'ADDRESS_1',
 'ADDRESS_2',
 'ADDRESS_3',
 'ADDRESS_4',
 'POSTCODE',
 'BNF_CHEMICAL_SUBSTANCE',
 'CHEMICAL_SUBSTANCE_BNF_DESCR',
 'BNF_CODE',
 'BNF_DESCRIPTION',
 'BNF_CHAPTER_PLUS_CODE',
 'QUANTITY',
 'ITEMS',
 'TOTAL_QUANTITY',
 'ADQUSAGE',
 'NIC',
 'ACTUAL_COST',
 'UNIDENTIFIED']

### Creating DF for Prescribers and Prescriptions

In [7]:
prescribers = df.select(['PRACTICE_NAME',
 'PRACTICE_CODE',
 'ADDRESS_1',
 'ADDRESS_2',
 'ADDRESS_3',
 'ADDRESS_4',
 'POSTCODE']).dropDuplicates(['PRACTICE_CODE'])

print((prescribers.count(), len(prescribers.columns)))

(8927, 7)


In [8]:
file_name = os.path.join(path,'prescribers.parquet')
prescribers.write.parquet(file_name)

In [9]:
prescriptions = df.select(['BNF_CHEMICAL_SUBSTANCE',
 'CHEMICAL_SUBSTANCE_BNF_DESCR',
 'BNF_CODE',
 'BNF_DESCRIPTION',
 'BNF_CHAPTER_PLUS_CODE'
]).dropDuplicates(['BNF_CODE'])

print((prescriptions.count(), len(prescriptions.columns)))
file_name = os.path.join(path,'prescriptions.parquet')
prescriptions.write.parquet(file_name)

(21968, 5)


### 10 Most Recurrent Prescribers

In [11]:
df.groupBy(['PRACTICE_NAME',
 'PRACTICE_CODE']).count().sort(desc('count')).show(10)

+--------------------+-------------+-----+
|       PRACTICE_NAME|PRACTICE_CODE|count|
+--------------------+-------------+-----+
|MODALITY PARTNERS...|       B83033|35049|
|MEDICUS HEALTH PA...|       F85002|30201|
|MIDLANDS MEDICAL ...|       M85063|29946|
|       SHORE MEDICAL|       J81012|25506|
|PORTSDOWN GROUP P...|       J82155|24696|
|MODALITY PARTNERS...|       B81048|24210|
|   BAY MEDICAL GROUP|       Y01008|24135|
|SUTTON COLDFIELD ...|       M85046|24084|
|REGIS MEDICAL CENTRE|       M88004|24045|
|HEATON MOOR MEDIC...|       P88026|23544|
+--------------------+-------------+-----+
only showing top 10 rows



### Most Used Prescriptions in volume and Quantity

In [12]:
df.groupBy(['BNF_CODE','BNF_CHEMICAL_SUBSTANCE']).agg(sum('QUANTITY').alias("sum_quantity"), \
         count("items").alias("total_items")).sort(desc('sum_quantity')).show(10)

+---------------+----------------------+------------+-----------+
|       BNF_CODE|BNF_CHEMICAL_SUBSTANCE|sum_quantity|total_items|
+---------------+----------------------+------------+-----------+
|090402000BBRRA0|             090402000|   2.96001E8|      37638|
|090402000BBAJA0|             090402000|  2.501838E8|      34302|
|090402000BBSIA0|             090402000|2.27826375E8|      41475|
|090402000BBVTA0|             090402000|  1.993374E8|      35010|
|090402000BBUBA0|             090402000|1.83060375E8|      33660|
|090401000BBGFA0|             090401000|   1.27299E8|      19464|
|090402000BBLLA0|             090402000|   9.61812E7|       4365|
|090402000BBNTA0|             090402000|  9.594882E7|      12849|
|090402000BBRZA0|             090402000| 8.6700798E7|       6513|
|090402000BBVWA0|             090402000|   8.24634E7|      12489|
+---------------+----------------------+------------+-----------+
only showing top 10 rows



In [13]:
df.groupBy(['BNF_CODE','BNF_CHEMICAL_SUBSTANCE']).agg(sum('QUANTITY').alias("sum_quantity"), \
         count("items").alias("total_items")).sort(desc('total_items')).show(10)

+---------------+----------------------+------------+-----------+
|       BNF_CODE|BNF_CHEMICAL_SUBSTANCE|sum_quantity|total_items|
+---------------+----------------------+------------+-----------+
|0408010G0AAABAB|             0408010G0| 2.9737434E7|     225804|
|0603020T0AAACAC|             0603020T0| 1.3733571E7|     225042|
|040702040AAAAAA|             040702040| 2.2018131E7|     215415|
|0407010H0AAAMAM|             0407010H0| 2.8013712E7|     213042|
|0901020G0AAAGAG|             0901020G0|   6137526.0|     177027|
|0601022B0AAABAB|             0601022B0| 2.0288823E7|     175956|
|0403030E0AAAAAA|             0403030E0| 1.0567467E7|     172305|
|0401020K0AAAHAH|             0401020K0|   5408526.0|     171921|
|0403010B0AAAGAG|             0403010B0| 1.2715602E7|     162612|
|0407020C0AAAEAE|             0407020C0| 1.4909421E7|     147600|
+---------------+----------------------+------------+-----------+
only showing top 10 rows



#### Most Prescriptions By Month

In [14]:
df.createOrReplaceTempView('edp_data')

In [24]:
spark.sql(''' 
    Select *
    from(
    select YEAR_MONTH,CHEMICAL_SUBSTANCE_BNF_DESCR, PRACTICE_NAME,COUNT(*) AS PRESCRIPTIONS, RANK() OVER ( PARTITION BY YEAR_MONTH,CHEMICAL_SUBSTANCE_BNF_DESCR ORDER BY COUNT(*) DESC) as rnk
    from edp_data
    group by 1, 2, 3
    order by 4 desc)
    where rnk <=10
''').show()

+----------+----------------------------+--------------------+-------------+---+
|YEAR_MONTH|CHEMICAL_SUBSTANCE_BNF_DESCR|       PRACTICE_NAME|PRESCRIPTIONS|rnk|
+----------+----------------------------+--------------------+-------------+---+
|    202206|        Wound Management ...|BCHC MEDICINES MA...|         3024|  1|
|    202206|              Ileostomy Bags|MARSS NHS WIRRAL CCG|         2220|  1|
|    202206|              Colostomy Bags|MARSS NHS WIRRAL CCG|         1902|  1|
|    202206|              Colecalciferol|        PARK SURGERY|         1617|  1|
|    202206|        Wound Management ...|NON MEDICAL PRESC...|         1569|  2|
|    202206|        Wound Management ...|BLACKPOOL LOCALIT...|         1521|  3|
|    202206|        Methadone hydroch...|DRUG AND ALCOHOL ...|         1425|  1|
|    202206|        Wound Management ...|BURY COMMUNITY SE...|         1416|  4|
|    202206|              Colecalciferol|TRINITY MEDICAL C...|         1392|  2|
|    202206|        Wound Ma