## 1. Introduction
The main objective for collecting the data is to analyse and predict the usage of a certain medical drug, related to a given geographical area. Other points I am interesetd to find are: popularity of a medical drug and seasonal consumption. To summarise the ideas the questions I am trying to answer are: 

1. Which is the medication prescribed in a certain area? Once the answer is available by analysing the data, I will be trying to build a ststistical model to predict its future consumption.


2. What is the preferred chemical prescribed by GP practices for a given category? For example, the research aims to explore the most prescribed drug for category. Examples of such categories include: "Hypertension and Heart Failure", "Dyspepsia and gastro-oesophageal reflux disease", etc -- to name a few. A full list is available in the BNF (British National Formulary) document.


3. Which medication is more used in a given season? This is an attempt to explore the trend for a medication regarding a specific season. The result will report its category as well.

In order to simplify the number of joins, and at the same time, provide more granularity on the geographic areas, these have been taken from the field County of the files T\*ADDR+BNFT.CSV. Thus the location of the GP practice will have a granularity at county level rather than region area which is definitely bigger, therefore supposed to provide less detail. For example the location for an observation will now indicate "Shrewsbury" instead of "Shropshire and Staffordshire".

Finally, given the huge amount of data collected, the same data will be used to build and validate a prediction model. This could be a useful instrument to make predictions related to point 1. The model will use future data to predict the usage of a certain drug in a given area.

## 2. Pre-processing data
### 2.1 Module imports
The project will be imlemented in Python, combined with the Spark framework. Below, the necessary modules. The user defined bnf module parses a web page where the BNF codes are explained and creates a json file called "sections.json". Each entry corresponds to a section of the BNF book. Given its auxiliary role to the project, this functionality is implemented as module and its source code is available in the appendix.

In [1]:
# Generic imports
import pyspark
import re
import json
import numpy as np

# Spark mllib imports
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.tree import DecisionTree
from pyspark.mllib.tree import RandomForest
from pyspark.mllib.evaluation import RegressionMetrics

# User defined imports
import bnf


sc = pyspark.SparkContext('local[*]')

### 2.2 BNF sections are parsed and saved

In [2]:
bnf.parse_n_save()

Retrieving from: https://openprescribing.net/bnf/
Results written to sections.json


### 2.3 Reads and parallelizes BNF sections' file
Here the BNF codes are read and parallelised by creating RDDs. The first 10 records are shown below as an example. As can be seen, the entries are not necessarily mutually exclusive. It is up to the user to chose either the leafs of the BNF book or consider nodes in the hierarchy branches. 

In [3]:
with open('sections.json', 'r') as file:
    content = file.read()

sections = json.loads(content)
sections = [tuple(x.values()) for x in sections]

bnf = sc.parallelize(sections)
print(bnf.collect()[:10])

[('01', 'Gastro-Intestinal System'), ('0101', 'Dyspepsia and gastro-oesophageal reflux disease'), ('010101', 'Antacids and simeticone'), ('010102', 'Compound alginates and proprietary indigestion preparations'), ('0102', 'Antispasmodics and other drugs altering gut motility'), ('0103', 'Antisecretory drugs and mucosal protectants'), ('010301', 'H2-receptor antagonists'), ('010302', 'Selective antimuscarinics'), ('010303', 'Chelates and complexes'), ('010304', 'Prostaglandin analogues')]


### 2.4 Helper Functions

In [4]:
def to_Dicts(rows, cols, sep):
    """
    Reads a csv formated line and provides a dictionary as output.
    @param rows: The row to be processed.
    @param cols: A list with the columns headers previously extracted.
    @param sep: The separator used to divide several fields.
    @return: A dictionary with the header names and their corresponding values.
    """
    res = {}
    rows = rows.split(sep)
    for i,j in zip(cols, rows):
        j = re.sub('[\'"]', '', j.strip())
        res.update({i : j})
    return res

In [5]:
def parse_val(row, fx, name):
    """
    Parses a single named value from the PDPI file. It also removes white spaces and and 
    applies a function to the value.
    @param row: a single row in csv format.
    @param fx: a function to be applied to a value.
    @param name: The (column header) name to identity the field.
    @return: A transformed value having a suitable datatype.
    """
    row[name] = re.sub(' ', '', row[name])
    if (row[name].lower() != name):
        return fx(row[name])
    return fx(0)

In [6]:
def parse_header(row):
    """
    Parses a header file from a csv format and extracts the column names.
    @param row: The header row to be parsed.
    @return: A list containing the 'cleaned' field names.
    """
    res = []
    for col in first_row.split(','):
        col = col.strip().lower()
        col = re.sub('[ \'"]', '', col)
        res.append(col)
    return res

### 2.5 Read Prescription data (PDPI)
The main component of our dataset is the data describing prescriptions. In order to read this data and create a RDD out of it two helper funtions have been created. Their task is to respectively parse headers and values as well as to transform them into a more suitable format.

The file containing prescriptions is being read and evaluated.

