# MongoDB Atlas via Spark
This notebook provides a top-level introduction to using Spark with MongoDB, enabling developers and data engineers to bring sophisticated real-time analytics and machine learning to live, operational data.

The following example application shows how to use MongoDB and Spark together, leveraging MongoDB's aggregation pipeline to pre-process data within MongoDB so that it is ready for use in Databricks. It also shows how to query data and write it back to MongoDB for use in applications. This notebook covers:
1. How to read data from MongoDB into Spark.
2. How to run the MongoDB Connector for Spark as a library in Databricks.
3. How to leverage MongoDB's aggregation pipeline from within Spark.
4. How to use the ALS machine learning library in Spark to generate a set of personalized movie recommendations for a given user.
5. How to write the results back to MongoDB so they are accessible to applications.

## Create a Databricks Cluster and Add the Connector as a Library

1. Create a Databricks cluster.
1. Navigate to the cluster detail page and select the **Libraries** tab.
1. Click the **Install New** button.
1. Select **Maven** as the Library Source.
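1. Use the **Search Packages** feature to find 'mongo-spark'. This should point to `org.mongodb.spark:mongo-spark-connector_2.12:3.0.1` or a newer 3.x release. (Connector versions 10.x and later use a different data source name, `mongodb`, and different configuration keys; the code in this notebook targets the 3.x API.)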
1. Click **Install**. <br/>
For more information on the MongoDB Spark connector (which now supports Structured Streaming), see the [MongoDB documentation](https://www.mongodb.com/docs/spark-connector/current/).

## Create a MongoDB Atlas Instance

Atlas is a fully managed, cloud-based MongoDB service. We'll use Atlas to test the integration between MongoDB and Spark.

1. Sign up for [MongoDB Atlas](https://www.mongodb.com/atlas/database). 
1. [Create an Atlas free tier cluster](https://docs.atlas.mongodb.com/getting-started/).
1. Enable Databricks clusters to connect to the Atlas cluster by adding the external IP addresses of the Databricks cluster nodes to the [IP access list in Atlas](https://docs.atlas.mongodb.com/setup-cluster-security/#add-ip-addresses-to-the-whitelist). For convenience, you could (**temporarily!!**) allow access from anywhere, but we recommend enabling [network peering](https://www.mongodb.com/docs/atlas/security-vpc-peering/) for production.

## Prep MongoDB with a sample dataset

MongoDB Atlas comes with sample datasets that let you get started quickly. This notebook uses the `sample_supplies` dataset.

1. Once the cluster is up and running, [load the sample dataset](https://www.mongodb.com/docs/charts/tutorial/order-data/prerequisites-setup/) in MongoDB Atlas.
1. You can confirm that the dataset is present via the **Browse Collections** button in the Atlas UI.

## Update Spark Configuration with the Atlas Connection String

1. Note the connection string shown in the **Connect** dialog in MongoDB Atlas. It has the form `mongodb+srv://<username>:<password>@<clustername>.xxxxx.mongodb.net/`.
1. Back in Databricks, in your cluster configuration under **Advanced Options** (bottom of the page), set both `spark.mongodb.input.uri` and `spark.mongodb.output.uri` to the connection string, with your username and password filled in. This way, all notebooks running on the cluster use this configuration.
1. Alternatively, you can set the URI explicitly with `option` when calling the API, for example: `spark.read.format("mongo").option("spark.mongodb.input.uri", connectionString).load()`. If you configured the variables on the cluster, you don't need to set the option.
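Because the username and password are embedded in the URI, any reserved characters in them (such as `@`, `:` or `/`) must be percent-encoded. The following is a minimal sketch, using only the Python standard library, that builds the URI and renders the two connector settings as the `key value` lines used in a cluster's Spark config. `build_atlas_uri`, `cluster_spark_conf`, the credentials and the host `cluster0.xxxxx.mongodb.net` are hypothetical, for illustration only. In practice you would likely read the credentials from a secret store, such as Databricks secrets, rather than hard-coding them.

```python
from urllib.parse import quote_plus


def build_atlas_uri(username: str, password: str, host: str) -> str:
    """Build a mongodb+srv:// URI with percent-encoded credentials (hypothetical helper)."""
    # quote_plus encodes reserved characters such as '@' and ':' so the
    # URI parser cannot mistake them for separators.
    return f"mongodb+srv://{quote_plus(username)}:{quote_plus(password)}@{host}/"


def cluster_spark_conf(uri: str) -> str:
    """Render both connector URI settings as 'key value' lines (hypothetical helper)."""
    keys = ("spark.mongodb.input.uri", "spark.mongodb.output.uri")
    return "\n".join(f"{key} {uri}" for key in keys)


# Hypothetical credentials and host, for illustration only.
uri = build_atlas_uri("analyst", "p@ss:word", "cluster0.xxxxx.mongodb.net")
print(uri)  # mongodb+srv://analyst:p%40ss%3Aword@cluster0.xxxxx.mongodb.net/
print(cluster_spark_conf(uri))
```

The printed lines can be pasted into the cluster's Spark config, or `uri` can be used as `connectionString` in the cells below.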


In [0]:
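# Paste your Atlas connection string here (including username and password)
connectionString = "mongodb+srv://CONNECTION_STRING_HERE/"
# Database and collection from the Atlas sample dataset
database = "sample_supplies"
collection = "sales"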

## Read data from the MongoDB 'sales' collection using an aggregation pipeline

MongoDB's [Aggregation Pipeline](https://www.mongodb.com/docs/manual/core/aggregation-pipeline/) is a powerful capability that allows you to pre-process and transform data within MongoDB. It's a great match for real-time analytics, dashboards, and report generation with roll-ups, sums, and averages, with the processing done server-side. (Note: there is a [whole book written about it](https://www.practical-mongodb-aggregations.com/front-cover.html).) <br/>
MongoDB also supports [rich secondary and compound indexes](https://www.mongodb.com/docs/manual/indexes/) to extract, filter, and process only the data it needs. For example, you can analyze all customers located in a specific geography right within the database, without first loading the full dataset, minimizing data movement and reducing latency. <br/>
The aggregation pipeline in our example has four stages:<br/>
1. **Match** stage: keeps only the documents whose `items` array contains an item named "printer paper". <br />
1. **Unwind** stage: deconstructs the `items` array, emitting one document per item. <br />
1. **Add fields** stage: adds a new field called "totalSale", computed as item price * quantity sold. <br />
1. **Project** stage: outputs only "saleDate" and "totalSale" (and drops `_id`).
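As an alternative to hand-writing the pipeline as one long quoted string, you could build the stages as Python dicts and serialize them with the standard `json` module. This is a sketch under the assumption that the connector's `pipeline` option accepts standard (strict) JSON as well as the shell-style syntax used in the cell that follows; the names `pipeline_stages` and `pipeline_json` are hypothetical.

```python
import json

# Hypothetical name: the four stages described above, as plain Python data.
pipeline_stages = [
    # Match: keep sales whose items array has an element named "printer paper".
    {"$match": {"items.name": "printer paper"}},
    # Unwind: one output document per element of the items array.
    {"$unwind": {"path": "$items"}},
    # Add fields: totalSale = item price * item quantity.
    {"$addFields": {"totalSale": {"$multiply": ["$items.price", "$items.quantity"]}}},
    # Project: keep saleDate and totalSale, drop _id.
    {"$project": {"saleDate": 1, "totalSale": 1, "_id": 0}},
]

# json.dumps handles all quoting, so no manual escaping is needed.
pipeline_json = json.dumps(pipeline_stages)
print(pipeline_json)
```

The resulting string would be passed to the reader as `.option("pipeline", pipeline_json)`. Keeping the stages as data also makes it easier to add or reorder stages programmatically.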


In [0]:
# Four-stage aggregation pipeline, passed to the connector as a JSON-style string
pipeline = ("[{'$match': {'items.name': 'printer paper'}}, {'$unwind': {path: '$items'}}, "
    "{'$addFields': {totalSale: {'$multiply': ['$items.price', '$items.quantity']}}}, "
    "{'$project': {saleDate: 1, totalSale: 1, _id: 0}}]")
salesDF = (spark.read.format("mongo").option("spark.mongodb.input.uri", connectionString)
    .option("database", database).option("collection", collection).option("pipeline", pipeline)
    .option("partitioner", "MongoSinglePartitioner").load())  # read through a single Spark partition
display(salesDF)

saleDate,totalSale
2015-03-23T21:06:49.506+0000,80.02
2015-03-23T21:06:49.506+0000,70.58
2015-03-23T21:06:49.506+0000,280.6
2015-03-23T21:06:49.506+0000,155.42
2015-03-23T21:06:49.506+0000,36.94
2015-03-23T21:06:49.506+0000,159.6
2015-03-23T21:06:49.506+0000,24.24
2015-03-23T21:06:49.506+0000,42.48
2015-08-25T10:01:02.918+0000,80.5
2015-08-25T10:01:02.918+0000,254.79
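To make the stage semantics concrete, the following plain-Python sketch mimics the four stages on two hypothetical toy documents. It is only an illustration, not how MongoDB executes a pipeline, and `run_pipeline` and `toy_sales` are made-up names. It highlights one subtlety of this pipeline: `$match` selects whole sales that contain printer paper, and `$unwind` then emits *every* item of those sales, so `totalSale` is also computed for the other items in the same sale. Adding a second `$match` on `items.name` after the `$unwind` stage would restrict the output to printer-paper line items.

```python
from decimal import Decimal


def run_pipeline(sales):
    """Plain-Python illustration of the $match, $unwind, $addFields, $project stages."""
    results = []
    for sale in sales:
        # $match on 'items.name': a sale matches if ANY element of items has that name.
        if not any(item["name"] == "printer paper" for item in sale["items"]):
            continue
        # $unwind: one output row per element, for every item of the matched sale.
        for item in sale["items"]:
            # $addFields: totalSale = price * quantity.
            total_sale = item["price"] * item["quantity"]
            # $project: keep only saleDate and totalSale.
            results.append({"saleDate": sale["saleDate"], "totalSale": total_sale})
    return results


# Hypothetical toy documents, loosely shaped like the 'sales' schema (not real data).
toy_sales = [
    {"saleDate": "2015-01-01", "items": [
        {"name": "printer paper", "price": Decimal("12.50"), "quantity": 4},
        {"name": "pens", "price": Decimal("1.25"), "quantity": 3},
    ]},
    {"saleDate": "2015-01-02", "items": [
        {"name": "notepad", "price": Decimal("2.00"), "quantity": 5},
    ]},
]

for row in run_pipeline(toy_sales):
    print(row)
# {'saleDate': '2015-01-01', 'totalSale': Decimal('50.00')}
# {'saleDate': '2015-01-01', 'totalSale': Decimal('3.75')}
```

The second toy sale contains no printer paper, so `$match` drops it entirely, while the pens in the first sale still produce a row.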


## Read data from the MongoDB 'sales' collection as-is
For comparison, we can read the data from the collection as-is, without applying any aggregation pipeline, and print the schema.


In [0]:
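# Load the whole collection (no pipeline); the connector infers the schema from the documents
df = (spark.read.format("mongo").option("spark.mongodb.input.uri", connectionString)
    .option("database", database).option("collection", collection).load())
df.printSchema()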

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- couponUsed: boolean (nullable = true)
 |-- customer: struct (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- age: integer (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- satisfaction: integer (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- price: decimal(6,2) (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- purchaseMethod: string (nullable = true)
 |-- saleDate: timestamp (nullable = true)
 |-- storeLocation: string (nullable = true)



## Create a temp view
Let's register the DataFrame created in the previous step as a temporary view and run a simple Spark SQL query on it.


In [0]:
# Register the DataFrame as a temporary view so it can be queried with Spark SQL
df.createOrReplaceTempView("temp")
filtered_df = spark.sql("SELECT customer FROM temp WHERE storeLocation='New York'")
display(filtered_df)

customer
"List(M, 26, rapifoozi@viupoen.bb, 5)"
"List(F, 53, se@nacwev.an, 4)"
"List(F, 39, beecho@wic.be, 3)"
"List(F, 57, cuwoik@luvgu.tc, 5)"
"List(M, 31, do@neokliw.sz, 5)"
"List(F, 52, citga@fo.pg, 5)"
"List(M, 30, remcihce@iwjamwit.kp, 1)"
"List(F, 59, kave@we.com, 5)"
"List(M, 34, li@efva.gm, 5)"
"List(F, 34, fase@has.sx, 2)"


## Write data to a new MongoDB collection

Now you can enrich the data with data from other sources, or use Spark MLlib to train ML models on data from MongoDB. <br /> 
For this demonstration, we simply write `filtered_df` directly to a new collection, `filteredSales`, in the `sample_supplies` database. <br/> 


In [0]:
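# Append filtered_df to the 'filteredSales' collection (re-running the cell appends duplicate documents)
filtered_df.write.format("mongo").option("spark.mongodb.output.uri", connectionString).option("database", database).option("collection", "filteredSales").mode("append").save()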

## Write data to Delta

You can also unify all your data in the Lakehouse by writing it to a Delta table, as shown below.

In [0]:
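# Save df as a managed Delta table; "overwrite" lets the cell be re-run without failing on an existing table
df.write.format("delta").mode("overwrite").saveAsTable("mongo_atlas_delta_example")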

## View data in MongoDB
You can view the new collection in MongoDB Atlas in the [**Collections** tab](https://www.mongodb.com/docs/atlas/atlas-ui/collections/#view-collections), or in the [MongoDB Compass](https://www.mongodb.com/docs/compass/current/) desktop app.

## More info
Related technologies that simplify real-time data flow and processing: [Data Federation](https://www.mongodb.com/developer/products/atlas/data-federation/), [Workload Isolation](https://www.mongodb.com/docs/manual/core/workload-isolation/), [Aggregation Pipelines](https://www.mongodb.com/docs/manual/core/aggregation-pipeline/), [Materialized Views](https://www.mongodb.com/docs/manual/core/materialized-views/) <br/>
For batch processing with Parquet on object storage (S3), see the documentation on [`$out` capabilities](https://www.mongodb.com/docs/atlas/data-federation/supported-unsupported/pipeline/out/).