In [5]:
import copy
from pymongo import MongoClient
import pymysql

#MySQL config constants
MYSQL_USERNAME = 'hcheruiy'
MYSQL_PASSWORD = '@cc_20!9'
MYSQL_HOST = 'localhost'
MYSQL_DB = 'ap'

#MongoDB config constants
MONGO_HOST = 'localhost'
MONGO_PORT = 27017
MONGO_DP = 'ap'

In [9]:
# pipeline constants
RESET_MONGO_COLLECTIONS_ON_UPDATE = True
PRINT_INFO = True
PRINT_RESULTS = True


def initialize_mysql():
    """
    Initializes and returns a MySQL database based on config.
    """
    return pymysql.connect(
        host = MYSQL_HOST,
        user = MYSQL_USERNAME,
        password = MYSQL_PASSWORD,
        db = MYSQL_DB,
    )

def initialize_mongo():
    """
    Initializes and returns MongoDB database based on config.
    """
    return MongoClient(config.MONGO_HOST, config.MONGO_PORT)[config.MONGO_DB]

def extract_data(mysql_cursor):
    """
    Given a cursor, extracts data from MySQL dataset and returns all the tables
    with the data.
    """
    general_ledger_accounts = execute_mysql_query('select * from general_ledger_accounts', mysql_cursor, 'fetchall')
    terms = execute_mysql_query('select * from terms', mysql_cursor, 'fetchall') 
    vendors = execute_mysql_query('select * from vendors', mysql_cursor, 'fetchall')
    invoices= execute_mysql_query('select * from invoices', mysql_cursor, 'fetchall')
    invoice_line_items= execute_mysql_query('select * from invoice_line_items', mysql_cursor, 'fetchall')
    vendor_contacts= execute_mysql_query('select * from vendor_contacts', mysql_cursor, 'fetchall')
    invoice_archive= execute_mysql_query('select * from invoice_archive', mysql_cursor, 'fetchall')

    tables = (general_ledger_accounts, terms, vendors, invoices, invoice_line_items, vendor_contacts, invoice_archive)

    return tables

def execute_mysql_query(sql, cursor, query_type):
    """
    executes a given sql, pymsql cursor and type.
    """
    if query_type == 'fetchall':
        cursor.execute(sql)
        return cursor.fetchall()

    elif query_type == 'fetchone':
        cursor.execute(sql)
        return cursor.fetchall()
    else:
        pass

