# Rearc Data Quest - Part 1

## Install dependencies into the kernel

In [1]:
# Upgrade pip, then install the third-party packages this notebook imports (boto3, requests, tqdm).
%pip install --quiet --upgrade pip
%pip install --quiet boto3 requests tqdm

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


## Configuration

In [1]:
from datetime import datetime
# Destination bucket, client region, and key prefix for the mirrored files.
BUCKET_NAME = 'rearc-dataquest-harpreet'
AWS_REGION  = 'us-east-2'
S3_PREFIX   = 'part1/bls/pr/'
# Directory index URL whose pr.* files are mirrored into S3.
DEFAULT_SOURCE = 'https://download.bls.gov/pub/time.series/pr/'
# When True, sync_bls_to_s3 also deletes pre-existing keys under the prefix (its delete_missing parameter).
DELETE_MISSING_IN_S3 = True
# When True, the put/delete helpers return 'would-put'/'would-delete' without writing to S3.
DRY_RUN = False
# Pause (seconds) after each successful download; also the base delay for the retry backoff.
RATE_LIMIT_SECONDS = 0.2
# Maximum download attempts per file in http_get_with_backoff.
MAX_RETRIES = 3
print(f'Bucket: s3://{BUCKET_NAME}/{S3_PREFIX} | Region: {AWS_REGION}')


Bucket: s3://rearc-dataquest-harpreet/part1/bls/pr/ | Region: us-east-2


## Imports & hardened HTTP session

In [2]:
# Imports & hardened HTTP session (non-blocking)
import os, argparse, hashlib, json, random, sys, time, re
from urllib.parse import urljoin, urlparse

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from tqdm import tqdm

# --- IMPORTANT: prevent boto3 from hanging while probing EC2 metadata (IMDS) for creds
# setdefault leaves any value that is already present in the environment untouched.
# NOTE(review): presumably this also prevents instance-role credentials from being
# picked up on EC2-hosted kernels — confirm before running the notebook there.
os.environ.setdefault("AWS_EC2_METADATA_DISABLED", "true")

# User-Agent header value installed on the shared HTTP session by build_http_session().
UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Rearc-DataQuest/1.0 (+https://rearc.io)"

def build_http_session():
    """Create a requests.Session with browser-like headers and automatic retries.

    The session sends the UA string plus Accept / Accept-Language / Connection
    headers on every request. One HTTPAdapter is mounted for both https:// and
    http://; it retries GET and HEAD up to 5 times (backoff factor 0.5) on
    429/500/502/503/504 and, because raise_on_status is False, hands back the
    last response instead of raising once retries are exhausted.

    Returns:
        The configured requests.Session.
    """
    retry_policy = Retry(
        total=5,
        backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "HEAD"],
        raise_on_status=False,
    )
    retrying_adapter = HTTPAdapter(
        max_retries=retry_policy,
        pool_connections=20,
        pool_maxsize=20,
    )

    default_headers = {
        "User-Agent": UA,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
        "Connection": "keep-alive",
    }

    session = requests.Session()
    session.headers.update(default_headers)
    # Same adapter instance for both schemes, exactly as before.
    for scheme in ("https://", "http://"):
        session.mount(scheme, retrying_adapter)
    return session

# Shared module-level session used by robust_list_files and http_get_with_backoff.
HTTP = build_http_session()

# --- Defer S3 client creation until needed (avoids any credential probing in this cell)
def get_s3():
    """Build and return a new S3 client bound to AWS_REGION.

    The client uses botocore's "standard" retry mode with up to 10 attempts,
    a 5 second connect timeout and a 20 second read timeout. A fresh
    boto3.Session is created on every call.
    """
    # You can also set profile_name=... on the Session to force a specific profile
    client_config = Config(
        connect_timeout=5,
        read_timeout=20,
        retries={"max_attempts": 10, "mode": "standard"},
    )
    return boto3.Session(region_name=AWS_REGION).client("s3", config=client_config)

# Status line confirming the cell ran to completion without creating an S3 client.
print("HTTP session ready; S3 client will be created lazily by get_s3(). Cell finished.")


HTTP session ready; S3 client will be created lazily by get_s3(). Cell finished.


## Directory discovery with fallback

In [3]:
# Filenames used by robust_list_files when the directory index cannot be fetched
# or yields no pr.* links; each name is joined onto the index URL.
FALLBACK_FILENAMES = [
    'pr.class','pr.contacts','pr.data.0.Current','pr.data.1.AllData',
    'pr.duration','pr.footnote','pr.measure','pr.period',
    'pr.seasonal','pr.sector','pr.series','pr.txt'
]

