Use Case

1. Scale the algorithm with spark functional and cluster computing capabilities
2. explore sparkml capabilities compared to the scikit-learn 
3. Make a workflow that can automate feature generation process using parallel computing for large scale collection

In [2]:
import pyspark
import requests

In [3]:
# install package to fetch api from HTRC endpoint
dbutils.library.installPyPI('htrc-feature-reader')
# install jsonpickle to open json serialized dataset
dbutils.library.installPyPI('jsonpickle')

In [4]:
from htrc_features import FeatureReader

Transformation from csv dataset to streaming dataset

Here I present how I can get data from Github (other turtle) on the internet and store it in a temporary file in databricks

In [7]:
import requests
# data is stored somewhere in the internet, for this use case I stored data on the github
# read data from github and store it on the databricks store

# this is the initial training data, in the workflow this will be considered as the first step on model training
initial_training = "https://raw.githubusercontent.com/htrc/ht-frontmatter-analysis/master/Jupyter/FrontMatter-initial.csv"

# download data using request command
get_resp = requests.request("GET",initial_training)
# store the response text in databricks store
with open("/dbfs/tmp/FrontMatter-initial.csv","w") as file:
  file.write(get_resp.text)

In [8]:
#from pyspark.sql.types import StructType,StructField,StringType
from pyspark.sql.types import *
import pyspark.sql.types as sparktype

In [9]:
front_matter_schema = StructType([StructField("volume",StringType(),True),StructField("pages",StringType(),True)])
front_matter_df = spark.read.csv("dbfs:/tmp/FrontMatter-initial.csv",sep="\t",schema=front_matter_schema,header=None)
type(front_matter_df)
display(front_matter_df)

