In [1]:
import pandas as pd
import numpy as np
from sklearn.cross_validation import train_test_split
from sklearn.neighbors import KNeighborsRegressor
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.util import MLUtils
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import GradientBoostingRegressor



In [3]:
sc = SparkContext.getOrCreate()

In [None]:
###############
#  Question 1 #
###############

In [6]:
# Question 1 partition data
# load option data name as 'option'
option = pd.read_csv('Option_Data_2000.csv')
option.head()
# train80% test20%
x_train,x_test,y_train,y_test = train_test_split(option.drop('Implied Volatility',axis = 1),
                                                 option['Implied Volatility'],test_size = 0.2,random_state=42)

In [None]:
###############
#  Question 2 #
###############

In [None]:
###################
#   sklearn KNN   #
###################

In [9]:
# Models under sklearn
# build k-nn classifier(clf) model
# prevent valueError : unknown label error
clf = KNeighborsRegressor()
clf.fit(x_train,y_train)
# save predictions to KNNpreds
KNNpreds = clf.predict(x_test)
# convert KNNpreds to series data
KNNpreds = pd.DataFrame(KNNpreds)

# add actual IV, implied volatility to GBpreds
# to form a dataframe with column one:IV column two:predict IV
y_test.reset_index(drop=True, inplace=True)
KNNcompare = pd.concat([KNNpreds,y_test],axis = 1)
# To array and convert to RDD
KNNRDD = KNNcompare.as_matrix()
KNNRDD = sc.parallelize(KNNRDD)

# x[0]:actual IV, x[1] predict IV
KNNError = KNNRDD.map(lambda x:abs(x[0]-x[1]))
# get x_test index as RDD
indexRDD = sc.parallelize(x_test.index)
# combine indexRDD with ErrorRDD
indexedKNNError = indexRDD.zip(KNNError)

In [17]:
# get samples of top 10% error
reversedKNNError = indexedKNNError.takeOrdered(int(0.1*len(x_test)))
reversedKNNError