def robust_list_files(index_url: str):
    """Return (filename, absolute_url) pairs for the pr.* files linked from index_url.

    The index page is fetched through the shared HTTP session and every href is
    inspected once. Query-only links (starting with '?') and directory links
    (ending with '/') are skipped; a link is kept when the last segment of its
    path starts with 'pr.' (case-insensitive). Each filename is reported once,
    even if the page links to it several times, so no file is downloaded and
    uploaded twice.

    Args:
        index_url: URL of the directory listing; relative hrefs are resolved
            against it with urljoin.

    Returns:
        A sorted list of unique (name, url) tuples when the index could be
        parsed. If the request fails, the server answers 403, or no pr.* link
        is found, the list built from FALLBACK_FILENAMES (in that order) is
        returned instead. Any exception raised while fetching or parsing
        triggers the fallback rather than propagating.
    """
    try:
        resp = HTTP.get(index_url, timeout=(10, 30))
        if resp.status_code == 403:
            raise RuntimeError('403 on index')
        resp.raise_for_status()

        href_count = 0
        found = {}  # filename -> absolute URL; the first occurrence wins
        for m in re.finditer(r'href\s*=\s*[\'\"]([^\'\"]+)[\'\"]', resp.text, flags=re.IGNORECASE):
            href_count += 1
            h = m.group(1)
            if h.startswith('?') or h.endswith('/'):
                continue
            # Last path segment only; it can no longer contain a '/', so no
            # further './' stripping is needed.
            segment = (urlparse(h).path or h).split('/')[-1].strip()
            if segment.lower().startswith('pr.'):
                found.setdefault(segment, urljoin(index_url, h))

        keep = sorted(found.items())
        print(f'Parsed {href_count} hrefs; keeping {len(keep)} pr.* files')
        if keep:
            for name, u in keep[:10]:
                print(' -', name, '->', u)
            return keep
        print('Index parsed but empty; falling back to known filenames…')
        raise RuntimeError('empty-parse')
    except Exception as e:
        # Deliberate best-effort: any failure above degrades to the static list.
        print(f'Directory listing unavailable ({e}); using fallback list.')
        keep = [(name, urljoin(index_url, name)) for name in FALLBACK_FILENAMES]
        for name, u in keep:
            print(' - (fallback)', name, '->', u)
        return keep


## HTTP download with backoff & polite rate limiting

In [4]:
def http_get_with_backoff(url: str, rate_limit_sec: float = RATE_LIMIT_SECONDS, max_retries: int = MAX_RETRIES) -> bytes:
    """Download url through the shared HTTP session and return the response body.

    After a successful download the function sleeps rate_limit_sec seconds so
    consecutive calls stay polite towards the source. Failed attempts are
    retried with exponential backoff (base rate_limit_sec, doubled per attempt,
    plus up to 20% random jitter).

    Args:
        url: Address to fetch.
        rate_limit_sec: Pause after a successful request and base backoff delay.
        max_retries: Maximum number of attempts. Values below 1 are treated as 1
            so the function never falls through and returns None.

    Returns:
        The raw response body as bytes.

    Raises:
        RuntimeError: The source answered 403 (raised immediately, not retried).
        requests.exceptions.RequestException: The last attempt failed, or the
            server answered with a 4xx status other than 408/429, which a retry
            cannot fix.
    """
    attempts = max(1, max_retries)
    for attempt in range(1, attempts + 1):
        try:
            r = HTTP.get(url, timeout=(10, 60))
            if r.status_code == 403:
                raise RuntimeError('403 from source; try again later.')
            r.raise_for_status()
            time.sleep(rate_limit_sec)
            return r.content
        except requests.exceptions.RequestException as e:
            # HTTPError carries the response; connection-level errors may not.
            status = getattr(getattr(e, 'response', None), 'status_code', None)
            permanent = status is not None and 400 <= status < 500 and status not in (408, 429)
            if permanent or attempt == attempts:
                raise
            backoff = rate_limit_sec * (2 ** (attempt-1)) * (1 + random.random()*0.2)
            print(f"Transient error '{e}', retrying in {backoff:.2f}s … ({attempt}/{attempts})")
            time.sleep(backoff)


## S3 helpers

In [5]:
def sha256_bytes(b: bytes) -> str:
    """Return the SHA-256 digest of b as a lowercase hexadecimal string."""
    import hashlib
    digest = hashlib.sha256()
    digest.update(b)
    return digest.hexdigest()

