In [1]:
import configparser

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructField,
    StructType,
    StringType,
    DoubleType,
    IntegerType
)
import pyspark.sql.functions as f
from pyspark.sql.functions import udf, monotonically_increasing_id

In [2]:
spark = SparkSession.builder\
            .master('local')\
            .appName('general_practices')\
            .getOrCreate()

In [3]:
config = configparser.ConfigParser()
config.read('dl.cfg')

access_key_id = config.get('CREDENTIALS', 'AWS_ACCESS_KEY_ID')
secret_key_id = config.get('CREDENTIALS', 'AWS_SECRET_ACCESS_KEY')

In [4]:
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key",access_key_id)
spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key_id)

In [5]:
# files
chemicals_file = 'data/chem_subs.csv'
practices_file = 'data/practices.csv'
practice_prescribing_file = 'data/practice_prescribing.csv'
practice_size_file = 'data/practice_list_size_and_gp_count.csv'
bnf_codes_file = 'data/bnf_codes.csv'

In [6]:
# schema settings for datasets
practice_prescribing_schema = StructType([
    StructField("SHA", StringType(), True),
    StructField("PCT", StringType(), True),
    StructField("PRACTICE", StringType(), True),
    StructField("BNF_CODE", StringType(), True),
    StructField("BNF_NAME", StringType(), True),
    StructField("ITEMS", IntegerType(), True),
    StructField("NIC", DoubleType(), True),
    StructField("ACT_COST", DoubleType(), True),
    StructField("QUANTITY", IntegerType(), True),
    StructField("PERIOD", StringType(), True),
])

chemicals_schema = StructType([
    StructField("CHEM_SUB_CODE", StringType(), True),
    StructField("NAME", StringType(), True),
])

practices_schema = StructType([
    StructField("PERIOD", StringType(), True),
    StructField("PRACTICE_CODE", StringType(), True),
    StructField("PRACTICE_NAME", StringType(), True),
    StructField("ADDRESS_1", StringType(), True),
    StructField("ADDRESS_2", StringType(), True),
    StructField("ADDRESS_3", StringType(), True),
    StructField("ADDRESS_4", StringType(), True),
    StructField("POSTCODE", StringType(), True),
])

practice_size_schema = StructType([
    StructField("COMM_PROV", StringType(), True),
    StructField("GROUP_CODE", StringType(), True),
    StructField("PRACTICE_NAME", StringType(), True),
    StructField("PRACTICE_ADDRESS", StringType(), True),
    StructField("PRACTICE_CODE", StringType(), True),
    StructField("GP_COUNT", IntegerType(), True),
    StructField("DISPENSING_LIST_SIZE", IntegerType(), True),
    StructField("PRESCRIBING_LIST_SIZE", IntegerType(), True),
    StructField("TOTAL_LIST_SIZE", IntegerType(), True),
])

bnf_codes_schema = StructType([
    StructField("BNF_CHAPTER", StringType(), True),
    StructField("BNF_CHAPTER_CODE", StringType(), True),
    StructField("BNF_SECTION", StringType(), True),
    StructField("BNF_SECTION_CODE", StringType(), True),
    StructField("BNF_PARAGRAPH", StringType(), True),
    StructField("BNF_PARAGRAPH_CODE", StringType(), True),
    StructField("BNF_SUBPARAGRAPH", StringType(), True),
    StructField("BNF_SUBPARAGRAPH_CODE", StringType(), True),
    StructField("BNF_CHEMICAL_SUBSTANCE", StringType(), True),
    StructField("BNF_CHEMICAL_SUBSTANCE_CODE", StringType(), True),
    StructField("BNF_PRODUCT", StringType(), True),
    StructField("BNF_PRODUCT_CODE", StringType(), True),
    StructField("BNF_PRESENTATION", StringType(), True),
    StructField("BNF_PRESENTATION_CODE", StringType(), True),
])

In [7]:
get_year = udf(lambda x: int(x[:-2]) if x else None, IntegerType())
get_month = udf(lambda x: int(x[-2:]) if x else None, IntegerType())

get_chapter = udf(lambda x: x[:2] if x else None)
get_section = udf(lambda x: x[:4] if x else None)
get_paragraph = udf(lambda x: x[:6] if x else None)
get_sub_paragraph = udf(lambda x: x[:7] if x else None)
get_chemical = udf(lambda x: x[:9] if x else None)
get_product = udf(lambda x: x[:11] if x else None)

## CHEMICALS TABLE

In [8]:
chemicals = spark.read.csv(chemicals_file, header=True, schema=chemicals_schema)