volume,pages
chi.40727635,"[[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""stamp""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""factual"",""title"",""""],[""no_content"",""no_content"",""""],[""factual"",""title"",""""],[""factual"",""pub_info"",""""],[""factual"",""list"",""""],[""factual"",""list"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""]]"
coo.31924000431662,"[[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""barcode, due date slip""],[""factual"",""title"",""""],[""factual"",""pub_info"",""""],[""factual"",""list"",""""],[""no_content"",""no_content"",""""],[""creative"",""pref_text"",""foreword""],[""creative"",""pref_text"",""foreword""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""factual"",""main_text"",""data table""],[""factual"",""main_text"",""data table""],[""factual"",""main_text"",""data table""],[""factual"",""main_text"",""data table""],[""creative"",""main_text"",""""]]"
coo.31924001144959,"[[""no_content"",""no_content"",""binding""],[""no_content"",""no_content"",""library bookplate""],[""no_content"",""no_content"",""due date slip, barcode""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""factual"",""title"",""""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""factual"",""title"",""""],[""factual"",""pub_info"",""""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""creative"",""pref_text"",""preface""],[""creative"",""pref_text"",""preface""],[""creative"",""pref_text"",""preface""],[""no_content"",""no_content"",""""],[""creative"",""main_text"",""""],[""no_content"",""no_content"",""""]]"
coo.31924073896007,"[[""factual"",""cover"",""""],[""no_content"",""no_content"",""stamp""],[""no_content"",""no_content"",""due date slip""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""factual"",""title"",""""],[""no_content"",""no_content"",""cutter""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""factual"",""list"",""""],[""no_content"",""no_content"",""""],[""creative"",""pref_text"",""biography""],[""no_content"",""no_content"",""""],[""creative"",""pref_text"",""acknowledgements""],[""no_content"",""no_content"",""""],[""creative"",""pref_text"",""abstract""],[""no_content"",""no_content"",""""],[""creative"",""pref_text"",""abstract""],[""no_content"",""no_content"",""""]]"
coo.31924080009719,"[[""mixed"",""cover"",""cover art photo""],[""mixed"",""pub_info"",""""],[""creative"",""ad"",""""],[""creative"",""image"",""incorporates photos into index""],[""creative"",""image"",""incorporates photos into index""],[""creative"",""image"",""""],[""creative"",""image"",""""],[""creative"",""image"",""""],[""creative"",""image"",""""],[""creative"",""image"",""""],[""creative"",""image"",""""],[""creative"",""image"",""""],[""creative"",""image"",""""],[""creative"",""image"",""""],[""creative"",""image"",""""],[""creative"",""image"",""""],[""creative"",""image"",""""],[""creative"",""image"",""""],[""creative"",""image"",""""],[""creative"",""image"",""""]]"
coo.31924094241217,"[[""no_content"",""no_content"",""cutter""],[""no_content"",""no_content"",""bookplate""],[""no_content"",""no_content"",""date slip""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""factual"",""title"",""""],[""no_content"",""no_content"",""""],[""factual"",""list"",""""],[""no_content"",""no_content"",""cutter""],[""creative"",""main_text"",""""],[""no_content"",""no_content"",""""],[""creative"",""main_text"",""""],[""no_content"",""no_content"",""""],[""creative"",""main_text"",""""],[""no_content"",""no_content"",""""],[""creative"",""main_text"",""""],[""no_content"",""no_content"",""""],[""creative"",""main_text"",""""],[""no_content"",""no_content"",""""]]"
coo.31924098538329,"[[""factual"",""cover"",""""],[""no_content"",""no_content"",""""],[""factual"",""title"",""""],[""no_content"",""no_content"",""handwritten cutter""],[""creative"",""pref_text"",""acknowledgement""],[""creative"",""pref_text"",""summary""],[""factual"",""appendix"",""""],[""factual"",""list"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""]]"
ien.35556003776788,"[[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""library sticker""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""creative"",""pref_text"",""foreword""],[""no_content"",""no_content"",""""],[""factual"",""list"",""""],[""no_content"",""no_content"",""""],[""factual"",""title"",""Section title""],[""no_content"",""no_content"",""hand written cutter""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""has data table""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""]]"
inu.30000070597756,"[[""no_content"",""no_content"",""binding""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""factual"",""cover"",""title, list of contents""],[""factual"",""pub_info"",""""],[""factual"",""list"",""""],[""no_content"",""no_content"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""factual"",""main_text"",""data table""],[""creative"",""main_text"",""""],[""factual"",""main_text"",""data table""],[""creative"",""main_text"",""""]]"
inu.30000081728184,"[[""factual"",""cover"",""""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""no_content"",""no_content"",""""],[""factual"",""title"",""""],[""factual"",""pub_info"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""factual"",""main_text"",""data plots""],[""factual"",""main_text"",""data plots""],[""factual"",""main_text"",""data plots""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""],[""creative"",""main_text"",""""]]"


The data prepared by the HTRC librarian is in the volumes and list format, we must create a function to transform the set of pages into single row for each page

In [11]:
import numpy as np
from itertools import chain
import json

This Function will read the labeled file and transform the file into single row label data for each page

In [13]:
def vol_page_pairs(X,file_name=None):
  """
  Use for transforming labeled page dataset
  """
  vol = X[0]
  pages = X[1]
  dump_pages = json.loads(pages)
  # create a temporary dataframe for column operation
  dump_pages_pd = np.array(dump_pages,dtype=str)
  
  page_length = len(dump_pages)
  vols = np.repeat(vol,page_length)
  file_names = np.repeat(file_name,page_length)
  
  
  #return tuple(zip(zip(vols,range(page_length)),zip(vols,range(page_length),dump_pages_pd[:,0], dump_pages_pd[:,1],dump_pages_pd[:,2]))
  return zip(file_names,vols,range(page_length),dump_pages_pd[:,0], dump_pages_pd[:,1],dump_pages_pd[:,2])
  #return list(zip(vols,range(page_length),[str(x) for x in dump_pages_pd[:,0]],[str(x) for x in dump_pages_pd[:,1]],[str(x) for x in dump_pages_pd[:,2]]))
  #return zip(vols,range(page_length),dump_pages)
  
def read_pages(file_name):
  front_matter_schema = StructType([StructField("volume",StringType(),True),StructField("pages",StringType(),True)])
  front_matter_df = spark.read.csv(file_name,sep="\t",schema=front_matter_schema,header=None)
  #type(front_matter_df)
  #display(front_matter_df)
  tt_pages = front_matter_df.rdd.map(lambda X:vol_page_pairs(X,file_name))
  tt_pages1 = tt_pages.flatMap(lambda x:list(x))
  
  from pyspark.sql.types import Row
  
  schema_pages = [("file_name",str),("vol_id",str),("page",int),("label1",str),("label2",str),("desc",str)]

  def transform_row(x,schema):
    t = list()
    for i,s in enumerate(schema):
      #t.append((s[0],s[1](x[i])))
      c_name = s[0]
      if c_name == None:
        c_name = "c_{}".format(i)
      t.append((c_name,s[1](x[i])))
    return Row(**dict(t))

  #tt_pages_df = spark.createDataFrame(tt_pages1.map(lambda x: Row(p_vol=str(x[0]),p_page=int(x[1]),p_type=str(x[2]),p_subtype=str(x[3]),p_desc=str(x[4]))))
  tt_pages_df = spark.createDataFrame(tt_pages1.map(lambda x: transform_row(x,schema_pages)))
  return tt_pages_df

In [14]:
pages_df = read_pages("dbfs:/tmp/FrontMatter-initial.csv")

In [15]:
display(pages_df)

desc,file_name,label1,label2,page,vol_id
,dbfs:/tmp/FrontMatter-initial.csv,no_content,no_content,0,chi.40727635
stamp,dbfs:/tmp/FrontMatter-initial.csv,no_content,no_content,1,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,no_content,no_content,2,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,no_content,no_content,3,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,factual,title,4,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,no_content,no_content,5,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,factual,title,6,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,factual,pub_info,7,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,factual,list,8,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,factual,list,9,chi.40727635


Now we stored the dataset into spark delta lake database

In [17]:
# write the data into table

# remove the delta pages store first if it's exist since this is an initialization dataset
dbutils.fs.rm("dbfs:/delta/pages", True)  
pages_df.write.partitionBy("file_name").format("delta").mode("overwrite").save("/delta/pages")
spark.sql("CREATE TABLE IF NOT EXISTS pages USING DELTA LOCATION '/delta/pages/'")

now I can do exploration for the initial dataset using spark SQL

In [19]:
%sql
SELECT * from pages

desc,file_name,label1,label2,page,vol_id
,dbfs:/tmp/FrontMatter-initial.csv,no_content,no_content,0,chi.40727635
stamp,dbfs:/tmp/FrontMatter-initial.csv,no_content,no_content,1,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,no_content,no_content,2,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,no_content,no_content,3,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,factual,title,4,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,no_content,no_content,5,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,factual,title,6,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,factual,pub_info,7,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,factual,list,8,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,factual,list,9,chi.40727635


From the first visualization we can see that most of the book pages we labeled here is creative content. The creative contents can be graphic, poem, main text in the book. To get more idea, I cross reference the count of each content to the particular section on the page which is also labeled in the dataset (label2)

In [21]:
%sql
SELECT label1,label2,count(1) from pages
group by label1,label2

label1,label2,count(1)
factual,appendix,564
mixed,main_text,28
factual,main_text,407
factual,ad,141
mixed,list,15
creative,poem,123
factual,pref_text,54
mixed,pub_info,12
mixed,image,6
factual,cover,280


From the detail visualization we can see that the creative content mainly found in the main_text section, while some in the preface_text content. Content that mainly displaying image is also labeled as creative content. On the other hand, factual content for the first 20 pages (front-matter) is mainly found in appendix, list of content, title page, dedication, and publication info. This information might be useful for research scholar especially people that are working with HTRC collection (Digital Humanities research).

Next I fetch volume from the HTRC API and store it in the extracted features statistic for the page.

Fetch data from HTRC api. This function will fetch the extracted features for the respected volumes and pages

In [25]:
def fetch_volumes_api(file_name):  
  # transform statistical values for every pages from EF
  def transform_page_stat(t):
    """
    Given a page extracted feature it will transform the page feature into the summary statistical fatures we will use
    for the machine learning model
    t: page extracted feature
    """
    line_count = t.line_count()
    token_count = t.token_count()
    empty_line_count = t.empty_line_count()
    cap_alpha_seq = t.cap_alpha_seq()
    total_sentence = sum(t._json["body"]["beginCharCounts"].values())
    total_capital = sum([t._json["body"]["beginCharCounts"][x] if x.isupper() else 0 for x in t._json["body"]["beginCharCounts"].keys()])
    total_numeric = sum([t._json["body"]["endCharCount"][x] if x.isnumeric() else 0 for x in t._json["body"]["endCharCount"].keys()])
    if total_sentence > 0:
      pct_begin_char_caps = total_capital/total_sentence
      pct_end_numeric = total_numeric/total_sentence
    else:
      pct_begin_char_caps = 0
      pct_end_numeric = 0
    total_roman = 0
    pct_roman = 0
    #total_roman = roman_count(t)
    if t.token_count() > 0:      
      pct_all_caps = t.tokenlist().loc[[x.isupper() for x in t.tokenlist().reset_index().token.values],:]["count"].sum()/t.token_count()
      #pct_roman = total_roman/t.token_count()
    else:
      pct_all_caps = 0
      pct_roman = 0
    return [line_count,token_count,empty_line_count,total_sentence,total_capital,total_numeric,pct_begin_char_caps,pct_end_numeric,pct_all_caps,pct_roman]

  def fetch_volume(vol:str,pages:int=20):
      """
      vol: list of volume_id
      pages: int, how many pages want to be extracted. For the front-matter detection purpose
      we only use the first 20 pages
      """
      from htrc_features import FeatureReader
      import urllib.parse

      #print(vol)
      #fr_vol = FeatureReader(ids=[urllib.parse.quote(vol)])
      #return fr_vol
      try:
        fr_vol = FeatureReader(ids=[urllib.parse.quote(vol)]).volumes()
        for my_vol in fr_vol:
          #print(my_vol.volume_identifier)
          #return my_vol
          all_pages = my_vol.pages()

          # all_pages are iter file, read only the pages that we are interested in
          # to minimize memory usage

          page_stat = []
          #return(all_pages)
          for i in range(pages):
            # seq start from 1, pages index processor start from 0
            #return(i)
            try:
              page = next(all_pages)
              #return page
              #if page == None:
              #  break
              temp = [my_vol.volume_identifier,i]
              temp.extend(transform_page_stat(page))
              page_stat.append(temp)
            except:
              continue

          #return ((my_vol.id,i),(page_stat))
          return page_stat
          #print(my_vol.id)
          #file.write(jsonpickle.dumps(my_vol)+"\n")        
      except BaseException as ex:
        #print(ex)
        return []

  from pyspark.sql.types import Row
  import pyspark.sql.types as sparktype

  def transform_row(x,schema):
    t = list()
    for i,s in enumerate(schema):
      #t.append((s[0],s[1](x[i])))
      c_name = s[0]
      if c_name == None:
        c_name = "c_{}".format(i)
      t.append((c_name,s[1](x[i])))
    return Row(**dict(t))      
      
  try:
    new_volumes_df = spark.sql("(select distinct vol_id from pages where file_name='{}') minus select distinct vol_id from page_stat".format(file_name))
    tt_stat1 = new_volumes_df.rdd.map(lambda x: fetch_volume(x.vol_id,20))
  except BaseException as ex:
    print(ex)
    # there is no page_stat exist yet in the table
    # then use all the volumes
    new_volumes_df = spark.sql("select distinct vol_id from pages")
    tt_stat1 = new_volumes_df.rdd.map(lambda x: fetch_volume(x.vol_id,20))
  
  schema_stat = [("vol_id",str),("page",int),("line_count",int),("token_count",int),("empty_line_count",int),("total_sentence",int)
                ,("total_capital",int),("total_numeric",int),("pct_begin_char_caps",float),("pct_end_numeric",float),("pct_all_caps",float),("pct_roman",float)]
  try:
    tt_stat_df = spark.createDataFrame(tt_stat1.flatMap(lambda x:x).map(lambda x: transform_row(x,schema_stat)))
  except ValueError as ex:
    print(ex)
    # rdd is empty
    return None,new_volumes_df
  return tt_stat_df,new_volumes_df
  #"""
  #return tt_stat1

In [26]:
stat_df,delta_vol_df = fetch_volumes_api("dbfs:/tmp/FrontMatter-initial.csv")

# Save stat data to table, Because this is for initialization, we drop the table and rewrite the delta file by removing the folder
if stat_df!=None:
  dbutils.fs.rm("dbfs:/delta/page_stat", True)  
  #tt_stat_df.write.format("delta").mode("overwrite").save("/delta/page_stat")
  stat_df.write.format("delta").mode("overwrite").save("/delta/page_stat")
  spark.sql("CREATE TABLE IF NOT EXISTS page_stat USING DELTA LOCATION '/delta/page_stat/'")
else:
  print("There is no new volume need to be fetch, if this is an initial dataset, there is something wrong with the HTRC API, use the backup approach, on cell 31")

In [27]:
delta_vol_df.count()

there are 21 volumes that are not exist on the api, this can be happened because of several reasons: 
1. missing data: there is no ocr for this scanned book
2. wrong volume_id: there is an error when writing this dataset, but this rarely happened because the labeler has an application to prepare dataset

Incase The function returning an empty rdd (stat_df is empty), that's mean the api cannot be accessed because of maintenance I already prepared the complete statistic in the separated file and here I perform the download, and transform of the statistic values from github. This block codes is attempt for skipping the data crawling trhough the API directly. we assume that we have a statistical feature already derived and living somewhere in the internet.

-- Start of backup attempt for preparing page_stat table

In [30]:
# this is the initial training data, in the workflow this will be considered as the first step on model training
statistic_file = "https://raw.githubusercontent.com/htrc/ht-frontmatter-analysis/master/Jupyter/ef-fact-creat-sels.json"

# download data using request command
get_resp = requests.request("GET",statistic_file)
# store the response text in databricks store
with open("/dbfs/tmp/ef-fact-creat-sels.json","w") as file:
  file.write(get_resp.text)

In [31]:
#from pyspark.sql.types import StructType,StructField,StringType
from pyspark.sql.types import Row
import pyspark.sql.types as sparktype

schema_stat = [("vol_id",str),("page",int),("line_count",int),("token_count",int),("empty_line_count",int),("total_sentence",int)
                ,("total_capital",int),("total_numeric",int),("pct_begin_char_caps",float),("pct_end_numeric",float),("pct_all_caps",float),("pct_roman",float)]

def transform_row(x,schema):
  t = list()
  for i,s in enumerate(schema):
    #t.append((s[0],s[1](x[i])))
    c_name = s[0]
    if c_name == None:
      c_name = "c_{}".format(i)
    t.append((c_name,s[1](x[i])))
  return Row(**dict(t))

stat_checkpoint = spark.read.text("dbfs:/tmp/ef-fact-creat-sels.json").rdd.map(lambda x:json.loads(x[0])).flatMap(lambda x:x)

stat_checkpoint_df = spark.createDataFrame(stat_checkpoint.map(lambda x: transform_row(x,schema_stat)))


Now I store the statistic data to the table

In [33]:
# write the data into table
# because this will be the initialization data, we remove the old file
dbutils.fs.rm("dbfs:/delta/page_stat", True)  
#tt_stat_df.write.format("delta").mode("overwrite").save("/delta/page_stat")
stat_checkpoint_df.write.format("delta").mode("overwrite").save("/delta/page_stat")
spark.sql("CREATE TABLE IF NOT EXISTS page_stat USING DELTA LOCATION '/delta/page_stat/'")

--End of block code for backup attempt on page_stat

Now since we have page_stat and page_label we can combine the two tables together and make a features based on the volume label set

In [36]:
%sql
SELECT * FROM pages p, page_stat s
where p.vol_id = s.vol_id

desc,file_name,label1,label2,page,vol_id,empty_line_count,line_count,page.1,pct_all_caps,pct_begin_char_caps,pct_end_numeric,pct_roman,token_count,total_capital,total_numeric,total_sentence,vol_id.1
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,19,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,18,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,17,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,16,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,15,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,14,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,13,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,12,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,11,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,10,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635


Here we provide the statistic visualization for each label, creative content has the most average total_sentence and average line_count. Interestingly the same pattern exists for each label.

In [38]:
%sql
SELECT * FROM pages p, page_stat s
where p.vol_id = s.vol_id

desc,file_name,label1,label2,page,vol_id,empty_line_count,line_count,page.1,pct_all_caps,pct_begin_char_caps,pct_end_numeric,pct_roman,token_count,total_capital,total_numeric,total_sentence,vol_id.1
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,19,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,18,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,17,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,16,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,15,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,14,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,13,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,12,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,11,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635
,dbfs:/tmp/FrontMatter-initial.csv,creative,main_text,10,chi.40727635,1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,chi.40727635


This other visualization is providing the percentage for token start with character caps , end with numeric and all capitals. The intuition behind this is we want to use the percentage of these features and see if token characteristics can help us determine the label content. For these features, interestingly, the most percentage comes from the factual content, while creative content can be seen as having quite the lowest percentage of tokens starting with caps, all caps or end with numeric

Now after providing set of visualization, we start training a model using the features set

Prepare spark dataframe for prediction

In [42]:
filtered_sql = spark.sql("""
select s.*,p.label1,p.label2,p.desc from page_stat as s, pages as p
where p.vol_id = s.vol_id and p.page = s.page
and p.file_name='dbfs:/tmp/FrontMatter-initial.csv'
""")

mixed content should be categorized as creative prediction as well, at least for now

In [45]:
from pyspark.sql.functions import col,when
filtered_sql = filtered_sql.withColumn("label1",when(col("label1")=="mixed","creative").otherwise(col("label1")))

Volumes that are not exists on the HTRC API, there are 25 volumes in total need to check further why its not there

In [47]:
%sql
(select distinct vol_id from pages) minus select distinct vol_id from page_stat

vol_id
uc1.l0075873877
txu.059173014313723
umn.31951d010311165
uiug.30112121939497
nyp.33433082868591
uc1.l0064487499
pst.000047198050
osu.32435075731331
uc1.b000580094
umn.31951p01038978c


In [48]:
display(filtered_sql)

empty_line_count,line_count,page,pct_all_caps,pct_begin_char_caps,pct_end_numeric,pct_roman,token_count,total_capital,total_numeric,total_sentence,vol_id,label1,label2,desc
1,1,0,0.0,0.0,0.0,0.0,0,0,0,0,coo.31924000431662,no_content,no_content,
1,9,1,0.4285714285714285,0.75,0.0,0.0,28,6,0,8,coo.31924000431662,no_content,no_content,"barcode, due date slip"
1,18,2,0.711340206185567,0.7647058823529411,0.1176470588235294,0.0,97,13,2,17,coo.31924000431662,factual,title,
0,13,3,0.0252100840336134,0.5384615384615384,0.0769230769230769,0.0,119,7,1,13,coo.31924000431662,factual,pub_info,
0,28,4,0.0786516853932584,0.7142857142857143,0.3214285714285714,0.0,178,20,9,28,coo.31924000431662,factual,list,
0,0,5,0.0,0.0,0.0,0.0,0,0,0,0,coo.31924000431662,no_content,no_content,
0,24,6,0.036,0.4166666666666667,0.0,0.0,250,10,0,24,coo.31924000431662,creative,pref_text,foreword
0,26,7,0.0248226950354609,0.2692307692307692,0.0,0.0,282,7,0,26,coo.31924000431662,creative,pref_text,foreword
0,43,8,0.0736842105263157,0.5348837209302325,0.0,0.0,380,23,0,43,coo.31924000431662,creative,main_text,
0,45,9,0.0262008733624454,0.2444444444444444,0.0222222222222222,0.0,458,11,1,45,coo.31924000431662,creative,main_text,


use sparkml to build a random forest prediction from the dataset

In [50]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

labelIndexer = StringIndexer(inputCol="label1", outputCol="index_label1").fit(filtered_sql)

In [51]:
filtered_sql.columns

In [52]:
from pyspark.ml.linalg import Vectors
features_df = spark.createDataFrame(filtered_sql.rdd.map(lambda x:{"vol_id":x.vol_id,"page":x.page,"features":Vectors.dense([x.empty_line_count,
x.line_count,
x.pct_all_caps,
x.pct_begin_char_caps,
x.pct_end_numeric,
x.token_count,
x.total_capital,
x.total_numeric,
x.total_sentence])}))
#.sample(fraction=0.1,withReplacement=False))
#.collect()
#.map(lambda x: Row(tuple(x))).collect()

In [53]:
display(features_df)

features,page,vol_id
"List(1, 9, List(), List(1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))",0,coo.31924000431662
"List(1, 9, List(), List(1.0, 9.0, 0.42857142857142855, 0.75, 0.0, 28.0, 6.0, 0.0, 8.0))",1,coo.31924000431662
"List(1, 9, List(), List(1.0, 18.0, 0.711340206185567, 0.7647058823529411, 0.11764705882352941, 97.0, 13.0, 2.0, 17.0))",2,coo.31924000431662
"List(1, 9, List(), List(0.0, 13.0, 0.025210084033613446, 0.5384615384615384, 0.07692307692307693, 119.0, 7.0, 1.0, 13.0))",3,coo.31924000431662
"List(1, 9, List(), List(0.0, 28.0, 0.07865168539325842, 0.7142857142857143, 0.32142857142857145, 178.0, 20.0, 9.0, 28.0))",4,coo.31924000431662
"List(1, 9, List(), List(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))",5,coo.31924000431662
"List(1, 9, List(), List(0.0, 24.0, 0.036, 0.4166666666666667, 0.0, 250.0, 10.0, 0.0, 24.0))",6,coo.31924000431662
"List(1, 9, List(), List(0.0, 26.0, 0.024822695035460994, 0.2692307692307692, 0.0, 282.0, 7.0, 0.0, 26.0))",7,coo.31924000431662
"List(1, 9, List(), List(0.0, 43.0, 0.07368421052631578, 0.5348837209302325, 0.0, 380.0, 23.0, 0.0, 43.0))",8,coo.31924000431662
"List(1, 9, List(), List(0.0, 45.0, 0.026200873362445413, 0.24444444444444444, 0.022222222222222223, 458.0, 11.0, 1.0, 45.0))",9,coo.31924000431662


In [54]:
filtered_features = filtered_sql.join(features_df,["vol_id","page"]).select("vol_id","page","label1","features")

In [55]:
display(filtered_features)

vol_id,page,label1,features
chi.40727635,8,factual,"List(1, 9, List(), List(0.0, 35.0, 0.1259259259259259, 0.6571428571428571, 0.4, 405.0, 23.0, 14.0, 35.0))"
coo.31924004057562,7,no_content,"List(1, 9, List(), List(0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0))"
coo.31924059815310,10,creative,"List(1, 9, List(), List(1.0, 24.0, 0.11818181818181818, 0.4782608695652174, 0.08695652173913043, 220.0, 11.0, 2.0, 23.0))"
coo.31924074255153,3,no_content,"List(1, 9, List(), List(0.0, 4.0, 0.3333333333333333, 0.5, 0.0, 6.0, 2.0, 0.0, 4.0))"
coo.31924074717806,7,creative,"List(1, 9, List(), List(9.0, 65.0, 0.08092485549132948, 0.32142857142857145, 0.03571428571428571, 519.0, 18.0, 2.0, 56.0))"
coo.31924074847017,9,no_content,"List(1, 9, List(), List(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))"
coo.31924089842185,1,no_content,"List(1, 9, List(), List(1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))"
coo.31924090208350,0,factual,"List(1, 9, List(), List(1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))"
ien.35556003318441,4,no_content,"List(1, 9, List(), List(1.0, 4.0, 0.3, 0.3333333333333333, 0.0, 10.0, 1.0, 0.0, 3.0))"
inu.30000053359323,12,creative,"List(1, 9, List(), List(0.0, 31.0, 0.004219409282700422, 0.06451612903225806, 0.0, 237.0, 2.0, 0.0, 31.0))"


Here I present a Pipeline to train and test the model prediction using Spark Pipeline all at once

In [57]:
from pyspark.ml.feature import StandardScaler

labelIndexer = StringIndexer(inputCol="label1", outputCol="indexedLabel").fit(filtered_features)
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=True)
featureIndexer = VectorIndexer(inputCol="scaledFeatures", outputCol="indexedFeatures", maxCategories=20)

