In [None]:
# Sample random full days from the API

In [None]:
import requests
import random
import logging
from datetime import datetime, timedelta
import time
import csv

# 🔧 Config
# Credentials are sent as HTTP basic auth with every request; fill in before running.
API_KEY_ID = ""
API_KEY_SECRET = ""
BASE_URL = "https://data.ny.gov/resource/wujg-7c2s.json"
NUM_SAMPLES = 20  # number of distinct days to collect
START_DATE = datetime(2021, 3, 1)  # earliest day eligible for sampling
END_DATE = datetime.today()  # latest day eligible for sampling
# OUTPUT_FILE has to exist: sample_and_fetch_random_days() hands it to
# write_to_csv() once all downloads are done. A path relative to the working
# directory keeps the script portable across machines.
OUTPUT_FILE = "mta_random_days_20.csv"

# 🪵 Logging setup
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

def get_random_date():
    """Return a random day between START_DATE and END_DATE as 'YYYY-MM-DD'.

    The day is chosen by adding a random whole number of days (0 up to the
    number of whole days separating the two bounds, inclusive) to START_DATE.
    """
    span_in_days = (END_DATE - START_DATE).days
    shift = timedelta(days=random.randint(0, span_in_days))
    chosen_day = START_DATE + shift
    return chosen_day.strftime("%Y-%m-%d")

def fetch_data_for_date(date_str, max_retries=3, retry_delay=2, timeout=30):
    """Download every record whose ``transit_timestamp`` falls on ``date_str``.

    Records are pulled in pages of 1000 until a short page signals the end.
    The day is only returned when it was retrieved completely; a day that is
    empty or could not be finished within ``max_retries`` attempts yields an
    empty list, so callers never receive a silently truncated day.

    Args:
        date_str: Day to fetch, formatted ``YYYY-MM-DD``.
        max_retries: Maximum number of attempts for the day.
        retry_delay: Seconds to wait before the second attempt; doubled
            after every failed attempt.
        timeout: Per-request timeout in seconds.

    Returns:
        A list of record dicts for the whole day, or ``[]`` on failure.

    Raises:
        ValueError: If ``date_str`` is not a valid ``YYYY-MM-DD`` date.
    """
    logging.info(f"📅 Fetching all records for {date_str}...")
    limit = 1000

    # Parsing first also guarantees that only a well-formed date is ever
    # interpolated into the query filter below.
    day_start = datetime.strptime(date_str, "%Y-%m-%d")
    next_date = (day_start + timedelta(days=1)).strftime("%Y-%m-%d")
    base_params = {
        "$where": (
            f"transit_timestamp >= '{date_str}T00:00:00' "
            f"AND transit_timestamp < '{next_date}T00:00:00'"
        ),
        # Offset paging is only consistent with a fixed sort order; without
        # it rows can be repeated or skipped between pages.
        "$order": ":id",
        "$limit": limit,
    }

    all_records = []
    offset = 0
    complete = False

    for attempt in range(1, max_retries + 1):
        try:
            while not complete:
                response = requests.get(
                    BASE_URL,
                    params={**base_params, "$offset": offset},
                    auth=(API_KEY_ID, API_KEY_SECRET),
                    timeout=timeout,
                )
                if response.status_code != 200:
                    logging.warning(f"⚠️ API error {response.status_code}: {response.text}")
                    break

                batch = response.json()
                logging.info(f"📦 Retrieved {len(batch)} records at offset {offset}")
                all_records.extend(batch)

                if len(batch) < limit:
                    complete = True
                else:
                    offset += limit

                time.sleep(0.25)  # politeness delay
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            logging.error(f"❌ Exception on attempt {attempt} for {date_str}: {e}")

        if complete and all_records:
            break

        if complete:
            # The API answered but had nothing for this day; treat it as
            # possibly transient and start the day over on the next attempt.
            logging.warning(f"🕒 Attempt {attempt}: Received 0 records for {date_str}.")
            offset = 0
            complete = False
        # Otherwise a page failed: keep what was fetched and resume from the
        # same offset on the next attempt.

        if attempt < max_retries:
            logging.info(f"🔁 Retrying {date_str} after {retry_delay} seconds...")
            time.sleep(retry_delay)
            retry_delay *= 2

    if not complete:
        if all_records:
            logging.error(
                f"🚫 Incomplete data for {date_str} after {max_retries} attempts; "
                f"discarding {len(all_records)} partial records."
            )
        else:
            logging.error(f"🚫 No data retrieved for {date_str} after {max_retries} attempts.")
        return []

    logging.info(f"✅ Finished fetching {len(all_records)} records for {date_str}")
    return all_records

