## Overview
#### In this notebook we will explore data around wine ratings and build out a model to determine the quality of new wines

##### 1. Our data engineering team will use scala to prepare the data and store the resulting table in delta lake
##### 2. Our data sceince team will pick up the table, and build out several models in python to then deploy to mlflow

In [2]:
# File location and type
file_location = "/FileStore/tables/winemag_data_first150k-8c60b.csv"
file_type = "csv"

# CSV options
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

country,description,designation,points,price,province,region_1,region_2,variety,winery
US,"This tremendous 100% varietal wine hails from Oakville and was aged over three years in oak. Juicy red-cherry fruit and a compelling hint of caramel greet the palate, framed by elegant, fine tannins and a subtle minty tone in the background. Balanced and rewarding from start to finish, it has years ahead of it to develop further nuance. Enjoy 2022–2030.",Martha's Vineyard,96,235.0,California,Napa Valley,Napa,Cabernet Sauvignon,Heitz
Spain,"Ripe aromas of fig, blackberry and cassis are softened and sweetened by a slathering of oaky chocolate and vanilla. This is full, layered, intense and cushioned on the palate, with rich flavors of chocolaty black fruits and baking spices. A toasty, everlasting finish is heady but ideally balanced. Drink through 2023.",Carodorum Selección Especial Reserva,96,110.0,Northern Spain,Toro,,Tinta de Toro,Bodega Carmen Rodríguez
US,"Mac Watson honors the memory of a wine once made by his mother in this tremendously delicious, balanced and complex botrytised white. Dark gold in color, it layers toasted hazelnut, pear compote and orange peel flavors, reveling in the succulence of its 122 g/L of residual sugar.",Special Selected Late Harvest,96,90.0,California,Knights Valley,Sonoma,Sauvignon Blanc,Macauley
US,"This spent 20 months in 30% new French oak, and incorporates fruit from Ponzi's Aurora, Abetina and Madrona vineyards, among others. Aromatic, dense and toasty, it deftly blends aromas and flavors of toast, cigar box, blackberry, black cherry, coffee and graphite. Tannins are polished to a fine sheen, and frame a finish loaded with dark chocolate and espresso. Drink now through 2032.",Reserve,96,65.0,Oregon,Willamette Valley,Willamette Valley,Pinot Noir,Ponzi
France,"This is the top wine from La Bégude, named after the highest point in the vineyard at 1200 feet. It has structure, density and considerable acidity that is still calming down. With 18 months in wood, the wine has developing an extra richness and concentration. Produced by the Tari family, formerly of Château Giscours in Margaux, it is a wine made for aging. Drink from 2020.",La Brûlade,95,66.0,Provence,Bandol,,Provence red blend,Domaine de la Bégude
Spain,"Deep, dense and pure from the opening bell, this Toro is a winner. Aromas of dark ripe black fruits are cool and moderately oaked. This feels massive on the palate but sensationally balanced. Flavors of blackberry, coffee, mocha and toasty oak finish spicy, smooth and heady. Drink this exemplary Toro through 2023.",Numanthia,95,73.0,Northern Spain,Toro,,Tinta de Toro,Numanthia
Spain,"Slightly gritty black-fruit aromas include a sweet note of pastry along with a hint of prune. Wall-to-wall saturation ensures that all corners of one's mouth are covered. Flavors of blackberry, mocha and chocolate are highly impressive and expressive, while this settles nicely on a long finish. Drink now through 2024.",San Román,95,65.0,Northern Spain,Toro,,Tinta de Toro,Maurodos
Spain,"Lush cedary black-fruit aromas are luxe and offer notes of marzipan and vanilla. This bruiser is massive and tannic on the palate, but still lush and friendly. Chocolate is a key flavor, while baked berry and cassis flavors are hardly wallflowers. On the finish, this is tannic and deep as a sea trench. Drink this saturated black-colored Toro through 2023.",Carodorum Único Crianza,95,110.0,Northern Spain,Toro,,Tinta de Toro,Bodega Carmen Rodríguez
US,"This re-named vineyard was formerly bottled as deLancellotti. You'll find striking minerality underscoring chunky black fruits. Accents of citrus and graphite comingle, with exceptional midpalate concentration. This is a wine to cellar, though it is already quite enjoyable. Drink now through 2030.",Silice,95,65.0,Oregon,Chehalem Mountains,Willamette Valley,Pinot Noir,Bergström
US,"The producer sources from two blocks of the vineyard for this wine—one at a high elevation, which contributes bright acidity. Crunchy cranberry, pomegranate and orange peel flavors surround silky, succulent layers of texture that present as fleshy fruit. That delicately lush flavor has considerable length.",Gap's Crown Vineyard,95,60.0,California,Sonoma Coast,Sonoma,Pinot Noir,Blue Farm


