In [1]:
import findspark
try:
    findspark.init()
    print("Spark found by findspark.")
except:
    print("Spark not found by findspark.")

Spark found by findspark.


In [2]:
!java -version

java version "1.8.0_431"
Java(TM) SE Runtime Environment (build 1.8.0_431-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.431-b10, mixed mode)


In [3]:
import sys
import os
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

In [4]:
from pyspark.sql.functions import col, concat_ws
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import split, trim, size
from pyspark.sql.functions import length
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import StringIndexer
from textblob import TextBlob
from wordcloud import WordCloud,STOPWORDS
import re
import string
import pandas as pd
from pyspark.sql.types import FloatType

In [5]:
reviews = pd.read_csv('../data/Musical_instruments_reviews.csv')

In [6]:


spark = SparkSession.builder \
    .appName("Amazon Sentiment Analysis") \
    .getOrCreate()

In [7]:
df = spark.createDataFrame(reviews)

In [8]:
# df = spark.read.csv("../data/Musical_instruments_reviews.csv", header=True)
df.show(5)

+--------------+----------+--------------------+--------+--------------------+-------+--------------------+--------------+-----------+
|    reviewerID|      asin|        reviewerName| helpful|          reviewText|overall|             summary|unixReviewTime| reviewTime|
+--------------+----------+--------------------+--------+--------------------+-------+--------------------+--------------+-----------+
|A2IBPI20UZIR0U|1384719342|cassandra tu "Yea...|  [0, 0]|Not much to write...|    5.0|                good|    1393545600|02 28, 2014|
|A14VAT5EAX3D9S|1384719342|                Jake|[13, 14]|The product does ...|    5.0|                Jake|    1363392000|03 16, 2013|
|A195EZSQDW3E21|1384719342|Rick Bennette "Ri...|  [1, 1]|The primary job o...|    5.0|It Does The Job Well|    1377648000|08 28, 2013|
|A2C00NNG1ZQQG2|1384719342|RustyBill "Sunday...|  [0, 0]|Nice windscreen p...|    5.0|GOOD WINDSCREEN F...|    1392336000|02 14, 2014|
| A94QU4C90B1AX|1384719342|       SEAN MASLANKA|  [0, 0

In [9]:
df = df.fillna({'reviewText': 'Missing'})

In [10]:
# Combine 'reviewText' and 'summary' into a new column 'reviews'
df = df.withColumn('reviews', concat_ws('', col('reviewText'), col('summary')))

# Drop the original 'reviewText' and 'summary' columns
df = df.drop('reviewText', 'summary')

In [11]:
df.show()

+--------------+----------+--------------------+--------+-------+--------------+-----------+--------------------+
|    reviewerID|      asin|        reviewerName| helpful|overall|unixReviewTime| reviewTime|             reviews|
+--------------+----------+--------------------+--------+-------+--------------+-----------+--------------------+
|A2IBPI20UZIR0U|1384719342|cassandra tu "Yea...|  [0, 0]|    5.0|    1393545600|02 28, 2014|Not much to write...|
|A14VAT5EAX3D9S|1384719342|                Jake|[13, 14]|    5.0|    1363392000|03 16, 2013|The product does ...|
|A195EZSQDW3E21|1384719342|Rick Bennette "Ri...|  [1, 1]|    5.0|    1377648000|08 28, 2013|The primary job o...|
|A2C00NNG1ZQQG2|1384719342|RustyBill "Sunday...|  [0, 0]|    5.0|    1392336000|02 14, 2014|Nice windscreen p...|
| A94QU4C90B1AX|1384719342|       SEAN MASLANKA|  [0, 0]|    5.0|    1392940800|02 21, 2014|This pop filter i...|
|A2A039TZMZHH9Y|B00004Y2UT| Bill Lewey "blewey"|  [0, 0]|    5.0|    1356048000|12 21, 2

In [12]:

# Define the sentiment function
def sent(overall):
    if overall == 3:
        return 'Neutral'
    elif overall > 3:
        return 'Positive'
    else:
        return 'Negative'

# Register the UDF
sent_udf = udf(sent, StringType())

# Apply the UDF to create the 'Sentiment' column
df = df.withColumn('Sentiment', sent_udf(df['overall']))


In [13]:
df.show(10)

+--------------+----------+--------------------+--------+-------+--------------+-----------+--------------------+---------+
|    reviewerID|      asin|        reviewerName| helpful|overall|unixReviewTime| reviewTime|             reviews|Sentiment|
+--------------+----------+--------------------+--------+-------+--------------+-----------+--------------------+---------+
|A2IBPI20UZIR0U|1384719342|cassandra tu "Yea...|  [0, 0]|    5.0|    1393545600|02 28, 2014|Not much to write...| Positive|
|A14VAT5EAX3D9S|1384719342|                Jake|[13, 14]|    5.0|    1363392000|03 16, 2013|The product does ...| Positive|
|A195EZSQDW3E21|1384719342|Rick Bennette "Ri...|  [1, 1]|    5.0|    1377648000|08 28, 2013|The primary job o...| Positive|
|A2C00NNG1ZQQG2|1384719342|RustyBill "Sunday...|  [0, 0]|    5.0|    1392336000|02 14, 2014|Nice windscreen p...| Positive|
| A94QU4C90B1AX|1384719342|       SEAN MASLANKA|  [0, 0]|    5.0|    1392940800|02 21, 2014|This pop filter i...| Positive|
|A2A039T

In [14]:


# Split 'reviewTime' into 'date' and 'year'
df = df.withColumn('date', split(df['reviewTime'], ',').getItem(0))
df = df.withColumn('year', trim(split(df['reviewTime'], ',').getItem(1)))

# Split 'date' into 'month' and 'day'
df = df.withColumn('month', split(df['date'], ' ').getItem(0))
df = df.withColumn('day', split(df['date'], ' ').getItem(1))

# Drop 'reviewTime' and 'date' columns
df = df.drop('reviewTime', 'date')

# Show the result
df.show(5)


+--------------+----------+--------------------+--------+-------+--------------+--------------------+---------+----+-----+---+
|    reviewerID|      asin|        reviewerName| helpful|overall|unixReviewTime|             reviews|Sentiment|year|month|day|
+--------------+----------+--------------------+--------+-------+--------------+--------------------+---------+----+-----+---+
|A2IBPI20UZIR0U|1384719342|cassandra tu "Yea...|  [0, 0]|    5.0|    1393545600|Not much to write...| Positive|2014|   02| 28|
|A14VAT5EAX3D9S|1384719342|                Jake|[13, 14]|    5.0|    1363392000|The product does ...| Positive|2013|   03| 16|
|A195EZSQDW3E21|1384719342|Rick Bennette "Ri...|  [1, 1]|    5.0|    1377648000|The primary job o...| Positive|2013|   08| 28|
|A2C00NNG1ZQQG2|1384719342|RustyBill "Sunday...|  [0, 0]|    5.0|    1392336000|Nice windscreen p...| Positive|2014|   02| 14|
| A94QU4C90B1AX|1384719342|       SEAN MASLANKA|  [0, 0]|    5.0|    1392940800|This pop filter i...| Positive|

In [15]:
df = df.drop('reviewerName', 'unixReviewTime')

In [16]:
def clean_review(text):
    text = str(text).lower()
    text = re.sub('\[.*?\]', '', text)
    text = re.sub('https?://\S+|www\.\S+', '', text)
    text = re.sub('<.*?>+', '', text)
    text = re.sub('[%s]' % re.escape(string.punctuation), '', text)
    text = re.sub('\n', '', text)
    text = re.sub('\w*\d\w*', '', text)
    return text

In [17]:
clean_review_udf = udf(clean_review, StringType())

In [18]:
df = df.withColumn('reviews', clean_review_udf(df['reviews']))

In [19]:
df.show(5)


+--------------+----------+--------+-------+--------------------+---------+----+-----+---+
|    reviewerID|      asin| helpful|overall|             reviews|Sentiment|year|month|day|
+--------------+----------+--------+-------+--------------------+---------+----+-----+---+
|A2IBPI20UZIR0U|1384719342|  [0, 0]|    5.0|not much to write...| Positive|2014|   02| 28|
|A14VAT5EAX3D9S|1384719342|[13, 14]|    5.0|the product does ...| Positive|2013|   03| 16|
|A195EZSQDW3E21|1384719342|  [1, 1]|    5.0|the primary job o...| Positive|2013|   08| 28|
|A2C00NNG1ZQQG2|1384719342|  [0, 0]|    5.0|nice windscreen p...| Positive|2014|   02| 14|
| A94QU4C90B1AX|1384719342|  [0, 0]|    5.0|this pop filter i...| Positive|2014|   02| 21|
+--------------+----------+--------+-------+--------------------+---------+----+-----+---+
only showing top 5 rows



In [20]:
stop_words= ['yourselves', 'between', 'whom', 'itself', 'is', "she's", 'up', 'herself', 'here', 'your', 'each', 
             'we', 'he', 'my', "you've", 'having', 'in', 'both', 'for', 'themselves', 'are', 'them', 'other',
             'and', 'an', 'during', 'their', 'can', 'yourself', 'she', 'until', 'so', 'these', 'ours', 'above', 
             'what', 'while', 'have', 're', 'more', 'only', "needn't", 'when', 'just', 'that', 'were', "don't", 
             'very', 'should', 'any', 'y', 'isn', 'who',  'a', 'they', 'to', 'too', "should've", 'has', 'before',
             'into', 'yours', "it's", 'do', 'against', 'on',  'now', 'her', 've', 'd', 'by', 'am', 'from', 
             'about', 'further', "that'll", "you'd", 'you', 'as', 'how', 'been', 'the', 'or', 'doing', 'such',
             'his', 'himself', 'ourselves',  'was', 'through', 'out', 'below', 'own', 'myself', 'theirs', 
             'me', 'why', 'once',  'him', 'than', 'be', 'most', "you'll", 'same', 'some', 'with', 'few', 'it',
             'at', 'after', 'its', 'which', 'there','our', 'this', 'hers', 'being', 'did', 'of', 'had', 'under',
             'over','again', 'where', 'those', 'then', "you're", 'i', 'because', 'does', 'all']

def remove_stopwords(text):
    if text is None:
        return ""
    words = text.split()
    filtered = [word for word in words if word not in stop_words]
    return ' '.join(filtered)

remove_stopwords_udf = udf(remove_stopwords, StringType())


In [21]:
df = df.withColumn('reviews', remove_stopwords_udf(df['reviews']))

In [22]:
def get_polarity(text):
    try:
        return float(TextBlob(str(text)).sentiment.polarity)
    except:
        return 0.0

polarity_udf = udf(get_polarity, FloatType())
df = df.withColumn('polarity', polarity_udf(df['reviews']))

In [23]:
df = df.withColumn('review_len', length(df['reviews']))
df = df.withColumn('word_count', size(split(df['reviews'], ' ')))

df.select('reviews', 'polarity', 'review_len', 'word_count').show(5)

+--------------------+-----------+----------+----------+
|             reviews|   polarity|review_len|word_count|
+--------------------+-----------+----------+----------+
|not much write bu...|       0.25|       162|        25|
|product exactly q...|0.014285714|       356|        55|
|primary job devic...|     0.1675|       315|        48|
|nice windscreen p...| 0.33333334|       169|        22|
|pop filter great ...|        0.8|       136|        21|
+--------------------+-----------+----------+----------+
only showing top 5 rows



In [24]:
positive_review = df.filter(df["Sentiment"] == 'Positive').na.drop()
neutral_review  = df.filter(df["Sentiment"] == 'Neutral').na.drop()
negative_review = df.filter(df["Sentiment"] == 'Negative').na.drop()

In [25]:
negative_review.show()

+--------------+----------+----------+-------+--------------------+---------+----+-----+---+------------+----------+----------+
|    reviewerID|      asin|   helpful|overall|             reviews|Sentiment|year|month|day|    polarity|review_len|word_count|
+--------------+----------+----------+-------+--------------------+---------+----+-----+---+------------+----------+----------+
|A2PD27UKAD3Q00|B00005ML71|    [0, 0]|    2.0|bought use keyboa...| Negative|2013|   08| 17|  0.26944444|       430|        63|
|A12ABV9NU02O29|B000068NW5|    [2, 2]|    2.0|didnt expect cabl...| Negative|2011|   07|  6|-0.018707482|       281|        41|
|A1L7M2JXN4EZCR|B000068NW5|    [0, 0]|    1.0|hums crackles thi...| Negative|2014|   02|  9|         0.6|       183|        29|
|A3UD50M7M72150|B000068NW5|    [0, 0]|    1.0|im procheapo hate...| Negative|2014|   03| 14|       -0.25|       106|        16|
|A1W3CEEQBJ4GTN|B000068NZC|    [0, 0]|    2.0|bought canon vixi...| Negative|2013|   09| 16|   0.0734127

In [26]:
df.printSchema()


root
 |-- reviewerID: string (nullable = true)
 |-- asin: string (nullable = true)
 |-- helpful: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviews: string (nullable = true)
 |-- Sentiment: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)
 |-- polarity: float (nullable = true)
 |-- review_len: integer (nullable = true)
 |-- word_count: integer (nullable = false)