def write_to_csv(data_list, output_file):
    """Write a list of record dicts to ``output_file`` as UTF-8 CSV.

    The header is the alphabetically sorted union of all keys found in the
    records; a record missing a column gets an empty cell there. Nothing is
    written when ``data_list`` is empty, and any failure while writing is
    logged rather than raised.
    """
    if not data_list:
        logging.warning("⚠️ No data to write.")
        return

    # Collect every column name that appears in at least one record.
    columns = set()
    for record in data_list:
        columns.update(record.keys())
    header = sorted(columns)

    try:
        with open(output_file, mode="w", newline="", encoding="utf-8") as handle:
            csv_out = csv.DictWriter(handle, fieldnames=header)
            csv_out.writeheader()
            for record in data_list:
                csv_out.writerow(record)
        logging.info(f"📄 Successfully wrote {len(data_list)} rows to '{output_file}'")
    except Exception as e:
        logging.error(f"❌ Failed to write CSV: {e}")

def sample_and_fetch_random_days(n):
    """Collect complete data for ``n`` distinct random days and save it as CSV.

    Days between START_DATE and END_DATE are visited in random order, each at
    most once. A day whose fetch returns nothing is skipped and replaced by
    the next candidate. The loop stops when ``n`` days have succeeded or when
    no untried day is left, so it always terminates; whatever was collected
    is then written to OUTPUT_FILE.

    Args:
        n: Number of distinct days to collect.
    """
    logging.info("🚀 Starting random full-day sampling from MTA ridership dataset...")

    # Resolve the destination first so a missing setting fails immediately
    # instead of after every download has finished.
    output_file = OUTPUT_FILE

    # Shuffling the full list of day offsets gives sampling without
    # replacement and a hard upper bound on the number of fetches.
    total_days = (END_DATE - START_DATE).days
    day_offsets = list(range(total_days + 1))
    random.shuffle(day_offsets)

    successful_dates = set()
    all_data = []

    for day_offset in day_offsets:
        if len(successful_dates) >= n:
            break

        date = (START_DATE + timedelta(days=day_offset)).strftime("%Y-%m-%d")
        day_data = fetch_data_for_date(date)
        if day_data:
            all_data.extend(day_data)
            successful_dates.add(date)
        else:
            logging.info(f"🔁 Will sample another day to replace failed date: {date}")

        time.sleep(0.3)

    if len(successful_dates) < n:
        logging.warning(
            f"⚠️ Only {len(successful_dates)} of {n} requested days could be retrieved; "
            "no untried dates remain."
        )

    logging.info(f"🏁 Successfully retrieved data for {len(successful_dates)} unique days: {sorted(successful_dates)}")
    write_to_csv(all_data, output_file)

# Entry point: only runs the sampler when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    sample_and_fetch_random_days(NUM_SAMPLES)

In [None]:
# Sample random rows from the API

In [None]:
import requests
import random
import logging
import time
import csv

# 🔧 Settings
# Credentials (sent as HTTP basic auth), endpoints, sample size and output path.
API_KEY_ID = ""
API_KEY_SECRET = ""
BASE_URL = "https://data.ny.gov/resource/wujg-7c2s.json"
COUNT_URL = f"{BASE_URL}?$select=count(*)"  # same endpoint, asking only for the row count
NUM_SAMPLES = 20
OUTPUT_FILE = "mta_random_rows.csv"

# 🪵 Logging: timestamp - level - message, INFO and above
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO)

