In [1]:
import sparknlp 

spark = sparknlp.start()

print("Spark NLP version")
sparknlp.version()
print("Apache Spark version")
spark.version

Spark NLP version
Apache Spark version


'2.4.5'

In [2]:
spark

In [3]:
raw_tweets = spark.read.format("csv").option("inferSchema", 'true').option("header", 'false').option("sep", ",").load('tweets.csv')

In [4]:
raw_tweets.printSchema

<bound method DataFrame.printSchema of DataFrame[_c0: int, _c1: bigint, _c2: string, _c3: string, _c4: string, _c5: string]>

In [5]:
from pyspark.sql.functions import col 
raw_tweets = raw_tweets.select(col('_c0'),col('_c5'))\
.withColumnRenamed('_c0', 'Target')\
.withColumnRenamed('_c5', 'Text')\
.dropDuplicates()

In [6]:
raw_tweets.count()

1583691

In [7]:
raw_tweets.show(5)

+------+--------------------+
|Target|                Text|
+------+--------------------+
|     0|I feel like a com...|
|     0|@KishoreK this is...|
|     0|@InYourEyes2410 I...|
|     0|       A little sad |
|     0|I'm off too bed. ...|
+------+--------------------+
only showing top 5 rows



In [8]:
import sparknlp
from sparknlp.annotator import *
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, StringIndexer

In [9]:
document_assembler = DocumentAssembler().setInputCol("Text").setOutputCol("document")

sentenceDetector = SentenceDetector().setInputCols(["document"]).setOutputCol("sentences").setUseAbbreviations(True)

tokenizer = Tokenizer().setInputCols(["sentences"]).setOutputCol("token")

normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normal")

stop_word = StopWordsCleaner().setInputCols(["normal"]).setOutputCol('clean')

stemmer = Stemmer().setInputCols(['clean']).setOutputCol('stem')

finisher = Finisher().setInputCols(["stem"]).setOutputCols("token_features")

CountVectors = CountVectorizer(inputCol= "token_features", outputCol= "features", vocabSize = 10000, minDF= 5)

label_stringIDx = StringIndexer(inputCol= 'Target', outputCol="label")


nlpPipeline = Pipeline(stages=[
 document_assembler, 
 sentenceDetector,
 tokenizer,
 normalizer,
 stop_word,   
 stemmer,
 finisher,
 CountVectors,
 label_stringIDx
 ])

pipelineModel = nlpPipeline.fit(raw_tweets)

In [10]:
processed = pipelineModel.transform(raw_tweets)
print(processed.count())

1583691


In [11]:
processed.select("features").take(1)

[Row(features=SparseVector(10000, {0: 1.0, 2: 1.0, 5: 1.0, 6: 1.0, 18: 1.0, 23: 1.0, 116: 1.0, 144: 1.0, 280: 1.0, 521: 1.0, 1728: 1.0}))]

In [12]:
train,test = processed.randomSplit(weights=[0.7,0.3], seed = 102)

In [13]:
print(train.count())
print(test.count())

1109771
473920


In [14]:
train.printSchema()

root
 |-- Target: integer (nullable = true)
 |-- Text: string (nullable = true)
 |-- token_features: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



In [15]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(train)

# get training summary used for eval metrics and other params
lrTrainingSummary = lrModel.summary

In [17]:
# make predictions on test data
lrPredictions = lrModel.transform(test)

# display predictions
lrPredictions.select("label", "prediction", "probability").limit(10).toPandas()

Unnamed: 0,label,prediction,probability
0,1.0,1.0,"[0.30657493332798763, 0.6934250666720123]"
1,1.0,0.0,"[0.6044232531691004, 0.39557674683089966]"
2,1.0,0.0,"[0.5228333362444512, 0.4771666637555487]"
3,1.0,1.0,"[0.015941219674955865, 0.9840587803250442]"
4,1.0,0.0,"[0.8383991946981377, 0.1616008053018623]"
5,1.0,0.0,"[0.6573956471613437, 0.34260435283865637]"
6,1.0,1.0,"[0.04602579040946332, 0.9539742095905368]"
7,1.0,1.0,"[0.014705683858720286, 0.9852943161412797]"
8,1.0,1.0,"[0.33097732856222606, 0.6690226714377739]"
9,1.0,0.0,"[0.6193618941371545, 0.3806381058628454]"


In [18]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol= "prediction")

evaluator.evaluate(lrPredictions)

0.7759629349395232

In [19]:
from sklearn.metrics import confusion_matrix, classification_report

y_true = lrPredictions.select("label")
y_true = y_true.toPandas()

y_pred = lrPredictions.select("prediction")
y_pred = y_pred.toPandas()

In [21]:
cf_matrix = confusion_matrix(y_true.label,y_pred.prediction)
cf_matrix

array([[190729,  46634],
       [ 59459, 177098]])

In [23]:
print(classification_report(y_true.label,y_pred.prediction))

              precision    recall  f1-score   support

         0.0       0.76      0.80      0.78    237363
         1.0       0.79      0.75      0.77    236557

    accuracy                           0.78    473920
   macro avg       0.78      0.78      0.78    473920
weighted avg       0.78      0.78      0.78    473920

