In [1]:
import pymongo

client = pymongo.MongoClient("mongodb://root:example@localhost:27017/")
db = client['my_database']
collection = db['my_collection']

# Delete all documents in the collection
result = collection.delete_many({})

# Print the number of documents deleted
print(f"Deleted {result.deleted_count} documents.")

# Insert sample documents with different schemas
data = [
    {"name": "Alice", "age": 29, "address": "123 Main St"},
    {"name": "Bob", "age": 35, "address": "456 Maple Ave", "email": "bob@example.com"},
    {"name": "Charlie", "age": 22, "address": "789 Oak Dr", "phone": "555-1234"},
    {"name": "David", "age": 40, "email": "david@example.com", "address": "101 Pine Rd"},
    {"name": "Eva", "age": 28, "address": "202 Birch Blvd", "phone": "555-5678", "email": "eva@example.com"}
]

collection.insert_many(data)

documents = collection.find()

for document in documents:
    print(document)

Deleted 5 documents.
{'_id': ObjectId('67cd19874d066494b75b4d3f'), 'name': 'Alice', 'age': 29, 'address': '123 Main St'}
{'_id': ObjectId('67cd19874d066494b75b4d40'), 'name': 'Bob', 'age': 35, 'address': '456 Maple Ave', 'email': 'bob@example.com'}
{'_id': ObjectId('67cd19874d066494b75b4d41'), 'name': 'Charlie', 'age': 22, 'address': '789 Oak Dr', 'phone': '555-1234'}
{'_id': ObjectId('67cd19874d066494b75b4d42'), 'name': 'David', 'age': 40, 'email': 'david@example.com', 'address': '101 Pine Rd'}
{'_id': ObjectId('67cd19874d066494b75b4d43'), 'name': 'Eva', 'age': 28, 'address': '202 Birch Blvd', 'phone': '555-5678', 'email': 'eva@example.com'}


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


In [3]:

# Initialize Spark session with MongoDB connector
spark = SparkSession.builder \
    .appName("MongoDB with PySpark") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.4.1") \
    .config("spark.mongodb.input.uri", "mongodb://root:example@localhost:27017/my_database.my_collection") \
    .config("spark.mongodb.output.uri", "mongodb://root:example@localhost:27017/my_database.my_collection") \
    .getOrCreate()

spark

:: loading settings :: url = jar:file:/Users/venkateshshankar/venvs/pyspark_env/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/venkateshshankar/.ivy2/cache
The jars for the packages stored in: /Users/venkateshshankar/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-97c20dad-6ebb-49f5-9c16-a5b7a31d59fb;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;10.4.1 in central
	found org.mongodb#mongodb-driver-sync;5.1.4 in central
	[5.1.4] org.mongodb#mongodb-driver-sync;[5.1.1,5.1.99)
	found org.mongodb#bson;5.1.4 in central
	found org.mongodb#mongodb-driver-core;5.1.4 in central
	found org.mongodb#bson-record-codec;5.1.4 in central
:: resolution report :: resolve 789ms :: artifacts dl 4ms
	:: modules in use:
	org.mongodb#bson;5.1.4 from central in [default]
	org.mongodb#bson-record-codec;5.1.4 from central in [default]
	org.mongodb#mongodb-driver-core;5.1.4 from central in [default]
	org.mongodb#mongodb-driver-sync;5.1.4 from central in [default]
	org.mongodb.spark#m

In [4]:

# Read data from MongoDB collection
df = spark.read.format("mongodb").option("database", "my_database") \
                 .option("collection", "my_collection") \
    .option("spark.mongodb.read.connection.uri", "mongodb://root:example@localhost:27017") \
    .load()

In [5]:
df.columns

['_id', 'address', 'age', 'email', 'name', 'phone']

In [10]:
df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- email: string (nullable = true)
 |-- name: string (nullable = true)
 |-- phone: string (nullable = true)



In [11]:
df = df.filter(col('age')>='30')

In [12]:
df.show()

+--------------------+-------------+---+-----------------+-----+-----+
|                 _id|      address|age|            email| name|phone|
+--------------------+-------------+---+-----------------+-----+-----+
|67cd19874d066494b...|456 Maple Ave| 35|  bob@example.com|  Bob| NULL|
|67cd19874d066494b...|  101 Pine Rd| 40|david@example.com|David| NULL|
+--------------------+-------------+---+-----------------+-----+-----+



In [13]:
df.write.format("mongodb").\
    option("spark.mongodb.write.connection.uri", "mongodb://root:example@localhost:27017").\
    option("database", "my_database") \
                 .option("collection", "my_collection") \
    .mode("append").save()

In [14]:
documents = collection.find()

for document in documents:
    print(document)

{'_id': ObjectId('67cd19874d066494b75b4d3f'), 'name': 'Alice', 'age': 29, 'address': '123 Main St'}
{'_id': ObjectId('67cd19874d066494b75b4d40'), 'name': 'Bob', 'age': 35, 'address': '456 Maple Ave', 'email': 'bob@example.com'}
{'_id': ObjectId('67cd19874d066494b75b4d41'), 'name': 'Charlie', 'age': 22, 'address': '789 Oak Dr', 'phone': '555-1234'}
{'_id': ObjectId('67cd19874d066494b75b4d42'), 'name': 'David', 'age': 40, 'email': 'david@example.com', 'address': '101 Pine Rd'}
{'_id': ObjectId('67cd19874d066494b75b4d43'), 'name': 'Eva', 'age': 28, 'address': '202 Birch Blvd', 'phone': '555-5678', 'email': 'eva@example.com'}
{'_id': '67cd19874d066494b75b4d40', 'address': '456 Maple Ave', 'age': 35, 'email': 'bob@example.com', 'name': 'Bob', 'phone': None}
{'_id': '67cd19874d066494b75b4d42', 'address': '101 Pine Rd', 'age': 40, 'email': 'david@example.com', 'name': 'David', 'phone': None}


In [15]:
spark.stop()