# Part 2 - Data Wrangling
---
### Papers Past Topic Modeling

<br/>

Ben Faulks - bmf43@uclive.ac.nz

Xiandong Cai - xca24@uclive.ac.nz

Yujie Cui - ycu23@uclive.ac.nz

In [1]:
import sys, subprocess
sys.path.insert(0, '../utils')  # make the custom modules importable
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import DateType
from utils_data import conf_pyspark, load_dataset

# initiate PySpark
sc, spark = conf_pyspark()

sc

[('spark.driver.port', '45024'),
 ('spark.app.id', 'local-1547694941045'),
 ('spark.app.name', 'local'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.host', '192.168.1.207'),
 ('spark.driver.memory', '62g'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.cores', '6'),
 ('spark.driver.maxResultSize', '4g')]


## 1 Load Data

**Load raw dataset:**

In [2]:
df = load_dataset('papers_past', spark)

nrow_raw = df.count()
print('Shape of dataframe: ({}, {})'.format(nrow_raw, len(df.columns)))
df.sample(False, 0.00001).limit(10).show()

Shape of dataframe: (16731578, 6)
+--------+--------------------+------------------+--------------------+--------------------+--------------------+
|      id|                 url|         publisher|                time|               title|             content|
+--------+--------------------+------------------+--------------------+--------------------+--------------------+
|14479009|http://api.digita...|Poverty Bay Herald|1910-11-12T00:00:...|TOWN EDITION. (Po...|TOWN EDITION.A ma...|
|15564607|http://api.digita...|Poverty Bay Herald|1913-10-11T00:00:...|Page 1 Advertisem...|HUDuART-PARKER LI...|
| 3592912|http://api.digita...|Poverty Bay Herald|1879-06-14T00:00:...|Page 2 Advertisem...|She springs from'...|
| 6732646|http://api.digita...|Poverty Bay Herald|1887-11-26T00:00:...|THE SCANDAL IN PA...|THE SCANDAL IN PA...|
| 4720741|http://api.digita...|     Tuapeka Times|1879-03-22T00:00:...|THE FIELD OF INVE...|THE FIELD OF INVE...|
|34135105|http://api.digita...|    Mataura Ensign|1908

## 2 Missing Values

**Check empty values:**

In [3]:
# Count null
print('Print Null:')
df.select([F.count(F.when(F.col(c).isNull() | (F.col(c) == ''), c)).alias(c) for c in df.columns]).show()

Print Null:
+---+---+---------+----+-----+-------+
| id|url|publisher|time|title|content|
+---+---+---------+----+-----+-------+
|  0|  0|        0|   0|    0|  56232|
+---+---+---------+----+-----+-------+



**Drop rows with empty document:**

In [4]:
# Drop rows whose content is null to avoid NoneType errors later.
df = df.na.drop(subset=['content'])

**Check again:**

In [5]:
# Count null
print('Print Null:')
df.select([F.count(F.when(F.col(c).isNull() | (F.col(c) == ''), c)).alias(c) for c in df.columns]).show()

Print Null:
+---+---+---------+----+-----+-------+
| id|url|publisher|time|title|content|
+---+---+---------+----+-----+-------+
|  0|  0|        0|   0|    0|      0|
+---+---+---------+----+-----+-------+
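
The check above counts both nulls and empty strings as missing, while `na.drop` removes nulls only. Since the second check returns 0, all 56,232 missing values here were nulls, but the distinction matters for other data. A minimal plain-Python sketch of the two rules, using hypothetical sample rows (not taken from the dataset):

```python
# Hypothetical rows for illustration only; the real data lives in a Spark dataframe.
rows = [
    {'id': 1, 'content': 'TOWN EDITION.'},
    {'id': 2, 'content': None},   # null: removed by a null-only drop
    {'id': 3, 'content': ''},     # empty string: survives a null-only drop
]

def is_missing(value):
    """Return True for None or an empty string, mirroring the count above."""
    return value is None or value == ''

n_missing = sum(is_missing(r['content']) for r in rows)
null_only = [r for r in rows if r['content'] is not None]
strict = [r for r in rows if not is_missing(r['content'])]

print(n_missing, len(null_only), len(strict))
```

If empty strings ever appear in the content column, an explicit filter on `''` would be needed in addition to the null drop.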



## 3 Duplicate Values

**The "id" column should be unique; check for duplicates:**

In [6]:
print('Duplicated id number: ', df.count() - df.select('id').distinct().count())

Duplicated id number:  543700


**The dataset contains duplicated "id" values; show three of them:**

In [7]:
df.groupBy('id').count().where(F.col('count')>1).show(3)

+--------+-----+
|      id|count|
+--------+-----+
|10036037|    2|
|10059447|    2|
|10099968|    2|
+--------+-----+
only showing top 3 rows



**Inspect the first one in detail:**

In [8]:
df.filter(df.id == 10036037).show()

+--------+--------------------+------------------+--------------------+--------------------+--------------------+
|      id|                 url|         publisher|                time|               title|             content|
+--------+--------------------+------------------+--------------------+--------------------+--------------------+
|10036037|http://api.digita...|Poverty Bay Herald|1898-01-06T00:00:...|THE EASTERN SITUA...|THE EASTERN SITUA...|
|10036037|http://api.digita...|Poverty Bay Herald|1898-01-06T00:00:...|THE EASTERN SITUA...|THE EASTERN SITUA...|
+--------+--------------------+------------------+--------------------+--------------------+--------------------+



**Check how much the content of the two rows differs:**

In [9]:
import difflib

# Collect both rows once so the two strings come from the same query result.
rows = df.filter(df.id == 10036037).select('content').collect()
str1, str2 = rows[0]['content'], rows[1]['content']

# ratio() returns a similarity score in [0, 1]; 1.0 means identical strings.
similarity = difflib.SequenceMatcher(None, str1, str2).ratio()

print('Similarity: ', similarity)

Similarity:  0.9994846688997681
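
`SequenceMatcher.ratio()` returns 2·M/T, where M is the number of matched characters and T is the combined length of both strings, so 1.0 means the strings are identical. A small sketch on made-up strings (the helper name `similarity` is ours):

```python
import difflib

def similarity(a, b):
    """Similarity in [0, 1]: 2 * matches / (len(a) + len(b))."""
    return difflib.SequenceMatcher(None, a, b).ratio()

# Made-up strings, not taken from the dataset.
partial = similarity('abcd', 'bcde')  # 3 matched characters out of 8 in total
identical = similarity('THE EASTERN SITUATION', 'THE EASTERN SITUATION')

print(partial, identical)
```

A score of 0.9995, as measured above, therefore means that only a handful of characters differ between the two copies.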


**The two duplicates are nearly identical, so we keep only one row per id:**

In [10]:
df = df.drop_duplicates(subset=['id'])

**Check for duplicates again:**

In [11]:
print('Duplicated id number: ', df.count() - df.select('id').distinct().count())

Duplicated id number:  0
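
`drop_duplicates(subset=['id'])` keeps one row per id; to our knowledge Spark does not guarantee which of the duplicates survives. That is acceptable here because the copies are nearly identical. Where it matters, an explicit rule can be applied instead. The sketch below shows one such rule in plain Python, keeping the first occurrence per id; the sample rows and the function name are hypothetical.

```python
def keep_first_per_id(rows):
    """Return rows with duplicate ids removed, keeping the first occurrence."""
    seen = set()
    unique = []
    for row in rows:
        if row['id'] not in seen:
            seen.add(row['id'])
            unique.append(row)
    return unique

# Hypothetical rows: one id appears twice with slightly different text.
rows = [
    {'id': 10036037, 'content': 'THE EASTERN SITUATION. first copy'},
    {'id': 10036037, 'content': 'THE EASTERN SITUATION. second copy'},
    {'id': 10059447, 'content': 'another article'},
]
deduped = keep_first_per_id(rows)

print(len(rows) - len(deduped))  # number of rows removed
```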


## 4 Abnormal Values

**There should be 68 publishers; check the count:**

In [12]:
n = df.select('publisher').distinct().count()
print(n)
if n == 68:
    print('Correct! no abnormal values in publishers.')
else:
    print('Error! abnormal values in publishers.')

68
Correct! no abnormal values in publishers.


## 5 Extract Features

### 5.1 Date

**For historical documents, the date is a fine enough time unit, so we extract a "date" column from the "time" column:**

In [13]:
# extract feature date
df = df.withColumn('date', df['time'].cast(DateType()))

**Check schema of the dataframe:**

In [14]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- url: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- time: string (nullable = true)
 |-- title: string (nullable = true)
 |-- content: string (nullable = true)
 |-- date: date (nullable = true)



**Check whether the date range contains abnormal values:**

In [15]:
start, end = df.select(F.min('date'), F.max('date')).first()
start, end

(datetime.date(1839, 8, 21), datetime.date(1945, 12, 31))
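
Casting the "time" string to `DateType` keeps only the calendar date. A plain-Python sketch of the same idea follows, together with a range check against the bounds found above. It assumes the timestamp starts with `YYYY-MM-DD`; the sample value is illustrative, since the printed output truncates the full format.

```python
from datetime import date

# Bounds taken from the min/max query above.
START, END = date(1839, 8, 21), date(1945, 12, 31)

def to_date(timestamp):
    """Parse the leading YYYY-MM-DD part of an ISO-style timestamp string."""
    return date.fromisoformat(timestamp[:10])

def in_range(d, start=START, end=END):
    """Return True if d lies within the observed date range (inclusive)."""
    return start <= d <= end

d = to_date('1910-11-12T00:00:00')  # assumed format, for illustration only
print(d, in_range(d))
```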

### 5.2 Advertisements

**Inspect the "title" column to see whether features can be extracted from it:**

In [16]:
df.sample(False, 0.00001).limit(10).select('id', 'date', 'title').show(10, False)

+--------+----------+------------------------------------------------------------+
|id      |date      |title                                                       |
+--------+----------+------------------------------------------------------------+
|33583964|1901-08-17|TELEGRAMS. (Otago Daily Times 17-8-1901)                    |
|6577219 |1901-03-25|AN UNFORTUNATE MISTAKE. (Bay Of Plenty Times, 25 March 1901)|
|28322435|1893-04-07|CARNEGIE'S EMPLOYEES. (Auckland Star, 07 April 1893)        |
|13133895|1902-06-06|Telegraphic News. (Thames Star, 06 June 1902)               |
|18336280|1927-05-03|Page 14 Advertisements Column 2 (Evening Post, 03 May 1927) |
|2918534 |1862-11-18|WATER SUPPLY. (Otago Daily Times, 18 November 1862)         |
|28183893|1890-07-25|TELEGRAPHIC" SHIPPING. (Auckland Star, 25 July 1890)        |
|7228753 |1883-01-06|Gossipy Paragraphs. (Otago Witness, 06 January 1883)        |
|16698477|1918-09-07|THE BATTIE FRONT. (Poverty Bay Herald, 07 September 1918)   |
|281

**The "title" column marks advertisements, so we derive an "ads" column from it:**

In [17]:
# extract feature ads
df = df.withColumn('ads', df.title.contains('dvertisement'))

df.sample(False, 0.00001).limit(10).select('id', 'ads', 'title').show(10, False)

+--------+-----+-------------------------------------------------------------------------------------------------------+
|id      |ads  |title                                                                                                  |
+--------+-----+-------------------------------------------------------------------------------------------------------+
|8806818 |true |Page 2 Advertisements Column 7 (Feilding Star, 30 April 1903)                                          |
|13809064|false|EXTRACT. (Taranaki Herald, 23 September 1903)                                                          |
|5708215 |false|CHRISTMAS. (Wanganui Herald, 24 December 1881)                                                         |
|18654017|false|MOVING SOUTHWARDS (Evening Post, 23 September 1942)                                                    |
|2736372 |false|THE NELSON EXAMINER. Nelson, April 17, 1852. (Nelson Examiner and New Zealand Chronicle, 17 April 1852)|
|8645373 |false|POLITICAL REFORM

### 5.3 Title

**The title has the form "real title" ("publisher", "date"), and we only need the "real title" part. Extract it:**

In [18]:
# remove redundant parts of title
df = df.withColumn('title_', F.regexp_extract(F.col('title'), r'(.*)(\s\(.*\))', 1))

**Check whether some titles do not follow the form "real title" ("publisher", "date"), which leaves an empty string in the "title_" column:**

In [19]:
df.where(F.col('title_') == '').select(['id', 'title_', 'title']).show(5, False)

+--------+------+---------------------+
|id      |title_|title                |
+--------+------+---------------------+
|3656781 |      |Untitled Illustration|
|4832017 |      |Untitled Illustration|
|5417742 |      |Untitled Illustration|
|12676570|      |Untitled Illustration|
|12777321|      |Untitled Illustration|
+--------+------+---------------------+
only showing top 5 rows



**Replace empty strings in the "title_" column with "Untitled Illustration":**

In [20]:
df = df.withColumn(
    'title_',
    F.when(
        F.col('title_') == '',
        F.lit('Untitled Illustration')
    ).otherwise(
        F.col('title_')
    )
)

**Check for empty strings again:**

In [21]:
# Count null
print('Print Null:')
df.select([F.count(F.when(F.col(c).isNull() | (F.col(c) == ''), c)).alias(c) for c in df.columns]).show()

Print Null:
+---+---+---------+----+-----+-------+----+---+------+
| id|url|publisher|time|title|content|date|ads|title_|
+---+---+---------+----+-----+-------+----+---+------+
|  0|  0|        0|   0|    0|      0|   0|  0|     0|
+---+---+---------+----+-----+-------+----+---+------+
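
The title pattern relies on a greedy first group: `(.*)` grabs as much as possible, so the split happens at the last whitespace-plus-`(` that is still followed by a closing parenthesis. Titles without a parenthesised suffix do not match, and `regexp_extract` returns an empty string for them. A plain-Python sketch of the extraction plus the fallback, using `re` in place of Spark's regex engine (the function name `real_title` is ours):

```python
import re

PATTERN = re.compile(r'(.*)(\s\(.*\))')

def real_title(title, fallback='Untitled Illustration'):
    """Strip the trailing ' (publisher, date)' part; use fallback if nothing is extracted."""
    match = PATTERN.search(title)
    extracted = match.group(1) if match else ''
    return extracted if extracted else fallback

print(real_title('FOURTEEN VESSELS (Evening Post, 29 September 1941)'))
print(real_title('CABLE MESSAGES (Otago Daily Times 2-12-1910)'))
print(real_title('Untitled Illustration'))
```

Because the first group is greedy, parentheses inside the real title are preserved and only the final suffix is removed.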



**Print title columns:**

In [22]:
df.sample(False, 0.00001).limit(10).select('id', 'title_', 'title').show(10, False)

+--------+----------------------------------------------------------------+-------------------------------------------------------------------------------------------------+
|id      |title_                                                          |title                                                                                            |
+--------+----------------------------------------------------------------+-------------------------------------------------------------------------------------------------+
|20077538|FOURTEEN VESSELS                                                |FOURTEEN VESSELS (Evening Post, 29 September 1941)                                               |
|32535786|CABLE MESSAGES                                                  |CABLE MESSAGES (Otago Daily Times 2-12-1910)                                                     |
|10261389|AUSTRALIAN. SYDNEY UNDER A CLOUD. HARVEST PROSPECTS IN VICTORIA.|AUSTRALIAN. SYDNEY UNDER A CLOUD. HARVEST PROSPECTS IN 

### 5.4 Region

**On the [Papers Past](https://paperspast.natlib.govt.nz) website, the [Explore all newspapers](https://paperspast.natlib.govt.nz/newspapers/all#region) page lists the region of each publisher, which lets us derive a region feature from the "publisher" column. We saved [the webpage](https://paperspast.natlib.govt.nz/newspapers/all#region) locally and scrape the publisher-region table from it into a dataframe:**

In [23]:
from bs4 import BeautifulSoup

# read webpage
path = r'../temp/Papers Past _ Explore all newspapers.html'
with open(path, 'r') as f:
    html = f.read()

# get table 
soup = BeautifulSoup(html, "html.parser")
table = soup.find('table', attrs={'class':'table datatable'})
table_rows = table.find_all('tr')
res = []
for tr in table_rows:
    td = tr.find_all('td')
    row = [cell.text.strip() for cell in td if cell.text.strip()]
    if row:
        res.append(row)

# transform table to pandas dataframe
df_region = pd.DataFrame(res, columns=['publisher_', 'region', 'start_', 'end_'])  # a trailing underscore marks columns that are dropped later

# transform pandas dataframe to pyspark dataframe
df_region = spark.createDataFrame(df_region).orderBy('region')
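
The parsing step keeps only `<td>` cells that contain text and skips rows without any such cell (for example a header row). The sketch below reproduces that logic with the standard-library `html.parser` instead of BeautifulSoup and adds a check that every row has four cells. The markup is hypothetical, since the real page's structure is not shown here, and the class name `TableRows` is ours.

```python
from html.parser import HTMLParser

class TableRows(HTMLParser):
    """Collect the stripped text of non-empty <td> cells, one list per <tr>."""

    def __init__(self):
        super().__init__()
        self.rows = []
        self._row = None    # cells of the row being read, or None outside <tr>
        self._cell = None   # text fragments of the cell being read, or None

    def handle_starttag(self, tag, attrs):
        if tag == 'tr':
            self._row = []
        elif tag == 'td' and self._row is not None:
            self._cell = []

    def handle_data(self, data):
        if self._cell is not None:
            self._cell.append(data)

    def handle_endtag(self, tag):
        if tag == 'td' and self._cell is not None:
            text = ''.join(self._cell).strip()
            if text:
                self._row.append(text)
            self._cell = None
        elif tag == 'tr' and self._row is not None:
            if self._row:
                self.rows.append(self._row)
            self._row = None

# Hypothetical markup; the two data rows reuse values shown in this notebook.
html = """
<table class="table datatable">
<tr><th>Title</th><th>Region</th><th>Start</th><th>End</th></tr>
<tr><td>Albertland Gazette</td><td>Auckland</td><td>1862</td><td>1864</td></tr>
<tr><td>Globe</td><td>Canterbury</td><td>1874</td><td>1882</td></tr>
</table>
"""
parser = TableRows()
parser.feed(html)
res = parser.rows

# Every row must have four cells before it can be loaded into a four-column dataframe.
assert all(len(row) == 4 for row in res)
print(res)
```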

In [24]:
print('Shape of dataframe: ({}, {})'.format(df_region.count(), len(df_region.columns)))
df_region.sample(False, 0.1).show(10, False)

Shape of dataframe: (148, 4)
+-------------------+-----------------+------+----+
|publisher_         |region           |start_|end_|
+-------------------+-----------------+------+----+
|Albertland Gazette |Auckland         |1862  |1864|
|Hot Lakes Chronicle|Bay of Plenty    |1895  |1910|
|Ashburton Guardian |Canterbury       |1879  |1921|
|Ellesmere Guardian |Canterbury       |1891  |1945|
|Globe              |Canterbury       |1874  |1882|
|Matariki           |Gisborne         |1881  |1881|
|Feilding Star      |Manawatu-Wanganui|1882  |1920|
|Nelson Evening Mail|Nelson           |1866  |1922|
|Bruce Herald       |Otago            |1865  |1920|
|Lake County Press  |Otago            |1872  |1928|
+-------------------+-----------------+------+----+
only showing top 10 rows



**Note that two publisher names in this publisher-region dataframe differ from the dataset: the website lists "Bay of Plenty Times" where the dataset has "Bay Of Plenty Times" (capital "Of"), and "Free Lance" where the dataset has "New Zealand Free Lance" (the "New Zealand" prefix is missing). We therefore modify** `df_region` **to match the dataset:**

In [25]:
(df_region.filter((df_region.publisher_ == 'Bay of Plenty Times')
                 | (df_region.publisher_ == 'Free Lance'))
 .show(10, False))

+-------------------+-------------+------+----+
|publisher_         |region       |start_|end_|
+-------------------+-------------+------+----+
|Bay of Plenty Times|Bay of Plenty|1872  |1949|
|Free Lance         |Wellington   |1900  |1920|
+-------------------+-------------+------+----+



In [26]:
# update df_region for Bay Of Plenty Times and New Zealand Free Lance
df_region = df_region.withColumn(
    'publisher_',
    F.when(
        F.col('publisher_') == 'Bay of Plenty Times',
        F.lit('Bay Of Plenty Times')
    ).otherwise(
        F.col('publisher_')
    )
).withColumn(
    'publisher_',
    F.when(
        F.col('publisher_') == 'Free Lance',
        F.lit('New Zealand Free Lance')
    ).otherwise(
        F.col('publisher_')
    )
)

**Check that the two publisher names were modified:**

In [27]:
(df_region.filter((df_region.publisher_ == 'Bay Of Plenty Times')
                 | (df_region.publisher_ == 'New Zealand Free Lance'))
 .show(10, False))

+----------------------+-------------+------+----+
|publisher_            |region       |start_|end_|
+----------------------+-------------+------+----+
|Bay Of Plenty Times   |Bay of Plenty|1872  |1949|
|New Zealand Free Lance|Wellington   |1900  |1920|
+----------------------+-------------+------+----+



**Save the dataframe for later use:**

In [28]:
path = r'../temp/region.csv'

(df_region.select(F.col('publisher_'),
                  F.col('region'))
 .toPandas()
 .to_csv(path, header=False, index=False, encoding='utf-8'))

**Join to add the region column, and drop the redundant columns:**

In [29]:
df = (df.join(df_region, df.publisher == df_region.publisher_, how='left')
      .select(F.col('id'), 
              F.col('publisher'), 
              F.col('region'), 
              F.col('date'), 
              F.col('ads'), 
              F.col('title_').alias('title'), 
              F.col('content'))
      .orderBy('id')
      )

**Impute missing values in the region column with "unknown":**

In [30]:
df = df.na.fill({'region':'unknown'})

**Check for missing fields or values:**

In [31]:
print('Print Null and empty string:')
df.select([F.count(F.when(F.col(c).isNull() | (F.col(c) == ''), c)).alias(c) for c in df.columns]).show()

Print Null and empty string:
+---+---------+------+----+---+-----+-------+
| id|publisher|region|date|ads|title|content|
+---+---------+------+----+---+-----+-------+
|  0|        0|     0|   0|  0|    0|      0|
+---+---------+------+----+---+-----+-------+
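
The left join followed by `na.fill` amounts to a dictionary lookup with a default value. A plain-Python sketch, using a few publisher-region pairs from the tables above; the unmatched publisher name is made up, and `region_of` is our own helper name:

```python
# Publisher-region pairs as listed on the website (a small subset).
region_by_publisher = {
    'Bay of Plenty Times': 'Bay of Plenty',
    'Free Lance': 'Wellington',
    'Feilding Star': 'Manawatu-Wanganui',
}

# Rename the two publishers whose names differ from the dataset.
renames = {
    'Bay of Plenty Times': 'Bay Of Plenty Times',
    'Free Lance': 'New Zealand Free Lance',
}
region_by_publisher = {
    renames.get(name, name): region
    for name, region in region_by_publisher.items()
}

def region_of(publisher):
    """Left-join behaviour: unmatched publishers fall back to 'unknown'."""
    return region_by_publisher.get(publisher, 'unknown')

print(region_of('Bay Of Plenty Times'))
print(region_of('New Zealand Free Lance'))
print(region_of('Some Other Paper'))  # hypothetical name without a match
```

Without the renaming step, both publishers would silently end up in the "unknown" region, which is why the names are aligned before the join.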



**Check dataframe size:**

In [32]:
nrow = df.count()
print('Shape of dataframe: ({}, {})'.format(nrow, len(df.columns)))
print('usable line percentage:', nrow/nrow_raw)
print('removed line number:', nrow_raw - nrow)   

Shape of dataframe: (16131646, 7)
usable line percentage: 0.9641437287026962
removed line number: 599932


**After data wrangling:**
* 16,131,646 rows (documents) remain usable, 96.4% of the raw dataset.
* 599,932 rows (documents) were removed: 56,232 with empty content and 543,700 with duplicated ids.
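
The row counts reported in the earlier steps add up to these totals. A short arithmetic check, using only numbers printed in this notebook:

```python
nrow_raw = 16731578        # rows in the raw dataset
n_null_content = 56232     # rows dropped because content was null
n_duplicate_ids = 543700   # rows dropped as duplicated ids

removed = n_null_content + n_duplicate_ids
nrow = nrow_raw - removed
share = nrow / nrow_raw

print(removed)          # 599932
print(nrow)             # 16131646
print(round(share, 4))  # 0.9641
```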

**Print schema and dataframe:**

In [33]:
df.printSchema()
df.sample(False, 0.00001).limit(20).show()

root
 |-- id: integer (nullable = true)
 |-- publisher: string (nullable = true)
 |-- region: string (nullable = false)
 |-- date: date (nullable = true)
 |-- ads: boolean (nullable = true)
 |-- title: string (nullable = true)
 |-- content: string (nullable = true)

+-------+--------------------+-----------------+----------+-----+--------------------+--------------------+
|     id|           publisher|           region|      date|  ads|               title|             content|
+-------+--------------------+-----------------+----------+-----+--------------------+--------------------+
|1948954|       Clutha Leader|            Otago|1876-10-27|false|SATURDAY, OCTOBER...|SATURDAY, OCTOBER...|
|2255241|    Grey River Argus|       West Coast|1869-10-30| true|Page 1 Advertisem...|TJOUNDABY TIMBER ...|
|2703510|New Zealand Gazet...|       Wellington|1842-08-13| true|Page 4 Advertisem...|! Heifers in Calf...|
|3143807|Daily Southern Cross|         Auckland|1863-10-24|false|QUEEN'S REDOUBT. ...

## 6 Save Dataset

### 6.1 Dataset for Subset

**This dataframe is the final dataset from which we generate metadata and subsets for analysis and visualization. We save it as compressed CSV files to speed up later steps.**

In [34]:
path = r'../data/dataset'
df.write.csv(path, mode='overwrite', compression='gzip')

**Check the clean dataset size:**

In [35]:
path = r'../data/papers_past'
print('raw   dataset size:', subprocess.check_output(['du','-sh', path]).split()[0].decode('utf-8'))
path = r'../data/dataset'
print('clean dataset size:', subprocess.check_output(['du','-sh', path]).split()[0].decode('utf-8'))

raw   dataset size: 33G
clean dataset size: 14G


**After processing and compression, the dataset shrinks from 33 GB to 14 GB.**
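
The size check above calls `du`, which is only available on Unix-like systems. A portable sketch that sums file sizes with `pathlib` is shown below. It reports apparent file sizes, which can differ slightly from the disk usage that `du` prints, and the helper names are ours.

```python
import tempfile
from pathlib import Path

def dir_size_bytes(path):
    """Total size in bytes of all regular files below path."""
    return sum(p.stat().st_size for p in Path(path).rglob('*') if p.is_file())

def human(n_bytes):
    """Format a byte count with binary units, e.g. 1536 -> '1.5K'."""
    size = float(n_bytes)
    for unit in ['B', 'K', 'M', 'G', 'T']:
        if size < 1024 or unit == 'T':
            return '{:.1f}{}'.format(size, unit)
        size /= 1024

# Demonstration on a temporary directory rather than the real dataset.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / 'part-00000.csv.gz').write_bytes(b'x' * 1536)
    demo_size = dir_size_bytes(tmp)

print(human(demo_size))
```

On the real data this would be called as `human(dir_size_bytes('../data/dataset'))`.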

### 6.2 Dataset for Training

**This is the full dataset for MALLET topic modeling. We will use the topic model trained on it to infer the topics of subsets for analysis and visualization.**

In [38]:
df = df.select(F.col('id'), F.col('title'), F.col('content'))
print('Shape of dataframe: ({}, {})'.format(df.count(), len(df.columns)))

path = r'../data/train'
df.write.csv(path, mode='overwrite', compression='gzip')

In [None]:
path = r'../data/train'
print('dataset-to-train size:', subprocess.check_output(['du','-sh', path]).split()[0].decode('utf-8'))

---