# random forest training model
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=30)

# logistic regression training model
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="indexedLabel", featuresCol="indexedFeatures",maxIter=10, regParam=0.3, elasticNetParam=0.8)

# linearSVC on sparkml is just for binary classifier, in exchange i use one vs all logistic regression classiffier
#from pyspark.ml.classification import LinearSVC
#lsvc = LinearSVC(labelCol="indexedLabel", featuresCol="indexedFeatures",maxIter=10, regParam=0.1)

# one vs rest machine
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# instantiate the base classifier.
lr2 = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)
# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(labelCol="indexedLabel", featuresCol="indexedFeatures",classifier=lr2)

# Multilayer perceptron

from pyspark.ml.classification import MultilayerPerceptronClassifier
layers = [9, 5, 4, 3]
mlp = MultilayerPerceptronClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",maxIter=100, layers=layers, blockSize=128, seed=1234)


# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline_rf = Pipeline(stages=[labelIndexer, scaler, featureIndexer, rf, labelConverter])
pipeline_lr = Pipeline(stages=[labelIndexer, scaler, featureIndexer, lr, labelConverter])
pipeline_ovr = Pipeline(stages=[labelIndexer, scaler, featureIndexer, ovr, labelConverter])
pipeline_mlp = Pipeline(stages=[labelIndexer, scaler, featureIndexer, mlp, labelConverter])

