1 change: 1 addition & 0 deletions README.md
@@ -45,6 +45,7 @@ spark.readStream.format("fake").load().writeStream.format("console").start()
| [GoogleSheetsDataSource](pyspark_datasources/googlesheets.py) | `googlesheets` | Read table from public Google Sheets | None |
| [KaggleDataSource](pyspark_datasources/kaggle.py) | `kaggle` | Read datasets from Kaggle | `kagglehub`, `pandas` |
| [SimpleJsonDataSource](pyspark_datasources/simplejson.py) | `simplejson` | Write JSON data to Databricks DBFS | `databricks-sdk` |
| [OpenSkyDataSource](pyspark_datasources/opensky.py) | `opensky` | Read from OpenSky Network | None |

See more here: https://allisonwang-db.github.io/pyspark-data-sources/.

350 changes: 350 additions & 0 deletions pyspark_datasources/opensky.py
@@ -0,0 +1,350 @@
"""
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add the docstring under the python data source class?

OpenSky Network Data Source for Apache Spark - Academic/Private Use Example

This module provides a custom Spark data source for streaming real-time aircraft tracking data from the OpenSky Network API (https://opensky-network.org/). The OpenSky Network is a community-based receiver network that collects air traffic surveillance data and makes it available as open data to researchers and enthusiasts.

Features:
- Real-time streaming of aircraft positions, velocities, and flight data
- Support for multiple geographic regions (Europe, North America, Asia, etc.)
- OAuth2 authentication for higher API rate limits (4000 vs 100 calls/day)
- Robust error handling with automatic retries and rate limiting
- Data validation and type-safe parsing of aircraft state vectors
- Configurable bounding boxes for focused data collection

Usage Example (Academic/Research):
    # Basic usage with region NORTH_AMERICA
    df = spark.readStream.format("opensky").load()

    # With specific region and authentication
    df = spark.readStream.format("opensky") \
        .option("region", "EUROPE") \
        .option("client_id", "your_research_client_id") \
        .option("client_secret", "your_research_client_secret") \
        .load()

Data Schema:
Each record contains comprehensive aircraft information including position (lat/lon), altitude, velocity, heading, call sign, ICAO identifier, and various flight status flags. All timestamps are in UTC for consistency.

You can feed your own data to the OpenSky Network: https://opensky-network.org/feed

Rate Limits & Responsible Usage:
- Anonymous access: 100 API calls per day
- Authenticated access: 4000 API calls per day (research accounts)
- Minimum 5-second interval between requests
- 8000 API calls if you feed your own data to the OpenSky Network feed (https://opensky-network.org/feed)

Data Attribution:
When using this data in research or publications, please cite:
"The OpenSky Network, https://opensky-network.org"

Author: Frank Munz, Databricks - Example Only, No Warranty
Purpose: Educational Example / Academic Research Tool
Version: 1.0
Last Updated: July-2025

================================================================================
LEGAL NOTICES & TERMS OF USE

USAGE RESTRICTIONS:
- Academic research and educational purposes only
- Commercial use requires explicit permission from OpenSky Network
- Must comply with OpenSky Network Terms of Use: https://opensky-network.org/about/terms-of-use

If you create a publication (including web pages, papers published by a third party, and publicly available presentations) using data from the OpenSky Network data set, you should cite the original OpenSky paper as follows:

Matthias Schäfer, Martin Strohmeier, Vincent Lenders, Ivan Martinovic and Matthias Wilhelm.
"Bringing Up OpenSky: A Large-scale ADS-B Sensor Network for Research".
In Proceedings of the 13th IEEE/ACM International Symposium on Information Processing in Sensor Networks (IPSN), pages 83-94, April 2014.


DISCLAIMER & LIABILITY:
This code is provided "AS IS" for educational purposes only. The author and Databricks make no warranties, express or implied, and disclaim all liability for any damages, losses, or issues arising from the use of this code. Users assume full responsibility for compliance with all applicable terms of service, laws, and regulations. Use at your own risk.

For commercial use, contact OpenSky Network directly.
================================================================================


"""


import requests
import time
from datetime import datetime, timezone
from typing import Dict, List, Tuple, Any, Optional, Iterator
from dataclasses import dataclass
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from enum import Enum

from pyspark.sql.datasource import SimpleDataSourceStreamReader, DataSource
from pyspark.sql.types import *
⚠️ Potential issue

Replace star import with explicit imports

Star imports reduce code clarity and can lead to namespace pollution. Replace with explicit imports for better maintainability.

-from pyspark.sql.types import *
+from pyspark.sql.types import (
+    StructType, StructField, StringType, IntegerType, 
+    DoubleType, BooleanType, TimestampType, ArrayType
+)
🧰 Tools
🪛 Ruff (0.12.2)

80-80: from pyspark.sql.types import * used; unable to detect undefined names

(F403)



DS_NAME = "opensky"

@dataclass
class BoundingBox:
    lamin: float  # Minimum latitude
    lamax: float  # Maximum latitude
    lomin: float  # Minimum longitude
    lomax: float  # Maximum longitude

class Region(Enum):
    EUROPE = BoundingBox(35.0, 72.0, -25.0, 45.0)
    NORTH_AMERICA = BoundingBox(7.0, 72.0, -168.0, -60.0)
    SOUTH_AMERICA = BoundingBox(-56.0, 15.0, -90.0, -30.0)
    ASIA = BoundingBox(-10.0, 82.0, 45.0, 180.0)
    AUSTRALIA = BoundingBox(-50.0, -10.0, 110.0, 180.0)
    AFRICA = BoundingBox(-35.0, 37.0, -20.0, 52.0)
    GLOBAL = BoundingBox(-90.0, 90.0, -180.0, 180.0)

class OpenSkyAPIError(Exception):
    """Base exception for OpenSky API errors"""
    pass

class RateLimitError(OpenSkyAPIError):
    """Raised when API rate limit is exceeded"""
    pass

class OpenSkyStreamReader(SimpleDataSourceStreamReader):

    DEFAULT_REGION = "NORTH_AMERICA"
    MIN_REQUEST_INTERVAL = 5.0  # seconds between requests
    ANONYMOUS_RATE_LIMIT = 100  # calls per day
    AUTHENTICATED_RATE_LIMIT = 4000  # calls per day
    MAX_RETRIES = 3
    RETRY_BACKOFF = 2
    RETRY_STATUS_CODES = [429, 500, 502, 503, 504]

    def __init__(self, schema: StructType, options: Dict[str, str]):
        super().__init__()
        self.schema = schema
        self.options = options
        self.session = self._create_session()
        self.last_request_time = 0

        region_name = options.get('region', self.DEFAULT_REGION).upper()
        try:
            self.bbox = Region[region_name].value
        except KeyError:
            print(f"Invalid region '{region_name}'. Defaulting to {self.DEFAULT_REGION}.")
            self.bbox = Region[self.DEFAULT_REGION].value
Comment on lines +129 to +130
🛠️ Refactor suggestion

Use logging instead of print statements

For production code, use Python's logging module instead of print statements for better control over log levels and outputs.

Add at the top of the file:

import logging
logger = logging.getLogger(__name__)

Then update:

-            print(f"Invalid region '{region_name}'. Defaulting to {self.DEFAULT_REGION}.")
+            logger.warning(f"Invalid region '{region_name}'. Defaulting to {self.DEFAULT_REGION}.")


        self.client_id = options.get('client_id')
        self.client_secret = options.get('client_secret')
        self.access_token = None
        self.token_expires_at = 0

        if self.client_id and self.client_secret:
            self._get_access_token()  # OAuth2 authentication
            self.rate_limit = self.AUTHENTICATED_RATE_LIMIT
        else:
            self.rate_limit = self.ANONYMOUS_RATE_LIMIT

    def _get_access_token(self):
        """Get OAuth2 access token using client credentials flow"""
        current_time = time.time()
        if self.access_token and current_time < self.token_expires_at:
            return  # Token still valid

        token_url = "https://auth.opensky-network.org/auth/realms/opensky-network/protocol/openid-connect/token"
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret
        }

        try:
            response = requests.post(token_url, data=data, timeout=10)
            response.raise_for_status()
            token_data = response.json()

            self.access_token = token_data["access_token"]
            expires_in = token_data.get("expires_in", 1800)
            self.token_expires_at = current_time + expires_in - 300

        except requests.exceptions.RequestException as e:
            raise OpenSkyAPIError(f"Failed to get access token: {str(e)}")
Comment on lines +165 to +166
⚠️ Potential issue

Use exception chaining for better error context

When re-raising exceptions, use from e to preserve the original exception context.

-        except requests.exceptions.RequestException as e:
-            raise OpenSkyAPIError(f"Failed to get access token: {str(e)}")
+        except requests.exceptions.RequestException as e:
+            raise OpenSkyAPIError(f"Failed to get access token: {str(e)}") from e
🧰 Tools
🪛 Ruff (0.12.2)

166-166: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


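As a side note, the client-credentials flow in _get_access_token can be exercised outside Spark to verify credentials before wiring them into a stream. A minimal sketch against the same token endpoint used in this PR (the client id and secret values are placeholders, not real credentials):

    # Standalone OAuth2 token check -- placeholder credentials, assumed values
    import requests

    TOKEN_URL = ("https://auth.opensky-network.org/auth/realms/opensky-network"
                 "/protocol/openid-connect/token")
    resp = requests.post(TOKEN_URL, data={
        "grant_type": "client_credentials",
        "client_id": "your_research_client_id",
        "client_secret": "your_research_client_secret",
    }, timeout=10)
    resp.raise_for_status()
    token_data = resp.json()
    print("token acquired, expires_in:", token_data.get("expires_in"))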

    def _create_session(self) -> requests.Session:
        """Create and configure requests session with retry logic"""
        session = requests.Session()
        retry_strategy = Retry(
            total=self.MAX_RETRIES,
            backoff_factor=self.RETRY_BACKOFF,
            status_forcelist=self.RETRY_STATUS_CODES
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        return session

    def initialOffset(self) -> Dict[str, int]:
        return {'last_fetch': 0}

    def _handle_rate_limit(self):
        """Ensure MIN_REQUEST_INTERVAL seconds between requests"""
        current_time = time.time()
        time_since_last_request = current_time - self.last_request_time

        if time_since_last_request < self.MIN_REQUEST_INTERVAL:
            sleep_time = self.MIN_REQUEST_INTERVAL - time_since_last_request
            time.sleep(sleep_time)

        self.last_request_time = time.time()

    def _fetch_states(self) -> requests.Response:
        """Fetch states from OpenSky API with error handling"""
        self._handle_rate_limit()

        if self.client_id and self.client_secret:
            self._get_access_token()

        params = {
            'lamin': self.bbox.lamin,
            'lamax': self.bbox.lamax,
            'lomin': self.bbox.lomin,
            'lomax': self.bbox.lomax
        }

        headers = {}
        if self.access_token:
            headers['Authorization'] = f'Bearer {self.access_token}'

        try:
            response = self.session.get(
                "https://opensky-network.org/api/states/all",
                params=params,
                headers=headers,
                timeout=10
            )

            if response.status_code == 429:
                raise RateLimitError("API rate limit exceeded")
            response.raise_for_status()

            return response

        except requests.exceptions.RequestException as e:
            error_msg = f"API request failed: {str(e)}"
            if isinstance(e, requests.exceptions.Timeout):
                error_msg = "API request timed out"
            elif isinstance(e, requests.exceptions.ConnectionError):
                error_msg = "Connection error occurred"
            raise OpenSkyAPIError(error_msg) from e

    def valid_state(self, state: List) -> bool:
        """Validate state data"""
        if not state or len(state) < 17:
            return False

        return (state[0] is not None and  # icao24
                state[5] is not None and  # longitude
                state[6] is not None)     # latitude

    def parse_state(self, state: List, timestamp: int) -> Tuple:
        """Parse state data with safe type conversion"""
        def safe_float(value: Any) -> Optional[float]:
            try:
                return float(value) if value is not None else None
            except (ValueError, TypeError):
                return None

        def safe_int(value: Any) -> Optional[int]:
            try:
                return int(value) if value is not None else None
            except (ValueError, TypeError):
                return None

        def safe_bool(value: Any) -> Optional[bool]:
            return bool(value) if value is not None else None

        return (
            datetime.fromtimestamp(timestamp, tz=timezone.utc),
            state[0],               # icao24
            state[1],               # callsign
            state[2],               # origin_country
            datetime.fromtimestamp(state[3], tz=timezone.utc),
            datetime.fromtimestamp(state[4], tz=timezone.utc),
            safe_float(state[5]),   # longitude
            safe_float(state[6]),   # latitude
            safe_float(state[7]),   # geo_altitude
            safe_bool(state[8]),    # on_ground
            safe_float(state[9]),   # velocity
            safe_float(state[10]),  # true_track
            safe_float(state[11]),  # vertical_rate
            state[12],              # sensors
            safe_float(state[13]),  # baro_altitude
            state[14],              # squawk
            safe_bool(state[15]),   # spi
            safe_int(state[16])     # category
        )

    def readBetweenOffsets(self, start: Dict[str, int], end: Dict[str, int]) -> Iterator[Tuple]:
        # Note: replays by re-fetching live data; exact replay is not possible for a live API
        data, _ = self.read(start)
        return iter(data)

    def read(self, start: Dict[str, int]) -> Tuple[List[Tuple], Dict[str, int]]:
        """Read states with error handling and backoff"""
        try:
            response = self._fetch_states()
            data = response.json()

            valid_states = [
                self.parse_state(s, data['time'])
                for s in data.get('states', [])
                if self.valid_state(s)
            ]

            return (
                valid_states,
                {'last_fetch': data.get('time', int(time.time()))}
            )

        except OpenSkyAPIError as e:
            print(f"OpenSky API Error: {str(e)}")
            return ([], start)
        except Exception as e:
            print(f"Unexpected error: {str(e)}")
            return ([], start)
Comment on lines +304 to +308
🛠️ Refactor suggestion

Use logging for error messages

Replace print statements with proper logging for better production monitoring.

-        except OpenSkyAPIError as e:
-            print(f"OpenSky API Error: {str(e)}")
-            return ([], start)
-        except Exception as e:
-            print(f"Unexpected error: {str(e)}")
-            return ([], start)
+        except OpenSkyAPIError as e:
+            logger.error(f"OpenSky API Error: {str(e)}")
+            return ([], start)
+        except Exception as e:
+            logger.error(f"Unexpected error: {str(e)}", exc_info=True)
+            return ([], start)


class OpenSkyDataSource(DataSource):
    def __init__(self, options: Dict[str, str] = None):
        super().__init__(options or {})
        self.options = options or {}

        if 'client_id' in self.options and not self.options.get('client_secret'):
            raise ValueError("client_secret must be provided when client_id is set")

        if 'region' in self.options and self.options['region'].upper() not in Region.__members__:
            raise ValueError(f"Invalid region. Must be one of: {', '.join(Region.__members__.keys())}")

    @classmethod
    def name(cls) -> str:
        return DS_NAME

    def schema(self) -> StructType:
        return StructType([
            StructField("time_ingest", TimestampType()),
            StructField("icao24", StringType()),
            StructField("callsign", StringType()),
            StructField("origin_country", StringType()),
            StructField("time_position", TimestampType()),
            StructField("last_contact", TimestampType()),
            StructField("longitude", DoubleType()),
            StructField("latitude", DoubleType()),
            StructField("geo_altitude", DoubleType()),
            StructField("on_ground", BooleanType()),
            StructField("velocity", DoubleType()),
            StructField("true_track", DoubleType()),
            StructField("vertical_rate", DoubleType()),
            StructField("sensors", ArrayType(IntegerType())),
            StructField("baro_altitude", DoubleType()),
            StructField("squawk", StringType()),
            StructField("spi", BooleanType()),
            StructField("category", IntegerType())
        ])

    def simpleStreamReader(self, schema: StructType) -> OpenSkyStreamReader:
        return OpenSkyStreamReader(schema, self.options)

spark.dataSource.register(OpenSkyDataSource)
@coderabbitai coderabbitai bot Jul 18, 2025

⚠️ Potential issue

Remove automatic registration at module level

Registering the data source at module import time will fail if Spark session is not initialized. This should be done by the user explicitly.

Remove this line and document in the module docstring or README that users should register the data source manually:

-spark.dataSource.register(OpenSkyDataSource)

Users should register it in their code:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.dataSource.register(OpenSkyDataSource)
🧰 Tools
🪛 Ruff (0.12.2)

350-350: spark may be undefined, or defined from star imports

(F405)


Owner comment:

@fmunz The data source cannot be registered here since there is no Spark session available at import time. You can have a separate init file to register the data source before using it in production.
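
A minimal sketch of such an init file, for illustration only (the file name and import path are assumptions, not part of this PR):

    # init_datasources.py -- run once per Spark session, before any stream is opened
    from pyspark.sql import SparkSession
    from pyspark_datasources.opensky import OpenSkyDataSource

    spark = SparkSession.builder.getOrCreate()
    spark.dataSource.register(OpenSkyDataSource)

    # After registration, the source is available by its format name:
    df = spark.readStream.format("opensky").option("region", "EUROPE").load()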

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!