<a href="https://colab.research.google.com/github/Fimbrez/PersonalProjects/blob/main/WineRatings.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import pandas as pd


In [None]:
from pyspark.sql import SparkSession

# Path of the Winemag reviews CSV that is loaded into a PySpark DataFrame below.
infile = 'winemag-data-130k-v2.csv'

# getOrCreate() returns an existing session or builds one named "Winemag Wine".
spark = (
    SparkSession.builder
    .appName("Winemag Wine")
    .getOrCreate()
)


In [None]:
# Read the CSV with a header row and let Spark infer the column types.
# The review descriptions are quoted free text. Assuming they use doubled quotes
# ("") as the escape and may span several lines (verify against the file),
# Spark's default parser (backslash escape, one record per line) would split
# such rows across columns, shifting `points`/`price` and making them infer as
# strings. `escape='"'` and `multiLine=True` parse those records correctly.
df = spark.read.csv(infile, inferSchema=True, header=True, multiLine=True, escape='"')

In [None]:
# Display the class of the loaded object.
df.__class__

In [None]:
# Print the inferred schema: every column name and its Spark type.
df.printSchema()

In [None]:
# Peek at the first three rows.
df.show(n=3)

# Linear Regression


### Price as a predictor


In [None]:
# Keep only the response (`points`) and the single predictor (`price`)
# used by the first linear regression model.
vars_to_keep = ["points", "price"]
df2 = df.select(*vars_to_keep)
df2.show(n=3)

In [None]:
# Importing the libraries we will be using.
# NOTE(review): DenseVector and the RDD-based pyspark.mllib Vectors do not
# appear to be used in the visible cells; only StandardScaler and
# VectorAssembler are referenced later.
from pyspark.ml.linalg import DenseVector
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler 
from pyspark.mllib.linalg import Vectors

In [None]:
# Rename the response column `points` to `Ratings`.
df2 = df2.withColumnRenamed(existing="points", new="Ratings")

In [None]:
df2.show(n=3)

In [None]:
# Drop rows where either the price or the rating is missing.
df2 = df2.filter(df2.price.isNotNull() & df2.Ratings.isNotNull())

In [None]:
# Inspect df2 after the null filtering.
df2.show(n=3)

In [None]:
# Typecast the label and the predictor to float (the columns may have been
# inferred as strings). No array type is needed: VectorAssembler in the next
# cell packs `price` into the feature vector. Under Spark's default
# (non-ANSI) cast, values that cannot be parsed become null; those rows are
# removed by the `na.drop()` applied before assembling.
# NOTE(review): IntegerType is imported but not used.
from pyspark.sql.types import IntegerType
df2 = df2.withColumn("Ratings", df2["Ratings"].cast("float"))
df2 = df2.withColumn("price", df2["price"].cast("float"))

In [None]:
# Pack the single `price` predictor into the `features` vector column that
# Spark ML estimators read; rows with any missing value are dropped first.
complete_rows = df2.na.drop()
assembler = VectorAssembler(outputCol="features", inputCols=["price"])
df2 = assembler.transform(complete_rows)

In [None]:
# Keep only the label and the assembled vector; the raw `price` column is no longer needed.
df2 = df2.select("Ratings", "features")

In [None]:
df2.show(n=20)

In [None]:
from pyspark.sql.functions import col

In [None]:
# Reproducible, approximately 60/40 train/test split (randomSplit sizes are not exact).
seed = 314
train_test = [0.6, 0.4]
train_data, test_data = df2.randomSplit(weights=train_test, seed=seed)

In [None]:
train_data.show(n=3)

In [None]:
# Sanity check: the train and test row counts should add up to the full count.
split_counts = (train_data.count(), test_data.count())
split_counts + (df2.count(),)

In [None]:
from pyspark.ml.regression import LinearRegression  # DataFrame-based spark.ml API, not RDD-based mllib

