In [1]:
# Run me if working on local machine.
# Moves from the notebook's directory to the repository root (three levels up)
# so relative paths such as "conf/app.yaml" resolve, and makes ./src importable.
# Both steps are guarded so that re-running this cell is a no-op instead of
# climbing three more directories and appending duplicate sys.path entries.
import os
import sys
if not (os.path.isdir("src") and os.path.isdir("conf")):
    os.chdir("../../..")
SRC_DIR = os.path.abspath("src")
if SRC_DIR not in sys.path:
    sys.path.append(SRC_DIR)

In [2]:
%pwd

'/Users/tolya/Documents/code/dlrm'

In [3]:
%load_ext autoreload
%autoreload 2

In [4]:
import logging
logger = logging.getLogger("notebooks.debug")

In [5]:
import utils.logging
utils.logging.setup("conf/logging/default.yaml")

In [6]:
import utils.configs
_ = utils.configs.setup("conf/app.yaml")

2023-11-26 01:13:35,421 - utils.configs - INFO - loading app config 'conf/app.yaml'...
2023-11-26 01:13:35,423 - utils.configs - INFO - loading app config 'conf/app.yaml': done


In [7]:
import dotenv
# load_dotenv returns False when no variables were loaded (e.g. missing or empty
# file). Check it explicitly: an `assert` would be stripped under `python -O`,
# removing the load call itself.
if not dotenv.load_dotenv(dotenv_path="conf/envs/dev.env"):
    raise RuntimeError("no environment variables loaded from 'conf/envs/dev.env'")

---

In [16]:
import pyarrow
import pyarrow.dataset
import utils.aws.s3

In [9]:
s3_bucket = utils.configs.get("aws.s3.bucket")
s3_prefix = utils.configs.get("aws.s3.prefix")

In [10]:
# S3 URIs always use "/" separators; posixpath.join matches os.path.join on
# POSIX but does not switch to backslashes on Windows.
import posixpath
s3_dir = posixpath.join("s3://", s3_bucket, s3_prefix, "joined", "compact")

In [11]:
import boto3
import sagemaker
from sagemaker.spark.processing import PySparkProcessor

2023-11-26 01:13:36,168 - botocore.credentials - INFO - Found credentials in environment variables.
sagemaker.config INFO - Not applying SDK defaults from location: /Library/Application Support/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /Users/tolya/Library/Application Support/sagemaker/config.yaml


In [12]:
boto_session = boto3.Session()
sagemaker_session = sagemaker.Session(boto_session=boto_session)

2023-11-26 01:13:37,285 - botocore.credentials - INFO - Found credentials in environment variables.
sagemaker.config INFO - Not applying SDK defaults from location: /Library/Application Support/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /Users/tolya/Library/Application Support/sagemaker/config.yaml


In [None]:
def create_spark_processor(
    instance_count = 1,
    instance_type = "ml.m5.xlarge",
    volume_size_in_gb = 30,
    max_runtime_in_seconds = 2*60*60,
    framework_version = "2.4",
    spark_app_name = "dlrm-etl",
    sagemaker_role = None,
    tags = None,
):
    """Build a ``PySparkProcessor`` bound to the notebook's ``sagemaker_session``.

    Args:
        instance_count: number of instances in the processing cluster.
        instance_type: SageMaker instance type used for every instance.
        volume_size_in_gb: storage volume size attached to each instance.
        max_runtime_in_seconds: maximum job runtime (default 2 hours).
        framework_version: Spark framework version of the processing image.
        spark_app_name: passed as the processor's ``base_job_name``.
        sagemaker_role: IAM role for the job; ``None`` reads
            ``aws.sagemaker.role`` from the app config at call time.
        tags: tags for the job; ``None`` reads ``aws.sagemaker.tags`` from the
            app config at call time.

    Returns:
        A configured ``PySparkProcessor``; no job is started until ``.run()``.
    """
    # Config-backed defaults are resolved per call instead of once at `def`
    # time: the cell can be defined before config is loaded, later config
    # changes are picked up, and calls do not share one `tags` object.
    if sagemaker_role is None:
        sagemaker_role = utils.configs.get("aws.sagemaker.role")
    if tags is None:
        tags = utils.configs.get("aws.sagemaker.tags")
    return PySparkProcessor(
        sagemaker_session = sagemaker_session,
        base_job_name = spark_app_name,
        role = sagemaker_role,
        instance_count = instance_count,
        instance_type = instance_type,
        framework_version = framework_version,
        max_runtime_in_seconds = max_runtime_in_seconds,
        volume_size_in_gb = volume_size_in_gb,
        tags = tags,
    )