def s3_key_for(prefix: str, name: str) -> str:
    """Return the S3 object key for name under prefix.

    The prefix is prepended verbatim (no separator is inserted); a falsy
    prefix yields name unchanged.
    """
    if not prefix:
        return name
    return f'{prefix}{name}'

def head_sha256(s3_client, bucket: str, key: str):
    """Return the 'sha256' user-metadata value stored on an S3 object.

    Issues head_object for bucket/key. Returns None when the object has no
    'sha256' metadata entry, or when the call fails with a ClientError whose
    code is '404', 'NoSuchKey' or 'NotFound'. Any other ClientError is
    re-raised.
    """
    try:
        head = s3_client.head_object(Bucket=bucket, Key=key)
    except ClientError as err:
        error_code = err.response.get('Error', {}).get('Code')
        if error_code not in ('404', 'NoSuchKey', 'NotFound'):
            raise
        return None
    metadata = head.get('Metadata', {})
    return metadata.get('sha256')

def put_with_metadata(s3_client, bucket: str, key: str, body: bytes, sha256: str, source_url: str, dry_run: bool):
    """Upload body to bucket/key, tagging it with its hash and origin.

    The object is written as text/plain with AES256 server-side encryption and
    user metadata 'sha256' and 'source_url'.

    Returns:
        'would-put' when dry_run is truthy (nothing is sent to S3),
        otherwise 'put' after put_object has been called.
    """
    if not dry_run:
        request = {
            'Bucket': bucket,
            'Key': key,
            'Body': body,
            'ContentType': 'text/plain',
            'Metadata': {'sha256': sha256, 'source_url': source_url},
            'ServerSideEncryption': 'AES256',
        }
        s3_client.put_object(**request)
        return 'put'
    return 'would-put'

def list_keys(s3_client, bucket: str, prefix: str):
    """Return the set of all object keys in bucket that start with prefix.

    Uses the list_objects_v2 paginator and walks every page; pages without a
    'Contents' entry contribute nothing.
    """
    pages = s3_client.get_paginator('list_objects_v2').paginate(Bucket=bucket, Prefix=prefix)
    return {
        entry['Key']
        for page in pages
        for entry in page.get('Contents', [])
    }

def delete_key(s3_client, bucket: str, key: str, dry_run: bool):
    """Delete bucket/key unless dry_run is set.

    Returns:
        'would-delete' when dry_run is truthy (S3 is not contacted),
        otherwise 'deleted' after delete_object has been called.
    """
    if not dry_run:
        s3_client.delete_object(Bucket=bucket, Key=key)
        return 'deleted'
    return 'would-delete'


## Core sync

In [6]:
def sync_bls_to_s3(bucket: str, region: str, prefix: str,
                   source_url: str = DEFAULT_SOURCE,
                   delete_missing: bool = True,
                   dry_run: bool = False,
                   rate_limit_sec: float = RATE_LIMIT_SECONDS,
                   max_retries: int = MAX_RETRIES) -> dict:
    """Mirror the pr.* files listed at source_url into s3://bucket/prefix.

    Each listed file is downloaded and hashed, and uploaded only when its
    SHA-256 differs from the 'sha256' metadata already stored on the
    corresponding object. Failures for an individual file are recorded and do
    not stop the run.

    Args:
        bucket: Destination bucket name.
        region: AWS region used to create the S3 client.
        prefix: Key prefix prepended verbatim to every filename.
        source_url: Directory index to discover files from.
        delete_missing: When True, delete keys that existed under prefix before
            the run but are not listed at the source. Keys of files that are
            listed but failed to download or upload are kept, so a transient
            error never removes the previously synced copy.
        dry_run: When True, uploads and deletions are reported
            ('would-put' / 'would-delete') but not performed.
        rate_limit_sec: Passed to http_get_with_backoff.
        max_retries: Passed to http_get_with_backoff.

    Returns:
        {'synced': [...], 'removed': [...]}. Each 'synced' entry has 'name',
        'key', 'action' ('put', 'would-put', 'skipped' or 'error') and either
        'sha256' or 'error'. Each 'removed' entry has 'key' and 'action'.
    """
    session = boto3.Session(region_name=region)
    # Same retry/timeout policy as get_s3(), so a stalled connection cannot hang the sync.
    s3_client = session.client(
        's3',
        config=Config(retries={"max_attempts": 10, "mode": "standard"},
                      connect_timeout=5, read_timeout=20),
    )
    print('Discovering source files…')
    src_files = robust_list_files(source_url)
    print(f'Found {len(src_files)} files at source.')
    before = list_keys(s3_client, bucket, prefix)
    seen = set()
    results = []
    for name, url in tqdm(src_files, desc='Syncing', leave=False):
        key = s3_key_for(prefix, name)
        # The file is listed at the source, so its key must survive the delete
        # pass below even if this particular download or upload fails.
        seen.add(key)
        try:
            payload = http_get_with_backoff(url, rate_limit_sec, max_retries)
            digest = sha256_bytes(payload)
            existing = head_sha256(s3_client, bucket, key)
            if existing == digest:
                action = 'skipped'
            else:
                action = put_with_metadata(s3_client, bucket, key, payload, digest, url, dry_run)
            results.append({'name': name, 'key': key, 'action': action, 'sha256': digest})
        except Exception as e:
            # Per-file best effort: record the failure and continue with the next file.
            results.append({'name': name, 'key': key, 'action': 'error', 'error': str(e)})
    removed = []
    if delete_missing:
        for key in sorted(before - seen):
            removed.append({'key': key, 'action': delete_key(s3_client, bucket, key, dry_run)})
    return {'synced': results, 'removed': removed}