# Hyper-parameters, reused by every model fitted with `lr` in this notebook.
# regParam sets the penalty strength; elasticNetParam mixes L1 (1.0) and L2 (0.0).
maxIter = 10
regParam = 0.3
elasticNetParam = 0.8

lr = LinearRegression(
    featuresCol='features',
    labelCol='Ratings',
    maxIter=maxIter,
    regParam=regParam,
    elasticNetParam=elasticNetParam,
)

In [None]:
# Fit the price-only model on the training split.
model1 = lr.fit(dataset=train_data)

In [None]:
# Add a `prediction` column for every test row.
lrPred = model1.transform(dataset=test_data)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# NOTE: `eval` shadows Python's built-in eval(); the name is kept because
# later cells reuse this evaluator.
eval = RegressionEvaluator(
    labelCol="Ratings", predictionCol="prediction", metricName="rmse"
)
# The evaluator is configured for RMSE; the param map overrides it to MSE for this call only.
mse = eval.evaluate(lrPred, {eval.metricName: "mse"})
print(f"MSE: {mse:.3f}")

### Variety, Winery, and Country as predictors

In [None]:
# Keep the three categorical predictors plus the response for the second model.
vars_to_keep = ["variety", "winery", "country", "points"]
df3 = df.select(*vars_to_keep)
df3.show(n=3)

#### Encoding the string-valued data

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Label-encode every selected column except the numeric response.
# A list comprehension (rather than a set difference) keeps the stage order,
# and therefore the order of the new *_index columns, reproducible between runs.
categorical_cols = [column for column in df3.columns if column != 'points']

# Each indexer is fitted on the full df3 up front, so one column's
# label->index mapping does not depend on rows that another indexer skips.
# With handleInvalid="skip", rows whose value is null or unseen are filtered out.
# NOTE(review): the resulting indices are later fed to LinearRegression as
# plain numeric features, which imposes an arbitrary ordering on the
# categories; one-hot encoding is usually more appropriate for a linear model.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    .setHandleInvalid("skip")
    .fit(df3)
    for column in categorical_cols
]

pipeline = Pipeline(stages=indexers)
df3 = pipeline.fit(df3).transform(df3)

df3.show(3)

In [None]:
# Rename the response to `Ratings`, then cast it and the three index columns to float.
df3 = df3.withColumnRenamed("points", "Ratings")
for name in ("Ratings", "variety_index", "winery_index", "country_index"):
    df3 = df3.withColumn(name, df3[name].cast("float"))

In [None]:
# Drop the raw string columns, keeping the label and the numeric indices,
# then remove rows where any of them is null.
df3 = df3.select(["Ratings", "variety_index", "winery_index", "country_index"])
for name in ("variety_index", "Ratings", "country_index", "winery_index"):
    df3 = df3.filter(df3[name].isNotNull())


In [None]:
# Combine the three index columns into the single `features` vector column,
# dropping any row with a missing value first.
assembler = VectorAssembler(
    inputCols=["variety_index", "winery_index", "country_index"],
    outputCol="features",
)
df3 = assembler.transform(df3.na.drop())
df3.show(n=3)

In [None]:
# Keep only the label and the feature vector.
df3 = df3.select(["Ratings", "features"])
df3.show(n=3)

In [None]:
# Feature scaling

# Initialize the `standardScaler`: scale to unit standard deviation without
# centring (withMean=False).
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled", 
                                withStd=True, withMean=False)

# Fit the scaler on df3; this computes the per-feature summary statistics
# used by the transform.
scaler = standardScaler.fit(df3)

# Transform the data in `df3` with the scaler, adding a `features_scaled` column.
# NOTE(review): `scaled_df` is only displayed. The train/test split below uses
# `df3`, and `lr` reads the unscaled `features` column, so this step does not
# affect the model. Spark's LinearRegression standardizes features internally
# by default (standardization=True).
scaled_df = scaler.transform(df3)
scaled_df.show(3)

In [None]:
# Reproducible, approximately 60/40 train/test split, using the same seed as the first model.
seed = 314
train_test = [0.6, 0.4]
train_data, test_data = df3.randomSplit(weights=train_test, seed=seed)

