## 1. Set up the AWS Glue Spark environment

In [None]:
# AWS Glue interactive-session configuration magics (run this cell first).
# additional_python_modules lists the extra packages the WARC-parsing cells
# import below: warcio, smart_open, goose3 and bs4.
# NOTE(review): idle_timeout is presumably in minutes (600 = 10 hours) — confirm
# this is the intended value for this workload.
%session_id_prefix common-crawl-etl
%additional_python_modules warcio,smart_open,goose3,bs4

%idle_timeout 600 
%number_of_workers 25
%worker_type G.2X
%glue_version 4.0

In [None]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one, otherwise create one named "warc".
spark = (
    SparkSession.builder
    .appName("warc")
    .getOrCreate()
)

In [None]:
# List the Glue interactive sessions, e.g. to confirm the session configured above is up.
%list_sessions

## 2. Find all related WARC files in Common Crawl

#### 2.1 Create the CC-Index metadata table for Common Crawl data

In [None]:
# Database that will hold the external Common Crawl index table.
create_database_sql = """CREATE DATABASE if not exists ccindex;"""
spark.sql(create_database_sql)

In [None]:
# Register the public Common Crawl columnar URL index (Parquet files under
# s3://commoncrawl/cc-index/table/cc-main/warc/) as an external table
# partitioned by crawl and subset. The data itself stays in the public bucket.
ccindex_ddl = """
CREATE EXTERNAL TABLE IF NOT EXISTS ccindex.ccindex (
  url_surtkey                   STRING, 
  url                           STRING, 
  url_host_name                 STRING, 
  url_host_tld                  STRING, 
  url_host_2nd_last_part        STRING,
  url_host_3rd_last_part        STRING,
  url_host_4th_last_part        STRING,
  url_host_5th_last_part        STRING,
  url_host_registry_suffix      STRING, 
  url_host_registered_domain    STRING, 
  url_host_private_suffix       STRING,
  url_host_private_domain       STRING, 
  url_protocol                  STRING, 
  url_port                      INT, 
  url_path                      STRING, 
  url_query                     STRING,
  fetch_time                    TIMESTAMP, 
  fetch_status                  SMALLINT,
  content_digest                STRING, 
  content_mime_type             STRING, 
  content_mime_detected         STRING, 
  content_charset               STRING, 
  content_languages             STRING, 
  warc_filename                 STRING, 
  warc_record_offset            INT, 
  warc_record_length            INT, 
  warc_segment                  STRING) 
PARTITIONED BY (
  crawl                         STRING, 
  subset                        STRING)
STORED AS parquet
LOCATION 's3://commoncrawl/cc-index/table/cc-main/warc/';
"""
spark.sql(ccindex_ddl)

In [None]:
# Discover the crawl/subset partitions under the table location and register them.
# NOTE(review): the location spans every crawl in the index, so this can be slow.
repair_sql = """MSCK REPAIR TABLE ccindex.ccindex;"""
spark.sql(repair_sql)

#### 2.2 Filter for the WARC files that contain the target knowledge

- Set `s3_bucket` to your own bucket (its name is listed in the outputs of the CloudFormation stack).

In [None]:
# Print the AWS account ID of the current credentials. The bucket name set in the
# next cell appears to be prefixed with this ID — use it to build your own name.
!aws sts get-caller-identity --query Account --output text

In [None]:
# Selection: pages of `target_domain` whose URL starts with `url_host_domain`
# and contains `topic`.
target_domain = "nbcnews.com"
url_host_domain = "https://www.nbcnews.com/"
topic = "ufo"

# Destination bucket for the extracted text. Replace it with your own bucket
# (listed in the CloudFormation stack outputs).
s3_bucket = "687752207838-23-08-31-15-47-19-bucket"
# s3_bucket = '{}-knowledge-bucket'.format('687752207838')

In [None]:
# SQL LIKE pattern: the URL starts with the site root and contains the topic after it.
url_pattern = "{}%{}%".format(url_host_domain, topic)

# Index rows for HTML captures of the target domain in the CC-MAIN-2023-23 crawl,
# fetched on 2023-06-06, whose URL matches the pattern.
filter_sql_template = """select * from ccindex.ccindex WHERE url_host_registered_domain = '{}' 
and crawl ='CC-MAIN-2023-23' and subset ='warc' and content_mime_detected = 'text/html'
and url like '{}' and date(fetch_time) = (date '2023-06-06')
"""
filtered_data = spark.sql(filter_sql_template.format(target_domain, url_pattern))