In [3]:
# Create a view or table

temp_table_name = "winemag_data"

df.createOrReplaceTempView(temp_table_name)

In [4]:
%sql

select variety, avg(POINTS) as mean_points, count(variety) as number_ratings from winemag_data group by variety having (count(variety) >25) order by mean_points desc limit 20

variety,mean_points,number_ratings
Picolit,91.03448275862068,29
Tokaji,90.68493150684932,73
Scheurebe,90.53571428571428,28
Nebbiolo,90.48148148148148,2241
Sangiovese Grosso,90.3075780089153,1346
Zibibbo,90.08571428571427,35
Petit Manseng,90.06382978723404,47
Champagne Blend,90.00242326332796,1238
Tinto Fino,89.90666666666667,75
Nerello Mascalese,89.74666666666667,75


In [5]:
%scala 
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val dfscala = spark.sql("SELECT * FROM winemag_data")

In [6]:
%scala
display(dfscala.groupBy("points").count().orderBy(desc("points")))

points,count
99,50
98,131
97,365
96,695
95,1715
94,3462
93,6012
92,9239
91,10534
90,15970


In [7]:
%scala 
import scala.util.{Try, Success, Failure}

val couldIbeAnInteger = udf((value: String) => Try(value.toInt).isSuccess)
val filteredDF = dfscala.filter((couldIbeAnInteger($"points")))
  .withColumn("price",col("price").cast(IntegerType))
  .withColumn("points",col("points").cast(IntegerType))


In [8]:
%scala
display(filteredDF.groupBy("points").count().orderBy(desc("points")))

points,count
100,24
99,50
98,131
97,365
96,695
95,1715
94,3462
93,6012
92,9239
91,10534


In [9]:
%scala
import org.apache.spark.sql.sources.IsNull


for (c <- filteredDF.columns) println(c + " has " + filteredDF.filter(filteredDF(c).isNull).count() + " null values")

In [10]:
%scala
display(filteredDF.select("points", "price"))

points,price
96,235.0
96,110.0
96,90.0
96,65.0
95,66.0
95,73.0
95,65.0
95,110.0
95,65.0
95,60.0