create training and testing set

In [59]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = filtered_features.randomSplit([0.7, 0.3],seed=7)

Random Forest

In [61]:
# Train model.  This also runs the indexers.
model_rf = pipeline_rf.fit(trainingData)

# Make predictions.
predictions = model_rf.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label1", "features").show(10)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))

rfModel = model_rf.stages[2]
print(rfModel)  # summary only

Logistic Regression

In [63]:
# Train model.  This also runs the indexers.
model_lr = pipeline_lr.fit(trainingData)

# Make predictions.
predictions = model_lr.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label1", "features").show(10)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))

lrModel = model_lr.stages[2]
print(lrModel)  # summary only

One vs rest classifier

In [65]:
# Train model.  This also runs the indexers.
model_ovr = pipeline_ovr.fit(trainingData)

# Make predictions.
predictions = model_ovr.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label1", "features").show(10)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))

ovrModel = model_ovr.stages[2]
print(ovrModel)  # summary only

Multilayer Perceptron

In [67]:
# Train model.  This also runs the indexers.
model_mlp = pipeline_mlp.fit(trainingData)

# Make predictions.
predictions = model_mlp.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label1", "features").show(10)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))

mlpModel = model_mlp.stages[2]
print(mlpModel)  # summary only

as we can see from these four different models, Random Forest performs the best with 84% accuracy in total followed by multilayer perceptron (neural network) with 83.9% accuracy overall. However, the training time for multilayer perceptron is quite long with 4.45 minutes compared to the random forest model that only takes 1.11 minutes to finish. Now we will impelement this to the new volume set that we assume is the new collection that we want to publish / analyze and added into the training set to improve our prediction accuracy. This is where the iterative sampling begin.

Analyze the prediction bias for creative content

In [70]:
predictions = model_rf.transform(testData)
display(predictions)