In [9]:
chemicals = chemicals.withColumn("CHAPTER_CODE", get_chapter(chemicals['CHEM_SUB_CODE']))
chemicals = chemicals.withColumn("SECTION_CODE", get_section(chemicals['CHEM_SUB_CODE']))
chemicals = chemicals.withColumn("PRARGRAPH_CODE", get_paragraph(chemicals['CHEM_SUB_CODE']))
chemicals = chemicals.withColumn("SUB_PARAGRAPH_CODE", get_sub_paragraph(chemicals['CHEM_SUB_CODE']))

In [10]:
chemicals.show(3)

+-------------+--------------------+------------+------------+--------------+------------------+
|CHEM_SUB_CODE|                NAME|CHAPTER_CODE|SECTION_CODE|PRARGRAPH_CODE|SUB_PARAGRAPH_CODE|
+-------------+--------------------+------------+------------+--------------+------------------+
|    010101000|Other Antacid & S...|          01|        0101|        010101|           0101010|
|    0101010A0|     Alexitol Sodium|          01|        0101|        010101|           0101010|
|    0101010B0|          Almasilate|          01|        0101|        010101|           0101010|
+-------------+--------------------+------------+------------+--------------+------------------+
only showing top 3 rows



In [11]:
# path = 's3a://capstone-health/staging/' + 'chemicals.parquet'
# pathy = 'chemicals.parquet'
# print(path)
# chemicals.write.parquet(path, mode='overwrite')

## PRACTICES TABLE

In [12]:
practices = spark.read.csv(practices_file, header=False, schema=practices_schema)

In [13]:
practices = practices.withColumn("YEAR", get_year(practices.PERIOD))
practices = practices.withColumn("MONTH", get_month(practices.PERIOD))

In [14]:
practices = practices.drop('PERIOD')

In [15]:
practices.schema

StructType(List(StructField(PRACTICE_CODE,StringType,true),StructField(PRACTICE_NAME,StringType,true),StructField(ADDRESS_1,StringType,true),StructField(ADDRESS_2,StringType,true),StructField(ADDRESS_3,StringType,true),StructField(ADDRESS_4,StringType,true),StructField(POSTCODE,StringType,true),StructField(YEAR,IntegerType,true),StructField(MONTH,IntegerType,true)))

In [16]:
practices.show(3)

+-------------+--------------------+--------------------+-------------+----------------+---------+--------+----+-----+
|PRACTICE_CODE|       PRACTICE_NAME|           ADDRESS_1|    ADDRESS_2|       ADDRESS_3|ADDRESS_4|POSTCODE|YEAR|MONTH|
+-------------+--------------------+--------------------+-------------+----------------+---------+--------+----+-----+
|       A81001| THE DENSHAM SURGERY|   THE HEALTH CENTRE|LAWSON STREET|STOCKTON-ON-TEES|CLEVELAND|TS18 1HU|2018|   12|
|       A81002|QUEENS PARK MEDIC...|QUEENS PARK MEDIC...|FARRER STREET|STOCKTON ON TEES|CLEVELAND|TS18 2AW|2018|   12|
|       A81004|BLUEBELL MEDICAL ...|      TRIMDON AVENUE|       ACKLAM|   MIDDLESBROUGH|      N/A| TS5 8SB|2018|   12|
+-------------+--------------------+--------------------+-------------+----------------+---------+--------+----+-----+
only showing top 3 rows



## LINK THIS TO PRACTICES FOR MORE INFORMATION ABOUT PRACTICES

In [17]:
practice_size_and_group = spark.read.csv(practice_size_file, header=True, schema=practice_size_schema)

In [18]:
practice_groups = practice_size_and_group.select(
    'GROUP_CODE',
    'COMM_PROV',
).dropDuplicates()
practice_groups = practice_groups.na.drop(subset=["GROUP_CODE"])

In [19]:
practice_groups.show(5)

+----------+--------------------+
|GROUP_CODE|           COMM_PROV|
+----------+--------------------+
|     RQX00|HOMERTON UNIV HPL...|
|     RWK00|EAST LONDON NHS F...|
|     NLG00|COMMUNITAS CLINIC...|
|     Q6200|NORTH WEST LONDON...|
|     NLO00|              VOCARE|
+----------+--------------------+
only showing top 5 rows



In [20]:
practice_size = practice_size_and_group.select(
    'PRACTICE_CODE',
    'GROUP_CODE',
    'GP_COUNT',
    'DISPENSING_LIST_SIZE',
    'PRESCRIBING_LIST_SIZE',
    'TOTAL_LIST_SIZE'
)
practice_size = practice_size.na.drop(subset=["PRACTICE_CODE"])

In [21]:
practice_size.show(5)

