Conversation

@shujingyang-db (Contributor) commented Jul 29, 2025

Add a new SalesforceDataSource sink example that enables streaming data writes from PySpark to Salesforce using the REST API. It supports standard and custom Salesforce objects with configurable batch sizes and comprehensive error handling; a minimal usage sketch follows the feature list below.

Key features:

  • Streaming write support for Account, Contact, and custom objects
  • Authentication via username/password/security token
  • Configurable batch processing (default 200 records)
  • Detailed logging and error recovery
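
A minimal usage sketch, based on the options documented in this PR (username, password, security_token, salesforce_object, batch_size). The credentials and checkpoint path are placeholders, and this is illustrative rather than a copy of the bundled example script:

    from pyspark.sql import SparkSession
    from pyspark_datasources import SalesforceDataSource

    spark = SparkSession.builder.appName("salesforce-sink-sketch").getOrCreate()
    spark.dataSource.register(SalesforceDataSource)

    # Any streaming source works; the built-in rate source keeps the sketch self-contained.
    accounts = (
        spark.readStream.format("rate").option("rowsPerSecond", 5).load()
        .selectExpr("concat('PySparkTest_', CAST(value AS STRING)) AS Name")
    )

    query = (
        accounts.writeStream
        .format("salesforce")                    # short name registered by SalesforceDataSource
        .option("username", "user@example.com")  # placeholder credentials
        .option("password", "********")
        .option("security_token", "********")
        .option("salesforce_object", "Account")  # default object per the description above
        .option("batch_size", "200")             # default batch size per the description above
        .option("checkpointLocation", "./checkpoints/salesforce_sketch")
        .start()
    )
    query.awaitTermination()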

Summary by CodeRabbit

  • New Features

    • Introduced Salesforce as a new streaming data sink, enabling direct writing of streaming data from PySpark to Salesforce objects.
    • Added an example script demonstrating various ways to stream data into Salesforce, including support for custom objects and checkpointing for exactly-once semantics.
  • Documentation

    • Added comprehensive documentation for the Salesforce data source, including setup instructions and usage examples.
    • Updated tables and navigation to include the new Salesforce data source.
  • Tests

    • Added tests to verify Salesforce data source registration and enforce streaming-only write support.
  • Chores

    • Added optional dependency on the simple-salesforce library for Salesforce integration.

coderabbitai bot commented Jul 29, 2025

Walkthrough

This change introduces a new Salesforce streaming sink to the PySpark data sources package. It adds the SalesforceDataSource implementation, updates documentation and configuration to include Salesforce as a supported data sink, provides installation and usage instructions, and supplies both an example script and a new test for registration and streaming-only enforcement.

Changes

  • Salesforce DataSource Implementation (pyspark_datasources/salesforce.py, pyspark_datasources/__init__.py): implements SalesforceDataSource as a streaming sink and adds its import and registration to the package initializer.
  • Documentation Updates (README.md, docs/index.md, docs/datasources/salesforce.md): documents SalesforceDataSource in the main README and documentation index, and adds a dedicated usage guide.
  • Example Script (examples/salesforce_sink_example.py): adds a comprehensive example script demonstrating how to use SalesforceDataSource with various input sources and scenarios.
  • Dependency and Build Configuration (pyproject.toml, mkdocs.yml): adds simple-salesforce as an optional dependency and updates the extras and documentation navigation for Salesforce.
  • Test Addition (tests/test_data_sources.py): adds a test verifying SalesforceDataSource registration and streaming-only write enforcement.

Sequence Diagram(s)

sequenceDiagram
    participant Spark as PySpark Streaming
    participant SFDS as SalesforceDataSource
    participant Writer as SalesforceStreamWriter
    participant SFAPI as Salesforce REST API

    Spark->>SFDS: Register streaming sink (salesforce)
    Spark->>Writer: Initialize with schema and options
    Writer->>SFAPI: Connect using credentials
    loop For each batch
        Spark->>Writer: Provide batch of rows
        Writer->>Writer: Convert rows to Salesforce records
        Writer->>SFAPI: Batch insert via Composite Tree API
        alt Batch insert fails
            Writer->>SFAPI: Insert records individually
        end
        Writer->>Spark: Return commit message (records written, batch id)
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~40 minutes

Poem

A Salesforce sink, oh what a feat,
Streaming data, batch by batch, so neat.
Docs and examples, dependencies in tow,
Now PySpark streams where Salesforce records grow.
With tests and configs, the path is clear—
The rabbit hops, and integration is here! 🐇✨


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

🧹 Nitpick comments (5)
mkdocs.yml (1)

26-26: Keep navigation ordering consistent

The new Salesforce page is inserted between simplejson.md and googlesheets.md, but the surrounding list isn't ordered alphabetically or thematically. If the project expects alphabetical nav ordering (many docs sites do), consider relocating salesforce.md accordingly to avoid future merge conflicts and make the list easier to scan.

docs/datasources/salesforce.md (1)

1-6: Add minimal usage example & ensure directive renders

Compared with other data-source pages, this file is very terse. A short code snippet demonstrating registration and .writeStream usage would help readers. Also double-check that the ::: pyspark_datasources.salesforce.SalesforceDataSource directive renders correctly under mkdocstrings—typos in the import path will silently drop the API docs.

pyspark_datasources/__init__.py (1)

7-7: Expose SalesforceDataSource via __all__ to silence Ruff F401

SalesforceDataSource is imported only to re-export, so Ruff flags it as unused. Follow the existing pattern (other names appear in __all__) or disable the rule locally.

-from .salesforce import SalesforceDataSource
+from .salesforce import SalesforceDataSource
+
+__all__ += ["SalesforceDataSource"]  # keep lint happy
tests/test_data_sources.py (1)

157-165: Improve error handling in cleanup section.

The bare except clause should be more specific or at least log any cleanup failures for debugging purposes.

Apply this diff to improve error handling:

         # Clean up any test records created
         try:
             from simple_salesforce import Salesforce
             sf = Salesforce(username=username, password=password, security_token=security_token)
             results = sf.query("SELECT Id, Name FROM Account WHERE Name LIKE 'PySparkTest_%'")
             for record in results['records']:
                 sf.Account.delete(record['Id'])
-        except:
-            pass  # Clean-up is best effort
+        except Exception as e:
+            # Log cleanup failures for debugging
+            print(f"Warning: Failed to clean up test records: {e}")
pyspark_datasources/salesforce.py (1)

289-294: Consider documenting the lack of transaction support more prominently.

The inability to rollback Salesforce records is a significant limitation that users should be aware of before using this data source.

Consider adding this limitation to the class docstring or raising a warning when the data source is initialized to ensure users are aware that partial writes cannot be rolled back.
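
For illustration, a minimal sketch of the warning approach; placing it in SalesforceDataSource.__init__ and the exact wording are assumptions, not code from this PR:

    import warnings

    # Hypothetical snippet for SalesforceDataSource.__init__ (or the writer's __init__):
    warnings.warn(
        "SalesforceDataSource cannot roll back records that were already inserted; "
        "a failed or restarted micro-batch may leave partial or duplicate records in Salesforce.",
        UserWarning,
        stacklevel=2,
    )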

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7d48f9a and 2798e28.

📒 Files selected for processing (8)
  • README.md (1 hunks)
  • docs/datasources/salesforce.md (1 hunks)
  • docs/index.md (1 hunks)
  • mkdocs.yml (1 hunks)
  • pyproject.toml (1 hunks)
  • pyspark_datasources/__init__.py (1 hunks)
  • pyspark_datasources/salesforce.py (1 hunks)
  • tests/test_data_sources.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
pyspark_datasources/__init__.py (1)
pyspark_datasources/salesforce.py (1)
  • SalesforceDataSource (18-142)
🪛 Ruff (0.12.2)
pyspark_datasources/__init__.py

7-7: .salesforce.SalesforceDataSource imported but unused; consider removing, adding to __all__, or using a redundant alias

(F401)

tests/test_data_sources.py

71-71: SalesforceDataSource may be undefined, or defined from star imports

(F405)


74-74: SalesforceDataSource may be undefined, or defined from star imports

(F405)


88-88: Do not assert False (python -O removes these calls), raise AssertionError()

Replace assert False

(B011)


113-113: SalesforceDataSource may be undefined, or defined from star imports

(F405)


163-163: Do not use bare except

(E722)

pyspark_datasources/salesforce.py

175-178: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


200-200: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


257-257: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🔇 Additional comments (2)
README.md (1)

49-49: LGTM – row correctly documents dependency
Entry matches implementation details and optional-extra naming.

pyproject.toml (1)

21-21: LGTM! Dependency configuration is well-structured.

The simple-salesforce dependency is correctly added as optional with appropriate version constraints. The extra groups are properly configured to allow users to install only the Salesforce integration when needed.

Also applies to: 29-30

Comment on lines +153 to +159
self.username = options.get("username")
self.password = options.get("password")
self.security_token = options.get("security_token")
self.instance_url = options.get("instance_url")
self.salesforce_object = options.get("salesforce_object", "Account")
self.batch_size = int(options.get("batch_size", "200"))


🛠️ Refactor suggestion

⚠️ Potential issue

Security: Avoid storing sensitive credentials as instance variables.

Storing passwords and security tokens as instance variables could expose them in logs, stack traces, or memory dumps. Consider extracting credentials only when needed in the write method.

Apply this diff to improve security:

     def __init__(self, schema: StructType, options: Dict[str, str]):
         self.schema = schema
         self.options = options
         
-        # Extract Salesforce configuration
-        self.username = options.get("username")
-        self.password = options.get("password")
-        self.security_token = options.get("security_token")
-        self.instance_url = options.get("instance_url")
         self.salesforce_object = options.get("salesforce_object", "Account")
         self.batch_size = int(options.get("batch_size", "200"))
         
         # Validate required options
-        if not all([self.username, self.password, self.security_token]):
+        username = options.get("username")
+        password = options.get("password") 
+        security_token = options.get("security_token")
+        if not all([username, password, security_token]):
             raise ValueError(
                 "Salesforce username, password, and security_token are required. "
                 "Set them using .option() method in your streaming query."
             )

Then update the write method to extract credentials:

# In write method, after line 186:
username = self.options.get("username")
password = self.options.get("password")
security_token = self.options.get("security_token")
instance_url = self.options.get("instance_url")
🤖 Prompt for AI Agents
In pyspark_datasources/salesforce.py around lines 153 to 159, sensitive
credentials like password and security_token are stored as instance variables,
which risks exposure in logs or memory. Remove these credentials from instance
variables and instead extract them locally within the write method after line
186 by accessing them from self.options. This limits the scope of sensitive data
and improves security.

Comment on lines +175 to +178
raise ImportError(
"simple-salesforce library is required for Salesforce integration. "
"Install it with: pip install simple-salesforce"
)

⚠️ Potential issue

Add exception chaining for better error traceability.

When re-raising exceptions, use from to preserve the original exception context.

Apply this diff:

         except ImportError:
             raise ImportError(
                 "simple-salesforce library is required for Salesforce integration. "
                 "Install it with: pip install simple-salesforce"
-            )
+            ) from None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
raise ImportError(
"simple-salesforce library is required for Salesforce integration. "
"Install it with: pip install simple-salesforce"
)
except ImportError:
raise ImportError(
"simple-salesforce library is required for Salesforce integration. "
"Install it with: pip install simple-salesforce"
) from None
🧰 Tools
🪛 Ruff (0.12.2)

175-178: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🤖 Prompt for AI Agents
In pyspark_datasources/salesforce.py around lines 175 to 178, the ImportError is
raised without exception chaining, which loses the original error context.
Modify the raise statement to use "raise ImportError(...) from e" where "e" is
the caught exception, to preserve the original exception context for better
traceability.

Comment on lines +199 to +200
logger.error(f"Failed to connect to Salesforce: {str(e)}")
raise ConnectionError(f"Salesforce connection failed: {str(e)}")

⚠️ Potential issue

Add exception chaining to preserve error context.

Apply this diff:

         except Exception as e:
             logger.error(f"Failed to connect to Salesforce: {str(e)}")
-            raise ConnectionError(f"Salesforce connection failed: {str(e)}")
+            raise ConnectionError(f"Salesforce connection failed: {str(e)}") from e
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
logger.error(f"Failed to connect to Salesforce: {str(e)}")
raise ConnectionError(f"Salesforce connection failed: {str(e)}")
except Exception as e:
logger.error(f"Failed to connect to Salesforce: {str(e)}")
raise ConnectionError(f"Salesforce connection failed: {str(e)}") from e
🧰 Tools
🪛 Ruff (0.12.2)

200-200: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🤖 Prompt for AI Agents
In pyspark_datasources/salesforce.py around lines 199 to 200, the code raises a
ConnectionError after logging the original exception but does not preserve the
original exception context. Modify the raise statement to use "raise
ConnectionError(...) from e" to enable exception chaining and preserve the
original error context.

Comment on lines 256 to 257
except AttributeError:
raise ValueError(f"Salesforce object '{self.salesforce_object}' not found")

⚠️ Potential issue

Add exception chaining for consistency.

Apply this diff:

         except AttributeError:
-            raise ValueError(f"Salesforce object '{self.salesforce_object}' not found")
+            raise ValueError(f"Salesforce object '{self.salesforce_object}' not found") from None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except AttributeError:
raise ValueError(f"Salesforce object '{self.salesforce_object}' not found")
except AttributeError:
raise ValueError(f"Salesforce object '{self.salesforce_object}' not found") from None
🧰 Tools
🪛 Ruff (0.12.2)

257-257: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🤖 Prompt for AI Agents
In pyspark_datasources/salesforce.py around lines 256 to 257, the ValueError
raised in the except block for AttributeError lacks exception chaining. Modify
the raise statement to include "from e" where "e" is the caught AttributeError,
ensuring consistent exception chaining for better error traceability.

)

df.write.format("salesforce").mode("append").save()
assert False, "Should have raised error - Salesforce DataSource only supports streaming"

⚠️ Potential issue

Replace assert False with pytest.fail().

Using assert False can be removed by Python's optimization mode (python -O). Use pytest.fail() instead for more reliable test failures.

Apply this diff:

-        assert False, "Should have raised error - Salesforce DataSource only supports streaming"
+        pytest.fail("Should have raised error - Salesforce DataSource only supports streaming")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
assert False, "Should have raised error - Salesforce DataSource only supports streaming"
pytest.fail("Should have raised error - Salesforce DataSource only supports streaming")
🧰 Tools
🪛 Ruff (0.12.2)

88-88: Do not assert False (python -O removes these calls), raise AssertionError()

Replace assert False

(B011)

🤖 Prompt for AI Agents
In tests/test_data_sources.py at line 88, replace the statement 'assert False,
"Should have raised error - Salesforce DataSource only supports streaming"' with
'pytest.fail("Should have raised error - Salesforce DataSource only supports
streaming")' to ensure the test failure is not skipped in Python's optimization
mode.

@allisonwang-db (Owner) left a comment

Looks good! Thanks for adding the salesforce example.
Can we modify the PR title to mention that it's a Salesforce sink?

Comment on lines +43 to +44
salesforce_object : str, optional
Target Salesforce object name (default: "Account")
allisonwang-db (Owner):

Where can I get a list of objects?

@shujingyang-db (Contributor, Author) commented Jul 31, 2025:

It's in the Salesforce UI. We could also add a feature to pull the object list and their schemas from Salesforce.
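
For reference, a sketch of pulling the object list and a per-object schema with simple-salesforce's describe() calls; this is illustrative and not part of the PR:

    from simple_salesforce import Salesforce

    sf = Salesforce(
        username="user@example.com",   # placeholder credentials
        password="********",
        security_token="********",
    )

    # Global describe lists every standard and custom object visible to the user.
    object_names = [obj["name"] for obj in sf.describe()["sobjects"]]

    # Per-object describe returns the field schema, e.g. for Account.
    account_fields = [f["name"] for f in sf.Account.describe()["fields"]]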

Comment on lines 206 to 208
record = self._convert_row_to_salesforce_record(row)
if record: # Only add non-empty records
records.append(record)
allisonwang-db (Owner):

This will consume all rows in the current partition. What if there are too many rows and they exceed the executor memory limit? We can either 1) batch the rows and periodically write them to Salesforce, or 2) use the Arrow writer, which takes an iterator of Arrow record batches: https://github.com/apache/spark/blob/master/python/pyspark/sql/datasource.py#L979
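
A minimal, self-contained sketch of option 1: accumulate rows from the iterator and flush every batch_size records so the whole partition never has to be materialized. The flush callable stands in for the Composite Tree insert and is an assumption, not the PR's actual code:

    from typing import Callable, Dict, Iterator, List

    def write_in_batches(
        records: Iterator[Dict],
        flush: Callable[[List[Dict]], None],
        batch_size: int = 200,
    ) -> int:
        """Flush every `batch_size` records instead of materializing the whole partition."""
        batch: List[Dict] = []
        written = 0
        for record in records:
            batch.append(record)
            if len(batch) >= batch_size:
                flush(batch)        # e.g. one Composite Tree insert per batch
                written += len(batch)
                batch = []
        if batch:                   # flush the trailing partial batch
            flush(batch)
            written += len(batch)
        return written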

allisonwang-db (Owner):

Ah, I think we don't have streaming support for the Arrow record batch writer yet (which we should support!)

shujingyang-db (Contributor, Author):

Sure, I updated it to write data to Salesforce in batches.

@shujingyang-db changed the title from "Add an example for Salesforce data source" to "Add an example for Salesforce sink" on Jul 31, 2025
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (4)
tests/test_salesforce_sink.py (4)

15-15: Remove unused import.

The lit function from pyspark.sql.functions is imported but never used in the test.

-from pyspark.sql.functions import col, lit
+from pyspark.sql.functions import col

12-12: Consider using proper package imports instead of modifying sys.path.

While sys.path.append('.') works, it's better to use proper relative imports or ensure the package is properly installed for testing.

Consider structuring the test to use:

from pyspark_datasources.salesforce import SalesforceDataSource

without modifying sys.path, or use pytest's proper test discovery mechanisms.


125-125: Consider making timeout configurable.

The hardcoded 60-second timeout might not be sufficient for all environments or could be too long for fast CI systems.

Consider making it configurable:

-        query1.awaitTermination(timeout=60)
+        timeout = int(os.getenv('SPARK_STREAM_TIMEOUT', '60'))
+        query1.awaitTermination(timeout=timeout)

279-280: Make exception handling more specific.

The bare except clause should specify the exception type for better error handling practices.

-        except:
+        except Exception:
             pass

Or even better, catch specific exceptions:

-        except:
+        except (AttributeError, RuntimeError):
             pass
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 510ff30 and c9fe38a.

📒 Files selected for processing (1)
  • tests/test_salesforce_sink.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
tests/test_salesforce_sink.py (2)
pyspark_datasources/salesforce.py (1)
  • SalesforceDataSource (18-199)
tests/test_data_sources.py (3)
  • test_fake_datasource_stream (20-32)
  • test_opensky_datasource_stream (51-66)
  • test_github_datasource (13-17)
🪛 Ruff (0.12.2)
tests/test_salesforce_sink.py

15-15: pyspark.sql.functions.lit imported but unused

Remove unused import: pyspark.sql.functions.lit

(F401)


279-279: Do not use bare except

(E722)

🔇 Additional comments (6)
tests/test_salesforce_sink.py (6)

18-23: LGTM!

The CSV file creation helper function is well-implemented and follows CSV best practices with the newline='' parameter.


28-38: LGTM!

Good security practice using environment variables for Salesforce credentials and providing clear feedback when they're missing.


40-68: LGTM!

The test setup is well-structured with proper temporary directory creation, appropriate Spark configuration for testing, and correct schema definition that matches the test data.


241-265: LGTM!

Excellent test result analysis with clear success criteria and comprehensive logging. The logic correctly identifies exactly-once processing semantics and provides detailed feedback for different scenarios.


273-289: LGTM!

Good cleanup practices with proper handling of both Spark session termination and temporary directory cleanup in the finally block.


291-309: LGTM!

Clear test execution with proper exit code handling and informative output for both success and failure cases.

@allisonwang-db (Owner) left a comment

Looks good!

Comment on lines 29 to 32
username = os.getenv('SALESFORCE_USERNAME')
password = os.getenv('SALESFORCE_PASSWORD')
security_token = os.getenv('SALESFORCE_SECURITY_TOKEN')

allisonwang-db (Owner):

How do we run these tests without the tokens? Maybe we can create an examples folder and add this as an example (salesforce_sink_example.py) instead of a unit test.

shujingyang-db (Contributor, Author):

Sounds good! I moved it to the examples folder.

import time
import tempfile
import csv
sys.path.append('.')
allisonwang-db (Owner):

This seems unnecessary

Comment on lines 103 to 107
# Check for Salesforce credentials in environment variables
username = os.getenv('SALESFORCE_USERNAME')
password = os.getenv('SALESFORCE_PASSWORD')
security_token = os.getenv('SALESFORCE_SECURITY_TOKEN')

allisonwang-db (Owner):

ditto. Let's keep the unit test lightweight. The above test case test_salesforce_datasource_registration should be enough.

... .option("checkpointLocation", "/path/to/checkpoint") \\
... .start()
Key Features:
allisonwang-db (Owner):

Regarding idempotency: if a streaming write job fails and I resume it from the checkpoint, will it overwrite the records already in Salesforce?

@shujingyang-db (Contributor, Author) replied Jul 31, 2025:

No, it will not overwrite. It will resume from the offset persisted in the checkpoint.

But if there are records already written to Salesforce that haven't been committed to the checkpoint yet, there could be duplicate records.
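
One way to keep replays from creating duplicates (an assumption, not what this PR implements) would be to upsert on an external ID field instead of inserting, for example with simple-salesforce:

    from simple_salesforce import Salesforce

    sf = Salesforce(
        username="user@example.com",   # placeholder credentials
        password="********",
        security_token="********",
    )

    # Assumes a custom external ID field External_Id__c exists on Account.
    # Replaying the same record after a restart updates it instead of inserting a duplicate.
    sf.Account.upsert(
        "External_Id__c/order-12345",
        {"Name": "PySparkTest_Idempotent"},
    )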

allisonwang-db (Owner):

It would be good to mention this behavior in the notes :)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
tests/test_data_sources.py (1)

88-88: Replace assert False with pytest.fail().

Using assert False can be removed by Python's optimization mode (python -O). Use pytest.fail() instead for more reliable test failures.

Apply this diff:

-        assert False, "Should have raised error - Salesforce DataSource only supports streaming"
+        pytest.fail("Should have raised error - Salesforce DataSource only supports streaming")
🧹 Nitpick comments (2)
examples/salesforce_sink_example.py (2)

345-345: Remove unnecessary f-string prefix.

The f-string doesn't contain any placeholders, so the f prefix is unnecessary.

Apply this diff:

-        print(f"\n📈 Checkpoint Analysis:")
+        print("\n📈 Checkpoint Analysis:")

396-396: Remove unused variable assignment.

The custom_data variable is created but never used since the actual streaming query is commented out for demonstration purposes.

Apply this diff:

-        # Transform for custom object (example: Product__c)
-        custom_data = streaming_df.select(
-            col("value").cast("string").alias("Product_Code__c"),
-            lit("Sample Product").alias("Name"),
-            (col("value") * 29.99).cast("double").alias("Price__c"),
-            current_timestamp().alias("Created_Date__c")
-        )
+        # Example transformation for custom object (Product__c)
+        # custom_data = streaming_df.select(
+        #     col("value").cast("string").alias("Product_Code__c"),
+        #     lit("Sample Product").alias("Name"),
+        #     (col("value") * 29.99).cast("double").alias("Price__c"),
+        #     current_timestamp().alias("Created_Date__c")
+        # )
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c9fe38a and 340ef03.

📒 Files selected for processing (2)
  • examples/salesforce_sink_example.py (1 hunks)
  • tests/test_data_sources.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
examples/salesforce_sink_example.py (2)
tests/test_data_sources.py (1)
  • spark (8-10)
pyspark_datasources/salesforce.py (1)
  • SalesforceDataSource (18-199)
tests/test_data_sources.py (1)
pyspark_datasources/salesforce.py (3)
  • SalesforceDataSource (18-199)
  • name (169-171)
  • write (226-294)
🪛 Ruff (0.12.2)
examples/salesforce_sink_example.py

345-345: f-string without any placeholders

Remove extraneous f prefix

(F541)


396-396: Local variable custom_data is assigned to but never used

Remove assignment to unused variable custom_data

(F841)

tests/test_data_sources.py

71-71: SalesforceDataSource may be undefined, or defined from star imports

(F405)


74-74: SalesforceDataSource may be undefined, or defined from star imports

(F405)


88-88: Do not assert False (python -O removes these calls), raise AssertionError()

Replace assert False

(B011)

🔇 Additional comments (7)
tests/test_data_sources.py (1)

69-74: LGTM! Clean registration test.

The test correctly registers the SalesforceDataSource and validates the expected name. The implementation aligns well with the data source's streaming-only design.

examples/salesforce_sink_example.py (6)

34-49: LGTM! Excellent credential management.

The credential checking function provides clear user feedback and secure environment variable usage. The early return pattern prevents execution without proper credentials.


51-115: LGTM! Well-structured streaming example.

Example 1 demonstrates proper usage of the SalesforceDataSource with rate source, appropriate transformations, and good progress reporting. The checkpoint location and batch size configuration are reasonable.


117-209: LGTM! Comprehensive CSV streaming example.

Example 2 effectively demonstrates CSV file streaming with proper schema definition, temporary file management, and cleanup. The custom schema option usage is particularly valuable for users.


211-365: LGTM! Excellent checkpoint demonstration.

Example 3 provides valuable insight into exactly-once processing with detailed phase-by-phase execution and analysis. The incremental data approach clearly shows checkpoint functionality.


367-428: LGTM! Good approach for demonstrating custom objects.

Example 4 wisely shows the configuration without executing potentially failing operations. The detailed code example in the print statement is educational and safe.


430-456: LGTM! Excellent main function with comprehensive documentation.

The main function provides clear execution flow and valuable takeaways for users. The exception handling covers both user interruption and unexpected errors appropriately.

@shujingyang-db changed the title from "Add an example for Salesforce sink" to "Add support for Salesforce sink" on Aug 1, 2025
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (5)
examples/salesforce_sink_example.py (5)

24-24: Remove unused import.

The sys module is imported but never used in the script.

-import sys

92-92: Consider using a more appropriate checkpoint location.

Using /tmp for checkpoint locations may not be suitable for production environments as it can be cleared on system restart. Consider using a persistent directory or making it configurable.

-.option("checkpointLocation", "/tmp/salesforce_example1_checkpoint") \
+.option("checkpointLocation", "./checkpoints/salesforce_example1_checkpoint") \

182-182: Consider using a more appropriate checkpoint location.

Same issue as Example 1 - using /tmp for checkpoint locations may not be suitable for production environments.

-.option("checkpointLocation", "/tmp/salesforce_example2_checkpoint") \
+.option("checkpointLocation", "./checkpoints/salesforce_example2_checkpoint") \

342-342: Remove unnecessary f-string prefix.

The f-string has no placeholders, so the f prefix is unnecessary.

-        print(f"\n📈 Checkpoint Analysis:")
+        print("\n📈 Checkpoint Analysis:")

393-398: Remove unused variable assignment.

The custom_data variable is created but never used since the actual streaming query is commented out for demonstration purposes.

Since this is purely for demonstration and the actual query is commented out, consider either removing the unused variable or adding a comment explaining its purpose:

-        # Transform for custom object (example: Product__c)
-        custom_data = streaming_df.select(
-            col("value").cast("string").alias("Product_Code__c"),
-            lit("Sample Product").alias("Name"),
-            (col("value") * 29.99).cast("double").alias("Price__c"),
-            current_timestamp().alias("Created_Date__c")
-        )
+        # Transform for custom object (example: Product__c)
+        # custom_data = streaming_df.select(
+        #     col("value").cast("string").alias("Product_Code__c"),
+        #     lit("Sample Product").alias("Name"),
+        #     (col("value") * 29.99).cast("double").alias("Price__c"),
+        #     current_timestamp().alias("Created_Date__c")
+        # )
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 340ef03 and 9889a72.

⛔ Files ignored due to path filters (1)
  • poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (1)
  • examples/salesforce_sink_example.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
examples/salesforce_sink_example.py

24-24: sys imported but unused

Remove unused import: sys

(F401)


342-342: f-string without any placeholders

Remove extraneous f prefix

(F541)


393-393: Local variable custom_data is assigned to but never used

Remove assignment to unused variable custom_data

(F841)

🔇 Additional comments (6)
examples/salesforce_sink_example.py (6)

31-46: Excellent credential management approach.

The credential checking function follows security best practices by using environment variables and provides clear user guidance when credentials are missing.


48-113: Well-structured streaming example with proper error handling.

The example demonstrates good practices including credential validation, proper Spark session management, progress reporting, and resource cleanup.


114-207: Excellent CSV handling and cleanup implementation.

The example demonstrates proper temporary file management, schema definition, and comprehensive resource cleanup. The use of shutil.rmtree ensures no temporary files are left behind.


208-363: Excellent demonstration of exactly-once processing with checkpoints.

This example effectively demonstrates checkpoint functionality through a two-phase approach. The analysis logic correctly verifies that exactly-once semantics are maintained across streaming restarts.


364-426: Good educational approach for custom object demonstration.

The example effectively shows how to configure custom Salesforce objects while acknowledging that they may not exist in all environments. The commented-out code provides a clear template for users.


427-455: Excellent main function with comprehensive error handling and educational content.

The main function properly orchestrates all examples and provides valuable takeaways for users. The error handling covers both user interruption and unexpected errors appropriately.

@allisonwang-db (Owner) commented:

LGTM. pending CI

@allisonwang-db allisonwang-db merged commit 614dcfc into allisonwang-db:master Aug 1, 2025
5 checks passed