In [9]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [12]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [19]:
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz


In [20]:
!tar xf spark-3.0.0-bin-hadoop3.2.tgz


In [21]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [22]:
!pip install -q findspark

In [23]:
import findspark
findspark.init()


In [24]:
findspark.find()

'/content/spark-3.0.0-bin-hadoop3.2'

In [26]:
import pyspark as ps
import warnings
from pyspark.sql import SQLContext

In [27]:
try:
  sc = ps.SparkContext('local[4]')
  sqlContext = SQLContext(sc)
  print("Just created a SparkContext")
except ValueError:
  warnings.warn("SparkContext already exists in this scope")

Just created a SparkContext


In [28]:
sc.master

'local[4]'

In [45]:
df = sqlContext.read.csv('/content/drive/My Drive/train.csv')
(train_set, val_set) = df.randomSplit([0.85,0.15], seed=2000)
test_set = sqlContext.read.csv('/content/drive/My Drive/test.csv')

In [30]:
type(df)

pyspark.sql.dataframe.DataFrame

In [31]:
df.show(5)


+---------+--------------------+
|      _c0|                 _c1|
+---------+--------------------+
|Sentiment|               Tweet|
|        4|ohh could life ge...|
|        0|says GIANTY OOLLL...|
|        4|@nursedoublek hav...|
|        0|@RubiaNY can you ...|
+---------+--------------------+
only showing top 5 rows



In [32]:
df = df.dropna()

In [33]:
df.count()

1048576

In [41]:
import numpy as np
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [55]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="_c1", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5)
label_stringIdx = StringIndexer(inputCol="_c0", outputCol="label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)


+---+--------------------+--------------------+--------------------+--------------------+-----+
|_c0|                 _c1|               words|                  tf|            features|label|
+---+--------------------+--------------------+--------------------+--------------------+-----+
|  0|                 ...|[, , , , , , , , ...|(65536,[11228,190...|(65536,[11228,190...|  0.0|
|  0|    Nobody likes ...|[, , , , nobody, ...|(65536,[372,10014...|(65536,[372,10014...|  0.0|
|  0|          fed up....|[, , , , fed, up....|(65536,[21954,525...|(65536,[21954,525...|  0.0|
|  0|   &lt;---Sad lev...|[, , , &lt;---sad...|(65536,[1880,2941...|(65536,[1880,2941...|  0.0|
|  0|         @Beansummer| [, , , @beansummer]|(65536,[8623,5257...|(65536,[8623,5257...|  0.0|
+---+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [56]:
from pyspark.ml.classification import LogisticRegression
import time

t0= time.time()
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)
print(time.time() - t0, "seconds wall time")

93.01029849052429 seconds wall time


In [66]:
predictions.show(5)

+---+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|_c0|                 _c1|               words|                  tf|            features|label|       rawPrediction|         probability|prediction|
+---+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|  0|  *pout*  I want ...|[, , *pout*, , i,...|(65536,[7896,1666...|(65536,[7896,1666...|  0.0|[1.27160849091722...|[0.78101797069004...|       0.0|
|  0|  General Motors ...|[, , general, mot...|(65536,[12986,306...|(65536,[12986,306...|  0.0|[1.19732474669432...|[0.76804852896482...|       0.0|
|  0|  Just finished m...|[, , just, finish...|(65536,[835,3114,...|(65536,[835,3114,...|  0.0|[1.42969713695218...|[0.80685412180978...|       0.0|
|  0|  *sends you e-hugs*|[, *sends, you, e...|(65536,[17000,183...|(65536,[17000,183...|  0.0|[-0.0355265

In [64]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")


In [67]:
evaluator.evaluate(predictions)

Py4JJavaError: ignored

In [62]:
evaluator.getMetricName()

'areaUnderROC'