In [43]:
# main.py
import os
import io
import json
import gzip
from datetime import datetime
from pathlib import Path
import logging
from logging.handlers import TimedRotatingFileHandler
import requests
import click

import boto3
from botocore.exceptions import ClientError
import psycopg2
import polars as pl
from polars.exceptions import ComputeError

try:
    _here = Path(__file__)
except NameError:
    # Interactive sessions (e.g. Jupyter) define no __file__; anchor on the current
    # working directory instead, going two directory levels up from it.
    BASE_DIR = Path.cwd().parent.parent.resolve()
else:
    # Project root = the grandparent of the directory containing this file.
    BASE_DIR = _here.parent.parent.parent.resolve()
LOG_DIR = BASE_DIR / "logs"


def setup_logging(
    log_dir: "str | os.PathLike" = LOG_DIR,
    logger_name: str = "dash_extract",
    when: str = "midnight",
    interval: int = 1,
    backup_count: int = 7,
    level: int = logging.INFO,
):
    """
    Configure the root logger with a console handler and a time-rotating file
    handler, and return the named logger.

    The log file is ``<log_dir>/<logger_name>.log``. Handlers are installed on the
    ROOT logger through ``logging.basicConfig``, so records from every library
    (e.g. botocore) reach both outputs.

    ``logging.basicConfig`` is a no-op once the root logger already has handlers,
    so handlers are only built on the first effective call. Later calls (for
    example a re-executed notebook cell) just make sure ``log_dir`` exists and
    return the logger, instead of opening a log file that would never be attached
    or closed.

    Args:
        log_dir: directory where logs are stored; created if missing.
        logger_name: name of the returned logger and base name of the log file.
        when: rotation interval type (e.g. 'S', 'M', 'H', 'D', 'midnight', 'W0'-'W6').
        interval: number of ``when`` units between rotations.
        backup_count: how many rotated log files to keep.
        level: root logging level; only applied when the handlers are installed.

    Returns:
        logging.Logger: ``logging.getLogger(logger_name)``.
    """
    Path(log_dir).mkdir(parents=True, exist_ok=True)

    root = logging.getLogger()
    if not root.handlers:
        log_path = os.path.join(log_dir, f"{logger_name}.log")

        formatter = logging.Formatter(
            "%(asctime)s [%(levelname)s] %(name)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )

        # Rotating file handler (opens the log file immediately).
        file_handler = TimedRotatingFileHandler(
            log_path,
            when=when,
            interval=interval,
            backupCount=backup_count,
            encoding="utf-8",
        )
        file_handler.setFormatter(formatter)

        # Console handler (stderr).
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)

        logging.basicConfig(level=level, handlers=[file_handler, console_handler])

    return logging.getLogger(logger_name)


