## Introduction
The purpose of this notebook is to demonstrate how to integrate MongoDB data with Apache Spark, covering the following features:
* loading data
* ETL
* SQL queries

## Set up the environment
Let's start setting up the environment by importing the necessary libraries.

In [11]:
# Import all the necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F

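Next, let's create the Spark session. The `spark.jars.packages` setting downloads the MongoDB Spark Connector, and the input and output URIs point reads and writes at the `Stocks.Source` collection on the `rs0` replica set.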

In [3]:
spark = (
    SparkSession.builder
    .appName("pyspark-spark-example")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "1g")
    # Default collection (Stocks.Source) for reads and writes, via the rs0 replica set
    .config("spark.mongodb.input.uri", "mongodb://mongo1:27017,mongo2:27018,mongo3:27019/Stocks.Source?replicaSet=rs0")
    .config("spark.mongodb.output.uri", "mongodb://mongo1:27017,mongo2:27018,mongo3:27019/Stocks.Source?replicaSet=rs0")
    # Fetch the MongoDB Spark Connector 3.0.0 built for Scala 2.12
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0")
    .getOrCreate()
)
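Both URIs follow the same layout: the replica-set members as `host:port` pairs, then `/<database>.<collection>` (here `Stocks.Source`), then the `replicaSet` option. As a sketch, assuming you would rather not repeat that long string, a small helper can assemble it; `mongo_uri` is a hypothetical name and is not part of the MongoDB Spark Connector:

```python
def mongo_uri(hosts, database, collection, replica_set=None):
    """Assemble a connection string of the form
    mongodb://host1:port1,host2:port2/<database>.<collection>?replicaSet=<name>

    Hypothetical helper for illustration; not part of the MongoDB Spark Connector.
    """
    members = ",".join(f"{host}:{port}" for host, port in hosts)
    uri = f"mongodb://{members}/{database}.{collection}"
    if replica_set:
        uri += f"?replicaSet={replica_set}"
    return uri


# The replica-set members used in this notebook
HOSTS = [("mongo1", 27017), ("mongo2", 27018), ("mongo3", 27019)]
print(mongo_uri(HOSTS, "Stocks", "Source", replica_set="rs0"))
# mongodb://mongo1:27017,mongo2:27018,mongo3:27019/Stocks.Source?replicaSet=rs0
```

The returned string can be passed as the value of `spark.mongodb.input.uri` or `spark.mongodb.output.uri`.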

## Loading and transforming the data
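We start by loading the data from MongoDB into a Spark DataFrame. Called without options, `load()` reads from the collection named in `spark.mongodb.input.uri`, here `Stocks.Source`: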

In [4]:
df = spark.read.format("mongo").load()

Let’s verify the data was loaded by looking at the schema:

In [5]:
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- company_symbol: string (nullable = true)
 |-- price: double (nullable = true)
 |-- tx_time: string (nullable = true)



We can see that the `tx_time` field was loaded as a string. We can convert it to a timestamp with a cast:

In [8]:
df = df.withColumn('tx_time', df.tx_time.cast('timestamp'))
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- company_symbol: string (nullable = true)
 |-- price: double (nullable = true)
 |-- tx_time: timestamp (nullable = true)

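A cast like this does not raise an error on bad input: with Spark 3.x default settings (ANSI mode off), a string that cannot be parsed simply becomes `null`. To spot-check raw `tx_time` values before casting, the plain-Python sketch below mimics that behavior for one assumed format, `YYYY-MM-DD HH:MM:SS` (the layout that appears in the `show()` output); the function name is hypothetical, and Spark itself recognizes more formats than this:

```python
from datetime import datetime


def cast_to_timestamp(value, fmt="%Y-%m-%d %H:%M:%S"):
    """Parse one string the way a permissive cast would: return a datetime,
    or None (Spark's null) when the string does not match the assumed format.

    Only a single format is checked here; Spark's own cast accepts several.
    """
    if value is None:
        return None
    try:
        return datetime.strptime(value, fmt)
    except ValueError:
        return None


samples = ["2020-09-04 13:34:58", "04/09/2020 13:34", None]
print([cast_to_timestamp(s) for s in samples])
# [datetime.datetime(2020, 9, 4, 13, 34, 58), None, None]
```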


Next, we add a `movingAverage` column that averages each row's price with the prices of the rows immediately before and after it for the same company symbol. To do this we use a PySpark window specification:

In [12]:
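# Frame of three rows per company: previous, current, and next trade, in time order
price_window = Window.partitionBy("company_symbol").orderBy("tx_time").rowsBetween(-1, 1)
movAvg = df.withColumn("movingAverage", F.avg("price").over(price_window))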
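To check the figures by hand, the plain-Python sketch below reproduces a `rowsBetween(-1, 1)` frame: within each `company_symbol`, ordered by `tx_time`, each price is averaged with its neighbors, and rows at the edges of a partition average over fewer values. It assumes rows are dictionaries with `company_symbol`, `tx_time`, and `price` keys; the function name is hypothetical:

```python
from collections import defaultdict


def centered_moving_average(rows, window=1):
    """Average each row's price with up to `window` rows before and after it,
    within the same company_symbol, ordered by tx_time.

    With window=1 this corresponds to a frame of rowsBetween(-1, 1).
    """
    # Partition the rows by company symbol
    partitions = defaultdict(list)
    for row in rows:
        partitions[row["company_symbol"]].append(row)

    result = []
    for symbol_rows in partitions.values():
        # Order each partition by transaction time
        symbol_rows.sort(key=lambda r: r["tx_time"])
        prices = [r["price"] for r in symbol_rows]
        for i, row in enumerate(symbol_rows):
            # Slice clamps at the partition edges, so edge frames are smaller
            frame = prices[max(0, i - window): i + window + 1]
            result.append({**row, "movingAverage": sum(frame) / len(frame)})
    return result


trades = [
    {"company_symbol": "IAC", "tx_time": "2020-09-04 13:34:59", "price": 43.39},
    {"company_symbol": "IAC", "tx_time": "2020-09-04 13:34:58", "price": 43.38},
    {"company_symbol": "IAC", "tx_time": "2020-09-04 13:35:00", "price": 43.42},
]
for row in centered_moving_average(trades):
    print(row["tx_time"], row["price"], row["movingAverage"])
```

The first two averages should agree with the first two `movingAverage` values in the output below (up to floating-point rounding); the third differs because this three-row sample has no following trade.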

To see the data with the new moving average column, we call `show()`:

In [13]:
movAvg.show()

+--------------------+--------------------+--------------+-----+-------------------+------------------+
|                 _id|        company_name|company_symbol|price|            tx_time|     movingAverage|
+--------------------+--------------------+--------------+-----+-------------------+------------------+
|{5f527ac22f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.38|2020-09-04 13:34:58|43.385000000000005|
|{5f527ac32f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.39|2020-09-04 13:34:59| 43.39666666666667|
|{5f527ac42f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.42|2020-09-04 13:35:00|43.419999999999995|
|{5f527ac52f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.45|2020-09-04 13:35:01|43.443333333333335|
|{5f527ac62f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.46|2020-09-04 13:35:02|             43.46|
|{5f527ac72f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.47|2020-09-04 13:35:03| 43.47666666666667|
|{5f527ac82f6a1552...|ITCHY ACRE CORPOR...|           IAC| 43.5|

## Data manipulation with MongoDB
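To write the results back to MongoDB, we save the DataFrame. The output URI points at the same `Stocks.Source` collection, and every row still carries its original `_id`, so the save updates the existing documents rather than adding new ones.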

In [14]:
movAvg.write.format("mongo").option("replaceDocument", "true").mode("append").save()
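When a saved DataFrame contains an `_id` field, the connector matches rows to existing documents by `_id` and inserts rows whose `_id` is new; `replaceDocument` controls what happens on a match. With `true`, as here, the stored document is replaced by the row; with `false`, only the fields present in the row are updated and any other fields are kept. The sketch below models those two behaviors in memory, with a dictionary keyed by `_id` standing in for the collection. `save_by_id` and the `source` field are hypothetical, and this illustrates the documented semantics, not the connector's implementation:

```python
def save_by_id(collection, rows, replace_document=True):
    """Model saving rows that carry an _id into a collection.

    collection: dict mapping _id -> document (stands in for a MongoDB collection)
    replace_document=True  -> a matching document is replaced by the row
    replace_document=False -> only the row's fields are updated; other fields survive
    Rows whose _id is not present yet are inserted.
    """
    for row in rows:
        doc_id = row["_id"]
        if replace_document or doc_id not in collection:
            collection[doc_id] = dict(row)  # replace or insert a copy of the row
        else:
            collection[doc_id].update(row)  # merge the row's fields into the document
    return collection


stored = {"a1": {"_id": "a1", "price": 43.38, "source": "feed"}}
save_by_id(stored, [{"_id": "a1", "price": 43.38, "movingAverage": 43.385}], replace_document=False)
print(stored["a1"])
# {'_id': 'a1', 'price': 43.38, 'source': 'feed', 'movingAverage': 43.385}
```

With the default `replace_document=True`, the same call would drop the `source` field, because the row does not contain it.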

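We can also push work down to MongoDB with its Aggregation Framework, which can pre-filter, sort, or aggregate the data before it reaches Spark. Here we find the highest price recorded for each company, sorted from highest to lowest. The `MongoSinglePartitioner` reads the result through a single partition, so the `$group` stage operates on the whole collection rather than on one partition's slice of it.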

In [20]:
pipeline = """[
    {"$group": {"_id": "$company_name", "maxprice": {"$max": "$price"}}},
    {"$sort": {"maxprice": -1}}
]"""

aggPipelineDF = spark.read.format("mongo").option("pipeline", pipeline).option("partitioner", "MongoSinglePartitioner").load()
aggPipelineDF.show()

+--------------------+--------+
|                 _id|maxprice|
+--------------------+--------+
|FRUSTRATING CHAOS...|    87.6|
|HOMELY KIOSK UNLI...|   86.48|
| CREEPY GIT HOLDINGS|    83.4|
|GREASY CHAMPION C...|   81.76|
|COMBATIVE TOWNSHI...|   72.18|
|FROTHY MIDNIGHT P...|   66.81|
|ITCHY ACRE CORPOR...|   44.42|
|LACKADAISICAL SAV...|   42.34|
|CORNY PRACTITIONE...|   38.55|
|TRITE JACKFRUIT P...|   22.62|
+--------------------+--------+
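Since the pipeline runs inside MongoDB, Spark receives only the grouped results, one document per company, instead of every trade. For reasoning about what the two stages compute, here is a plain-Python equivalent over a list of dictionaries. It assumes every document has a `company_name` and a numeric `price` (MongoDB's `$max` also has rules for missing and null values that this sketch does not model), and the function and company names are hypothetical:

```python
def max_price_by_company(documents):
    """Plain-Python equivalent of
    [{"$group": {"_id": "$company_name", "maxprice": {"$max": "$price"}}},
     {"$sort": {"maxprice": -1}}]
    Assumes every document has a company_name and a numeric price.
    """
    # $group + $max: keep the highest price seen for each company
    max_prices = {}
    for doc in documents:
        name, price = doc["company_name"], doc["price"]
        if name not in max_prices or price > max_prices[name]:
            max_prices[name] = price
    # $sort with -1: highest maxprice first
    groups = [{"_id": name, "maxprice": price} for name, price in max_prices.items()]
    return sorted(groups, key=lambda g: g["maxprice"], reverse=True)


docs = [
    {"company_name": "Company A", "price": 10.0},
    {"company_name": "Company B", "price": 25.5},
    {"company_name": "Company A", "price": 12.0},
]
print(max_price_by_company(docs))
# [{'_id': 'Company B', 'maxprice': 25.5}, {'_id': 'Company A', 'maxprice': 12.0}]
```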



Finally, we can query the data with Spark SQL. Registering the DataFrame as a temporary view lets us run standard SQL against it:

In [21]:
movAvg.createOrReplaceTempView("avgs")
sqlDF = spark.sql("SELECT * FROM avgs WHERE movingAverage > 43.0")
sqlDF.show()

+--------------------+--------------------+--------------+-----+-------+------------------+
|                 _id|        company_name|company_symbol|price|tx_time|     movingAverage|
+--------------------+--------------------+--------------+-----+-------+------------------+
|{5f527ac22f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.38|   null|43.385000000000005|
|{5f527ac32f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.39|   null| 43.39666666666667|
|{5f527ac42f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.42|   null|43.419999999999995|
|{5f527ac52f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.45|   null|43.443333333333335|
|{5f527ac62f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.46|   null|             43.46|
|{5f527ac72f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.47|   null| 43.47666666666667|
|{5f527ac82f6a1552...|ITCHY ACRE CORPOR...|           IAC| 43.5|   null| 43.49666666666667|
|{5f527ac92f6a1552...|ITCHY ACRE CORPOR...|           IAC|43.52|   null|        