In [None]:
# Submit one single-instance repack job per day 12..19, reading
# `gz/day=DD` and writing `pq/day=DD`. Jobs are started with wait=False,
# so this loop does not block on them.
for day_num in range(12, 20):
    day = f"{day_num:02d}"
    day_partition = f"day={day}"
    spark_processor = create_spark_processor(instance_count=1)
    spark_processor.run(
        submit_app="/app/code/research/cloud/sagemaker/01-repack.py",
        arguments=[
            "--src-path",
            os.path.join("s3://", s3_bucket, s3_prefix, "gz", day_partition),
            "--dst-path",
            os.path.join("s3://", s3_bucket, s3_prefix, "pq", day_partition),
        ],
        spark_event_logs_s3_uri=utils.configs.get("aws.sagemaker.spark_logs_uri"),
        wait=False,
        logs=False,
    )

In [None]:
# Run 02-parse.py over everything under `pq/`, writing to `parsed/`, on an
# 8-instance cluster with 300 GB volumes and a 12 h limit. wait=True and
# logs=True make this cell block and stream the job's logs.
parse_src = os.path.join("s3://", s3_bucket, s3_prefix, "pq")
parse_dst = os.path.join("s3://", s3_bucket, s3_prefix, "parsed")
spark_processor = create_spark_processor(
    instance_count=8,
    volume_size_in_gb=300,
    max_runtime_in_seconds=12 * 60 * 60,
)
spark_processor.run(
    submit_app="/app/code/research/cloud/sagemaker/02-parse.py",
    # presumably "--day", "0" may be appended to parse a single day only
    arguments=["--src-path", parse_src, "--dst-path", parse_dst],
    spark_event_logs_s3_uri=utils.configs.get("aws.sagemaker.spark_logs_uri"),
    wait=True,
    logs=True,
)

In [None]:
# Build one dictionary per feature f14..f39 from `transformed/` into `dicts/`.
# All jobs go through a single processor with wait=False, so the loop only
# submits them and does not wait for completion.
spark_processor = create_spark_processor(
    instance_type="ml.m5.4xlarge",
    instance_count=1,
    volume_size_in_gb=20,
    max_runtime_in_seconds=2 * 60 * 60,
)
dicts_src = os.path.join("s3://", s3_bucket, s3_prefix, "transformed")
dicts_dst = os.path.join("s3://", s3_bucket, s3_prefix, "dicts")

for feature_idx in range(14, 40):
    feature_name = f"f{feature_idx}"
    logger.info(f"spawning dict job for feature {feature_idx}...")
    spark_processor.run(
        submit_app="/app/code/research/cloud/sagemaker/03-build-dicts.py",
        arguments=[
            "--src-path", dicts_src,
            "--dst-path", dicts_dst,
            "--feature-name", feature_name,
        ],
        spark_event_logs_s3_uri=utils.configs.get("aws.sagemaker.spark_logs_uri"),
        wait=False,
        logs=False,
    )
    logger.info(f"spawning dict job for feature {feature_idx}: done")

