<h1> Get URIs of interest from commoncrawl. Gathers from August 2024 Capture </h1> 

In [1]:
import requests, json, time, os
from datetime import datetime

path = os.path.join(f"{os.getcwd()}/links.json")
if os.path.exists(path):
    os.remove(path)

In [2]:
# Functions to parse response from CC
def parse_json_lines(json_string):
    json_objects = []
    for line in json_string.splitlines():
        try:
            json_objects.append(json.loads(line))
        except json.JSONDecodeError:
            print(f"Failed to parse line: {line}")
    return json_objects

def get_uris(uri_pattern, master_dump, seen_urls):
    print(f'[*] getting uri for {uri_pattern}')
    base_uri = 'https://index.commoncrawl.org/CC-MAIN-2024-33-index?url={}&output=json'.format(uri_pattern)
    try:
        response = requests.get(base_uri).content
        response = json.loads(response)
    except (TypeError, json.decoder.JSONDecodeError) as _:
        if type(response) == dict and 'message' in response.keys() and 'Please reduce your request rate.' in response['message']:
            time.sleep(2)
            get_uris(uri_pattern)
            print('[-] Got timeout.... trying again')
        parsed_objects = parse_json_lines(response)
        file_name = uri_pattern.split('/')[0].replace('*.', '')
        if file_name in master_dump.keys():
            file_name_new = ''.join(uri_pattern.split('/')[:2]).replace('*.', '')
            print(f"[-] Already parsed a url for {file_name}. Changing to {file_name_new}")
            file_name = file_name_new
        for obj in parsed_objects:
            if obj['url'] in seen_urls:
                continue  # Skip if URL is already seen
            else:
                seen_urls.add(obj['url'])
            
            # Add filename if not in master_dump
            if file_name not in master_dump.keys():
                master_dump[file_name] = {}
            
            # Append URLs to master_dump
            if obj['filename'] not in master_dump[file_name].keys():
                master_dump[file_name][obj['filename']] = [obj['url']]
            else:
                master_dump[file_name][obj['filename']].append(obj['url'])
            print(f"[*] added {uri_pattern} as {obj['filename']}")
        
    return master_dump, seen_urls

In [3]:
# Used ChatGPT to get a bunch of these -- give me more uris like this, and verify that they have valid content using fuzzmatch to identify 404 pages before returning to me. return as a python list:
# '*.nasdaq.com/market-activity/earnings/*', '*.sec.gov/reports/*', '*.sec.gov/data-research/sec-markets-data/*', '*.tradingeconomics.com/*', '*.jpmorgan.com/insights/*', \
#     "*.wsj.com/news/markets/*",
#     "*.bloomberg.com/markets/*",
#     "*.ft.com/markets/*",
#     "*.economist.com/finance-and-economics/*",
#     "*hbr.org/topic/economics*",
#     "*.mckinsey.com/featured-insights/finance/*",
#     "*.goldmansachs.com/insights/topics/economics-and-markets*",
#     "*.jpmorgan.com/insights/*",
#     "*.morganstanley.com/ideas/*",
#     "*.blackrock.com/corporate/insights/blackrock-investment-institute/*",
#     "*.bridgewater.com/research-and-insights/*",
#     "*.imf.org/en/Publications/fandd/*",
#     "*.worldbank.org/en/publication/global-economic-prospects/*",
#     "*.federalreserve.gov/newsevents/speeches*",
#     "*.ecb.europa.eu/pub/economic-bulletin/html/*",
#     "*.ubs.com/global/en/wealth-management/chief-investment-office/market-insights/*",
#     "*.db.com/news/*",
#     "*.credit-suisse.com/about-us/en/reports-research/global-research/*",
#     "*.barclays.co.uk/wealth-management/news-and-insights/*",
#     "*.schroders.com/en/insights/*"

master_dump, seen_urls = {}, set()
uris = ['*.nasdaq.com/market-activity/earnings/*', '*.sec.gov/reports/*', '*.sec.gov/data-research/sec-markets-data/*', '*.tradingeconomics.com/*', '*.jpmorgan.com/insights/*', \
    "*.wsj.com/news/markets/*",
    "*.bloomberg.com/markets/*",
    "*.ft.com/markets/*",
    "*.economist.com/finance-and-economics/*",
    "*.hbr.org/topic/economics*",
    "*.mckinsey.com/featured-insights/finance/*",
    "*.goldmansachs.com/insights/topics/economics-and-markets*",
    "*.jpmorgan.com/insights/*",
    "*.morganstanley.com/ideas/*",
    "*.blackrock.com/corporate/insights/blackrock-investment-institute/*",
    "*.bridgewater.com/research-and-insights/*",
    "*.imf.org/en/Publications/fandd/*",
    "*.worldbank.org/en/publication/global-economic-prospects/*",
    "*.federalreserve.gov/newsevents/speeches*",
    "*.ecb.europa.eu/pub/economic-bulletin/html/*",
    "*.ubs.com/global/en/wealth-management/chief-investment-office/market-insights/*",
    "*.db.com/news/*",
    "*.credit-suisse.com/about-us/en/reports-research/global-research/*",
    "*.barclays.co.uk/wealth-management/news-and-insights/*",
    "*.schroders.com/en/insights/*",
    "*.benzinga.com/*",
    "*.finance.yahoo.com/*"
]

