https://towardsdatascience.com/pyspark-mysql-tutorial-fa3f7c26dc7

docker exec -it mysql /bin/bash

!python3 -m pip show pandas

In [1]:
import pandas as pd

red_wines = pd.read_csv("winequality-red.csv", sep=";")
red_wines["is_red"] = 1
white_wines = pd.read_csv("winequality-white.csv", sep=";")
white_wines["is_red"] = 0
all_wines = pd.concat([red_wines, white_wines])
all_wines

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality,is_red
0,7.4,0.70,0.00,1.9,0.076,11.0,34.0,0.99780,3.51,0.56,9.4,5,1
1,7.8,0.88,0.00,2.6,0.098,25.0,67.0,0.99680,3.20,0.68,9.8,5,1
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.99700,3.26,0.65,9.8,5,1
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.99800,3.16,0.58,9.8,6,1
4,7.4,0.70,0.00,1.9,0.076,11.0,34.0,0.99780,3.51,0.56,9.4,5,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...
4893,6.2,0.21,0.29,1.6,0.039,24.0,92.0,0.99114,3.27,0.50,11.2,6,0
4894,6.6,0.32,0.36,8.0,0.047,57.0,168.0,0.99490,3.15,0.46,9.6,5,0
4895,6.5,0.24,0.19,1.2,0.041,30.0,111.0,0.99254,2.99,0.46,9.4,6,0
4896,5.5,0.29,0.30,1.1,0.022,20.0,110.0,0.98869,3.34,0.38,12.8,7,0


!apt-get install -y python3-setuptools

!python3 -m pip install --user mysql-connector

import mysql.connector

db_connection = mysql.connector.connect(host='mysql', user="root", password="go2team", auth_plugin='mysql_native_password')
db_cursor = db_connection.cursor()
db_cursor.execute("CREATE DATABASE TestDB;")
db_cursor.execute("USE TestDB;")

db_cursor.execute("CREATE TABLE Wines(fixed_acidity FLOAT, volatile_acidity FLOAT, \
                   citric_acid FLOAT, residual_sugar FLOAT, chlorides FLOAT, \
                   free_so2 FLOAT, total_so2 FLOAT, density FLOAT, pH FLOAT, \
                   sulphates FLOAT, alcohol FLOAT, quality INT, is_red INT);")

wine_tuples = list(all_wines.itertuples(index=False, name=None))
wine_tuples_string = ",".join(["(" + ",".join([str(w) for w in wt]) + ")" for wt in wine_tuples])

db_cursor.execute("USE TestDB;")
db_cursor.execute("INSERT INTO Wines(fixed_acidity, volatile_acidity, citric_acid,\
                   residual_sugar, chlorides, free_so2, total_so2, density, pH,\
                   sulphates, alcohol, quality, is_red) VALUES " + wine_tuples_string + ";")
db_cursor.execute("FLUSH TABLES;")

# Accessing MySQL with PySpark

In [3]:
spark

In [5]:
spark.sparkContext._conf.getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.eventLog.dir', 'hdfs://master:9000/spark-logs'),
 ('spark.sql.repl.eagerEval.enabled', 'true'),
 ('spark.history.ui.port', '18080'),
 ('spark.master', 'spark://master:7077'),
 ('spark.driver.port', '45059'),
 ('spark.driver.host', 'master'),
 ('spark.sql.warehouse.dir', 'hdfs://master:9000/apps/hive/warehouse'),
 ('spark.history.fs.logDirectory', 'hdfs://master:9000/spark-logs'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'PySparkShell'),
 ('spark.app.id', 'app-20220303032228-0008'),
 ('spark.yarn.jars', 'hdfs:///spark-jars/*.jar'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.executor.memory', '1g'),
 ('spark.app.startTime', '1646277746451'),
 ('spark.rdd.compress', 'True'),
 ('spark.history.provider',
  'org.apache.spark.deploy.history.FsHistoryProvider'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.cores', '1'),
 ('spark.submit.deployMode', 'client'),
 ('spark.yarn

!wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.22/mysql-connector-java-8.0.22.jar

!pwd

!ls -al

In [6]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.jars", "/root/workspace/mysql-connector-java-8.0.22.jar") \
    .master("spark://master:7077").appName("PySpark_MySQL_test").getOrCreate()

wine_df = spark.read.format("jdbc").option("url", "jdbc:mysql://mysql:3306/TestDB") \
    .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "Wines") \
    .option("user", "root").option("password", "go2team").load()

In [7]:
from pyspark.ml.feature import VectorAssembler

train_df, test_df = wine_df.randomSplit([.8, .2], seed=12345)
predictors = ["fixed_acidity", "volatile_acidity", "citric_acid", "residual_sugar", "chlorides",
              "free_so2", "total_so2", "density", "pH", "sulphates", "alcohol"]
vec_assembler = VectorAssembler(inputCols=predictors, outputCol="features")
vec_train_df = vec_assembler.transform(train_df)
vec_train_df.select("features", "is_red").show(5)

+--------------------+------+
|            features|is_red|
+--------------------+------+
|[3.8,0.31,0.02,11...|     0|
|[3.9,0.225,0.4,4....|     0|
|[4.2,0.17,0.36,1....|     0|
|[4.2,0.215,0.23,5...|     0|
|[4.4,0.32,0.39,4....|     0|
+--------------------+------+
only showing top 5 rows



In [8]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="is_red", featuresCol="features")
lr_model = lr.fit(vec_train_df)
vec_test_df = vec_assembler.transform(test_df)
predictions = lr_model.transform(vec_test_df)

In [9]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[vec_assembler, lr])
pipeline_model = pipeline.fit(train_df)
predictions = pipeline_model.transform(test_df)

In [10]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol="is_red")
evaluator.evaluate(predictions)

0.9918152444721234