# Create Apache Spark DataFrames in Python using data from MongoDB collections

In [1]:
import glob
import os
import sys

# Root of the local Spark installation. An already-exported SPARK_HOME takes precedence;
# otherwise fall back to the Windows install location this notebook was written against.
spark_path = os.environ.get("SPARK_HOME", "C:/opt/spark")

# Maven coordinates of the MongoDB Spark connector; the `_2.10` suffix is the artifact's Scala
# version. NOTE(review): it must match the Scala version of the Spark build in use (Spark 2.x
# binaries are commonly built for Scala 2.11) — confirm against this install.
MONGO_CONNECTOR = "org.mongodb.spark:mongo-spark-connector_2.10:2.0.0"

os.environ["SPARK_HOME"] = spark_path
# NOTE(review): Hadoop on Windows looks for bin\winutils.exe under HADOOP_HOME; pointing it at
# the Spark root assumes winutils.exe was copied into spark_path/bin — TODO confirm.
os.environ["HADOOP_HOME"] = spark_path
# Extra spark-submit arguments used when PySpark launches its JVM; `--packages` pulls in the
# MongoDB connector so the "com.mongodb.spark.sql.DefaultSource" format is available.
os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages {} pyspark-shell".format(MONGO_CONNECTOR)

# Make PySpark importable. Only locations that actually contain importable code go on
# sys.path: the PySpark source tree and the zips bundled in python/lib. (`bin` and
# `python/lib` themselves hold no modules, and adding the `pyspark` package directory would
# let its submodules shadow unrelated top-level imports.) The py4j version changes between
# Spark releases, so its zip is globbed rather than pinned.
python_dir = os.path.join(spark_path, "python")
lib_dir = os.path.join(python_dir, "lib")
spark_import_paths = [python_dir, os.path.join(lib_dir, "pyspark.zip")]
spark_import_paths += sorted(glob.glob(os.path.join(lib_dir, "py4j-*-src.zip")))

# Skip entries already present so re-running this cell does not keep growing sys.path.
for entry in spark_import_paths:
    if entry not in sys.path:
        sys.path.append(entry)

In [2]:
from pyspark import SparkConf, SparkContext

# Local mode with 4 worker threads. getOrCreate() returns the already-running context instead
# of raising "Cannot run multiple SparkContexts at once" when this cell is re-executed.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[4]"))

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Default connection used by the MongoDB connector, in the form mongodb://host/<database>.<collection>.
# It names the same `local.people` collection that the write and read cells target explicitly,
# so reads/writes that omit those options still hit the right place.
# NOTE(review): MongoDB's `local` database is per-node and not replicated; a dedicated database
# would be safer for anything beyond this local demo — confirm before reusing.
MONGO_URI = "mongodb://127.0.0.1/local.people"

# getOrCreate() reuses the SparkContext started earlier if one is running.
my_spark = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.mongodb.input.uri", MONGO_URI)
    .config("spark.mongodb.output.uri", MONGO_URI)
    .getOrCreate()
)

In [5]:
# Write sample documents to MongoDB (database `local`, collection `people`).
# `None` leaves Bombur's age empty; it reads back as age=None.
PEOPLE_ROWS = [
    ("Bilbo Baggins", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178),
    ("Kili", 77), ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82),
    ("Bombur", None),
]
people = my_spark.createDataFrame(PEOPLE_ROWS, ["name", "age"])

# "overwrite" replaces the collection's existing contents, so re-running this cell (or
# Restart & Run All) yields the same ten documents; "append" would insert a duplicate set on
# every run. Note this discards any other documents already stored in local.people.
(
    people.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("overwrite")
    .option("database", "local")
    .option("collection", "people")
    .save()
)


In [8]:
# Load the `people` collection of database `local` back into a Spark DataFrame.
# The explicit "uri" option pins the source regardless of the session defaults.
df = (
    my_spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://127.0.0.1/local.people")
    .load()
)

In [9]:
# Show the schema Spark derived for the collection; MongoDB's `_id` comes back as a struct
# holding the ObjectId as a string field `oid`.
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [10]:
# Preview a bounded number of rows rather than collect()-ing the whole collection to the
# driver; for this 10-document demo the result is identical.
df.take(20)

[Row(_id=Row(oid=u'58ece8eb79b9441630004370'), age=77, name=u'Kili'),
 Row(_id=Row(oid=u'58ece8eb79b9441630004371'), age=50, name=u'Bilbo Baggins'),
 Row(_id=Row(oid=u'58ece8eb79b9441630004372'), age=167, name=u'Oin'),
 Row(_id=Row(oid=u'58ece8eb79b9441630004373'), age=1000, name=u'Gandalf'),
 Row(_id=Row(oid=u'58ece8eb79b9441630004374'), age=158, name=u'Gloin'),
 Row(_id=Row(oid=u'58ece8eb79b9441630004375'), age=169, name=u'Dwalin'),
 Row(_id=Row(oid=u'58ece8eb79b9441630004376'), age=82, name=u'Fili'),
 Row(_id=Row(oid=u'58ece8eb79b9441630004377'), age=None, name=u'Bombur'),
 Row(_id=Row(oid=u'58ece8eb79b9441630004378'), age=195, name=u'Thorin'),
 Row(_id=Row(oid=u'58ece8eb79b9441630004379'), age=178, name=u'Balin')]

In [12]:
# Confirm the connector returned an ordinary PySpark DataFrame.
df.__class__

pyspark.sql.dataframe.DataFrame