In [98]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, split, row_number
from pyspark.sql.window import Window
from pyspark.sql.types import DateType, StringType
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
from pyspark.ml.linalg import Vectors, DenseVector, VectorUDT
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [2]:
from datetime import datetime

In [3]:
sc = SparkContext()

In [4]:
sqlc = SparkSession(sc)

In [5]:
reading = sqlc.read.csv('Admissions 2015-16.csv', header=True)

In [59]:
reading.show(5)

+------------+--------+---------------+-----------+-------------+--------------+--------+------------+
|Removal Date|Hospital|      Specialty|  Procedure|       Doctor|Patient Number|Priority|Waiting Days|
+------------+--------+---------------+-----------+-------------+--------------+--------+------------+
|  03/08/2015|     Mel|General Surgery|Lap Banding|Joseph Miller|     111365825|       C|        3396|
|  06/07/2015|     Mel|General Surgery|Lap Banding|Joseph Miller|     109970143|       C|        3356|
|  03/08/2015|     Mel|General Surgery|Lap Banding|Joseph Miller|     106770523|       C|        3244|
|  19/08/2015|     Mel|General Surgery|Lap Banding|Joseph Miller|     111176864|       C|        3229|
|  09/08/2015|     Mel|General Surgery|Lap Banding|Joseph Miller|     107085813|       C|        3190|
+------------+--------+---------------+-----------+-------------+--------------+--------+------------+
only showing top 5 rows



In [6]:
change_to_month_func = udf(lambda record: datetime.strftime(datetime.strptime(record, '%d/%m/%Y'), '%m-%Y'), StringType())

In [15]:
reading_mod = reading.withColumn('Date', change_to_month_func(col('Removal Date'))).drop('Removal Date'
                                        ).withColumnRenamed('count(Removal Date)', 'patients_removed')

In [16]:
grouped = reading_mod.groupby('Date').agg({'Date': 'count'})

In [17]:
grouped.orderBy('Date').show(100)

+-------+-----------+
|   Date|count(Date)|
+-------+-----------+
|01-2016|       1094|
|02-2016|       1443|
|03-2016|       1364|
|04-2016|       1808|
|05-2016|       2200|
|06-2015|         86|
|06-2016|       2006|
|07-2015|       1217|
|08-2015|       1230|
|09-2015|       1305|
|10-2015|       1191|
|11-2015|       1384|
|12-2015|       1143|
+-------+-----------+



In [18]:
change_to_date_func = udf(lambda record: datetime.strptime(record, '%m-%Y'), DateType())

In [19]:
grouped_with_date = grouped.withColumn('Date', change_to_date_func(col('Date')))

In [20]:
window_row = Window().orderBy('Date')

In [21]:
grouped_new = grouped_with_date.withColumn('id', row_number().over(window_row))

In [22]:
grouped_new.show(100)

+----------+-----------+---+
|      Date|count(Date)| id|
+----------+-----------+---+
|2015-06-01|         86|  1|
|2015-07-01|       1217|  2|
|2015-08-01|       1230|  3|
|2015-09-01|       1305|  4|
|2015-10-01|       1191|  5|
|2015-11-01|       1384|  6|
|2015-12-01|       1143|  7|
|2016-01-01|       1094|  8|
|2016-02-01|       1443|  9|
|2016-03-01|       1364| 10|
|2016-04-01|       1808| 11|
|2016-05-01|       2200| 12|
|2016-06-01|       2006| 13|
+----------+-----------+---+



In [23]:
to_vector = udf(lambda record: Vectors.dense(record), VectorUDT())

In [24]:
grouped_new_1 = grouped_new.withColumn('id', to_vector(col('id')))

In [25]:
testing_df = grouped_new_1.where(col('Date') > datetime(2016,4,2))

In [26]:
training_df = grouped_new_1.filter(col('Date') < datetime(2016,4,2))

In [106]:
grouped_new_1.show(100)

+----------+-----------+------+
|      Date|count(Date)|    id|
+----------+-----------+------+
|2015-06-01|         86| [1.0]|
|2015-07-01|       1217| [2.0]|
|2015-08-01|       1230| [3.0]|
|2015-09-01|       1305| [4.0]|
|2015-10-01|       1191| [5.0]|
|2015-11-01|       1384| [6.0]|
|2015-12-01|       1143| [7.0]|
|2016-01-01|       1094| [8.0]|
|2016-02-01|       1443| [9.0]|
|2016-03-01|       1364|[10.0]|
|2016-04-01|       1808|[11.0]|
|2016-05-01|       2200|[12.0]|
|2016-06-01|       2006|[13.0]|
+----------+-----------+------+



In [68]:
lr = LinearRegression(maxIter=50, regParam=0.0, solver="normal", labelCol="count(Date)", featuresCol="id")

In [69]:
model_lr = lr.fit(training_df.select('count(Date)', 'id'))

In [70]:
model_lr.coefficients

DenseVector([85.1545])

In [71]:
transformed_lr = model_lr.transform(testing_df)

In [72]:
transformed_lr.show(10)

+----------+-----------+------+------------------+
|      Date|count(Date)|    id|        prediction|
+----------+-----------+------+------------------+
|2016-05-01|       2200|[12.0]|1716.8363636363633|
|2016-06-01|       2006|[13.0]|1801.9909090909089|
+----------+-----------+------+------------------+



In [75]:
dt = DecisionTreeRegressor(maxDepth=3, labelCol="count(Date)", featuresCol="id")

In [76]:
model_dt = dt.fit(training_df.select('count(Date)', 'id'))

In [77]:
transformed_dt = model_dt.transform(testing_df)

In [78]:
transformed_dt.show(10)