def get_credentials():
    """
    Load the project's secrets file ``<BASE_DIR>/env.json``.

    Returns:
        The parsed JSON content. The module expects an object with keys such as
        ``DAYSMART_CLIENT_ID``, ``POSTGRES_PASSWORD`` and ``S3_BUCKET``.

    Raises:
        FileNotFoundError: if env.json does not exist.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    # JSON text is UTF-8 by spec; don't depend on the platform's locale encoding.
    with open(BASE_DIR / "env.json", "r", encoding="utf-8") as file:
        return json.load(file)


# Secrets are read once, when this module/cell executes.
credentials = get_credentials()

# Dash (DaySmart) API
BASE_URL = "https://api.dashplatform.com"
DAYSMART_API_GRANT_TYPE = "client_credentials"
DAYSMART_CLIENT_ID = credentials["DAYSMART_CLIENT_ID"]
DAYSMART_CLIENT_SECRET = credentials["DAYSMART_CLIENT_SECRET"]

# Postgres
POSTGRES_DB = credentials["POSTGRES_DB"]
POSTGRES_USER = credentials["POSTGRES_USER"]
POSTGRES_PASSWORD = credentials["POSTGRES_PASSWORD"]

# S3
S3_BUCKET = credentials["S3_BUCKET"]

# Module-wide "dash_extract" logger (also configures the root logger).
LOG = setup_logging()


def get_bearer_token(timeout=30):
    """
    Request an OAuth2 client-credentials access token from the Dash API.

    POSTs the DaySmart client id/secret to ``BASE_URL/v1/auth/token``.

    Args:
        timeout: seconds to wait for the token endpoint. ``requests`` has no
            default timeout, so without one a stalled connection blocks forever.

    Returns:
        The ``access_token`` value from the JSON response.

    Raises:
        Exception: on a non-200 response (carrying the JSON error body when the
            response is JSON, otherwise the status code and start of the text
            body), or when a 200 response contains no ``access_token``.
    """
    url = BASE_URL + "/v1/auth/token"
    headers = {"Content-Type": "application/vnd.api+json"}
    payload = {
        "grant_type": DAYSMART_API_GRANT_TYPE,
        "client_id": DAYSMART_CLIENT_ID,
        "client_secret": DAYSMART_CLIENT_SECRET,
    }
    response = requests.post(url, json=payload, headers=headers, timeout=timeout)

    if response.status_code != 200:
        try:
            detail = response.json()
        except ValueError:
            # Gateway/HTML error pages are not JSON; keep the raw text instead of
            # letting a JSONDecodeError hide the real failure.
            detail = f"HTTP {response.status_code}: {response.text[:500]}"
        raise Exception(detail)

    body = response.json()
    token = body.get("access_token")
    if not token:
        # Fail here rather than sending "Bearer None" on every later request.
        keys = sorted(body) if isinstance(body, dict) else type(body).__name__
        raise Exception(f"Token response contained no access_token (got: {keys})")
    return token


# -------------------------
# Fetcher
# -------------------------
def get_index_data(end_point, bearer_token, company="allstars", timeout=60):
    """
    Fetch every page of a Dash API index endpoint and return the concatenated records.

    Starts at ``BASE_URL/v1/<end_point>`` and follows ``links.next`` until it is
    missing or empty, collecting each page's ``data`` list.

    Args:
        end_point: path under /v1/, e.g. "events".
        bearer_token: access token from get_bearer_token().
        company: value of the ``company`` query parameter (default "allstars").
        timeout: per-request timeout in seconds, passed to requests.

    Returns:
        list: records from all pages, in page order. Pages without a ``data`` list
        contribute nothing.

    Raises:
        Exception: on any non-200 response, carrying the JSON error body when the
            response is JSON, otherwise the status code and start of the text body.
    """
    from urllib.parse import parse_qs, urljoin, urlsplit

    url = BASE_URL + "/v1/" + end_point
    headers = {
        "Content-Type": "application/vnd.api+json",
        "Authorization": f"Bearer {bearer_token}",
    }
    base_params = {"company": company, "sort": "id"}
    all_data = []
    next_url = url
    while next_url:
        # A `links.next` URL may already carry the query string; only add the base
        # params it lacks so they are not sent twice.
        present = parse_qs(urlsplit(next_url).query, keep_blank_values=True)
        params = {k: v for k, v in base_params.items() if k not in present}
        response = requests.get(next_url, headers=headers, params=params, timeout=timeout)

        if response.status_code != 200:
            try:
                detail = response.json()
            except ValueError:
                detail = f"HTTP {response.status_code}: {response.text[:500]}"
            raise Exception(detail)

        data = response.json()

        # Append the current page's records (JSON:API puts them under 'data').
        all_data.extend(data.get("data") or [])

        # `links` / `next` may be absent or null on the last page. urljoin keeps
        # absolute links unchanged and resolves relative ones against this page.
        next_link = (data.get("links") or {}).get("next")
        next_url = urljoin(next_url, next_link) if next_link else None

    return all_data


# -------------------------
# JSON → Polars
# -------------------------
def records_to_polars(records: list[dict]) -> pl.DataFrame:
    """
    Flatten JSON:API resource objects into a Polars DataFrame.

    Each dict with an ``attributes`` key becomes one row. The row holds its
    top-level keys except ``attributes`` and ``relationships`` (so ``id``,
    ``type``, ...), plus every key of ``attributes``; attribute keys overwrite
    same-named top-level keys. Other records are used as-is. dict/list values are
    serialized to compact JSON strings so each column holds scalars. The schema is
    inferred from all rows (``infer_schema_length=None``).

    Two audit columns, ``inserted_dt`` and ``updated_dt``, are appended with the
    same timestamp.

    Args:
        records: list of JSON:API resource objects (dicts).

    Returns:
        pl.DataFrame with ``id`` as the first column when present. An empty
        DataFrame (no columns, no audit columns) is returned when ``records`` is
        empty.
    """
    if not records:
        return pl.DataFrame([])

    def lift_one(r: dict):
        # JSON:API object -> flat row; anything without `attributes` passes through.
        if isinstance(r, dict) and "attributes" in r:
            base = {
                k: v for k, v in r.items() if k not in ("attributes", "relationships")
            }
            # `attributes` can be null in a payload; treat that as no attributes
            # instead of crashing in dict.update(None).
            base.update(r.get("attributes") or {})
            return base
        return r

    def process_nested_values(row: dict) -> dict:
        # Nested dict/list -> JSON string (keeps key/value structure in one column);
        # None and scalars are kept unchanged.
        return {
            k: (
                json.dumps(v, separators=(",", ":"), default=str)
                if isinstance(v, (dict, list))
                else v
            )
            for k, v in row.items()
        }

    processed_lifted = [process_nested_values(lift_one(r)) for r in records]

    df = pl.from_dicts(processed_lifted, infer_schema_length=None)

    # Put id first if present
    if "id" in df.columns:
        df = df.select(["id", *[c for c in df.columns if c != "id"]])

    # Naive local wall-clock time (no tzinfo) -- NOT UTC.
    # NOTE(review): switching to datetime.now(timezone.utc) would change the column
    # dtype (tz-aware) in Parquet/Postgres; decide deliberately before changing.
    now = datetime.now()

    return df.with_columns(
        [
            pl.lit(now).alias("inserted_dt"),
            pl.lit(now).alias("updated_dt"),
        ]
    )


# -------------------------
# S3 Writers
# -------------------------
def write_jsonl_to_s3(records: list[dict], bucket: str, key: str, s3_client=None):
    """
    Upload ``records`` to s3://bucket/key as gzip-compressed JSON Lines.

    Each record becomes one compact JSON line (values JSON can't encode are
    stringified with ``str``). Nothing is uploaded when ``records`` is empty.
    If no client is passed, one is created from the ``etl_user`` AWS profile.

    Raises:
        RuntimeError: wrapping the botocore ``ClientError`` when the upload fails.
    """
    if not records:
        return

    client = (
        boto3.Session(profile_name="etl_user").client("s3")
        if s3_client is None
        else s3_client
    )

    payload = io.BytesIO()
    with gzip.GzipFile(fileobj=payload, mode="wb") as stream:
        stream.writelines(
            (json.dumps(rec, separators=(",", ":"), default=str) + "\n").encode("utf-8")
            for rec in records
        )
    compressed = payload.getvalue()

    try:
        client.put_object(
            Bucket=bucket,
            Key=key,
            Body=compressed,
            ContentType="application/json",
            ContentEncoding="gzip",
        )
    except ClientError as e:
        raise RuntimeError(f"S3 JSONL upload failed: {e}")


def write_csv_to_s3_polars(
    df: pl.DataFrame,
    bucket: str,
    key: str,
    compress: bool = False,
    include_header: bool = True,
    s3_client=None,
    **to_csv_kwargs,
):
    """
    Write a Polars DataFrame to s3://bucket/key as UTF-8 CSV, optionally gzipped.

    Args:
        df: frame to upload; nothing is written when it has zero rows.
        bucket: destination bucket.
        key: destination object key.
        compress: gzip the CSV and set ``ContentEncoding: gzip`` on the object.
        include_header: forwarded to ``DataFrame.write_csv``.
        s3_client: boto3 S3 client; when omitted one is built from the
            ``etl_user`` AWS profile.
        **to_csv_kwargs: forwarded to ``DataFrame.write_csv``
            (e.g. ``separator=","``, ``quote_char='"'``).

    Raises:
        RuntimeError: when ``put_object`` raises a botocore ``ClientError``.
    """
    if df.height == 0:
        return
    if s3_client is None:
        s3_client = boto3.Session(profile_name="etl_user").client("s3")

    # DataFrame.write_csv accepts a binary file object and writes UTF-8, so the
    # CSV can go straight into a BytesIO without a TextIOWrapper.
    buf = io.BytesIO()
    df.write_csv(buf, include_header=include_header, **to_csv_kwargs)
    body = buf.getvalue()

    extra = {"ContentType": "text/csv; charset=utf-8"}
    if compress:
        body = gzip.compress(body)
        extra["ContentEncoding"] = "gzip"

    try:
        s3_client.put_object(Bucket=bucket, Key=key, Body=body, **extra)
    except ClientError as e:
        raise RuntimeError(f"S3 CSV upload failed: {e}") from e


def write_parquet_to_s3_polars(
    df: pl.DataFrame,
    bucket: str,
    key: str,
    compression: str = "snappy",
    s3_client=None,
    **to_parquet_kwargs,
):
    """
    Serialize ``df`` to Parquet in memory and upload it to s3://bucket/key via boto3.

    Args:
        df: frame to upload; nothing is written when it has zero rows.
        bucket: destination bucket.
        key: destination object key.
        compression: Parquet codec passed to ``DataFrame.write_parquet``.
        s3_client: boto3 S3 client; when omitted one is built from the
            ``etl_user`` AWS profile.
        **to_parquet_kwargs: other ``DataFrame.write_parquet`` options
            (e.g. ``statistics=True``). ``compression`` is its own parameter above.

    Raises:
        RuntimeError: when ``put_object`` raises a botocore ``ClientError``.
    """
    if df.height == 0:
        return

    client = (
        boto3.Session(profile_name="etl_user").client("s3")
        if s3_client is None
        else s3_client
    )

    sink = io.BytesIO()
    df.write_parquet(sink, compression=compression, **to_parquet_kwargs)
    payload = sink.getvalue()

    try:
        client.put_object(
            Bucket=bucket,
            Key=key,
            Body=payload,
            ContentType="application/octet-stream",
        )
    except ClientError as e:
        raise RuntimeError(f"S3 Parquet upload failed: {e}")


def write_parquet_to_s3_polars_native(
    df: pl.DataFrame,
    bucket: str,
    key: str,
    compression: str = "snappy",
    **to_parquet_kwargs,
):
    """
    Write a Polars DataFrame to s3://bucket/key as Parquet by passing an ``s3://``
    path to ``DataFrame.write_parquet`` (no boto3 upload step).

    Unlike the boto3-based writers in this module, no ``etl_user`` profile session
    is created here. Credentials are whatever Polars' cloud writer resolves
    (presumably the default AWS credential chain), unless ``storage_options`` is
    passed through ``**to_parquet_kwargs`` and the installed Polars supports it.

    NOTE(review): whether this avoids holding the whole file in memory depends on
    the Polars version's cloud-write implementation -- verify before relying on it.

    Args:
        df: frame to write; nothing is written when it has zero rows.
        bucket: destination bucket.
        key: destination object key.
        compression: Parquet codec.
        **to_parquet_kwargs: forwarded to ``DataFrame.write_parquet``.

    Raises:
        RuntimeError: wrapping any exception raised by ``write_parquet``.
    """
    if df.height == 0:
        return

    s3_path = f"s3://{bucket}/{key}"

    try:
        df.write_parquet(s3_path, compression=compression, **to_parquet_kwargs)
    except Exception as e:
        # Chain the original error so the underlying cause stays in the traceback.
        raise RuntimeError(f"S3 Parquet upload failed: {e}") from e

# -------------------------
# Postgres Loader (TRUNCATE + INSERT)
# -------------------------
def _pg_urls_from_dsn(conn_str: str) -> tuple[str, str]:
    """
    Convert a libpq key/value DSN ("dbname=... user=... password=... host=...
    port=...") into a ``postgresql://`` URL usable by ``DataFrame.write_database``.

    Returns:
        (url, redacted_url). The second has the password replaced by ``***`` and is
        the only form that should ever be logged.

    Raises:
        KeyError: if ``user``, ``host``, ``port`` or ``dbname`` is missing.
    """
    from urllib.parse import quote

    from psycopg2.extensions import parse_dsn

    # parse_dsn honours libpq quoting (e.g. password='a b'); a whitespace split doesn't.
    parts = parse_dsn(conn_str)
    user = quote(parts["user"], safe="")
    host = parts["host"]
    port = parts["port"]
    dbname = quote(parts["dbname"], safe="")
    password = parts.get("password")

    def build(secret):
        auth = f"{user}:{secret}" if secret else user
        return f"postgresql://{auth}@{host}:{port}/{dbname}"

    # Percent-encode the password so '@', ':' or '/' in it cannot corrupt the URL.
    url = build(quote(password, safe="") if password else None)
    redacted = build("***" if password else None)
    return url, redacted


def load_polars_df_to_postgres(
    df: pl.DataFrame,
    table: str,
    conn_str: str,
    schema: str = "public",
):
    """
    Replace ``schema.table`` in Postgres with the contents of ``df``.

    Steps:
      1. ``CREATE SCHEMA IF NOT EXISTS`` and ``DROP TABLE IF EXISTS`` through psycopg2
         (autocommit, so each statement takes effect immediately).
      2. ``DataFrame.write_database(..., if_table_exists="replace")`` recreates the
         table with column types inferred by Polars and inserts all rows, using its
         own connection built from ``conn_str``.

    The drop and re-create are NOT atomic: readers can see the table missing
    between the two steps.

    Args:
        df: rows to load; an empty frame is skipped (logged, nothing touched).
        table: target table name.
        conn_str: libpq key/value DSN.
        schema: target schema (created if missing).

    Raises:
        KeyError: if the DSN lacks user/host/port/dbname.
        Any psycopg2 / Polars / database-driver error propagates unchanged.
    """
    from psycopg2 import sql

    if df.height == 0:
        LOG.info("No rows; skipping Postgres replace.")
        return

    pg_url, pg_url_redacted = _pg_urls_from_dsn(conn_str)
    # Never log the real URL: it embeds the password.
    LOG.info(f"pg_url={pg_url_redacted}")

    conn = psycopg2.connect(conn_str)
    # Autocommit: there is no transaction here, so no commit/rollback is needed.
    conn.autocommit = True
    try:
        with conn.cursor() as cur:
            # Quote both identifiers consistently (and safely) via psycopg2.sql.
            cur.execute(
                sql.SQL("CREATE SCHEMA IF NOT EXISTS {};").format(sql.Identifier(schema))
            )
            cur.execute(
                sql.SQL("DROP TABLE IF EXISTS {}.{};").format(
                    sql.Identifier(schema), sql.Identifier(table)
                )
            )
    finally:
        conn.close()

    # Let Polars create the table with proper data types and insert the rows.
    df.write_database(
        table_name=f"{schema}.{table}",
        connection=pg_url,
        if_table_exists="replace",
    )



In [6]:
# Config for a single-entity debug run: Dash API -> S3 -> Postgres.
env = "dev"

# setup_logging() already ran when main.py was loaded and returns the same
# "dash_extract" logger; reuse it instead of configuring logging again.
log = LOG

# Get bearer token
bearer_token = get_bearer_token()

# Configure environment-specific settings
s3_bucket = f"{S3_BUCKET}{env}"
database_name = f"{POSTGRES_DB}{env}"
# make_dsn quotes/escapes values (e.g. a password containing spaces or quotes);
# a hand-built "key=value" f-string produces an invalid DSN for those.
pg_conn = psycopg2.extensions.make_dsn(
    dbname=database_name,
    user=POSTGRES_USER,
    password=POSTGRES_PASSWORD,
    host="localhost",
    port="5432",
)
keep_raw_jsonl = False
csv_compress = True

log.info(f"Starting extraction for environment: {env}")
log.info(f"Using S3 bucket: {s3_bucket}")

entity = "events"
log.info(f"Processing entity: {entity}")
db_schema = "sch_raw"
db_target = entity.replace("-", "_")
s3_target = f"all_{db_target}"
records = get_index_data(entity, bearer_token)
log.info(f"Fetched {len(records)} records from /v1/{entity}")


2025-08-17 23:27:04 [INFO] dash_extract - Starting extraction for environment: dev
2025-08-17 23:27:04 [INFO] dash_extract - Using S3 bucket: allstars-dl-us-west-2-dev
2025-08-17 23:27:04 [INFO] dash_extract - Processing entity: events
2025-08-17 23:27:13 [INFO] dash_extract - Fetched 2901 records from /v1/events


In [7]:
# Hand-written sample of three `events` records (ids 2, 307, 308) shaped like JSON:API
# resource objects: top-level `type`/`id`, an `attributes` dict, and a `relationships`
# dict whose entries are all empty lists. Record 2 has `best_description: None`; 307 and
# 308 carry a nested dict there, which exercises records_to_polars' JSON-string encoding.
# NOTE(review): this OVERWRITES the `records` fetched from /v1/events in the earlier
# config cell, so every later cell (Parquet upload, Postgres replace) runs on these
# 3 rows only. Rename (e.g. `sample_records`) or remove before a real run.
records = [{'type': 'events',
  'id': '2',
  'attributes': {'repeat_id': 2,
   'resource_id': 3,
   'resource_area_id': 0,
   'desc': 'Closed for renevation',
   'event_type_id': 'b',
   'sub_type': 'regular',
   'start': '2025-02-28T07:00:00',
   'start_gmt': '2025-02-28 15:00:00',
   'end': '2025-02-28T12:00:00',
   'end_gmt': '2025-02-28 20:00:00',
   'customer_id': 0,
   'hteam_id': None,
   'vteam_id': None,
   'league_id': None,
   'home_score': None,
   'visiting_score': None,
   'publish': False,
   'outcome': '',
   'register_capacity': 0,
   'create_u': 'root',
   'created_user_type': 'SIT_Employee',
   'create_d': '2025-02-28T11:43:30',
   'mod_u': 'root',
   'last_modified_user_type': 'SIT_Employee',
   'mod_d': '2025-02-28T11:45:13',
   'is_overtime': False,
   'booking_id': None,
   'description': None,
   'notice': None,
   'last_resource_id': None,
   'parent_event_id': None,
   'has_gender_locker_rooms': 0,
   'locker_room_type': None,
   'includes_setup_time': False,
   'includes_takedown_time': False,
   'start_date': '2025-02-28T00:00:00',
   'event_start_time': '07:00:00',
   'best_description': None},
  'relationships': {'customer': [],
   'registrants': [],
   'registrations': [],
   'eventType': [],
   'homeTeam': [],
   'visitingTeam': [],
   'summary': [],
   'league': [],
   'booking': [],
   'parentEvent': [],
   'lockers': [],
   'lastResource': [],
   'resource': [],
   'resourceArea': [],
   'tasks': [],
   'teamGroups': [],
   'eventSeries': [],
   'statEvents': [],
   'fees': [],
   'invoices': [],
   'seriesInvoices': [],
   'invoiceItems': [],
   'employees': [],
   'eventEmployees': [],
   'additionalResources': [],
   'setupEvents': [],
   'takedownEvents': [],
   'comments': [],
   'rsvpStates': []}},
 {'type': 'events',
  'id': '307',
  'attributes': {'repeat_id': 306,
   'resource_id': 4,
   'resource_area_id': 0,
   'desc': '',
   'event_type_id': 'k',
   'sub_type': 'regular',
   'start': '2025-05-27T13:00:00',
   'start_gmt': '2025-05-27 18:00:00',
   'end': '2025-05-27T16:00:00',
   'end_gmt': '2025-05-27 21:00:00',
   'customer_id': 0,
   'hteam_id': 7,
   'vteam_id': None,
   'league_id': 3,
   'home_score': None,
   'visiting_score': None,
   'publish': True,
   'outcome': '',
   'register_capacity': 0,
   'create_u': 'csarat',
   'created_user_type': 'SIT_Employee',
   'create_d': '2025-04-22T16:55:15',
   'mod_u': 'neelak',
   'last_modified_user_type': 'SIT_Employee',
   'mod_d': '2025-05-26T22:37:33',
   'is_overtime': False,
   'booking_id': None,
   'description': None,
   'notice': None,
   'last_resource_id': None,
   'parent_event_id': None,
   'has_gender_locker_rooms': 0,
   'locker_room_type': None,
   'includes_setup_time': False,
   'includes_takedown_time': False,
   'start_date': '2025-05-27T00:00:00',
   'event_start_time': '13:00:00',
   'best_description': {'league_id': 3,
    'has_morning_events': True,
    'has_afternoon_events': True,
    'has_evening_events': False,
    'team_count': 4}},
  'relationships': {'customer': [],
   'registrants': [],
   'registrations': [],
   'eventType': [],
   'homeTeam': [],
   'visitingTeam': [],
   'summary': [],
   'league': [],
   'booking': [],
   'parentEvent': [],
   'lockers': [],
   'lastResource': [],
   'resource': [],
   'resourceArea': [],
   'tasks': [],
   'teamGroups': [],
   'eventSeries': [],
   'statEvents': [],
   'fees': [],
   'invoices': [],
   'seriesInvoices': [],
   'invoiceItems': [],
   'employees': [],
   'eventEmployees': [],
   'additionalResources': [],
   'setupEvents': [],
   'takedownEvents': [],
   'comments': [],
   'rsvpStates': []}},
 {'type': 'events',
  'id': '308',
  'attributes': {'repeat_id': 306,
   'resource_id': 5,
   'resource_area_id': 0,
   'desc': '',
   'event_type_id': 'k',
   'sub_type': 'regular',
   'start': '2025-05-28T13:00:00',
   'start_gmt': '2025-05-28 18:00:00',
   'end': '2025-05-28T16:00:00',
   'end_gmt': '2025-05-28 21:00:00',
   'customer_id': 0,
   'hteam_id': 7,
   'vteam_id': None,
   'league_id': 3,
   'home_score': None,
   'visiting_score': None,
   'publish': True,
   'outcome': '',
   'register_capacity': 0,
   'create_u': 'csarat',
   'created_user_type': 'SIT_Employee',
   'create_d': '2025-04-22T16:55:15',
   'mod_u': 'csarat',
   'last_modified_user_type': 'SIT_Employee',
   'mod_d': '2025-04-26T15:31:12',
   'is_overtime': False,
   'booking_id': None,
   'description': None,
   'notice': None,
   'last_resource_id': None,
   'parent_event_id': None,
   'has_gender_locker_rooms': 0,
   'locker_room_type': None,
   'includes_setup_time': False,
   'includes_takedown_time': False,
   'start_date': '2025-05-28T00:00:00',
   'event_start_time': '13:00:00',
   'best_description': {'league_id': 3,
    'has_morning_events': True,
    'has_afternoon_events': True,
    'has_evening_events': False,
    'team_count': 4}},
  'relationships': {'customer': [],
   'registrants': [],
   'registrations': [],
   'eventType': [],
   'homeTeam': [],
   'visitingTeam': [],
   'summary': [],
   'league': [],
   'booking': [],
   'parentEvent': [],
   'lockers': [],
   'lastResource': [],
   'resource': [],
   'resourceArea': [],
   'tasks': [],
   'teamGroups': [],
   'eventSeries': [],
   'statEvents': [],
   'fees': [],
   'invoices': [],
   'seriesInvoices': [],
   'invoiceItems': [],
   'employees': [],
   'eventEmployees': [],
   'additionalResources': [],
   'setupEvents': [],
   'takedownEvents': [],
   'comments': [],
   'rsvpStates': []}}
]

In [26]:

# 2) Optional: write raw JSONL.gz to S3 (only when keep_raw_jsonl is set in the config cell)
run_date = datetime.now().strftime("%Y-%m-%d")  # local date, used as the ingest_date= partition
s3_client = boto3.Session(profile_name="etl_user").client("s3")

if keep_raw_jsonl and records:
    raw_key = f"raw/{s3_target}/ingest_date={run_date}/{s3_target}.jsonl.gz"
    log.info(f"Writing raw JSONL to s3://{s3_bucket}/{raw_key}")
    write_jsonl_to_s3(records, s3_bucket, raw_key, s3_client)

# 3) Normalize to Polars
df = records_to_polars(records)
log.info(f"Normalized to Polars: {df.height} rows, {len(df.columns)} columns")


2025-08-17 23:45:28 [INFO] botocore.credentials - Found credentials in shared credentials file: ~/.aws/credentials
2025-08-17 23:45:28 [INFO] dash_extract - Normalized to Polars: 3 rows, 42 columns


In [27]:
# Preview only the first rows; a full run of this endpoint returns thousands of records.
df.head()

id,type,repeat_id,resource_id,resource_area_id,desc,event_type_id,sub_type,start,start_gmt,end,end_gmt,customer_id,hteam_id,vteam_id,league_id,home_score,visiting_score,publish,outcome,register_capacity,create_u,created_user_type,create_d,mod_u,last_modified_user_type,mod_d,is_overtime,booking_id,description,notice,last_resource_id,parent_event_id,has_gender_locker_rooms,locker_room_type,includes_setup_time,includes_takedown_time,start_date,event_start_time,best_description,inserted_dt,updated_dt
str,str,i64,i64,i64,str,str,str,str,str,str,str,i64,i64,null,i64,null,null,bool,str,i64,str,str,str,str,str,str,bool,null,null,null,null,null,i64,null,bool,bool,str,str,str,datetime[μs],datetime[μs]
"""2""","""events""",2,3,0,"""Closed for renevation""","""b""","""regular""","""2025-02-28T07:00:00""","""2025-02-28 15:00:00""","""2025-02-28T12:00:00""","""2025-02-28 20:00:00""",0,,,,,,False,"""""",0,"""root""","""SIT_Employee""","""2025-02-28T11:43:30""","""root""","""SIT_Employee""","""2025-02-28T11:45:13""",False,,,,,,0,,False,False,"""2025-02-28T00:00:00""","""07:00:00""",,2025-08-17 23:45:28.568608,2025-08-17 23:45:28.568608
"""307""","""events""",306,4,0,"""""","""k""","""regular""","""2025-05-27T13:00:00""","""2025-05-27 18:00:00""","""2025-05-27T16:00:00""","""2025-05-27 21:00:00""",0,7.0,,3.0,,,True,"""""",0,"""csarat""","""SIT_Employee""","""2025-04-22T16:55:15""","""neelak""","""SIT_Employee""","""2025-05-26T22:37:33""",False,,,,,,0,,False,False,"""2025-05-27T00:00:00""","""13:00:00""","""{""league_id"":3,""has_morning_ev…",2025-08-17 23:45:28.568608,2025-08-17 23:45:28.568608
"""308""","""events""",306,5,0,"""""","""k""","""regular""","""2025-05-28T13:00:00""","""2025-05-28 18:00:00""","""2025-05-28T16:00:00""","""2025-05-28 21:00:00""",0,7.0,,3.0,,,True,"""""",0,"""csarat""","""SIT_Employee""","""2025-04-22T16:55:15""","""csarat""","""SIT_Employee""","""2025-04-26T15:31:12""",False,,,,,,0,,False,False,"""2025-05-28T00:00:00""","""13:00:00""","""{""league_id"":3,""has_morning_ev…",2025-08-17 23:45:28.568608,2025-08-17 23:45:28.568608


In [28]:

# 4) Upload the normalized frame as snappy Parquet under
#    catalog/<s3_target>/ingest_date=<run_date>/
parquet_key = f"catalog/{s3_target}/ingest_date={run_date}/{s3_target}.parquet"
log.info(f"Writing Parquet to s3://{s3_bucket}/{parquet_key}")
write_parquet_to_s3_polars(
    df,
    s3_bucket,
    parquet_key,
    s3_client=s3_client,
    compression="snappy",
)

# 5) Swap the Postgres table <db_schema>.<db_target> for this frame
load_polars_df_to_postgres(df, db_target, pg_conn, schema=db_schema)
log.info(f"Replaced Postgres table {db_target}")




2025-08-17 23:45:51 [INFO] dash_extract - Writing Parquet to s3://allstars-dl-us-west-2-dev/catalog/all_events/ingest_date=2025-08-17/all_events.parquet
2025-08-17 23:45:51 [INFO] dash_extract - pg_url=postgresql://appuser:***@localhost:5432/analytics_dev
2025-08-17 23:45:51 [INFO] dash_extract - Replaced Postgres table events


In [44]:
# NOTE(review): repeats step 5 from the previous cell with identical arguments;
# remove this cell once the Postgres load has been verified.
load_polars_df_to_postgres(
    df,
    table=db_target,
    conn_str=pg_conn,
    schema=db_schema,
)
log.info(f"Replaced Postgres table {db_target}")

2025-08-17 23:55:50 [INFO] dash_extract - pg_url=postgresql://appuser:***@localhost:5432/analytics_dev
2025-08-17 23:55:50 [INFO] dash_extract - Replaced Postgres table events


In [None]:
# Presumably intended to run the whole extract for `env` in one call.
# NOTE(review): `process` is not defined anywhere in this notebook (the main.py cell
# defines no such function), so this cell raises NameError on Restart & Run All.
# TODO: define process(env) (e.g. by wrapping the cells above) or delete this cell.
process(env)