In [11]:
%scala
val finalDF = filteredDF.withColumn("DescriptionLen", length('description))

display(finalDF.select("points", "DescriptionLen").orderBy("points").sample(false, 0.0065))

points,DescriptionLen
80,270
80,134
80,98
80,135
80,174
81,149
81,170
81,113
81,136
81,213


In [12]:
%scala

spark.sql("CREATE TABLE IF NOT EXISTS winedelta USING DELTA LOCATION '/delta/winedelta'")
finalDF.write.format("delta").mode("overwrite").save("/delta/winedelta")


In [13]:
%scala
val wine = spark.read.format("delta").load("/delta/winedelta")

In [14]:
%sql
OPTIMIZE delta.`delta/winedelta`
  ZORDER BY (region_2)

path,metrics
delta/winedelta,"List(List(23498100, 23498100, 2.34981E7, 1, 23498100), List(1243083, 3357411, 3046660.0, 8, 24373282), 0, List(minCubeSize(107374182400), List(0, 0), List(8, 24373282), 0, List(8, 24373282), 0), 1)"


In [15]:
%scala

for (c <- wine.columns) println(c + " has " + wine.filter(wine(c).isNull).count() + " null values")

### \/\/\/\/ Hand off to data science team for Python work \/\/\/\/

In [17]:
pywine = spark.read.format("delta").load("/delta/winedelta")

In [18]:
import pandas as pd
display(pywine.toPandas().corr())

points,price,DescriptionLen
1.0,0.459851480946517,0.5190604401608511
0.459851480946517,1.0,0.2548625717465874
0.5190604401608511,0.2548625717465874,1.0


In [19]:
display(pywine.groupBy("country").max("price").orderBy("max(price)", ascending = False))

country,max(price)
France,2300.0
US,2013.0
Austria,1100.0
Portugal,980.0
Italy,900.0
Australia,850.0
Germany,775.0
Spain,770.0
Hungary,764.0
Chile,400.0


In [20]:
pywine = pywine.filter(pywine.country.isNotNull()).filter(pywine.price.isNotNull()).filter(pywine.province.isNotNull()).filter(pywine.region_1.isNotNull())

#filter out all rows that have a null value for Country, Price, Province, or Region_1

In [21]:
collist = pywine.columns
collist.remove("points")
collist.remove("price")
collist.remove("DescriptionLen")
collist.remove("designation")
collist.remove("region_2")

#dropping designation and region_2 based on null count 

In [22]:
collist

In [23]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import *

#define stringindexer to cast fraud_flag column to 'label' for mllib models
strindexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in collist]
stringpipe = Pipeline().setStages(strindexers).fit(pywine).transform(pywine)

In [24]:
feature_vec = stringpipe.columns[(len(stringpipe.columns)-len(strindexers)):len(stringpipe.columns)]
feature_vec.append('price')
feature_vec.append('DescriptionLen')
feature_vec

In [25]:
# SET UP PIPELINE

assemble_features = VectorAssembler(inputCols=feature_vec, outputCol="features")
winePipe =Pipeline().setStages([assemble_features]).fit(stringpipe).transform(stringpipe)
winePipe = winePipe.withColumnRenamed("points", "label")
winePipe.describe

In [26]:
train,test =winePipe.randomSplit([0.7, 0.3])

In [27]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(maxIter=10)

# Fit the model
lrModel = lr.fit(train)
yhat = lrModel.transform(test)

In [28]:
# !pip install -U scikit-learn
import sklearn.metrics

lrr2 = sklearn.metrics.r2_score(test.select("label").collect(),yhat.select("prediction").collect())

lrr2

In [29]:
from pyspark.ml.regression import GeneralizedLinearRegression

glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10)

# Fit the model
glrModel = glr.fit(winePipe)
glrYhat = glrModel.transform(test)

In [30]:
glrr2 = sklearn.metrics.r2_score(test.select("label").collect(),glrYhat.select("prediction").collect())
glrr2

In [31]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, avg, stddev

#Set max categories to 4 so any indexed variable with greater than 4 categories will be treated as continuous
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(winePipe)

transformedData = featureIndexer.transform(winePipe)
transformedData = transformedData.drop("features")
transformedData= transformedData.withColumn("features", col("indexedFeatures"))

training,testing =transformedData.randomSplit([0.7, 0.3])

rf = RandomForestRegressor()

# Chain indexer and forest in a Pipeline
rfPipe = Pipeline(stages=[featureIndexer, rf])

# Train model.  This also runs the indexer.
rfModel = rfPipe.fit(training)

# Make predictions.
rfYhat = rfModel.transform(testing)

In [32]:
rfr2 = sklearn.metrics.r2_score(testing.select("label").collect(),rfYhat.select("prediction").collect())
rfr2

In [33]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator


# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
kmModel = kmeans.fit(winePipe)

# Make predictions
clusterpreds = kmModel.transform(winePipe)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(clusterpreds)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = kmModel.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

In [34]:
import seaborn as sns
import matplotlib.pyplot as plt
clusterpd =  clusterpreds.select("price" ,"label", "prediction").toPandas()
clustpdFilt = clusterpd[clusterpd['price']<1500]