The first line has been read and the field names parsed and extracted. From the RDD pdpi_lines we firstly filter lines in order to exclude the headers. Then each line is turned into a Python dictionary. In this way we can address and process each value by its field name.<br>

It is also worth mentioning that it is in this part that I have started to make use of the parallelism offered by Spark. As can be seen, a chain of transformations are set to be applied to the original RDD. According to the lazy evaluation at the heart of Spark, they will be applied as soon as an action will be called upon the RDD. To be more precise, none of the filter and map functions will be applied to the original RDD until the action take(10) has to be executed. The action .take() will apply, in chain, all the transformations prior its appearance and cause the driver to collect ten results.<br>

The ten elements have been collected for illustration purposes. The .py version of the same file, which is meant and provided to be run as a batch program does not collect its partial result, but carries on instead with the other computations to come.

In [7]:
pdpi_lines = sc.textFile("./data/T*PDPI*.CSV")
first_row = pdpi_lines.first()
headers = parse_header(first_row)

pdpi = (pdpi_lines
    .filter(lambda p: p != first_row)
    .map(lambda p: to_Dicts(p, headers, ','))
    .map(lambda p: (p['sha'], p['practice'], p['bnfcode'][:9], 
                    parse_val(p, int, 'items'),
                    parse_val(p, float, 'nic'),
                    parse_val(p, float, 'actcost'),
                    parse_val(p, int, 'quantity'),
                    p['period']))
)


print(pdpi.take(10))

[('Q44', 'Y04937', '0204000R0', 2, 2.01, 2.08, 63, '201611'), ('Q44', 'Y04937', '0205051R0', 1, 2.58, 2.4, 56, '201611'), ('Q44', 'Y04937', '0401010Y0', 1, 0.6, 0.67, 14, '201611'), ('Q44', 'Y04937', '0401010Z0', 2, 1.18, 1.32, 28, '201611'), ('Q44', 'Y04937', '0401020K0', 4, 2.52, 2.79, 90, '201611'), ('Q44', 'Y04937', '0402010AB', 5, 3.95, 4.22, 228, '201611'), ('Q44', 'Y04937', '0402010AB', 1, 79.33, 73.54, 28, '201611'), ('Q44', 'Y04937', '0402010A0', 1, 2.5, 2.43, 28, '201611'), ('Q44', 'Y04937', '0402010S0', 1, 48.94, 45.41, 560, '201611'), ('Q44', 'Y04937', '040201030', 1, 1.13, 1.16, 28, '201611')]


### 2.6 Read GP addresses data (ADDR)

In [8]:
addr_cols = ['date', 'practice_code', 'name', 'first_line_addr', 'street', 'city', 'county', 'postcode']

addr_lines = sc.textFile("./data/T*ADDR*.CSV")

addrs = (addr_lines
    .map(lambda a: to_Dicts(a, addr_cols, ','))
    .filter(lambda a: a['county'] != '')
    .map(lambda a: (a['date'], a['practice_code'], a['county']))
)

print(addrs.take(10))

[('201612', 'A81001', 'CLEVELAND'), ('201612', 'A81002', 'CLEVELAND'), ('201612', 'A81004', 'CLEVELAND'), ('201612', 'A81006', 'CLEVELAND'), ('201612', 'A81007', 'CLEVELAND'), ('201612', 'A81008', 'CLEVELAND'), ('201612', 'A81009', 'CLEVELAND'), ('201612', 'A81011', 'CLEVELAND'), ('201612', 'A81014', 'CLEVELAND'), ('201612', 'A81015', 'CLEVELAND')]


### 2.7 Read drug related data (Chemicals)

In [9]:
chem_lines = sc.textFile("./data/T*CHEM*.CSV")
header = chem_lines.first()

chem = (chem_lines
    .filter(lambda x: x != header)
    .map(lambda x: tuple(y.strip() for y in x.split(',')[:-1]))
    .distinct()
)

print(chem.take(10))

[('0101010A0', 'Alexitol Sodium'), ('0101010C0', 'Aluminium Hydroxide'), ('0101010E0', 'Hydrotalcite'), ('0101010F0', 'Magnesium Carbonate'), ('0101010I0', 'Magnesium Oxide'), ('0101010J0', 'Magnesium Trisilicate'), ('0101010L0', 'Aluminium & Magnesium & Act Simeticone'), ('0101010M0', 'Magaldrate'), ('0101010R0', 'Simeticone'), ('0101010T0', 'Magnesium Sulphate')]


## 3 Data Analysis
### 3.1 Joining datasets
In this part data coming from separate files/RDDs will be combined together in order to perform the three objectives of the data analysis, presented in the introduction. Initially I will start with the prescriptions data, which will undergo a set of transformations and named t1 (Table 1). Although I will be using the Spark RDDs for this task, I found it useful to reason in SQL terms, thus this choice of nomenclature (t1_t2_t3 means join of t1,t2 and t3). Also, the analysis I intend to do on the dataset has been presented as SQL queries as a more intuitive approach.

