In [6]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import SVMWithSGD
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.sql import SQLContext, Row
# NOTE(review): `sc` is never defined in this notebook; presumably it is the
# SparkContext injected by the pyspark shell/kernel. Confirm before running on
# a fresh kernel.
sqlContext = SQLContext(sparkContext=sc)

def mapLabel(st):
    """Encode a direction code string as a float class label.

    'up' -> 0.0, 'do' -> 1.0, 'st' -> 2.0; any other value -> 3.0.
    """
    # Codes are tested in this order; the position in the tuple is the label.
    for index, code in enumerate(('up', 'do', 'st')):
        if st == code:
            return float(index)
    return 3.0
        

def mapLabels(d):
    """Flatten one record into a ``(label, features)`` pair.

    Args:
        d: a record dict with a ``'Dir'`` key and a ``'Child'`` list of dicts,
           each holding a ``'WordVec'`` object that supports ``toArray()``.

    Returns:
        ``(mapLabel(d['Dir']), features)`` where ``features`` is the in-order
        concatenation of every child's ``WordVec.toArray().tolist()``. An empty
        ``'Child'`` list yields an empty feature list (instead of raising
        IndexError); rows of unexpected length are dropped by the length
        filters applied after this map.
    """
    label = d['Dir']
    features = []
    for child in d['Child']:
        # extend() appends in place; the former `a = a + b` pattern re-copied
        # the whole accumulated list for every child (quadratic in #children).
        features.extend(child['WordVec'].toArray().tolist())
    return (mapLabel(label), features)

# Training set: a ~1% Bernoulli sample of the non-SNP500 records, flattened into
# (label, DenseVector) pairs and cached.
# Sampling happens before mapLabels so the per-record flattening only runs on
# the kept records; the seed makes the sample reproducible across runs.
# NOTE(review): pickleFile unpickles data, so only load files you produced/trust.
TRAIN_SAMPLE_FRACTION = 0.01
TRAIN_SAMPLE_SEED = 11
FEATURE_LEN = 100 * 30  # presumably 30 child records x 100-dim WordVec; confirm

rawData = (sc.pickleFile('./FeatureVector')
           .filter(lambda rec: rec['Symbol'] != 'SNP500')
           .sample(False, TRAIN_SAMPLE_FRACTION, seed=TRAIN_SAMPLE_SEED)
           .map(mapLabels)
           .filter(lambda pair: len(pair[1]) == FEATURE_LEN)
           .map(lambda pair: (pair[0], Vectors.dense(pair[1])))
           .cache())
            



In [7]:
# Build the training DataFrame and fit a multilayer perceptron.
# Layer sizes: 100*30 inputs (the same length rawData's feature filter keeps),
# one hidden layer of 1*30 units, and 3 output units.
# NOTE(review): mapLabel returns 3.0 for any 'Dir' other than 'up'/'do'/'st',
# which would not fit a 3-unit output layer. Confirm such labels never occur.
df = sqlContext.createDataFrame(rawData, schema=["label", "features"]).cache()
df.show()
mlp = MultilayerPerceptronClassifier(
    layers=[100 * 30, 1 * 30, 3],
    maxIter=100,
    seed=11,
)
model = mlp.fit(df)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  2.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.02716893302805...|
|  2.0|[0.00893773724359...|
|  2.0|[0.00353481636151...|
|  1.0|[0.02290219965803...|
|  2.0|[0.01039333985355...|
|  2.0|[-0.0131412900936...|
|  0.0|[0.03101800782838...|
|  2.0|[-0.0148460686700...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  2.0|[0.01144807768629...|
|  1.0|[0.05400354475929...|
|  0.0|[0.00413107639440...|
|  0.0|[-0.0334426911193...|
|  2.0|[0.00154690247466...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.00749564381784...|
|  2.0|[-0.0270764063113...|
|  2.0|[-0.0085780138738...|
|  0.0|[0.0,0.0,0.0,0.0,...|
+-----+--------------------+
only showing top 20 rows



In [17]:
# Held-out records, with the SNP500 symbol dropped as for training.
# `test` stays cached because a later cell inspects it directly.
test = (sc.pickleFile('./FeatureVectorTest/')
        .filter(lambda rec: rec['Symbol'] != 'SNP500')
        .cache())

# Flatten to (label, DenseVector) pairs, keeping only rows whose feature list
# has the expected 100*30 length.
testData = (test
            .map(mapLabels)
            .filter(lambda pair: len(pair[1]) == 100 * 30)
            .map(lambda pair: (pair[0], Vectors.dense(pair[1])))
            .cache())

In [26]:
# Score the test set and pair each prediction with its true label.
# Keeping the label in the same DataFrame as the features avoids zipping two
# RDDs, which requires identical partitioning and per-partition row counts.
# The single-argument lambda also replaces Python-2-only tuple-parameter syntax.
testDf = sqlContext.createDataFrame(testData, ["label", "features"]).cache()
(model.transform(testDf)
      .select("prediction", "label")
      .rdd
      .map(lambda row: (row.prediction, row.label))
      .take(1))

[(0.0, 1.0)]

In [35]:
# Inspect one test record: the 'Date' of each child record plus the child count.
# A list comprehension (rather than map) keeps the dates a real list on
# Python 3, where map() returns a lazy iterator.
test.map(lambda rec: ([child['Date'] for child in rec['Child']],
                      len(rec['Child']))).take(1)

[([datetime.datetime(2015, 4, 7, 0, 0),
   datetime.datetime(2015, 4, 6, 0, 0),
   datetime.datetime(2015, 4, 2, 0, 0),
   datetime.datetime(2015, 4, 1, 0, 0),
   datetime.datetime(2015, 3, 31, 0, 0),
   datetime.datetime(2015, 3, 30, 0, 0),
   datetime.datetime(2015, 3, 27, 0, 0),
   datetime.datetime(2015, 3, 26, 0, 0),
   datetime.datetime(2015, 3, 25, 0, 0),
   datetime.datetime(2015, 3, 24, 0, 0),
   datetime.datetime(2015, 3, 23, 0, 0),
   datetime.datetime(2015, 3, 20, 0, 0),
   datetime.datetime(2015, 3, 19, 0, 0),
   datetime.datetime(2015, 3, 18, 0, 0),
   datetime.datetime(2015, 3, 17, 0, 0),
   datetime.datetime(2015, 3, 16, 0, 0),
   datetime.datetime(2015, 3, 13, 0, 0),
   datetime.datetime(2015, 3, 12, 0, 0),
   datetime.datetime(2015, 3, 11, 0, 0),
   datetime.datetime(2015, 3, 10, 0, 0),
   datetime.datetime(2015, 3, 9, 0, 0),
   datetime.datetime(2015, 3, 6, 0, 0),
   datetime.datetime(2015, 3, 5, 0, 0),
   datetime.datetime(2015, 3, 3, 0, 0),
   datetime.datetime(201