In [24]:
import os
import boto3
import gzip
import shutil
import polars as pl
from botocore.config import Config
from bet_edge.data_io.env_cred_provider import EnvCredProvider

# Initialize a boto3 session from the Polygon credentials returned by
# EnvCredProvider (keys 'polygon_access_key_id' / 'polygon_secret_access_key').
ecp = EnvCredProvider()
creds = ecp.get_credentials()

session = boto3.Session(
    aws_access_key_id=creds['polygon_access_key_id'],
    aws_secret_access_key=creds['polygon_secret_access_key'],
)

# S3 client pointed at Polygon's flat-file endpoint
s3 = session.client(
    's3',
    endpoint_url='https://files.polygon.io',
    config=Config(signature_version='s3v4'),
)

# Set bucket name and prefix
bucket_name = 'flatfiles'
prefix = 'us_options_opra/day_aggs_v1'  # Only process 'day_aggs_v1' data

# Define local storage path
base_local_dir = os.path.join('..', '..', 'data_lake')

CSV_GZ_SUFFIX = '.csv.gz'


def _local_paths(object_key):
    """Return (gz_path, csv_path, parquet_path) for an S3 key ending in '.csv.gz'.

    The key's '/'-separated parts are mirrored under ``base_local_dir``. Only
    the trailing '.csv.gz' is rewritten; a plain ``str.replace('.gz', '')``
    would also alter any '.gz' / '.csv' that appears earlier in the path.
    """
    gz_path = os.path.join(base_local_dir, *object_key.split('/'))
    stem = gz_path[:-len(CSV_GZ_SUFFIX)]
    return gz_path, stem + '.csv', stem + '.parquet'


def _remove_if_exists(path):
    """Delete the file at ``path`` if it is present; no error when absent."""
    if os.path.exists(path):
        os.remove(path)


# Initialize paginator for listing S3 objects
paginator = s3.get_paginator('list_objects_v2')

# Process files
for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
    for obj in page.get('Contents', []):
        object_key = obj['Key']

        # Skip files that are not .csv.gz
        if not object_key.endswith(CSV_GZ_SUFFIX):
            continue

        local_file_path, decompressed_file_path, parquet_file_path = _local_paths(object_key)

        # Already converted on an earlier run: skip so re-running this cell
        # does not re-download the whole history.
        if os.path.exists(parquet_file_path):
            continue

        try:
            # Ensure directory structure exists
            os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

            print(f"Downloading: {object_key}")
            s3.download_file(bucket_name, object_key, local_file_path)

            print(f"Decompressing: {local_file_path}")
            with gzip.open(local_file_path, 'rb') as f_in, open(decompressed_file_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)

            print(f"Converting to Parquet: {parquet_file_path}")
            # Local name chosen so this loop does not leak a global `df` that
            # later cells could mistake for their own data.
            day_aggs = pl.read_csv(decompressed_file_path)
            day_aggs.write_parquet(parquet_file_path)

            print(f"Stored as Parquet: {parquet_file_path}\n")

        except Exception as e:
            # Best-effort batch: one failing file (e.g. the 403 Forbidden
            # responses seen in this cell's output) must not stop the rest.
            print(f"Error processing {object_key}: {e}")
            # A half-written Parquet would be treated as finished by the
            # skip check above on the next run.
            _remove_if_exists(parquet_file_path)
        finally:
            # Intermediates are removed whether or not conversion succeeded.
            _remove_if_exists(local_file_path)
            _remove_if_exists(decompressed_file_path)


Downloading: us_options_opra/day_aggs_v1/2014/06/2014-06-02.csv.gz
Error processing us_options_opra/day_aggs_v1/2014/06/2014-06-02.csv.gz: An error occurred (403) when calling the HeadObject operation: Forbidden
Downloading: us_options_opra/day_aggs_v1/2014/06/2014-06-03.csv.gz
Error processing us_options_opra/day_aggs_v1/2014/06/2014-06-03.csv.gz: An error occurred (403) when calling the HeadObject operation: Forbidden
Downloading: us_options_opra/day_aggs_v1/2014/06/2014-06-04.csv.gz
Error processing us_options_opra/day_aggs_v1/2014/06/2014-06-04.csv.gz: An error occurred (403) when calling the HeadObject operation: Forbidden
Downloading: us_options_opra/day_aggs_v1/2014/06/2014-06-05.csv.gz
Error processing us_options_opra/day_aggs_v1/2014/06/2014-06-05.csv.gz: An error occurred (403) when calling the HeadObject operation: Forbidden
Downloading: us_options_opra/day_aggs_v1/2014/06/2014-06-06.csv.gz
Error processing us_options_opra/day_aggs_v1/2014/06/2014-06-06.csv.gz: An error occu

In [25]:
import os

def remove_empty_dirs(root_dir):
    """
    Recursively removes all empty subdirectories from the given root directory.

    The walk is bottom-up (``topdown=False``), so a directory whose only
    contents were empty subdirectories is itself empty by the time its parent
    is processed, and it is removed in the same pass. ``root_dir`` itself is
    never removed.

    Symlinks to directories are skipped. ``os.walk`` reports them in
    ``dirnames``, but ``os.listdir`` would inspect the link's target, and
    ``os.rmdir`` cannot remove a link (it raises on POSIX).

    Parameters
    ----------
    root_dir : str or os.PathLike
        Directory to clean. A non-existent path is a no-op, because
        ``os.walk`` yields nothing for it.
    """
    for dirpath, dirnames, _filenames in os.walk(root_dir, topdown=False):
        for dirname in dirnames:
            dir_full_path = os.path.join(dirpath, dirname)
            if os.path.islink(dir_full_path):
                continue  # never delete through / instead of a symlink
            if not os.listdir(dir_full_path):  # Check if directory is empty
                os.rmdir(dir_full_path)
                print(f"Deleted empty folder: {dir_full_path}")

