In [None]:
#Standard library
import os
import re
from functools import reduce

#Use PySpark SQL
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import pyspark.sql.functions as fn

#other modules
import numpy as np
#Saving/reading data to/from disk
import dill
#python dataframe module
import pandas as pd
#Plotting
from matplotlib import pyplot as plt
#Geo-data plotting
import geojson
import folium

In [None]:
#Set up Spark
#getOrCreate() returns the already-running SparkContext if there is one, so
#re-running this cell does not try to start a second context (no manual sc.stop()).
conf = SparkConf().setMaster("local[*]").setAppName("pyspark_df")
sc = SparkContext.getOrCreate(conf=conf)

#Create a SQLContext (needed before RDD.toDF() is used in the cells below)
sqlContext = SQLContext(sc)


In [None]:
#Functions
def localpath(path):
    """Turn a path relative to the current working directory into a ``file://`` URI.

    Used to hand local files/directories to ``sc.textFile``. Follows
    ``os.path.join`` semantics: an absolute ``path`` is used as-is, and a
    trailing slash on ``path`` is kept.
    """
    base_dir = os.path.abspath(os.path.curdir)
    full_path = os.path.join(base_dir, path)
    return 'file://%s' % full_path

In [None]:
#Read 18-month records during 201702-201807 using Spark.
#For each month, aggregate per (Practice_Code, Date): number of records ('Count')
#and total NIC ('NIC_sum'); then stack the monthly aggregates.
MONTHS = pd.period_range('2017-02', '2018-07', freq='M').strftime('%Y%m')

monthly_aggregates = []
for month in MONTHS:
    records = sc.textFile(localpath('data/PDPI/' + month + '/'))
    #Skip rows whose 2nd field is 'PCT' (presumably the CSV header row)
    df_rec = records.map(lambda line: line.split(','))\
                    .filter(lambda u: u[1] != 'PCT')\
                    .map(lambda u: (u[2], float(u[6]), int(u[9])))\
                    .toDF(['Practice_Code', 'NIC', 'Date'])
    monthly_aggregates.append(
        df_rec.groupBy(['Practice_Code', 'Date'])
              .agg(fn.count('NIC').alias('Count'), fn.sum('NIC').alias('NIC_sum'))
    )

#All monthly aggregates share the same columns, so a positional union is safe
df_grouped = reduce(lambda left, right: left.union(right), monthly_aggregates)

#dill is an advanced version of pickle. Allows easy writing/reading data to/from disk.
#Except for gathering info from the records, all dataframe manipulations were done using Pandas
with open('grouped_records_cost.pkd', 'wb') as fh:
    dill.dump(df_grouped.toPandas(), fh)

In [None]:
#Filter out the dummy practices (Practice_Code containing the letter 'Y')
with open('grouped_records_cost.pkd', 'rb') as fh:
    df_grouped = dill.load(fh)
#.loc returns a new frame; it must be assigned back for the filter to take effect
df_grouped = df_grouped.loc[~df_grouped['Practice_Code'].str.contains('Y', regex=False)]
with open('grouped_records_cost.pkd', 'wb') as fh:
    dill.dump(df_grouped, fh)

In [None]:
#Compute the statistics with a GroupBy: per-practice median and variance of the
#monthly record count and of the monthly total NIC.
with open('grouped_records_cost.pkd', 'rb') as fh:
    df_grouped = dill.load(fh)
#Select several columns with a list; the bare-tuple form was removed in pandas 2.0.
#'var' is pandas' sample variance (ddof=1); dropna() removes practices whose
#variance is undefined (e.g. only one monthly row).
df_stats_all = df_grouped.groupby('Practice_Code')[['Count', 'NIC_sum']]\
                         .agg(['median', 'var']).dropna()
#Flatten the (column, statistic) MultiIndex into names like 'Count median'
df_stats_all.columns = [' '.join(col).strip() for col in df_stats_all.columns.values]
df_stats_all.head()

In [None]:
#Count, per practice, how many ADDR rows list it (presumably one row per month);
#dummy practices (codes containing 'Y') are excluded.
practices = sc.textFile(localpath('data/ADDR/'))

df_prac_age = practices.map(lambda line: line.split(','))\
                       .map(lambda u: (u[0].strip(), u[1].strip()))\
                       .filter(lambda v: 'Y' not in v[1])\
                       .toDF(['Date', 'Practice_Code'])\
                       .groupBy('Practice_Code').count()\
                       .withColumnRenamed('count', 'Months')

with open('df_prac_age.pkd', 'wb') as fh:
    dill.dump(df_prac_age.toPandas(), fh)

K-means clustering. First, filter the data down to practices with a complete record.

