<font size = +2> Transfering data from MongoDB to MySql</font>

Ensure that you install the packages needed for this project using "pip install"

In [1]:
import pymongo
from pymongo import MongoClient
from pprint import pprint
import json
import relationalize
from typing import Any, Dict, Generator
from relationalize import Relationalize, Schema
from relationalize.utils import create_local_file
import os
import csv
import pandas as pd
import mysql.connector as ms
from mysql.connector import Error

Using MongoClient to connect to the MongoDB.

In [2]:
client = MongoClient()
#Connect to the database 'test' by attaching the name of the database as an extension to the instantiated mongoclient variable.
db = client.test
#Connect to the collection using the name as an extension
employee = db.employee
employee

Collection(Database(MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True), 'test'), 'employee')

In [3]:
#Next, check to see if there are any data in the collection
len = 0
#Using the find attribute of mongodb to get the total number of documents in the collection
for doc in employee.find():
    len += 1
print(len)

8


In [4]:
#Printing out the list of documents in the collection since they are not much
for doc in employee.find():
    pprint(doc)

{'Address': 'Sears Streer, NZ',
 'Age': '42',
 'Name': 'Raj Kumar',
 '_id': ObjectId('64307ef302a0320de41984ea')}
{'Address': 'Sears Streer, NZ',
 'Age': '42',
 'Name': 'Raj Kumar',
 '_id': ObjectId('64307f1c02a0320de41984eb')}
{'Address': 'Ehime Mbano, Nigeria',
 'Age': '29',
 'Name': 'Meshach Nnosiri',
 '_id': ObjectId('6430837902a0320de41984ec')}
{'Address': 'Ehime Mbano, Nigeria',
 'Age': '31',
 'Name': 'Franklin Nnosiri',
 '_id': ObjectId('6430837902a0320de41984ed')}
{'Address': 'Portharcourt, Nigeria',
 'Age': '24',
 'Name': 'Judith Nnosiri',
 '_id': ObjectId('6430837902a0320de41984ee')}
{'Address': 'Owerri, Nigeria',
 'Age': '26',
 'Name': 'Nancy Nnosiri',
 '_id': ObjectId('643095ec02a0320de41984ef'),
 'marrital_status': 'Single'}
{'_id': ObjectId('6430ac89cd83c136312f3c9f'),
 'author': 'Martin',
 'contributors': ['Aldren', 'Geir Arne', 'Jaya', 'Joanna', 'Mike'],
 'title': 'Beautiful Soup: Build a Web Scraper With Python',
 'url': 'https://realpython.com/beautiful-soup-web-scrap

In [5]:
#I defined a function that allows us to specify the number of documents we want printed
def find(documents, number):
    docs = documents
    list_docs = []
    for x in range(number):
        doc = docs[x]
        list_docs.append(doc)
    return list_docs       

#Instead using the 'pprint' function to print JSON objects, we can also do this using 'print and json.dumps'
print(json.dumps(find(employee.find(), 2), default=str, indent=2))

[
  {
    "_id": "64307ef302a0320de41984ea",
    "Name": "Raj Kumar",
    "Address": "Sears Streer, NZ",
    "Age": "42"
  },
  {
    "_id": "64307f1c02a0320de41984eb",
    "Name": "Raj Kumar",
    "Address": "Sears Streer, NZ",
    "Age": "42"
  }
]


Next we write the documents in JSON format into a file of json extension

In [6]:
with open('export.json', "w") as export_file:
    for document in employee.find():
        export_file.write(f"{json.dumps(document, default=str)}\n")

Check to see that the json objects were written in the file

In [7]:
with open('export.json', "r") as file:
    docs = file.read()
    print(docs)

{"_id": "64307ef302a0320de41984ea", "Name": "Raj Kumar", "Address": "Sears Streer, NZ", "Age": "42"}
{"_id": "64307f1c02a0320de41984eb", "Name": "Raj Kumar", "Address": "Sears Streer, NZ", "Age": "42"}
{"_id": "6430837902a0320de41984ec", "Name": "Meshach Nnosiri", "Address": "Ehime Mbano, Nigeria", "Age": "29"}
{"_id": "6430837902a0320de41984ed", "Name": "Franklin Nnosiri", "Address": "Ehime Mbano, Nigeria", "Age": "31"}
{"_id": "6430837902a0320de41984ee", "Name": "Judith Nnosiri", "Address": "Portharcourt, Nigeria", "Age": "24"}
{"_id": "643095ec02a0320de41984ef", "Name": "Nancy Nnosiri", "Address": "Owerri, Nigeria", "Age": "26", "marrital_status": "Single"}
{"_id": "6430ac89cd83c136312f3c9f", "title": "Beautiful Soup: Build a Web Scraper With Python", "author": "Martin", "contributors": ["Aldren", "Geir Arne", "Jaya", "Joanna", "Mike"], "url": "https://realpython.com/beautiful-soup-web-scraper-python/"}
{"_id": "6430add2cd83c136312f3ca0", "title": "Beautiful Soup: Build a Web Scrape

In [8]:
schemas: Dict[str, Schema] = {}

#Create a function that will enable relationalize to create a schema for the json objects
# this gets called when a relationalized object is written to the temporary file.
def on_object_write(schema: str, object: dict):
    if schema not in schemas:
        schemas[schema] = Schema()
    schemas[schema].read_object(object)

#Create an iterator that will iterate through each of the json objects present and return list of the values present
def create_iterator(filename) -> Generator[Dict[str, Any], None, None]:
    with open(filename, "r") as infile:
        for line in infile:
            yield json.loads(line)

#Create a temporary folder to store the flattened json objects using relationalize
os.makedirs("temp", exist_ok=True)
with Relationalize(
    'employee', create_local_file("temp"), on_object_write
) as r:
    r.relationalize(create_iterator("export.json"))

In [9]:
#Printing the values present in the schema list which includes the name of the collection and the generated extension of the data, as well as their columns.
print(schemas.items())

dict_items([('employee', <relationalize.schema.Schema object at 0x0000017AB4B4CD90>), ('employee_contributors', <relationalize.schema.Schema object at 0x0000017AB4B4CC70>)])


In [10]:
#Next we print out the column names present in the schema
for schema_name, schema in schemas.items():
    print(schema_name, schema.generate_output_columns())

employee ['Address', 'Age', 'Name', '_id', 'author', 'contributors', 'marrital_status', 'title', 'url']
employee_contributors ['contributors__index_', 'contributors__rid_', 'contributors__val_']


In [11]:
#Create a folder where the files will be converted from a json format to a csv.
os.makedirs("final", exist_ok=True)
for schema_name, schema in schemas.items():
    with open(
        os.path.join("final", f"{schema_name}.csv"),
        "w",
    ) as final_file:
        writer = csv.DictWriter(final_file, fieldnames=schema.generate_output_columns())
        #Denote the variable as header using the writeheader() function
        writer.writeheader()
        for row in create_iterator(os.path.join("temp", f"{schema_name}.json")):
            converted_obj = schema.convert_object(row)
            #Using the writerow extention to include the 
            writer.writerow(converted_obj)

Next the CSV files are converted into pandas format where it is checked and using iterrows() to iterrate through and write in relational database(mysql)

In [12]:
df = pd.read_csv(r'C:\Users\user\Desktop\Data Science\Cleaning_FIFA_Data\final\employee.csv')
df2 = pd.read_csv(r'C:\Users\user\Desktop\Data Science\Cleaning_FIFA_Data\final\employee_contributors.csv')

In [13]:
#Employee files containing all the columns
df.head()

Unnamed: 0,Address,Age,Name,_id,author,contributors,marrital_status,title,url
0,"Sears Streer, NZ",42.0,Raj Kumar,64307ef302a0320de41984ea,,,,,
1,"Sears Streer, NZ",42.0,Raj Kumar,64307f1c02a0320de41984eb,,,,,
2,"Ehime Mbano, Nigeria",29.0,Meshach Nnosiri,6430837902a0320de41984ec,,,,,
3,"Ehime Mbano, Nigeria",31.0,Franklin Nnosiri,6430837902a0320de41984ed,,,,,
4,"Portharcourt, Nigeria",24.0,Judith Nnosiri,6430837902a0320de41984ee,,,,,


In [14]:
#Employee_contributors file, an extension of the employee file only that this contains employees with contributors name as well as a unique code identifier linking it to the employee data.
df2.head()

Unnamed: 0,contributors__index_,contributors__rid_,contributors__val_
0,0,R_e8c87c751d464567b2fe55a5d7924c88,Aldren
1,1,R_e8c87c751d464567b2fe55a5d7924c88,Geir Arne
2,2,R_e8c87c751d464567b2fe55a5d7924c88,Jaya
3,3,R_e8c87c751d464567b2fe55a5d7924c88,Joanna
4,4,R_e8c87c751d464567b2fe55a5d7924c88,Mike


Create a connection to MySQL 

In [15]:
try:
    connection = ms.connect(
        host='127.0.0.1',  # host on which the database is running
        database='sakila',  # name of the database to connect to
        user='root',  # username to connect with
        password='password')  # password associated with your username
    if connection.is_connected():
        db_Info = connection.get_server_info()
        print("Connected to MySQL Server version ", db_Info)
        cursor = connection.cursor()
        cursor.execute("select database();")
        record = cursor.fetchone()
        print("You're connected to database: ", record)

except Error as e:
    print("Error while connecting to MySQL", e)

Connected to MySQL Server version  8.0.31
You're connected to database:  ('sakila',)


In [16]:
cursor = connection.cursor()

Create a table and the columns present in the table, this is very neccessary for relational database

In [17]:
cursor.execute("""DROP TABLE IF EXISTS employee_test""")
cursor.execute("""CREATE TABLE employee_test (Address varchar(255), Age varchar(255),
                      Name varchar(255), _id varchar(255), author varchar(255) DEFAULT NULL,
                      contributors varchar(255) DEFAULT NULL, marital_status varchar(255) DEFAULT NULL,
                      title varchar(255) DEFAULT NULL, url varchar(255) DEFAULT NULL)""")

Next you insert into the values into the table by iterating through the df data gotten by using pandas.

In [18]:
for column, row in df.iterrows():
    value = row
    #Define a query for inserting into a table.
    query = """INSERT INTO sakila.employee_test
               VALUES ("%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s")"""
    cursor.execute(query, tuple(value))
    connection.commit()
print('Inserted row')

Inserted row


In [19]:
#Query the database to see if the rows have indeed been inserted
query = """SELECT *
           FROM employee_test"""
cursor.execute(query)
values = cursor.fetchall()

In [20]:
#Get the columns from the table in the sql using cursor.description
columns = [column[0] for column in cursor.description]
print(f'The columns present are:{columns}')

The columns present are:['Address', 'Age', 'Name', '_id', 'author', 'contributors', 'marital_status', 'title', 'url']


In [21]:
#Create a dataframe with the values and columns then crosscheck with the initial dataframe gotten from the saved csv file
data = pd.DataFrame(values, columns = columns)
data.head()

Unnamed: 0,Address,Age,Name,_id,author,contributors,marital_status,title,url
0,"'Sears Streer, NZ'",42.0,'Raj Kumar','64307ef302a0320de41984ea',,,,,
1,"'Sears Streer, NZ'",42.0,'Raj Kumar','64307f1c02a0320de41984eb',,,,,
2,"'Ehime Mbano, Nigeria'",29.0,'Meshach Nnosiri','6430837902a0320de41984ec',,,,,
3,"'Ehime Mbano, Nigeria'",31.0,'Franklin Nnosiri','6430837902a0320de41984ed',,,,,
4,"'Portharcourt, Nigeria'",24.0,'Judith Nnosiri','6430837902a0320de41984ee',,,,,


Create a different table in mysql where the relational data will be stored

In [22]:
cursor.execute("""DROP TABLE IF EXISTS employee_contributors""")
cursor.execute("""CREATE TABLE employee_contributors(contributors__index_ varchar(255), 
                  contributors__rid_ varchar(255), 
                  contributors__val_ varchar(255))""")
for column,row in df2.iterrows():
    value = row
    query2 = """INSERT INTO sakila.employee_contributors
                 VALUES(%s, %s, %s)"""
    cursor.execute(query2, tuple(value))
    connection.commit()
print("Rows has been inserted")

Rows has been inserted


Get the employee_contributors from the database and cross check it with the initial file.

In [23]:
query3 = """SELECT *
            FROM employee_contributors"""
cursor.execute(query3)
value1 = cursor.fetchall()

column1 = [column[0] for column in cursor.description]
print(f'The columns present are:{column1}')
data1 = pd.DataFrame(value1, columns = column1)
data1.head()

The columns present are:['contributors__index_', 'contributors__rid_', 'contributors__val_']


Unnamed: 0,contributors__index_,contributors__rid_,contributors__val_
0,0,R_e8c87c751d464567b2fe55a5d7924c88,Aldren
1,1,R_e8c87c751d464567b2fe55a5d7924c88,Geir Arne
2,2,R_e8c87c751d464567b2fe55a5d7924c88,Jaya
3,3,R_e8c87c751d464567b2fe55a5d7924c88,Joanna
4,4,R_e8c87c751d464567b2fe55a5d7924c88,Mike


It's standard practise to close the files as well as the database when you are done working.

In [24]:
client.close()
connection.close()