+-------------+----------+--------+--------------------+---------------------+---------------+
|PRACTICE_CODE|GROUP_CODE|GP_COUNT|DISPENSING_LIST_SIZE|PRESCRIBING_LIST_SIZE|TOTAL_LIST_SIZE|
+-------------+----------+--------+--------------------+---------------------+---------------+
|       Y05205|     NI300|       2|                null|                 null|           null|
|       Y00898|     NI300|      37|                null|                 null|           null|
|       Y05727|     NI300|       2|                null|                 null|           null|
|       Y06182|     NI300|       1|                null|                 null|           null|
|       Y06053|     NI300|       1|                null|                 null|           null|
+-------------+----------+--------+--------------------+---------------------+---------------+
only showing top 5 rows



## PRESCRIBING FACTS TABLE

In [22]:
practice_prescribing = spark.read.csv(practice_prescribing_file, header=True, schema=practice_prescribing_schema)

In [23]:
practice_prescribing = practice_prescribing.withColumn("PRESCRIPTION_ID", monotonically_increasing_id())

In [24]:
practice_prescribing = practice_prescribing.withColumn("YEAR", get_year(practice_prescribing.PERIOD))
practice_prescribing = practice_prescribing.withColumn("MONTH", get_month(practice_prescribing.PERIOD))
practice_prescribing = practice_prescribing.drop('PERIOD')
practice_prescribing = practice_prescribing.withColumn("BNF_CHAPTER_CODE", get_chapter(practice_prescribing['BNF_CODE']))
practice_prescribing = practice_prescribing.withColumn("BNF_SECTION_CODE", get_section(practice_prescribing['BNF_CODE']))
practice_prescribing = practice_prescribing.withColumn("BNF_PARAGRAPH_CODE", get_paragraph(practice_prescribing['BNF_CODE']))
practice_prescribing = practice_prescribing.withColumn("BNF_SUBPARAGRAPH_CODE", get_sub_paragraph(practice_prescribing['BNF_CODE']))
practice_prescribing = practice_prescribing.withColumn("BNF_CHEMICAL_CODE", get_chemical(practice_prescribing['BNF_CODE']))
practice_prescribing = practice_prescribing.withColumn("BNF_PRODUCT_CODE", get_product(practice_prescribing['BNF_CODE']))

In [42]:
practice_prescribing.limit(20).toPandas()

Unnamed: 0,SHA,PCT,PRACTICE,BNF_CODE,BNF_NAME,ITEMS,NIC,ACT_COST,QUANTITY,PRESCRIPTION_ID,YEAR,MONTH,BNF_CHAPTER_CODE,BNF_SECTION_CODE,BNF_PARAGRAPH_CODE,BNF_SUBPARAGRAPH_CODE,BNF_CHEMICAL_CODE,BNF_PRODUCT_CODE
0,Q44,01C,N81002,0101021B0AAALAL,Sod Algin/Pot Bicarb_Susp S/F,1,10.24,9.52,1000,0,2018,12,1,101,10102,101021,0101021B0,0101021B0AA
1,Q44,01C,N81002,0101021B0AAAPAP,Sod Alginate/Pot Bicarb_Tab Chble 500mg,1,3.07,2.86,60,1,2018,12,1,101,10102,101021,0101021B0,0101021B0AA
2,Q44,01C,N81002,0101021B0BEACAH,Gaviscon_Liq Orig Aniseed Relief,1,12.99,12.07,900,2,2018,12,1,101,10102,101021,0101021B0,0101021B0BE
3,Q44,01C,N81002,0101021B0BEADAJ,Gaviscon Infant_Sach 2g (Dual Pack) S/F,3,81.94,76.09,255,3,2018,12,1,101,10102,101021,0101021B0,0101021B0BE
4,Q44,01C,N81002,0101021B0BEAIAL,Gaviscon Advance_Liq (Aniseed) (Reckitt),11,55.29,51.76,5400,4,2018,12,1,101,10102,101021,0101021B0,0101021B0BE
5,Q44,01C,N81002,0101021B0BEBEAL,Gaviscon Advance_Liq (Peppermint) S/F,3,10.24,9.64,1000,5,2018,12,1,101,10102,101021,0101021B0,0101021B0BE
6,Q44,01C,N81002,0102000L0AAACAC,Glycopyrronium Brom_Tab 1mg,1,430.66,399.84,56,6,2018,12,1,102,10200,102000,0102000L0,0102000L0AA
7,Q44,01C,N81002,0102000N0AAABAB,Hyoscine Butylbrom_Tab 10mg,12,54.11,50.57,1010,7,2018,12,1,102,10200,102000,0102000N0,0102000N0AA
8,Q44,01C,N81002,0102000N0BBAAAB,Buscopan_Tab 10mg,4,19.5,18.25,364,8,2018,12,1,102,10200,102000,0102000N0,0102000N0BB
9,Q44,01C,N81002,0102000P0AAABAB,Mebeverine HCl_Tab 135mg,6,17.87,17.16,424,9,2018,12,1,102,10200,102000,0102000P0,0102000P0AA


