In [None]:
%stop_session

In [None]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.2X
%number_of_workers 10
%additional_python_modules  langdetect, datasketch, kshingle, beautifulsoup4,htmldate,wordninja, torch,keybert, transformers==4.20.1, python_docx, docx,spacy, pikepdf, PyPDF2, openpyxl, PyMuPDF, s3://aws-glue-assets-412071276468-eu-west-2/glue_resources/python_modules/pdfminer.six-20221105-py3-none-any.whl, s3://aws-glue-assets-412071276468-eu-west-2/glue_resources/python_modules/word_forms-2.1.0-py3-none-any.whl, s3://aws-glue-assets-412071276468-eu-west-2/glue_resources/python_modules/en_core_web_sm-3.5.0-py3-none-any.whl, s3://aws-glue-assets-412071276468-eu-west-2/glue_resources/python_modules/en_core_web_lg-3.5.0-py3-none-any.whl
%extra_py_files s3://aws-glue-assets-412071276468-eu-west-2/glue_resources/python_modules/date_generation.zip, s3://aws-glue-assets-412071276468-eu-west-2/glue_resources/python_modules/text_hashing.zip, s3://aws-glue-assets-412071276468-eu-west-2/glue_resources/python_modules/document_type_identification.zip, s3://aws-glue-assets-412071276468-eu-west-2/glue_resources/python_modules/docx_to_text.zip, s3://aws-glue-assets-412071276468-eu-west-2/glue_resources/python_modules/html_to_text.zip, s3://aws-glue-assets-412071276468-eu-west-2/glue_resources/python_modules/keyword_extraction.zip, s3://aws-glue-assets-412071276468-eu-west-2/glue_resources/python_modules/legislative_origin.zip, s3://aws-glue-assets-412071276468-eu-west-2/glue_resources/python_modules/odf_to_text.zip, s3://aws-glue-assets-412071276468-eu-west-2/glue_resources/python_modules/pdf_to_text.zip, s3://aws-glue-assets-412071276468-eu-west-2/glue_resources/python_modules/title_generation.zip, s3://aws-glue-assets-412071276468-eu-west-2/glue_resources/python_modules/summarisation.zip
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Session bootstrap: reuse the running SparkContext if there is one, otherwise create it.
sc = SparkContext.getOrCreate()
# Glue wrapper around the Spark context; the Spark session, Job and logger below are all derived from it.
glueContext = GlueContext(sc)
# Spark session used later to build DataFrames from the pandas file index.
spark = glueContext.spark_session
# Glue Job handle bound to this context.
job = Job(glueContext)
# Logger obtained from the Glue context.
logger = glueContext.get_logger()

In [None]:
# --- Input location -------------------------------------------------------
# Bucket that holds the raw documents, and the key prefix that is scanned.
DATA_SOURCE_BUCKET_NAME = 'beis-dev-datalake'
DATA_SOURCE_PREFIX = 'temp'

# Key (inside the source bucket) of the JSON-lines rulebook used for
# document type identification.
DTI_RULEBOOK = 'dti/doc_type_rules_v.2.jsonl'

# --- Shared resources -----------------------------------------------------
# Despite the name this is an S3 prefix URI, not a bare bucket name.
RESOURCES_BUCKET = 's3://aws-glue-assets-412071276468-eu-west-2/glue_resources/resources/'
# Parquet table used to map topic names to topic paths.
TOPIC_MAPPING_FILE_PATH = 's3://aws-glue-assets-412071276468-eu-west-2/glue_resources/resources/topic_id_mapping.parquet'

# --- Output location ------------------------------------------------------
# Destination of the processed metadata, derived from the source bucket.
PROCESSED_METADATA_BUCKET = f's3://{DATA_SOURCE_BUCKET_NAME}/glue_processed_metadata/'

In [None]:
import nltk
# Register the shared resources directory with Spark (second argument = recursive,
# required for a directory). extract_data later resolves 'resources/nltk_data'
# through SparkFiles, so this must run before any document is processed.
sc.addFile(RESOURCES_BUCKET,True)