In [27]:
df.select("Sentiment").limit(5).show()


+---------+
|Sentiment|
+---------+
| Positive|
| Positive|
| Positive|
| Positive|
| Positive|
+---------+



In [28]:
df.show()

+--------------+----------+--------+-------+--------------------+---------+----+-----+---+-----------+----------+----------+
|    reviewerID|      asin| helpful|overall|             reviews|Sentiment|year|month|day|   polarity|review_len|word_count|
+--------------+----------+--------+-------+--------------------+---------+----+-----+---+-----------+----------+----------+
|A2IBPI20UZIR0U|1384719342|  [0, 0]|    5.0|not much write bu...| Positive|2014|   02| 28|       0.25|       162|        25|
|A14VAT5EAX3D9S|1384719342|[13, 14]|    5.0|product exactly q...| Positive|2013|   03| 16|0.014285714|       356|        55|
|A195EZSQDW3E21|1384719342|  [1, 1]|    5.0|primary job devic...| Positive|2013|   08| 28|     0.1675|       315|        48|
|A2C00NNG1ZQQG2|1384719342|  [0, 0]|    5.0|nice windscreen p...| Positive|2014|   02| 14| 0.33333334|       169|        22|
| A94QU4C90B1AX|1384719342|  [0, 0]|    5.0|pop filter great ...| Positive|2014|   02| 21|        0.8|       136|        21|


