In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Driver classpath glob; assumes the MongoDB Spark connector JARs (and the Java
# driver they need) live in ./jars relative to the notebook — TODO confirm.
working_directory = 'jars/*'

# Default "<database>.<collection>" namespace used by reads/writes that do not
# pass explicit `database` / `collection` options.
MONGO_URI = "mongodb://127.0.0.1/test.myCollection"

# SparkSession.Builder.config takes the key and the value as two separate
# arguments. Passing one "key=value" string registers an option whose *key* is
# the whole string and which has no value, so the connector never sees
# spark.mongodb.input.uri / spark.mongodb.output.uri.
my_spark = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.mongodb.input.uri", MONGO_URI)
    .config("spark.mongodb.output.uri", MONGO_URI)
    .config('spark.driver.extraClassPath', working_directory)
    .getOrCreate()
)


In [6]:
# WRITE
# Ten sample (name, age) rows written with no database/collection option, so
# they go to the namespace given by the configured output URI.
# NOTE: mode("append") inserts the same rows again on every re-run of this cell
# (hence the repeated JULIA / Bombur documents in the outputs further down).
people_rows = [
    ("JULIA", 50),
    ("Gandalf", 1000),
    ("Thorin", 195),
    ("Balin", 178),
    ("Kili", 77),
    ("Dwalin", 169),
    ("Oin", 167),
    ("Gloin", 158),
    ("Fili", 82),
    ("Bombur", 22),
]
people_columns = ["name", "age"]
people = my_spark.createDataFrame(people_rows, people_columns)

mongo_writer = people.write.format("com.mongodb.spark.sql.DefaultSource")
mongo_writer.mode("append").save()


In [None]:
# Write the same rows to an explicit namespace (database "people", collection
# "contacts") instead of the default output URI. Like the cell above,
# mode("append") adds duplicates each time this cell is re-run.
(
    people.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .option("database", "people")
    .option("collection", "contacts")
    .save()
)

In [7]:
# READ
# Load the collection named by the configured input URI and look up JULIA.
people_df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

# Build the filter once and reuse it. The former select('*') was a no-op
# projection, so it is dropped.
julia_rows = people_df.where(col("name") == "JULIA")
julia_rows.show()
julia_rows.count()



+--------------------+---+-----+
|                 _id|age| name|
+--------------------+---+-----+
|[5c2b999f537aa919...| 50|JULIA|
|[5c2b9c08537aa91a...| 50|JULIA|
|[5c2b9e6d537aa91a...| 50|JULIA|
+--------------------+---+-----+



3

In [3]:
# Filter
# Reload the collection and keep only documents whose age is at most 51
# (inclusive); the aggregation cell below selects the complementary rows.
people_df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
age_at_most_51 = people_df.filter(people_df["age"] <= 51)
age_at_most_51.show()



+--------------------+---+-------------+
|                 _id|age|         name|
+--------------------+---+-------------+
|[5c2b96ef537aa913...| 50|Bilbo Baggins|
|[5c2b96ef537aa913...| 50|Bilbo Baggins|
|[5c2b96f7537aa913...| 50|   KONSTANTIN|
|[5c2b9716537aa913...| 50|   KONSTANTIN|
|[5c2b973e537aa913...| 50|   KONSTANTIN|
|[5c2b9742537aa913...| 50|   KONSTANTIN|
|[5c2b9800537aa915...| 50|   KONSTANTIN|
|[5c2b9826537aa915...| 22|       Bombur|
|[5c2b9826537aa915...| 50|   KONSTANTIN|
|[5c2b985a537aa915...| 22|       Bombur|
|[5c2b985a537aa915...| 50|   KONSTANTIN|
|[5c2b98a1537aa916...| 22|       Bombur|
|[5c2b98a1537aa916...| 50|   KONSTANTIN|
|[5c2b999f537aa919...| 50|        JULIA|
|[5c2b999f537aa919...| 22|       Bombur|
|[5c2b9c08537aa91a...| 50|        JULIA|
|[5c2b9c08537aa91a...| 22|       Bombur|
|[5c2b9e6d537aa91a...| 22|       Bombur|
|[5c2b9e6d537aa91a...| 50|        JULIA|
+--------------------+---+-------------+



In [7]:
# Aggregation
# Hand a single $match stage to the connector through the "pipeline" read
# option, so only documents with age > 51 come back (the complement of the
# `age <= 51` filter above).
match_stage = "{ $match: { age : { $gt : 51 } } }"
mongo_reader = my_spark.read.format("com.mongodb.spark.sql.DefaultSource")
older_df = mongo_reader.option("pipeline", match_stage).load()
older_df.show()


+--------------------+----+-------+
|                 _id| age|   name|
+--------------------+----+-------+
|[5c2b96ef537aa913...| 195| Thorin|
|[5c2b96ef537aa913...| 178|  Balin|
|[5c2b96ef537aa913...|  77|   Kili|
|[5c2b96ef537aa913...| 169| Dwalin|
|[5c2b96ef537aa913...| 167|    Oin|
|[5c2b96ef537aa913...| 158|  Gloin|
|[5c2b96ef537aa913...|  82|   Fili|
|[5c2b96ef537aa913...|1000|Gandalf|
|[5c2b96ef537aa913...| 195| Thorin|
|[5c2b96ef537aa913...| 178|  Balin|
|[5c2b96ef537aa913...|  77|   Kili|
|[5c2b96ef537aa913...| 169| Dwalin|
|[5c2b96ef537aa913...|1000|Gandalf|
|[5c2b96ef537aa913...| 167|    Oin|
|[5c2b96ef537aa913...| 158|  Gloin|
|[5c2b96ef537aa913...|  82|   Fili|
|[5c2b96f7537aa913...|  77|   Kili|
|[5c2b96f7537aa913...| 169| Dwalin|
|[5c2b96f7537aa913...| 195| Thorin|
|[5c2b96f7537aa913...| 178|  Balin|
+--------------------+----+-------+
only showing top 20 rows



In [None]:
# Shut down the Spark session created in the setup cell; re-run that cell
# before executing any other cell again.
my_spark.stop()

