# `Map-Reduce Again: Python to MongoDB`

**<font color=red>Mr Fugu Data Science</font>**


# (◕‿◕✿)

[Github](https://github.com/MrFuguDataScience) | [Youtube](https://www.youtube.com/channel/UCbni-TDI-Ub8VlGaP8HLTNw?view_as=subscriber)

`------------------`

`Purpose & Outcome`:

+ Create a connection to MongoDB from Python

+ Walk through a basic Map-Reduce example
    + Show the same example with a query attached
+ Export CSV/JSON from MongoDB


`----------------------------------`

# General Idea:

1.) Map : each node takes its local data, applies the map function, and writes the result to a temp file. A master node then makes sure only a single copy of redundant input data is processed.

2.) Shuffle : redistribute the data so that all values sharing the same key end up on the same node.

3.) Reduce : process each group of data by key, in parallel.

4.) There can be a `finalize` step: this can be used to modify the output after the reduce step.
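
The four steps above can be sketched in plain Python, with no MongoDB involved. This is only an illustration of the idea on one machine: the sample records and the function names (`map_phase`, `shuffle_phase`, `reduce_phase`, `finalize_phase`) are made up for this sketch and are not part of any library.

```python
from collections import defaultdict

# Hypothetical sample records, loosely shaped like the documents used later on.
records = [
    {"name": "Ann", "skills": ["SQL", "R"]},
    {"name": "Bob", "skills": ["SQL"]},
    {"name": "Cy", "skills": ["R", "Python", "SQL"]},
]

def map_phase(record):
    """Map: emit one (key, value) pair per skill in a record."""
    for skill in record["skills"]:
        yield skill, 1

def shuffle_phase(pairs):
    """Shuffle: group all values that share a key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped

def reduce_phase(key, values):
    """Reduce: collapse the values for one key into a single result."""
    return sum(values)

def finalize_phase(key, reduced):
    """Finalize (optional): reshape the reduced value."""
    return {"skill": key, "count": reduced}

pairs = [pair for record in records for pair in map_phase(record)]
grouped = shuffle_phase(pairs)
results = [finalize_phase(k, reduce_phase(k, v)) for k, v in sorted(grouped.items())]
print(results)
```

In a real cluster the map and reduce calls run on different nodes; here they run one after another, which is enough to see what each phase contributes.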

# When to use Map-Reduce?
+ If you have large datasets that do not fit into the main memory of one machine, that is a good time.
+ Graph analysis
+ Classification, Inverted Index, Machine Learning, document clustering are some of the use cases.

[Mongo Doc](https://docs.mongodb.com/manual/reference/method/db.collection.mapReduce/)

# Install Pymongo:

`conda install -c anaconda pymongo` | `pip install pymongo` | `pip3 install pymongo`


[anaconda](https://anaconda.org/anaconda/pymongo) | [official install doc](https://pypi.org/project/pymongo/)

In [110]:
import pymongo as pym      # Interface with Python <--> MongoDB 
import pandas as pd        # Create Dataframe
import os                  # find files on system
import csv                 # to dump as a csv file       
import json                # convert file to json format
from bson.code import Code  # helps with map-reduce


In [6]:
# Making a Connection to MongoClient
client = pym.MongoClient('mongodb://localhost:27017/')

# DATABASE connection:
db = client["berkeley"]

# GETTING A COLLECTION (*AKA* TABLE); MongoDB creates it lazily on the first insert:
recruiter_candidates = db["recruiter_clients"]

# What Our Data Looks Like:

`db.recruiter_clients.findOne()
{
	"_id" : ObjectId("5eb5cc5b47c72ccce7539c1b"),
	"candidate" : {
		"first_name" : "Margaret",
		"last_name" : "Mcdonald",
		"skills" : [
			"skLearn",
			"Java",
			"R",
			"SQL",
			"Spark",
			"C++"
		],
		"state" : "AL",
		"specialty" : "Database",
		"experience" : "Junior",
		"relocation" : "no"
	}
}`

# Map-Reduce basic example:

+ Two ways to read the output:

1.) Iterate over the collection object returned by `map_reduce`

2.) Query the output stored as a new collection ('ppl_skillCount')

In [396]:
# Map function: emits one (key, 1) pair per skill, so the summed value for
# each candidate equals the length of their skills array.
mapFunc = Code("""
function () {
    var skills = this.candidate.skills;
    var key = {
        name: this.candidate.first_name + ' ' + this.candidate.last_name,
        skill: skills
    };
    for (var i = 0; i < skills.length; i++) {
        emit(key, 1);
    }
}
""")

# Reduce function: adds up the 1s emitted for each key.
redFunc = Code("function (keyName, valSkillCount) { return Array.sum(valSkillCount); }")

# Bringing it all together; results go to the output collection 'ppl_skillCount'.
# Note: Collection.map_reduce() is a PyMongo 3.x API; it was removed in PyMongo 4.0.
map_red = recruiter_candidates.map_reduce(mapFunc, redFunc, 'ppl_skillCount')

# Collect the results as a list of dictionaries:
ppl_skills = list(map_red.find())
ppl_skills[:5]



[{'_id': {'name': 'Aaron Ferguson', 'skill': ['TensorFlow', 'SQL', 'MongoDB']},
  'value': 3.0},
 {'_id': {'name': 'Aaron Williams',
   'skill': ['MongoDB', 'TensorFlow', 'Python']},
  'value': 3.0},
 {'_id': {'name': 'Adam Anderson', 'skill': ['R']}, 'value': 1.0},
 {'_id': {'name': 'Adam Middleton',
   'skill': ['SQL', 'Python', 'MongoDB', 'Java', 'skLearn']},
  'value': 5.0},
 {'_id': {'name': 'Adrian Solis',
   'skill': ['skLearn', 'MongoDB', 'C++', 'TensorFlow']},
  'value': 4.0}]
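
To see why each `value` equals the number of skills, here is a rough plain-Python imitation of the map and reduce functions, run on the sample document shown earlier. The helper names `emulate_map` and `emulate_reduce` are invented for this sketch; MongoDB runs the JavaScript versions on the server, so treat this only as a mental model.

```python
from collections import Counter

# The sample document shown earlier (only the fields the map function reads).
doc = {
    "candidate": {
        "first_name": "Margaret",
        "last_name": "Mcdonald",
        "skills": ["skLearn", "Java", "R", "SQL", "Spark", "C++"],
    }
}

def emulate_map(document):
    """Mimic the JavaScript map function: one emit per element of skills."""
    cand = document["candidate"]
    # Dictionary keys must be hashable in Python, so the list becomes a tuple.
    key = (cand["first_name"] + " " + cand["last_name"], tuple(cand["skills"]))
    for _ in cand["skills"]:
        yield key, 1

def emulate_reduce(pairs):
    """Mimic Array.sum over the values grouped under each key."""
    totals = Counter()
    for key, value in pairs:
        totals[key] += value
    return dict(totals)

totals = emulate_reduce(emulate_map(doc))
for (name, skills), value in totals.items():
    print(name, list(skills), value)
```

Six skills produce six emits under the same key, and the reduce step sums them to 6.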

In [397]:
# Alternate way: query the output collection directly
mydoc = db.ppl_skillCount.find({})

ppl_ = list(mydoc)
ppl_[:4]

[{'_id': {'name': 'Aaron Ferguson', 'skill': ['TensorFlow', 'SQL', 'MongoDB']},
  'value': 3.0},
 {'_id': {'name': 'Aaron Williams',
   'skill': ['MongoDB', 'TensorFlow', 'Python']},
  'value': 3.0},
 {'_id': {'name': 'Adam Anderson', 'skill': ['R']}, 'value': 1.0},
 {'_id': {'name': 'Adam Middleton',
   'skill': ['SQL', 'Python', 'MongoDB', 'Java', 'skLearn']},
  'value': 5.0}]

In [399]:
# Store the output as a DataFrame:

names_skills = [data['_id'] for data in ppl_skills]    # nested key: name + skill list
skill_counts = [data['value'] for data in ppl_skills]  # reduced value: number of skills

ppl_skills_count = pd.DataFrame(names_skills)
ppl_skills_count['skill_count'] = skill_counts
ppl_skills_count.head()

| | name | skill | skill_count |
|---|---|---|---|
| 0 | Aaron Ferguson | [TensorFlow, SQL, MongoDB] | 3.0 |
| 1 | Aaron Williams | [MongoDB, TensorFlow, Python] | 3.0 |
| 2 | Adam Anderson | [R] | 1.0 |
| 3 | Adam Middleton | [SQL, Python, MongoDB, Java, skLearn] | 5.0 |
| 4 | Adrian Solis | [skLearn, MongoDB, C++, TensorFlow] | 4.0 |
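
As a possible alternative to building two lists, each result can be flattened into a single row first. The sketch below uses two records copied from the output above and a made-up helper name, `flatten`; casting the count to `int` is a choice made here, not something the original code does.

```python
# Two records copied from the map-reduce output shown above.
sample_output = [
    {"_id": {"name": "Aaron Ferguson", "skill": ["TensorFlow", "SQL", "MongoDB"]}, "value": 3.0},
    {"_id": {"name": "Adam Anderson", "skill": ["R"]}, "value": 1.0},
]

def flatten(record):
    """Merge the nested _id fields with the count into one flat row."""
    return {**record["_id"], "skill_count": int(record["value"])}

rows = [flatten(r) for r in sample_output]
print(rows)
```

A list of flat dictionaries like `rows` can be handed straight to `pd.DataFrame(rows)`.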


# Map-Reduce with `Query`:

+ Arbitrary Query: find people with less than or equal to 2 skills. 

In [331]:

# Map function: same as before, one (key, 1) pair per skill.
mapFunc = Code("""
function () {
    var skills = this.candidate.skills;
    var key = {
        name: this.candidate.first_name + ' ' + this.candidate.last_name,
        skill: skills
    };
    for (var i = 0; i < skills.length; i++) {
        emit(key, 1);
    }
}
""")

# Reduce function: adds up the 1s emitted for each key.
redFunc = Code("function (keyName, valSkillCount) { return Array.sum(valSkillCount); }")

# The query filters documents BEFORE the map step: keep only candidates whose
# skills array has 2 or fewer elements. Writing to 'ppl_skillCount' again
# replaces the earlier contents of that output collection.
map_red = recruiter_candidates.map_reduce(
    mapFunc, redFunc, 'ppl_skillCount',
    query={'$expr': {'$lte': [{'$size': "$candidate.skills"}, 2]}})

# Collect the results as a list of dictionaries:
ppl_skills = list(map_red.find())
ppl_skills[:5]


[{'_id': {'name': 'Adam Anderson', 'skill': ['R']}, 'value': 1.0},
 {'_id': {'name': 'Alexandra Glover', 'skill': ['Spark', 'TensorFlow']},
  'value': 2.0},
 {'_id': {'name': 'Alexis Stewart', 'skill': ['Java']}, 'value': 1.0},
 {'_id': {'name': 'Amy Campbell', 'skill': ['C++']}, 'value': 1.0},
 {'_id': {'name': 'Amy Crawford', 'skill': ['C++']}, 'value': 1.0}]
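
The `$expr` / `$size` / `$lte` query can be read as a simple length check. A minimal plain-Python analogue is sketched below; the documents are trimmed to the fields that matter, and `matches_query` is a name invented for this example.

```python
# Hypothetical trimmed documents; only the skills array matters for this filter.
docs = [
    {"candidate": {"first_name": "Adam", "skills": ["R"]}},
    {"candidate": {"first_name": "Alexandra", "skills": ["Spark", "TensorFlow"]}},
    {"candidate": {"first_name": "Adrian",
                   "skills": ["skLearn", "MongoDB", "C++", "TensorFlow"]}},
]

def matches_query(document, max_skills=2):
    """Analogue of {'$expr': {'$lte': [{'$size': '$candidate.skills'}, 2]}}."""
    return len(document["candidate"]["skills"]) <= max_skills

selected = [d["candidate"]["first_name"] for d in docs if matches_query(d)]
print(selected)
```

Only documents that pass this check reach the map function, which is why every `value` in the output above is 1.0 or 2.0.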

# Save Query as CSV:  (NON-NESTED FORM)
+ **<font color=red>pay attention</font>**

In [367]:
# Project only the fields we want; '_id': 0 drops the ObjectId.
abbrev_ppl = list(recruiter_candidates.find({}, {'_id': 0, 'candidate.first_name': 1,
    'candidate.last_name': 1, 'candidate.skills': 1, 'candidate.state': 1}))

abbrev_ppl[:2]

fields = ['first_name', 'last_name', 'skills', 'state']
# newline='' lets the csv module control line endings (avoids blank rows on Windows).
with open('candidate_ppl.csv', 'w', newline='') as outfile:
    writer = csv.DictWriter(outfile, fieldnames=fields)
    writer.writeheader()
    for doc in abbrev_ppl:
        # Each document is {'candidate': {...}}; write the inner dict as one flat row.
        if 'candidate' in doc:
            writer.writerow(doc['candidate'])



In [385]:

pd.read_csv('candidate_ppl.csv').head(7)

| | first_name | last_name | skills | state |
|---|---|---|---|---|
| 0 | Margaret | Mcdonald | ['skLearn', 'Java', 'R', 'SQL', 'Spark', 'C++'] | AL |
| 1 | Michael | Carter | ['TensorFlow', 'R', 'Spark', 'MongoDB', 'C++',... | AR |
| 2 | Brenda | Tyler | ['Spark'] | UT |
| 3 | Joseph | King | ['skLearn', 'SQL', 'R', 'Spark', 'Java', 'C++'... | FL |
| 4 | Laura | Webb | ['TensorFlow', 'C++', 'SQL', 'Java', 'R', 'Mon... | WY |
| 5 | Cheryl | Ramirez | ['C++', 'Python', 'R', 'Java', 'skLearn', 'SQL... | OK |
| 6 | Charles | Stewart | ['MongoDB', 'C++', 'Java', 'SQL', 'R', 'Python... | NM |
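
Notice that the `skills` column comes back as the text of a Python list (for example `['Spark']`), because the `csv` module converts non-string values with `str()`. One possible workaround, sketched here, is to join the skills with a delimiter before writing and split them again after reading. The `;` separator is an assumption for this example, and an in-memory buffer stands in for the file.

```python
import csv
import io

# Two rows shaped like the 'candidate' sub-documents shown above.
candidates = [
    {"first_name": "Brenda", "last_name": "Tyler", "skills": ["Spark"], "state": "UT"},
    {"first_name": "Margaret", "last_name": "Mcdonald",
     "skills": ["skLearn", "Java", "R", "SQL", "Spark", "C++"], "state": "AL"},
]

SEP = ";"  # assumed delimiter; pick one that never appears in a skill name
fields = ["first_name", "last_name", "skills", "state"]

buffer = io.StringIO()  # stands in for a file opened with newline=''
writer = csv.DictWriter(buffer, fieldnames=fields)
writer.writeheader()
for cand in candidates:
    # Replace the list with a single delimited string before writing.
    writer.writerow({**cand, "skills": SEP.join(cand["skills"])})

buffer.seek(0)
restored = [
    {**row, "skills": row["skills"].split(SEP)}  # turn the string back into a list
    for row in csv.DictReader(buffer)
]
print(restored == candidates)
```

One caveat with this approach: an empty skills list would come back as `['']` rather than `[]`, so that case would need its own handling.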


# Retain Original Format: Nested JSON

In [381]:
abbrev_ppl=list(recruiter_candidates.find({},{'_id':0,'candidate.first_name':1,\
'candidate.last_name':1,'candidate.skills':1,'candidate.state':1}))


# import json

with open('candidate_abbrev.json', 'w') as fp:
    json.dump(abbrev_ppl, fp)
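
A quick way to check that the nested structure survives the export is to load the file back and compare. The sketch below does this with one record in the same shape as above and a temporary directory, so it does not touch the real `candidate_abbrev.json`. The `'_id': 0` projection matters for this step, because an `ObjectId` is not serializable by the standard `json` module.

```python
import json
import os
import tempfile

# One record in the same nested shape as the export above.
nested = [{"candidate": {"first_name": "Brenda", "last_name": "Tyler",
                         "skills": ["Spark"], "state": "UT"}}]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "candidate_abbrev.json")
    with open(path, "w") as fp:
        json.dump(nested, fp, indent=2)  # indent is optional, for readability
    with open(path) as fp:
        loaded = json.load(fp)

print(loaded == nested)
```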



# `Citations & Help`:
    
# ◔̯◔

https://appdividend.com/2018/10/26/mongodb-mapreduce-example-tutorial/

https://api.mongodb.com/python/current/examples/aggregation.html#map-reduce

https://stackoverflow.com/questions/7811163/query-for-documents-where-array-size-is-greater-than-1


https://stackoverflow.com/questions/40245873/export-data-to-csv-from-mongodb-by-using-python

https://datashoptalk.com/mapreduce-in-mongodb/

https://runnable.com/blog/pipelines-vs-map-reduce-to-speed-up-data-aggregation-in-mongodb