In [2]:
# connecting to spark
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext

# set up spark context
conf = SparkConf().setAppName("myApp")
sc = SparkContext(conf=conf)

# create a sparksession object
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Random Forest Regression for Municipal Bond") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
# import data
df = spark.read.format('com.databricks.spark.csv').\
                        options(header='true', \
                        inferschema='true').load("./price_change_data.csv",header=True);
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- tradeid: double (nullable = true)
 |-- cusip: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- price: double (nullable = true)
 |-- yield: double (nullable = true)
 |-- tradedate: double (nullable = true)
 |-- tradetype: string (nullable = true)
 |-- name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- coupon: double (nullable = true)
 |-- maturity: double (nullable = true)
 |-- issuesize: double (nullable = true)
 |-- issuetype: string (nullable = true)
 |-- issuesource: string (nullable = true)
 |-- bidcount: double (nullable = true)
 |-- rtg: integer (nullable = true)
 |-- dprice: double (nullable = true)
 |-- holdtime: double (nullable = true)



add a column indicating whether "dprice" is positive or negative.

In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
price_change = udf(lambda dprice: "increasing" if dprice >=0 else "decreasing", StringType())
df = df.withColumn("price_change", price_change(df.dprice))
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- tradeid: double (nullable = true)
 |-- cusip: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- price: double (nullable = true)
 |-- yield: double (nullable = true)
 |-- tradedate: double (nullable = true)
 |-- tradetype: string (nullable = true)
 |-- name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- coupon: double (nullable = true)
 |-- maturity: double (nullable = true)
 |-- issuesize: double (nullable = true)
 |-- issuetype: string (nullable = true)
 |-- issuesource: string (nullable = true)
 |-- bidcount: double (nullable = true)
 |-- rtg: integer (nullable = true)
 |-- dprice: double (nullable = true)
 |-- holdtime: double (nullable = true)
 |-- price_change: string (nullable = true)



In [5]:
from pyspark.ml.feature import StringIndexer
stringIndexer1 = StringIndexer(inputCol="name", outputCol="name_indexed")
stringIndexer2 = StringIndexer(inputCol="state", outputCol="state_indexed")
stringIndexer3 = StringIndexer(inputCol="issuetype", outputCol="issuetype_indexed")
stringIndexer4 = StringIndexer(inputCol="issuesource", outputCol="issuesource_indexed")

model1 = stringIndexer1.fit(df)
indexed1 = model1.transform(df)

model2 = stringIndexer2.fit(indexed1)
indexed2 = model2.transform(indexed1)

model3 = stringIndexer3.fit(indexed2)
indexed3 = model3.transform(indexed2)

model4 = stringIndexer4.fit(indexed3)
indexed4 = model4.transform(indexed3)

indexed4.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- tradeid: double (nullable = true)
 |-- cusip: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- price: double (nullable = true)
 |-- yield: double (nullable = true)
 |-- tradedate: double (nullable = true)
 |-- tradetype: string (nullable = true)
 |-- name: string (nullable = true)
 |-- state: string (nullable = true)
 |-- coupon: double (nullable = true)
 |-- maturity: double (nullable = true)
 |-- issuesize: double (nullable = true)
 |-- issuetype: string (nullable = true)
 |-- issuesource: string (nullable = true)
 |-- bidcount: double (nullable = true)
 |-- rtg: integer (nullable = true)
 |-- dprice: double (nullable = true)
 |-- holdtime: double (nullable = true)
 |-- price_change: string (nullable = true)
 |-- name_indexed: double (nullable = true)
 |-- state_indexed: double (nullable = true)
 |-- issuetype_indexed: double (nullable = true)
 |-- issuesource_indexed: double (nullable = true)



In [7]:
# transform data into featuresCol and labelCol structure
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
df_temp = indexed4.rdd.map(lambda data: Row(label=data['dprice'],
                                            features=Vectors.dense([
                                            data['amount'],
                                            data['coupon'],
                                            data['maturity'],
                                            data['issuesize'],
                                            data['bidcount'],
                                            data['dtradedate'],
                                            data['holdtime'],
                                            data["name_indexed"],
                                            data["state_indexed"],
                                            data["issuetype_indexed"],
                                            data["issuesource_indexed"]])
                                            )).toDF()
df_temp.show(5)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 37, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1339, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-7-c347698e3c0f>", line 11, in <lambda>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1489, in __getitem__
    raise ValueError(item)
ValueError: dtradedate

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:441)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1339, in takeUpToNumLeft
    yield next(iterator)
  File "<ipython-input-7-c347698e3c0f>", line 11, in <lambda>
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py", line 1489, in __getitem__
    raise ValueError(item)
ValueError: dtradedate

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


In [None]:
# use VectorIndexer to index categorical predictors in featuresCol
from pyspark.ml.feature import VectorIndexer

# automatically identify categorical features, and index them.
# Set maxCategories so features with > 6 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(maxCategories=6, inputCol='features', outputCol='indexed_features').fit(df_temp)

df_indexed = featureIndexer.transform(df_temp)
#df_indexed.take(5)
df_indexed.show(5)

In [None]:
# split the data into training and test sets (30% held out for testing)
(trainingData, testData) = df_indexed.randomSplit([0.7, 0.3])

Why couldn't we change impurity into "gini"???
see this page for reference: http://takwatanabe.me/pyspark/generated/generated/pyspark.ml.regression.RandomForestRegressor.impurity.html#pyspark.ml.regression.RandomForestRegressor.impurity
Looks like spark doesn't support gini...

In [None]:
# train a RandomForest model
# numTrees = 100, mrse = 1.20581
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(featuresCol="indexed_features",  impurity="variance", maxDepth=20, seed=42)
# Q: couldn't change impurity into "gini"???
# http://takwatanabe.me/pyspark/generated/generated/pyspark.ml.regression.RandomForestRegressor.impurity.html#pyspark.ml.regression.RandomForestRegressor.impurity

In [None]:
# chain indexer and forest in a Pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])

In [None]:
# train model, this also runs the indexer
%time model_rf = pipeline.fit(trainingData)

In [None]:
# make predictions
predictions = model_rf.transform(testData)

In [None]:
# select example from predictions to display
predictions.select("prediction", "label", "features").show(5)

In [None]:
# select (prediction, true label) and compute test error
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(
            labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [None]:
rfModel = model_rf.stages[1]
print(rfModel)

In [None]:
#a = rfModel.toDebugString
#print a

In [None]:
predictions.select("prediction", "label").show()

In [None]:
predictions.show(5)

In [None]:
predictions.toPandas().to_csv('prediction.csv')

In [None]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
pred_correct = udf(lambda label, prediction: "correct" if label*prediction >=0 else "wrong", StringType())

In [None]:
pred = predictions.withColumn("correct", pred_correct(predictions.label, predictions.prediction))

In [None]:
pred.show(5)

In [None]:
pred.groupBy("correct").count().show()