In [None]:
# Join `transformed/` logs with the `dicts/` dictionaries and write the result to
# `joined/compact` on an 8 x ml.m5.4xlarge cluster (5 h limit), without waiting.
join_compact_args = [
    "--src-path-logs", os.path.join("s3://", s3_bucket, s3_prefix, "transformed"),
    "--src-path-dicts", os.path.join("s3://", s3_bucket, s3_prefix, "dicts"),
    "--dst-path", os.path.join("s3://", s3_bucket, s3_prefix, "joined/compact"),
    # "--date", "2023-01-01",  # presumably restricts the job to one date
    "--repartition", "10",
    # Frequency thresholds consumed by 04-join-compact.py (semantics defined there).
    "--freq-threshold-abs", "100000",
    "--freq-threshold-pct", "0.95",
]
spark_processor = create_spark_processor(
    instance_type="ml.m5.4xlarge",
    instance_count=8,
    volume_size_in_gb=300,
    max_runtime_in_seconds=5 * 60 * 60,
)
spark_processor.run(
    submit_app="/app/code/research/cloud/sagemaker/04-join-compact.py",
    arguments=join_compact_args,
    spark_event_logs_s3_uri=utils.configs.get("aws.sagemaker.spark_logs_uri"),
    wait=False,
    logs=False,
)

### Measure data read speed

In [None]:
import time
import tqdm.auto as tqdm
import pyarrow
import pyarrow.dataset

In [None]:
def measure_speed(
    dataset,
    filter,
    batch_size = 10_000,
    limit = None
):
    """Measure how fast rows of a pyarrow dataset can be read into pandas.

    Streams ``dataset`` as record batches and converts each batch to a pandas
    DataFrame, so the conversion cost is part of the measurement. Progress is
    shown with a tqdm bar, and the elapsed time and throughput are logged.

    Args:
        dataset: a ``pyarrow.dataset.Dataset`` to read.
        filter: expression passed to ``count_rows`` / ``to_batches`` to select rows.
        batch_size: maximum number of rows per record batch.
        limit: stop once at least this many rows have been read. If ``None``,
            the whole filtered dataset is read, and its size is first counted
            with ``count_rows`` to size the progress bar. The last batch may
            overshoot ``limit``; those rows are still counted. A ``limit`` of 0
            or less reads nothing.

    Returns:
        Read throughput in rows per second (0.0 if no time elapsed).
    """
    if limit is None:
        logger.info("getting dataset size...")
        total_records = dataset.count_rows(filter=filter)
        logger.info(f"getting dataset size: done ({total_records} records)")
    else:
        total_records = limit

    logger.info("reading dataset...")
    rows_processed = 0
    # perf_counter is monotonic and high-resolution; time.time() follows the
    # wall clock and can jump if the system clock is adjusted.
    time_start = time.perf_counter()
    # The context manager closes the progress bar even if reading raises.
    with tqdm.tqdm(desc="reading data", total=total_records) as pbar:
        # A non-positive limit means "read nothing" rather than "read one batch".
        if limit is None or limit > 0:
            src_batches = dataset.to_batches(filter=filter, batch_size=batch_size)
            for batch_id, record_batch in enumerate(src_batches, start=1):
                frame = record_batch.to_pandas()
                num_rows = frame.shape[0]
                pbar.set_postfix({'batches': batch_id}, refresh=False)
                pbar.update(num_rows)
                rows_processed += num_rows
                if limit is not None and rows_processed >= limit:
                    break

    elapsed_time = time.perf_counter() - time_start
    # Avoid ZeroDivisionError when the measured interval is zero.
    read_speed = rows_processed / elapsed_time if elapsed_time > 0 else 0.0
    logger.info(f"reading dataset: done ({int(elapsed_time)} seconds, {int(read_speed)} rows/sec)")
    return read_speed

In [17]:
# Open the compacted dataset with hive-style (`key=value`) directory
# partitioning, and count the rows of a single date. Presumably "date" is a
# partition key; counting over S3 can be slow, so the result is kept in `src_rows`.
src_dataset = pyarrow.dataset.dataset(s3_dir, partitioning="hive")
src_filter = pyarrow.dataset.field("date") == "2023-01-01"
src_rows = src_dataset.count_rows(filter=src_filter)
src_rows

195841983

In [18]:
# Peek at a few matching rows. Dataset.head stops after num_rows instead of
# converting a whole default-size record batch to pandas, and it returns an
# empty table (rather than raising StopIteration) when nothing matches.
src_dataset.head(5, filter=src_filter).to_pandas()

In [None]:
# Time a full read of the selected date; passing the precomputed row count as
# `limit` avoids a second count_rows pass inside measure_speed.
measure_speed(dataset=src_dataset, filter=src_filter, limit=src_rows)