def transform_data(dataset, table):
    """
    Transforms the data to load it into MongoDB, returns a JSON object.
    """
    dataset_collection = []
    tmp_collection = []

    if table == 'general_ledger_accounts':
        for item in dataset[0]:
            tmp_collection['account_number'] = item[0]
            tmp_collection['account_description'] = item[1]
            dataset_collection.append(copy.copy(tmp_collection))

        return dataset_collection

    elif table == 'terms':
        for item in dataset[1]:
            tmp_collection['terms_id'] = item[0]
            tmp_collection['terms_description'] = item[1]
            tmp_collection['terms_due_days'] = item[2]
            dataset_collection.append(copy.copy(tmp_collection))

        return dataset_collection

    elif table == 'vendors':
        for item in dataset[2]:
            tmp_collection['vendor_id'] = item[0]
            tmp_collection['vendor_name'] = item[1]
            tmp_collection['vendor_address1'] = item[2]
            tmp_collection['vendor_address2'] = item[3]
            tmp_collection['vendor_city'] = item[4]
            tmp_collection['vendor_state'] = item[5]
            tmp_collection['vendor_zipcode'] = item[6]
            tmp_collection['vendor_phone'] = item[7]
            tmp_collection['vendor_contact_last_name'] = item[8]
            tmp_collection['vendor_contact_first_name'] = item[9]
            tmp_collection['vendor_terms_id'] = item[10]
            tmp_collection['vendor_account_number'] = item[11]
            dataset_collection.append(copy.copy(tmp_collection))

        return dataset_collection

    elif table == "invoices":
        for item in dataset[3]:
            tmp_collection['invoice_id'] = item[0]
            tmp_collection['vendor_id'] = item[1]
            tmp_collection['invoice_number'] = item[2]
            tmp_collection['invoice_date'] = item[3]
            tmp_collection['invoice_total'] = item[4]
            tmp_collection['payment_total'] = item[5]
            tmp_collection['credit_total'] = item[6]
            tmp_collection['terms_id'] = item[7]
            tmp_collection['invoice_due_date'] = item[8]
            tmp_collection['payment_date'] = item[9]
            dataset_collection.append(copy.copy(tmp_collection))

        return dataset_collection

    elif table == "invoice_line_items":
        for item in dataset[4]:
            tmp_collection['invoice_id'] = item[0]
            tmp_collection['invoice_sequence'] = item[1]
            tmp_collection['account_number'] = item[2]
            tmp_collection['line_item_amount'] = item[3]
            tmp_collection['line_item_description'] = item[4]
            dataset_collection.append(copy.copy(tmp_collection))

        return dataset_collection
    
    elif table == 'vendor_contacts':
        for item in dataset[5]:
            tmp_collection['vendor_id'] = item[0]
            tmp_collection['last_name'] = item[1]
            tmp_collection['first_name'] = item[2]
            dataset_collection.append(copy.copy(tmp_collection))

        return dataset_collection

    elif table == "invoice_archive":
        for item in dataset[6]:
            tmp_collection['invoice_id'] = item[0]
            tmp_collection['vendor_id'] = item[1]
            tmp_collection['invoice_number'] = item[2]
            tmp_collection['invoice_date'] = item[3]
            tmp_collection['invoice_total'] = item[4]
            tmp_collection['payment_total'] = item[5]
            tmp_collection['credit_total'] = item[6]
            tmp_collection['terms_id'] = item[7]
            tmp_collection['invoice_due_date'] = item[8]
            tmp_collection['payment_date'] = item[9]
            dataset_collection.append(copy.copy(tmp_collection))

        return dataset_collection

def load_data(mongo_collection, dataset_collection):
    """
    Loads the data into MongoDB and returns the results.
    """
    if RESET_MONGO_COLLECTIONS_ON_UPDATE:
        mongo_collection.delete_many({})

    return mongo_collection.insert_many(dataset_collection)

def main():
    """
    Main method starts a pipeline, extracts data, transforms it and loads it 
    into a Mongo client.
    """
    if PRINT_INFO:
        print('Starting data pipeline')
        print('Initializing MySQL connection')
    mysql = initialize_mysql()

    if PRINT_INFO:
        print('MySQL connection completed.')
        print('Starting data pipeline stage 1: Extracting data from MySQL.')
    mysql_cursor = mysql.cursor()
    mysql_data = extract_data(mysql_cursor)
    
    if PRINT_INFO:
        print('Stage 1 completed! Data successfully extracted from MySQL')
        print('Starting data pipeline stage 2: Transforming data from MySQL for MongoDB')
        print('Transforming genres dataset')
    general_ledger_accounts_collection = transform_data(mysql_data, "general_ledger_accounts")
    
main()

Starting data pipeline
Initializing MySQL connection
MySQL connection completed.
Starting data pipeline stage 1: Extracting data from MySQL.
Stage 1 completed! Data successfully extracted from MySQL
Starting data pipeline stage 2: Transforming data from MySQL for MongoDB
Transforming genres dataset


TypeError: list indices must be integers or slices, not str

In [10]:
mysql = initialize_mysql()

In [12]:
mysql

<pymysql.connections.Connection at 0x7fe0b95f1da0>

In [13]:
mysql_cursor = mysql.cursor()

In [14]:
mysql_cursor

<pymysql.cursors.Cursor at 0x7fe0b960ca20>

