In [1]:
# Data ingestion and extraction
from pyspark.sql import SparkSession

# Reuse the active session when one already exists (e.g. a kernel started by
# pyspark); otherwise create one so this cell also runs on a fresh plain kernel.
spark = SparkSession.builder.getOrCreate()

# Path of the JSON input that is read into the `tweets` DataFrame.
jobDir = "tweets1.json"
tweets = spark.read.json([jobDir])

# Row count of the loaded data, shown as the cell output.
tweets.count()

7992

In [2]:
# Keep only the two columns used downstream: the raw text and its category.
tweets = tweets.select("text", "Category")

# Print the resulting schema as a quick sanity check.
tweets.printSchema()

root
 |-- text: string (nullable = true)
 |-- Category: string (nullable = true)



In [3]:
#Model Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
# Tokenizer stage: reads the "text" column and writes the "words" column
# (no pattern is passed, so RegexTokenizer's default settings apply).
regexTokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="words",
)

# Count-vector stage: reads "words" and writes the "features" column.
countVectors = CountVectorizer(
    inputCol="words",
    outputCol="features",
)



In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
# Label stage: index the string "Category" column into a numeric "label" column.
label_stringIdx = StringIndexer(inputCol="Category", outputCol="label")

# Chain tokenizer -> count vectors -> label indexer into a single pipeline.
pipeline = Pipeline(
    stages=[regexTokenizer, countVectors, label_stringIdx],
)

# NOTE(review): the pipeline is fitted on every row of `tweets`, i.e. before the
# train/test split performed later in the notebook, so the CountVectorizer
# vocabulary is learned from rows that end up in the test set — confirm intended.
pipelineFit = pipeline.fit(tweets)
dataset = pipelineFit.transform(tweets)

# Preview 10 rows, truncating each cell to 50 characters.
dataset.show(10, 50)