import pandas as pd
import spacy
import json
from pdf_to_text.pdf_to_text import pdf_converter
from odf_to_text.odf_to_text import odf_converter
from docx_to_text.docx_to_text import docx_converter
from html_to_text.html_to_text import html_converter
from text_hashing.hashing import create_hash
from date_generation.date_generation import date_generation
from legislative_origin.lo_extraction import lo_extraction
from title_generation.title_generation import title_generator
from document_type_identification.rule_based_dti import dti
from keyword_extraction.keyword_extraction import  keyword_extraction
from summarisation.summarisation import summarizer
import boto3
import io
from pyspark.sql.types import StructType,StructField, StringType,ArrayType
from pyspark import SparkFiles
from datetime import datetime
import pyspark.sql.functions as F
from uuid import uuid4

# Two boto3 handles for S3:
#   s3_rsc - resource API, used to enumerate the objects of the source bucket
#   s3_cli - client API, used to download individual objects (rulebook, mapping files)
s3_rsc=boto3.resource('s3')
s3_cli=boto3.client('s3')

# Dispatch table: upper-cased document format -> converter callable.
# extract_data unpacks each converter's result as (text, title, date_published).
# Both ODF and ODT go through the same converter; no converter is registered
# for legacy 'doc' files (that entry was left disabled).
doc_format_map = dict(
    PDF=pdf_converter,
    ODF=odf_converter,
    ODT=odf_converter,
    DOCX=docx_converter,
    HTML=html_converter,
)
def _string_fields(*names):
    """Return one nullable string column per name, in the order given."""
    return [StructField(name, StringType(), True) for name in names]


def _string_array_field(name):
    """Return a nullable column holding an array of strings."""
    return StructField(name, ArrayType(StringType()), True)


# Schema of the rows returned by extract_data; the field order must match the
# order of the values in that function's return tuple.
md_schema = StructType(
    _string_fields(
        "raw_uri",
        "text",
        "uri",
        "title",
        "date_published",
        "document_uid",
        "regulator_id",
        "summary",
        "language",
        "document_type",
        "hash_text",
    )
    + [
        _string_array_field("regulatory_topic"),
        _string_array_field("assigned_orp_topic"),
        # Each legislative origin is a struct of six string attributes.
        StructField(
            "legislative_origins",
            ArrayType(
                StructType(
                    _string_fields("title", "ref", "href", "number", "division", "type")
                )
            ),
        ),
        _string_array_field("keywords"),
    ]
)

# Placeholder values for a document that failed to process: everything after
# raw_uri in md_schema, i.e. ten empty scalar columns followed by four empty
# array columns.
null_ret = [None for _ in range(10)] + [[] for _ in range(4)]

In [None]:
# Download the document-type rulebook (JSON lines) from the source bucket.
_rulebook_response = s3_cli.get_object(Bucket=DATA_SOURCE_BUCKET_NAME, Key=DTI_RULEBOOK)
rule_json = _rulebook_response['Body'].read().decode('utf-8')

# One pattern per non-blank line.
dti_patterns = list(map(json.loads, filter(str.strip, rule_json.split('\n'))))

# Load the small English model without its 'ner' and 'entity_ruler' components,
# then add an entity ruler whose phrase patterns match on the LOWER attribute
# and feed it the rulebook patterns.
nlp = spacy.load("en_core_web_sm", exclude=['entity_ruler',  'ner'])
_entity_ruler = nlp.add_pipe("entity_ruler", config={'phrase_matcher_attr':'LOWER'})
_entity_ruler.add_patterns(dti_patterns)

def download_text(s3_client, object_key, source_bucket):
    """Read one S3 object fully into memory and return it as a binary stream.

    Args:
        s3_client: object exposing ``get_object(Bucket=..., Key=...)`` (a boto3 S3 client).
        object_key: key of the object to fetch.
        source_bucket: name of the bucket holding the object.

    Returns:
        io.BytesIO positioned at the start of the object's bytes.
    """
    response = s3_client.get_object(Bucket=source_bucket, Key=object_key)
    payload = response['Body'].read()
    return io.BytesIO(payload)

def get_reg_id(uri, doc_format):
    """Return the regulator id for a document.

    For non-HTML documents the id is the second '/'-separated segment of the
    URI, lower-cased. For HTML documents it is 'hse' when the URI contains
    'hse.gov.uk' and 'ea' otherwise.
    """
    if doc_format != 'HTML':
        return uri.split('/')[1].lower()
    if 'hse.gov.uk' in uri:
        return 'hse'
    return 'ea'

