# Load the Amazon product review dataset into MongoDB using Apache Spark

In [1]:
# findspark has to run before anything from pyspark is imported:
# findspark.init() is the call that makes the local Spark installation's
# Python packages importable in the first place.
import findspark
findspark.init()

import itertools
import json

from pyspark import SparkConf
from pyspark.sql import Row
from pyspark.sql import functions
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    BooleanType,
    LongType,
    DoubleType,
)


# Connection string handed to the Spark session as the MongoDB input/output URI.
database_uri = "mongodb://127.0.0.1:27017"
# NOTE(review): presumably a glob over locally stored jar files; it is not
# referenced by any of the cells shown in this notebook -- confirm it is needed.
working_directory = "C://jars/*"



# Connecting Spark to MongoDB in a SparkSession through the mongo-spark-connector

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Explicit schema for the review records read into the Spark DataFrame.
# Because the schema is supplied up front, Spark does not infer column types
# from the data; every field is declared nullable.
schema = StructType([
    StructField("asin", StringType(), True),
    StructField("overall", DoubleType(), True),
    StructField("reviewText", StringType(), True),
    StructField("reviewTime", StringType(), True),
    StructField("reviewerID", StringType(), True),
    StructField("summary", StringType(), True),
    StructField("unixReviewTime", LongType(), True),
    StructField("verified", BooleanType(), True),
])

# Session wired to MongoDB: the connector is pulled in via spark.jars.packages
# and both the input and the output URI point at `database_uri`.
# NOTE: master("local") runs Spark with a single worker thread; "local[*]"
# would use all available cores if the load proves too slow.
spark = (
    SparkSession.builder
    .master("local")
    .appName('SparkByExamples.com')
    .config("spark.mongodb.input.uri", database_uri)
    .config("spark.mongodb.output.uri", database_uri)
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .getOrCreate()
)

# Location of the review dump on the local machine.
reviews_path = "C:/Users/dell/Downloads/All_Amazon_Review.json/All_Amazon_Review.json"

# multiLine is off, so the file is parsed as line-delimited JSON.
# The former "inferSchema" option was dropped: it is not a JSON reader option
# and has no effect once an explicit schema is supplied.
df = (
    spark.read.format("json")
    .option("multiLine", "False")
    .schema(schema)
    .load(reviews_path)
)

# Keep only the rows in which every schema column is populated
# (dropna() with its defaults discards a row if any column is null).
df1 = df.dropna()



In [4]:
# Preview the first 20 cleaned rows; long cell values are truncated for display.
df1.show(n=20, truncate=True)

+----------+-------+--------------------+-----------+--------------+--------------------+--------------+--------+
|      asin|overall|          reviewText| reviewTime|    reviewerID|             summary|unixReviewTime|verified|
+----------+-------+--------------------+-----------+--------------+--------------------+--------------+--------+
|B017O9P72A|    1.0|Alexa is not able...|12 11, 2015|A27BTSGLXK2C5K|VERY Buggy, doesn...|    1449792000|   false|
|B017O9P72A|    4.0|Alexa works great...| 12 8, 2015|A27ZJ1NCBFP1HZ|      So Far So Good|    1449532800|   false|
|B017O9P72A|    1.0|Weak!!\n\nAlexa d...| 12 7, 2015| ACCQIOZMFN4UK|         Time waster|    1449446400|   false|
|B017O9P72A|    2.0|Can only control ...| 12 5, 2015|A3KUPJ396OQF78|               Buggy|    1449273600|   false|
|B017O9P72A|    1.0|this worked great...| 02 2, 2018|A1U1RE1ZI19E1H|     stopped working|    1517529600|   false|
|B017O9P72A|    5.0|         Great skill|01 15, 2018|A3TXR8GLKS19RE|               Great

In [5]:
# Print each column's name, type and nullability for the cleaned DataFrame.
df1.printSchema()

root
 |-- asin: string (nullable = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- verified: boolean (nullable = true)



# Writing the data from the PySpark DataFrame into MongoDB

In [None]:
# Write the cleaned reviews to the `reviews` collection of the `review_data`
# database. The URI is taken from `database_uri` (set in the configuration
# cell) so the session and the writer cannot drift apart.
# NOTE: the save mode is "append", so re-running this cell adds the rows again.
(
    df1.write.format("com.mongodb.spark.sql.DefaultSource")
    .option('uri', database_uri)
    .option('database', 'review_data')
    .option('collection', 'reviews')
    .mode("append")
    .save()
)