Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM mcr.microsoft.com/devcontainers/python:3.11-bookworm
FROM mcr.microsoft.com/devcontainers/python:3.12-bookworm

RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
&& apt-get -y install --no-install-recommends \
Expand Down
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"dockerfile": "Dockerfile",
"context": "."
},
"postStartCommand": "bash -i /workspace/scripts/init-env.sh",
// "postStartCommand": "bash -i /workspace/scripts/init-env.sh",
"customizations": {
"vscode": {
"extensions": [
Expand Down
56 changes: 28 additions & 28 deletions .devcontainer/scripts/init-env.sh
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
#!/bin/bash

# Define the virtual environment directory
VENV_DIR=".venv"

# Check if the virtual environment already exists
if [[ ! -d "$VENV_DIR" ]]; then
echo "Creating virtual environment..."
python -m venv "$VENV_DIR"
echo "Virtual environment created at $VENV_DIR"
else
echo "Virtual environment already exists."
fi

# Activate the virtual environment
if [[ -f "$VENV_DIR/bin/activate" ]]; then
echo "Activating virtual environment..."
source "$VENV_DIR/bin/activate"
echo "Virtual environment activated."
poetry add ipykernel
poetry install

source .venv/bin/activate

else
echo "Error: Unable to activate virtual environment."
exit 1
fi
#!/bin/bash
# Bootstraps the project's Python virtual environment: creates it when it is
# missing, activates it, and installs the project dependencies with Poetry.
# Exits with status 1 when the environment cannot be created or activated.

# Define the virtual environment directory
VENV_DIR=".venv"

# Check if the virtual environment already exists
if [[ ! -d "$VENV_DIR" ]]; then
    echo "Creating virtual environment..."
    # Stop here on failure so "created" is never reported for a missing venv.
    if ! python -m venv "$VENV_DIR"; then
        echo "Error: Failed to create virtual environment at $VENV_DIR." >&2
        exit 1
    fi
    echo "Virtual environment created at $VENV_DIR"
else
    echo "Virtual environment already exists."
fi

# Activate the virtual environment
if [[ -f "$VENV_DIR/bin/activate" ]]; then
    echo "Activating virtual environment..."
    source "$VENV_DIR/bin/activate"
    echo "Virtual environment activated."
    poetry add ipykernel
    poetry install
    # Re-activate after the install, using the same directory variable as above
    # so the script keeps working if VENV_DIR is ever changed.
    source "$VENV_DIR/bin/activate"
else
    # Diagnostics belong on stderr so callers can separate them from progress output.
    echo "Error: Unable to activate virtual environment." >&2
    exit 1
fi
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,6 @@ cython_debug/

# PyPI configuration file
.pypirc

**/**/.scratch
**/tests/*_integration.py
16 changes: 16 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": false
}
]
}
14 changes: 7 additions & 7 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"python.defaultInterpreterPath": ".venv/bin/python",
"python.formatting.provider": "black",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "always"
}
{
"python.defaultInterpreterPath": ".venv/bin/python",
"python.formatting.provider": "black",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "always"
}
}
1,524 changes: 1,461 additions & 63 deletions poetry.lock

Large diffs are not rendered by default.

13 changes: 8 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
[project]
name = "source-faker"
name = "source-msgraph"
version = "0.1.0"
description = ""
authors = [
{name = "geekwhocodes",email = "ganeshraskar@outlook.com"}
]
readme = "README.md"
requires-python = ">=3.11"
requires-python = ">=3.12,<4"
dependencies = [
"pyspark (==4.0.0.dev2)",
"faker (>=36.1.1,<37.0.0)",
"ipykernel (>=6.29.5,<7.0.0)"
"msgraph-sdk (>=1.21.0,<2.0.0)",
"azure-identity (>=1.20.0,<2.0.0)",
"microsoft-kiota-serialization-json (>=1.9.2,<2.0.0)",
]

[tool.poetry]
packages = [{include = "source_faker", from = "src"}]
packages = [{include = "source_msgraph", from = "src"}]


[tool.poetry.group.dev.dependencies]
Expand All @@ -28,6 +29,8 @@ pyarrow = "^19.0.1"
grpcio = "^1.70.0"
grpcio-status = "^1.60.1"
pandas = "^2.2.0"
ipykernel = "^6.29.5"
markdown = "^3.7"

[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
Expand Down
118 changes: 0 additions & 118 deletions src/source_faker/source.py

This file was deleted.

6 changes: 0 additions & 6 deletions src/source_faker/spark.py

This file was deleted.

File renamed without changes.
122 changes: 122 additions & 0 deletions src/source_msgraph/async_interator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import asyncio

import asyncio
from typing import AsyncGenerator, Iterator, Any

class AsyncToSyncIterator:
    """
    Drains an async generator and exposes its items through a synchronous iterator.

    The generator is consumed eagerly and completely while the object is being
    constructed; every item is held in memory before iteration starts, so this
    class is not suitable for unbounded or very large result sets.
    """

    def __init__(self, async_gen: AsyncGenerator[Any, None]):
        """
        Consumes ``async_gen`` to completion and stores an iterator over its items.

        Args:
            async_gen (AsyncGenerator): The async generator yielding results.

        Raises:
            Exception: Whatever ``async_gen`` raises while it is being drained.
        """
        self.async_gen = async_gen
        self.loop = self._get_event_loop()
        self.iterator = self._to_iterator()

    def _get_event_loop(self) -> "asyncio.AbstractEventLoop | None":
        """
        Returns a fresh private event loop, or ``None`` when the calling thread
        already has a running loop (that case is handled in ``_to_iterator()``).
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running here. The new loop is only driven explicitly via
            # run_until_complete(), so it is not installed as the thread's
            # current loop; that would leave a stale loop behind for other code.
            return asyncio.new_event_loop()
        return None

    def _to_iterator(self) -> Iterator:
        """
        Drains the async generator on a suitable event loop and returns an
        iterator over the collected items.
        """
        if self.loop is None:
            # A loop is already running in this thread (e.g. a notebook kernel).
            # Neither asyncio.run() nor run_until_complete() may be called here;
            # both raise RuntimeError, so the work is done on a worker thread.
            return iter(self._run_in_worker_thread())

        try:
            results = self.loop.run_until_complete(self._stream_results())
        finally:
            # Always release the private loop, even when the generator failed,
            # so repeated use does not leak loops and their file descriptors.
            try:
                self.loop.run_until_complete(self.loop.shutdown_asyncgens())
            finally:
                self.loop.close()
        return iter(results)

    def _run_in_worker_thread(self) -> list:
        """
        Drains the async generator with ``asyncio.run`` on a short-lived worker
        thread and returns the collected items.

        The calling thread blocks until the worker finishes; an exception raised
        by the generator is re-raised in the calling thread.
        """
        # Imported locally because only this code path needs it.
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, self._stream_results()).result()

    # Caution : prone to OOM errors
    async def _stream_results(self):
        """Collects every item of the async generator into a list (not streamed)."""
        return [item async for item in self.async_gen]

    def __iter__(self) -> Iterator:
        """Returns the synchronous iterator."""
        return self.iterator


import asyncio
from typing import AsyncGenerator, Iterator, Any

class AsyncToSyncIteratorV2:
    """
    Drains an async generator and exposes its items through a synchronous iterator.

    The generator is consumed eagerly and completely while the object is being
    constructed; all items are collected into a list first, so nothing is
    streamed and memory use grows with the number of items.
    """

    def __init__(self, async_gen: AsyncGenerator[Any, None]):
        """
        Consumes ``async_gen`` to completion and stores an iterator over its items.

        Args:
            async_gen (AsyncGenerator): The async generator yielding results.

        Raises:
            Exception: Whatever ``async_gen`` raises while it is being drained.
        """
        self.async_gen = async_gen
        self.iterator = self._to_iterator()

    def _to_iterator(self) -> Iterator:
        """
        Drains the async generator on a suitable event loop and returns an
        iterator over the collected items.
        """
        # Keep the try body to the loop lookup only, so a RuntimeError raised
        # while draining the generator is not mistaken for "no running loop".
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return iter(asyncio.run(self._collect_results()))  # Works in scripts
        return self._sync_generator(loop)  # Works inside a running loop

    def _sync_generator(self, loop: asyncio.AbstractEventLoop) -> Iterator:
        """
        Drains the async generator while ``loop`` is running in the calling thread.

        Args:
            loop: The event loop currently running in the calling thread. It is
                deliberately not driven from here, because calling
                ``run_until_complete()`` on a running loop raises ``RuntimeError``.

        Returns:
            An iterator over all items produced by the async generator.
        """
        # Imported locally because only this code path needs it.
        from concurrent.futures import ThreadPoolExecutor

        # Run the collection on a worker thread with its own event loop; the
        # calling thread blocks until it finishes and any error is re-raised here.
        with ThreadPoolExecutor(max_workers=1) as pool:
            results = pool.submit(asyncio.run, self._collect_results()).result()
        return iter(results)

    async def _collect_results(self):
        """Collects async generator results into a list (safe for asyncio.run)."""
        return [item async for item in self.async_gen]

    def __iter__(self) -> Iterator:
        """Returns the synchronous iterator."""
        return self.iterator

    def __next__(self) -> Any:
        """Returns the next item from the iterator."""
        return next(self.iterator)
Loading