From 37c2cee4a72936997259b5088f929ed3d2277959 Mon Sep 17 00:00:00 2001 From: Allison Wang Date: Tue, 4 Nov 2025 12:33:39 -0800 Subject: [PATCH 1/2] docs: re-add MkDocs documentation --- CLAUDE.md | 248 ++++++++++++++++++++++++++++ contributing/DEVELOPMENT.md | 25 ++- contributing/RELEASE.md | 15 ++ docs/datasources/arrow.md | 6 + docs/datasources/fake.md | 6 + docs/datasources/github.md | 3 + docs/datasources/googlesheets.md | 3 + docs/datasources/huggingface.md | 5 + docs/datasources/jsonplaceholder.md | 3 + docs/datasources/kaggle.md | 5 + docs/datasources/lance.md | 6 + docs/datasources/opensky.md | 5 + docs/datasources/robinhood.md | 6 + docs/datasources/salesforce.md | 6 + docs/datasources/simplejson.md | 3 + docs/datasources/stock.md | 3 + docs/datasources/weather.md | 5 + docs/index.md | 45 +++++ mkdocs.yml | 53 ++++++ 19 files changed, 447 insertions(+), 4 deletions(-) create mode 100644 CLAUDE.md create mode 100644 docs/datasources/arrow.md create mode 100644 docs/datasources/fake.md create mode 100644 docs/datasources/github.md create mode 100644 docs/datasources/googlesheets.md create mode 100644 docs/datasources/huggingface.md create mode 100644 docs/datasources/jsonplaceholder.md create mode 100644 docs/datasources/kaggle.md create mode 100644 docs/datasources/lance.md create mode 100644 docs/datasources/opensky.md create mode 100644 docs/datasources/robinhood.md create mode 100644 docs/datasources/salesforce.md create mode 100644 docs/datasources/simplejson.md create mode 100644 docs/datasources/stock.md create mode 100644 docs/datasources/weather.md create mode 100644 docs/index.md create mode 100644 mkdocs.yml diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..b6bad61 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,248 @@ +# PySpark Data Sources - Project Context for Claude + +## Project Overview +This is a demonstration library showcasing custom Spark data sources built using Apache Spark 4.0's new Python Data Source API. The project provides various data source connectors for reading from external APIs and services. + +**Important**: This is a demo/educational project and not intended for production use. 
+ +## Tech Stack +- **Language**: Python (3.9-3.12) +- **Framework**: Apache Spark 4.0+ (PySpark) +- **Package Management**: Poetry +- **Documentation**: MkDocs with Material theme +- **Testing**: pytest +- **Dependencies**: PyArrow, requests, faker, and optional extras + +## Project Structure +``` +pyspark_datasources/ +├── __init__.py # Main package exports +├── fake.py # Fake data generator using Faker +├── github.py # GitHub repository data connector +├── googlesheets.py # Public Google Sheets reader +├── huggingface.py # Hugging Face datasets connector +├── kaggle.py # Kaggle datasets connector +├── lance.py # Lance vector database connector +├── opensky.py # OpenSky flight data connector +├── simplejson.py # JSON writer for Databricks DBFS +├── stock.py # Alpha Vantage stock data reader +└── weather.py # Weather data connector +``` + +## Available Data Sources +| Short Name | File | Description | Dependencies | +|---------------|------|-------------|--------------| +| `fake` | fake.py | Generate fake data using Faker | faker | +| `github` | github.py | Read GitHub repository PRs | None | +| `googlesheets`| googlesheets.py | Read public Google Sheets | None | +| `huggingface` | huggingface.py | Access Hugging Face datasets | datasets | +| `kaggle` | kaggle.py | Read Kaggle datasets | kagglehub, pandas | +| `opensky` | opensky.py | Flight data from OpenSky Network | None | +| `simplejson` | simplejson.py | Write JSON to Databricks DBFS | databricks-sdk | +| `stock` | stock.py | Stock data from Alpha Vantage | None | + +## Development Commands + +### Environment Setup +```bash +poetry install # Install dependencies +poetry install --extras all # Install all optional dependencies +poetry shell # Activate virtual environment +``` + +### Testing +```bash +# Note: On macOS, set this environment variable to avoid fork safety issues +export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES + +pytest # Run all tests +pytest tests/test_data_sources.py # Run specific test file +pytest tests/test_data_sources.py::test_arrow_datasource_single_file -v # Run a specific test +``` + +### Documentation +```bash +mkdocs serve # Start local docs server +mkdocs build # Build static documentation +``` + +### Code Formatting +This project uses [Ruff](https://github.com/astral-sh/ruff) for code formatting and linting. + +```bash +poetry run ruff format . # Format code +poetry run ruff check . # Run linter +poetry run ruff check . --fix # Run linter with auto-fix +``` + +### Package Management +Please refer to RELEASE.md for more details. 
+```bash +poetry build # Build package +poetry publish # Publish to PyPI (requires auth) +poetry add # Add new dependency +poetry update # Update dependencies +``` + +## Usage Patterns +All data sources follow the Spark Data Source API pattern: + +```python +from pyspark_datasources import FakeDataSource + +# Register the data source +spark.dataSource.register(FakeDataSource) + +# Batch reading +df = spark.read.format("fake").option("numRows", 100).load() + +# Streaming (where supported) +stream = spark.readStream.format("fake").load() +``` + +## Testing Strategy +- Tests use pytest with PySpark session fixtures +- Each data source has basic functionality tests +- Tests verify data reading and schema validation +- Some tests may require external API access + +## Key Implementation Details +- All data sources inherit from Spark's DataSource base class +- Implements reader() method for batch reading +- Some implement streamReader() for streaming +- Schema is defined using PySpark StructType +- Options are passed via Spark's option() method + +## External Dependencies +- **GitHub API**: Uses public API, no auth required +- **Alpha Vantage**: Stock data API (may require API key) +- **Google Sheets**: Public sheets only, no auth +- **Kaggle**: Requires Kaggle API credentials +- **Databricks**: SDK for DBFS access +- **OpenSky**: Public flight data API + +## Common Issues +- Ensure PySpark >= 4.0.0 is installed +- Some data sources require API keys/credentials +- Network connectivity required for external APIs +- Rate limiting may affect some external services + +## Python Data Source API Specification + +### Core Abstract Base Classes + +#### DataSource +Primary abstract base class for custom data sources supporting read/write operations. + +**Key Methods:** +- `__init__(self, options: Dict[str, str])` - Initialize with user options (Optional; base class provides default) +- `name() -> str` - Return format name (Optional to override; defaults to class name) +- `schema() -> StructType` - Define data source schema (Required) +- `reader(schema: StructType) -> DataSourceReader` - Create batch reader (Required if batch read is supported) +- `writer(schema: StructType, overwrite: bool) -> DataSourceWriter` - Create batch writer (Required if batch write is supported) +- `streamReader(schema: StructType) -> DataSourceStreamReader` - Create streaming reader (Required if streaming read is supported and `simpleStreamReader` is not implemented) +- `streamWriter(schema: StructType, overwrite: bool) -> DataSourceStreamWriter` - Create streaming writer (Required if streaming write is supported) +- `simpleStreamReader(schema: StructType) -> SimpleDataSourceStreamReader` - Create simple streaming reader (Required if streaming read is supported and `streamReader` is not implemented) + +#### DataSourceReader +Abstract base class for reading data from sources. + +**Key Methods:** +- `read(partition) -> Iterator` - Read data from partition, returns tuples/Rows/pyarrow.RecordBatch (Required) +- `partitions() -> List[InputPartition]` - Return input partitions for parallel reading (Optional; defaults to a single partition) + +#### DataSourceStreamReader +Abstract base class for streaming data sources with offset management. 
+ +**Key Methods:** +- `initialOffset() -> dict` - Return starting offset (Required) +- `latestOffset() -> dict` - Return latest available offset (Required) +- `partitions(start: dict, end: dict) -> List[InputPartition]` - Get partitions for offset range (Required) +- `read(partition) -> Iterator` - Read data from partition (Required) +- `commit(end: dict) -> None` - Mark offsets as processed (Optional) +- `stop() -> None` - Clean up resources (Optional) + +#### SimpleDataSourceStreamReader +Simplified streaming reader interface without partition planning. + +**Key Methods:** +- `initialOffset() -> dict` - Return starting offset (Required) +- `read(start: dict) -> Tuple[Iterator, dict]` - Read from start offset; return an iterator and the next start offset (Required) +- `readBetweenOffsets(start: dict, end: dict) -> Iterator` - Deterministic replay between offsets for recovery (Optional; recommended for reliable recovery) +- `commit(end: dict) -> None` - Mark offsets as processed (Optional) + +#### DataSourceWriter +Abstract base class for writing data to external sources. + +**Key Methods:** +- `write(iterator) -> WriteResult` - Write data from iterator (Required) +- `abort(messages: List[WriterCommitMessage])` - Handle write failures (Optional) +- `commit(messages: List[WriterCommitMessage]) -> WriteResult` - Commit successful writes (Required) + +#### DataSourceStreamWriter +Abstract base class for writing data to external sinks in streaming queries. + +**Key Methods:** +- `write(iterator) -> WriterCommitMessage` - Write data for a partition and return a commit message (Required) +- `commit(messages: List[WriterCommitMessage], batchId: int) -> None` - Commit successful microbatch writes (Required) +- `abort(messages: List[WriterCommitMessage], batchId: int) -> None` - Handle write failures for a microbatch (Optional) + +#### DataSourceArrowWriter +Optimized writer using PyArrow RecordBatch for improved performance. + +### Implementation Requirements + +1. **Serialization**: All classes must be pickle serializable +2. **Schema Definition**: Use PySpark StructType for schema specification +3. **Data Types**: Support standard Spark SQL data types +4. **Error Handling**: Implement proper exception handling +5. **Resource Management**: Clean up resources properly in streaming sources +6. **Use load() for paths**: Specify file paths in `load("/path")`, not `option("path", "/path")` + +### Usage Patterns + +```python +# Custom data source implementation +class MyDataSource(DataSource): + def __init__(self, options): + self.options = options + + def name(self): + return "myformat" + + def schema(self): + return StructType([StructField("id", IntegerType(), True)]) + + def reader(self, schema): + return MyDataSourceReader(self.options, schema) + +# Registration and usage +spark.dataSource.register(MyDataSource) +df = spark.read.format("myformat").option("key", "value").load() +``` + +### Performance Optimizations + +1. **Arrow Integration**: Return `pyarrow.RecordBatch` for better serialization +2. **Partitioning**: Implement `partitions()` for parallel processing +3. 
**Lazy Evaluation**: Defer expensive operations until read time + +## Documentation +- Main docs: https://allisonwang-db.github.io/pyspark-data-sources/ +- Individual data source docs in `docs/datasources/` +- Spark Data Source API: https://spark.apache.org/docs/4.0.0/api/python/tutorial/sql/python_data_source.html +- API Source Code: https://github.com/apache/spark/blob/master/python/pyspark/sql/datasource.py + +### Data Source Docstring Guidelines +When creating new data sources, include these sections in the class docstring: + +**Required Sections:** +- Brief description and `Name: "format_name"` +- `Options` section documenting all parameters with types/defaults +- `Examples` section with registration and basic usage + +**Key Guidelines:** +- **Include schema output**: Show `df.printSchema()` results for clarity +- **Document error cases**: Show what happens with missing files/invalid options +- **Document partitioning strategy**: Show how data sources leverage partitioning to increase performance +- **Document Arrow optimization**: Show how data data sources use Arrow to transmit data diff --git a/contributing/DEVELOPMENT.md b/contributing/DEVELOPMENT.md index c5f2a6c..50fb372 100644 --- a/contributing/DEVELOPMENT.md +++ b/contributing/DEVELOPMENT.md @@ -111,10 +111,27 @@ pre-commit run --all-files ### Building Documentation -The project previously used MkDocs for documentation. Documentation now lives primarily in: -- README.md - Main documentation -- Docstrings in source code -- Contributing guides in /contributing +MkDocs (Material theme) powers the public documentation site hosted at `https://allisonwang-db.github.io/pyspark-data-sources/`. + +#### Preview Locally + +Run the live preview server (restarts on save): + +```bash +poetry run mkdocs serve +``` + +The site is served at `http://127.0.0.1:8000/` by default. + +#### Build for Verification + +Before sending a PR, ensure the static build succeeds and address any warnings: + +```bash +poetry run mkdocs build +``` + +Common warnings include missing navigation entries or broken links—update `mkdocs.yml` or the relevant Markdown files to resolve them. ### Writing Docstrings diff --git a/contributing/RELEASE.md b/contributing/RELEASE.md index 6fe91c1..a2ac3ed 100644 --- a/contributing/RELEASE.md +++ b/contributing/RELEASE.md @@ -173,6 +173,21 @@ gh workflow run docs.yml # Go to Actions tab → Deploy MkDocs to GitHub Pages → Run workflow ``` +### Releasing the Documentation Site + +Follow these steps when you want to publish documentation updates: + +1. Verify the docs build locally: + ```bash + poetry run mkdocs build + ``` +2. Commit any updated Markdown or configuration files and push to the default branch. This triggers the `docs.yml` workflow, which rebuilds and publishes the site to GitHub Pages. +3. (Optional) If you need to deploy immediately without waiting for CI, run: + ```bash + poetry run mkdocs gh-deploy + ``` + This command builds the site and pushes it to the `gh-pages` branch directly. + ### Documentation URLs - **Live Docs**: https://allisonwang-db.github.io/pyspark-data-sources diff --git a/docs/datasources/arrow.md b/docs/datasources/arrow.md new file mode 100644 index 0000000..c64d848 --- /dev/null +++ b/docs/datasources/arrow.md @@ -0,0 +1,6 @@ +# ArrowDataSource + +> Requires the [`PyArrow`](https://arrow.apache.org/docs/python/) library. You can install it manually: `pip install pyarrow` +> or use `pip install pyspark-data-sources[arrow]`. 
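+
+A minimal usage sketch, assuming an active `SparkSession` (`spark`) and that the source registers under the short name `arrow`; the file path below is illustrative:
+
+```python
+from pyspark_datasources.arrow import ArrowDataSource
+
+# Register the data source, then read an Arrow IPC file by path.
+spark.dataSource.register(ArrowDataSource)
+df = spark.read.format("arrow").load("/path/to/data.arrow")
+df.printSchema()
+df.show(5)
+```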
+ +::: pyspark_datasources.arrow.ArrowDataSource diff --git a/docs/datasources/fake.md b/docs/datasources/fake.md new file mode 100644 index 0000000..acb3ddc --- /dev/null +++ b/docs/datasources/fake.md @@ -0,0 +1,6 @@ +# FakeDataSource + +> Requires the [`Faker`](https://github.com/joke2k/faker) library. You can install it manually: `pip install faker` +> or use `pip install pyspark-data-sources[faker]`. + +::: pyspark_datasources.fake.FakeDataSource diff --git a/docs/datasources/github.md b/docs/datasources/github.md new file mode 100644 index 0000000..22daa7f --- /dev/null +++ b/docs/datasources/github.md @@ -0,0 +1,3 @@ +# GithubDataSource + +::: pyspark_datasources.github.GithubDataSource diff --git a/docs/datasources/googlesheets.md b/docs/datasources/googlesheets.md new file mode 100644 index 0000000..084191b --- /dev/null +++ b/docs/datasources/googlesheets.md @@ -0,0 +1,3 @@ +# GoogleSheetsDataSource + +::: pyspark_datasources.googlesheets.GoogleSheetsDataSource diff --git a/docs/datasources/huggingface.md b/docs/datasources/huggingface.md new file mode 100644 index 0000000..f4937ab --- /dev/null +++ b/docs/datasources/huggingface.md @@ -0,0 +1,5 @@ +# HuggingFaceDatasets + +> Requires the [`datasets`](https://huggingface.co/docs/datasets/en/index) library. + +::: pyspark_datasources.huggingface.HuggingFaceDatasets diff --git a/docs/datasources/jsonplaceholder.md b/docs/datasources/jsonplaceholder.md new file mode 100644 index 0000000..a175dd9 --- /dev/null +++ b/docs/datasources/jsonplaceholder.md @@ -0,0 +1,3 @@ +# JSONPlaceholderDataSource + +::: pyspark_datasources.jsonplaceholder.JSONPlaceholderDataSource \ No newline at end of file diff --git a/docs/datasources/kaggle.md b/docs/datasources/kaggle.md new file mode 100644 index 0000000..b031ad0 --- /dev/null +++ b/docs/datasources/kaggle.md @@ -0,0 +1,5 @@ +# KaggleDataSource + +> Requires the [`kagglehub`](https://github.com/Kaggle/kagglehub) library. + +::: pyspark_datasources.kaggle.KaggleDataSource diff --git a/docs/datasources/lance.md b/docs/datasources/lance.md new file mode 100644 index 0000000..e6c7848 --- /dev/null +++ b/docs/datasources/lance.md @@ -0,0 +1,6 @@ +# LanceSink + +> Requires the [`Lance`](https://lancedb.github.io/lance/) library. You can install it manually: `pip install lance` +> or use `pip install pyspark-data-sources[lance]`. + +::: pyspark_datasources.lance.LanceSink diff --git a/docs/datasources/opensky.md b/docs/datasources/opensky.md new file mode 100644 index 0000000..f611186 --- /dev/null +++ b/docs/datasources/opensky.md @@ -0,0 +1,5 @@ +# OpenSkyDataSource + +> No additional dependencies required. Uses the OpenSky Network REST API for real-time aircraft tracking data. + +::: pyspark_datasources.opensky.OpenSkyDataSource diff --git a/docs/datasources/robinhood.md b/docs/datasources/robinhood.md new file mode 100644 index 0000000..ccfb33e --- /dev/null +++ b/docs/datasources/robinhood.md @@ -0,0 +1,6 @@ +# RobinhoodDataSource + +> Requires the [`pynacl`](https://github.com/pyca/pynacl) library for cryptographic signing. You can install it manually: `pip install pynacl` +> or use `pip install pyspark-data-sources[robinhood]`. + +::: pyspark_datasources.robinhood.RobinhoodDataSource diff --git a/docs/datasources/salesforce.md b/docs/datasources/salesforce.md new file mode 100644 index 0000000..688b61c --- /dev/null +++ b/docs/datasources/salesforce.md @@ -0,0 +1,6 @@ +# SalesforceDataSource + +> Requires the [`simple-salesforce`](https://github.com/simple-salesforce/simple-salesforce) library. 
You can install it manually: `pip install simple-salesforce` +> or use `pip install pyspark-data-sources[salesforce]`. + +::: pyspark_datasources.salesforce.SalesforceDataSource \ No newline at end of file diff --git a/docs/datasources/simplejson.md b/docs/datasources/simplejson.md new file mode 100644 index 0000000..c72e846 --- /dev/null +++ b/docs/datasources/simplejson.md @@ -0,0 +1,3 @@ +# SimpleJsonDataSource + +::: pyspark_datasources.simplejson.SimpleJsonDataSource diff --git a/docs/datasources/stock.md b/docs/datasources/stock.md new file mode 100644 index 0000000..b6b506e --- /dev/null +++ b/docs/datasources/stock.md @@ -0,0 +1,3 @@ +# StockDataSource + +::: pyspark_datasources.stock.StockDataSource \ No newline at end of file diff --git a/docs/datasources/weather.md b/docs/datasources/weather.md new file mode 100644 index 0000000..f7f5258 --- /dev/null +++ b/docs/datasources/weather.md @@ -0,0 +1,5 @@ +# WeatherDataSource + +> No additional dependencies required. Uses the Tomorrow.io API for weather data. Requires an API key. + +::: pyspark_datasources.weather.WeatherDataSource diff --git a/docs/index.md b/docs/index.md new file mode 100644 index 0000000..ed53cd7 --- /dev/null +++ b/docs/index.md @@ -0,0 +1,45 @@ +# PySpark Data Sources + +Custom Spark data sources for reading and writing data in Apache Spark, using the Python Data Source API. + +## Installation + +```bash +pip install pyspark-data-sources +``` + +If you want to install all extra dependencies, use: + +```bash +pip install pyspark-data-sources[all] +``` + +## Usage + +```python +from pyspark_datasources.fake import FakeDataSource + +# Register the data source +spark.dataSource.register(FakeDataSource) + +spark.read.format("fake").load().show() + +# For streaming data generation +spark.readStream.format("fake").load().writeStream.format("console").start() +``` + + +## Data Sources + +| Data Source | Short Name | Description | Dependencies | +| ------------------------------------------------------- | -------------- | --------------------------------------------- | --------------------- | +| [GithubDataSource](./datasources/github.md) | `github` | Read pull requests from a Github repository | None | +| [FakeDataSource](./datasources/fake.md) | `fake` | Generate fake data using the `Faker` library | `faker` | +| [HuggingFaceDatasets](./datasources/huggingface.md) | `huggingface` | Read datasets from the HuggingFace Hub | `datasets` | +| [StockDataSource](./datasources/stock.md) | `stock` | Read stock data from Alpha Vantage | None | +| [SalesforceDataSource](./datasources/salesforce.md) | `pyspark.datasource.salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` | +| [GoogleSheetsDataSource](./datasources/googlesheets.md) | `googlesheets` | Read table from public Google Sheets document | None | +| [KaggleDataSource](./datasources/kaggle.md) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` | +| [JSONPlaceHolder](./datasources/jsonplaceholder.md) | `jsonplaceholder` | Read JSON data for testing and prototyping | None | +| [RobinhoodDataSource](./datasources/robinhood.md) | `robinhood` | Read cryptocurrency market data from Robinhood API | `pynacl` | +| [SalesforceDataSource](./datasources/salesforce.md) | `salesforce` | Write streaming data to Salesforce objects |`simple-salesforce` | \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml new file mode 100644 index 0000000..cdace9f --- /dev/null +++ b/mkdocs.yml @@ -0,0 +1,53 @@ +# yaml-language-server: 
$schema=https://squidfunk.github.io/mkdocs-material/schema.json + +site_name: PySpark Data Sources +site_url: https://allisonwang-db.github.io/pyspark-data-sources +repo_url: https://github.com/allisonwang-db/pyspark-data-sources +theme: + name: material + +plugins: + - mkdocstrings: + default_handler: python + handlers: + python: + options: + docstring_style: numpy + - search + +nav: + - Index: index.md + - Guides: + - API Reference: api-reference.md + - Building Data Sources: building-data-sources.md + - Data Sources Guide: data-sources-guide.md + - Simple Stream Reader Architecture: simple-stream-reader-architecture.md + - Data Sources: + - datasources/arrow.md + - datasources/github.md + - datasources/fake.md + - datasources/huggingface.md + - datasources/stock.md + - datasources/simplejson.md + - datasources/salesforce.md + - datasources/googlesheets.md + - datasources/kaggle.md + - datasources/jsonplaceholder.md + - datasources/robinhood.md + - datasources/lance.md + - datasources/opensky.md + - datasources/weather.md + +markdown_extensions: + - pymdownx.highlight: + anchor_linenums: true + - pymdownx.inlinehilite + - pymdownx.snippets + - admonition + - pymdownx.arithmatex: + generic: true + - footnotes + - pymdownx.details + - pymdownx.superfences + - pymdownx.mark + - attr_list From 4a3693f3fc9186ddab6aba80b6c1c7786242d427 Mon Sep 17 00:00:00 2001 From: Allison Wang Date: Tue, 4 Nov 2025 13:57:17 -0800 Subject: [PATCH 2/2] docs: remove CLAUDE context --- CLAUDE.md | 248 ------------------------------------------------------ 1 file changed, 248 deletions(-) delete mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md deleted file mode 100644 index b6bad61..0000000 --- a/CLAUDE.md +++ /dev/null @@ -1,248 +0,0 @@ -# PySpark Data Sources - Project Context for Claude - -## Project Overview -This is a demonstration library showcasing custom Spark data sources built using Apache Spark 4.0's new Python Data Source API. The project provides various data source connectors for reading from external APIs and services. - -**Important**: This is a demo/educational project and not intended for production use. 
- -## Tech Stack -- **Language**: Python (3.9-3.12) -- **Framework**: Apache Spark 4.0+ (PySpark) -- **Package Management**: Poetry -- **Documentation**: MkDocs with Material theme -- **Testing**: pytest -- **Dependencies**: PyArrow, requests, faker, and optional extras - -## Project Structure -``` -pyspark_datasources/ -├── __init__.py # Main package exports -├── fake.py # Fake data generator using Faker -├── github.py # GitHub repository data connector -├── googlesheets.py # Public Google Sheets reader -├── huggingface.py # Hugging Face datasets connector -├── kaggle.py # Kaggle datasets connector -├── lance.py # Lance vector database connector -├── opensky.py # OpenSky flight data connector -├── simplejson.py # JSON writer for Databricks DBFS -├── stock.py # Alpha Vantage stock data reader -└── weather.py # Weather data connector -``` - -## Available Data Sources -| Short Name | File | Description | Dependencies | -|---------------|------|-------------|--------------| -| `fake` | fake.py | Generate fake data using Faker | faker | -| `github` | github.py | Read GitHub repository PRs | None | -| `googlesheets`| googlesheets.py | Read public Google Sheets | None | -| `huggingface` | huggingface.py | Access Hugging Face datasets | datasets | -| `kaggle` | kaggle.py | Read Kaggle datasets | kagglehub, pandas | -| `opensky` | opensky.py | Flight data from OpenSky Network | None | -| `simplejson` | simplejson.py | Write JSON to Databricks DBFS | databricks-sdk | -| `stock` | stock.py | Stock data from Alpha Vantage | None | - -## Development Commands - -### Environment Setup -```bash -poetry install # Install dependencies -poetry install --extras all # Install all optional dependencies -poetry shell # Activate virtual environment -``` - -### Testing -```bash -# Note: On macOS, set this environment variable to avoid fork safety issues -export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES - -pytest # Run all tests -pytest tests/test_data_sources.py # Run specific test file -pytest tests/test_data_sources.py::test_arrow_datasource_single_file -v # Run a specific test -``` - -### Documentation -```bash -mkdocs serve # Start local docs server -mkdocs build # Build static documentation -``` - -### Code Formatting -This project uses [Ruff](https://github.com/astral-sh/ruff) for code formatting and linting. - -```bash -poetry run ruff format . # Format code -poetry run ruff check . # Run linter -poetry run ruff check . --fix # Run linter with auto-fix -``` - -### Package Management -Please refer to RELEASE.md for more details. 
-```bash -poetry build # Build package -poetry publish # Publish to PyPI (requires auth) -poetry add # Add new dependency -poetry update # Update dependencies -``` - -## Usage Patterns -All data sources follow the Spark Data Source API pattern: - -```python -from pyspark_datasources import FakeDataSource - -# Register the data source -spark.dataSource.register(FakeDataSource) - -# Batch reading -df = spark.read.format("fake").option("numRows", 100).load() - -# Streaming (where supported) -stream = spark.readStream.format("fake").load() -``` - -## Testing Strategy -- Tests use pytest with PySpark session fixtures -- Each data source has basic functionality tests -- Tests verify data reading and schema validation -- Some tests may require external API access - -## Key Implementation Details -- All data sources inherit from Spark's DataSource base class -- Implements reader() method for batch reading -- Some implement streamReader() for streaming -- Schema is defined using PySpark StructType -- Options are passed via Spark's option() method - -## External Dependencies -- **GitHub API**: Uses public API, no auth required -- **Alpha Vantage**: Stock data API (may require API key) -- **Google Sheets**: Public sheets only, no auth -- **Kaggle**: Requires Kaggle API credentials -- **Databricks**: SDK for DBFS access -- **OpenSky**: Public flight data API - -## Common Issues -- Ensure PySpark >= 4.0.0 is installed -- Some data sources require API keys/credentials -- Network connectivity required for external APIs -- Rate limiting may affect some external services - -## Python Data Source API Specification - -### Core Abstract Base Classes - -#### DataSource -Primary abstract base class for custom data sources supporting read/write operations. - -**Key Methods:** -- `__init__(self, options: Dict[str, str])` - Initialize with user options (Optional; base class provides default) -- `name() -> str` - Return format name (Optional to override; defaults to class name) -- `schema() -> StructType` - Define data source schema (Required) -- `reader(schema: StructType) -> DataSourceReader` - Create batch reader (Required if batch read is supported) -- `writer(schema: StructType, overwrite: bool) -> DataSourceWriter` - Create batch writer (Required if batch write is supported) -- `streamReader(schema: StructType) -> DataSourceStreamReader` - Create streaming reader (Required if streaming read is supported and `simpleStreamReader` is not implemented) -- `streamWriter(schema: StructType, overwrite: bool) -> DataSourceStreamWriter` - Create streaming writer (Required if streaming write is supported) -- `simpleStreamReader(schema: StructType) -> SimpleDataSourceStreamReader` - Create simple streaming reader (Required if streaming read is supported and `streamReader` is not implemented) - -#### DataSourceReader -Abstract base class for reading data from sources. - -**Key Methods:** -- `read(partition) -> Iterator` - Read data from partition, returns tuples/Rows/pyarrow.RecordBatch (Required) -- `partitions() -> List[InputPartition]` - Return input partitions for parallel reading (Optional; defaults to a single partition) - -#### DataSourceStreamReader -Abstract base class for streaming data sources with offset management. 
- -**Key Methods:** -- `initialOffset() -> dict` - Return starting offset (Required) -- `latestOffset() -> dict` - Return latest available offset (Required) -- `partitions(start: dict, end: dict) -> List[InputPartition]` - Get partitions for offset range (Required) -- `read(partition) -> Iterator` - Read data from partition (Required) -- `commit(end: dict) -> None` - Mark offsets as processed (Optional) -- `stop() -> None` - Clean up resources (Optional) - -#### SimpleDataSourceStreamReader -Simplified streaming reader interface without partition planning. - -**Key Methods:** -- `initialOffset() -> dict` - Return starting offset (Required) -- `read(start: dict) -> Tuple[Iterator, dict]` - Read from start offset; return an iterator and the next start offset (Required) -- `readBetweenOffsets(start: dict, end: dict) -> Iterator` - Deterministic replay between offsets for recovery (Optional; recommended for reliable recovery) -- `commit(end: dict) -> None` - Mark offsets as processed (Optional) - -#### DataSourceWriter -Abstract base class for writing data to external sources. - -**Key Methods:** -- `write(iterator) -> WriteResult` - Write data from iterator (Required) -- `abort(messages: List[WriterCommitMessage])` - Handle write failures (Optional) -- `commit(messages: List[WriterCommitMessage]) -> WriteResult` - Commit successful writes (Required) - -#### DataSourceStreamWriter -Abstract base class for writing data to external sinks in streaming queries. - -**Key Methods:** -- `write(iterator) -> WriterCommitMessage` - Write data for a partition and return a commit message (Required) -- `commit(messages: List[WriterCommitMessage], batchId: int) -> None` - Commit successful microbatch writes (Required) -- `abort(messages: List[WriterCommitMessage], batchId: int) -> None` - Handle write failures for a microbatch (Optional) - -#### DataSourceArrowWriter -Optimized writer using PyArrow RecordBatch for improved performance. - -### Implementation Requirements - -1. **Serialization**: All classes must be pickle serializable -2. **Schema Definition**: Use PySpark StructType for schema specification -3. **Data Types**: Support standard Spark SQL data types -4. **Error Handling**: Implement proper exception handling -5. **Resource Management**: Clean up resources properly in streaming sources -6. **Use load() for paths**: Specify file paths in `load("/path")`, not `option("path", "/path")` - -### Usage Patterns - -```python -# Custom data source implementation -class MyDataSource(DataSource): - def __init__(self, options): - self.options = options - - def name(self): - return "myformat" - - def schema(self): - return StructType([StructField("id", IntegerType(), True)]) - - def reader(self, schema): - return MyDataSourceReader(self.options, schema) - -# Registration and usage -spark.dataSource.register(MyDataSource) -df = spark.read.format("myformat").option("key", "value").load() -``` - -### Performance Optimizations - -1. **Arrow Integration**: Return `pyarrow.RecordBatch` for better serialization -2. **Partitioning**: Implement `partitions()` for parallel processing -3. 
**Lazy Evaluation**: Defer expensive operations until read time - -## Documentation -- Main docs: https://allisonwang-db.github.io/pyspark-data-sources/ -- Individual data source docs in `docs/datasources/` -- Spark Data Source API: https://spark.apache.org/docs/4.0.0/api/python/tutorial/sql/python_data_source.html -- API Source Code: https://github.com/apache/spark/blob/master/python/pyspark/sql/datasource.py - -### Data Source Docstring Guidelines -When creating new data sources, include these sections in the class docstring: - -**Required Sections:** -- Brief description and `Name: "format_name"` -- `Options` section documenting all parameters with types/defaults -- `Examples` section with registration and basic usage - -**Key Guidelines:** -- **Include schema output**: Show `df.printSchema()` results for clarity -- **Document error cases**: Show what happens with missing files/invalid options -- **Document partitioning strategy**: Show how data sources leverage partitioning to increase performance -- **Document Arrow optimization**: Show how data data sources use Arrow to transmit data