In [1]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 64kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 32.8MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=b7964543e1fe481ee9c08b13b68464a407a05a774a14b91460ffc519723aa67b
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.types import StringType,IntegerType
from pyspark.ml.feature import CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover
from pyspark.sql.functions import col, udf,regexp_replace,isnull,lower

In [4]:
spark = SparkSession.builder.appName("PipelineExample").getOrCreate()

In [5]:
df = spark.read.csv("/content/tweet_emotions.csv",header=True,inferSchema=True)

In [6]:
df.show()

+----------+----------+--------------------+
|  tweet_id| sentiment|             content|
+----------+----------+--------------------+
|1956967341|     empty|@tiffanylue i kno...|
|1956967666|   sadness|Layin n bed with ...|
|1956967696|   sadness|Funeral ceremony....|
|1956967789|enthusiasm|wants to hang out...|
|1956968416|   neutral|@dannycastillo We...|
|1956968477|     worry|Re-pinging @ghost...|
|1956968487|   sadness|I should be sleep...|
|1956968636|     worry|Hmmm. http://www....|
|1956969035|   sadness|@charviray Charle...|
|1956969172|   sadness|@kelcouch I'm sor...|
|1956969456|   neutral|    cant fall asleep|
|1956969531|     worry|Choked on her ret...|
|1956970047|   sadness|Ugh! I have to be...|
|1956970424|   sadness|@BrodyJenner if u...|
|1956970860|  surprise|        Got the news|
|1956971077|   sadness|The storm is here...|
|1956971170|      love|@annarosekerr agreed|
|1956971206|   sadness|So sleepy again a...|
|1956971473|     worry|@PerezHilton lady...|
|195697158

In [7]:
def clean_text(c):
  c = lower(c)
  c = regexp_replace(c, "^rt ", "")
  c = regexp_replace(c, "(https?\://)\S+", "")
  c = regexp_replace(c, "[^a-zA-Z0-9\\s]", "")
  
  return c

clean_df = df.select(clean_text(col("content")).alias("content"),"sentiment")

clean_df.printSchema()
clean_df.show(10)

root
 |-- content: string (nullable = true)
 |-- sentiment: string (nullable = true)

+--------------------+----------+
|             content| sentiment|
+--------------------+----------+
|tiffanylue i know...|     empty|
|layin n bed with ...|   sadness|
|funeral ceremonyg...|   sadness|
|wants to hang out...|enthusiasm|
|dannycastillo we ...|   neutral|
|repinging ghostri...|     worry|
|i should be sleep...|   sadness|
|       hmmm  is down|     worry|
|charviray charlen...|   sadness|
|kelcouch im sorry...|   sadness|
+--------------------+----------+
only showing top 10 rows



In [8]:
(trainDF,testDF) = clean_df.randomSplit((0.7,0.3),seed=42)

## The total number of records (count) of the training data

In [9]:
print("Training Dataset Count: " + str(trainDF.count()))
print("Test Dataset Count: " + str(testDF.count()))

Training Dataset Count: 28085
Test Dataset Count: 11915


In [10]:
labelEncoder = StringIndexer(inputCol='sentiment',outputCol='label')
tokenizer = Tokenizer(inputCol="content", outputCol="words")
remover = StopWordsRemover(inputCol='words', outputCol='words_clean')    
hashingTF = HashingTF(inputCol="words_clean", outputCol="features")    
lr = LogisticRegression(maxIter=30, regParam=0.001)    
pipeline = Pipeline(stages=[labelEncoder,tokenizer,remover, hashingTF, lr])

In [11]:
model = pipeline.fit(trainDF)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception o

In [None]:
prediction=model.transform(testDF)

## The predictions DataFrame (using the show() command)

In [None]:
prediction.show()

## The accuracy of the model

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(prediction)