# Milestone 2
 * Install python mongodb connectors for connecting with Pyspark
    * !pip3 install pymongo[srv]
    * !pip3 install Pyspark
    * !pip3 install mlflow

* Load the data from mongodb
* Perform EDA to study the data
* Select the relevant columns for modeling
* Split the data for train and test 
* Model the data with different algorithms
* Evaluate the models and select the best performing and optimized model
* Save the model using pyfunc/mlflow 

# Data Loading

In [1]:
import pandas as pd

In [19]:
df = pd.read_csv('news.csv')

In [20]:
df.head()

Unnamed: 0,title,summary,link,pub_date,topic
0,Elon Musk: The THREE questions investors shoul...,"Elon Musk, the billionaire CEO of electric car...",https://www.express.co.uk/finance/city/1503278...,08-10-2021 23:09,finance
1,"Elon Musk Posts Puppy Floki In A Tesla, SHIB C...",A Twitter post of Elon Musk's Shiba Inu dog ha...,https://www.ibtimes.com/elon-musk-posts-puppy-...,04-10-2021 17:37,news
2,Cryptocurrency named after Elon Musk's dog sur...,Fans of Elon Musk made a cryptocurrency named ...,https://nypost.com/2021/10/06/cryptocurrency-n...,06-10-2021 15:28,news
3,Elon Musk says Tesla moving headquarters to Texas,Tesla chief Elon Musk says the company is movi...,https://www.news.com.au/breaking-news/elon-mus...,08-10-2021 02:23,news
4,Grimes Trolls Paparazzi with Communist Manifes...,"Grimes, Elon Musks Ex was spotted on the stree...",https://www.yahoo.com/entertainment/grimes-tro...,03-10-2021 22:29,entertainment


In [4]:
df['topic'].count()

25

In [5]:
df['topic'].value_counts()

news             18
tech              2
business          2
entertainment     1
finance           1
science           1
Name: topic, dtype: int64

In [6]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 47.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=23c0d42a246c1a16bbad31d204ab1ea0152d9fe7004b2913a2b98c420215e8ab
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [7]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

In [13]:
df['label'] = df['topic']

In [21]:
# Pandas to Spark
train_df = spark.createDataFrame(df)

In [18]:
df = df.drop('label')

KeyError: ignored

In [None]:
train_df.columns

In [None]:

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
tokenizer = Tokenizer(inputCol="summary", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(train_df)
mlflow.spark.log_model(model, "spark-model")

#Model Pipeline
regexTokenizer: Tokenization (with Regular Expression)
stopwordsRemover: Remove Stop Words
countVectors: Count vectors (“document-term vectors”)

In [22]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="summary", outputCol="words", pattern="\\W")
# stop words
add_stopwords = ["http","https","amp","rt","t","c","the"] 
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

In [23]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "topic", outputCol = "label")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(train_df)
dataset = pipelineFit.transform(train_df)
dataset.show(5)

+--------------------+--------------------+--------------------+----------------+-------------+--------------------+--------------------+--------------------+-----+
|               title|             summary|                link|        pub_date|        topic|               words|            filtered|            features|label|
+--------------------+--------------------+--------------------+----------------+-------------+--------------------+--------------------+--------------------+-----+
|Elon Musk: The TH...|Elon Musk, the bi...|https://www.expre...|08-10-2021 23:09|      finance|[elon, musk, the,...|[elon, musk, bill...|(67,[0,1,2,3,5,6,...|  4.0|
|Elon Musk Posts P...|A Twitter post of...|https://www.ibtim...|04-10-2021 17:37|         news|[a, twitter, post...|[a, twitter, post...|(67,[0,1,2,3,4,5,...|  0.0|
|Cryptocurrency na...|Fans of Elon Musk...|https://nypost.co...|06-10-2021 15:28|         news|[fans, of, elon, ...|[fans, of, elon, ...|(67,[0,1,2,3,4,5,...|  0.0|
|Elon Musk

In [24]:
# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 18
Test Dataset Count: 7