Before joing two RDDs together, a typical sequence of operations taking place below consists in:
* Projection -- chose the fields of interest.
* Chose the fields to use as keys for each RDD.
* Perform an inner join transformation on the designated fields.
* Chose a reduce policy. This boils down to grouping together partitions having the same keys and performing aggregation operations on them.
* After a join, new keys will be chosen, perform a new join and so on -- until a sort of view t1_t2_t3 will be ready for use. All the exploratory and machine learning analysis will be based on this data structure.
* Eventually, t1_t2_t3 RDD will be persisted in memory since its use is required several times.

### 3.2 Which are the drugs more frequently used in a certain area ?
The result will also act as a base view to be used in other queries.

SELECT <br>
<blockquote>pdpi.bnf_code,<br>
    sum(pdpi.items),<br>
    sum(pdpi.act_cost),<br>
    sum(pdpi.quantity),<br>
    chem.name,<br>
    addrs.county,<br>
    bnf.desc,<br>
    count(*) AS NPrescriptions<br></blockquote>
FROM <br>
<blockquote>PDPI pdpi join ADDR addrs ON (pdpi.practice = addrs.code) AND<br>
    pdpi join CHEM chem ON (pdpi.code = chem.code) AND<br>
    pdpi join BNF bnf ON (pdpi.bnf_code = bnf.code)<br></blockquote>
GROUP BY<br>
<blockquote>pdpi.bnf_code, addrs.county<br></blockquote>
ORDER BY<br>
<blockquote>addrs.county, NPrescriptions DESC<br></blockquote>

In [26]:
# E.g.: Initial State ('Q44', 'Y04937', '0204000R0', 2, 2.01, 2.08, 63, '201611')
t1 = (pdpi
    .map(lambda x: x[1:4] + x[5:])
    .keyBy(lambda x: x[1])
    .join(chem)
    .values()
    .map(lambda x: x[0] + (x[1],))
    .keyBy(lambda x: (x[0],x[5]))
)

# E.g.: Initial State ('201612', 'A81001', 'CLEVELAND')
t2 = addrs.map(lambda x: ((x[1], x[0]), (x[2])))

t1_t2 = (t1
    .join(t2)
    .mapValues(lambda x: x[0] + (x[1], ))
    .values()
    .keyBy(lambda x: x[1][:7].rstrip('0'))
)

# E.g.: Initial State ('01', 'Gastro-Intestinal System')
t3 = bnf.filter(lambda x: len(x[0]) >= 6)

t1_t2_t3 = ( t1_t2
    .join(t3)
    .mapValues(lambda x: x[0] + (x[1],1))
    .values()
    .map(lambda x: x[:5] + x[6:])            
    .keyBy(lambda x: (x[1], x[6]))
    .map(lambda x: (x[0], x[1][1:]))
    .reduceByKey(lambda x,y: (x[0], x[1] + y[1], round(x[2] + y[2], 3), x[3] + y[3], x[4], 
                              x[5], x[6], x[7] + y[7]))
    .sortBy(lambda x: (x[0][1], x[1][7]), ascending=False)
    .persist()
) 

print(t1_t2_t3.take(10))

