In [1]:
# Section must be included at the beginning of each new notebook. Remember to change the app name.
# The Spark install directory is picked automatically: the first candidate below that exists on
# disk is used, so the cell no longer has to be edited by hand when running inside VirtualBox.
import os
import findspark

SPARK_HOME_CANDIDATES = (
    '/home/ubuntu/spark-2.1.1-bin-hadoop2.7',  # default install location
    '/home/user/spark-2.1.1-bin-hadoop2.7',    # VirtualBox install location
)
# Fall back to the default location when no candidate exists, so a missing install is still
# reported against the original path.
spark_home = next(
    (candidate for candidate in SPARK_HOME_CANDIDATES if os.path.isdir(candidate)),
    SPARK_HOME_CANDIDATES[0],
)
# findspark.init has to run before pyspark is imported.
findspark.init(spark_home)
import pyspark
# NOTE(review): wildcard import kept as-is because later cells may rely on names it provides;
# only SparkSession is needed by this cell.
from pyspark.sql import *
spark = SparkSession.builder.appName('logistic_regression_wq').getOrCreate()

# If you're getting an error with numpy, please type 'sudo pip3 install numpy --user' into the console.
# If you're getting an error with another package, type 'sudo pip3 install PACKAGENAME --user'.
# Replace PACKAGENAME with the relevant package (such as pandas, etc).
from pyspark.ml.classification import LogisticRegression

In [2]:
# Load the cleaned CSV: the first row is treated as the header and column types are inferred.
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('clean_data.csv')
)

# Show the inferred column names and types.
df.printSchema()

# Summary statistics, computed in pandas after collecting the data to the driver.
df.toPandas().describe()

root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- Wine color: integer (nullable = true)
 |-- quality: string (nullable = true)



Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,density,pH,sulphates,alcohol,Wine color
count,4871.0,4871.0,4871.0,4871.0,4871.0,4871.0,4871.0,4871.0,4871.0,4871.0,4871.0
mean,7.197824,0.336564,0.313683,5.040279,0.054024,29.61045,0.994459,3.225915,0.524607,10.591818,0.256005
std,1.269676,0.158814,0.133128,4.369997,0.028821,16.514626,0.002936,0.149938,0.136529,1.171285,0.436469
min,4.4,0.08,0.0,0.6,0.012,1.0,0.98711,2.79,0.22,8.4,0.0
25%,6.4,0.23,0.25,1.8,0.037,17.0,0.992,3.12,0.43,9.6,0.0
50%,7.0,0.29,0.31,2.8,0.047,28.0,0.9946,3.22,0.51,10.5,0.0
75%,7.7,0.405,0.39,7.4,0.066,40.0,0.9968,3.33,0.6,11.4,1.0
max,13.3,1.1,0.78,20.8,0.413,83.0,1.0021,3.75,1.98,14.2,1.0


In [None]:
from pyspark.ml.feature import (
    VectorAssembler,
    VectorIndexer,
    OneHotEncoder,
    StringIndexer,
)

# Step 1 - index. A StringIndexer assigns a numeric index to every distinct value in a column.
# 'quality' is read from the CSV as a string, so it is indexed into 'qualityIndex'.
# 'Wine color' is indexed into 'wine_colorIndex' so it can be fed to the encoder below.
# NOTE(review): 'qualityIndex' is not part of the feature vector - presumably it is the label
# column for the classifier; confirm in the modelling cell.
quality_indexer = StringIndexer(
    inputCol='quality',
    outputCol='qualityIndex',
)
wine_color_indexer = StringIndexer(
    inputCol='Wine color',
    outputCol='wine_colorIndex',
)

# Step 2 - one hot encode. Only the wine colour index is encoded (into 'wine_colorVec');
# every other column is handed to the assembler unchanged.
wine_color_encoder = OneHotEncoder(
    inputCol='wine_colorIndex',
    outputCol='wine_colorVec',
)

# Step 3 - assemble. The numeric columns plus the encoded wine colour are combined into a
# single vector column named "features".
numeric_feature_cols = [
    'fixed acidity',
    'volatile acidity',
    'citric acid',
    'residual sugar',
    'chlorides',
    'free sulfur dioxide',
    'density',
    'pH',
    'sulphates',
    'alcohol',
]
assembler = VectorAssembler(
    inputCols=numeric_feature_cols + ['wine_colorVec'],
    outputCol="features",
)

In [None]:
from pyspark.ml import Pipeline

# Chain the preprocessing steps in the order they were defined. The order matters:
# wine_color_encoder reads the column written by wine_color_indexer, and assembler reads
# the column written by wine_color_encoder.
preprocessing_stages = [
    quality_indexer,
    wine_color_indexer,
    wine_color_encoder,
    assembler,
]
pipeline = Pipeline(stages=preprocessing_stages)
