# MongoDB

> Create MongoDB instance and create new database

In [1]:
import pymongo

myclient = pymongo.MongoClient('mongodb://localhost:27017')
mydb = myclient['pyspark_test_db']

In [2]:
# create a new collection (table)
mycollection = mydb['pyspark_test_collection']

In [6]:
# test inserting and retrieving some data
john_doe = {'fname': 'john', 'lname': 'doe', 'age': 46}
jane_doe = {'fname': 'jane', 'lname': 'doe', 'age': 29}

test_data = [john_doe, jane_doe] 
test_insertion_results = mycollection.insert_many(test_data)
test_insertion_results.inserted_ids

[ObjectId('5d543984e001369ade2cde9b'), ObjectId('5d543984e001369ade2cde9c')]

In [9]:
# retrieve some test data
data = mycollection.find({'fname': 'john'})
for _ in data:
    print(_)

{'_id': ObjectId('5d543984e001369ade2cde9b'), 'fname': 'john', 'lname': 'doe', 'age': 46}


In [24]:
# delete the test data
mycollection.delete_many({})

<pymongo.results.DeleteResult at 0x7fc9303d0888>

# PySpark

> now let's import PySpark, create a SparkSession, and load our csv file into a Spark DataFrame  

_We will be working with a small dataset for testing purposes, PySpark can handle data in the petabytes_

In [27]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/37/98/244399c0daa7894cdf387e7007d5e8b3710a79b67f3fd991c0b0b644822d/pyspark-2.4.3.tar.gz (215.6MB)
[K    100% |████████████████████████████████| 215.6MB 196kB/s ta 0:00:011
[?25hCollecting py4j==0.10.7 (from pyspark)
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K    100% |████████████████████████████████| 204kB 9.4MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Running setup.py bdist_wheel for pyspark ... [?25ldone
[?25h  Stored in directory: /Users/Frankie/Library/Caches/pip/wheels/8d/20/f0/b30e2024226dc112e256930dd2cd4f06d00ab053c86278dcf3
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.3


In [28]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('pyspark_test').getOrCreate()

In [29]:
# df = spark.read.csv('cars.csv', header=True, inferSchema=True) # this was a problem because of the ";" delimeter,
# i'm sure it can still work if we set it in the params

In [43]:
df = spark.read.format("csv")\
.option("delimiter", ";")\
.option("header", True)\
.option("inferSchema", True)\
.load("cars.csv")

In [44]:
df.columns

['Car',
 'MPG',
 'Cylinders',
 'Displacement',
 'Horsepower',
 'Weight',
 'Acceleration',
 'Model',
 'Origin']

In [45]:
df.show(10)

+--------------------+------+---------+------------+----------+------+------------+-----+------+
|                 Car|   MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+------+---------+------------+----------+------+------------+-----+------+
|              STRING|DOUBLE|      INT|      DOUBLE|    DOUBLE|DOUBLE|      DOUBLE|  INT|   CAT|
|Chevrolet Chevell...|  18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|
|   Buick Skylark 320|  15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|
|  Plymouth Satellite|  18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|
|       AMC Rebel SST|  16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|
|         Ford Torino|  17.0|        8|       302.0|     140.0| 3449.|        10.5|   70|    US|
|    Ford Galaxie 500|  15.0|        8|       429.0|     198.0| 4341.|        10.0|   70|    US|
|    Chevrolet Impala|  14.0| 

In [46]:
df.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: string (nullable = true)
 |-- Cylinders: string (nullable = true)
 |-- Displacement: string (nullable = true)
 |-- Horsepower: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Acceleration: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Origin: string (nullable = true)



> Let's continue with some quick EDA on our DataFrame

> Let's now do some transformations to our data

# PySpark DataFrame to MongoDB Collection