## AWS Pipeline
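This notebook downloads every file under the configured SafeGraph folders in the S3-compatible bucket (hosted on Wasabi) into the local raw data folder, skipping files whose local copy is already up to date.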

In [15]:
import os, sys
from dotenv import load_dotenv, find_dotenv
import boto3
from pathlib import Path
from loguru import logger
from datetime import datetime, timezone
import tempfile

from src import DATA_DIR

In [9]:
# Search parent directories for a .env file and export its entries
# into the process environment.
dotenv_path = find_dotenv()
load_dotenv(dotenv_path=dotenv_path)

# Credentials and bucket name for the S3 client; any variable that is not
# set comes back as None.
aws_access_key, aws_secret_access_key, aws_bucket = (
    os.environ.get(var_name)
    for var_name in ("AWS_ACCESS_KEY", "AWS_SECRET_ACCESS_KEY", "AWS_BUCKET")
)

In [10]:
# Authenticated boto3 session, then an S3 client pointed at Wasabi's
# S3-compatible endpoint instead of the default AWS one.
session = boto3.Session(
    aws_secret_access_key=aws_secret_access_key,
    aws_access_key_id=aws_access_key,
)
s3_client = session.client(
    service_name="s3",
    endpoint_url="https://s3.wasabisys.com",
)

In [17]:
# Root S3 prefix of the monthly-patterns delivery mirrored by this notebook.
PATTERNS_DIR: str = "monthly-patterns-2020-12"

In [28]:
# S3 prefixes to mirror locally, in download order: the core places delivery,
# then for each patterns dataset its main folder followed by its 2019 and 2020
# backfill folders (pinned to the 2020/12/13/04 backfill delivery path).
AWS_FOLDERS = ("core-places-delivery/",) + tuple(
    prefix
    for dataset in ("normalization_stats", "patterns")
    for prefix in (
        f"{PATTERNS_DIR}/{dataset}/",
        *(
            f"{PATTERNS_DIR}/{dataset}_backfill/2020/12/13/04/{year}/"
            for year in (2019, 2020)
        ),
    )
)

In [29]:
def get_aws_files(folder, client=None, bucket=None):
    """Yield the listing entry of every object stored under ``folder`` in the bucket.

    A single ``list_objects_v2`` call returns at most 1,000 keys, so the
    listing goes through the ``list_objects_v2`` paginator to cover folders
    of any size.

    Parameters
    ----------
    folder : str
        Key prefix to list, e.g. ``"core-places-delivery/"``.
    client : optional
        boto3 S3 client; defaults to the notebook-level ``s3_client``.
    bucket : str, optional
        Bucket name; defaults to the notebook-level ``aws_bucket``.

    Yields
    ------
    dict
        One ``Contents`` entry per object (includes ``Key`` and
        ``LastModified``). Keys ending in ``/`` are folder placeholders, not
        files, and are skipped. A prefix with no objects yields nothing
        instead of raising ``KeyError``.
    """
    client = s3_client if client is None else client
    bucket = aws_bucket if bucket is None else bucket

    paginator = client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=folder):
        # Pages without matching keys omit "Contents" entirely.
        for obj in page.get("Contents", []):
            if obj["Key"].endswith("/"):
                continue
            yield obj

In [30]:
def download_with_progress_bar(local_path, remote_path, client=None, bucket=None, width=50):
    """Download one object from the bucket to ``local_path`` with a text progress bar.

    The object is streamed into a sibling ``<local_path>.part`` file. That file
    is renamed over ``local_path`` only after the transfer completes. If the
    transfer fails or is interrupted, the partial file is deleted, so a
    truncated file is never left at ``local_path``. Such a file would otherwise
    look up to date on the next run, because its mtime would be newer than the
    remote ``LastModified``.

    Parameters
    ----------
    local_path : str or os.PathLike
        Destination file; its parent directory must already exist.
    remote_path : str
        Object key in the bucket.
    client : optional
        boto3 S3 client; defaults to the notebook-level ``s3_client``.
    bucket : str, optional
        Bucket name; defaults to the notebook-level ``aws_bucket``.
    width : int, optional
        Number of characters in the progress bar (default 50).

    Raises
    ------
    Exception
        Any exception from the client calls is propagated unchanged. If it
        occurs during the transfer, the partial file is removed first.
    """
    client = s3_client if client is None else client
    bucket = aws_bucket if bucket is None else bucket

    meta_data = client.head_object(Bucket=bucket, Key=remote_path)
    total_length = int(meta_data.get("ContentLength", 0))
    downloaded = 0

    def progress(chunk):
        nonlocal downloaded
        downloaded += chunk
        # Empty objects count as complete instead of dividing by zero; clamp
        # so the bar never grows past its width.
        fraction = downloaded / total_length if total_length > 0 else 1.0
        done = min(width, int(width * fraction))
        sys.stdout.write("\r[%s%s]" % ("=" * done, " " * (width - done)))
        sys.stdout.flush()

    part_path = f"{os.fspath(local_path)}.part"
    try:
        with open(part_path, "wb") as f:
            client.download_fileobj(bucket, remote_path, f, Callback=progress)
        # Atomic on the same filesystem: readers never see a half-written file.
        os.replace(part_path, local_path)
    except BaseException:
        # BaseException so a KeyboardInterrupt (stopping the cell) also cleans up.
        try:
            os.remove(part_path)
        except FileNotFoundError:
            pass
        raise

    # Finish the progress-bar line so later output starts on a fresh line.
    sys.stdout.write("\n")
    sys.stdout.flush()

In [32]:
# Loop over each AWS folder we want to download
for aws_folder in AWS_FOLDERS:

    # Walk the AWS folder structure
    for response in get_aws_files(aws_folder):

        # Setup paths
        aws_path = Path(response["Key"])
        local_path = DATA_DIR / "raw" / aws_path

        # Download the new file if it doesn't exist or is out of date
        if (
            not local_path.exists()
            or datetime.utcfromtimestamp(local_path.stat().st_mtime).replace(
                tzinfo=timezone.utc
            )
            < response["LastModified"]
        ):

            # Log it
            logger.info(f"Processing {aws_path.name} from AWS...")

            # Setup local path
            if not local_path.parent.exists():
                local_path.parent.mkdir(parents=True)

            # Download
            download_with_progress_bar(str(local_path), str(aws_path))

In [None]:
# Final cell: confirm the sync finished so Restart & Run All ends cleanly.
logger.info(f"AWS sync finished for {len(AWS_FOLDERS)} folder(s) into {DATA_DIR / 'raw'}")