## Run the sync

In [7]:
# Run the sync with the notebook-level configuration, then print a short report.
summary = sync_bls_to_s3(
    bucket=BUCKET_NAME,
    region=AWS_REGION,
    prefix=S3_PREFIX,
    source_url=DEFAULT_SOURCE,
    delete_missing=DELETE_MISSING_IN_S3,
    dry_run=DRY_RUN,
    rate_limit_sec=RATE_LIMIT_SECONDS,
    max_retries=MAX_RETRIES,
)

# Tally the per-file outcomes by their 'action' value.
actions = [item['action'] for item in summary['synced']]
uploaded = actions.count('put')
skipped  = actions.count('skipped')
errors   = [item for item in summary['synced'] if item['action'] == 'error']

print('\n=== Sync Summary ===')
print('Uploaded/Updated:', uploaded)
print('Skipped         :', skipped)
print('Errors          :', len(errors))
if errors:
    # Show at most three failures, capped at 800 characters of JSON.
    print('Sample error   :', json.dumps(errors[:3], indent=2)[:800])
print('Removed        :', len(summary['removed']))


Discovering source files…
Parsed 13 hrefs; keeping 12 pr.* files
 - pr.class -> https://download.bls.gov/pub/time.series/pr/pr.class
 - pr.contacts -> https://download.bls.gov/pub/time.series/pr/pr.contacts
 - pr.data.0.Current -> https://download.bls.gov/pub/time.series/pr/pr.data.0.Current
 - pr.data.1.AllData -> https://download.bls.gov/pub/time.series/pr/pr.data.1.AllData
 - pr.duration -> https://download.bls.gov/pub/time.series/pr/pr.duration
 - pr.footnote -> https://download.bls.gov/pub/time.series/pr/pr.footnote
 - pr.measure -> https://download.bls.gov/pub/time.series/pr/pr.measure
 - pr.period -> https://download.bls.gov/pub/time.series/pr/pr.period
 - pr.seasonal -> https://download.bls.gov/pub/time.series/pr/pr.seasonal
 - pr.sector -> https://download.bls.gov/pub/time.series/pr/pr.sector
Found 12 files at source.


                                                        


=== Sync Summary ===
Uploaded/Updated: 12
Skipped         : 0
Errors          : 0
Removed        : 0




## Verify in S3 (programmatic check)

In [8]:
# Create the S3 client on demand and list what is stored under the prefix.
s3 = get_s3()

# Single list_objects_v2 call: reports the first page only.
resp = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=S3_PREFIX)
print('KeyCount:', resp.get('KeyCount', 0))

listed = resp.get('Contents') or []
for entry in listed[:20]:
    print(' -', entry['Key'], entry['Size'])


KeyCount: 12
 - part1/bls/pr/pr.class 102
 - part1/bls/pr/pr.contacts 562
 - part1/bls/pr/pr.data.0.Current 1564284
 - part1/bls/pr/pr.data.1.AllData 3187878
 - part1/bls/pr/pr.duration 176
 - part1/bls/pr/pr.footnote 40
 - part1/bls/pr/pr.measure 745
 - part1/bls/pr/pr.period 146
 - part1/bls/pr/pr.seasonal 79
 - part1/bls/pr/pr.sector 263
 - part1/bls/pr/pr.series 15657
 - part1/bls/pr/pr.txt 18343