vol_id,page,label1,features,indexedLabel,scaledFeatures,indexedFeatures,rawPrediction,probability,prediction,predictedLabel
chi.40727635,8,factual,"List(1, 9, List(), List(0.0, 35.0, 0.1259259259259259, 0.6571428571428571, 0.4, 405.0, 23.0, 14.0, 35.0))",2.0,"List(1, 9, List(), List(-0.14083985005207128, 0.10872764360979133, 0.11349570021998304, 0.9923018497182697, 1.5799325373371156, 0.6151953665760024, 0.6541587092571453, 0.47980025911217056, 0.12641774611303694))","List(1, 9, List(), List(-0.14083985005207128, 0.10872764360979133, 0.11349570021998304, 0.9923018497182697, 1.5799325373371156, 0.6151953665760024, 0.6541587092571453, 0.47980025911217056, 0.12641774611303694))","List(1, 3, List(), List(3.992565557995982, 0.5610521713508145, 25.446382270653206))","List(1, 3, List(), List(0.13308551859986603, 0.018701739045027147, 0.8482127423551068))",2.0,factual
ien.35556003318441,4,no_content,"List(1, 9, List(), List(1.0, 4.0, 0.3, 0.3333333333333333, 0.0, 10.0, 1.0, 0.0, 3.0))",1.0,"List(1, 9, List(), List(0.0480661887858422, -0.5521479698599675, 0.9318915912335227, 0.008297616770778468, -0.4635135593002045, -0.7416135791802555, -0.5003726731809515, -0.2063252631589718, -0.565607599247053))","List(1, 9, List(), List(0.0480661887858422, -0.5521479698599675, 0.9318915912335227, 0.008297616770778468, -0.4635135593002045, -0.7416135791802555, -0.5003726731809515, -0.2063252631589718, -0.565607599247053))","List(1, 3, List(), List(7.841356832758467, 10.529931100290622, 11.62871206695091))","List(1, 3, List(), List(0.2613785610919489, 0.3509977033430207, 0.3876237355650304))",2.0,factual
inu.30000053359323,12,creative,"List(1, 9, List(), List(0.0, 31.0, 0.004219409282700422, 0.06451612903225806, 0.0, 237.0, 2.0, 0.0, 31.0))",0.0,"List(1, 9, List(), List(-0.14083985005207128, 0.02345337090401599, -0.4586981411673449, -0.8085939428393304, -0.4635135593002045, 0.03812219471004965, -0.44789397397921976, -0.2063252631589718, 0.0399145779430257))","List(1, 9, List(), List(-0.14083985005207128, 0.02345337090401599, -0.4586981411673449, -0.8085939428393304, -0.4635135593002045, 0.03812219471004965, -0.44789397397921976, -0.2063252631589718, 0.0399145779430257))","List(1, 3, List(), List(27.881487004905136, 0.3305274488513941, 1.7879855462434757))","List(1, 3, List(), List(0.9293829001635043, 0.011017581628379802, 0.05959951820811584))",0.0,creative
mdp.35128001747722,17,factual,"List(1, 9, List(), List(0.0, 26.0, 0.0410958904109589, 0.5, 0.7692307692307693, 146.0, 13.0, 20.0, 26.0))",2.0,"List(1, 9, List(), List(-0.14083985005207128, -0.08313946997820318, -0.285326197120337, 0.514770383729046, 3.4661904726946413, -0.2744591067173414, 0.1293717172398286, 0.7738540543712316, -0.06821438226948834))","List(1, 9, List(), List(-0.14083985005207128, -0.08313946997820318, -0.285326197120337, 0.514770383729046, 3.4661904726946413, -0.2744591067173414, 0.1293717172398286, 0.7738540543712316, -0.06821438226948834))","List(1, 3, List(), List(6.2920191444615465, 0.5729965295994984, 23.13498432593896))","List(1, 3, List(), List(0.20973397148205153, 0.019099884319983277, 0.7711661441979653))",2.0,factual
mdp.39015000443450,3,no_content,"List(1, 9, List(), List(0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0))",1.0,"List(1, 9, List(), List(-0.14083985005207128, -0.616103674389299, -0.47853536987491796, -1.0046479171457565, -0.4635135593002045, -0.7725282133873601, -0.5528513723826831, -0.2063252631589718, -0.6088591833320586))","List(1, 9, List(), List(-0.14083985005207128, -0.616103674389299, -0.47853536987491796, -1.0046479171457565, -0.4635135593002045, -0.7725282133873601, -0.5528513723826831, -0.2063252631589718, -0.6088591833320586))","List(1, 3, List(), List(1.8920243132414303, 26.676225726810937, 1.431749959947636))","List(1, 3, List(), List(0.06306747710804766, 0.8892075242270311, 0.047724998664921185))",1.0,no_content
mdp.39015004726710,5,no_content,"List(1, 9, List(), List(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))",1.0,"List(1, 9, List(), List(-0.14083985005207128, -0.6374222425657429, -0.47853536987491796, -1.0046479171457565, -0.4635135593002045, -0.7759631727437051, -0.5528513723826831, -0.2063252631589718, -0.6304849753745614))","List(1, 9, List(), List(-0.14083985005207128, -0.6374222425657429, -0.47853536987491796, -1.0046479171457565, -0.4635135593002045, -0.7759631727437051, -0.5528513723826831, -0.2063252631589718, -0.6304849753745614))","List(1, 3, List(), List(0.8884803806426139, 28.026450799088927, 1.0850688202684582))","List(1, 3, List(), List(0.02961601268808713, 0.9342150266362975, 0.03616896067561527))",1.0,no_content
mdp.39015010784448,16,creative,"List(1, 9, List(), List(1.0, 120.0, 0.03397508493771234, 0.2689075630252101, 0.01680672268907563, 883.0, 32.0, 2.0, 119.0))",0.0,"List(1, 9, List(), List(0.0480661887858422, 1.9208059386075174, -0.3188041172012554, -0.18748177213745937, -0.37765447960956083, 2.2571059389088917, 1.1264670020727303, -0.10830733140595145, 1.9429842776832729))","List(1, 9, List(), List(0.0480661887858422, 1.9208059386075174, -0.3188041172012554, -0.18748177213745937, -0.37765447960956083, 2.2571059389088917, 1.1264670020727303, -0.10830733140595145, 1.9429842776832729))","List(1, 3, List(), List(27.179330325198332, 0.25839459818365956, 2.5622750766180125))","List(1, 3, List(), List(0.905977677506611, 0.008613153272788651, 0.0854091692206004))",0.0,creative
mdp.39015014365863,12,creative,"List(1, 9, List(), List(0.0, 29.0, 0.05555555555555555, 0.3103448275862069, 0.034482758620689655, 324.0, 9.0, 1.0, 29.0))",0.0,"List(1, 9, List(), List(-0.14083985005207128, -0.019183765448871678, -0.2173451918918734, -0.06156069591312042, -0.28735441303836656, 0.33696365871206085, -0.0805430795670981, -0.15731629728246163, -0.003337006141979918))","List(1, 9, List(), List(-0.14083985005207128, -0.019183765448871678, -0.2173451918918734, -0.06156069591312042, -0.28735441303836656, 0.33696365871206085, -0.0805430795670981, -0.15731629728246163, -0.003337006141979918))","List(1, 3, List(), List(26.80743500674088, 0.324942306269154, 2.867622686989969))","List(1, 3, List(), List(0.8935811668913626, 0.010831410208971798, 0.09558742289966561))",0.0,creative
mdp.39015015416517,9,no_content,"List(1, 9, List(), List(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))",1.0,"List(1, 9, List(), List(-0.14083985005207128, -0.6374222425657429, -0.47853536987491796, -1.0046479171457565, -0.4635135593002045, -0.7759631727437051, -0.5528513723826831, -0.2063252631589718, -0.6304849753745614))","List(1, 9, List(), List(-0.14083985005207128, -0.6374222425657429, -0.47853536987491796, -1.0046479171457565, -0.4635135593002045, -0.7759631727437051, -0.5528513723826831, -0.2063252631589718, -0.6304849753745614))","List(1, 3, List(), List(0.8884803806426139, 28.026450799088927, 1.0850688202684582))","List(1, 3, List(), List(0.02961601268808713, 0.9342150266362975, 0.03616896067561527))",1.0,no_content
mdp.39015030651031,13,factual,"List(1, 9, List(), List(0.0, 11.0, 0.04838709677419355, 1.0, 0.7272727272727273, 62.0, 11.0, 8.0, 11.0))",2.0,"List(1, 9, List(), List(-0.14083985005207128, -0.4029179926248607, -0.2510471503412985, 2.0341886846038486, 3.2518429800403768, -0.5629956926503178, 0.024414318836365245, 0.18574646385310958, -0.39260126290703046))","List(1, 9, List(), List(-0.14083985005207128, -0.4029179926248607, -0.2510471503412985, 2.0341886846038486, 3.2518429800403768, -0.5629956926503178, 0.024414318836365245, 0.18574646385310958, -0.39260126290703046))","List(1, 3, List(), List(4.037369365099161, 1.4340460712459762, 24.52858456365486))","List(1, 3, List(), List(0.13457897883663872, 0.047801535708199214, 0.817619485455162))",2.0,factual


In [71]:
# create temporary view for predictions to do easy query
predictions.createOrReplaceTempView("rf_predictions")

In [72]:
rf_probability = spark.sql('select vol_id,page,label1,predictedLabel,probability from rf_predictions where label1="creative" and predictedLabel<>"creative"').rdd.map(lambda x:x.probability)

density of probability distribution of missclassified creative prediction

In [74]:
from pyspark.sql.types import Row
rf_probability_1 = spark.createDataFrame(rf_probability.map(lambda x:[float(y) for y in x]))
display(rf_probability_1)
#rf_probability.take(5)

_1,_2,_3
0.4551417711480927,0.018812647751343,0.5260455811005642
0.2704119727045347,0.3497655991681754,0.3798224281272899
0.3134963089246677,0.0285738826702049,0.6579298084051274
0.3112763326927446,0.2370298050025336,0.4516938623047218
0.4139564077142601,0.0702700987677142,0.5157734935180257
0.2606978892882995,0.0474232682181006,0.6918788424935998
0.1628459674247278,0.1033775747313167,0.7337764578439555
0.3488054007151037,0.0226650862105825,0.6285295130743138
0.4472655247846386,0.0131220403101828,0.5396124349051785
0.2133711194071939,0.0358074662073179,0.7508214143854881


In [75]:
rf_probability_pd = rf_probability_1.toPandas()
rf_probability_pd.describe()