**Note: please be patient; this step may take a few minutes.**

In [None]:
# S3 URIs of the WARC files holding the matched records. Several matched pages
# can live in the same WARC file, so de-duplicate before collecting to the
# driver (the later groupby("url") processes each file only once anyway).
filename_list = [
    "s3://commoncrawl/" + row.warc_filename
    for row in filtered_data.select("warc_filename").distinct().collect()
]

#### 2.3 Check all related WARC files

In [None]:
# Display the selected WARC files (one s3:// URI per entry).
filename_list

## 3. Parse, extract, and clean these WARC files and write the results to the S3 bucket

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType, lit, udf, col
from pyspark.sql.types import StructType, StructField, StringType, MapType, ArrayType
from smart_open import open
from warcio.archiveiterator import ArchiveIterator
from goose3 import Goose
import pandas as pd
import re
from bs4 import BeautifulSoup

def post_process(content):
    """Normalize line endings and collapse blank lines.

    CRLF line endings become LF, and every run of consecutive newlines is
    squeezed into a single newline.

    :param content: text to normalize (here: Goose's ``cleaned_text``).
    :return: the normalized text.
    """
    unix_text = content.replace('\r\n', '\n')
    return re.sub(r'\n{2,}', '\n', unix_text)

# Output schema of generate_rows: one nullable string column per extracted field,
# in this order.
schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        'URL',
        'MimeType',
        'RawHTML',
        'ContentProcessed',
        'Title',
        'WARC-Date',
        'WARC-Record-ID',
        'Content-Length',
        'WARC-Filename',
        'WARC-Warcinfo-ID',
        'WARC-IP-Address',
        'WARC-Concurrent-To',
        'WARC-Payload-Digest',
        'WARC-Block-Digest',
        'WARC-Truncated',
        'WARC-S3Path',
    )
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def generate_rows(pdf):
    """Extract the matching HTML pages from one WARC file.

    Grouped-map pandas UDF applied per ``url`` group, so every row of ``pdf``
    carries the same WARC file URI. The file is streamed with smart_open's
    ``open``. Each ``response`` record whose WARC-Target-URI starts with
    ``url_host_domain`` and contains ``topic`` (the values set in the
    configuration cell) becomes one output row.

    :param pdf: pandas DataFrame for one group; only column ``url`` is read.
    :return: pandas DataFrame with exactly the columns of ``schema``; it has no
        rows when nothing in the file matched.
    """
    # All rows of a group share the groupby key; take the first one positionally
    # instead of relying on the index label 0.
    url = pdf['url'].iloc[0]
    rows = []

    # Context managers release the S3 stream and Goose's resources even when a
    # record fails to parse.
    with open(url, 'rb') as stream, Goose() as extractor:
        for record in ArchiveIterator(stream):
            if record.rec_type != 'response':
                continue
            page_uri = record.rec_headers.get_header('WARC-Target-URI', "")
            # Mirrors the LIKE pattern used to select the WARC files; reads the
            # configuration values instead of re-hard-coding them here.
            if not (page_uri.startswith(url_host_domain) and topic in page_uri):
                continue

            raw_html = record.content_stream().read()
            # Lenient decode: undecodable bytes are dropped instead of failing.
            html_content = str(raw_html, 'utf-8', errors="ignore")
            article = extractor.extract(raw_html=html_content)
            rows.append({
                'URL': record.rec_headers.get_header('WARC-Target-URI'),
                'MimeType': record.http_headers.get_header('Content-Type'),
                # Store the decoded text: the column is StringType, and raw bytes
                # that are not valid UTF-8 cannot be converted to it by Arrow.
                'RawHTML': html_content,
                'ContentProcessed': post_process(article.cleaned_text),
                'Title': article.title,
                'WARC-Date': record.rec_headers.get_header('WARC-Date'),
                'WARC-Record-ID': record.rec_headers.get_header('WARC-Record-ID'),
                'Content-Length': record.rec_headers.get_header('Content-Length'),
                'WARC-Filename': record.rec_headers.get_header('WARC-Filename'),
                'WARC-Warcinfo-ID': record.rec_headers.get_header('WARC-Warcinfo-ID'),
                'WARC-IP-Address': record.rec_headers.get_header('WARC-IP-Address'),
                'WARC-Concurrent-To': record.rec_headers.get_header('WARC-Concurrent-To'),
                'WARC-Payload-Digest': record.rec_headers.get_header('WARC-Payload-Digest'),
                'WARC-Block-Digest': record.rec_headers.get_header('WARC-Block-Digest'),
                'WARC-Truncated': record.rec_headers.get_header('WARC-Truncated'),
                'WARC-S3Path': url,
            })

    # Explicit columns keep the frame aligned with ``schema`` even when no record
    # matched (an empty list would otherwise give a frame with no columns).
    return pd.DataFrame(rows, columns=schema.fieldNames())