# Convert topics into graph-friendly topic paths.
# The mapping table is loaded once at module level; get_topic_path looks topics
# up in it and reads the matching row's `path_id` column.
topic_df = pd.read_parquet(TOPIC_MAPPING_FILE_PATH)
def get_topic_path(topics):
    """Map topic names to topic paths using the module-level ``topic_df`` table.

    A topic matches a row of ``topic_df`` when any cell of that row equals the
    topic; the first matching row's ``path_id`` is the assigned path. Every
    assigned path is also expanded into its ancestor paths, from the first two
    '/'-separated segments up to the full path.

    Args:
        topics: iterable of topic strings, or None (treated as no topics).

    Returns:
        (paths, assigned): ``paths`` is the de-duplicated list of expanded
        paths in first-seen order; ``assigned`` is the list of ``path_id``
        values, one per matched topic, in input order.
    """
    if topics is None:
        # A missing topic list must not abort processing of the whole document.
        topics = []

    expanded = {}  # dict used as an insertion-ordered set
    assigned = []
    for topic in topics:
        # Rows in which at least one cell equals the topic.
        hits = topic_df.index[(topic_df == topic).any(axis=1)]
        if len(hits) == 0:
            continue
        path_id = topic_df.loc[hits[0]].path_id
        assigned.append(path_id)
        parts = path_id.split('/')
        for depth in range(2, len(parts) + 1):
            expanded['/'.join(parts[:depth])] = None
    return list(expanded), assigned

def extract_data(uri, parent_uri, doc_format, topics, nlp):
    """Convert one document to text and derive its metadata.

    Intended to run once per input row. Any exception is caught, reported on
    stdout and turned into a placeholder row so that a single bad document
    does not fail the whole job.

    Args:
        uri: S3 key of the document, or its URL when ``doc_format`` is 'HTML'.
        parent_uri: forwarded unchanged to ``dti``.
        doc_format: key into ``doc_format_map`` ('PDF', 'ODF', 'ODT', 'DOCX', 'HTML').
        topics: topic names, resolved through ``get_topic_path``.
        nlp: spaCy pipeline forwarded to ``dti``.

    Returns:
        On success a 15-tuple in ``md_schema`` order: (raw_uri, text, uri,
        title, date_published, document_uid, regulator_id, summary, language,
        document_type, hash_text, regulatory_topic, assigned_orp_topic,
        legislative_origins, keywords). On failure the list
        ``[uri] + null_ret``.
    """
    try:
        # Register the distributed nltk data directory only once per Python
        # process; appending on every call would grow the search path without
        # bound in a long-lived worker.
        nltk_dir = SparkFiles.get('resources/nltk_data')
        if nltk_dir not in nltk.data.path:
            nltk.data.path.append(nltk_dir)

        if doc_format == 'HTML':
            # The HTML converter is handed the URL itself, so no S3 client is needed.
            btext = uri
        else:
            btext = download_text(boto3.client('s3'), uri, DATA_SOURCE_BUCKET_NAME)

        text, title, date_published = doc_format_map[doc_format](btext)
        document_uid = uuid4().hex
        reg_id = get_reg_id(uri, doc_format)
        summary, lang = summarizer(text)
        ntitle = title_generator(text, title)
        ndp = date_generation(text, date_published)
        los = lo_extraction(text)
        keywords = keyword_extraction(text, title)
        document_type = dti(uri, parent_uri, text, ntitle, nlp)
        hash_text = create_hash(text)
        # Non-HTML documents are published under 'bulk/<file name>'.
        nuri = uri if doc_format == 'HTML' else f'bulk/{uri.split("/")[-1]}'
        reg_topic, assigned = get_topic_path(topics)
        return (
            uri, text, nuri, ntitle, ndp, document_uid, reg_id, summary, lang,
            document_type, hash_text, reg_topic, assigned, los, keywords,
        )
    except Exception as e:
        # Deliberate best-effort: report and emit a placeholder row keyed by the raw URI.
        print(f'ERROR: {uri} \t{doc_format}')
        print(f'ERR.BODY:\n{e}')
        return [uri] + null_ret

    

In [None]:
# Build the pandas index of documents to process (later converted to a Spark DataFrame).
# The Prefix filter is applied by S3, so only keys under the source prefix are
# listed instead of enumerating the whole bucket and filtering afterwards.
flist=[obj.key for obj in s3_rsc.Bucket(DATA_SOURCE_BUCKET_NAME).objects.filter(Prefix=DATA_SOURCE_PREFIX)]