[(23, 0.051800000000000013),
 (29, 0.20848),
 (30, 0.10687999999999998),
 (32, 0.090700000000000003),
 (44, 0.083320000000000005),
 (45, 0.015079999999999982),
 (49, 0.080319999999999947),
 (56, 0.20100000000000001),
 (59, 0.14094000000000007),
 (63, 0.082519999999999982),
 (65, 0.055739999999999956),
 (67, 0.11635999999999999),
 (69, 0.0043600000000000305),
 (70, 0.014479999999999993),
 (73, 0.10450000000000001),
 (76, 0.034799999999999998),
 (78, 0.033599999999999963),
 (99, 0.089439999999999992),
 (100, 0.086499999999999994),
 (109, 0.030879999999999963),
 (111, 0.04226000000000002),
 (115, 0.050320000000000004),
 (120, 0.038739999999999997),
 (123, 0.34243999999999991),
 (124, 0.38330000000000003),
 (128, 0.015039999999999998),
 (135, 0.00084000000000000741),
 (162, 0.067840000000000011),
 (163, 0.088940000000000019),
 (168, 0.043240000000000056),
 (173, 0.023499999999999993),
 (175, 0.029420000000000002),
 (185, 0.045940000000000036),
 (188, 0.030100000000000016),
 (194, 0.0069400

In [19]:
# get samples of bottom 10% error
sortedKNNError = indexedKNNError.sortBy(lambda x:x[1])
sortedKNNError.take(int(0.1*len(x_test)))

[(1745, 0.00013999999999991797),
 (1100, 0.00017999999999998573),
 (792, 0.00023999999999996247),
 (1766, 0.00025999999999992696),
 (712, 0.00029999999999996696),
 (1987, 0.00061999999999995392),
 (1550, 0.00068000000000001393),
 (135, 0.00084000000000000741),
 (1190, 0.0008799999999999919),
 (1640, 0.0010999999999999899),
 (1487, 0.0011000000000000454),
 (433, 0.0011400000000000299),
 (1541, 0.0012599999999999278),
 (630, 0.0013000000000000234),
 (1897, 0.0014799999999999813),
 (610, 0.0017000000000000071),
 (1659, 0.0018400000000000083),
 (1767, 0.0019000000000000128),
 (1741, 0.0019799999999998708),
 (1310, 0.0022199999999999998),
 (411, 0.002260000000000012),
 (824, 0.0023400000000000087),
 (1164, 0.0023799999999999932),
 (787, 0.0024000000000000132),
 (1464, 0.0027400000000000202),
 (203, 0.0030000000000000027),
 (1782, 0.0031200000000000117),
 (1813, 0.0031200000000001227),
 (212, 0.0031800000000000161),
 (730, 0.0032000000000000084),
 (1225, 0.0034200000000000064),
 (1233, 0.003

In [None]:
###################
#   sklearn GB    #
###################

In [20]:
# build GB model
params = {'n_estimators':1000,'max_depth':6,'min_samples_split':2,'learning_rate':0.01,'loss':'ls'}
gb = GradientBoostingRegressor(**params)
gb.fit(x_train,y_train)
# save predictions to GBpreds
GBpreds = gb.predict(x_test)

# convert GBpreds to series data
GBpreds = pd.DataFrame(GBpreds)
# add actual IV, implied volatility to GBpreds
# to form a dataframe with column one:IV column two:predict IV
y_test.reset_index(drop=True, inplace=True)
GBcompare = pd.concat([GBpreds,y_test],axis = 1)
# To array and convert to RDD
GBRDD = GBcompare.as_matrix()
GBRDD = sc.parallelize(GBRDD)

# x[0]:actual IV, x[1] predict IV
GBError = GBRDD.map(lambda x:abs(x[0]-x[1]))
# get x_test index as RDD
indexRDD = sc.parallelize(x_test.index)
# combine indexRDD with ErrorRDD
indexedGBError = indexRDD.zip(GBError)

In [24]:
# get samples of top 10% error
reversedGBError = indexedGBError.takeOrdered(int(0.1*len(x_test)))
reversedGBError

[(23, 0.063195153324744457),
 (29, 0.013855456163301527),
 (30, 0.045339279041976321),
 (32, 0.049258871675226523),
 (44, 0.079317050728843097),
 (45, 0.047240137812337657),
 (49, 0.041338561700472981),
 (56, 0.043022774051756918),
 (59, 0.12680052655083957),
 (63, 0.12397819818334893),
 (65, 0.003628423917705681),
 (67, 0.076498201045796049),
 (69, 0.031813598988998826),
 (70, 0.014660572443932707),
 (73, 0.15044956343513846),
 (76, 0.019225190949456766),
 (78, 0.076832890144398536),
 (99, 0.004785646704587615),
 (100, 0.0071576513425320454),
 (109, 0.041490808097703058),
 (111, 0.0080941459162026108),
 (115, 0.01501204930131339),
 (120, 0.011371412830498612),
 (123, 0.023481003756251562),
 (124, 0.065243867979530995),
 (128, 0.013513840497212548),
 (135, 0.043891923600209881),
 (162, 0.022032403635743436),
 (163, 0.039683358279559711),
 (168, 0.023303886004079011),
 (173, 0.0075163490563118496),
 (175, 0.0043491759871339408),
 (185, 0.0022348939888435226),
 (188, 0.034705286651028955

In [25]:
# get samples of bottom 10% error
sortedGBError = indexedGBError.sortBy(lambda x:x[1])
sortedGBError.take(int(0.1*len(x_test)))

[(1754, 8.9803157614365414e-05),
 (1510, 9.9706183353420741e-05),
 (1233, 0.00021606157011888616),
 (993, 0.00041412438869675716),
 (1100, 0.00063626623256388126),
 (1245, 0.00067034201368390556),
 (307, 0.00068477237622988074),
 (1794, 0.00076118638304650821),
 (620, 0.00084116152344620998),
 (1756, 0.00091019684238563547),
 (534, 0.0010129386418914199),
 (888, 0.0011644412273803528),
 (1621, 0.0011906543387777968),
 (1242, 0.0012336416790506566),
 (1103, 0.0014711156835513217),
 (1563, 0.0014959494995072653),
 (572, 0.001675338209990801),
 (1911, 0.0019639491446526436),
 (788, 0.0019956505435512195),
 (535, 0.0020773115455924029),
 (1116, 0.0020798601873592837),
 (185, 0.0022348939888435226),
 (1818, 0.0024277785186761491),
 (591, 0.0024546559079760555),
 (422, 0.0026571332398348524),
 (1562, 0.0026909332683838771),
 (1105, 0.0034611106036376849),
 (65, 0.003628423917705681),
 (305, 0.0036845815328005738),
 (855, 0.0036874881779368207),
 (1173, 0.0037368284975521637),
 (905, 0.003911

In [26]:
# compare MSE of gradientboosting and KNN
GBMSE = GBRDD.map(lambda v: (v[0] - v[1])**2).sum() / float(y_test.count())
KNNMSE = KNNRDD.map(lambda v: (v[0] - v[1])**2).sum() / float(y_test.count())
GBMSE

0.0091367752425267172

In [27]:
KNNMSE

0.012650478147000002

In [None]:
###############
#  Question 4 #
###############

In [117]:
# SVM and GB under Spark
# train80% test20%
trainData, testData = train_test_split(option,test_size=0.2,random_state=42)
train = trainData.as_matrix()
test = testData.as_matrix()
def parsePoint(line):
    return LabeledPoint(line[7],line[0:7])
# create RDD
trainRDD = sc.parallelize(train)
testRDD = sc.parallelize(test)
trainLP = trainRDD.map(parsePoint)
testLP = testRDD.map(parsePoint)

In [122]:
# build GB model
GBmodel = GradientBoostedTrees.trainRegressor(trainLP,
                                            categoricalFeaturesInfo={5:2}, numIterations=3)
predictions = GBmodel.predict(testLP.map(lambda x: x.features))
sparkGBError = testLP.map(lambda lp: lp.label).zip(predictions)
# compute MSE
testMSE = sparkGBError.map(lambda v: (v[0] - v[1])**2).sum() / float(testLP.count())

In [124]:
testMSE

0.019821288888369395

In [111]:
# build SVM model
from pyspark.mllib.classification import SVMWithSGD, SVMModel

In [115]:
clfSVM = SVMWithSGD.train(trainLP, iteration=100)
svmLabelAndPreds = testLP.map(lambda p: (p.label, clfSVM.predict(p.features)))

Py4JJavaError: An error occurred while calling o404.trainSVMModelWithSGD.
: org.apache.spark.SparkException: Input validation failed.
	at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:256)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainRegressionModel(PythonMLLibAPI.scala:92)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainSVMModelWithSGD(PythonMLLibAPI.scala:248)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Unknown Source)


In [116]:
trainLP.take(5)

[LabeledPoint(0.4453, [111.26,121.0,7.0,0.0026,0.05,0.0,0.161134794]),
 LabeledPoint(0.2832, [19.0,19.0,35.0,0.0026,0.65,1.0,0.245222124]),
 LabeledPoint(0.3192, [44.03,35.0,553.0,0.0026,2.0,1.0,0.236108918]),
 LabeledPoint(0.2766, [44.03,43.0,63.0,0.0026,4.4,0.0,0.236108918]),
 LabeledPoint(0.8867, [7.17,11.0,35.0,0.0026,0.25,0.0,0.706962799])]