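This notebook is an example of PySpark–MySQL communication: it reads a MySQL table into a Spark DataFrame over JDBC, then trains and evaluates a logistic-regression classifier on that data.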

In [1]:
import os
from getpass import getpass

from pyspark.sql import SparkSession

In [2]:
# Path to the MySQL Connector/J jar that Spark loads to get the JDBC driver.
# The location is machine-specific, so it can be overridden with the
# MYSQL_CONNECTOR_JAR environment variable; the default is the original path.
MYSQL_CONNECTOR_JAR = os.environ.get(
    "MYSQL_CONNECTOR_JAR", "/usr/share/java/mysql-connector-java-8.0.22.jar"
)

# Local-mode Spark session with the connector jar on its classpath.
spark = (
    SparkSession.builder
    .config("spark.jars", MYSQL_CONNECTOR_JAR)
    .master("local")
    .appName("PySpark_MySQL_test")
    .getOrCreate()
)

In [3]:
# Connection settings. Credentials must never be hard-coded in a notebook:
# they are read from the environment, with an interactive prompt as fallback.
MYSQL_URL = os.environ.get("MYSQL_URL", "jdbc:mysql://localhost:3306/TestDB")
mysql_user = os.environ.get("MYSQL_USER") or input("MySQL user: ")
mysql_password = os.environ.get("MYSQL_PASSWORD") or getpass("MySQL password: ")

# Load the `Wines` table into a Spark DataFrame over JDBC.
wine_df = (
    spark.read.format("jdbc")
    .option("url", MYSQL_URL)
    # Driver class for Connector/J 8.x (the jar loaded into the session);
    # the legacy name com.mysql.jdbc.Driver is deprecated there.
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", "Wines")
    .option("user", mysql_user)
    .option("password", mysql_password)
    .load()
)

In [4]:
# Preview the first five rows loaded from MySQL.
wine_df.show(n=5)

+-------------+----------------+-----------+--------------+---------+--------+---------+-------+----+---------+-------+-------+------+
|fixed_acidity|volatile_acidity|citric_acid|residual_sugar|chlorides|free_so2|total_so2|density|  pH|sulphates|alcohol|quality|is_red|
+-------------+----------------+-----------+--------------+---------+--------+---------+-------+----+---------+-------+-------+------+
|          7.4|             0.7|        0.0|           1.9|    0.076|    11.0|     34.0| 0.9978|3.51|     0.56|    9.4|      5|     1|
|          7.8|            0.88|        0.0|           2.6|    0.098|    25.0|     67.0| 0.9968| 3.2|     0.68|    9.8|      5|     1|
|          7.8|            0.76|       0.04|           2.3|    0.092|    15.0|     54.0|  0.997|3.26|     0.65|    9.8|      5|     1|
|         11.2|            0.28|       0.56|           1.9|    0.075|    17.0|     60.0|  0.998|3.16|     0.58|    9.8|      6|     1|
|          7.4|             0.7|        0.0|           

In [5]:
# 80/20 train/test split; the fixed seed makes the split repeatable between runs.
train_df, test_df = wine_df.randomSplit(weights=[0.8, 0.2], seed=12345)

In [6]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Model inputs: every measurement column except `quality` and the `is_red` label.
predictors = [
    "fixed_acidity",
    "volatile_acidity",
    "citric_acid",
    "residual_sugar",
    "chlorides",
    "free_so2",
    "total_so2",
    "density",
    "pH",
    "sulphates",
    "alcohol",
]

# Pack the predictor columns into the single vector column Spark ML estimators expect.
vec_assembler = VectorAssembler(outputCol="features", inputCols=predictors)
vec_train_df = vec_assembler.transform(train_df)
vec_train_df.select(["features", "is_red"]).show(n=5)

+--------------------+------+
|            features|is_red|
+--------------------+------+
|[3.8,0.31,0.02,11...|     0|
|[3.9,0.225,0.4,4....|     0|
|[4.2,0.17,0.36,1....|     0|
|[4.2,0.215,0.23,5...|     0|
|[4.4,0.32,0.39,4....|     0|
+--------------------+------+
only showing top 5 rows



In [7]:
# Logistic-regression classifier for the binary `is_red` label, trained on the
# assembled `features` vector of the training split.
lr = LogisticRegression(featuresCol="features", labelCol="is_red")
lr_model = lr.fit(dataset=vec_train_df)

In [8]:
# Build the held-out rows' `features` column with the same assembler,
# then score them with the trained model.
vec_test_df = vec_assembler.transform(dataset=test_df)
predictions = lr_model.transform(dataset=vec_test_df)

In [9]:
# Peek at a few scored rows. `limit(5)` runs inside Spark, so only five rows
# are collected to the driver instead of the whole predictions DataFrame.
predictions.select("rawPrediction", "probability", "prediction").limit(5).toPandas()

Unnamed: 0,rawPrediction,probability,prediction
0,"[4.448878602515167, -4.448878602515167]","[0.9884434448051217, 0.011556555194878429]",0.0
1,"[1.1833541636634013, -1.1833541636634013]","[0.7655503552774502, 0.23444964472254975]",0.0
2,"[13.004578940159455, -13.004578940159455]","[0.9999977500019089, 2.249998091073513e-06]",0.0
3,"[9.536028316929983, -9.536028316929983]","[0.9999278021663506, 7.219783364951426e-05]",0.0
4,"[4.187699820018537, -4.187699820018537]","[0.9850458571442262, 0.014954142855773707]",0.0


In [10]:
# Same assemble -> fit -> score steps as above, bundled into a single Pipeline.
# The pipeline wraps the logistic-regression estimator `lr` and fits it on the
# raw `train_df`, so the VectorAssembler step runs inside the pipeline.
# TODO: the original plan was to repeat this with a random-forest classifier
# once that model is working.
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vec_assembler, lr])
pipeline_model = pipeline.fit(train_df)
pipeline_predictions = pipeline_model.transform(test_df)

In [11]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve, computed from the model's `rawPrediction` scores.
# These two settings are the evaluator's defaults, spelled out for readability.
evaluator = BinaryClassificationEvaluator(
    labelCol="is_red",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
evaluator.evaluate(dataset=predictions)

0.9917049465379387

In [12]:
# Score the pipeline's predictions with the same evaluator. Because the pipeline
# fits the same estimator on the same training rows, this is expected to match
# the standalone model's score.
pipeline_auc = evaluator.evaluate(dataset=pipeline_predictions)
pipeline_auc

0.9917049465379387

In [None]:
# Next steps: chapter 5 of *Learning Spark* has more information on what to do from here.