df = pd.DataFrame(flist, columns=['raw_uri'])
ext_type = ('pdf','docx','odt','odf', 'html')
# Keys ending in '/' are folder placeholders; otherwise the format is the text after the last '.'.
df['document_format'] = df.raw_uri.apply(lambda x: 'dir' if x.endswith('/') else x.split('.')[-1])

# Mapping files (parquet) carry topics / parent_uri per document. Their columns
# are renamed by position according to how many columns the file has.
colname = {
2:['topics','uri'],
3:['topics','parent_uri', 'uri'] ,
4:['topics','parent_uri','org_uri', 'uri'] }
p_ext = df[df.document_format=='parquet'].raw_uri
mapping_frames = []
for lk in p_ext:
    a = pd.read_parquet(download_text(s3_cli, lk, DATA_SOURCE_BUCKET_NAME))
    a.columns = colname[a.shape[1]]
    mapping_frames.append(a)
# Concatenate once; with no mapping files fall back to an empty frame that
# still has the columns the merge below relies on.
df_ext = pd.concat(mapping_frames) if mapping_frames else pd.DataFrame(columns=colname[3])

# Join key: HTML entries are keyed by their full key, files by their base name without extension.
df['uri'] = df.apply(lambda x: x.raw_uri if x.document_format=='html' else x.raw_uri.split('/')[-1].split('.')[0], axis=1)

df = df.merge(df_ext, on='uri', how='left')

# Mapping rows whose uri is a web address describe HTML pages that are not
# stored in the bucket; add them as extra documents. The result is a fresh
# frame rather than a mutated slice of df_ext.
df_ext = (
    df_ext[df_ext.uri.str.startswith('http', na=False)]
    .assign(document_format='html')
    .rename(columns={'uri':'raw_uri'})
)
df = pd.concat([df,df_ext])
df = df.drop('uri', axis=1)

df = df[df.document_format.isin(ext_type)].reset_index(drop=True)
df.document_format = df.document_format.apply(str.upper)


def _as_topic_list(value):
    """Normalise a topics cell to a list: str -> [str], missing -> [], iterable -> list."""
    if isinstance(value, str):
        return [value]
    # Documents without a mapping row get NaN from the left merge.
    if value is None or (isinstance(value, float) and pd.isna(value)):
        return []
    return list(value)


df['topics'] = df['topics'].apply(_as_topic_list)

In [None]:
# Schema of the per-document input rows handed to the extraction step.
schema = StructType([
        StructField("raw_uri", StringType(), True),
        StructField("document_format", StringType(), True),
        StructField("topics",ArrayType(StringType()), True),
        StructField("parent_uri", StringType(), True)
])

# Select the columns by name, in schema order. The pandas frame may carry
# extra columns (e.g. 'org_uri' from four-column mapping files) or a different
# column order, and the schema must line up with exactly these four columns.
DF = spark.createDataFrame(df[schema.fieldNames()], schema=schema)

# Constant metadata stamped on every row of this run.
static_md = [
    ('date_uploaded', datetime.now().isoformat()),
    ('status','published'),
    ('user_id','bulk_uploader'),
    ('version', 1)
]
for k,v in static_md:
    DF = DF.withColumn(k, F.lit(v))

In [None]:
def _extract_row(row):
    """Run extract_data on one row of DF, passing the shared spaCy pipeline."""
    return extract_data(
        row['raw_uri'],
        row['parent_uri'],
        row['document_format'],
        row['topics'],
        nlp,
    )


# Extract metadata row by row, give the result the metadata schema, then
# attach it to the input rows (including their static metadata) by raw_uri.
out = DF.rdd.map(_extract_row)

df2 = out.toDF(schema=md_schema)

dff = DF.join(df2, how='outer', on='raw_uri')


In [None]:
from time import time

# Writing triggers the actual Spark computation, so the elapsed time printed
# below covers extraction as well as the write itself.
t = time()
(
    dff.write
    .mode('overwrite')  # replaces whatever is already at the destination
    .parquet(PROCESSED_METADATA_BUCKET)
)
print(f'JOB DONE: {time()-t} sec')