sns.set(style="whitegrid", palette = sns.palplot("muted"))
fig, ax = plt.subplots(figsize = (15, 8))
figure = sns.scatterplot(x = "label", y = "price", hue = "prediction", style = "prediction", x_jitter = True, y_jitter = True, data = clustpdFilt)
sns.set(style="whitegrid", palette = sns.palplot("muted"))
display(figure)



In [35]:
clusterpd.groupby("prediction").describe()

Unnamed: 0_level_0,price,price,price,price,price,price,price,price,label,label,label,label,label,label,label,label
Unnamed: 0_level_1,count,mean,std,min,25%,50%,75%,max,count,mean,std,min,25%,50%,75%,max
prediction,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2,Unnamed: 11_level_2,Unnamed: 12_level_2,Unnamed: 13_level_2,Unnamed: 14_level_2,Unnamed: 15_level_2,Unnamed: 16_level_2
0,40028.0,36.280953,41.610985,4.0,17.0,26.0,44.0,2300.0,40028.0,88.102828,3.271077,80.0,86.0,88.0,90.0,100.0
1,74342.0,33.781012,34.675436,4.0,16.0,25.0,40.0,1400.0,74342.0,87.721342,3.284015,80.0,85.0,87.0,90.0,100.0


In [36]:
# Start an MLflow run
!pip install mlflow
import mlflow 

with mlflow.start_run():
  # Log a parameter (key-value pair)
  mlflow.log_param("Training Set Size", train.count())
  mlflow.log_param("Testing Set Size", test.count())

  # Log a metric; metrics can be updated throughout the run
  mlflow.log_metric("Linear Regression R Squared", lrr2)
  mlflow.log_metric("Generalized Linear Regression R Squared", glrr2)
  mlflow.log_metric("Random Forest Regressor R Squared", rfr2)
  mlflow.log_metric("Cluster Silhouette", silhouette)


  # Log an artifact (output file)
  with open("output.txt", "w") as f:
      f.write("Here can add text etc. to an output file to be downloaded from MLFlow page")
  mlflow.log_artifact("output.txt")

In [37]:
from IPython.display import IFrame    
IFrame("https://giphy.com/gifs/go-american-outlaws-ZFJYcVE5lYvWE/fullscreen" , width="800", height="500")


## Extra unused cells and viz for dashboard

In [39]:
display(pywine.select("country", "variety", "points").filter(col("variety") == "Malbec"))

country,variety,points
Argentina,Malbec,87
Argentina,Malbec,89
Argentina,Malbec,89
France,Malbec,89
France,Malbec,89
Argentina,Malbec,90
Argentina,Malbec,90
Argentina,Malbec,90
Argentina,Malbec,87
Argentina,Malbec,89


In [40]:
display(pywine.select("region_1", "price", "winery").filter(col("region_1") == "Napa Valley").orderBy("price", ascending = False).limit(50))

region_1,price,winery
Napa Valley,625,Yao Ming
Napa Valley,625,Yao Ming
Napa Valley,500,Harlan Estate
Napa Valley,450,Harlan Estate
Napa Valley,350,Harlan Estate
Napa Valley,335,Bryant Family
Napa Valley,300,Cardinale
Napa Valley,300,Hundred Acre
Napa Valley,300,Hundred Acre
Napa Valley,300,Hundred Acre


In [41]:
# import seaborn as sns
# sns.set(style="whitegrid", palette = sns.palplot("muted"))
# fig1, ax1 = plt.subplots(figsize = (10, 6))
# seaborn1 = sns.regplot("price", "points", data = pywine.toPandas())
# # display(seaborn1)

In [42]:
# With this registered as a temp view, it will only be available to this particular notebook. If you'd like other users to be able to query this table, you can also create a table from the DataFrame.
# Once saved, this table will persist across cluster restarts as well as allow various users across different notebooks to query this data.
# To do so, choose your table name and uncomment the bottom line.

# permanent_table_name = "finalPyWineTable"

# pywine.write.format("parquet").saveAsTable(permanent_table_name)