# Forex Market Prediction

In [1]:
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
import pandas as pd

conf = SparkConf().setAppName("MLlib")
#sc.stop() #if need to 
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")

spark = SparkSession(sc)

Data Pre processing

In [2]:
#Load the training data
train_df = spark.read.format("csv").option("header", "true"). \
            load(r'Train_small.csv')
train_cols = train_df.columns

#Remove Gmt time column from the data and convert the values to float.
train_cols.remove('Gmt time')
train_df = train_df.select(train_cols).rdd.map(lambda x: [float(i) for i in x]).toDF(train_cols)

#Load test data
test_df = spark.read.format("csv").option("header", "true"). \
            load(r'Test_small_feature.csv')
test_cols = test_df.columns

#Remove Gmt time and unnamed index column and convert values in the test dataset to float
test_cols.remove('Gmt time')
test_cols.remove('_c0')
test_df = test_df.select(test_cols).rdd.map(lambda x: [float(i) for i in x]).toDF(test_cols)

Decision Tree Classification with MLlib

In [3]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Create a dense vector of features
stages = []
label_stringIdx = StringIndexer(inputCol = 'up_down', outputCol = 'label')
stages += [label_stringIdx]
assembler = VectorAssembler(inputCols=train_cols, outputCol="features", )
stages += [assembler]

pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(train_df)
df = pipelineModel.transform(train_df)
selectedCols = ['label', 'features'] + train_cols
df = df.select(selectedCols)

#Split the data into train and test
train, test = df.randomSplit([0.8, 0.2], seed = 0)

print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

#Using a Decision Tree classifier model to fit the trainig data
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
model_dt = dt.fit(train)

#Predict up_down for test data
predictions = model_dt.transform(test)

#Test accuracy using MulticlassClassificationEvaluator
predictions.select("prediction", "label", "features").show()
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

Training Dataset Count: 17307
Test Dataset Count: 4295
+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       0.0|  0.0|(214,[0,1,2,3,4,5...|
|       0.0|  0.0|(214,[0,1,2,3,4,5...|
|       0.0|  0.0|(214,[0,1,2,3,4,5...|
|       0.0|  0.0|(214,[0,1,2,3,4,5...|
|       0.0|  0.0|(214,[0,1,2,3,4,5...|
|       0.0|  0.0|(214,[0,1,2,3,4,5...|
|       0.0|  0.0|(214,[0,1,2,3,4,5...|
|       0.0|  0.0|(214,[0,1,2,3,4,5...|
|       0.0|  0.0|(214,[0,1,2,3,4,5...|
|       0.0|  0.0|(214,[0,1,2,3,4,5...|
|       0.0|  0.0|(214,[0,1,2,3,4,5...|
|       0.0|  0.0|(214,[0,1,2,3,4,5...|
|       0.0|  0.0|(214,[0,1,2,3,8,9...|
|       0.0|  0.0|(214,[0,1,2,3,8,9...|
|       0.0|  0.0|(214,[0,1,2,3,8,9...|
|       0.0|  0.0|(214,[0,1,2,3,8,9...|
|       0.0|  0.0|(214,[0,1,2,3,8,9...|
|       0.0|  0.0|(214,[0,1,2,3,8,9...|
|       0.0|  0.0|(214,[0,1,2,3,8,9...|
|       0.0|  0.0|(214,[0,1,2,3,8,9...|
+----------+-----+-------

In [4]:
#Precict up_down for test data using Decision Tree Classifier

#Create a dense vector of features 
stages = []
assembler = VectorAssembler(inputCols=test_cols, outputCol="features")
stages += [assembler]

pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(test_df)
test_data = pipelineModel.transform(test_df)
selectedCols = ['features'] + test_cols
test_data = test_data.select(selectedCols)

#Test the model to get the up_down predictions
test_pred_dt = model_dt.transform(test_data)

#Select the prediction column and rename it to uo_down, convert the data frame to pandas
#and copy them into a csv file 
test_pred_dt.select('prediction').withColumnRenamed('prediction', 'up_down').\
    toPandas().to_csv('Test_pred2_dc.csv')

MLP

In [4]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors

df = train_df.rdd.map(lambda r: [r[-1], Vectors.dense(r[:-1])]).\
           toDF(['label','features'])

(trainingData, testData) = df.randomSplit([0.8, 0.2])

#Set layers
layers = [len(train_cols)-1, len(train_cols), 2]

# create the trainer and set its parameters
mlp = MultilayerPerceptronClassifier(maxIter=250, layers=layers, blockSize=128, seed=1234)

# train the model using trainingData
model_mlp = mlp.fit(trainingData)

#precit the up_down for testData
predictions = model_mlp.transform(testData)

#Test accuracy using MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Predictions accuracy = %g, Test Error = %g" % (accuracy,(1.0 - accuracy)))

Predictions accuracy = 0.962776, Test Error = 0.0372243


In [5]:
#Precict up_down for test data using MLP Classifier

test_data = test_df.rdd.map(lambda r: [Vectors.dense(r[:])]).toDF(['features'])
test_preds_mlp = model_mlp.transform(test_data)
test_preds_mlp.select('prediction').withColumnRenamed('prediction', 'up_down').toPandas().to_csv('Test_pred2_mlp.csv')

SVM

In [6]:
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint

#Create a Labeled point dataframe
def parsePoint(line):
    values = [float(x) for x in line]
    return LabeledPoint(values[213], values[0:213])

parsedData = train_df.rdd.map(parsePoint)

#Use SVM with SGD classifier to train the model
model_svm = SVMWithSGD.train(parsedData, iterations=10000)

#Predict the up_down
labelsAndPreds = parsedData.map(lambda p: (p.label, model_svm.predict(p.features)))

#Filter the data in predictions with those that match the test data 
#and calculate the error
trainErr = labelsAndPreds.filter(lambda x: x[0] != x[1]).count() / float(train_df.count())
print("Training Error = " + str(trainErr))

Training Error = 0.37061383205258774


In [7]:
#Precict up_down for test data using SVM with SGD

test_data = test_df.rdd.map(lambda x:[i for i in x])
test_pred_svm = model_svm.predict(test_data)
pred_svm = test_pred_svm.collect()
data = pd.DataFrame({'up_down':pred_svm})
filepath = 'Test_pred2_svm.csv'
data.to_csv(filepath, index=False)