In [4]:
# Write to JSON blob
for index, link in enumerate(range(0, len(uris))):
    master_dump, seen_urls = get_uris(uris[link], master_dump, seen_urls)
    print(f"[+] parsed {uris[link]} ({index+1}/{len(uris)})")
    with open('links.json', 'a') as f:
        json.dump(master_dump, f, indent=4)
        f.write(',\n')
        break


# Add [] to the json file to separate by scrape URL

with open('links.json', 'r') as file:
    content = file.read()

# Wrap the content in square brackets and remove any trailing commas
content = '[\n' + content.strip().rstrip(',') + '\n]'

# Write the fixed content back to a new file
with open('links.json', 'w') as file:
    file.write(content)

[+] parsed *.nasdaq.com/market-activity/earnings/* (1/27)


<h3> Grab from public CC S3 </h3>
<p> Now that we have a blob of common crawl S3 paths and their respective links, we need to download them onto into S3 bucket. </p>

In [5]:
# Get distinct buckets
distinct_buckets = []
nondistinct_buckets = []
with open("links.json", 'r') as f:
    master_dump = json.loads(f.read())

for entry in master_dump:
    for key in entry.keys():
        distinct_buckets.extend(list(set(list(entry[key].keys()))))
        nondistinct_buckets.extend(list(entry[key].keys()))
print(f"Nonunique CC Buckets to pull: {len(nondistinct_buckets)}")
print(f"Unique CC Buckets to pull: {len(distinct_buckets)}")


Nonunique CC Buckets to pull: 12795
Unique CC Buckets to pull: 12795


In [6]:
!pip install boto3 warcio pyspark
!pip install warcio
!pip install https://github.com/commoncrawl/gzipstream/archive/master.zip




[notice] A new release of pip is available: 23.2.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 23.2.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


Collecting https://github.com/commoncrawl/gzipstream/archive/master.zip
  Using cached https://github.com/commoncrawl/gzipstream/archive/master.zip
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'



[notice] A new release of pip is available: 23.2.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


<h3>Tokenization</h3>

In [None]:
!pip install spacy
!python -m spacy download en_core_web_sm
nlp = spacy.load("en_core_web_sm")

In [None]:
# Tokenize content
doc = nlp("$BTC is looking at buying U.K. startup for $1 billion")

# Get NeR
# if ent.label_ == [ORG], save special
for ent in doc.ents:
    print(ent.text, ent.start_char, ent.end_char, ent.label_)

<h3>Transformation layer</h3>

In [8]:

from bs4 import BeautifulSoup
from warcio.archiveiterator import ArchiveIterator
import boto3
from boto.s3.key import Key
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import re

spark = SparkSession.builder \
    .appName("Save WARC JSON as Parquet") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()


def process_text(chunk, count_ner_org):
    doc = nlp(chunk)
    for ent in doc.ents:
        if ent.label_ == 'ORG':
            count_ner_org[ent.text]+=1
    return count_ner_org

def process_partition(uris):
    s3 = boto3.client('s3')
    bucket = "commoncrawl"

# Example: Extract the title of the HTML page
    for key_ in uris:
        try:
            response = s3.get_object(Bucket=bucket, Key=key_)
            file_ = response['Body']

            for record in ArchiveIterator(file_):
                if record.rec_type == 'response':
                    title, body_content, title_content, count_ner_org = None, None, None, None
                    count_ner_org = {}
                    url = record.rec_headers.get_header('WARC-Target-URI')
                    raw_date = record.rec_headers.get_header('WARC-Date')
                    date = datetime.strptime(raw_date, '%Y-%m-%dT%H:%M:%SZ').strftime('%Y-%m-%d %H:%M:%S')
                    content_type = record.http_headers.get_header('Content-Type')
                    content = record.content_stream().read().decode('utf-8')
                    if content_type == None:
                        continue
                    if content_type == 'text/html':
                        content_type_label = 'text/html'
                        soup = BeautifulSoup(content, 'html.parser')
                        title = soup.title.string
                        body_content = [para.get_text() for para in soup.find_all("p")]
                        title_content = [para.get_text() for para in soup.find_all(re.compile('^h[1-6]$'))]
                        for chunk in body_content:
                            count_ner_org = process_text(chunk)
                    elif 'json' in content_type:
                        content_type_label = 'application/json'
                    elif 'pdf' in content_type:
                        content_type_label = 'pdf'
                    elif content_type == 'application/xml':
                        content_type_label = 'xml'
                    elif content_type == 'text/csv':
                        content_type_label = 'csv'
                    elif content_type == 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet':
                        content_type_label = 'xlsx'
                    elif 'image' in content_type:
                        if 'jpeg' in content_type:
                            content_type_label = 'image/jpeg'
                        elif 'png' in content_type:
                            content_type_label = 'image/png'
                    else:
                        continue

                    yield {
                        "url":url,
                        "date":date,
                        "content":content,
                        "content_type":content_type_label,
                        "title": title,
                        "title_content":title_content,
                        "body_content":body_content,
                        "count_ner_org":count_ner_org,
                    }

        except Exception as e:
            print(f"Error accessing {key_}: {e}")
            continue

uri_rdd = spark.sparkContext.parallelize(distinct_buckets, numSlices=len(distinct_buckets))
json_rdd = uri_rdd.mapPartitions(process_partition)
df = json_rdd.map(lambda x: Row(**x)).toDF()

# Option 2: Create DataFrame using spark.createDataFrame()
df = spark.createDataFrame(json_rdd)

# Write DataFrame to Parquet file
output_path = "s3a://ai-crap/data/nasdaq.parquet"
df.write.mode("overwrite").parquet(output_path)

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.