def get_total_rows(timeout=30):
    """Fetch the total row count from the API.

    Args:
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot block the script indefinitely.

    Returns:
        The number of rows reported by COUNT_URL, as an int.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.RequestException: On connection problems or timeout.
    """
    logging.info("📊 Fetching total row count...")
    response = requests.get(
        COUNT_URL,
        auth=(API_KEY_ID, API_KEY_SECRET),
        timeout=timeout,
    )
    response.raise_for_status()
    # The count query answers with a one-element list holding a "count" field.
    count = int(response.json()[0]["count"])
    logging.info(f"✅ Total rows available: {count}")
    return count

def fetch_row_at_offset(offset, max_retries=3, retry_delay=2, timeout=30):
    """Fetch the single row found at ``offset`` in the dataset.

    Args:
        offset: Zero-based position of the row to fetch.
        max_retries: Maximum number of attempts.
        retry_delay: Seconds to wait between attempts.
        timeout: Per-request timeout in seconds.

    Returns:
        The row as a dict, or ``None`` if every attempt failed or returned
        no data.
    """
    params = {
        # An offset only identifies a fixed row when the sort order is fixed.
        "$order": ":id",
        "$limit": 1,
        "$offset": offset,
    }
    for attempt in range(1, max_retries + 1):
        try:
            response = requests.get(
                BASE_URL,
                params=params,
                auth=(API_KEY_ID, API_KEY_SECRET),
                timeout=timeout,
            )
            if response.status_code == 200:
                data = response.json()
                if data:
                    return data[0]
                logging.warning(f"⚠️ Attempt {attempt}: no row returned at offset {offset}")
            else:
                logging.warning(f"⚠️ API error {response.status_code}: {response.text}")
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            logging.error(f"❌ Exception on attempt {attempt}: {e}")

        # No point in waiting once the last attempt has been used up.
        if attempt < max_retries:
            time.sleep(retry_delay)
    return None

def write_to_csv(data_list, output_file):
    """Save a list of row dicts to ``output_file`` in CSV format (UTF-8).

    Columns are the sorted union of the keys of all rows; rows lacking a
    column get an empty value for it. An empty ``data_list`` writes nothing,
    and errors raised while writing are logged instead of propagated.
    """
    if not data_list:
        logging.warning("⚠️ No data to write.")
        return

    # Every key that occurs in any row becomes a column.
    column_names = sorted({key for row in data_list for key in row.keys()})

    try:
        with open(output_file, mode="w", newline="", encoding="utf-8") as out:
            sheet = csv.DictWriter(out, fieldnames=column_names)
            sheet.writeheader()
            for row in data_list:
                sheet.writerow(row)
        logging.info(f"📄 Successfully wrote {len(data_list)} rows to '{output_file}'")
    except Exception as e:
        logging.error(f"❌ Failed to write CSV: {e}")

def sample_random_rows(n):
    """Fetch up to ``n`` distinct random rows and write them to OUTPUT_FILE.

    Row positions are drawn without replacement from the total row count
    reported by the API. If ``n`` is larger than the number of available
    rows (or negative), the sample size is clamped to the valid range
    instead of failing. Rows that cannot be retrieved are skipped.

    Args:
        n: Number of rows requested.
    """
    total_rows = get_total_rows()

    # random.sample() raises ValueError for a sample size outside
    # 0..population size, so clamp the request first.
    sample_size = max(0, min(n, total_rows))
    if sample_size != n:
        logging.warning(
            f"⚠️ Requested {n} rows but only {sample_size} can be sampled "
            f"from {total_rows} available rows."
        )

    logging.info(f"🚀 Sampling {sample_size} random rows from {total_rows} available rows...")

    sampled_data = []
    offsets = random.sample(range(total_rows), sample_size)

    for i, offset in enumerate(offsets, start=1):
        row = fetch_row_at_offset(offset)
        if row:
            sampled_data.append(row)
            logging.info(f"✅ Retrieved random row {i}/{sample_size} (offset={offset})")
        else:
            logging.warning(f"⚠️ Failed to retrieve row at offset {offset}")

        time.sleep(0.2)  # politeness delay

    write_to_csv(sampled_data, OUTPUT_FILE)

# Entry point: only runs the sampler when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    sample_random_rows(NUM_SAMPLES)
