# Big Data Platforms

## PySpark Machine Learning

### MLlib applied to Wine reviews data 

**Dataset:**
https://www.kaggle.com/zynicide/wine-reviews


Copyright: 2018 [Ashish Pujari](mailto:apujari@uchicago.edu)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import matplotlib.pyplot as plt
%matplotlib inline

In [3]:
#create Spark session
spark = SparkSession.builder.appName('WineReviewsML').getOrCreate()

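#change configuration settings on Spark
#note: this edits the SparkConf of the already-running context; settings such as
#spark.driver.memory generally need to be supplied before the session is created
conf = spark.sparkContext._conf.setAll([
    ('spark.executor.memory', '5g'), ('spark.app.name', 'Spark Updated Conf'),
    ('spark.executor.cores', '4'), ('spark.cores.max', '4'),
    ('spark.driver.memory', '8g'),
])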

#print spark configuration settings
spark.sparkContext.getConf().getAll()

[('spark.driver.port', '49253'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.host', 'joshuas-mbp'),
 ('spark.executor.memory', '5g'),
 ('spark.executor.cores', '4'),
 ('spark.cores.max', '4'),
 ('spark.app.name', 'Spark Updated Conf'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.memory', '8g'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1552268824410'),
 ('spark.ui.showConsoleProgress', 'true')]

## Read Data

In [3]:
df = spark.read \
    .option("quote", "\"")  \
    .option("escape", "\"") \
    .option("ignoreLeadingWhiteSpace",True) \
    .csv("T:\\courses\\BigData\\data\\wine-reviews\\winemag-data_first150k.csv",inferSchema=True, header=True )

In [22]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- country: string (nullable = true)
 |-- description: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- points: string (nullable = true)
 |-- price: string (nullable = true)
 |-- province: string (nullable = true)
 |-- region_1: string (nullable = true)
 |-- region_2: string (nullable = true)
 |-- variety: string (nullable = true)
 |-- winery: string (nullable = true)



In [21]:
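#re-read both files from the working directory with default CSV options;
#without inferSchema every column is loaded as a string
df = spark.read.csv("winemag-data_first150k.csv", header=True)
df2 = spark.read.csv("winemag-data-130k-v2.csv", header=True)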

In [24]:
import pyspark.sql.types as types

In [25]:
#cast points from string to integer (values that cannot be parsed become null)
df = df.withColumn('points', df['points'].cast(types.IntegerType()))
df2 = df2.withColumn('points', df2['points'].cast(types.IntegerType()))

In [5]:
df2 = spark.read \
    .option("quote", "\"")  \
    .option("escape", "\"") \
    .option("ignoreLeadingWhiteSpace",True) \
    .csv("T:\\courses\\BigData\\data\\wine-reviews\\winemag-data-130k-v2.csv",inferSchema=True, header=True )

In [17]:
df2.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- country: string (nullable = true)
 |-- description: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- points: string (nullable = true)
 |-- price: string (nullable = true)
 |-- province: string (nullable = true)
 |-- region_1: string (nullable = true)
 |-- region_2: string (nullable = true)
 |-- taster_name: string (nullable = true)
 |-- taster_twitter_handle: string (nullable = true)
 |-- title: string (nullable = true)
 |-- variety: string (nullable = true)
 |-- winery: string (nullable = true)



In [11]:
#combine the two datasets; union matches columns by position,
#so first drop the columns that exist only in df2
df = df.union(df2.drop("taster_name", "taster_twitter_handle", "title"))

## Data Exploration

In [12]:
df.count()

280901

In [14]:
df.show(5)

+---+-------+--------------------+--------------------+------+-----+--------------+-----------------+-----------------+------------------+--------------------+
|_c0|country|         description|         designation|points|price|      province|         region_1|         region_2|           variety|              winery|
+---+-------+--------------------+--------------------+------+-----+--------------+-----------------+-----------------+------------------+--------------------+
|  0|     US|This tremendous 1...|   Martha's Vineyard|    96|235.0|    California|      Napa Valley|             Napa|Cabernet Sauvignon|               Heitz|
|  1|  Spain|Ripe aromas of fi...|Carodorum Selecci...|    96|110.0|Northern Spain|             Toro|             null|     Tinta de Toro|Bodega Carmen Rod...|
|  2|     US|Mac Watson honors...|Special Selected ...|    96| 90.0|    California|   Knights Valley|           Sonoma|   Sauvignon Blanc|            Macauley|
|  3|     US|This spent 20 mon...|      

In [15]:
#Count rows that have no missing values (dropna removes any row containing a null)
df.dropna().count()

73337

## Feature Engineering

In [26]:
from pyspark.ml.feature import QuantileDiscretizer

#bucket points into three quantile-based categories (0.0 = low, 1.0 = medium, 2.0 = high)
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="points", outputCol="points_category")
df = discretizer.fit(df).transform(df)
df.show(3)

+---+-------+--------------------+--------------------+------+-----+--------------+--------------+--------+------------------+--------------------+---------------+
|_c0|country|         description|         designation|points|price|      province|      region_1|region_2|           variety|              winery|points_category|
+---+-------+--------------------+--------------------+------+-----+--------------+--------------+--------+------------------+--------------------+---------------+
|  0|     US|This tremendous 1...|   Martha's Vineyard|    96|235.0|    California|   Napa Valley|    Napa|Cabernet Sauvignon|               Heitz|            2.0|
|  1|  Spain|Ripe aromas of fi...|Carodorum Selecci...|    96|110.0|Northern Spain|          Toro|    null|     Tinta de Toro|Bodega Carmen Rod...|            2.0|
|  2|     US|Mac Watson honors...|Special Selected ...|    96| 90.0|    California|Knights Valley|  Sonoma|   Sauvignon Blanc|            Macauley|            2.0|
+---+-------+---
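
To make the bucketing concrete, the sketch below reproduces the idea in plain Python: compute cut points at the 1/3 and 2/3 quantiles, then label each value with the index of the bucket it falls into. It is an illustration only. `QuantileDiscretizer` works with approximate quantiles on distributed data, and the helper names and sample scores here are made up.

```python
from bisect import bisect_right
from statistics import quantiles

def quantile_splits(values, num_buckets=3):
    """Return the num_buckets - 1 interior cut points of `values`."""
    return quantiles(values, n=num_buckets, method="inclusive")

def bucketize(value, splits):
    """Return the bucket index as a float; a value equal to a cut point
    goes into the upper bucket."""
    return float(bisect_right(splits, value))

# Invented sample scores, not taken from the dataset.
points = [80, 82, 84, 85, 86, 87, 88, 90, 92, 96]
splits = quantile_splits(points)
categories = [bucketize(p, splits) for p in points]
print(splits)
print(categories)
```

Because the cut points are derived from the data, each bucket holds roughly a third of the rows, which is why a score of 96 lands in the top category (2.0) in the output above.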

## Natural Language Processing

In [27]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

#tokenize words
tokenizer = Tokenizer(inputCol="description", outputCol="words")
df = tokenizer.transform(df)

#drop the redundant source column
df = df.drop("description")
df.show(5)

+---+-------+--------------------+------+-----+--------------+-----------------+-----------------+------------------+--------------------+---------------+--------------------+
|_c0|country|         designation|points|price|      province|         region_1|         region_2|           variety|              winery|points_category|               words|
+---+-------+--------------------+------+-----+--------------+-----------------+-----------------+------------------+--------------------+---------------+--------------------+
|  0|     US|   Martha's Vineyard|    96|235.0|    California|      Napa Valley|             Napa|Cabernet Sauvignon|               Heitz|            2.0|[this, tremendous...|
|  1|  Spain|Carodorum Selecci...|    96|110.0|Northern Spain|             Toro|             null|     Tinta de Toro|Bodega Carmen Rod...|            2.0|[ripe, aromas, of...|
|  2|     US|Special Selected ...|    96| 90.0|    California|   Knights Valley|           Sonoma|   Sauvignon Blanc|   

In [28]:
from pyspark.ml.feature import StopWordsRemover

#remove stop words
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
df = remover.transform(df)

#drop the redundant source column
df = df.drop("words")
df.show(5)

+---+-------+--------------------+------+-----+--------------+-----------------+-----------------+------------------+--------------------+---------------+--------------------+
|_c0|country|         designation|points|price|      province|         region_1|         region_2|           variety|              winery|points_category|            filtered|
+---+-------+--------------------+------+-----+--------------+-----------------+-----------------+------------------+--------------------+---------------+--------------------+
|  0|     US|   Martha's Vineyard|    96|235.0|    California|      Napa Valley|             Napa|Cabernet Sauvignon|               Heitz|            2.0|[tremendous, 100%...|
|  1|  Spain|Carodorum Selecci...|    96|110.0|Northern Spain|             Toro|             null|     Tinta de Toro|Bodega Carmen Rod...|            2.0|[ripe, aromas, fi...|
|  2|     US|Special Selected ...|    96| 90.0|    California|   Knights Valley|           Sonoma|   Sauvignon Blanc|   
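
What these two transformers do to a single description can be sketched in plain Python. `Tokenizer` lowercases the text and splits it on whitespace, so punctuation stays attached to the tokens, and `StopWordsRemover` drops common words. The stop-word set below is a tiny made-up subset for illustration, not Spark's default English list, and the sample sentence is invented.

```python
# Tiny illustrative stop-word set (an assumption; Spark's default list is much longer).
STOP_WORDS = {"this", "is", "a", "of", "and", "the", "with"}

def tokenize(text):
    """Lowercase the text and split it on whitespace."""
    return text.lower().split()

def remove_stop_words(tokens, stop_words=STOP_WORDS):
    """Keep only the tokens that are not in the stop-word set."""
    return [t for t in tokens if t not in stop_words]

description = "This is a ripe wine with aromas of fig and blackberry"
words = tokenize(description)
filtered = remove_stop_words(words)
print(filtered)
```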

In [29]:
#map each token list to a term-frequency vector using the hashing trick
#(numFeatures=20 is very small, so many distinct words share a bucket);
#alternatively, CountVectorizer can be used to get term-frequency vectors
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(df)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
nlpdf = idfModel.transform(featurizedData)
nlpdf.select("points_category", "features").show(10)

+---------------+--------------------+
|points_category|            features|
+---------------+--------------------+
|            2.0|(20,[0,1,3,4,5,7,...|
|            2.0|(20,[0,1,2,4,5,6,...|
|            2.0|(20,[2,3,4,6,7,8,...|
|            2.0|(20,[0,1,2,3,4,5,...|
|            2.0|(20,[0,1,2,4,5,6,...|
|            2.0|(20,[0,1,2,4,5,6,...|
|            2.0|(20,[1,2,4,5,6,7,...|
|            2.0|(20,[0,1,2,4,5,6,...|
|            2.0|(20,[0,1,2,3,4,5,...|
|            2.0|(20,[0,2,3,6,7,8,...|
+---------------+--------------------+
only showing top 10 rows
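
The hashing trick and the IDF rescaling can be sketched without Spark. Each token is hashed to one of `num_features` buckets and counted; each bucket count is then multiplied by log((N + 1) / (df + 1)), where N is the number of documents and df is the number of documents with a non-zero count in that bucket (the smoothed formula documented for Spark's `IDF`). The hash function used here, `zlib.crc32`, is a stand-in chosen for reproducibility; Spark uses MurmurHash3, so bucket indices will differ. The function names and toy documents are illustrative.

```python
import math
import zlib

def hashing_tf(tokens, num_features=20):
    """Count tokens per hash bucket (dense list for readability)."""
    vec = [0.0] * num_features
    for tok in tokens:
        bucket = zlib.crc32(tok.encode("utf-8")) % num_features
        vec[bucket] += 1.0
    return vec

def idf_weights(tf_vectors):
    """Per-bucket weight log((N + 1) / (df + 1))."""
    n_docs = len(tf_vectors)
    n_feat = len(tf_vectors[0])
    doc_freq = [sum(1 for v in tf_vectors if v[i] > 0) for i in range(n_feat)]
    return [math.log((n_docs + 1) / (df + 1)) for df in doc_freq]

# Invented token lists standing in for the "filtered" column.
docs = [["ripe", "aromas", "fig"], ["ripe", "cherry"], ["oak", "cherry", "cherry"]]
tf = [hashing_tf(d) for d in docs]
idf = idf_weights(tf)
tfidf = [[t * w for t, w in zip(vec, idf)] for vec in tf]
```

With only 20 buckets, unrelated words collide often on a real vocabulary, so raising `numFeatures` is one of the first things to try when the features look uninformative.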



In [37]:
nlpdf.show(5)

+---+-------+--------------------+------+-----+--------------+-----------------+-----------------+------------------+--------------------+---------------+--------------------+--------------------+
|_c0|country|         designation|points|price|      province|         region_1|         region_2|           variety|              winery|points_category|            filtered|         wordVectors|
+---+-------+--------------------+------+-----+--------------+-----------------+-----------------+------------------+--------------------+---------------+--------------------+--------------------+
|  0|     US|   Martha's Vineyard|    96|235.0|    California|      Napa Valley|             Napa|Cabernet Sauvignon|               Heitz|            2.0|[tremendous, 100%...|[0.13242212413913...|
|  1|  Spain|Carodorum Selecci...|    96|110.0|Northern Spain|             Toro|             null|     Tinta de Toro|Bodega Carmen Rod...|            2.0|[ripe, aromas, fi...|[-0.2539120897530...|
|  2|     US|Sp

In [38]:
#split data into train (80%) and test (20%) sets
train_df, test_df = nlpdf.randomSplit([0.8, 0.2])

train_df.show(1)

+---+-------+-----------------+------+-----+----------+-----------+--------+------------------+------+---------------+--------------------+--------------------+
|_c0|country|      designation|points|price|  province|   region_1|region_2|           variety|winery|points_category|            filtered|         wordVectors|
+---+-------+-----------------+------+-----+----------+-----------+--------+------------------+------+---------------+--------------------+--------------------+
|  0|     US|Martha's Vineyard|    96|235.0|California|Napa Valley|    Napa|Cabernet Sauvignon| Heitz|            2.0|[tremendous, 100%...|[0.13242212413913...|
+---+-------+-----------------+------+-----+----------+-----------+--------+------------------+------+---------------+--------------------+--------------------+
only showing top 1 row



### Logistic Regression Model

In [31]:
from pyspark.ml.classification import LogisticRegression

# Set parameters for Logistic Regression
lgr = LogisticRegression(maxIter=10, featuresCol='features', labelCol='points_category')

# Fit the model to the data.
lgrm = lgr.fit(train_df)

# Predict a label for each row of the test set.
predictions = lgrm.transform(test_df)

Py4JJavaError: An error occurred while calling o422.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 21.0 failed 1 times, most recent failure: Lost task 7.0 in stage 21.0 (TID 74, localhost, executor driver): scala.MatchError: [null,1.0,(20,[7,8,16,19],[0.467091309645458,0.3350156782106137,0.3772608279144095,0.4309985243468549])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1$$anonfun$15.apply(LogisticRegression.scala:497)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1$$anonfun$15.apply(LogisticRegression.scala:497)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:298)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1098)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1092)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1161)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1137)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1.apply(LogisticRegression.scala:520)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1.apply(LogisticRegression.scala:494)
	at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:494)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:489)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:279)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: [null,1.0,(20,[7,8,16,19],[0.467091309645458,0.3350156782106137,0.3772608279144095,0.4309985243468549])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1$$anonfun$15.apply(LogisticRegression.scala:497)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1$$anonfun$15.apply(LogisticRegression.scala:497)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:298)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
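
The `scala.MatchError` above reports a row of the form `[null,1.0,(20,...)]`. The first element appears to be the label, and it is null. A likely cause is that some `points` values could not be cast to integers and became null, which leaves `points_category` null for those rows, so `LogisticRegression` cannot read the label. One possible fix, sketched below, is to drop rows with a null label before splitting and fitting. The helper name `drop_null_labels` is hypothetical; `DataFrame.na.drop(subset=...)` is the PySpark call it relies on.

```python
def drop_null_labels(df, label_col="points_category"):
    """Return `df` without the rows whose label column is null.

    `df` is expected to be a pyspark.sql.DataFrame.
    """
    return df.na.drop(subset=[label_col])

# Intended use in this notebook (names taken from the cells above):
#   clean_df = drop_null_labels(nlpdf)
#   train_df, test_df = clean_df.randomSplit([0.8, 0.2])
#   lgrm = lgr.fit(train_df)
```

Dropping the rows hides the underlying parsing problem rather than solving it, so it is also worth checking how many rows are affected and whether reading the CSV with the quote and escape options removes them.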


In [18]:
predictions.show(3)

+---+-------+--------------------+------+-----+----------+--------------------+--------+------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|_c0|country|         designation|points|price|  province|            region_1|region_2|           variety|              winery|points_category|            filtered|         rawFeatures|            features|       rawPrediction|         probability|prediction|
+---+-------+--------------------+------+-----+----------+--------------------+--------+------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|  4| France|          La Brûlade|    95| 66.0|  Provence|              Bandol|    null|Provence red blend|Domaine de la Bégude|            2.0|[top, wine, la, b...|(20,[0,1,2,4,5,6,...|(20,[0,1,2,4,5,6,...|[-1.551037

In [32]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#print evaluation metrics
evaluator = MulticlassClassificationEvaluator(labelCol="points_category", predictionCol="prediction")

print(evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}))
print(evaluator.evaluate(predictions, {evaluator.metricName: "f1"}))

NameError: name 'predictions' is not defined

### Word2Vec

In [33]:
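# Learn a mapping from words to vectors; transform() then represents each
# description by the average of its word vectors (vectorSize=2 is very small)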
from pyspark.ml.feature import Word2Vec
word2Vec = Word2Vec(vectorSize=2, minCount=0, inputCol="filtered", outputCol="wordVectors")
w2VM = word2Vec.fit(df)
nlpdf = w2VM.transform(df)

In [34]:
nlpdf.select("points_category", "wordVectors").show(2, truncate=False)

+---------------+-----------------------------------------+
|points_category|wordVectors                              |
+---------------+-----------------------------------------+
|2.0            |[0.13242212413913673,0.14012055512931612]|
|2.0            |[-0.2539120897530548,0.34223595694188147]|
+---------------+-----------------------------------------+
only showing top 2 rows
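
Each row's `wordVectors` value is a single vector for the whole description, obtained by averaging the vectors of its words. The plain-Python sketch below shows that averaging step with a made-up two-dimensional embedding table. Treating unknown words as zero vectors is an assumption of this sketch; with `minCount=0` every word seen during fitting has a vector, so the case does not arise for the training data.

```python
# Made-up 2-dimensional word vectors, for illustration only.
EMBEDDINGS = {
    "ripe": [0.2, 0.4],
    "aromas": [0.0, 0.2],
    "fig": [0.4, -0.6],
}

def document_vector(tokens, embeddings, size=2):
    """Average the word vectors of a document; unknown words count as zeros."""
    if not tokens:
        return [0.0] * size
    total = [0.0] * size
    for tok in tokens:
        for i, x in enumerate(embeddings.get(tok, [0.0] * size)):
            total[i] += x
    return [x / len(tokens) for x in total]

doc_vec = document_vector(["ripe", "aromas", "fig"], EMBEDDINGS)
print(doc_vec)
```

Averaging discards word order and, with only two dimensions, most of the vocabulary's structure, which is one reason a larger `vectorSize` is a natural first parameter to experiment with.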



In [35]:
#split data into train (80%) and test (20%) sets
train_df, test_df = nlpdf.randomSplit([0.8, 0.2])

train_df.show(1)

+---+-------+-----------------+------+-----+----------+-----------+--------+------------------+------+---------------+--------------------+--------------------+
|_c0|country|      designation|points|price|  province|   region_1|region_2|           variety|winery|points_category|            filtered|         wordVectors|
+---+-------+-----------------+------+-----+----------+-----------+--------+------------------+------+---------------+--------------------+--------------------+
|  0|     US|Martha's Vineyard|    96|235.0|California|Napa Valley|    Napa|Cabernet Sauvignon| Heitz|            2.0|[tremendous, 100%...|[0.13242212413913...|
+---+-------+-----------------+------+-----+----------+-----------+--------+------------------+------+---------------+--------------------+--------------------+
only showing top 1 row



In [36]:
# Set parameters for Logistic Regression
lgr = LogisticRegression(maxIter=10, featuresCol = 'wordVectors', labelCol='points_category')

# Fit the model to the data.
lgrm = lgr.fit(train_df)

# Predict a label for each row of the test set.
predictions = lgrm.transform(test_df)

Py4JJavaError: An error occurred while calling o603.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 30.0 failed 1 times, most recent failure: Lost task 7.0 in stage 30.0 (TID 110, localhost, executor driver): scala.MatchError: [null,1.0,[0.07494956953451037,-0.1474372986704111]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1$$anonfun$15.apply(LogisticRegression.scala:497)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1$$anonfun$15.apply(LogisticRegression.scala:497)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:298)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1098)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1092)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1161)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1137)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1.apply(LogisticRegression.scala:520)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1.apply(LogisticRegression.scala:494)
	at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:494)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:489)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:279)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: [null,1.0,[0.07494956953451037,-0.1474372986704111]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1$$anonfun$15.apply(LogisticRegression.scala:497)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$train$1$$anonfun$15.apply(LogisticRegression.scala:497)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:298)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [24]:
#print evaluation metrics
evaluator = MulticlassClassificationEvaluator(labelCol="points_category", predictionCol="prediction")

print(evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"}))
print(evaluator.evaluate(predictions, {evaluator.metricName: "f1"}))

0.46087065031374785
0.45402426901079673
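
For reference, the two metrics can be computed by hand as in the sketch below. Accuracy is the fraction of correct predictions. The `f1` metric of `MulticlassClassificationEvaluator` is a weighted F-measure: the per-class F1 scores averaged with weights proportional to each class's share of the true labels. The labels and predictions below are invented toy data, and the function names are illustrative.

```python
from collections import Counter

def accuracy(labels, preds):
    """Fraction of predictions that equal the true label."""
    return sum(l == p for l, p in zip(labels, preds)) / len(labels)

def weighted_f1(labels, preds):
    """Per-class F1, weighted by each class's share of the true labels."""
    support = Counter(labels)
    total = 0.0
    for cls, n_true in support.items():
        tp = sum(1 for l, p in zip(labels, preds) if l == cls and p == cls)
        n_pred = sum(1 for p in preds if p == cls)
        precision = tp / n_pred if n_pred else 0.0
        recall = tp / n_true
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        total += f1 * n_true / len(labels)
    return total

# Invented labels and predictions for three balanced classes.
labels = [0.0, 0.0, 1.0, 1.0, 2.0, 2.0]
preds = [0.0, 1.0, 1.0, 1.0, 2.0, 0.0]
print(accuracy(labels, preds))
print(weighted_f1(labels, preds))
```

With three roughly balanced classes, always guessing one class gives an accuracy near 0.33, so the values of about 0.46 reported above are only a modest improvement over that baseline.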


<b>Exercise</b>: <font color='red'>Fine-tune the Word2Vec parameters to improve model accuracy.</font>