Unnamed: 0,_1,_2,_3
count,341.0,341.0,341.0
mean,0.2379,0.284697,0.477403
std,0.111012,0.322861,0.255188
min,0.029616,0.006415,0.036169
25%,0.159236,0.031276,0.298027
50%,0.219722,0.103886,0.557453
75%,0.320488,0.514332,0.679532
max,0.489981,0.934215,0.848263


In [76]:
min_threshold = rf_probability_pd.describe()._1["25%"]
min_threshold

In [77]:
import requests

next_training = "https://raw.githubusercontent.com/htrc/ht-frontmatter-analysis/master/Jupyter/FrontMatter-next.csv"

# download data using request command
get_resp = requests.request("GET",next_training)
# store the response text in databricks store
with open("/dbfs/tmp/FrontMatter-next.csv","w") as file:
  file.write(get_resp.text)

In [78]:
pages_df = read_pages("dbfs:/tmp/FrontMatter-next.csv")
pages_df.write.partitionBy("file_name").format("delta").mode("append").save("/delta/pages")

In [79]:
%sql
select file_name,count(1) from pages group by file_name

file_name,count(1)
dbfs:/tmp/FrontMatter-initial.csv,16000
dbfs:/tmp/FrontMatter-next.csv,2000


get new statistical features values from the API

In [81]:
next_file_name = "dbfs:/tmp/FrontMatter-next.csv"
stat_df,delta_vol_df = fetch_volumes_api("dbfs:/tmp/FrontMatter-next.csv")
# write the data into table
if stat_df!=None:
  stat_df.write.format("delta").mode("overwrite").save("/delta/page_stat")
  spark.sql("CREATE TABLE IF NOT EXISTS page_stat USING DELTA LOCATION '/delta/page_stat/'")
else:
  print("There is no new volume need to be fetch")

Now i will recreate the new volume set and test the prediction accuracy, using the trained multi layer perceptron model that performed the best on the training I will test the accuracy over the new volume set

In [83]:
filtered_sql = spark.sql("""
select s.*,p.label1,p.label2,p.desc from page_stat as s, pages as p
where p.vol_id = s.vol_id and p.page = s.page
and p.file_name='dbfs:/tmp/FrontMatter-next.csv'
""")

from pyspark.sql.functions import col,when
filtered_sql = filtered_sql.withColumn("label1",when(col("label1")=="mixed","creative").otherwise(col("label1")))

from pyspark.ml.linalg import Vectors
features_df = spark.createDataFrame(filtered_sql.rdd.map(lambda x:{"vol_id":x.vol_id,"page":x.page,"features":Vectors.dense([x.empty_line_count,
x.line_count,
x.pct_all_caps,
x.pct_begin_char_caps,
x.pct_end_numeric,
#x.pct_roman,
x.token_count,
x.total_capital,
x.total_numeric,
x.total_sentence])}))
#.sample(fraction=0.1,withReplacement=False))
#.collect()
#.map(lambda x: Row(tuple(x))).collect()

filtered_features = filtered_sql.join(features_df,["vol_id","page"]).select("vol_id","page","label1","features")

we assume we don't know about the truth label yet, we will use the threshold information to give us the vol_id and pages that are above the threshold

In [85]:
predictions = model_rf.transform(filtered_features)

# Assumption we don't know about the truth value of the label
"""
# Select example rows to display.
predictions.select("predictedLabel", "label1", "features").show(10)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
print("Error = %g" % (1-accuracy))
"""

In [86]:
display(predictions)

vol_id,page,label1,features,indexedLabel,scaledFeatures,indexedFeatures,rawPrediction,probability,prediction,predictedLabel
inu.30000095243873,18,creative,"List(1, 9, List(), List(0.0, 20.0, 0.0, 0.55, 0.05, 138.0, 11.0, 1.0, 20.0))",0.0,"List(1, 9, List(), List(-0.14083985005207128, -0.2110508790368662, -0.47853536987491796, 0.6667122138165265, -0.20808279722053952, -0.30193878156810106, 0.024414318836365245, -0.15731629728246163, -0.19796913452450518))","List(1, 9, List(), List(-0.14083985005207128, -0.2110508790368662, -0.47853536987491796, 0.6667122138165265, -0.20808279722053952, -0.30193878156810106, 0.024414318836365245, -0.15731629728246163, -0.19796913452450518))","List(1, 3, List(), List(14.406420778561262, 0.5985992920185057, 14.994979929420232))","List(1, 3, List(), List(0.4802140259520421, 0.01995330973395019, 0.49983266431400775))",2.0,factual
osu.32435003329760,3,no_content,"List(1, 9, List(), List(11.0, 17.0, 0.4444444444444444, 0.8333333333333334, 0.0, 9.0, 5.0, 0.0, 6.0))",1.0,"List(1, 9, List(), List(1.937126577164977, -0.27500658356619767, 1.6109860539894385, 1.5277159176455812, -0.4635135593002045, -0.7450485385366005, -0.29045787637402476, -0.2063252631589718, -0.5007302231195445))","List(1, 9, List(), List(1.937126577164977, -0.27500658356619767, 1.6109860539894385, 1.5277159176455812, -0.4635135593002045, -0.7450485385366005, -0.29045787637402476, -0.2063252631589718, -0.5007302231195445))","List(1, 3, List(), List(5.282384711507669, 4.4394626987342, 20.278152589758133))","List(1, 3, List(), List(0.17607949038358897, 0.14798208995780668, 0.6759384196586045))",2.0,factual
pst.000017038669,11,creative,"List(1, 9, List(), List(0.0, 26.0, 0.013793103448275862, 0.11538461538461539, 0.0, 290.0, 3.0, 0.0, 26.0))",0.0,"List(1, 9, List(), List(-0.14083985005207128, -0.08313946997820318, -0.413688153272231, -0.6540129246361867, -0.4635135593002045, 0.22017504059633236, -0.3954152747774881, -0.2063252631589718, -0.06821438226948834))","List(1, 9, List(), List(-0.14083985005207128, -0.08313946997820318, -0.413688153272231, -0.6540129246361867, -0.4635135593002045, 0.22017504059633236, -0.3954152747774881, -0.2063252631589718, -0.06821438226948834))","List(1, 3, List(), List(28.094594373034337, 0.21522281171269583, 1.6901828152529719))","List(1, 3, List(), List(0.9364864791011445, 0.007174093723756527, 0.05633942717509906))",0.0,creative
uc1.$b113745,15,creative,"List(1, 9, List(), List(0.0, 47.0, 0.04607046070460705, 0.1702127659574468, 0.02127659574468085, 369.0, 8.0, 1.0, 47.0))",0.0,"List(1, 9, List(), List(-0.14083985005207128, 0.36455046172711736, -0.26193863691336877, -0.487399133869228, -0.35481961798970874, 0.4915368297475839, -0.13302177876882976, -0.15731629728246163, 0.38592725062307065))","List(1, 9, List(), List(-0.14083985005207128, 0.36455046172711736, -0.26193863691336877, -0.487399133869228, -0.35481961798970874, 0.4915368297475839, -0.13302177876882976, -0.15731629728246163, 0.38592725062307065))","List(1, 3, List(), List(27.765013147073912, 0.21394198812731677, 2.021044864798773))","List(1, 3, List(), List(0.9255004382357971, 0.007131399604243892, 0.0673681621599591))",0.0,creative
uc1.$b215121,8,factual,"List(1, 9, List(), List(0.0, 34.0, 0.07216494845360824, 0.6470588235294118, 0.4411764705882353, 291.0, 22.0, 15.0, 34.0))",2.0,"List(1, 9, List(), List(-0.14083985005207128, 0.0874090754333475, -0.13925740672168482, 0.9616581192804586, 1.7902872825791925, 0.2236099999526773, 0.6016800100554136, 0.5288092249886808, 0.10479195407053413))","List(1, 9, List(), List(-0.14083985005207128, 0.0874090754333475, -0.13925740672168482, 0.9616581192804586, 1.7902872825791925, 0.2236099999526773, 0.6016800100554136, 0.5288092249886808, 0.10479195407053413))","List(1, 3, List(), List(4.301001897985746, 0.38541155534697097, 25.313586546667285))","List(1, 3, List(), List(0.14336672993285818, 0.012847051844899033, 0.8437862182222429))",2.0,factual
uc1.$b392877,12,factual,"List(1, 9, List(), List(0.0, 6.0, 0.14285714285714285, 0.5, 0.3333333333333333, 14.0, 3.0, 2.0, 6.0))",2.0,"List(1, 9, List(), List(-0.14083985005207128, -0.5095108335070799, 0.19309651636719666, 0.514770383729046, 1.239358187897562, -0.7278737417548756, -0.3954152747774881, -0.10830733140595145, -0.5007302231195445))","List(1, 9, List(), List(-0.14083985005207128, -0.5095108335070799, 0.19309651636719666, 0.514770383729046, 1.239358187897562, -0.7278737417548756, -0.3954152747774881, -0.10830733140595145, -0.5007302231195445))","List(1, 3, List(), List(4.415313836446357, 4.478356494154497, 21.106329669399145))","List(1, 3, List(), List(0.14717712788154522, 0.1492785498051499, 0.7035443223133048))",2.0,factual
uc1.31175029086306,2,no_content,"List(1, 9, List(), List(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))",1.0,"List(1, 9, List(), List(-0.14083985005207128, -0.6374222425657429, -0.47853536987491796, -1.0046479171457565, -0.4635135593002045, -0.7759631727437051, -0.5528513723826831, -0.2063252631589718, -0.6304849753745614))","List(1, 9, List(), List(-0.14083985005207128, -0.6374222425657429, -0.47853536987491796, -1.0046479171457565, -0.4635135593002045, -0.7759631727437051, -0.5528513723826831, -0.2063252631589718, -0.6304849753745614))","List(1, 3, List(), List(0.8884803806426139, 28.026450799088927, 1.0850688202684582))","List(1, 3, List(), List(0.02961601268808713, 0.9342150266362975, 0.03616896067561527))",1.0,no_content
uc1.31822025422197,4,creative,"List(1, 9, List(), List(0.0, 36.0, 0.030373831775700934, 0.3333333333333333, 0.027777777777777776, 428.0, 12.0, 1.0, 36.0))",0.0,"List(1, 9, List(), List(-0.14083985005207128, 0.13004621178623516, -0.3357351323795151, 0.008297616770778468, -0.3216075803670573, 0.6941994317719363, 0.07689301803809692, -0.15731629728246163, 0.14804353815553975))","List(1, 9, List(), List(-0.14083985005207128, 0.13004621178623516, -0.3357351323795151, 0.008297616770778468, -0.3216075803670573, 0.6941994317719363, 0.07689301803809692, -0.15731629728246163, 0.14804353815553975))","List(1, 3, List(), List(27.765013147073912, 0.21394198812731677, 2.021044864798773))","List(1, 3, List(), List(0.9255004382357971, 0.007131399604243892, 0.0673681621599591))",0.0,creative
uc1.31822034002154,19,factual,"List(1, 9, List(), List(3.0, 20.0, 0.0, 0.4117647058823529, 0.0, 56.0, 7.0, 0.0, 17.0))",2.0,"List(1, 9, List(), List(0.42587826646166915, -0.2110508790368662, -0.47853536987491796, 0.24663774239819847, -0.4635135593002045, -0.5836054487883875, -0.18550047797056143, -0.2063252631589718, -0.26284651065201364))","List(1, 9, List(), List(0.42587826646166915, -0.2110508790368662, -0.47853536987491796, 0.24663774239819847, -0.4635135593002045, -0.5836054487883875, -0.18550047797056143, -0.2063252631589718, -0.26284651065201364))","List(1, 3, List(), List(23.559933637748358, 1.3558466540940723, 5.084219708157568))","List(1, 3, List(), List(0.7853311212582786, 0.045194888469802415, 0.16947399027191895))",0.0,creative
uc1.b3597514,12,factual,"List(1, 9, List(), List(0.0, 92.0, 0.8844621513944223, 0.9891304347826086, 0.0, 251.0, 91.0, 0.0, 92.0))",2.0,"List(1, 9, List(), List(-0.14083985005207128, 1.32388602966709, 3.6796955114806447, 2.0011578519761355, -0.4635135593002045, 0.08621162569887904, 4.222710254974899, -0.2063252631589718, 1.359087892535697))","List(1, 9, List(), List(-0.14083985005207128, 1.32388602966709, 3.6796955114806447, 2.0011578519761355, -0.4635135593002045, 0.08621162569887904, 4.222710254974899, -0.2063252631589718, 1.359087892535697))","List(1, 3, List(), List(5.838132030545092, 1.0100510368870252, 23.151816932567886))","List(1, 3, List(), List(0.1946044010181697, 0.03366836789623417, 0.7717272310855962))",2.0,factual


