In [3]:
%matplotlib inline
import matplotlib
import seaborn as sns
sns.set()
matplotlib.rcParams['figure.dpi'] = 144

In [4]:
from datetime import datetime, timedelta

# Spark Miniproject


Stack Overflow is a collaboratively edited question-and-answer site originally focused on programming topics. Because it tracks many features of each post and user, including several feedback metrics, it allows for some open-ended analysis of user behavior on the site.

Stack Exchange (the parent organization) provides an anonymized [data dump](https://archive.org/details/stackexchange), and we'll use Spark to perform data manipulation, analysis, and machine learning on this data set.

## Accessing the data


The data is split into three sub-folders, `allUsers`, `allPosts`, and `allVotes`, which contain gzipped XML with the following format:

```
<row Body="&lt;p&gt;I always validate my web pages, and I recommend you do the same BUT many large company websites DO NOT and cannot validate because the importance of the website looking exactly the same on all systems requires rules to be broken. &lt;/p&gt;&#10;&#10;&lt;p&gt;In general, valid websites help your page look good even on odd configurations (like cell phones) so you should always at least try to make it validate.&lt;/p&gt;&#10;" CommentCount="0" CreationDate="2008-10-12T20:26:29.397" Id="195995" LastActivityDate="2008-10-12T20:26:29.397" OwnerDisplayName="Eric Wendelin" OwnerUserId="25066" ParentId="195973" PostTypeId="2" Score="0" />
```

Data from the much smaller stats.stackexchange.com is available in the same format in `spark-stats-data`. This site, Cross-Validated, will be used below in some instances to avoid working with the full data set for every step.

In [None]:
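# Read the schema description; a plain file object has no .show() method
with open('stack_exchange_schema.txt') as see:
    schema = see.read()

In [None]:
print(schema)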

You can either get the data by running the appropriate S3 commands in the terminal, or by running this block for the smaller stats data set:

## Data input and parsing


Some rows are split across multiple lines; these can be discarded. Incorrectly formatted XML can also be ignored. It is enough to simply skip problematic rows; the loss of data will not significantly impact our results on a data set this large.

We will handle XML parsing using `lxml.etree`.

The goal should be to have a parsing function that can be applied to the input data to access any XML element desired. 
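A minimal sketch of such a parsing function, assuming each input line holds one `<row ... />` element. It uses the standard library's `xml.etree.ElementTree` in place of `lxml.etree` so that it runs without extra dependencies; `parse_row` and `get_int` are hypothetical helper names, not part of this notebook.

```python
import xml.etree.ElementTree as ET

def parse_row(line):
    """Return the row's attributes as a dict, or None if the line is not a valid <row> element."""
    stripped = line.strip()
    if not stripped.startswith('<row'):
        return None
    try:
        return dict(ET.fromstring(stripped).attrib)
    except ET.ParseError:
        return None

def get_int(attrs, name, default=0):
    """Read an integer attribute, falling back to `default` when it is absent."""
    value = attrs.get(name)
    return int(value) if value else default

good = '  <row Id="195995" PostTypeId="2" Score="0" />'
bad = '  <row Id="195996" Body="split across'   # row continues on another line
print(parse_row(good))
print(parse_row(bad))
```

Returning `None` for unusable lines lets the caller drop them with a simple filter, which matches the "skip problematic rows" approach described above.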

## Bad XML


This is a simple step to test our parsing function. We create an RDD of Post objects where each Post is a valid row of XML from the Cross-Validated (stats.stackexchange.com) `allPosts` data set.

We are going to take several shortcuts to speed up and simplify our computations. First, our parsing function only attempts to parse rows that start with `<row` (after any leading whitespace), as these denote actual data entries.

Next, we return the total number of XML rows that started with `<row` but were subsequently **rejected** during processing.

Note that this cleaned data set will be used for future parts.
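The counting step can be sketched without Spark on a handful of lines. This is only an illustration under stated assumptions: the sample lines are made up, `classify` is a hypothetical helper, and the standard library's `xml.etree.ElementTree` stands in for `lxml.etree`.

```python
from collections import Counter
import xml.etree.ElementTree as ET

def classify(line):
    """Label a line 'good', 'bad', or 'skipped' (does not start with <row)."""
    stripped = line.strip()
    if not stripped.startswith('<row'):
        return 'skipped'
    try:
        ET.fromstring(stripped)
        return 'good'
    except ET.ParseError:
        return 'bad'

# Hypothetical input: one valid row and one row split across two lines.
sample_lines = [
    '<?xml version="1.0" encoding="utf-8"?>',
    '<posts>',
    '  <row Id="1" Score="3" />',
    '  <row Id="2" Body="broken',
    'continued" Score="1" />',
    '</posts>',
]
counts = Counter(classify(line) for line in sample_lines)
print(counts['bad'])
```

Only lines that begin with `<row` are ever parsed, so the continuation line of the split row is skipped rather than counted as rejected.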


In [6]:
from lxml import etree
import os
from pyspark import SparkContext
sc = SparkContext("local[*]", "temp")

def localpath(path):
    return 'file://' + os.path.join(os.path.abspath(os.path.curdir), path)

In [7]:
Posts = sc.textFile(localpath('spark-stats-data/allPosts'))

In [8]:
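def XmlFilter(x):
    """Keep only lines that contain a <row element and parse as valid XML."""
    if '<row' in x:
        try:
            etree.fromstring(x)
            return True
        except Exception:  # any parse failure means the row is unusable
            return False
    return False

def BadXmls(x):
    """Label a line ('Good', 1), ('Bad', 1), or ('NoTRelev', 1) so the labels can be counted."""
    if '<row' in x:
        try:
            etree.fromstring(x.encode('utf-8'))
            return ('Good', 1)
        except Exception:
            return ('Bad', 1)
    return ('NoTRelev', 1)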

In [None]:
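CleanPost = Posts.filter(XmlFilter)
# Count over the raw lines: CleanPost has already dropped every bad row,
# so counting on it would never report a 'Bad' entry.
count = Posts.map(BadXmls)\
             .reduceByKey(lambda x, y: x+y)
bad_xml = count.collect()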

## Favorites and scores

We're interested in looking for useful patterns in the data.  If we look at the Post data again (the smaller set, `stats.stackexchange.com`), we see that many things about each post are recorded.  We're going to start by looking to see if there is a relationship between the number of times a post was favorited (the `FavoriteCount`) and the `Score`.  The score is the number of times the post was upvoted minus the number of times it was downvoted, so it is a measure of how much a post was liked.  We'd expect posts with a higher number of favorites to have better scores, since they're both measurements of how good the post is.

Let's aggregate posts by the number of favorites, and find the average score for each number of favorites. We do this for the lowest 50 numbers of favorites.
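The aggregation carries a (score sum, post count) pair per favorite count and divides at the end. A minimal pure-Python sketch of the same pattern, using made-up `(FavoriteCount, Score)` pairs in place of the parsed posts:

```python
from collections import defaultdict

# Hypothetical (FavoriteCount, Score) pairs standing in for parsed posts.
posts = [(0, 1), (0, 3), (1, 4), (2, 10), (1, 6)]

totals = defaultdict(lambda: [0, 0])  # favorites -> [score sum, post count]
for favorites, score in posts:
    totals[favorites][0] += score
    totals[favorites][1] += 1

# Average per key, sorted by favorite count, then keep the lowest 50 keys.
average_score = sorted((fav, s / n) for fav, (s, n) in totals.items())
lowest = average_score[:50]
print(lowest)
```

Keeping the sum and the count separate until the last step is what makes the reduction safe to run in any order, since averages themselves cannot be combined pairwise.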


**Checkpoints**

- Total score across all posts: 299469
- Mean of first 50 favorite counts (averaging the keys themselves): 24.76

In [9]:
def ZeroNone(x):
    if x: return int(x)
    else: return 0

In [10]:
def AggregateScore(x):
    # Parse the row once, then read both attributes from it
    row = etree.fromstring(x)
    score = ZeroNone(row.get('Score'))
    favorite = ZeroNone(row.get('FavoriteCount'))
    return (favorite, (score, 1))


In [None]:
favorite_score = CleanPost.map(AggregateScore)\
                          .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))\
                          .map(lambda x: (x[0], x[1][0]/x[1][1]))\
                          .sortByKey()

In [None]:
favorite_score = favorite_score.take(50)

## Answer percentage


Now we investigate the correlation between a user's reputation and the kind of posts they make. For the 99 users with the highest reputation, we will single out posts which are either questions or answers and look at the percentage of these posts that are answers: *(answers / (answers + questions))*. 

We will run this only on the Cross-Validated (stats.stackexchange.com) data set.
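As a small illustration of the ratio, here is a sketch with made-up per-user counts. `answer_percentage` is a hypothetical helper; returning 0 for users with no questions or answers is an assumption made here to avoid dividing by zero.

```python
def answer_percentage(questions, answers):
    """Fraction of a user's question-or-answer posts that are answers."""
    total = questions + answers
    return answers / total if total else 0.0

# Hypothetical per-user (questions, answers) counts keyed by user ID.
user_counts = {101: (2, 6), 102: (5, 0), 103: (0, 0)}
percentages = {uid: answer_percentage(q, a) for uid, (q, a) in user_counts.items()}
print(percentages)
```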


#### Checkpoints

* Total questions: 52,060
* Total answers: 55,304
* Top 99 users' average reputation: 11893.464646464647

In [None]:
users = sc.textFile(localpath('spark-stats-data/allUsers'))

In [None]:
cleanUsrs = users.filter(XmlFilter)

In [11]:
def UserReputation(x):
    row = etree.fromstring(x)
    ID = ZeroNone(row.get('Id'))
    reput = ZeroNone(row.get('Reputation'))
    return (ID, reput)

In [12]:
def QuestionsAnswers(x):
    # Map a post to (owner ID, (question count, answer count))
    row = etree.fromstring(x)
    ID = ZeroNone(row.get('OwnerUserId'))
    Ptype = ZeroNone(row.get('PostTypeId'))
    if Ptype==2:
        ans, Q = 1, 0
    elif Ptype==1:
        ans, Q = 0, 1
    else:
        ans, Q = 0, 0
    return (ID, (Q, ans))

def AnswerPercentage(x):
    if x[1][1]== 0 : return (x[0], 0)
    else: return (x[0], x[1][1]/(x[1][0]+x[1][1]))
    


In [None]:
test = CleanPost.map(lambda x: QuestionsAnswers(x))\
                .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]))


In [None]:
PostIDQA = CleanPost.map(lambda x: QuestionsAnswers(x))\
                    .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]))\
                    .map(lambda x: AnswerPercentage(x))

In [None]:
UserIDReputation = cleanUsrs.map(lambda x : UserReputation(x))\
                          .sortBy(lambda x: -x[1])

In [None]:
UserIDReputation = sc.parallelize(UserIDReputation.take(100))

In [None]:
USerIDQA = UserIDReputation.join(PostIDQA)


In [None]:
Answer3rd = USerIDQA.map(lambda x: (x[0],x[1][1]))

In [None]:
answer_percentage = Answer3rd.take(100)

## First question of the user

We'd expect the first **question** a user asks to be indicative of their future behavior.  We'll dig more into that in the next problem, but for now let's see the relationship between reputation and how long it took each person to ask their first question.

For each user that asked a question, we find the difference between when their account was created (`CreationDate` for the User) and when they asked their first question (`CreationDate` for their first question).  Then we return this time difference in days (round down, so 2.7 days counts as 2 days) for the 100 users with the highest reputation, in the form

`(UserId, Days)`
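The rounding rule follows directly from `timedelta.days`, which holds only the whole-day part of a difference. A minimal sketch with made-up dates in the dump's timestamp format; `days_to_first_question` is a hypothetical helper:

```python
from datetime import datetime

FMT = '%Y-%m-%dT%H:%M:%S'

def days_to_first_question(account_created, question_dates):
    """Whole days between account creation and the earliest question."""
    # Drop the fractional seconds (everything past the first 19 characters).
    created = datetime.strptime(account_created[:19], FMT)
    first = min(datetime.strptime(d[:19], FMT) for d in question_dates)
    return (first - created).days

# The earliest question comes 2 days and 16.8 hours (2.7 days) after sign-up.
days = days_to_first_question(
    '2010-07-19T06:55:26.860',
    ['2010-07-25T10:00:00.000', '2010-07-21T23:43:26.860'],
)
print(days)
```

Note that `.days` floors toward negative infinity, so a question dated slightly before the account's creation date would yield -1 rather than 0.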

**Checkpoints**
- Users that asked a question: 23134
- Average number of days (round each user's days, then average): 30.1074258

In [13]:
def FilteringQuestions(x):
    # PostTypeId 1 marks a question
    return ZeroNone(etree.fromstring(x).get('PostTypeId')) == 1

In [None]:
Questions = CleanPost.filter(FilteringQuestions)

In [14]:
def UserCreationDateReputation(x):
    row = etree.fromstring(x)
    ID = ZeroNone(row.get('Id'))
    # Keep 'YYYY-MM-DDTHH:MM:SS' and drop the fractional seconds
    Date = datetime.strptime(row.get('CreationDate')[:19], '%Y-%m-%dT%H:%M:%S')
    reput = ZeroNone(row.get('Reputation'))
    return (ID, (Date, reput))

In [None]:
Users = cleanUsrs.map(lambda x: UserCreationDateReputation(x))

In [15]:
def PostDates(x):
    row = etree.fromstring(x)
    ID = ZeroNone(row.get('OwnerUserId'))
    QCreationDate = datetime.strptime(row.get('CreationDate')[:19], '%Y-%m-%dT%H:%M:%S')
    return (ID, QCreationDate)

In [None]:
FirstPost = Questions.map(lambda x: PostDates(x))\
                    .reduceByKey(lambda x,y: min(x, y))

In [None]:
UserDateandPost = FirstPost.join(Users)
First_question = UserDateandPost.map(lambda x: (x[0], (int((x[1][0]-x[1][1][0]).days), x[1][1][1])))\
                                 .sortBy(lambda x: -x[1][1])
First_questionF = First_question.map(lambda x: (x[0], x[1][0]))


In [None]:
first_question = First_questionF.take(100)

## Identify Active Users (veterans)


It can be interesting to think about what factors influence a user to remain active on the site over a long period of time. In order not to bias the results towards older users, we'll define a time window between 100 and 150 days after account creation. If the user has made a post in this time, we'll consider them active and well on their way to being veterans of the site; if not, they are inactive and were likely brief users.

Let's see if there are differences between the first ever question posts of "veterans" vs. "brief users". For each group separately, we average the score, views, number of answers, and number of favorites of the users' **first question**.
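The window test can be sketched in a few lines of plain Python. `is_veteran` is a hypothetical helper and the dates are made up; the inclusive 100 to 150 day bounds follow the definition above.

```python
from datetime import datetime, timedelta

def is_veteran(account_created, post_dates, start=100, end=150):
    """True if any post falls between `start` and `end` days (inclusive) after account creation."""
    return any(start <= (d - account_created).days <= end for d in post_dates)

created = datetime(2012, 1, 1)
veteran_posts = [created + timedelta(days=3), created + timedelta(days=120)]
brief_posts = [created + timedelta(days=3), created + timedelta(days=400)]

print(is_veteran(created, veteran_posts))
print(is_veteran(created, brief_posts))
```

A user who posts only before day 100 or only after day 150 counts as brief under this definition, however active they were outside the window.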


In [16]:
def QuestionInfo(x):
    # Parse the row once and pull out every field we need
    row = etree.fromstring(x)
    ID = ZeroNone(row.get('OwnerUserId'))
    ScoreCount = ZeroNone(row.get('Score'))
    ViewCount = ZeroNone(row.get('ViewCount'))
    AnswerCount = ZeroNone(row.get('AnswerCount'))
    FavoriteCount = ZeroNone(row.get('FavoriteCount'))
    CreationDate = datetime.strptime(row.get('CreationDate')[:19], '%Y-%m-%dT%H:%M:%S')

    return (ID, (CreationDate, ScoreCount, ViewCount, AnswerCount, FavoriteCount))

def comparison(x,y):
    if x[0]<y[0]:
        return x
    return y

In [17]:
def UserCreationDate(x):
    ID = ZeroNone(etree.fromstring(x).get('Id'))
    Date = datetime.strptime(etree.fromstring(x).get('CreationDate')[:19],'%Y-%m-%dT%H:%M:%S' )
    return (ID, Date)

In [None]:
UsersIdDate = cleanUsrs.map(lambda x: UserCreationDate(x))

In [None]:
# Use a separate name for the RDD so the QuestionInfo parser stays callable;
# rebinding QuestionInfo would break the lazily evaluated map.
FirstQuestionInfo = Questions.map(QuestionInfo)\
                             .reduceByKey(comparison)

In [None]:
UserDateDict = {x[0]:x[1] for x in UsersIdDate.collect()}

In [None]:
PostsandDates = CleanPost.map(lambda x: PostDates(x))
PostsandDates.take(2)

In [18]:
def VeteranOrBrief(x):
    # 1 if this post falls 100 to 150 days after the owner's account creation, else 0
    if int(x[0]) in UserDateDict:
        days = (x[1] - UserDateDict[x[0]]).days
        if days >= 100 and days <= 150 : V=1
        else : V=0
    else: V = 0
    return (x[0], V)

In [None]:
VeteraBriefs = PostsandDates.map(lambda x: VeteranOrBrief(x))\
                            .reduceByKey(lambda x, y: max(x,y))
VeteranBriefDict ={x[0]:x[1] for x in VeteraBriefs.collect()}

In [19]:
def Veteran(x):
    # Users with at least one post inside the window
    return VeteranBriefDict.get(x[0]) == 1

In [20]:
def brief(x):
    # Users who posted, but never inside the window
    return VeteranBriefDict.get(x[0]) == 0

In [None]:
Veterans = UsersIdDate.filter(Veteran)
Briefs = UsersIdDate.filter(brief)

In [None]:
VeteransAggregate = Veterans.join(FirstQuestionInfo)
BriefsAggregate= Briefs.join(FirstQuestionInfo)

In [None]:
#Checkpoint
print(VeteransAggregate.count())
print(BriefsAggregate.count())

In [21]:
def Aggregatedreduce(x,y):
    return (x[0]+y[0], x[1]+y[1], x[2]+y[2], x[3]+y[3])

In [None]:
VeteransAggregateF = VeteransAggregate.map(lambda x: ('V', (x[1][1][1], x[1][1][2], x[1][1][3], x[1][1][4])))\
                                     .reduceByKey(lambda x, y: Aggregatedreduce(x,y))
BriefsAggregateF = BriefsAggregate.map(lambda x: ('B', (x[1][1][1], x[1][1][2], x[1][1][3], x[1][1][4])))\
                                     .reduceByKey(lambda x, y: Aggregatedreduce(x,y))

In [None]:
# Collect each aggregate once instead of once per field
V = list(VeteransAggregateF.collect()[0][1])
B = list(BriefsAggregateF.collect()[0][1])
# Average over the users in each group who asked a first question
# (the original run hard-coded these counts as 1843 and 21252)
V_count = VeteransAggregate.count()
B_count = BriefsAggregate.count()
V1 = [v/V_count for v in V]
B1 = [b/B_count for b in B]
print(V1)
print(B1)


In [None]:
Final5V = Veterans.map(lambda x: ('alpha', x))\
                  .reduceByKey(lambda x,y: ReduceTuple(x,y))

#### Checkpoints

* Total brief users: 24,864
* Total veteran users: 2,027

In [None]:
identify_veterans = {
    "vet_score":V1[0],
    "vet_views": V1[1],
    "vet_answers": V1[2],
    "vet_favorites":V1[3],
    "brief_score": B1[0],
    "brief_views": B1[1],
    "brief_answers": B1[2],
    "brief_favorites": B1[3]
}

## Identify veterans: full data set


Same as above, but on the full Stack Exchange data set.


In [22]:
def VeteranOrBrief_f(x):
    if int(x[0]) in UserDateDict_f:
        days = (x[1] - UserDateDict_f[x[0]]).days
        if days >= 100 and days <= 150 : V=1
        else : V=0
    else: V = 0
    return (x[0], V)

def Veteran_f(x):
    if x[0] in VeteranBriefDict_f:
        if VeteranBriefDict_f[x[0]]==1: return True
        else : return False
    else: return False
def brief_f(x):
    if x[0] in VeteranBriefDict_f:
        if VeteranBriefDict_f[x[0]]==0: return True
        else : return False
    else: return False

In [23]:
users_f = sc.textFile(localpath('spark-stack-data/allUsers'))
post_f = sc.textFile(localpath('spark-stack-data/allPosts'))
cleanuser_f = users_f.filter(XmlFilter)
cleanpost_f = post_f.filter(XmlFilter)

In [None]:

questions_f = cleanpost_f.filter(FilteringQuestions)
UserIdDate_f = cleanuser_f.map(lambda x: UserCreationDate(x))
postIdDate_f = cleanpost_f.map(lambda x: PostDates(x))
UserDateDict_f = {x[0]:x[1] for x in UserIdDate_f.collect()}
VeteraBriefs_f = postIdDate_f.map(lambda x: VeteranOrBrief_f(x))\
                            .reduceByKey(lambda x, y: max(x,y))
VeteranBriefDict_f ={x[0]:x[1] for x in VeteraBriefs_f.collect()}
Veterans_f = UserIdDate_f.filter(Veteran_f)
Briefs_f = UserIdDate_f.filter(brief_f)


In [None]:
print('Veterans Count is '+ str(Veterans_f.count()))
print('Briefs Count is '+ str(Briefs_f.count()))
QuestionInfo_f = questions_f.map(lambda x: QuestionInfo(x))\
                            .reduceByKey(lambda x, y: comparison(x, y))
VeterAggr_f= Veterans_f.join(QuestionInfo_f)
BriefsAggr_f = Briefs_f.join(QuestionInfo_f)
V_fcount = VeterAggr_f.count()
B_fcount = BriefsAggr_f.count()
print('Veterans aggregated Count with first question is '+ str(V_fcount))
print('Briefs aggregated Count with first question is '+ str(B_fcount))


In [None]:
VeteransAggregate_F = VeterAggr_f.map(lambda x: ('V', (x[1][1][1], x[1][1][2], x[1][1][3], x[1][1][4])))\
                                     .reduceByKey(lambda x, y: Aggregatedreduce(x,y))
BriefsAggregate_F = BriefsAggr_f.map(lambda x: ('B', (x[1][1][1], x[1][1][2], x[1][1][3], x[1][1][4])))\
                                     .reduceByKey(lambda x, y: Aggregatedreduce(x,y))



In [None]:
# Collect each aggregate once instead of once per field
V_f = list(VeteransAggregate_F.collect()[0][1])
B_f = list(BriefsAggregate_F.collect()[0][1])
V1_f = [v/V_fcount for v in V_f]
B1_f = [b/B_fcount for b in B_f]
print(V1_f)
print(B1_f)

#### Checkpoints

* Total brief users: 1,848,628
* Total veteran users: 288,285

In [None]:
cleanpost_f

In [None]:
identify_veterans_full = {
    "vet_score":V1_f[0],
    "vet_views": V1_f[1],
    "vet_answers": V1_f[2],
    "vet_favorites":V1_f[3],
    "brief_score": B1_f[0],
    "brief_views": B1_f[1],
    "brief_answers": B1_f[2],
    "brief_favorites": B1_f[3]
}

## Word2vec


Word2Vec is an alternative approach for vectorizing text data. The vectorized representations of words in the vocabulary tend to be useful for predicting other words in the document, hence the famous example "vector('king') - vector('man') + vector('woman') ~= vector('queen')".

Let's see how good a Word2Vec model we can train using the tags of each Stack Exchange post as documents (this uses the full data set). We use the implementation of Word2Vec from `pyspark.ml` to return a list of the top 25 closest synonyms to "ggplot2" and their similarity scores, as tuples of the form ("string", number).
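Treating tags as documents means each post's `Tags` attribute becomes one list of tokens. A minimal sketch, assuming the attribute has the form `<tag1><tag2>...`; `tags_to_document` is a hypothetical helper that uses a regular expression rather than the string replacement used in this notebook.

```python
import re

def tags_to_document(tags_attr):
    """Split a Tags attribute such as '<r><ggplot2>' into a list of tag tokens."""
    return re.findall(r'<([^<>]+)>', tags_attr)

doc = tags_to_document('<r><ggplot2><data-visualization>')
print(doc)
```

Each such list plays the role of a sentence for Word2Vec, so tags that tend to appear on the same posts end up with similar vectors.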


In [24]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import SQLContext
from pyspark.ml.feature import Word2Vec
sqlContext = SQLContext(sc)

In [25]:
def tagfilter(x):
    # Keep only rows that have a non-empty Tags attribute
    return bool(etree.fromstring(x.encode('utf-8')).get('Tags'))

In [26]:
def tagsOrNot(x):
    tags = etree.fromstring(x.encode('utf-8')).get('Tags')
    # '<r><ggplot2>' -> ['r', 'ggplot2']; always split so a list is returned
    tags = tags.replace('<', ' ').replace('>', ' ').split()
    return (tags, 1)
    

In [None]:
# tagsOrNot already returns (list of tags, 1), so no further splitting is needed
test = CleanPost.filter(tagfilter).map(tagsOrNot).toDF(['text', 'score'])
test.collect()

In [None]:
ggplotpost = cleanpost_f.filter(tagfilter).map(lambda x: tagsOrNot(x)).toDF(['text', 'score'])
w2v = Word2Vec(inputCol="text", outputCol="vectors", vectorSize=100, minCount=15, seed=42)
model = w2v.fit(ggplotpost)


In [None]:
ans = model.findSynonyms('ggplot2', 25).take(25)
# Convert each Row into a ("string", number) tuple
f = [(x.word, x.similarity) for x in ans]


#### Parameters


The dimensionality of the vector space should be 100. The random seed should be 42 in `PySpark`, 42L in Scala Spark.


#### Checkpoints

* Mean of the top 25 cosine similarities: 0.8012362027168274
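The checkpoint is stated in terms of cosine similarity, which can be sketched as follows. The three-dimensional vectors are invented for illustration (the model above uses 100 dimensions), and `cosine_similarity` is a hypothetical helper, not a PySpark function.

```python
import math

def cosine_similarity(u, v):
    """Cosine of the angle between two vectors of equal length."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)

# Hypothetical embeddings for three tags.
vectors = {
    'ggplot2': [1.0, 2.0, 0.0],
    'lattice': [2.0, 4.0, 0.0],   # same direction as ggplot2
    'sql': [0.0, 0.0, 3.0],       # orthogonal to ggplot2
}

query = vectors['ggplot2']
ranked = sorted(
    ((word, cosine_similarity(query, vec)) for word, vec in vectors.items() if word != 'ggplot2'),
    key=lambda pair: -pair[1],
)
print(ranked)
```

Because the measure depends only on direction, a vector twice as long as the query still scores close to 1.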

## Classification


We'd like to see if we can predict the tags of a question from its body text. Instead of predicting specific tags, we will instead try to predict if a question contains one of the top ten most common tags.  

To this end, we have separated out a train and a test set from the original data.  The training and test sets were downloaded with the stats data at the beginning of the notebook.  You can also get them from S3:
  * spark-stats-data/posts_train.zip
  * spark-stats-data/posts_test.zip

This will involve two steps. First, we find the ten most common tags for questions in the training data set (the tags have been removed from the test set). Then we train a learner to predict, from the text of the question (the `Body` attribute), whether it should have one of those ten tags. We will process the question text with NLP techniques such as splitting the text into tokens.
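The labeling step can be sketched on a toy training set. The questions and tags below are invented, and the sketch keeps only the top 2 tags so the example stays small; the notebook uses the top 10.

```python
from collections import Counter

# Hypothetical training questions as (body, tags) pairs.
train = [
    ('How do I fit a model in R?', ['r', 'regression']),
    ('Plotting residuals in R', ['r', 'regression']),
    ('Interpreting coefficients', ['regression']),
    ('Is this series stationary?', ['time-series']),
]

# Count every tag occurrence and keep the most common ones.
tag_counts = Counter(tag for _, tags in train for tag in tags)
top_tags = {tag for tag, _ in tag_counts.most_common(2)}

# Label is 1 when the question shares at least one tag with the top set.
labeled = [(body, int(bool(top_tags & set(tags)))) for body, tags in train]
labels = [label for _, label in labeled]
print(top_tags, labels)
```

The resulting `(body, label)` pairs are the shape a binary text classifier expects: the body is the input and the 0/1 label is the target.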

#### Checkpoints

- Number of training posts with a tag in the top 10: `19908`
- Number without: `17067`

In [133]:
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline

In [105]:
def intersection(a,b):
    return list(set(a) & set(b))

def BodylabelingtagsinTags(x):
    # Return (body, label); label is 1 if the question has any of the top tags
    row = etree.fromstring(x.encode('utf-8'))
    tags = row.get('Tags').replace('<', ' ').replace('>', ' ').split()
    l = 1 if intersection(tags, toptags) else 0
    return (row.get('Body'), l)

def listoftags(x):
    # '<r><regression>' -> ['r', 'regression']; always split so a list is returned
    tags = etree.fromstring(x.encode('utf-8')).get('Tags')
    return tags.replace('<', ' ').replace('>', ' ').split()

def Questionsfilter(x):
    Ptype = ZeroNone(etree.fromstring(x).get('PostTypeId'))
    if Ptype==1: return True
    else: return False
    
def QuestionsId(x):
    return (x, ZeroNone(etree.fromstring(x).get('Id')))

    

In [28]:
# -n skips files that already exist, so re-running the cell does not prompt
!unzip -n -d spark-stats-data/train spark-stats-data/posts_train.zip
!unzip -n -d spark-stats-data/test spark-stats-data/posts_test.zip


In [67]:
top10tags = sc.textFile(localpath('spark-stats-data/train')).filter(XmlFilter)\
                                                            .filter(tagfilter)\
                                                            .flatMap(lambda tags: listoftags(tags))\
                                                            .map(lambda tag: (tag.lower(), 1))\
                                                            .reduceByKey(lambda x,y: x+y)\
                                                            .sortBy(lambda x: -x[1])

In [43]:
toptags = [t[0] for t in top10tags.take(10)]

In [44]:
toptags

['r',
 'regression',
 'time-series',
 'machine-learning',
 'probability',
 'hypothesis-testing',
 'distributions',
 'self-study',
 'logistic',
 'correlation']

In [108]:
trainset = sc.textFile(localpath('spark-stats-data/train')).filter(XmlFilter)\
                                                          .filter(Questionsfilter)\
                                                          .filter(tagfilter)\
                                                          .map(lambda x: BodylabelingtagsinTags(x))\
                                                          .toDF(['Body', 'label'])

In [116]:
testset = sc.textFile(localpath('spark-stats-data/test')).filter(XmlFilter)\
                                                       .filter(Questionsfilter)\
                                                       .map(lambda x: (etree.fromstring(x.encode('utf-8')).get('Body'), ZeroNone(etree.fromstring(x).get('Id'))) )\
                                                       .sortBy(lambda x: x[1])\
                                                       .toDF(['Body', 'ID'])

In [122]:
# Pull the rows to the driver once and split them into features and labels
train_rows = trainset.collect()
Xtrain = [t.Body for t in train_rows]
Y = [t.label for t in train_rows]


In [123]:
Xtest = [t.Body for t in testset.collect()]


In [137]:
len(Xtest)

4649

In [134]:
LG = LogisticRegression(random_state=123, solver='lbfgs')

bag_of_words_est = Pipeline([
    ("HashingVect", HashingVectorizer()),
    ("LG", LG)
])
bag_of_words_est.fit(Xtrain, Y)

Exception ignored in: <object repr() failed>
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 40, in __del__
    if SparkContext._active_spark_context and self._java_obj is not None:
AttributeError: 'LogisticRegression' object has no attribute '_java_obj'


Pipeline(memory=None,
     steps=[('HashingVect', HashingVectorizer(alternate_sign=True, analyzer='word', binary=False,
         decode_error='strict', dtype=<class 'numpy.float64'>,
         encoding='utf-8', input='content', lowercase=True,
         n_features=1048576, ngram_range=(1, 1), non_negative=False,
         norm='...penalty='l2', random_state=123, solver='lbfgs', tol=0.0001,
          verbose=0, warm_start=False))])

In [135]:
pred = bag_of_words_est.predict(Xtest)

*Copyright &copy; 2019 The Data Incubator.  All rights reserved.*