In [None]:
#Make sure every practice has a full 18 months of data
#(df_grouped has one row per practice per month)
N_MONTHS = 18  #length of the 201702-201807 window read above
with open('df_prac_age.pkd', 'rb') as fh:
    df_prac_age = dill.load(fh)

filter2 = pd.DataFrame()
filter2['Months'] = df_grouped.groupby('Practice_Code')['Date'].count()
filter2 = filter2.loc[filter2.Months == N_MONTHS]

#Join all filters: the inner joins keep only practices present in every table;
#the 'Months' columns are only used for filtering and are dropped afterwards.
df = df_stats_all.join(filter2, how='inner').drop(columns=['Months'])
filtered_data = df.join(df_prac_age.set_index('Practice_Code'), how='inner')\
                  .drop(columns=['Months'])

Perform the clustering analysis using scikit-learn's `KMeans` class.

In [None]:
from sklearn.cluster import KMeans

#Somewhat arbitrary cuts: keep practices whose median monthly record count exceeds
#COUNT_MEDIAN_MIN and whose variance is below COUNT_VAR_MAX
COUNT_MEDIAN_MIN = 300
COUNT_VAR_MAX = 6000
N_CLUSTERS = 5
SEED = 0

sample = filtered_data.loc[filtered_data['Count median'] > COUNT_MEDIAN_MIN]
sample = sample.loc[sample['Count var'] < COUNT_VAR_MAX].sort_values('Count var')
y = sample['Count var'].tolist()

#Cluster the 1-D variances. random_state makes the labels reproducible between runs,
#and n_init is pinned because its default changed in scikit-learn 1.4.
estimator = KMeans(n_clusters=N_CLUSTERS, n_init=10, random_state=SEED)\
                .fit(np.asarray(y).reshape(-1, 1))
print(estimator.cluster_centers_)

The k-means label ids are arbitrary (not ordered by cluster centre), so relabel them as a number of GPs. Plot the histograms with these labels.

In [None]:
#Attach each practice's k-means label (estimator.labels_ follows the row order of `sample`)
df_labels = pd.DataFrame({'Label': estimator.labels_.astype(int)}, index=sample.index)
kmeans = sample.join(df_labels)

#A dictionary for which kmeans label corresponds to how many GPs.
#Label ids are arbitrary, so derive the mapping by ranking the cluster centres:
#smallest 'Count var' centre -> 1 GP, next -> 2, and so on. A hard-coded
#label->nGP dict would only be valid for one particular k-means run.
#NOTE(review): assumes nGP grows with the cluster's variance centre — confirm.
center_order = np.argsort(estimator.cluster_centers_.ravel())
dict_kcenter = {int(label): rank + 1 for rank, label in enumerate(center_order)}
kmeans['nGP'] = kmeans['Label'].map(dict_kcenter)

with open('df_kmeans.pkd', 'wb') as fh:
    dill.dump(kmeans, fh)

Work on the geolocation data: map the data onto postcode areas using folium. The generated HTML files are interactive maps.

In [None]:
#Read the address-book CSVs and build a table of practice geolocations.
#Postcode areas (rather than full postcodes) are used to give cleaner information.

def find_areacode(string):
    """Return the postcode area: the leading run of letters of a postcode.

    For example 'TS18 1HU' -> 'TS'. Leading whitespace is ignored, because the
    other ADDR fields are ``.strip()``-ped before use and so may be padded. An
    input that does not start with a letter (after stripping) yields ''.
    """
    letters = []
    for ch in string.lstrip():
        if not ch.isalpha():
            break
        letters.append(ch)
    return ''.join(letters)

#Create an RDD for the addr files
practices = sc.textFile(localpath('data/ADDR/'))

#Read lines, extract (Date, Practice_Code, Postcode_Area) and convert to a Spark SQL dataframe
df_geo = practices.map(lambda line: line.split(','))\
                  .map(lambda u: (int(u[0]), u[1].strip(), find_areacode(u[7])))\
                  .distinct()\
                  .toDF(['Date', 'Practice_Code', 'Postcode_Area'])

#In case of a practice with more than one location, keep the latest one.
#Structs compare field by field, so max(struct(Date, Postcode_Area)) picks the row
#with the latest Date (ties broken by Postcode_Area). Unlike orderBy() followed by
#groupBy()/collect_list(), this does not depend on row order surviving the shuffle.
df_geo = df_geo.groupBy('Practice_Code')\
               .agg(fn.max(fn.struct('Date', 'Postcode_Area')).alias('latest'))\
               .select('Practice_Code', fn.col('latest.Postcode_Area').alias('Postcode_Area'))

#Save to disk
with open('df_geo.pkd', 'wb') as fh:
    dill.dump(df_geo.toPandas(), fh)