+----------+-----------+------+----------+
|      Date|count(Date)|    id|prediction|
+----------+-----------+------+----------+
|2016-05-01|       2200|[12.0]|    1808.0|
|2016-06-01|       2006|[13.0]|    1808.0|
+----------+-----------+------+----------+



In [41]:
grouped_doctor = reading_mod.groupby('Date', 'Doctor').agg({'Doctor': 'count'})

In [42]:
grouped_doctor.show(10)

+-------+-----------------+-------------+
|   Date|           Doctor|count(Doctor)|
+-------+-----------------+-------------+
|02-2016|    Piers Stewart|            6|
|01-2016|Katherine McGrath|           16|
|09-2015|         Ian Rees|            1|
|03-2016|      Leah Wilson|           11|
|04-2016|      Leah Wilson|           14|
|08-2015|    Piers Stewart|            6|
|01-2016|   Melanie Glover|            2|
|04-2016|  Andrew Johnston|            7|
|09-2015|      Sean Fisher|            9|
|02-2016|      Carl Gibson|            1|
+-------+-----------------+-------------+
only showing top 10 rows



In [43]:
grouped_doctor_with_date = grouped_doctor.withColumn('Date', change_to_date_func(col('Date')))

In [53]:
window_row_doctor = Window().partitionBy('Doctor').orderBy('Date')

In [54]:
grouped_doctor_with_date_new = grouped_doctor_with_date.withColumn('id', row_number().over(window_row_doctor))

In [80]:
grouped_doctor_with_date_new.show(30)

+----------+-----------------+-------------+---+
|      Date|           Doctor|count(Doctor)| id|
+----------+-----------------+-------------+---+
|2015-06-01|       Sam Slater|            1|  1|
|2015-07-01|       Sam Slater|            1|  2|
|2015-08-01|       Sam Slater|            1|  3|
|2015-09-01|       Sam Slater|            7|  4|
|2015-10-01|       Sam Slater|            2|  5|
|2015-11-01|       Sam Slater|            5|  6|
|2015-12-01|       Sam Slater|            2|  7|
|2016-01-01|       Sam Slater|            1|  8|
|2016-02-01|       Sam Slater|            2|  9|
|2016-03-01|       Sam Slater|            1| 10|
|2016-04-01|       Sam Slater|            3| 11|
|2016-05-01|       Sam Slater|            4| 12|
|2016-06-01|       Sam Slater|            4| 13|
|2015-06-01|Alexander Skinner|            7|  1|
|2015-07-01|Alexander Skinner|           22|  2|
|2015-08-01|Alexander Skinner|           20|  3|
|2015-09-01|Alexander Skinner|           11|  4|
|2015-10-01|Alexande

In [59]:
to_vectors = udf(lambda col_a, col_b: Vectors.sparse(col_a, col_b))

In [85]:
strindexer = StringIndexer(inputCol="Doctor", outputCol="Doctor_idx")
model_strindexer = strindexer.fit(grouped_doctor_with_date_new)
grouped_doctor_with_date_new_indexed = model_strindexer.transform(grouped_doctor_with_date_new)

In [93]:
encoder = OneHotEncoder(inputCol="Doctor_idx", outputCol="Doctor_coded")
grouped_doctor_with_date_new_encoded = encoder.transform(grouped_doctor_with_date_new_indexed)

In [94]:
grouped_doctor_with_date_new_encoded.show(2)

+----------+----------+-------------+---+----------+---------------+
|      Date|    Doctor|count(Doctor)| id|Doctor_idx|   Doctor_coded|
+----------+----------+-------------+---+----------+---------------+
|2015-06-01|Sam Slater|            1|  1|       9.0|(177,[9],[1.0])|
|2015-07-01|Sam Slater|            1|  2|       9.0|(177,[9],[1.0])|
+----------+----------+-------------+---+----------+---------------+
only showing top 2 rows



In [100]:
assembler = VectorAssembler(inputCols=["id", "Doctor_idx"], outputCol=["features"])
grouped_doctor_with_date_new_1 = assembler.transform(grouped_doctor_with_date_new_encoded)

TypeError: Invalid param value given for param "outputCol". Could not convert <type 'list'> to string type

In [97]:
grouped_doctor_with_date_new_1.show(10)

Py4JJavaError: An error occurred while calling o735.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 231.0 failed 1 times, most recent failure: Lost task 1.0 in stage 231.0 (TID 10859, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<ipython-input-59-9bf27d747674>", line 1, in <lambda>
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/ml/linalg/__init__.py", line 759, in sparse
    return SparseVector(size, *args)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/ml/linalg/__init__.py", line 489, in __init__
    pairs = sorted(pairs)
TypeError: 'float' object is not iterable

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
	at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
	at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2576)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
	at sun.reflect.GeneratedMethodAccessor117.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<ipython-input-59-9bf27d747674>", line 1, in <lambda>
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/ml/linalg/__init__.py", line 759, in sparse
    return SparseVector(size, *args)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/ml/linalg/__init__.py", line 489, in __init__
    pairs = sorted(pairs)
TypeError: 'float' object is not iterable

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:124)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:68)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:785)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	... 1 more


In [101]:
training_df = grouped_doctor_with_date_new_1.filter(col('Date') < datetime(2016,4,2))

In [102]:
testing_df = grouped_doctor_with_date_new_1.where(col('Date') > datetime(2016,4,2))

In [103]:
dt = DecisionTreeRegressor(maxDepth=3, labelCol="count(Date)", featuresCol=["id", "Doctor_idx"])

TypeError: Invalid param value given for param "featuresCol". Could not convert <type 'list'> to string type

In [None]:
model_dt = dt.fit(training_df.select('count(Date)', 'id'))