def clean_with_goose_or_bs(raw_html):
    """Return the text content of an HTML document, extracted with BeautifulSoup.

    Despite the name, only BeautifulSoup is used here. The Goose article text is
    already produced per page by ``generate_rows`` (column ``ContentProcessed``),
    so no Goose extraction is run in this function.

    :param raw_html: HTML markup as a string, or None for a NULL column value.
    :return: the concatenated text of all nodes, or None when ``raw_html`` is None.
    """
    if raw_html is None:
        # Spark passes SQL NULL as None; keep the result NULL instead of failing the task.
        return None
    soup = BeautifulSoup(raw_html, 'html.parser')
    return soup.get_text()

In [None]:
# One row per WARC file. The explicit DDL schema lets createDataFrame handle an
# empty filename_list, where schema inference would fail.
urls_df = spark.createDataFrame([(url,) for url in filename_list], "url string")
# Parse each WARC file in its own group (one generate_rows call per file).
result_df = urls_df.groupby("url").apply(generate_rows)

# Add a plain-text version of every page's HTML.
clean_udf = udf(clean_with_goose_or_bs, StringType())
result_df = result_df.withColumn("CleanText_Goose_or_BS", clean_udf(result_df["RawHTML"]))

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def remove_special_characters(input_string):
    """Turn a title into a string usable as an S3 object-key segment.

    Characters other than ASCII letters, digits and whitespace are removed.
    Then every whitespace character (space, tab, newline, non-breaking space, ...)
    is replaced by ``_``, one underscore per character.

    :param input_string: text to sanitize; None is treated as ''.
    :return: the sanitized string (possibly empty).
    """
    if not input_string:
        return ''
    cleaned_string = re.sub(r'[^a-zA-Z0-9\s]', '', input_string)
    # Replacing only ' ' would leave tabs/newlines/NBSPs (kept by the class above)
    # inside the object key.
    return re.sub(r'\s', '_', cleaned_string)

def write_to_s3(title, content, bucket):
    """Write one article's text to ``s3://<bucket>/ai-content/<title>.txt``.

    Applied per row through the ``process_row_udf`` Spark UDF.

    :param title: article title; it is sanitized with ``remove_special_characters``
        to form the key. When nothing usable remains (empty or None title), a hash
        of the content is used, so untitled articles do not overwrite each other.
    :param content: text to write; when None, nothing is written.
    :param bucket: destination S3 bucket name.
    :return: the S3 URI that was written, or None when ``content`` is None.
    """
    if content is None:
        return None
    new_title = remove_special_characters(title or '')
    if not new_title:
        import hashlib
        new_title = 'untitled_' + hashlib.sha1(content.encode('utf-8')).hexdigest()
    s3_path = "s3://{}/ai-content/{}.txt".format(bucket, new_title)
    # Encode explicitly: text mode otherwise uses the platform default encoding,
    # which may not be able to represent non-ASCII article text.
    with open(s3_path, 'w', encoding='utf-8') as s3_file:
        s3_file.write(content)

    return s3_path

In [None]:
# Write each article's cleaned text to S3; the UDF returns the written S3 URI.
process_row_udf = udf(write_to_s3, StringType())
final_df = result_df.withColumn(
    "s3_path",
    process_row_udf(
        col('Title'),
        col('CleanText_Goose_or_BS'),
        lit(s3_bucket),
    ),
)

**Note: please be patient; this step may take a few minutes.**

In [None]:
# Run the pipeline; the S3 writes happen inside the UDF that computes s3_path.
# Only the written paths are brought back to the driver. Collecting full rows
# would also ship every page's raw HTML and extracted text to the driver.
s3_files = final_df.select("s3_path").collect()

**Note: After this step finishes, all content will be ingested into OpenSearch automatically.**

In [None]:
# Stop the Spark context.
# NOTE(review): in a Glue interactive session this may not end the session itself
# (it may stay up until the idle timeout) — consider %stop_session; confirm against
# the Glue magics documentation.
spark.stop()