In [None]:
# Fit the variety/winery/country model with the same hyper-parameters.
model2 = lr.fit(dataset=train_data)

In [None]:
# Score the held-out rows.
lrPred2 = model2.transform(dataset=test_data)

In [None]:
# Test-set MSE (the param map overrides the evaluator's RMSE setting for this call).
mse = eval.evaluate(lrPred2, params={eval.metricName: "mse"})
print(f"MSE: {mse:.3f}")

### Price, Variety, Winery, and Country as predictors

In [None]:
# Keep the categorical predictors, price, and the response for the third model.
vars_to_keep = ["variety", "winery", "country", "points", "price"]
df4 = df.select(*vars_to_keep)
df4.show(n=3)

In [None]:
# Label-encode the string columns; `points` and `price` are already numeric.
# A list comprehension (rather than a set difference) keeps the stage order,
# and therefore the order of the new *_index columns, reproducible between runs.
categorical_cols = [column for column in df4.columns if column not in ('points', 'price')]

# Each indexer is fitted on the full df4 up front, so one column's
# label->index mapping does not depend on rows that another indexer skips.
# With handleInvalid="skip", rows whose value is null or unseen are filtered out.
# NOTE(review): the indices are later used as plain numeric features in a
# linear model, which imposes an arbitrary category ordering; consider one-hot encoding.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    .setHandleInvalid("skip")
    .fit(df4)
    for column in categorical_cols
]

pipeline = Pipeline(stages=indexers)
df4 = pipeline.fit(df4).transform(df4)

df4.show(3)

In [None]:
# Rename the response to `Ratings`, then cast it and the three index columns to float.
df4 = df4.withColumnRenamed("points", "Ratings")
for name in ("Ratings", "variety_index", "winery_index", "country_index"):
    df4 = df4.withColumn(name, df4[name].cast("float"))

In [None]:
# Keep the label, the index columns and price, then drop rows where any of them is null.
df4 = df4.select(["Ratings", "variety_index", "winery_index", "country_index", "price"])
for name in ("variety_index", "Ratings", "country_index", "winery_index", "price"):
    df4 = df4.filter(df4[name].isNotNull())

In [None]:
# Cast price to float so it matches the other numeric inputs.
df4 = df4.withColumn("price", df4.price.cast("float"))

In [None]:
# Combine the three index columns and price into the `features` vector,
# dropping any row with a missing value first.
assembler = VectorAssembler(
    inputCols=["variety_index", "winery_index", "country_index", "price"],
    outputCol="features",
)
df4 = assembler.transform(df4.na.drop())
df4.show(n=3)

In [None]:
# Keep only the label and the feature vector.
df4 = df4.select(["Ratings", "features"])
df4.show(n=3)

In [None]:
# Initialize the `standardScaler`: scale to unit standard deviation without
# centring (withMean=False).
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled", 
                                withStd=True, withMean=False)

# Fit the scaler on df4; this computes the per-feature summary statistics
# used by the transform.
scaler = standardScaler.fit(df4)

# Transform the data in `df4` with the scaler, adding a `features_scaled` column.
# NOTE(review): `scaled_df` is only displayed. The split below uses `df4`, and
# `lr` reads the unscaled `features` column, so this step does not affect the
# model. Spark's LinearRegression standardizes features internally by default
# (standardization=True).
scaled_df = scaler.transform(df4)
scaled_df.show(3)

In [None]:
# Same split weights and seed as the previous models.
train_data, test_data = df4.randomSplit(weights=train_test, seed=seed)

In [None]:
# Fit the price + variety/winery/country model.
model3 = lr.fit(dataset=train_data)

In [None]:
# Score the held-out rows.
lrPred3 = model3.transform(dataset=test_data)

In [None]:
# Test-set MSE (the param map overrides the evaluator's RMSE setting for this call).
mse = eval.evaluate(lrPred3, params={eval.metricName: "mse"})
print(f"MSE: {mse:.3f}")