<IMG SRC="https://github.com/jacquesroy/byte-size-data-science/raw/master/images/Banner.png" ALT="BSDS Banner" WIDTH=1195 HEIGHT=200>

# Naive Bayes document classification
[Notebook companion to: http://youtube.com/c/ByteSizeDataScience]

In this notebook, we use TF-IDF features and a Naive Bayes model to see if we can classify documents.

The documents are a subset of a dataset found at: `http://disi.unitn.it/moschitti/corpora.htm`

It is the Reuters-21578 set with 90 categories. I took a subset of the data, which is available at `http://disi.unitn.it/moschitti/corpora/Reuters21578-Apte-90Cat.tar.gz`

I am using a total of 10 classifications.

Before creating the dataset, I removed the files that contained the word blah (as in Blah, blah, blah.).
```
cd Reuters21578-Apte-90Cat/training
for i in `ls -d *`
do
    for j in `grep -l -i  blah $i/*`
    do
        rm $j
    done
done
```

To create the dataset, I used the following script (modify it for your directory structure):
```
mkdir Reuters2
cd Reuters21578-Apte-90Cat/training
for i in "alum" "barley" "coffee" "gold" "housing" "lead" "retail" "rubber" "tin" "wheat"
do
   cd $i
   for f in `ls *`
   do
     cp $f ../../../Reuters2/$i.$f.txt
   done
   cd ..
done
```

Then I zipped the content of the Reuters2 directory to create `Reuters.zip`.

### 028-Document Classification using Naive Bayes
Execute the next cell if you want to see the `Byte Size Data Science` YouTube channel video.

In [None]:
from IPython.display import IFrame

IFrame(src="https://www.youtube.com/embed/6SpLv8zlnPk?rel=0&amp;controls=0&amp;showinfo=0", width=560, height=315)


## Data preparation
The original dataset includes 572 documents.

The classification is part of the filename. The format is:<br/>
`<fileclass>.<filename>.txt`

### Read the files from a zip file

In [None]:
from pyspark.sql import SparkSession
import types
import pandas as pd
from botocore.client import Config
import ibm_boto3

def __iter__(self): return 0

# @hidden_cell
# The following code accesses a file in your IBM Cloud Object Storage. It includes your credentials.
# You might want to remove those credentials before you share your notebook.
client = ibm_boto3.client(service_name='s3',
    ibm_api_key_id='<YOUR_IBM_API_KEY>',  # placeholder: use your own credentials, never share a real key
    ibm_auth_endpoint="https://iam.bluemix.net/oidc/token",
    config=Config(signature_version='oauth'),
    endpoint_url='https://s3-api.us-geo.objectstorage.service.networklayer.com')

# Your data file was loaded into a botocore.response.StreamingBody object.
# Please read the documentation of ibm_boto3 and pandas to learn more about your possibilities to load the data.
# ibm_boto3 documentation: https://ibm.github.io/ibm-cos-sdk-python/
# pandas documentation: http://pandas.pydata.org/
spark = SparkSession.builder.getOrCreate()

In [None]:
!rm -f *.txt Reuters.zip

In [None]:
client.download_file(Bucket='bscstesting-donotdelete-pr-paqxy5fmsmaykn', 
                     Key='Reuters.zip', Filename='Reuters.zip')

In [None]:
!unzip Reuters.zip

In [None]:
!rm -rf __MACOSX

In [None]:
# Use the SparkContext attached to the session created above
NaiveBayesSet0 = spark.sparkContext.wholeTextFiles("./*.txt")
NaiveBayesSet0.count()

<b>Convert the input data</b><br/>
We need to split the path into a fileclass and filename and convert the input<br/>
into a three-element array containing:<br/>
&lt;fileclass>, &lt;filename>, &lt;content>
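
As a minimal sketch of that split in plain Python, here is the same logic applied to one record. The path and content below are made up for illustration; they only assume the `(path, content)` pair shape that `wholeTextFiles` returns.

```python
# Hypothetical (path, content) pair, shaped like one element of wholeTextFiles output.
record = ("file:/home/user/work/wheat.0001234.txt", "Wheat exports rose this week")

def split_record(record):
    """Return (fileclass, filename, content) from a (path, content) pair."""
    path, content = record
    basename = path.split("/")[-1]                # keep only the file name part
    fileclass, filename = basename.split(".", 1)  # split on the first dot only
    return fileclass, filename, content

fileclass, filename, content = split_record(record)
print(fileclass, filename)
```

Splitting on the first dot only keeps the rest of the name, including the `.txt` extension, in the filename element.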

In [None]:
# kv is a (path, content) pair; avoid naming it "tuple", which shadows the built-in
NaiveBayesSet = NaiveBayesSet0.map(
   lambda kv: ((kv[0].split("/")[-1]).split(".", 1)[0], 
               (kv[0].split("/")[-1]).split(".", 1)[1], kv[1])
    )
# NaiveBayesSet.first()

In [None]:
# we can create a DataFrame so we can use SQL to manipulate the data
from pyspark.sql.types import *

classNameContent = StructType([StructField("fileclass", StringType(), True),
                               StructField("filename",  StringType(), True),
                               StructField("content",   StringType(), True)])
FinalDataSet = spark.createDataFrame(NaiveBayesSet, classNameContent)
FinalDataSet.registerTempTable("articles")

print("Total number of articles: " + str(FinalDataSet.count()) )
spark.sql(
    "select fileclass, count(filename) as cnt " +
    "from articles " +
    "group by fileclass " +
    "order by fileclass limit 20" ).show()


# Data Preparation: Tokenizing
We want to split the articles by non-word characters.

We could use the Tokenizer class:

```
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="content", outputCol="words")
result = tokenizer.transform(FinalDataSet)
```

But Tokenizer only splits on whitespace, so punctuation stays attached to the words. We'll do it differently.
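
The difference can be sketched in plain Python, without Spark. The sample sentence is made up; the whitespace split mimics what `Tokenizer` does (lowercase, then split on whitespace), and the regular expression is the one used in the next cell.

```python
import re

# Hypothetical sample sentence, for illustration only.
sample = "Wheat prices rose 5%, traders said."

# Whitespace split: punctuation stays glued to the neighbouring word.
whitespace_tokens = sample.lower().split()

# Regex split: keep only runs of word characters and apostrophes.
regex_tokens = re.findall(r"[\w']+", sample.lower())

print(whitespace_tokens)
print(regex_tokens)
```

With the whitespace split, `5%,` and `said.` are kept as tokens; with the regular expression they become `5` and `said`.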

In [None]:
# Split all the text files using non-Word characters and see what we get.
import re
AllTokensNonWordSplit = FinalDataSet.select('content').rdd.flatMap(
                        lambda text: re.findall(r"[\w']+", text.content.lower()) )

# We see over 250 thousand tokens
print("Number of tokens: " + str(AllTokensNonWordSplit.count()) )
print("Number of distinct tokens: " + str(AllTokensNonWordSplit.distinct().count()) )

In [None]:
# Look at the first 30 words
for x in AllTokensNonWordSplit.distinct().take(30) : print(x)

## Findings
Let's look at the word distribution.

In [None]:
# Perform some SQL to find the most common token.
tokens = StructType([StructField("token",  StringType(), True)])

# Create a dataframe from the AllTokensNonWordSplit RDD
AllTokens = spark.createDataFrame(
                   AllTokensNonWordSplit.map(lambda x:[x]), tokens )

AllTokens.registerTempTable("Tokens")

spark.sql("""
    select token, count(token) tokencount 
    from Tokens 
    group by token 
    order by tokencount desc 
    limit 20
    """).toPandas().head(20)


# Filtering
We see that some words are very common. Many of them are likely in every document,
so they don't add anything in terms of classification.

These types of words are called stop words. If we removed them from the documents, we would end up
with smaller documents, leading to faster execution. We don't need to do this: 
we'll use a different method to assign weights to the words.

# Modeling and Evaluation
Creating the labeled data points RDDs and running the model.

The first step is to convert the labels (the document classes) to numbers.

In [None]:
# Get class
classes=["alum", "barley", "coffee", "gold", "housing", "lead", "retail", "rubber", "tin", "wheat"]
classIx=[0,1,2,3,4,5,6,7,8,9]
classLookupMap=dict(zip(classes,classIx))

## Tokenize the content

In [None]:
# Tokenize the content and convert the fileclass to a number
import re
from pyspark.sql import Row
# Convert content to array of words.
AllTokens_df = FinalDataSet.rdd.map(lambda text: Row(fileclass=classLookupMap[text[0]], filename=text[1], 
                                                     content=re.findall(r"[\w']+" ,text[2].lower())) ).toDF()

AllTokens_df.registerTempTable("allTokens")
# AllTokens_df.printSchema()
AllTokens_df.show(1)

## How many words per document?
Find out if we have documents that seem too short.

In [None]:
spark.sql("""
  select min(sz) minimum, avg(sz) average, max(sz) maximum
  from (
    select size(content) sz
    from allTokens
  )
""").show()

In [None]:
spark.sql("""
  select sz, count(sz) cnt
  from (
    select size(content) sz
    from allTokens
  )
  group by sz
  order by sz
  limit 20
""").show()

In [None]:
# Why are there documents that include so few words?
spark.sql("""
  select filename, content
  from allTokens
  where size(content) < 35
  """).take(4)
  

In [None]:
AllTokens_df = spark.sql("""
  select *
  from allTokens
  where size(content) > 34
""")
AllTokens_df.registerTempTable("allTokens")

## Prepare the data (TF/IDF)
We want to prepare the data before splitting it into train and test sets:
- Hashing: convert the words to numbers
- TF: Term frequency. How often a word is found in a document
- IDF: Inverse document frequency. A weight that gets smaller as the number of documents containing the term grows

If we use the tf-idf values, we don't need to remove the stop words: a word found in almost every document gets an IDF weight close to zero, so it has little influence on the model.
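
To see why very common words lose their weight, here is a minimal sketch of the smoothed IDF formula documented for Spark's `IDF` class, `log((m + 1) / (df + 1))`, where `m` is the number of documents and `df` the number of documents containing the term. The document frequencies and the term count below are made-up values for illustration.

```python
import math

def idf(num_docs, doc_freq):
    """Smoothed inverse document frequency: log((m + 1) / (df + 1))."""
    return math.log((num_docs + 1) / (doc_freq + 1))

num_docs = 572  # size of the original dataset

# Hypothetical document frequencies, for illustration only.
idf_everywhere = idf(num_docs, 572)  # term found in every document
idf_rare = idf(num_docs, 5)          # term found in only 5 documents

# tf-idf is the term frequency multiplied by the IDF weight.
term_count = 3  # hypothetical count of the term in one document
print("everywhere:", term_count * idf_everywhere)
print("rare:", term_count * idf_rare)
```

A term that appears in every document gets a weight of exactly zero, whatever its count, which is why stop words do not need to be filtered out first.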

The HashingTF class converts words into numbers using a hashing algorithm and returns a sparse vector 
that lists the word indices and their counts.<br/>
There is one vector per document.

We use 10,007 features (a prime number) even though we have 8,074 distinct words.
This way, there is less chance of collisions in hash values.
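
The hashing idea can be sketched in plain Python. This is an illustration only, not Spark's implementation: `zlib.crc32` is used as a stand-in hash (Spark's `HashingTF` uses MurmurHash3), and the helper names `bucket` and `hashing_tf` are hypothetical.

```python
import zlib
from collections import Counter

NUM_FEATURES = 10007  # same number of features as in this notebook

def bucket(word, num_features=NUM_FEATURES):
    """Map a word to a feature index with a stand-in hash (not Spark's MurmurHash3)."""
    return zlib.crc32(word.encode("utf-8")) % num_features

def hashing_tf(words, num_features=NUM_FEATURES):
    """Return a sparse {feature index: count} mapping for one document."""
    return dict(Counter(bucket(w, num_features) for w in words))

# Hypothetical tokenized document.
doc = ["wheat", "prices", "rose", "wheat", "exports"]
vector = hashing_tf(doc)
print(sorted(vector.items()))
```

Two different words can still land on the same index; a larger number of features only makes such collisions less frequent.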

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

In [None]:
hashingTF = HashingTF(numFeatures=10007, inputCol="content", outputCol="rawFeatures")
featurizedData = hashingTF.transform(AllTokens_df).select("fileclass", "filename", "rawFeatures")

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData).select("fileclass", "filename", "features")
rescaledData.registerTempTable("rescaled")
rescaledData.printSchema()

In [None]:
# Take a look at the hashing result
featurizedData.take(1)

In [None]:
# Take a look at IDF result
rescaledData.take(1)

In [None]:
idfModel.idf

In [None]:
# Split the original set into training and test sets, using an 80 / 20 rule.
(NaiveBayesTrain,NaiveBayesTest) = rescaledData.randomSplit([0.8, 0.2], seed = 23)
NaiveBayesTrain.registerTempTable("train")
NaiveBayesTest.registerTempTable("test")

In [None]:
print("Number of training documents: " + str(NaiveBayesTrain.count()) )
print("Number of testing documents: " + str(NaiveBayesTest.count()) )


In [None]:
# Let's see the classes distribution in train and test
spark.sql("""
  select art.fileclass fileclass, art.cnt total, tr.cnt train, te.cnt test
  from (
     select fileclass, count(filename) as cnt 
     from rescaled 
     group by fileclass 
     order by fileclass
   ) art,
   (
     select fileclass, count(filename) as cnt 
     from train 
     group by fileclass 
     order by fileclass
   ) tr,   
   (
     select fileclass, count(filename) as cnt 
     from test 
     group by fileclass 
     order by fileclass   
   ) te
   where art.fileclass = tr.fileclass
   and   art.fileclass = te.fileclass
   order by art.fileclass
""").show()

## Create a Naive Bayes model

In [None]:
nb = NaiveBayes(featuresCol='features',labelCol='fileclass',modelType="multinomial")
model = nb.fit(NaiveBayesTrain)

In [None]:
predictions = model.transform(NaiveBayesTest)

## How good is the classification?
If the documents were evenly distributed over the 10 categories, a random choice would give
us about 10% accuracy.

Considering that we have a total of 572 documents and 199 of them are in **wheat**, if we were
to simply always say the class is **wheat**, we would get 34.8% accuracy.

So, to be successful, the model needs to do better than 34.8% accuracy.
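
The two baselines come from simple arithmetic on the counts quoted above, sketched here in plain Python:

```python
total_docs = 572   # documents in the original dataset
wheat_docs = 199   # documents in the largest class (wheat)
num_classes = 10

# Accuracy of a uniform random guess over evenly distributed classes.
random_baseline = 1 / num_classes

# Accuracy of always predicting the most frequent class.
majority_baseline = wheat_docs / total_docs

print(f"Random guess: {random_baseline:.1%}")
print(f"Always 'wheat': {majority_baseline:.1%}")
```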

In [None]:
# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="fileclass", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

## More details
For this, we use the RDD interface.

We can get the precision for each class and the confusion matrix.

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics
metrics = MulticlassMetrics(predictions.select("fileclass", "prediction").rdd.map(lambda v: (float(v[1]), float(v[0]))) )

In [None]:
# Reverse lookup: class index -> class name
rclassLookupMap = {ix: name for name, ix in classLookupMap.items()}

for i in range(len(rclassLookupMap)) :
    print(rclassLookupMap[i] + ": " + str(metrics.precision(float(i))) )


In [None]:
# Predicted classes are in columns (actual classes in rows), ordered by class label ascending.
# The values in a column can be used to calculate the precision of that class.
arr = metrics.confusionMatrix().toArray()
print(arr)
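
As a rough sketch of how the per-class precision relates to the confusion matrix, here is the calculation in plain Python on a small made-up 3-class matrix (not results from this notebook). It assumes the same layout as above: actual classes in rows, predicted classes in columns.

```python
# Hypothetical 3-class confusion matrix: rows = actual class, columns = predicted class.
toy = [
    [8, 1, 1],
    [2, 6, 0],
    [0, 1, 9],
]

def precision_per_class(matrix):
    """Precision of class j = correct predictions of j / all predictions of j (column sum)."""
    result = []
    for j in range(len(matrix)):
        predicted_j = sum(row[j] for row in matrix)
        result.append(matrix[j][j] / predicted_j if predicted_j else 0.0)
    return result

def recall_per_class(matrix):
    """Recall of class i = correct predictions of i / all actual documents of i (row sum)."""
    return [row[i] / sum(row) if sum(row) else 0.0 for i, row in enumerate(matrix)]

precision = precision_per_class(toy)
recall = recall_per_class(toy)
print(precision)
print(recall)
```

The same column-wise division applied to `arr` should match the values returned by `metrics.precision` for each label.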