+-------------------------------------------------+--------+--------------------------------------------------+--------------------------------------------------+-----+
|                                             text|Category|                                             words|                                          features|label|
+-------------------------------------------------+--------+--------------------------------------------------+--------------------------------------------------+-----+
|               والل عجب عشان كتاب انجليز صعب كلم |     POS|         [والل, عجب, عشان, كتاب, انجليز, صعب, كلم]|(17072,[68,109,147,351,644,1456,6074],[1.0,1.0,...|  2.0|
|               انه مفيد جدان انا اتعلم كثير اشيء |     POS|         [انه, مفيد, جدان, انا, اتعلم, كثير, اشيء]|(17072,[19,22,126,3800,9802,13014,16094],[1.0,1...|  2.0|
|           انه رنامج رايع يترجم كلم قطع باقص سرع |     POS|    [انه, رنامج, رايع, يترجم, كلم, قطع, باقص, سرع]|(17072,[19,147,242,251,1087,2266,3234,14862]

In [5]:
# Partition the transformed dataset into training (70%) and test (30%) sets.
# The fixed seed keeps the split reproducible.
trainingData, testData = dataset.randomSplit(weights=[0.7, 0.3], seed=100)

# Report the size of each partition.
# NOTE(review): the recorded outputs show 5654 + 2335 = 7989 rows while the load
# cell reported 7992 — possibly outputs from different runs; confirm.
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 5654
Test Dataset Count: 2335


In [6]:
# Logistic regression trained on the count-vector features.
# elasticNetParam=0 means a pure ridge (L2) penalty.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

# Show 10 of the rows predicted as class 0, ordered by the "probability"
# column in descending order; each cell is truncated to 10 characters.
(
    predictions
    .filter(predictions["prediction"] == 0)
    .select("text", "Category", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=10)
)

+----------+--------+-----------+-----+----------+
|      text|Category|probability|label|prediction|
+----------+--------+-----------+-----+----------+
|قل اذا ...|     NEG| [0.9999...|  0.0|       0.0|
|رييس تح...|     NEG| [0.9930...|  0.0|       0.0|
|اين دول...|     NEG| [0.9891...|  0.0|       0.0|
|يوم است...|     NEG| [0.9768...|  0.0|       0.0|
|صار ريق...|     NEG| [0.9696...|  0.0|       0.0|
|سعوديه ...|     NEG| [0.9679...|  0.0|       0.0|
|تحدث جن...|     POS| [0.9670...|  2.0|       0.0|
|حقين سك...|     NEG| [0.9655...|  0.0|       0.0|
|ليش شرك...|     NEG| [0.9651...|  0.0|       0.0|
|محمد بن...|     NEG| [0.9651...|  0.0|       0.0|
+----------+--------+-----------+-----+----------+
only showing top 10 rows



In [7]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# No metricName is passed, so the evaluator reports its default metric
# (F1 according to the PySpark docs — confirm for the installed version),
# which is not the same thing as plain accuracy.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
)

# Score of the current `predictions` DataFrame, shown as the cell output.
evaluator.evaluate(predictions)

0.6792656838629382

In [8]:
# Logistic regression trained on TF-IDF features.
from pyspark.ml.feature import HashingTF, IDF

# Hash the tokens of "words" into 10000-dimensional raw term-frequency vectors.
hashingTF = HashingTF(
    inputCol="words",
    outputCol="rawFeatures",
    numFeatures=10000,
)
# Re-weight the raw frequencies; minDocFreq is used here to drop sparse terms.
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
    minDocFreq=5,
)

# NOTE: this cell rebinds pipeline, pipelineFit, dataset, trainingData, testData,
# lr, lrModel and predictions that were produced by the count-vector run.
pipeline = Pipeline(
    stages=[regexTokenizer, hashingTF, idf, label_stringIdx],
)
pipelineFit = pipeline.fit(tweets)
dataset = pipelineFit.transform(tweets)
trainingData, testData = dataset.randomSplit(weights=[0.7, 0.3], seed=100)

# Same logistic-regression settings as the count-vector model.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

# Show 10 of the rows predicted as class 0, ordered by the "probability"
# column in descending order; each cell is truncated to 100 characters.
(
    predictions
    .filter(predictions["prediction"] == 0)
    .select("text", "Category", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=100)
)


+----------------------------------------------------------------------------------------------------+--------+---------------------------------------------------------------+-----+----------+
|                                                                                                text|Category|                                                    probability|label|prediction|
+----------------------------------------------------------------------------------------------------+--------+---------------------------------------------------------------+-----+----------+
|قل اذا هدد وتح ضغط بعت بنان نذهب جميع لنجد ندعم اصدق انك نفس شخص اللذ سنه يتكلم يتصرفسعدالحرير قل...|     NEG| [0.9999760363629308,1.682738045158144E-5,7.136256617690415E-6]|  0.0|       0.0|
|اين دوله عربيه اعط فلسطين طلق واحد حرب غزه اخيره ابن سعود ارسل اكفان نصر الله صواريخ كر… اين دوله...|     NEG|[0.9992480937334544,3.488727115958501E-4,4.0303355494990077E-4]|  0.0|       0.0|
|ايا عمر فاروق عود جيوش فرس تنهي تا

In [9]:
# Cross-validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Rebuild the count-vector dataset: `dataset`, `trainingData` and `testData`
# were rebound to their TF-IDF variants by the TF-IDF section of the notebook.
pipeline = Pipeline(stages=[regexTokenizer, countVectors, label_stringIdx])
pipelineFit = pipeline.fit(tweets)
dataset = pipelineFit.transform(tweets)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)

# Base estimator; regParam and elasticNetParam are overridden by the grid below.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

# Create the evaluator BEFORE the CrossValidator that consumes it, so this cell
# does not depend on an `evaluator` object left over from an earlier cell.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

# 3 x 3 parameter grid for the search.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.3, 0.5])         # regularization strength
    .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2])  # elastic-net mixing (ridge = 0)
    .build()
)

# 5-fold cross-validator; the explicit seed makes the fold assignment
# reproducible from one run to the next.
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
    seed=100,
)
cvModel = cv.fit(trainingData)

# Score the held-out test set with the fitted cross-validation model and
# evaluate it; the score is shown as the cell output.
predictions = cvModel.transform(testData)
evaluator.evaluate(predictions)

0.6811880616486291

In [10]:
# Narrow the predictions down to the true label and the predicted label.
pl = predictions.select(["label", "prediction"])

In [11]:
# Number of test rows per true label.
pl.groupBy("label").agg({"label": "count"}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        1085|
|  1.0|         600|
|  2.0|         650|
+-----+------------+



In [12]:
# Number of test rows per predicted label.
pl.groupBy("prediction").agg({"prediction": "count"}).show()

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             1302|
|       1.0|              465|
|       2.0|              568|
+----------+-----------------+



In [13]:
# Accuracy: share of rows whose prediction equals the label.
pl.where(pl["label"] == pl["prediction"]).count() / pl.count()

0.6869379014989293

In [15]:
# Accuracy: rows whose prediction equals the label, divided by all rows.
acc = pl.where(pl["label"] == pl["prediction"]).count() / pl.count()

# Report it as a percentage with three decimals.
print("Model accuracy: %.3f%%" % (acc * 100))

Model accuracy: 68.694%


In [11]:
# Persist the fitted logistic-regression model. overwrite() lets this cell be
# re-run without failing because the target path already exists.
lrModel.write().overwrite().save("myModelPath3")

In [16]:
# Persist the fitted cross-validation model. overwrite() lets this cell be
# re-run without failing because the target path already exists.
cvModel.write().overwrite().save("vModel")

In [19]:
# Import here so the cell works when the notebook is run top to bottom: the
# only other import of CrossValidatorModel sits in a cell placed after this one.
from pyspark.ml.tuning import CrossValidatorModel

# Reload the cross-validation model that was saved under "vModel".
sameCVModel = CrossValidatorModel.load("vModel")

In [18]:
from pyspark.ml.tuning import  CrossValidatorModel

In [26]:
# Score the test set with the reloaded model and display the first rows.
(
    sameCVModel
    .transform(testData)
    .show()
)

+--------------------+--------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|                text|Category|               words|            features|label|       rawPrediction|         probability|prediction|
+--------------------+--------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|                    |     NEG|                  []|       (17072,[],[])|  0.0|[0.38419132882769...|[0.47073646302972...|       0.0|
|                    |     NEG|                  []|       (17072,[],[])|  0.0|[0.38419132882769...|[0.47073646302972...|       0.0|
|                    |     NEG|                  []|       (17072,[],[])|  0.0|[0.38419132882769...|[0.47073646302972...|       0.0|
|                    |     NEG|                  []|       (17072,[],[])|  0.0|[0.38419132882769...|[0.47073646302972...|       0.0|
|                    |     NEG|                  []|       (17072,[],