In [None]:
# on any EC2 or locally
# warcio recompress command may need to be run on each WARC dataset file as sometimes files dont't parse
# the following code is used to extract page titles from WARC files and save them into a Parquet format
import pandas as pd
from bs4 import BeautifulSoup
from numpy import int64
from warcio.archiveiterator import ArchiveIterator

# set filenames
WARCfileName         = 'CC-NEWS-20160828145159-00004_ENG.warc.gz'
WARCfileNameNoExt   = WARCfileName.split('.')[0]
parquetFileName     = WARCfileName +'.parquet'

def build_titles_df():
    with open(WARCfileName, 'rb') as stream:
            recordCounter = 0
            for record in ArchiveIterator(stream):
                    if record.rec_type == 'response':
                        payload_content = record.raw_stream.read()
                        soup             = BeautifulSoup(payload_content, 'html.parser')
                        if (soup.title is not None):
                            title = soup.title.string
                            df.loc[recordCounter] = [title]

                    recordCounter += 1

    df.head()

print("Generating dataframe")
df = pd.DataFrame(columns=(['Title']))

print("Working....")
build_titles_df()

df.to_parquet(parquetFileName)
print("DONE!")

On Spark Cluster

#### Data Loading CLI commands

mkdir GoogleTrainingData
cd GoogleTrainingData/
aws s3 cp s3://litter-box/GoogleNewsTrainingData/Random20Files .  --recursive
hdfs dfs -mkdir hdfs:///GoogleTrainingData
hdfs dfs -put * hdfs:///GoogleTrainingData
mkdir CC-News-En-Titles-Only
aws s3 cp s3://litter-box/CC-News-En-Titles-Only/ ./CC-News-En-Titles-Only --recursive
hdfs dfs -mkdir hdfs:///CC-News-En-Titles-Only
hdfs dfs -put * hdfs:///CC-News-En-Titles-Only
hdfs dfs -mkdir hdfs:///PredictionResults

In [None]:
# the following code is run in the PySpark prompt

from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
from pyspark.ml import Pipeline
import pyspark.sql.functions as f
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

parDF1=spark.read.parquet("hdfs:///GoogleTrainingData/*.parkquet")
parDF1.count()

parDF2=spark.read.parquet("hdfs:///CC-News-En-Titles-Only/*.parquet")
parDF2.count()

parDF1 = parDF1.withColumn('index', f.monotonically_increasing_id())

trainDataset    = parDF1
predictDataset  = parDF2

# splitting the dataset int 160k and 40k records for train/test
trainDataset = parDF1.sort('index').limit(160000)
testDataset = parDF1.sort('index', ascending = False).limit(40000)

trainDataset.groupBy("topic").count().orderBy(col("count").desc()).show()
testDataset.groupBy("topic").count().orderBy(col("count").desc()).show()


document_assembler = DocumentAssembler() .setInputCol("title") .setOutputCol("document")

tokenizer = Tokenizer() .setInputCols(["document"]) .setOutputCol("token")

bert_embeddings = BertEmbeddings().pretrained(name='small_bert_L4_256', lang='en') .setInputCols(["document",'token']).setOutputCol("embeddings")

embeddingsSentence = SentenceEmbeddings() .setInputCols(["document", "embeddings"]) .setOutputCol("sentence_embeddings") .setPoolingStrategy("AVERAGE")

classsifierdl = ClassifierDLApproach().setInputCols(["sentence_embeddings"]).setOutputCol("class").setLabelColumn("topic").setMaxEpochs(10).setLr(0.001).setBatchSize(8).setEnableOutputLogs(True)#.setOutputLogsPath('logs')

bert_clf_pipeline = Pipeline(stages=[document_assembler,tokenizer,bert_embeddings,embeddingsSentence,classsifierdl])

# training the model - this may take a fairly long time
# for 5 files start time - 10:14am, end time -  10:22am
bert_clf_pipelineModel = bert_clf_pipeline.fit(trainDataset)

# make sanity check predictions
preds = bert_clf_pipelineModel.transform(testDataset)
preds_df = preds.select('topic','title','class.result')
preds_df.count()
preds_df.show(20)

# make predictions against CC-News-En dataset
preds = bert_clf_pipelineModel.transform(predictDataset)
preds_df = preds.select('topic','title','class.result')
preds_df.count()
preds_df.show(20)

#exporting to parkquet to perform analysis in a regular Jupyter notebook
preds_df.write.parquet("hdfs:///preds_df.parquet")

In [2]:
# the following code is used to evaluate classifier accuracy
# it is run in Jupyter notebook because I wasnt able to load scikit-learn (sklearn) on EMR
# accuracy matrix is just a sanity check example, full model is still learning/processing

import pandas as pd
from sklearn.metrics import classification_report

preds_df = pd.read_parquet("part-00000-18296d15-3f4f-447f-9ec3-9c4ad3a8d2ed-c000.snappy.parquet")
preds_df['result'] = preds_df['result'].apply(lambda x : x[0])
print (classification_report(preds_df['topic'], preds_df['result']))

  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


               precision    recall  f1-score   support

     BUSINESS       0.67      0.70      0.68      3642
ENTERTAINMENT       0.69      0.76      0.73      2862
       HEALTH       0.51      0.76      0.61      1739
       NATION       0.52      0.67      0.59      3540
      SCIENCE       0.00      0.00      0.00       439
       SPORTS       0.79      0.83      0.81      3755
   TECHNOLOGY       0.70      0.67      0.69      2026
        WORLD       0.00      0.00      0.00      1997

     accuracy                           0.65     20000
    macro avg       0.49      0.55      0.51     20000
 weighted avg       0.58      0.65      0.61     20000



  _warn_prf(average, modifier, msg_start, len(result))


In [None]:
# this shows the test/train dataset and the number labelled examples for training

>>> trainDataset.groupBy("topic").count().orderBy(col("count").desc()).show()
+-------------+-----+
|        topic|count|
+-------------+-----+
|       SPORTS|29781|
|     BUSINESS|28577|
|       NATION|28416|
|ENTERTAINMENT|23518|
|        WORLD|16472|
|   TECHNOLOGY|15406|
|       HEALTH|14295|
|      SCIENCE| 3535|
+-------------+-----+

>>> testDataset.groupBy("topic").count().orderBy(col("count").desc()).show()
+-------------+-----+
|        topic|count|
+-------------+-----+
|       SPORTS| 7525|
|     BUSINESS| 7206|
|       NATION| 7038|
|ENTERTAINMENT| 5737|
|        WORLD| 4157|
|   TECHNOLOGY| 3923|
|       HEALTH| 3587|
|      SCIENCE|  827|
+-------------+-----+