[(('0403040W0', 'MERSEYSIDE'), ('0403040W0', 64, 580.16, 1029, 'Venlafaxine', 'MERSEYSIDE', 'Other antidepressant drugs', 18)), (('0402010AB', 'MERSEYSIDE'), ('0402010AB', 95, 337.89, 2093, 'Quetiapine', 'MERSEYSIDE', 'Antipsychotic drugs', 16)), (('040201060', 'MERSEYSIDE'), ('040201060', 25, 27.51, 370, 'Olanzapine', 'MERSEYSIDE', 'Antipsychotic drugs', 12)), (('0402010AD', 'MERSEYSIDE'), ('0402010AD', 38, 35.7, 567, 'Aripiprazole', 'MERSEYSIDE', 'Antipsychotic drugs', 8)), (('0403040X0', 'MERSEYSIDE'), ('0403040X0', 99, 192.56, 1328, 'Mirtazapine', 'MERSEYSIDE', 'Other antidepressant drugs', 8)), (('0403030E0', 'MERSEYSIDE'), ('0403030E0', 21, 48.93, 938, 'Fluoxetine Hydrochloride', 'MERSEYSIDE', 'Selective serotonin re-uptake inhibitors', 7)), (('040201030', 'MERSEYSIDE'), ('040201030', 12, 8.41, 252, 'Risperidone', 'MERSEYSIDE', 'Antipsychotic drugs', 7)), (('0408010AE', 'MERSEYSIDE'), ('0408010AE', 21, 539.0, 504, 'Pregabalin', 'MERSEYSIDE', 'Control of epilepsy', 6)), (('0401020

### 3.3 What is the preferred chemical by GP practices for a given category? 
I am interested to spot any overwhelming tendency in recommending a certain product for a condition.

SELECT <br>
<blockquote>pdpi.bnf_code,<br>
    pdpi.items,<br>
    pdpi.act_cost,<br>
    pdpi.quantity,<br>
    chem.name,<br>
    addrs.county,<br>
    bnf.desc,<br>
    max(count(*)) AS NPrescriptions<br></blockquote>
FROM <br>
<blockquote>PDPI pdpi join ADDR addrs ON (pdpi.practice = addrs.code) AND<br>
    pdpi join CHEM chem ON (pdpi.code = chem.code) AND<br>
    pdpi join BNF bnf ON (pdpi.bnf_code = bnf.code)<br></blockquote>
GROUP BY<br>
<blockquote>bnf.desc<br></blockquote>

In [27]:
most_used_chem = (t1_t2_t3
    .values()
    .map(lambda x: x[0:])
    .keyBy(lambda x: x[6])
    .reduceByKey(lambda x,y: (x if x[7] > y[7] else y))
)

print(most_used_chem.take(10))

[('Control of epilepsy', ('0408010AE', 21, 539.0, 504, 'Pregabalin', 'MERSEYSIDE', 'Control of epilepsy', 6)), ('Drugs used for mania and hypomania', ('0402030Q0', 21, 128.98, 476, 'Valproic Acid', 'MERSEYSIDE', 'Drugs used for mania and hypomania', 4)), ('Antihistamines', ('0304010G0', 14, 24.98, 1442, 'Chlorphenamine Maleate', 'CHESHIRE', 'Antihistamines', 8)), ('Non-opioid analgesics and compound preparations', ('0407010F0', 61, 139.81, 3124, 'Co-Codamol (Codeine Phos/Paracetamol)', 'CHESHIRE', 'Non-opioid analgesics and compound preparations', 15)), ('Antibacterials', ('1103010C0', 34, 62.28, 346, 'Chloramphenicol', 'CHESHIRE', 'Antibacterials', 7)), ('Antifungal preparations', ('1310020H0', 15, 19.5, 330, 'Clotrimazole', 'CHESHIRE', 'Antifungal preparations', 3)), ('Antimalarials', ('0504010Y0', 2, 0.81, 9, 'Quinine Sulfate', 'CHESHIRE', 'Antimalarials', 2)), ('Selective serotonin re-uptake inhibitors', ('0403030E0', 21, 48.93, 938, 'Fluoxetine Hydrochloride', 'MERSEYSIDE', 'Selec

### 3.4 Which medication is more used in a given season?
In this part I will attempt to explore the trend for a medication with regard to the time of the year.

SELECT <br>
<blockquote>pdpi.bnf_code,<br>
    sum(pdpi.act_cost),<br>
    season = CASE<br>
        WHEN pdpi.period in (12,1,2)  THEN "Winter"<br>
        WHEN pdpi.period in (3,4,5)   THEN "Spring"<br>
        WHEN pdpi.period in (6,7,8)   THEN "Summer"<br>
        WHEN pdpi.period in (9,10,11) THEN "Autumn"<br>
    END<br>
    chem.name,<br>
    addrs.county,<br>
    bnf.desc,<br>
    max(count(*)) AS NPrescriptions<br></blockquote>
FROM <br>
<blockquote>PDPI pdpi join ADDR addrs ON (pdpi.practice = addrs.code) AND<br>
    pdpi join CHEM chem ON (pdpi.code = chem.code) AND<br>
    pdpi join BNF bnf ON (pdpi.bnf_code = bnf.code)<br></blockquote>
GROUP BY<br>
<blockquote>season<br></blockquote>
ORDER BY<br>
<blockquote>season, bnf.desc DESC<br></blockquote>

In [28]:
def season(date):
    if (len(date) != 6):
        return None

    month = int(date[-2:])
    if (month in [12,1,2]):
        return 'Winter'
    elif (month in [3,4,5]):
        return 'Spring'
    elif (month in [6,7,8]):
        return 'Summer'
    elif (month in [9,10,11]):
        return 'Autumn'
    else:
        return None

In [29]:
chem_per_seas = (t1_t2
    .join(t3)
    .mapValues(lambda x: x[0] + (x[1],1))
    .values()
    .map(lambda x: (x[1], x[3], season(x[5]), x[6], x[8], x[9]))
    .keyBy(lambda x: (x[0], x[2]))
    .reduceByKey(lambda x,y: (x[0], round(x[1] + y[1], 3), x[2], x[3], x[4], x[5] + y[5]))
    .sortBy(lambda x: (x[1][2], x[1][5]), ascending=False)
)

print(chem_per_seas.take(10))

[(('0403040W0', 'Winter'), ('0403040W0', 357.7, 'Winter', 'Venlafaxine', 'Other antidepressant drugs', 14)), (('0402010AB', 'Winter'), ('0402010AB', 446.09, 'Winter', 'Quetiapine', 'Antipsychotic drugs', 14)), (('040201060', 'Winter'), ('040201060', 26.34, 'Winter', 'Olanzapine', 'Antipsychotic drugs', 12)), (('0403040X0', 'Winter'), ('0403040X0', 162.37, 'Winter', 'Mirtazapine', 'Other antidepressant drugs', 11)), (('0401010Z0', 'Winter'), ('0401010Z0', 38.92, 'Winter', 'Zopiclone', 'Hypnotics', 8)), (('0401020K0', 'Winter'), ('0401020K0', 50.35, 'Winter', 'Diazepam', 'Anxiolytics', 8)), (('0402010AD', 'Winter'), ('0402010AD', 20.04, 'Winter', 'Aripiprazole', 'Antipsychotic drugs', 7)), (('0403030E0', 'Winter'), ('0403030E0', 36.95, 'Winter', 'Fluoxetine Hydrochloride', 'Selective serotonin re-uptake inhibitors', 6)), (('0403030Q0', 'Winter'), ('0403030Q0', 51.18, 'Winter', 'Sertraline Hydrochloride', 'Selective serotonin re-uptake inhibitors', 6)), (('0407010H0', 'Winter'), ('0407010

## 4 Predictive analysis
### 4.1 Methods considered
The other objective was also to use the same data to do some inference analysis. The aim is to be able to predict the number of prescriptions for a certain drug in the future. In order to achieve this result some predictive models will be built and their performance assesed.

The result of the first query which yields the RDD t1_t2_t3 has counted the frequency of the prescriptions for a medication in a certain English county. Now I would like to build a model which allows for this quantity to be predicted, if the same features are provided but no label. In order to achieve this result, three different techniques have been explored. The objective is to chose the most performing model for my dataset. The models explored are:<br>

* Linear Regression As the simplest method to build, understand and interpret I will start by evaluating this model. However there is a caveat to consider. My dataset has three categorical fields which could not be evaluated by the Linear regression in its original form. These three fields will be then dummified, thus generating much more variables that there was initially.
* Decision Tree As it is simple to build and interpret and it can handle categorical variables as well. Moreover it does not "pretend" extensive tunning, so it would be good to capture any underlying structure which is nott linear. 
* Random Forests. It benefits from the same characteristics of decision trees -- in fact, it combines many trees together. This method has also hyperparameters that need to be assesed before use.

Below is the data as astarting point. Each partition (line) has been paired with a unique index for testing purposes only. This new field will not affect computation.

In [53]:
df = (t1_t2_t3
    .values()
    .zipWithUniqueId()
    .map(lambda x: ((x[1],) + x[0][1:]))
)

original_data = df.first()
print(original_data)

(0, 64, 580.16, 1029, 'Venlafaxine', 'MERSEYSIDE', 'Other antidepressant drugs', 18)


### 4.2 Handling categorical variables
Categorical data need to be dummified before applying a Linear model upon them. In order to do so, an auxiliary function which stores all the possible categories for each variable has been created. This new data structure is useful to associate a number to each category value for each variable. In our case the positional index for the categorical variables goes from four to seven in the original dataset.

The categorical length for all the variables generated after dummifiyng them will be the sum of all the numbers of categories. For example if we have 153 bnf_codes, 4 counties and 80 types of usage (drug categories) the new dataset will be long 153 + 4 + 80 + 3 (numerical variables) = 240 predictors.

Finally in this occasion I am also spliting the dataset between training and testing sets. The idea is to follow a ratio 80/20 percent. Although, as can be seen from the cell output, the function randomSplit provides an approximation rather than an exact split by following this ratio.

In [55]:
def encode(rdd, idx):
    """
    For each categorical variable encode a category to a number.
    @param rdd: The data rdd
    @param idx: The positional column index
    @return: A map storing the categories for each field.
    """
    result = (rdd
        .map(lambda x: x[idx])
        .distinct()
        .zipWithIndex()
        .collectAsMap()
    )
    return result


mappings = []
for i in range(4, 7):
    mappings.append(encode(df, i)) 


# The total number of all the categories times all the qualitative variables. This will be the total
# number of the variables after dummifiying the categorical ones.
catg_len = sum(map(len, mappings))

train_set, test_set = df.randomSplit([0.8, 0.2], seed=1)

print('TrainSet = {}, TestSet = {}\n'.format(train_set.count(), test_set.count()))

print("The dataset has:")
for i,j in zip(mappings, ['bnf_code', 'county', 'usage']):
    print('{}: {} distinct categories'.format(j, len(i)))

TrainSet = 151, TestSet = 54

The dataset has:
bnf_code: 153 distinct categories
county: 4 distinct categories
usage: 80 distinct categories


### 4.3 Dummification

Below there are two helper functions designed to retrieve the features and the labels respective of each observation. The function get_LRfeatures is relative to linear regression method; as only in this context we need to perform extensive dummification of the categorical variables with OneHotEncoded method. The Decision tree methods will not make use of the same variables. Whereas numerical variables are just converted into the right format, float in this case.

In [56]:
# E.g.: record = [(64, 580.16, 1029, 'Venlafaxine', 'MERSEYSIDE', 'Other Antidepressant Drugs', 18)]
def get_LRfeatures(record, Min, Mid, Max):
    """
    Performs the one-hot-encoding for each categorical variable.
    @param record: One single record from a RDD
    @paramm Min: The predictors are supposed to have contiguous indexes. This is the starting index 
    of categorical variables.
    @param Mid: The first index of numerical variables.
    @param Max: The ending index for the predictors.
    @return: The categorical data have been dummified
    """
    catg_vars = np.zeros(catg_len)
    i = 0
    step = 0
    for col in record[Mid:Max]:
        idx = mappings[i][col]
        catg_vars[idx + step] = 1
        step = step + len(mappings[i])
        i = i + 1
    
    num_vars = np.array([float(i) for i in record[Min:Mid]])
    return np.concatenate((catg_vars, num_vars))

def get_label(record):
    """
    Simply extracts the label (target) field. In our case at position -1.
    @param record: One single record from a RDD
    @return: The label field as a float datatype
    """
    return float(record[-1])

#get_LRfeatures(train_set.take(1), 1, 4, 7)

Below the training data has been converted into LabeledPoint objects as the machine learning algorithms part of the package mllib expect the data to be arranged in this format.

In [57]:
lr_train_data = train_set.map(lambda x: LabeledPoint(get_label(x), get_LRfeatures(x, 1, 4, 7)))

first_point = lr_train_data.first()

print('Label value: {}\n'.format(str(first_point.label)))
print('Dummified data: {}\n'.format(str(first_point.features)))
print('New Length: {}'.format(len(first_point.features)))

Label value: 18.0
Dummified data: [0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,64.0,580.16,1029.

### 4.4 Tuning of the parameters for the Linear Regression
In this section I am going to make a research for the parameters to use into the Linear Regression trainning algorithm. The criteria for this choice would be the minimum value of the mean squared error. The two metrics I am planning to tune are:number of iterations and the step size. The optimal number of iterations will be searched into the set [10, 100, 1000] and the step's size into [0.0001, 0.001, 0.01, 0.1]. Several advise suggests that the step's size has to be a small value. The function below will iterate all the combinations (iteration, step) from the two sets and return those that minimize MSE value.

In [60]:
def find_lr_parameters(data, iterations, steps):
    """
    Iterates through a set of numbers corresponding to iterations and step size and print
    the optimal values.
    @param data: The RDD containing the data of interest.
    @param iterations: A list of possible iterations.
    @param steps: a list of steps sizes
    """
    min_error = 999999
    n_iter = iterations[0]
    n_step = steps[0]
    for i in iterations:
        for j in steps:
            lm = LinearRegressionWithSGD.train(data, iterations=i, step=j, intercept=True)
            lm_pred = data.map(lambda x: (x.label, float(lm.predict(x.features))))
            metrics = RegressionMetrics(lm_pred)
            mse = metrics.meanSquaredError
            if (mse < min_error):
                min_error = mse
                n_iter = i
                n_step = j
    print('Estimating parameters for Linear Regression:\n=====')
    print('MSE = {}, iterations = {}, step = {}'.format(min_error, n_iter, n_step))

iterations = [10, 100, 1000]
steps = [0.0001, 0.001, 0.01, 0.1]
    
find_lr_parameters(lr_train_data, iterations, steps)

Estimating parameters for Linear Regression:
=====
MSE = 999999, iterations = 10, step = 0.0001


### 4.5 Training and application of Linear Regression
The estimated values for iterations and steps return a very high MSE. The reason for such a high value is the presence of NAN values for almost all the combinations of iterations and steps used to train the model. In this case, the suggested value is returned as the only acceptable value rather than the best value. So, the linear model is not expected to produce acceptable results. A wide variety of values have been tried but the situation has not improved.

However, just for completeness, the model has been applied to the training set. As can be evinced from the experiment below, the MSE obtained is indeed very huge, therefore impracticable.

In [61]:
lr_test_data = train_set.map(lambda x: LabeledPoint(get_label(x), get_LRfeatures(x, 1, 4, 7)))
lm = LinearRegressionWithSGD.train(lr_train_data, iterations=10, step=0.0001, intercept=True)

print(lm.weights[:5])

lr_predicted_values = lr_test_data.map(lambda p: (p.label, float(lm.predict(p.features))))
print('Linear Regression predictions: {}\n'.format(str(lr_predicted_values.take(5))))

metrics = RegressionMetrics(lr_predicted_values)
print('TestSet MSE = {}'.format(str(metrics.meanSquaredError)))

[-8.79205334e+34 -7.03542219e+33 -6.62130992e+33 -2.34583918e+33
 -1.30800998e+35]
Linear Regression predictions: [(18.0, -1.921393898375354e+46), (12.0, -6.865284139838406e+45), (8.0, -1.0519019356888455e+46), (8.0, -2.466370032434899e+46), (7.0, -1.739796741441163e+46)]

TestSet MSE = 5.0397229070660793e+95


### 4.6 Preparing features for Decision Trees
In this session the strategy will change as I will be applying a decision tree model. This choice impilies a change in the dataset as the dummification of categorical variables is no longer needed. But, the Decision tree algorithm though expects to receive an numeric code for the various categories instead of their original string values. In order to do this the unique code for a given catogory will be retrieved from the mappings data structure created in 4.2. Mappings resemples a two dimensional array where the first index denotes the field and the second one the category code.

In [62]:
# Important. Categorical variables need to be represented by numbers to avoid 
# ValueError: could not convert string to float: 'abc'. The constructor of LabeledPoint will create numpy arrays
def get_features_trees(record):
    """
    Given an observation in input, the function returns the numeric code for each category.
    @param record: A single observation
    @return: A new observation where the nominal variables are represented by their mapping value
    """
    result = []
    idx = 0
    for i in record[1:7]:
        try:
            result.append(float(i))
        except ValueError:
            result.append(float(mappings[idx][i]))
            idx = idx + 1
    return np.array(result)

tree_train_data = train_set.map(lambda r: LabeledPoint(get_label(r), get_features_trees(r)))
tree_test_data = test_set.map(lambda r: LabeledPoint(get_label(r), get_features_trees(r)))

tree_train_data.take(1)
first_point_dt = tree_train_data.first()

print('Training Decision Tree:\n=====')
print('Decision Tree features: {}'.format(str(first_point_dt.features)))

Training Decision Tree:
=====
Decision Tree features: [64.0,580.16,1029.0,63.0,0.0,63.0]


### 4.7 Training Decision Tree Model
The decision tree model provided with default parameters, except for maxBins (which was a minimum requirement for the problem at hand) is definitely returning a much lower MSE error than the Linear Regression. The change is also refleted in computing performance.

In [63]:
dtree_model = DecisionTree.trainRegressor(tree_train_data, {}, maxBins=54)
preds = dtree_model.predict(tree_train_data.map(lambda p: p.features))
target_trn = tree_train_data.map(lambda p: p.label)
dt_predicted_values = target_trn.zip(preds.map(lambda x: float(x)))

print('Depth: {}'.format(str(dtree_model.depth())))
print('Number of nodes: {}'.format(str(dtree_model.numNodes())))

metrics_dt = RegressionMetrics(dt_predicted_values)
print('MSE = {}'.format(str(metrics_dt.meanSquaredError)))

Depth: 5
Number of nodes: 51
MSE = 0.49781473125843984


### 4.8 Application of Decision Tree Model

In [64]:
preds = dtree_model.predict(tree_test_data.map(lambda p: p.features))
target_tst = tree_test_data.map(lambda p: p.label)
dt_predicted_values = target_tst.zip(preds.map(lambda x: float(x)))

print('Decision Tree predictions: {}'.format(str(dt_predicted_values.take(5))))

metrics_dt = RegressionMetrics(dt_predicted_values)
print('TestSet MSE = {}'.format(str(metrics_dt.meanSquaredError)))

Decision Tree predictions: [(16.0, 9.666666666666666), (6.0, 4.0), (4.0, 5.25), (4.0, 7.0), (1.0, 1.0)]
TestSet MSE = 3.193440241962322


### 4.9 Training Random Forests Model
The last model considered in this study is the Random Forest one. For optimal results this model needs too tune its parameters numTrees and maxDepth. This the function find_rf_parameters is provided to iterate through the parameters and pick the optimal values. Once found the model will be trained with these parameters and the MSE compared with the one achieved by the simple decision tree.

In [66]:
def find_rf_parameters(data, ntrees, depths, target):
    """
    Iterates through a set of numbers corresponding to numTrees and maxDepth and print
    the optimal values.
    @param data: The RDD containing the data of interest.
    @param ntrees: A list of possible iterations.
    @param depths: A list of steps sizes
    @param target: The list containing the labels of the training set. 
    """
    min_error = 99999999
    n_trees = ntrees[0]
    n_depth = depths[0]
    for i in ntrees:
        for j in depths:
            rf_model = RandomForest.trainRegressor(data, categoricalFeaturesInfo={}, numTrees=i,
                                        featureSubsetStrategy="auto", impurity="variance", maxDepth=j, maxBins=54)
            preds2 = rf_model.predict(data.map(lambda x: x.features))
            rf_values = target.zip(preds2.map(lambda x: float(x)))
            metrics_rf = RegressionMetrics(rf_values)
            mse = metrics_rf.meanSquaredError
            if (mse < min_error):
                min_error = mse
                n_trees = i
                n_depth = j
    print('Estimating Parameters for Random Forests:\n=====')
    print('MSE = {}, trees = {}, depth = {}'.format(min_error, n_trees, n_depth))


ntrees = [2, 6, 10, 20]
depths = [3, 6, 10]
    
find_rf_parameters(tree_train_data, ntrees, depths, target_trn)

Estimating Parameters for Random Forests:
=====
MSE = 0.6052934684069543, trees = 20, depth = 10


### 4.10 Application of Random Forest Model
The Random Forest model has been trained and run with the values numTrees=20 and maxDepth=10 the mean squared error has decreased again in the test set by reaching the lowest value.

In [67]:
rf_model = RandomForest.trainRegressor(tree_train_data, categoricalFeaturesInfo={}, numTrees=20, 
        featureSubsetStrategy="auto", impurity="variance", maxDepth=10, maxBins=54)
preds2 = rf_model.predict(tree_test_data.map(lambda x: x.features))
rf_predicted_values = target_tst.zip(preds2.map(lambda x: float(x)))

print('Random Forest predictions: {}'.format(str(rf_predicted_values.take(5))))
print('Number of trees: {}'.format(str(rf_model.numTrees())))
print('Number of nodes: {}'.format(str(rf_model.totalNumNodes())))

metrics_rf = RegressionMetrics(rf_predicted_values)
print('TestSet MSE = {}'.format(metrics_rf.meanSquaredError))

Random Forest predictions: [(16.0, 8.4), (6.0, 7.1), (4.0, 7.9944444444444445), (4.0, 6.644444444444444), (1.0, 1.3)]
Number of trees: 20
Number of nodes: 1790
TestSet MSE = 2.4164909836534063


## 5 Conclusions
The objectives of this study were both explorative and predictive. By relying on  the big data collected and the power of the Spark framework some interesting results came to light.

1) The most used drugs in a certain area. These exploratory results allow to see the most used drugs at a certain area. To notice the formation of clusters as in the example below suggesting the most problematic condition over a year in this geographical area.

('0403040W0', 64, 580.16, 1029, 'Venlafaxine', 'MERSEYSIDE', 'Other Antidepressant Drugs', 18)),<br>
('0402010AB', 95, 337.89, 2093, 'Quetiapine', 'MERSEYSIDE', 'Antipsychotic Drugs', 16)),<br>
('040201060', 25, 27.51, 370, 'Olanzapine', 'MERSEYSIDE', 'Antipsychotic Drugs', 12)),<br>
('0402010AD', 38, 35.7, 567, 'Aripiprazole', 'MERSEYSIDE', 'Antipsychotic Drugs', 8)),<br>
('0403040X0', 99, 192.56, 1328, 'Mirtazapine', 'MERSEYSIDE', 'Other Antidepressant Drugs', 8)),<br>
('0403030E0', 21, 48.93, 938, 'Fluoxetine Hydrochloride', 'MERSEYSIDE', 'Selective Serotonin Re-Uptake Inhibitors', 7)<br/>

2) The second result allows to see tendencies of the most prescribed drug for a given category. For example, by counting the prrescriptions it looks like 'Chlorphenamine Maleate' has been the most reccomended drug among Antihistamines and soon.<br>
('0304010G0', 14, 24.98, 1442, 'Chlorphenamine Maleate', 'CHESHIRE', 'Antihistamines', 8)),<br>

3) In the most prescribed drug for season we have achieved a result similar to point 1, although in this case we care more for the time of the year rather than the geographical area.

The second part of this project was concerned with building a predictive model that, based on the same variables, would inefere the number of prescriptions in a certain area. I examined three different techniques and the results were as follows.

<b>Linear Regression</b> The model built provides a very high MSE so the model was impracticable with its value of MSE = 5.039722907066078e+95 in the test set.

<b>Decision Tree</b> The predictive model returned a MSE of:<br>
0.384 for the training set <br>
4.136 for the test set <br>

<b>Random Forests</b> The predictive model returned a MSE of:<br>
0.653 for the training set <br>
2.410 for the test set <br>

So the Decision Tree was the model to provide the best fit for the training set, but Random Forests was able to generalize better and be more accurate on the test set.

## Appendix 1
### Module for scraping BNF codes.

In [68]:
import requests
import re
import json
from bs4 import BeautifulSoup


def get_soup_object(url):
    # set a user-agent to be sent with request
    headers = {
        "user-agent":"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) \
        Chrome/61.0.3163.100 Safari/537.36"
    }

    print('Retrieving from: {}'.format(url))

    response  = requests.get(url, headers)
    if (response.ok):
        # parse the raw HTML into a 'soup' object
        soupObj = BeautifulSoup(response.text, 'html.parser')

        return soupObj
    else:
        print('There was a problem reading from the URL: {}'.format(url))
        return None


def parse_n_save():
        bsObj = get_soup_object('https://openprescribing.net/bnf/')
        data = bsObj.find_all('a')
        pattern = re.compile('/bnf/[0-9]{2,6}')

        results = []

        for i in data:
                temp_dict = {}
                href = i.attrs['href']
                match = re.search(pattern, href)
                if match:
                        try:
                                code = href.split('/')[2]
                                desc = i.text.split(':')[1].strip()
                                re.sub(r'[^a-zA-Z0-9_]', '', desc)
                                temp_dict['code'] = code
                                temp_dict['desc'] = desc
                                results.append(temp_dict)
                        except Exception as e:
                                print(str(e))

        with open('sections.json', 'w') as output_file:
                json.dump(results, output_file)
                print('Results written to sections.json')