## BNF CHAPTER, SECTION, PARAGRAPH, SUBPARAGRAPH, PRODUCT, PRESENTATION TABLES

In [27]:
bnf_codes = spark.read.csv(bnf_codes_file, header=True, schema=bnf_codes_schema)

In [28]:
bnf_chapters = bnf_codes.select('BNF_CHAPTER_CODE', 'BNF_CHAPTER').dropDuplicates()
bnf_sections = bnf_codes.select('BNF_SECTION_CODE', 'BNF_SECTION').dropDuplicates()
bnf_paragraphs = bnf_codes.select('BNF_PARAGRAPH_CODE', 'BNF_PARAGRAPH').dropDuplicates()
bnf_subparagraph = bnf_codes.select('BNF_SUBPARAGRAPH_CODE', 'BNF_SUBPARAGRAPH').dropDuplicates()
bnf_chemicals = bnf_codes.select('BNF_CHEMICAL_SUBSTANCE_CODE', 'BNF_CHEMICAL_SUBSTANCE').dropDuplicates()
bnf_products = bnf_codes.select('BNF_PRODUCT_CODE', 'BNF_PRODUCT').dropDuplicates()
bnf_presentations = bnf_codes.select('BNF_PRESENTATION_CODE', 'BNF_PRESENTATION').dropDuplicates()

In [43]:
bnf_presentations.limit(20).toPandas()

Unnamed: 0,BNF_PRESENTATION_CODE,BNF_PRESENTATION
0,0101010I0AAAJAJ,Mag Ox_Cap 10mg
1,010102100AAAIAI,Bism Subsalic_Tab 262.5mg Chble S/F
2,0102000H0BFAAA0,Actonorm Sed_Gel
3,0102000N0AAANAN,Hyoscine Butylbrom_Liq Spec 4.9mg/5ml
4,0103050P0AAARAR,Omeprazole_Liq Spec 25mg/5ml
5,0107010N0AAADAD,Gppe Crm_Hemocane
6,0202010D0AAAQAQ,Chloroth_Cap 100mg
7,0202010F0AAACAC,Chlortalidone_Tab 500mg
8,0202010L0AAAHAH,Hydchloroth_Oral Liq @spec
9,0202030S0AAA5A5,Spironol_Susp 10mg/5ml S/F


In [44]:
practice_prescribing.filter("BNF_CODE = '0202040T0AAAAAA'").collect()

[Row(SHA='Q44', PCT='01C', PRACTICE='N81070', BNF_CODE='0202040T0AAAAAA', BNF_NAME='Spironol/Furosemide_Cap 50mg/20mg', ITEMS=1, NIC=15.94, ACT_COST=14.81, QUANTITY=56, PRESCRIPTION_ID=20430, YEAR=2018, MONTH=12, BNF_CHAPTER_CODE='02', BNF_SECTION_CODE='0202', BNF_PARAGRAPH_CODE='020204', BNF_SUBPARAGRAPH_CODE='0202040', BNF_CHEMICAL_CODE='0202040T0', BNF_PRODUCT_CODE='0202040T0AA'),
 Row(SHA='Q44', PCT='02D', PRACTICE='N81051', BNF_CODE='0202040T0AAAAAA', BNF_NAME='Spironol/Furosemide_Cap 50mg/20mg', ITEMS=1, NIC=15.94, ACT_COST=14.81, QUANTITY=56, PRESCRIPTION_ID=68155, YEAR=2018, MONTH=12, BNF_CHAPTER_CODE='02', BNF_SECTION_CODE='0202', BNF_PARAGRAPH_CODE='020204', BNF_SUBPARAGRAPH_CODE='0202040', BNF_CHEMICAL_CODE='0202040T0', BNF_PRODUCT_CODE='0202040T0AA'),
 Row(SHA='Q44', PCT='02D', PRACTICE='N81127', BNF_CODE='0202040T0AAAAAA', BNF_NAME='Spironol/Furosemide_Cap 50mg/20mg', ITEMS=2, NIC=15.94, ACT_COST=14.82, QUANTITY=56, PRESCRIPTION_ID=80035, YEAR=2018, MONTH=12, BNF_CHAPTER_C

In [21]:
chemicals.count()

3481