In [87]:
# get the vol_id and pages that are not categorized as creative content and probability greater than min_threshold
# send this as the feed back to the coder/labeler or HIT
import numpy as np
#creative_feedbacks = spark.createDataFrame(predictions.rdd.map(lambda x: {"vol_id":x.vol_id,"page":x.page,"p_c":float(x.probability[0]),"p_f":float(x.probability[1]),"p_n":float(x.probability[2])}).filter(lambda x: np.argmax(list(x.values())[2:])!=0 and list(x.values())[2]>=min_threshold))
creative_feedbacks = spark.createDataFrame(predictions.rdd.map(lambda x: {"vol_id":x.vol_id,"pred_label":x.predictedLabel,"page":x.page,"pred":x.prediction,"prob":[float(x.probability[0]),float(x.probability[1]),float(x.probability[2])]}).filter(lambda x: np.argmax(x["prob"])!=0 and x["prob"][2]>=min_threshold))
                                           #.map(lambda x:list(x.values())))

In [88]:
display(creative_feedbacks)

page,pred,pred_label,prob,vol_id
18,2.0,factual,"List(0.4802140259520421, 0.01995330973395019, 0.49983266431400775)",inu.30000095243873
3,2.0,factual,"List(0.17607949038358897, 0.14798208995780668, 0.6759384196586045)",osu.32435003329760
8,2.0,factual,"List(0.14336672993285818, 0.012847051844899033, 0.8437862182222429)",uc1.$b215121
12,2.0,factual,"List(0.14717712788154522, 0.1492785498051499, 0.7035443223133048)",uc1.$b392877
12,2.0,factual,"List(0.1946044010181697, 0.03366836789623417, 0.7717272310855962)",uc1.b3597514
8,2.0,factual,"List(0.21177685125215553, 0.022281172902980907, 0.7659419758448636)",uc1.c059195501
19,2.0,factual,"List(0.3517390820212944, 0.011827782092262118, 0.6364331358864435)",umn.31951d01945217i
8,2.0,factual,"List(0.06479258283689876, 0.35427154904992164, 0.5809358681131797)",uva.x000082628
12,2.0,factual,"List(0.43878963513610697, 0.10137822247835467, 0.45983214238553843)",mdp.39015071417169
16,2.0,factual,"List(0.173103092151659, 0.010924251688234975, 0.8159726561601061)",uc1.b3585156


In [89]:
creative_feedbacks.count()

In [90]:
from pyspark.sql.functions import lit

creative_feedbacks = creative_feedbacks.withColumn('file_name', lit(next_file_name))

In [91]:
display(creative_feedbacks)

page,pred,pred_label,prob,vol_id,file_name
18,2.0,factual,"List(0.4802140259520421, 0.01995330973395019, 0.49983266431400775)",inu.30000095243873,dbfs:/tmp/FrontMatter-next.csv
3,2.0,factual,"List(0.17607949038358897, 0.14798208995780668, 0.6759384196586045)",osu.32435003329760,dbfs:/tmp/FrontMatter-next.csv
8,2.0,factual,"List(0.14336672993285818, 0.012847051844899033, 0.8437862182222429)",uc1.$b215121,dbfs:/tmp/FrontMatter-next.csv
12,2.0,factual,"List(0.14717712788154522, 0.1492785498051499, 0.7035443223133048)",uc1.$b392877,dbfs:/tmp/FrontMatter-next.csv
12,2.0,factual,"List(0.1946044010181697, 0.03366836789623417, 0.7717272310855962)",uc1.b3597514,dbfs:/tmp/FrontMatter-next.csv
8,2.0,factual,"List(0.21177685125215553, 0.022281172902980907, 0.7659419758448636)",uc1.c059195501,dbfs:/tmp/FrontMatter-next.csv
19,2.0,factual,"List(0.3517390820212944, 0.011827782092262118, 0.6364331358864435)",umn.31951d01945217i,dbfs:/tmp/FrontMatter-next.csv
8,2.0,factual,"List(0.06479258283689876, 0.35427154904992164, 0.5809358681131797)",uva.x000082628,dbfs:/tmp/FrontMatter-next.csv
12,2.0,factual,"List(0.43878963513610697, 0.10137822247835467, 0.45983214238553843)",mdp.39015071417169,dbfs:/tmp/FrontMatter-next.csv
16,2.0,factual,"List(0.173103092151659, 0.010924251688234975, 0.8159726561601061)",uc1.b3585156,dbfs:/tmp/FrontMatter-next.csv