In [28]:
def extract_data(mysql_cursor):
    """
    Given a cursor, extracts data from MySQL dataset and returns all the tables
    with the data.
    """
    general_ledger_accounts = execute_mysql_query('select * from general_ledger_accounts', mysql_cursor, 'fetchall')
    #terms = execute_mysql_query('select * from terms', mysql_cursor, 'fetchall') 
    #vendors = execute_mysql_query('select * from vendors', mysql_cursor, 'fetchall')
    #invoices= execute_mysql_query('select * from invoices', mysql_cursor, 'fetchall')
    #invoice_line_items= execute_mysql_query('select * from invoice_line_items', mysql_cursor, 'fetchall')
    #vendor_contacts= execute_mysql_query('select * from vendor_contacts', mysql_cursor, 'fetchall')
    #invoice_archive= execute_mysql_query('select * from invoice_archive', mysql_cursor, 'fetchall')

    tables = (general_ledger_accounts)# terms, vendors, invoices, invoice_line_items, vendor_contacts, invoice_archive)

    return tables

mysql_data = extract_data(mysql_cursor)

In [29]:
mysql_data

((221, '401K Employee Contributions'),
 (591, 'Accounting'),
 (200, 'Accounts Payable'),
 (110, 'Accounts Receivable'),
 (569, 'Auto Expense'),
 (568, 'Auto License Fee'),
 (565, 'Bank Fees'),
 (394, 'Book Club Royalties'),
 (181, 'Book Development'),
 (120, 'Book Inventory'),
 (400, 'Book Printing Costs'),
 (403, 'Book Production Costs'),
 (572, 'Books, Dues, and Subscriptions'),
 (520, 'Building Lease'),
 (523, 'Building Maintenance'),
 (551, 'Business Forms'),
 (590, 'Business Insurance'),
 (574, 'Business Licenses and Taxes'),
 (280, 'Capital Stock'),
 (162, 'Capitalized Lease'),
 (536, 'Card Deck Advertising'),
 (100, 'Cash'),
 (610, 'Charitable Contributions'),
 (555, 'Collection Agency Fees'),
 (301, 'College Sales'),
 (310, 'Compositing Revenue'),
 (160, 'Computer Equipment'),
 (527, 'Computer Equipment Maintenance'),
 (306, 'Consignment Sales'),
 (556, 'Credit Card Handling'),
 (540, 'Direct Mail Advertising'),
 (238, 'Employee FICA Taxes Payable'),
 (242, 'Employee SDI Taxes 

In [18]:
type(mysql_data)

tuple

In [30]:
len(mysql_data)

75

In [31]:
mysql_data[1]

(591, 'Accounting')

In [32]:
tmp = []

tmp['acc_number'] = mysql_data[0]
tmp

TypeError: list indices must be integers or slices, not str

In [27]:
list_data

[((221, '401K Employee Contributions'),
  (591, 'Accounting'),
  (200, 'Accounts Payable'),
  (110, 'Accounts Receivable'),
  (569, 'Auto Expense'),
  (568, 'Auto License Fee'),
  (565, 'Bank Fees'),
  (394, 'Book Club Royalties'),
  (181, 'Book Development'),
  (120, 'Book Inventory'),
  (400, 'Book Printing Costs'),
  (403, 'Book Production Costs'),
  (572, 'Books, Dues, and Subscriptions'),
  (520, 'Building Lease'),
  (523, 'Building Maintenance'),
  (551, 'Business Forms'),
  (590, 'Business Insurance'),
  (574, 'Business Licenses and Taxes'),
  (280, 'Capital Stock'),
  (162, 'Capitalized Lease'),
  (536, 'Card Deck Advertising'),
  (100, 'Cash'),
  (610, 'Charitable Contributions'),
  (555, 'Collection Agency Fees'),
  (301, 'College Sales'),
  (310, 'Compositing Revenue'),
  (160, 'Computer Equipment'),
  (527, 'Computer Equipment Maintenance'),
  (306, 'Consignment Sales'),
  (556, 'Credit Card Handling'),
  (540, 'Direct Mail Advertising'),
  (238, 'Employee FICA Taxes Payabl