# Demo: Use Spark to Load Data from MongoDB and Save the Processed Data Back to MongoDB

In [None]:
# Point findspark at the server's Spark installation (/opt/spark) so that
# pyspark can be imported from this notebook in the next cell.
import findspark
findspark.init(spark_home='/opt/spark')

In [None]:
# Import SparkSession Class
# Every spark job has to be in a Spark Session
from pyspark.sql import SparkSession

In [None]:
# MongoDB connection URIs, each of the form mongodb://<host>/<database>.<collection>:
# the source users collection and the destination for the filtered results.
input_uri, output_uri = (
    'mongodb://127.0.0.1/movielens.users',
    'mongodb://127.0.0.1/movielensAnalysis.youngUsers',
)

In [None]:
# Build (or reuse, via getOrCreate) the Spark session for this demo, configured with:
#   - the application name,
#   - the default MongoDB URIs the connector reads from and writes to
#     (these spark.mongodb.input/output.uri keys are the 3.x connector's),
#   - the Maven coordinate of the MongoDB Spark connector package
#     (built for Scala 2.12, connector version 3.0.0).
my_spark = (
    SparkSession.builder
    .appName("MongodbSparkConnectorDemo")
    .config("spark.mongodb.input.uri", input_uri)
    .config("spark.mongodb.output.uri", output_uri)
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.0')
    .getOrCreate()
)

In [None]:
# Read the collection named by spark.mongodb.input.uri (movielens.users)
# into a DataFrame through the MongoDB connector, then print a preview.
df = my_spark.read.load(format='com.mongodb.spark.sql.DefaultSource')
df.show()

In [None]:
# Keep only the rows whose `_c1` value is below 20 and display them.
# NOTE(review): `_c1` looks like a default positional column name (as produced
# by a header-less CSV import); presumably it holds the user's age — confirm
# against the collection's schema.
youngUsers = df.where("_c1 < 20")
youngUsers.show()

In [None]:
# Save the filtered users to the collection named in output_uri
# (database movielensAnalysis, collection youngUsers).
# The chain is wrapped in parentheses rather than joined with trailing
# backslashes: a line ending in `\` cannot carry a comment, so the
# backslash form with an inline comment is a SyntaxError.
(
    youngUsers.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", output_uri)
    .mode('overwrite')  # replaces existing contents; use 'append' to keep them
    .save()
)

# Great job!