from 2000 pages, there are about 623 pages that is need to recheck because it has prediction probability above our threshold. This might be the case of misclassification for creative content prediction. We will store this suspect pages in the table to be checked by the coders for the next iteration

In [93]:
dbutils.fs.rm("/delta/suspect_pages",recurse=True)
creative_feedbacks.write.partitionBy("file_name").format("delta").mode("overwrite").save("/delta/suspect_pages")
spark.sql("CREATE TABLE IF NOT EXISTS suspect_pages USING DELTA LOCATION '/delta/suspect_pages/'")

In [94]:
%sql
select * from suspect_pages

page,pred,pred_label,prob,vol_id,file_name
5,2.0,factual,"List(0.23782212940462796, 0.028889673570289476, 0.7332881970250825)",coo.31924054774900,dbfs:/tmp/FrontMatter-next.csv
7,2.0,factual,"List(0.13815251612580406, 0.07438012541938134, 0.7874673584548145)",uc1.$b260868,dbfs:/tmp/FrontMatter-next.csv
16,2.0,factual,"List(0.4082072498142188, 0.01471822686347107, 0.5770745233223101)",uc1.32106010619358,dbfs:/tmp/FrontMatter-next.csv
14,2.0,factual,"List(0.3068431393928134, 0.022709005555988142, 0.6704478550511985)",uc1.b2882064,dbfs:/tmp/FrontMatter-next.csv
6,2.0,factual,"List(0.09143003968776578, 0.2798539368441794, 0.6287160234680548)",uc1.b3441714,dbfs:/tmp/FrontMatter-next.csv
14,2.0,factual,"List(0.06527331658174418, 0.2979887465117379, 0.6367379369065179)",uc1.b3441714,dbfs:/tmp/FrontMatter-next.csv
2,2.0,factual,"List(0.1563957341199594, 0.049206593856204676, 0.7943976720238359)",wu.89065829251,dbfs:/tmp/FrontMatter-next.csv
6,2.0,factual,"List(0.18713041178287954, 0.11867907109289348, 0.694190517124227)",wu.89091310656,dbfs:/tmp/FrontMatter-next.csv
4,2.0,factual,"List(0.08932016562498575, 0.21216438838174823, 0.698515445993266)",wu.89099391773,dbfs:/tmp/FrontMatter-next.csv
11,2.0,factual,"List(0.2411774697048909, 0.006774384098204178, 0.752048146196905)",coo.31924054774900,dbfs:/tmp/FrontMatter-next.csv


From the saved table we can export it as a csv or import it somewhere in the table that is accessible to the coders to improve the prediction label.

Next, supposed they already checked the label, we can compare the accuracy of our creative prediction content compare to the truth value

In [96]:
#filtered_sql = filtered_sql.withColumn("label1",when(col("label1")=="mixed","creative").otherwise(col("label1")))
"""
from pyspark.sql.functions import col,lit,array,create_map
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def pred_tolabel(pred,labelIndex = ['creative', 'no_content', 'factual']):  
  return labelIndex[pred]

udf_func = udf(pred_tolabel,StringType())

creative_feedbacks = creative_feedbacks.withColumn("label1", udf_func(creative_feedbacks.pred))
creative_feedbacks.createOrReplaceTempView("suspect_pages")
"""

now we select mismatch between creative_feedbacks and the truth value

In [98]:
pages_df = spark.sql("select * from pages")
pages_df = pages_df.withColumn("label1",when(col("label1")=="mixed","creative").otherwise(col("label1")))

In [99]:
# select everything that is not match with label1
mismatch_df = pages_df.join(creative_feedbacks,["vol_id","page"]).filter((col("label1")!=col("pred_label")) & (col("label1")==lit("creative")))

In [100]:
display(mismatch_df)

vol_id,page,desc,file_name,label1,label2,pred,pred_label,prob,file_name.1
inu.30000095243873,18,,dbfs:/tmp/FrontMatter-next.csv,creative,poem,2.0,factual,"List(0.4802140259520421, 0.01995330973395019, 0.49983266431400775)",dbfs:/tmp/FrontMatter-next.csv
umn.31951d01945217i,19,,dbfs:/tmp/FrontMatter-next.csv,creative,main_text,2.0,factual,"List(0.3517390820212944, 0.011827782092262118, 0.6364331358864435)",dbfs:/tmp/FrontMatter-next.csv
mdp.39015071417169,12,,dbfs:/tmp/FrontMatter-next.csv,creative,image,2.0,factual,"List(0.43878963513610697, 0.10137822247835467, 0.45983214238553843)",dbfs:/tmp/FrontMatter-next.csv
pst.000003434253,16,,dbfs:/tmp/FrontMatter-next.csv,creative,main_text,2.0,factual,"List(0.48850120956378185, 0.013106788571256613, 0.4983920018649615)",dbfs:/tmp/FrontMatter-next.csv
uc1.31822036236420,0,cover design,dbfs:/tmp/FrontMatter-next.csv,creative,cover,2.0,factual,"List(0.20029764167144665, 0.32202940866677715, 0.4776729496617762)",dbfs:/tmp/FrontMatter-next.csv
uc1.$b463115,5,,dbfs:/tmp/FrontMatter-next.csv,creative,ad,2.0,factual,"List(0.30749314739250155, 0.08081371074874903, 0.6116931418587495)",dbfs:/tmp/FrontMatter-next.csv
miun.acw8491.0009.001,7,premise,dbfs:/tmp/FrontMatter-next.csv,creative,pref_text,2.0,factual,"List(0.23769217465010056, 0.09660615114090937, 0.6657016742089901)",dbfs:/tmp/FrontMatter-next.csv
umn.31951d009032865,0,,dbfs:/tmp/FrontMatter-next.csv,creative,image,2.0,factual,"List(0.19884963703016323, 0.3897717982026678, 0.411378564767169)",dbfs:/tmp/FrontMatter-next.csv
uva.x001273725,6,cover art,dbfs:/tmp/FrontMatter-next.csv,creative,cover,2.0,factual,"List(0.17965892093412428, 0.10793474011593768, 0.712406338949938)",dbfs:/tmp/FrontMatter-next.csv
uc1.b4342361,6,cover design,dbfs:/tmp/FrontMatter-next.csv,creative,cover,2.0,factual,"List(0.14108655838216363, 0.07755221495240816, 0.7813612266654282)",dbfs:/tmp/FrontMatter-next.csv


In [101]:
mismatch_df.count()

From the observation above, we can see that we misclassified 119 non-creative pages in which 100 pages are predicted as factual content and 20 pages as no_content

now using all features data from the first and second labeling process, we build a new random forest model

In [104]:
filtered_sql = spark.sql("""
select s.*,p.label1,p.label2,p.desc from page_stat as s, pages as p
where p.vol_id = s.vol_id and p.page = s.page
and p.file_name='dbfs:/tmp/FrontMatter-next.csv'
""")

from pyspark.sql.functions import col,when
filtered_sql = filtered_sql.withColumn("label1",when(col("label1")=="mixed","creative").otherwise(col("label1")))

from pyspark.ml.linalg import Vectors
features_df = spark.createDataFrame(filtered_sql.rdd.map(lambda x:{"vol_id":x.vol_id,"page":x.page,"features":Vectors.dense([x.empty_line_count,
x.line_count,
x.pct_all_caps,
x.pct_begin_char_caps,
x.pct_end_numeric,
#x.pct_roman,
x.token_count,
x.total_capital,
x.total_numeric,
x.total_sentence])}))
#.sample(fraction=0.1,withReplacement=False))
#.collect()
#.map(lambda x: Row(tuple(x))).collect()

filtered_features = filtered_sql.join(features_df,["vol_id","page"]).select("vol_id","page","label1","features")

In [105]:
predictions = model_rf.transform(filtered_features)

# Select example rows to display.
predictions.select("predictedLabel", "label1", "features").show(10)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))
print("Error = %g" % (1-accuracy))

from the iterative training model, the iterative training dataset can actively improve the random forest prediction accuracy a little bit better than the first training process. We should follow up this by retraining the all models and choose the best model or doing cross validation. However, as we already did one full cycle of this Human in The Loop process, this finishes our proof of concept on iteratively producing new training data and retrain model to get a better predictor model for detecting creative content in the front matter pages.