In [15]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
spark = SparkSession.builder.appName("SPARK_ML").getOrCreate()

In [3]:
DATA_PATH = "/Users/Michavillson/Documents/PROJECTS/DS340W-Group10-FA19/sources/ML_model/output/ml_data.csv"

In [4]:
df = spark.read.format("com.databricks.spark.csv").options(header="true", inferschema="true").load(DATA_PATH)
df.show(5, True)
# df.printSchema()
# df.describe().show()

+-----------+-------------------+----+----+-----+-------+
|        avo|                cur|cell|mark|ideas|  valid|
+-----------+-------------------+----+----+-----+-------+
| 0.24699525|0.18017438579760167| C46| M03|    0|0.22644|
|0.248117075|0.18017438579760167| C46| M03|   92|0.22644|
| 0.24817445|0.18017438579760167| C46| M03|    0|0.22644|
|0.247927175|0.18017438579760167| C46| M03|    0|0.22644|
|0.248311925|0.18017438579760167| C46| M03|    0|0.22644|
+-----------+-------------------+----+----+-----+-------+
only showing top 5 rows



In [5]:
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])

In [6]:
def get_dummy(df,categoricalCols,continuousCols,labelCol): 
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col
    indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) for c in categoricalCols ]
    # default setting: dropLast=True
    encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol())) for indexer in indexers ]
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + continuousCols, outputCol="features")
    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select('features','label')

In [7]:
transformed_dummy = get_dummy(df, ["cell", "mark", "ideas"], ["avo", "cur"], "valid")
transformed_dummy.show(1)

# transformed_df = transData(df)
# transformed_df.count()

+--------------------+-------+
|            features|  label|
+--------------------+-------+
|(110,[1,11,108,10...|0.22644|
+--------------------+-------+
only showing top 1 row



In [16]:
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=100).fit(transformed_dummy)
data = featureIndexer.transform(transformed_dummy)
data.show(5, True)

+--------------------+-------+--------------------+
|            features|  label|     indexedFeatures|
+--------------------+-------+--------------------+
|(110,[1,11,108,10...|0.22644|(110,[1,11,108,10...|
|(110,[1,104,108,1...|0.22644|(110,[1,104,108,1...|
|(110,[1,11,108,10...|0.22644|(110,[1,11,108,10...|
|(110,[1,11,108,10...|0.22644|(110,[1,11,108,10...|
|(110,[1,11,108,10...|0.22644|(110,[1,11,108,10...|
+--------------------+-------+--------------------+
only showing top 5 rows



In [17]:
(trainingData, testData) = data.randomSplit([0.6, 0.4])
print(trainingData.count())
print(testData.count())

728568
485892


In [14]:
rf = GBTRegressor()

In [18]:
pipeline = Pipeline(stages=[featureIndexer, rf])
model = pipeline.fit(trainingData)

In [19]:
predictions = model.transform(testData)

predictions.select("features", "label", "prediction").show(5)

+--------------------+------------+--------------------+
|            features|       label|          prediction|
+--------------------+------------+--------------------+
|(110,[0,10,11,108...|         0.0| 0.07849030586887516|
|(110,[0,10,11,108...|         0.0| 0.07849030586887516|
|(110,[0,10,11,108...|         0.0| 0.07849030586887516|
|(110,[0,10,11,108...|         0.0| 0.07849030586887516|
|(110,[0,10,11,108...|0.0174466129|0.013794805312973921|
+--------------------+------------+--------------------+
only showing top 5 rows



In [21]:
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)
print(mse)

0.7679807436337086
