In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TaylorTree").getOrCreate()

data = spark.read.csv("taylor.csv", header=True, inferSchema=True, sep=";")

In [273]:
from pyspark.sql.functions import hour, minute, second, col, to_date, day, month, to_timestamp

data = data.withColumn("duration", to_timestamp(col("duration"), "dd/MM/yyyy HH:mm:ss"))
data = data.withColumn("minutes", hour(col("duration")))
data = data.withColumn("seconds", minute(col("duration")))
data = data.withColumn("duration", col("minutes") * 60 + col("seconds"))
data = data.drop("minutes")
data = data.drop("seconds")

In [274]:
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import round, col

data = data.withColumn("grade", regexp_replace("grade", ",", "."))
data = data.withColumn("grade", data["grade"].cast("double"))
data = data.withColumn("youtube", data["youtube"].cast("integer"))
data = data.withColumn("spotify", data["spotify"].cast("integer"))

In [275]:
from pyspark.sql.functions import when

data = data.withColumn("new_old", when(data["year"] > 2016, 1).otherwise(0))

In [276]:
data.printSchema()
print(data.columns)

root
 |-- album: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- spotify: integer (nullable = true)
 |-- youtube: integer (nullable = true)
 |-- grade: double (nullable = true)
 |-- new_old: integer (nullable = false)

['album', 'title', 'year', 'duration', 'spotify', 'youtube', 'grade', 'new_old']


In [277]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="album", outputCol="AlbumIndex")
indexed = indexer.fit(data).transform(data)

In [278]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['year', 'duration', 'spotify', 'youtube', 'AlbumIndex'], 
outputCol="features",
handleInvalid="skip")

In [279]:
output = assembler.transform(indexed)
output.select("features", "new_old").show()
final_data = output.select("features", "new_old")

+--------------------+-------+
|            features|new_old|
+--------------------+-------+
|[2006.0,232.0,102...|      0|
|[2008.0,173.0,143...|      0|
|[2007.0,203.0,177...|      0|
|[2008.0,242.0,92....|      0|
|[2007.0,201.0,243...|      0|
|[2006.0,213.0,38....|      0|
|[2008.0,241.0,167...|      0|
|[2008.0,294.0,106...|      0|
|[2008.0,235.0,766...|      0|
|[2008.0,234.0,104...|      0|
|[2008.0,231.0,554...|      0|
|[2008.0,261.0,54....|      0|
|[2008.0,243.0,309...|      0|
|[2008.0,245.0,39....|      0|
|[2008.0,279.0,34....|      0|
|[2008.0,237.0,31....|      0|
|[2008.0,263.0,28....|      0|
|[2021.0,277.0,284...|      1|
|[2010.0,237.0,38....|      0|
|[2010.0,231.0,107...|      0|
+--------------------+-------+
only showing top 20 rows



In [318]:
train_data, test_data = final_data.randomSplit([0.5, 0.5])

train_data.describe().show()
test_data.describe().show()

+-------+------------------+
|summary|           new_old|
+-------+------------------+
|  count|                85|
|   mean|               0.6|
| stddev|0.4928053803045811|
|    min|                 0|
|    max|                 1|
+-------+------------------+

+-------+-------------------+
|summary|            new_old|
+-------+-------------------+
|  count|                 82|
|   mean| 0.6707317073170732|
| stddev|0.47283954548277546|
|    min|                  0|
|    max|                  1|
+-------+-------------------+



In [319]:
from pyspark.ml.classification import (DecisionTreeClassifier, GBTClassifier, RandomForestClassifier)
from pyspark.ml import Pipeline

In [320]:
dtc = DecisionTreeClassifier(labelCol="new_old", featuresCol="features")
rfc = RandomForestClassifier(labelCol="new_old", featuresCol="features")
gbt = GBTClassifier(labelCol="new_old", featuresCol="features")

In [321]:
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbt_model = gbt.fit(train_data)

In [322]:
dtc_preds = dtc_model.transform(test_data)
rfc_preds = rfc_model.transform(test_data)
gbt_preds = gbt_model.transform(test_data)

In [323]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

my_binary_eval = BinaryClassificationEvaluator(labelCol="new_old")

In [324]:
print("DTC:")
print(my_binary_eval.evaluate(dtc_preds))

DTC:
1.0


In [325]:
print("RFC:")
print(my_binary_eval.evaluate(rfc_preds))

RFC:
1.0


In [326]:
print("GBT:")
print(my_binary_eval.evaluate(gbt_preds))

GBT:
1.0


In [327]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator(labelCol="new_old", metricName="accuracy")
rfc_acc = acc_eval.evaluate(rfc_preds)

In [328]:
rfc_acc

1.0