diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..3705425 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,222 @@ +# PySpark Data Sources - Project Context for Claude + +## Project Overview +This is a demonstration library showcasing custom Spark data sources built using Apache Spark 4.0's new Python Data Source API. The project provides various data source connectors for reading from external APIs and services. + +**Important**: This is a demo/educational project and not intended for production use. + +## Tech Stack +- **Language**: Python (3.9-3.12) +- **Framework**: Apache Spark 4.0+ (PySpark) +- **Package Management**: Poetry +- **Documentation**: MkDocs with Material theme +- **Testing**: pytest +- **Dependencies**: PyArrow, requests, faker, and optional extras + +## Project Structure +``` +pyspark_datasources/ +├── __init__.py # Main package exports +├── fake.py # Fake data generator using Faker +├── github.py # GitHub repository data connector +├── googlesheets.py # Public Google Sheets reader +├── huggingface.py # Hugging Face datasets connector +├── kaggle.py # Kaggle datasets connector +├── lance.py # Lance vector database connector +├── opensky.py # OpenSky flight data connector +├── simplejson.py # JSON writer for Databricks DBFS +├── stock.py # Alpha Vantage stock data reader +└── weather.py # Weather data connector +``` + +## Available Data Sources +| Short Name | File | Description | Dependencies | +|---------------|------|-------------|--------------| +| `fake` | fake.py | Generate fake data using Faker | faker | +| `github` | github.py | Read GitHub repository PRs | None | +| `googlesheets`| googlesheets.py | Read public Google Sheets | None | +| `huggingface` | huggingface.py | Access Hugging Face datasets | datasets | +| `kaggle` | kaggle.py | Read Kaggle datasets | kagglehub, pandas | +| `opensky` | opensky.py | Flight data from OpenSky Network | None | +| `simplejson` | simplejson.py | Write JSON to Databricks DBFS | databricks-sdk | +| `stock` | stock.py | Stock data from Alpha Vantage | None | + +## Development Commands + +### Environment Setup +```bash +poetry install # Install dependencies +poetry install --extras all # Install all optional dependencies +poetry shell # Activate virtual environment +``` + +### Testing +```bash +# Note: On macOS, set this environment variable to avoid fork safety issues +export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES + +pytest # Run all tests +pytest tests/test_data_sources.py # Run specific test file +pytest tests/test_data_sources.py::test_arrow_datasource_single_file -v # Run a specific test +``` + +### Documentation +```bash +mkdocs serve # Start local docs server +mkdocs build # Build static documentation +``` + +### Package Management +Please refer to RELEASE.md for more details. 
+```bash +poetry build # Build package +poetry publish # Publish to PyPI (requires auth) +poetry add # Add new dependency +poetry update # Update dependencies +``` + +## Usage Patterns +All data sources follow the Spark Data Source API pattern: + +```python +from pyspark_datasources import FakeDataSource + +# Register the data source +spark.dataSource.register(FakeDataSource) + +# Batch reading +df = spark.read.format("fake").option("numRows", 100).load() + +# Streaming (where supported) +stream = spark.readStream.format("fake").load() +``` + +## Testing Strategy +- Tests use pytest with PySpark session fixtures +- Each data source has basic functionality tests +- Tests verify data reading and schema validation +- Some tests may require external API access + +## Key Implementation Details +- All data sources inherit from Spark's DataSource base class +- Implements reader() method for batch reading +- Some implement streamReader() for streaming +- Schema is defined using PySpark StructType +- Options are passed via Spark's option() method + +## External Dependencies +- **GitHub API**: Uses public API, no auth required +- **Alpha Vantage**: Stock data API (may require API key) +- **Google Sheets**: Public sheets only, no auth +- **Kaggle**: Requires Kaggle API credentials +- **Databricks**: SDK for DBFS access +- **OpenSky**: Public flight data API + +## Common Issues +- Ensure PySpark >= 4.0.0 is installed +- Some data sources require API keys/credentials +- Network connectivity required for external APIs +- Rate limiting may affect some external services + +## Python Data Source API Specification + +### Core Abstract Base Classes + +#### DataSource +Primary abstract base class for custom data sources supporting read/write operations. + +**Key Methods:** +- `__init__(self, options: Dict[str, str])` - Initialize with user options +- `name() -> str` - Return format name (defaults to class name) +- `schema() -> StructType` - Define data source schema +- `reader(schema: StructType) -> DataSourceReader` - Create batch reader +- `writer(schema: StructType, overwrite: bool) -> DataSourceWriter` - Create batch writer +- `streamReader(schema: StructType) -> DataSourceStreamReader` - Create streaming reader +- `streamWriter(schema: StructType, overwrite: bool) -> DataSourceStreamWriter` - Create streaming writer +- `simpleStreamReader(schema: StructType) -> SimpleDataSourceStreamReader` - Create simple streaming reader + +#### DataSourceReader +Abstract base class for reading data from sources. + +**Key Methods:** +- `read(partition) -> Iterator` - Read data from partition, returns tuples/Rows/pyarrow.RecordBatch +- `partitions() -> List[InputPartition]` - Return input partitions for parallel reading + +#### DataSourceStreamReader +Abstract base class for streaming data sources with offset management. + +**Key Methods:** +- `initialOffset() -> Offset` - Return starting offset +- `latestOffset() -> Offset` - Return latest available offset +- `partitions(start: Offset, end: Offset) -> List[InputPartition]` - Get partitions for offset range +- `read(partition) -> Iterator` - Read data from partition +- `commit(end: Offset)` - Mark offsets as processed +- `stop()` - Clean up resources + +#### DataSourceWriter +Abstract base class for writing data to external sources. 
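+
+For orientation, a minimal illustrative writer sketch follows (class and field names here are placeholders, not taken from this repo's connectors); the abstract methods it overrides are listed just below.
+
+```python
+from dataclasses import dataclass
+from typing import Iterator, List
+
+from pyspark.sql import Row
+from pyspark.sql.datasource import DataSourceWriter, WriterCommitMessage
+
+
+@dataclass
+class SimpleCommitMessage(WriterCommitMessage):
+    count: int  # rows written by a single task
+
+
+class MyWriter(DataSourceWriter):
+    def write(self, iterator: Iterator[Row]) -> WriterCommitMessage:
+        # Runs on executors: persist each row of the partition, then report back.
+        n = 0
+        for row in iterator:
+            n += 1  # a real writer would send `row` to the external sink here
+        return SimpleCommitMessage(count=n)
+
+    def commit(self, messages: List[WriterCommitMessage]) -> None:
+        # Runs on the driver once every write task has succeeded.
+        print(f"Committed {sum(m.count for m in messages)} rows")
+
+    def abort(self, messages: List[WriterCommitMessage]) -> None:
+        # Clean up any partial output after a failed write job.
+        pass
+```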
+
+**Key Methods:**
+- `write(iterator) -> WriterCommitMessage` - Write data for one partition and return a commit message
+- `abort(messages: List[WriterCommitMessage])` - Handle write failures
+- `commit(messages: List[WriterCommitMessage])` - Commit successful writes
+
+#### DataSourceArrowWriter
+Optimized writer using PyArrow RecordBatch for improved performance.
+
+### Implementation Requirements
+
+1. **Serialization**: All classes must be pickle serializable
+2. **Schema Definition**: Use PySpark StructType for schema specification
+3. **Data Types**: Support standard Spark SQL data types
+4. **Error Handling**: Implement proper exception handling
+5. **Resource Management**: Clean up resources properly in streaming sources
+6. **Use load() for paths**: Specify file paths in `load("/path")`, not `option("path", "/path")`
+
+### Usage Patterns
+
+```python
+# Custom data source implementation
+class MyDataSource(DataSource):
+    def __init__(self, options):
+        self.options = options
+
+    def name(self):
+        return "myformat"
+
+    def schema(self):
+        return StructType([StructField("id", IntegerType(), True)])
+
+    def reader(self, schema):
+        return MyDataSourceReader(self.options, schema)
+
+# Registration and usage
+spark.dataSource.register(MyDataSource)
+df = spark.read.format("myformat").option("key", "value").load()
+```
+
+### Performance Optimizations
+
+1. **Arrow Integration**: Return `pyarrow.RecordBatch` for better serialization
+2. **Partitioning**: Implement `partitions()` for parallel processing
+3. **Lazy Evaluation**: Defer expensive operations until read time
+
+## Documentation
+- Main docs: https://allisonwang-db.github.io/pyspark-data-sources/
+- Individual data source docs in `docs/datasources/`
+- Spark Data Source API: https://spark.apache.org/docs/4.0.0/api/python/tutorial/sql/python_data_source.html
+- API Source Code: https://github.com/apache/spark/blob/master/python/pyspark/sql/datasource.py
+
+### Data Source Docstring Guidelines
+When creating new data sources, include these sections in the class docstring:
+
+**Required Sections:**
+- Brief description and `Name: "format_name"`
+- `Options` section documenting all parameters with types/defaults
+- `Examples` section with registration and basic usage
+
+**Key Guidelines:**
+- **Include schema output**: Show `df.printSchema()` results for clarity
+- **Document error cases**: Show what happens with missing files/invalid options
+- **Document partitioning strategy**: Show how data sources leverage partitioning to increase performance
+- **Document Arrow optimization**: Show how data sources use Arrow to transmit data
diff --git a/RELEASE.md b/RELEASE.md
index 1480068..6fe91c1 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -119,6 +119,21 @@ Then follow steps 2-4 above.
- Authenticate: `gh auth login` - Check repository access: `gh repo view` +### PyArrow Compatibility Issues + +If you see `objc_initializeAfterForkError` crashes on macOS, set this environment variable: + +```bash +# For single commands +OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES python your_script.py + +# For Poetry environment +OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES poetry run python your_script.py + +# To set permanently in your shell (add to ~/.zshrc or ~/.bash_profile): +export OBJC_DISABLE_INITIALIZE_FORK_SAFETY=YES +``` + ## Useful Poetry Version Commands ```bash diff --git a/pyspark_datasources/__init__.py b/pyspark_datasources/__init__.py index 680dcda..3076200 100644 --- a/pyspark_datasources/__init__.py +++ b/pyspark_datasources/__init__.py @@ -1,3 +1,4 @@ +from .arrow import ArrowDataSource from .fake import FakeDataSource from .github import GithubDataSource from .googlesheets import GoogleSheetsDataSource diff --git a/pyspark_datasources/arrow.py b/pyspark_datasources/arrow.py new file mode 100644 index 0000000..899175c --- /dev/null +++ b/pyspark_datasources/arrow.py @@ -0,0 +1,161 @@ +from typing import List, Iterator, Union +import os +import glob + +import pyarrow as pa +from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition +from pyspark.sql.types import StructType + + +class ArrowDataSource(DataSource): + """ + A data source for reading Apache Arrow files (.arrow) using PyArrow. + + This data source supports reading Arrow IPC files from local filesystem or + cloud storage, leveraging PyArrow's efficient columnar format and returning + PyArrow RecordBatch objects for optimal performance with PySpark's Arrow integration. + + Name: `arrow` + + Path Support + ----------- + Supports various path patterns in the load() method: + - Single file: "/path/to/file.arrow" + - Glob patterns: "/path/to/*.arrow" or "/path/to/data*.arrow" + - Directory: "/path/to/directory" (reads all .arrow files) + + Partitioning Strategy + -------------------- + The data source creates one partition per file for parallel processing: + - Single file: 1 partition + - Multiple files: N partitions (one per file) + - Directory: N partitions (one per .arrow file found) + + This enables Spark to process multiple files in parallel across different + executor cores, improving performance for large datasets. 
+ + Performance Notes + ---------------- + - Returns PyArrow RecordBatch objects for zero-copy data transfer + - Leverages PySpark 4.0's enhanced Arrow integration + - For DataFrames created in Spark, consider using the new df.to_arrow() method + in PySpark 4.0+ for efficient Arrow conversion + + Examples + -------- + Register the data source: + + >>> from pyspark_datasources import ArrowDataSource + >>> spark.dataSource.register(ArrowDataSource) + + Read a single Arrow file: + + >>> df = spark.read.format("arrow").load("/path/to/employees.arrow") + >>> df.show() + +---+-----------+---+-------+----------+------+ + | id| name|age| salary|department|active| + +---+-----------+---+-------+----------+------+ + | 1|Alice Smith| 28|65000.0| Tech| true| + +---+-----------+---+-------+----------+------+ + + Read multiple files with glob pattern (creates multiple partitions): + + >>> df = spark.read.format("arrow").load("/data/sales/sales_*.arrow") + >>> df.show() + >>> print(f"Number of partitions: {df.rdd.getNumPartitions()}") + + Read all Arrow files in a directory: + + >>> df = spark.read.format("arrow").load("/data/warehouse/") + >>> df.show() + + Working with the result DataFrame and PySpark 4.0 Arrow integration: + + >>> df = spark.read.format("arrow").load("/path/to/data.arrow") + >>> + >>> # Process with Spark + >>> result = df.filter(df.age > 25).groupBy("department").count() + >>> result.show() + >>> + >>> # Convert back to Arrow using PySpark 4.0+ feature + >>> arrow_table = result.to_arrow() # New in PySpark 4.0+ + >>> print(f"Arrow table: {arrow_table}") + + Schema inference example: + + >>> # Schema is automatically inferred from the first file + >>> df = spark.read.format("arrow").load("/path/to/*.arrow") + >>> df.printSchema() + root + |-- product_id: long (nullable = true) + |-- product_name: string (nullable = true) + |-- price: double (nullable = true) + """ + + @classmethod + def name(cls): + return "arrow" + + def schema(self) -> StructType: + path = self.options.get("path") + if not path: + raise ValueError("Path option is required for Arrow data source") + + # Get the first file to determine schema + files = self._get_files(path) + if not files: + raise ValueError(f"No files found at path: {path}") + + # Read schema from first file (Arrow IPC format) + with pa.ipc.open_file(files[0]) as reader: + table = reader.read_all() + + # Convert PyArrow schema to Spark schema using PySpark utility + from pyspark.sql.pandas.types import from_arrow_schema + return from_arrow_schema(table.schema) + + def reader(self, schema: StructType) -> "ArrowDataSourceReader": + return ArrowDataSourceReader(schema, self.options) + + def _get_files(self, path: str) -> List[str]: + """Get list of files matching the path pattern.""" + if os.path.isfile(path): + return [path] + elif os.path.isdir(path): + # Find all arrow files in directory + arrow_files = glob.glob(os.path.join(path, "*.arrow")) + return sorted(arrow_files) + else: + # Treat as glob pattern + return sorted(glob.glob(path)) + + + +class ArrowDataSourceReader(DataSourceReader): + """Reader for Arrow data source.""" + + def __init__(self, schema: StructType, options: dict) -> None: + self.schema = schema + self.options = options + self.path = options.get("path") + if not self.path: + raise ValueError("Path option is required") + + def partitions(self) -> List[InputPartition]: + """Create partitions, one per file for parallel reading.""" + data_source = ArrowDataSource(self.options) + files = data_source._get_files(self.path) + return 
[InputPartition(file_path) for file_path in files] + + def read(self, partition: InputPartition) -> Iterator[pa.RecordBatch]: + """Read data from a single file partition, returning PyArrow RecordBatch.""" + file_path = partition.value + + try: + # Read Arrow IPC file + with pa.ipc.open_file(file_path) as reader: + for i in range(reader.num_record_batches): + batch = reader.get_batch(i) + yield batch + except Exception as e: + raise RuntimeError(f"Failed to read Arrow file {file_path}: {str(e)}") \ No newline at end of file diff --git a/tests/test_data_sources.py b/tests/test_data_sources.py index 54cb133..836c416 100644 --- a/tests/test_data_sources.py +++ b/tests/test_data_sources.py @@ -1,4 +1,7 @@ import pytest +import tempfile +import os +import pyarrow as pa from pyspark.sql import SparkSession from pyspark_datasources import * @@ -91,3 +94,87 @@ def test_salesforce_datasource_registration(spark): error_msg = str(e).lower() # The error can be about unsupported mode or missing writer assert "unsupported" in error_msg or "writer" in error_msg or "not implemented" in error_msg + + +def test_arrow_datasource_single_file(spark): + """Test reading a single Arrow file.""" + spark.dataSource.register(ArrowDataSource) + + # Create test data + test_data = pa.table({ + 'id': [1, 2, 3], + 'name': ['Alice', 'Bob', 'Charlie'], + 'age': [25, 30, 35] + }) + + # Write to temporary Arrow file + with tempfile.NamedTemporaryFile(suffix='.arrow', delete=False) as tmp_file: + tmp_path = tmp_file.name + + try: + with pa.ipc.new_file(tmp_path, test_data.schema) as writer: + writer.write_table(test_data) + + # Read using Arrow data source + df = spark.read.format("arrow").load(tmp_path) + + # Verify results + assert df.count() == 3 + assert len(df.columns) == 3 + assert set(df.columns) == {'id', 'name', 'age'} + + # Verify data content + rows = df.collect() + assert len(rows) == 3 + assert rows[0]['name'] == 'Alice' + + finally: + # Clean up + if os.path.exists(tmp_path): + os.unlink(tmp_path) + + +def test_arrow_datasource_multiple_files(spark): + """Test reading multiple Arrow files from a directory.""" + spark.dataSource.register(ArrowDataSource) + + # Create test data for multiple files + test_data1 = pa.table({ + 'id': [1, 2], + 'name': ['Alice', 'Bob'], + 'department': ['Engineering', 'Sales'] + }) + + test_data2 = pa.table({ + 'id': [3, 4], + 'name': ['Charlie', 'Diana'], + 'department': ['Marketing', 'HR'] + }) + + # Create temporary directory + with tempfile.TemporaryDirectory() as tmp_dir: + # Write multiple Arrow files + file1_path = os.path.join(tmp_dir, 'data1.arrow') + file2_path = os.path.join(tmp_dir, 'data2.arrow') + + with pa.ipc.new_file(file1_path, test_data1.schema) as writer: + writer.write_table(test_data1) + + with pa.ipc.new_file(file2_path, test_data2.schema) as writer: + writer.write_table(test_data2) + + # Read using Arrow data source from directory + df = spark.read.format("arrow").load(tmp_dir) + + # Verify results + assert df.count() == 4 # 2 rows from each file + assert len(df.columns) == 3 + assert set(df.columns) == {'id', 'name', 'department'} + + # Verify partitioning (should have 2 partitions, one per file) + assert df.rdd.getNumPartitions() == 2 + + # Verify all data is present + rows = df.collect() + names = {row['name'] for row in rows} + assert names == {'Alice', 'Bob', 'Charlie', 'Diana'}
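
The `ArrowDataSource` docstring advertises glob-pattern loading, but the new tests only cover a single file and a directory. A possible follow-up test for the glob path, sketched here rather than included in the diff and assuming the same `spark` session fixture the existing tests use:

```python
import os
import tempfile

import pyarrow as pa

from pyspark_datasources import ArrowDataSource


def test_arrow_datasource_glob_pattern(spark):
    """Sketch (not part of this diff): only files matching the glob should be read."""
    spark.dataSource.register(ArrowDataSource)

    matching = pa.table({'id': [1, 2], 'name': ['Alice', 'Bob']})
    other = pa.table({'id': [99], 'name': ['Zed']})

    with tempfile.TemporaryDirectory() as tmp_dir:
        # Two Arrow files share the directory; only sales_*.arrow should match.
        with pa.ipc.new_file(os.path.join(tmp_dir, 'sales_2024.arrow'), matching.schema) as writer:
            writer.write_table(matching)
        with pa.ipc.new_file(os.path.join(tmp_dir, 'misc.arrow'), other.schema) as writer:
            writer.write_table(other)

        df = spark.read.format("arrow").load(os.path.join(tmp_dir, "sales_*.arrow"))

        assert df.count() == 2                 # rows from the matching file only
        assert df.rdd.getNumPartitions() == 1  # one partition per matched file
        assert {r['name'] for r in df.collect()} == {'Alice', 'Bob'}
```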