# Prune empty folders under the local data lake, such as month folders that
# were created for downloads that then failed.
data_lake_path = os.path.join(os.pardir, os.pardir, 'data_lake')
remove_empty_dirs(data_lake_path)


Deleted empty folder: ..\..\data_lake\us_options_opra\day_aggs_v1\2014\06
Deleted empty folder: ..\..\data_lake\us_options_opra\day_aggs_v1\2014\07
Deleted empty folder: ..\..\data_lake\us_options_opra\day_aggs_v1\2014\08
Deleted empty folder: ..\..\data_lake\us_options_opra\day_aggs_v1\2014\09
Deleted empty folder: ..\..\data_lake\us_options_opra\day_aggs_v1\2014\10
Deleted empty folder: ..\..\data_lake\us_options_opra\day_aggs_v1\2014\11
Deleted empty folder: ..\..\data_lake\us_options_opra\day_aggs_v1\2014\12
Deleted empty folder: ..\..\data_lake\us_options_opra\day_aggs_v1\2015\01
Deleted empty folder: ..\..\data_lake\us_options_opra\day_aggs_v1\2015\02
Deleted empty folder: ..\..\data_lake\us_options_opra\day_aggs_v1\2015\03
Deleted empty folder: ..\..\data_lake\us_options_opra\day_aggs_v1\2015\04
Deleted empty folder: ..\..\data_lake\us_options_opra\day_aggs_v1\2015\05
Deleted empty folder: ..\..\data_lake\us_options_opra\day_aggs_v1\2015\06
Deleted empty folder: ..\..\data_lake\

In [26]:
import os
import polars as pl

# Define base directory
data_lake_path = os.path.join('..', '..', 'data_lake')

# Example: load one converted day (update the date parts to match your dataset)
file_path = os.path.join(
    data_lake_path, "us_options_opra", "day_aggs_v1", "2023", "04", "2023-04-10.parquet"
)

if os.path.exists(file_path):
    # Load the file with Polars
    df = pl.read_parquet(file_path)

    print(df.head())  # Preview first few rows
    print(df.schema)  # Show column names and types
else:
    # Rebind `df` so later cells cannot silently reuse a frame that an earlier
    # cell left in the kernel (the download loop also assigns a `df`).
    df = None
    print(f"File not found: {file_path}")


shape: (5, 8)
┌────────────────────┬────────┬──────┬───────┬──────┬──────┬─────────────────────┬──────────────┐
│ ticker             ┆ volume ┆ open ┆ close ┆ high ┆ low  ┆ window_start        ┆ transactions │
│ ---                ┆ ---    ┆ ---  ┆ ---   ┆ ---  ┆ ---  ┆ ---                 ┆ ---          │
│ str                ┆ i64    ┆ f64  ┆ f64   ┆ f64  ┆ f64  ┆ i64                 ┆ i64          │
╞════════════════════╪════════╪══════╪═══════╪══════╪══════╪═════════════════════╪══════════════╡
│ O:A230421C00140000 ┆ 3      ┆ 1.15 ┆ 1.41  ┆ 1.41 ┆ 1.15 ┆ 1681099200000000000 ┆ 3            │
│ O:A230421C00145000 ┆ 1      ┆ 0.25 ┆ 0.25  ┆ 0.25 ┆ 0.25 ┆ 1681099200000000000 ┆ 1            │
│ O:A230421C00155000 ┆ 347    ┆ 0.05 ┆ 0.05  ┆ 0.05 ┆ 0.04 ┆ 1681099200000000000 ┆ 37           │
│ O:A230421P00125000 ┆ 2      ┆ 0.3  ┆ 0.28  ┆ 0.3  ┆ 0.28 ┆ 1681099200000000000 ┆ 2            │
│ O:A230421P00130000 ┆ 10     ┆ 0.75 ┆ 0.51  ┆ 0.8  ┆ 0.51 ┆ 1681099200000000000 ┆ 6            │
└─────

In [27]:
# Show `df` as currently bound in the kernel: normally the frame read from
# Parquet in the cell above. NOTE(review): `df` is also assigned inside the
# download loop of an earlier cell, so if the cell above found no file this may
# display that unrelated frame. Confirm before relying on it.
df

ticker,volume,open,close,high,low,window_start,transactions
str,i64,f64,f64,f64,f64,i64,i64
"""O:A230421C00140000""",3,1.15,1.41,1.41,1.15,1681099200000000000,3
"""O:A230421C00145000""",1,0.25,0.25,0.25,0.25,1681099200000000000,1
"""O:A230421C00155000""",347,0.05,0.05,0.05,0.04,1681099200000000000,37
"""O:A230421P00125000""",2,0.3,0.28,0.3,0.28,1681099200000000000,2
"""O:A230421P00130000""",10,0.75,0.51,0.8,0.51,1681099200000000000,6
…,…,…,…,…,…,…,…
"""O:ZYME231020P00010000""",5,2.0,2.0,2.0,2.0,1681099200000000000,1
"""O:ZYXI230421P00010000""",5,0.1,0.1,0.1,0.1,1681099200000000000,1
"""O:ZYXI230519C00012500""",3,0.95,0.6,0.95,0.6,1681099200000000000,2
"""O:ZYXI230519C00015000""",1,0.2,0.2,0.2,0.2,1681099200000000000,1