In [29]:
df.cache()


DataFrame[reviewerID: string, asin: string, helpful: string, overall: double, reviews: string, Sentiment: string, year: string, month: string, day: string, polarity: float, review_len: int, word_count: int]

In [31]:
pdf = df.select("*").toPandas()
from sklearn import preprocessing

le = preprocessing.LabelEncoder()
pdf['Sentiment'] = le.fit_transform(pdf['Sentiment'])
df = spark.createDataFrame(pdf)

Py4JJavaError: An error occurred while calling o204.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 10.0 failed 1 times, most recent failure: Lost task 7.0 in stage 10.0 (TID 24) (DESKTOP-AJHT4E1 executor driver): java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(SocketInputStream.java:210)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(SocketInputStream.java:210)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:774)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)


In [None]:
# from pyspark.ml.feature import StringIndexer

# indexer = StringIndexer(inputCol="Sentiment", outputCol="Sentiment_Indexed")
# indexer_model = indexer.fit(df)
# df = indexer_model.transform(df)
# df.select("Sentiment", "Sentiment_Indexed").distinct().show()

Py4JJavaError: An error occurred while calling o901.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 62.0 failed 1 times, most recent failure: Lost task 3.0 in stage 62.0 (TID 164) (DESKTOP-AJHT4E1 executor driver): java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(SocketInputStream.java:210)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at java.io.DataInputStream.readFully(DataInputStream.java:195)
	at java.io.DataInputStream.readFully(DataInputStream.java:169)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:777)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(SocketInputStream.java:210)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at java.io.DataInputStream.readFully(DataInputStream.java:195)
	at java.io.DataInputStream.readFully(DataInputStream.java:169)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:777)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)


In [94]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer

spark = SparkSession.builder.getOrCreate()
data = [("Positive",), ("Negative",), ("Neutral",)]
df2 = spark.createDataFrame(data, ["Sentiment"])
indexer = StringIndexer(inputCol="Sentiment", outputCol="Sentiment_Indexed")
model = indexer.fit(df2)
df2 = model.transform(df2)
df2.show()

+---------+-----------------+
|Sentiment|Sentiment_Indexed|
+---------+-----------------+
| Positive|              2.0|
| Negative|              0.0|
|  Neutral|              1.0|
+---------+-----------------+

