## Project Straylight

### Common Crawl Domain Reconnaissance

__Introduction:__
This notebook shows how to configure and search the publicly available Common Crawl dataset of websites using AWS Athena. Common Crawl is a free dataset containing over 8 years of crawled data, including over 25 billion websites, trillions of links, and petabytes of data.

__GitHub:__
* https://github.com/brevityinmotion/straylight

__Blog:__
* [Search the html across 25 billion websites for passive reconnaissance using common crawl](https://medium.com/@brevityinmotion/search-the-html-across-25-billion-websites-for-passive-reconnaissance-using-common-crawl-7fe109250b83?sk=5b8b4a7c506d5acba572c0b30137f7aa)

__Credits:__
* Special thank you to Sebastian Nagel for the tutorials and insight for utilizing the dataset!
* Many of the functions have been adapted from Sebastian Nagel's poster on accessing WARC files via SQL: http://netpreserve.org/ga2019/wp-content/uploads/2019/07/IIPCWAC2019-SEBASTIAN_NAGEL-Accessing_WARC_files_via_SQL-poster.pdf


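### Before using this notebook, run the following three queries in AWS Athena to configure the Common Crawl database.

Run Query 2 and Query 3 with `ccindex` selected as the database in the Athena query editor (or qualify the table name as `ccindex.ccindex`) so that the table is created in the database this notebook queries.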

#### Query 1
<code>CREATE DATABASE ccindex</code>

#### Query 2
<code>CREATE EXTERNAL TABLE IF NOT EXISTS ccindex (
  url_surtkey                   STRING,
  url                           STRING,
  url_host_name                 STRING,
  url_host_tld                  STRING,
  url_host_2nd_last_part        STRING,
  url_host_3rd_last_part        STRING,
  url_host_4th_last_part        STRING,
  url_host_5th_last_part        STRING,
  url_host_registry_suffix      STRING,
  url_host_registered_domain    STRING,
  url_host_private_suffix       STRING,
  url_host_private_domain       STRING,
  url_protocol                  STRING,
  url_port                      INT,
  url_path                      STRING,
  url_query                     STRING,
  fetch_time                    TIMESTAMP,
  fetch_status                  SMALLINT,
  content_digest                STRING,
  content_mime_type             STRING,
  content_mime_detected         STRING,
  content_charset               STRING,
  content_languages             STRING,
  warc_filename                 STRING,
  warc_record_offset            INT,
  warc_record_length            INT,
  warc_segment                  STRING)
PARTITIONED BY (
  crawl                         STRING,
  subset                        STRING)
STORED AS parquet
LOCATION 's3://commoncrawl/cc-index/table/cc-main/warc/';</code>

#### Query 3
<code>MSCK REPAIR TABLE ccindex</code>
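
The three setup queries can also be issued from Python instead of the console. The sketch below is an assumption, not part of the original notebook: `run_setup_queries` is a hypothetical helper that takes an Athena client (for example `boto3.client('athena', region_name='us-east-1')`), starts each statement with `start_query_execution`, and polls `get_query_execution` until it finishes. Waiting matters because Athena runs queries asynchronously, and the table must exist before `MSCK REPAIR TABLE` runs.

```python
import time


def run_setup_queries(athena, statements, output_location, poll_seconds=2.0, sleep=time.sleep):
    """Run Athena statements one at a time, waiting for each to finish.

    `athena` is assumed to be a boto3 Athena client (or any object exposing the
    same start_query_execution / get_query_execution methods).
    Returns the query execution ids in the order they were started.
    """
    execution_ids = []
    for statement in statements:
        started = athena.start_query_execution(
            QueryString=statement,
            ResultConfiguration={'OutputLocation': output_location},
        )
        execid = started['QueryExecutionId']
        execution_ids.append(execid)
        # Poll until the statement reaches a terminal state
        while True:
            state = athena.get_query_execution(QueryExecutionId=execid)['QueryExecution']['Status']['State']
            if state == 'SUCCEEDED':
                break
            if state in ('FAILED', 'CANCELLED'):
                raise RuntimeError(f'Query {execid} ({statement[:40]!r}) ended in state {state}')
            sleep(poll_seconds)
    return execution_ids


# Hypothetical usage: qualify the table as ccindex.ccindex in Query 2 and Query 3 so it is
# created in the ccindex database, and substitute the full DDL text for CREATE_TABLE_DDL.
# import boto3
# athena = boto3.client('athena', region_name='us-east-1')
# run_setup_queries(athena, ['CREATE DATABASE ccindex', CREATE_TABLE_DDL, 'MSCK REPAIR TABLE ccindex.ccindex'],
#                   's3://your-athena-results-bucket/')
```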

In [None]:
import json
import boto3
import os

In [None]:
# This accelerator provides a function to retrieve a secret value from AWS Secrets Manager. The secret name and region must be passed into the function as parameters. The code is a parameterized version of the template provided by AWS.
import json, boto3, time, requests, io, base64
import pandas as pd
from botocore.exceptions import ClientError

def get_secret(secret_name, region_name):

    #secret_name = "AmazonSageMaker-gmaps"
    #region_name = "us-east-2"

    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    # In this sample we only handle the specific exceptions for the 'GetSecretValue' API.
    # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
    # We rethrow the exception by default.

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError:
        # Possible error codes include DecryptionFailureException (the KMS key could not decrypt the secret),
        # InternalServiceErrorException, InvalidParameterException, InvalidRequestException and
        # ResourceNotFoundException. Every ClientError is re-raised for the caller to handle.
        raise
    else:
        # Secrets Manager decrypts the secret using the associated KMS key.
        # Depending on whether the secret is a string or binary, one of these fields will be populated.
        if 'SecretString' in get_secret_value_response:
            return json.loads(get_secret_value_response['SecretString'])
        else:
            decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])
            return json.loads(decoded_binary_secret)
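
The end of `get_secret` branches on which field Secrets Manager populated. The sketch below isolates that decoding step so it can be checked without AWS access. `parse_secret_response` is a hypothetical helper; like the notebook, it assumes the secret payload is JSON and applies the same base64 decode to `SecretBinary`.

```python
import base64
import json


def parse_secret_response(response):
    """Decode a GetSecretValue response dict into a Python object.

    Secrets Manager populates either 'SecretString' (text) or 'SecretBinary'.
    This sketch assumes the payload is JSON in both cases and mirrors the
    notebook's base64 decode of the binary field.
    """
    if 'SecretString' in response:
        return json.loads(response['SecretString'])
    return json.loads(base64.b64decode(response['SecretBinary']))
```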

In [None]:
# This accelerator provides a standardized function for passing queries to Athena.

def queryathena(athenadb, athenabucket, query):
    athena = boto3.client('athena', region_name='us-east-1')
    qexec = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database':athenadb
        },
        ResultConfiguration={
            'OutputLocation':athenabucket
        }
    )
    execid = qexec['QueryExecutionId']
    return execid

In [None]:
# This accelerator provides a standardized pattern for retrieving Athena query results based on the execution id.
# This code is adapted from the Lambda function in Evan Perotti's post at http://securityriskadvisors.com/blog/creating-a-project-sonar-fdns-api-with-aws/

def retrieveresults(execid):
    """Wait for an Athena query to finish and return a presigned URL for its CSV output.

    Returns None if the query failed, was cancelled, or did not finish within 5 minutes.
    """
    athena = boto3.client('athena', region_name='us-east-1')
    s3 = boto3.client('s3')

    # Athena query checking code is from https://medium.com/dataseries/automating-athena-queries-from-s3-with-python-and-save-it-as-csv-8917258b1045
    # Poll every 5 seconds, giving up after 60 attempts (5 minutes)
    iterations = 60

    while iterations > 0:
        iterations = iterations - 1
        queryres = athena.get_query_execution(QueryExecutionId=execid)
        status = queryres['QueryExecution']['Status']['State']
        print(status)
        if status in ('FAILED', 'CANCELLED'):
            return None
        elif status == 'SUCCEEDED':
            outputloc = queryres['QueryExecution']['ResultConfiguration']['OutputLocation']
            # Split 's3://bucket/path/key.csv' into bucket and key at the first '/' after the scheme
            bucketloc, _, keyloc = outputloc[len('s3://'):].partition('/')
            url = s3.generate_presigned_url(
                'get_object',
                Params={
                    'Bucket': bucketloc,
                    'Key': keyloc
                }
            )
            return url
        else:
            time.sleep(5)

    # Timed out while the query was still queued or running
    return None
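
`retrieveresults` needs the bucket and key from Athena's `OutputLocation` to presign a download URL. Splitting at the first `/` after the `s3://` scheme is safer than removing the bucket name with `str.replace`, which also strips any later occurrence of the bucket name inside the key. A stdlib-only sketch; `split_s3_uri` is a hypothetical helper name:

```python
def split_s3_uri(uri):
    """Split 's3://bucket/path/to/key' into ('bucket', 'path/to/key')."""
    prefix = 's3://'
    if not uri.startswith(prefix):
        raise ValueError(f'Not an S3 URI: {uri!r}')
    # partition splits only at the first '/', so the key keeps any repeated names
    bucket, _, key = uri[len(prefix):].partition('/')
    if not bucket:
        raise ValueError(f'Missing bucket in S3 URI: {uri!r}')
    return bucket, key
```

For a bucket named `results` with a key under `results/`, `'results/results/2024/q.csv'.replace('results', '')` yields `'//2024/q.csv'`, whereas `partition` keeps the key intact.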

In [None]:
# Make sure to update these values
DOMAIN_TO_QUERY = 'thefarmersjournal.com' # The registered domain, e.g. 'domain.com'. Subdomains are matched automatically because the query filters on url_host_registered_domain.
ATHENA_BUCKET = 's3://cc.index.gkn0672/' # Customize to an S3 location in your own account for Athena query results (e.g. 's3://customname-athena/').
ATHENA_DB = 'ccindex' # This should match the database and not need to be changed if it was created using the previous queries.
ATHENA_TABLE = 'ccindex' # This should match the table and not need to be changed if it was created using the previous queries.

# Only modify this query if you intend to customize the search. With no filter on the crawl partition, it scans every crawl in the index.
query = "SELECT url, url_query, warc_filename, warc_record_offset, warc_record_length, fetch_time FROM %s WHERE subset = 'warc' AND url_host_registered_domain = '%s';" % (ATHENA_TABLE, DOMAIN_TO_QUERY)

execid = queryathena(ATHENA_DB, ATHENA_BUCKET, query)
print(execid)
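
The pre-defined query scans every crawl in the index. Because `crawl` is a partition column, adding a `crawl = '...'` predicate should let Athena read only that crawl's files, reducing runtime and the data scanned (Athena bills by data scanned). A sketch of a query builder under those assumptions; `build_domain_query` is a hypothetical helper, and the crawl-id check assumes Common Crawl's `CC-MAIN-YYYY-WW` naming (check the published crawl list for valid values). Inputs are validated because they are interpolated directly into SQL.

```python
import re

# Hostname labels separated by dots, e.g. 'example.com' or 'example.co.uk'
_DOMAIN_RE = re.compile(r'^[A-Za-z0-9-]+(\.[A-Za-z0-9-]+)+$')
# Assumed crawl id format, e.g. 'CC-MAIN-2023-50'
_CRAWL_RE = re.compile(r'^CC-MAIN-\d{4}-\d{2}$')


def build_domain_query(table, domain, crawl=None):
    """Build the notebook's per-domain index query, optionally limited to one crawl."""
    if not _DOMAIN_RE.match(domain):
        raise ValueError(f'Unexpected domain format: {domain!r}')
    query = ("SELECT url, url_query, warc_filename, warc_record_offset, warc_record_length, fetch_time "
             f"FROM {table} WHERE subset = 'warc' AND url_host_registered_domain = '{domain}'")
    if crawl is not None:
        if not _CRAWL_RE.match(crawl):
            raise ValueError(f'Unexpected crawl id: {crawl!r}')
        # Filtering on the partition column restricts which files Athena reads
        query += f" AND crawl = '{crawl}'"
    return query + ';'
```

Usage in the cell above could look like `query = build_domain_query(ATHENA_TABLE, DOMAIN_TO_QUERY, crawl='CC-MAIN-2023-50')`.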

In [None]:
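import json, boto3, time, requests
import pandas as pd
import io

# Utilize the execution ID to retrieve results
downloadURL = retrieveresults(execid)
if not downloadURL:
    raise RuntimeError('The Athena query did not succeed; check its status in the Athena console.')

# Load output into dataframe
resp = requests.get(downloadURL)
resp.raise_for_status()
dfhosts = pd.read_csv(io.BytesIO(resp.content))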
dfhosts

In [None]:
# Drop duplicates, keeping the latest capture of each URL. If you want to review changes between fetches, skip this cell.
dfhosts = dfhosts.sort_values('fetch_time').drop_duplicates('url',keep='last',ignore_index=True)
dfhosts
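
To see what the de-duplication keeps, here is a toy example with made-up rows: sorting by `fetch_time` and keeping the `last` duplicate leaves the most recent capture of each URL. This sketch parses `fetch_time` with `pd.to_datetime` first, which avoids depending on how the timestamps happen to sort as strings.

```python
import pandas as pd

# Made-up index rows: the same URL captured in two different crawls
rows = pd.DataFrame({
    'url': ['https://example.com/', 'https://example.com/about', 'https://example.com/'],
    'fetch_time': ['2021-03-01 10:00:00.000', '2021-01-15 08:30:00.000', '2022-06-20 12:00:00.000'],
    'warc_filename': ['crawl-a.warc.gz', 'crawl-a.warc.gz', 'crawl-b.warc.gz'],
})

# Parse timestamps so ordering does not depend on string formatting
rows['fetch_time'] = pd.to_datetime(rows['fetch_time'])
latest = (rows.sort_values('fetch_time')
              .drop_duplicates('url', keep='last', ignore_index=True))
print(latest)
```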

In [None]:
# Output the list of URLs to a CSV file named after the queried domain
dfhosts['url'].to_csv("cc-urls-" + DOMAIN_TO_QUERY + ".csv")

In [None]:
pd.set_option('display.max_colwidth', None)
dfhosts['url'].head(10)
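
The next cell fetches each page with an S3 `Range` request built from `warc_record_offset` and `warc_record_length`. HTTP byte ranges are inclusive at both ends, so the last byte is `offset + length - 1`. The cell also derives a local file name from the URL and offset so that re-runs can skip records already saved. A stdlib sketch of both calculations; the helper names are hypothetical and mirror the expressions used in `processwarcrecords`:

```python
import os


def warc_range_header(offset, length):
    """Return an HTTP Range header value covering one WARC record.

    Ranges are inclusive, so a record of `length` bytes starting at `offset`
    ends at byte offset + length - 1.
    """
    if offset < 0 or length <= 0:
        raise ValueError('offset must be >= 0 and length must be > 0')
    return f'bytes={offset}-{offset + length - 1}'


def local_html_path(base_dir, url, offset):
    """Map a captured URL to a file under base_dir: strip the scheme,
    then append the record offset and '.html'."""
    stripped = url.replace('https://', '').replace('http://', '')
    return os.path.join(base_dir, stripped + str(offset) + '.html')
```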

In [None]:
import warcio
from warcio.archiveiterator import ArchiveIterator
import os
from bs4 import BeautifulSoup 
from bs4 import Comment
import io
from io import BytesIO
import logging
import dask.dataframe as ddf
import multiprocessing
import dask.bag as db
import gc

# Thank you to Sebastian Nagel for your instructions and code to perform the following step.
# http://netpreserve.org/ga2019/wp-content/uploads/2019/07/IIPCWAC2019-SEBASTIAN_NAGEL-Accessing_WARC_files_via_SQL-poster.pdf
titles_list = []
uris_list = []
links_list = []
comments_list = []
body_list = []

# Fetch each WARC record referenced by the filenames and offsets in dfhosts, parse the HTML, collect titles, links, comments and body content into the lists above, and optionally write the prettified HTML to tmp/
def processwarcrecords(dfhosts, writefiles, howmanyrecords):
    s3client = boto3.client('s3')
    recordcount = 0
    skippedrecords = 0
    processedrecords = 0
    totalrecords = len(dfhosts.index)
    if howmanyrecords == 0:
        howmanyrecords = totalrecords
    for index, row in dfhosts.iterrows():
        if recordcount >= howmanyrecords:
            break
        recordcount = recordcount + 1
        # print('Processing row ' + str(recordcount) + ' of ' + str(totalrecords) + ' total rows.')
        if recordcount % 1000 == 0:
            print('Processed ' + str(processedrecords) + ' records.')
        url = row['url']
        warc_path = row['warc_filename']
        offset = int(row['warc_record_offset'])
        length = int(row['warc_record_length'])
        rangereq = 'bytes={}-{}'.format(offset, (offset+length-1))
        filepath = os.path.join(os.getcwd(), 'tmp', url.replace("https://","").replace("http://","") + str(offset) + '.html')
        # Check if the file already exists
        if os.path.exists(filepath):
            # print('File exists: ' + filepath)
            skippedrecords = skippedrecords + 1
            if skippedrecords % 1000 == 0:
                print('Skipped ' + str(skippedrecords) + ' records.')
            continue  # Skip processing this record

        # Fetch the byte range holding this WARC record, retrying with exponential backoff
        max_retries = 10
        retry_delay = 60  # seconds
        response = None

        for retry_count in range(max_retries):
            try:
                response = s3client.get_object(Bucket='commoncrawl', Key=warc_path, Range=rangereq)
                break  # Break out of the retry loop if successful
            except Exception as e:
                print(f"Error: {e}")
                time.sleep(retry_delay * 2**retry_count)

        if response is None:
            # Every attempt failed; skip this record
            skippedrecords = skippedrecords + 1
            print('Giving up on ' + url + ' after ' + str(max_retries) + ' attempts.')
            continue

        record_stream = BytesIO(response["Body"].read())
        for record in ArchiveIterator(record_stream):
            if record.rec_type == 'response':
                try:
                    warc_target_uri = record.rec_headers.get_header('WARC-Target-URI')
                    page = record.content_stream().read()
                    soup = BeautifulSoup(page, 'html.parser') # lxml should be faster but is not
                    # Pages without a <title> get None instead of raising AttributeError
                    title = soup.title.string if soup.title else None
                    # Store plain strings rather than bs4 objects so the parsed tree can be freed
                    titles_list.append((warc_target_uri, None if title is None else str(title)))
                    uris_list.append(warc_target_uri)

                    # Find all links
                    for link in soup.find_all('a'):
                        links_list.append((warc_target_uri, link.get('href')))
                    # Find all HTML comments
                    for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
                        comments_list.append((warc_target_uri, str(comment)))
                    # Find all body content (stored as HTML)
                    for body in soup.find_all('body'):
                        body_list.append((warc_target_uri, str(body)))

                    if writefiles == 'yes':
                        # Save the prettified HTML to the path computed above
                        os.makedirs(os.path.dirname(filepath), exist_ok=True)
                        with open(filepath, "w", encoding='utf-8') as text_file:
                            text_file.write(soup.prettify())
                    processedrecords = processedrecords + 1
                    # Clear memory after processing each record
                    del page, soup, title
                    gc.collect()

                except Exception as e:
                    logging.getLogger('errorhandler').error('Error processing %s: %s', url, e)
                    skippedrecords = skippedrecords + 1
                    print('Skipped ' + str(skippedrecords) + ' records.')
        # Release the downloaded record before moving to the next row
        del response, record_stream
        gc.collect()


# searchfiles = 'yes' # anything other than 'yes' will not process
writefiles = 'yes' # anything other than 'yes' will not process
howmanyrecords = 0 # 0 is all records; other options would be a numeric value
processwarcrecords(dfhosts,writefiles,howmanyrecords)

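# Optional parallel alternative to the sequential call above; run one or the other, not both.
# Each task receives one partition of dfhosts, so every record is fetched once (howmanyrecords then applies per partition).
# With the 'processes' scheduler, the result lists (titles_list, links_list, ...) are filled inside the worker
# processes and are not returned to this notebook; only the HTML files written to tmp/ persist.
# import dask
# parts = ddf.from_pandas(dfhosts, npartitions=6).to_delayed()  # roughly one partition per core you want to use
# dask.compute(*[dask.delayed(processwarcrecords)(part, writefiles, howmanyrecords) for part in parts], scheduler='processes')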
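
The retry loop in `processwarcrecords` sleeps `retry_delay * 2**retry_count` seconds after each failed attempt. With `retry_delay = 60` and `max_retries = 10`, the tenth attempt is preceded by a wait of 60 × 2^8 = 15,360 s (over four hours). A minimal, reusable sketch of the same exponential-backoff pattern with a cap on each wait; `call_with_backoff`, `max_delay`, and the injectable `sleep` argument are assumptions for illustration, not part of the notebook. Like the notebook, it retries on any exception; a stricter version might retry only on throttling or network errors.

```python
import time


def call_with_backoff(fn, max_retries=5, base_delay=1.0, max_delay=60.0, sleep=time.sleep):
    """Call fn() until it succeeds, sleeping base_delay * 2**attempt seconds
    (capped at max_delay) after each failure.

    Re-raises the last exception if every attempt fails, and does not sleep
    after the final attempt.
    """
    if max_retries < 1:
        raise ValueError('max_retries must be at least 1')
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise
            sleep(min(base_delay * 2 ** attempt, max_delay))


# Hypothetical use inside processwarcrecords:
# response = call_with_backoff(
#     lambda: s3client.get_object(Bucket='commoncrawl', Key=warc_path, Range=rangereq),
#     max_retries=5, base_delay=60, max_delay=900)
```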



In [None]:
# Load lists into dataframes for further processing
dfcomments = pd.DataFrame(comments_list,columns=['URI','Comment'])
dftitles = pd.DataFrame(titles_list,columns=['URI','Title'])
dflinks = pd.DataFrame(links_list,columns=['URI','Link'])
dfbody = pd.DataFrame(body_list,columns=['URI','Body'])
#dflinks.head(10)
dfcomments.head(10)
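
The lists loaded above are filled by BeautifulSoup inside `processwarcrecords`. For a sense of what each list holds per record, here is a stdlib-only sketch that swaps in `html.parser.HTMLParser` for BeautifulSoup (a different technique from the notebook's). It gathers the title, the `href` of every `<a>` tag (None when absent, as `link.get('href')` returns), and the HTML comments from one made-up page; `PageExtractor` is a hypothetical class name.

```python
from html.parser import HTMLParser


class PageExtractor(HTMLParser):
    """Collect the <title> text, <a href> values and HTML comments from one page."""

    def __init__(self):
        super().__init__()
        self.title = None
        self.links = []
        self.comments = []
        self._in_title = False

    def handle_starttag(self, tag, attrs):
        if tag == 'title':
            self._in_title = True
        elif tag == 'a':
            # attrs is a list of (name, value) pairs; anchors without href yield None
            self.links.append(dict(attrs).get('href'))

    def handle_endtag(self, tag):
        if tag == 'title':
            self._in_title = False

    def handle_data(self, data):
        if self._in_title:
            self.title = (self.title or '') + data

    def handle_comment(self, data):
        # data is the text between '<!--' and '-->'
        self.comments.append(data)


page = ('<html><head><title>Example</title></head><body>'
        '<!-- staging server: dev.example.com -->'
        '<a href="/login">Log in</a><a name="top">Top</a></body></html>')
extractor = PageExtractor()
extractor.feed(page)

# Build (URI, value) pairs in the same shape as titles_list, links_list and comments_list
uri = 'https://example.com/'
titles = [(uri, extractor.title)]
links = [(uri, href) for href in extractor.links]
comments = [(uri, c) for c in extractor.comments]
```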

In [None]:
# Search for keywords within the comments
import re
pd.set_option('display.max_colwidth', None)
search_values = []  # Add keywords to look for; an empty list matches every comment
dfcomments[dfcomments.Comment.str.contains('|'.join(search_values), flags=re.IGNORECASE, na=False)]
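
With `search_values` left empty, `'|'.join(search_values)` is the empty string, and an empty pattern matches every comment, so the cell returns all rows. Keywords containing regex metacharacters (for example `.` or `(`) are also interpreted as patterns rather than literal text. A stdlib sketch that escapes each keyword and treats an empty list as "no filter"; `build_keyword_pattern` is a hypothetical helper and the sample comments are made up.

```python
import re


def build_keyword_pattern(keywords):
    """Return a case-insensitive regex matching any keyword literally,
    or None when there are no keywords to search for."""
    cleaned = [k for k in keywords if k]
    if not cleaned:
        return None
    return re.compile('|'.join(re.escape(k) for k in cleaned), re.IGNORECASE)


comments = ['<!-- TODO: remove debug.php -->', '<!-- build 1.2.3 -->', '<!-- footer -->']
pattern = build_keyword_pattern(['todo', 'debug.php'])
matches = [c for c in comments if pattern.search(c)]
print(matches)
```

With pandas, the escaped pattern string can be reused as `dfcomments[dfcomments.Comment.str.contains(pattern.pattern, flags=re.IGNORECASE, na=False)]` when `pattern` is not None.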

In [None]:
# Export the extracted HTML elements to CSV files (the Excel export below is left commented out)

#with pd.ExcelWriter('cc-domains.xlsx') as writer:  
#    dfcomments.to_excel(writer, sheet_name='comments')
#    dftitles.to_excel(writer, sheet_name='titles')
#    dflinks.to_excel(writer, sheet_name='links')

# An Excel worksheet holds at most 1,048,576 rows (65,536 in the legacy .xls format), so larger dataframes cannot fit on one sheet. CSV is better in this situation.
#compression_opts = dict(method='zip',archive_name='out.csv')  
dfcomments.to_csv('dfcomments.csv', header=True, index=False)
dftitles.to_csv('dftitles.csv', header=True, index=False) 
dflinks.to_csv('dflinks.csv', header=True, index=False) 
dfbody.to_csv('dfbody.csv', header=True, index=False)

In [None]:
# Archive the saved HTML files for download out of Jupyter
filepath = os.getcwd() + '/tmp/'
# This creates domainoutput.tar.gz containing the full HTML files in the folder structure of the website. It can be downloaded from the directory the notebook runs in.
! tar -zcvf domainoutput.tar.gz $filepath
# This cleans up the tmp folder
! rm -rf tmp/*
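
The `!` lines are IPython shell escapes, so they only work inside Jupyter on a system with `tar` available. A portable sketch of the archiving step using Python's standard `tarfile` module; the function name `archive_folder` and the choice to store entries under the folder's base name (rather than its absolute path) are assumptions:

```python
import os
import tarfile


def archive_folder(folder, archive_path):
    """Write folder's contents to a gzip-compressed tar archive.

    Entries are stored under the folder's base name (e.g. 'tmp/...') rather
    than its absolute path. Returns the sorted list of archived member names.
    """
    arcname = os.path.basename(os.path.normpath(folder))
    with tarfile.open(archive_path, 'w:gz') as tar:
        tar.add(folder, arcname=arcname)  # recurses into subdirectories by default
    with tarfile.open(archive_path, 'r:gz') as tar:
        return sorted(tar.getnames())
```

In the notebook this could be called as `archive_folder(os.path.join(os.getcwd(), 'tmp'), 'domainoutput.tar.gz')`.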

In [None]:
-- Additional example queries to run with this configuration.
-- These can be run directly within the Athena query window in the AWS console, or integrated into this notebook in place of the pre-defined query.

-- Search the entire Common Crawl index for a specific URL parameter (this scans every crawl).
SELECT url,
       warc_filename,
       warc_record_offset,
       warc_record_length,
       url_query
FROM "ccindex"."ccindex"
WHERE subset = 'warc'
  AND url_query LIKE '%cmd=%'

In [None]:
-- Count the number of distinct URLs captured for a specific domain

SELECT COUNT(DISTINCT url) AS URLCount
FROM "ccindex"."ccindex"
WHERE subset = 'warc'
  AND url_host_registered_domain = 'domain.com'