In [25]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("summary","topic","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+--------+------------------------------+-----+----------+
|                       summary|   topic|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|The era of dog-related cryp...|    news|[0.9336094237015901,0.00707...|  0.0|       0.0|
|Elon Musk, the billionaire ...| finance|[0.8997637586829493,0.01568...|  4.0|       0.0|
|Grimes said she 'trolled' p...|business|[0.8545492536479375,0.00431...|  1.0|       0.0|
|Grimes and Tesla CEO Elon M...|    news|[0.795394413402757,0.014312...|  0.0|       0.0|
|These truly weird photos of...|    news|[0.7678421006758459,0.01898...|  0.0|       0.0|
|After making electric cars ...|    news|[0.6482070601334967,0.02012...|  0.0|       0.0|
+------------------------------+--------+------------------------------+-----+----------+



In [26]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator_lr_cv = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator_lr_cv.evaluate(predictions)

0.5194805194805194

#Logistic Regression using TF-IDF Features

In [27]:
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])
pipelineFit = pipeline.fit(train_df)
dataset = pipelineFit.transform(train_df)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("summary","topic","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+--------+------------------------------+-----+----------+
|                       summary|   topic|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|The era of dog-related cryp...|    news|[0.946163743136605,0.005249...|  0.0|       0.0|
|Elon Musk, the billionaire ...| finance|[0.8799097212851666,0.01636...|  4.0|       0.0|
|Grimes said she 'trolled' p...|business|[0.847148770887788,0.004561...|  1.0|       0.0|
|Grimes and Tesla CEO Elon M...|    news|[0.7749901880303055,0.01404...|  0.0|       0.0|
|After making electric cars ...|    news|[0.6936651152995056,0.01652...|  0.0|       0.0|
|These truly weird photos of...|    news|[0.6617509050781417,0.01961...|  0.0|       0.0|
+------------------------------+--------+------------------------------+-----+----------+



In [28]:
evaluator_lr_tf = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator_lr_tf.evaluate(predictions)

0.5194805194805194

#Cross-Validation

In [31]:
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
pipelineFit = pipeline.fit(train_df)
dataset = pipelineFit.transform(train_df)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator_lr_tf, \
                    numFolds=5)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)
# Evaluate best model
evaluator_cv = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator_cv.evaluate(predictions)

0.5952380952380951

#Naive bayes

In [32]:
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("summary","topic","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+-------+------------------------------+-----+----------+
|                       summary|  topic|                   probability|label|prediction|
+------------------------------+-------+------------------------------+-----+----------+
|The era of dog-related cryp...|   news|[0.9999896012069359,9.52542...|  0.0|       0.0|
|Elon Musk, the billionaire ...|finance|[0.9975052807878101,2.77499...|  4.0|       0.0|
|Grimes and Tesla CEO Elon M...|   news|[0.8236860597259401,7.25249...|  0.0|       0.0|
+------------------------------+-------+------------------------------+-----+----------+



In [33]:
evaluator_nb = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator_nb.evaluate(predictions)

0.35714285714285715

#Random Forest

In [34]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("summary","topic","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+--------+------------------------------+-----+----------+
|                       summary|   topic|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|Elon Musk, the billionaire ...| finance|[0.8655000000000002,0.00875...|  4.0|       0.0|
|The era of dog-related cryp...|    news|[0.8280000000000001,0.00125...|  0.0|       0.0|
|These truly weird photos of...|    news|[0.7108333333333333,0.04812...|  0.0|       0.0|
|Grimes and Tesla CEO Elon M...|    news|[0.6977916666666666,0.01062...|  0.0|       0.0|
|Grimes said she 'trolled' p...|business|[0.6975833333333332,0.0325,...|  1.0|       0.0|
|After making electric cars ...|    news|[0.6529166666666666,0.01125...|  0.0|       0.0|
|Photo: Getty ImagesPhoto: G...|    news|[0.47375,6.25E-4,0.31375000...|  0.0|       0.0|
+------------------------------+--------+------------------------------+-----+----------+



In [35]:
evaluator_rf = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator_rf.evaluate(predictions)

0.5952380952380951

In [41]:
# Results
print(f"Accuracy of logReg using Count Vectorizer: {evaluator_lr_cv.evaluate(predictions):.3f}")
print(f"Accuracy of logReg using TF-IDF:{evaluator_lr_tf.evaluate(predictions):.3f}")
print(f"Accuracy of logReg using cross validation - hyper parameter tuning:{evaluator_lr_tf.evaluate(predictions):.3f}")
print(f"Accuracy of logReg using naive bayes:{evaluator_nb.evaluate(predictions):.3f}")
print(f"Accuracy of logReg using random forest:{evaluator_rf.evaluate(predictions):.3f}")

Accuracy of logReg using Count Vectorizer: 0.595
Accuracy of logReg using TF-IDF:0.595
Accuracy of logReg using cross validation - hyper parameter tuning:0.595
Accuracy of logReg using naive bayes:0.595
Accuracy of logReg using random forest:0.595


In [None]:
import pickle

#Pickle implementation for Linear Regression
pickle.dump(lrModel, open('lr_model.pkl','wb'))
# Loading model to compare the results
model = pickle.load(open('lr_model.pkl','rb'))
print(model)

TypeError: ignored