In [52]:
# Imports
from os import walk
import pymysql

########################################################################
# Function for importing data
########################################################################

# I put this as function to make testing easier
# rootDir is the directory where data and spec folder is located
def dataImports(db, cursor, BATCH_SIZE, rootDir):
    
    specDir = rootDir + "specs/"
    dataDir = rootDir + "data/"
    
    # List of Problematic file, for debugging
    problematicFile = []

    print("Selecting data to load...")
    dataToLoad, problematicFile = getDataToLoad(rootDir)

    print()
    print("Loading data...")
    
    # Prepare create table SQL statement for spec files
    for tableName in dataToLoad:
        sqlStatement = ""
        spec = []
        try:
            with open( specDir + tableName + ".csv", "r", encoding="utf-8") as specFile:
                spec, sqlStatement = prepareCreateTableStatement(tableName, specFile)

            # execute the create table statement
            cursor.execute(sqlStatement)
        except Exception as e:
            print("Exception occured:{}".format(e))
            print("Problem with creating table for definition", tableName, ", Skipping this specs" )
            problematicFile.append(specDir + tableName + ".csv")

            # since the data file is not processed as well
            for data in dataToLoad[tableName]:
                problematicFile.append(dataDir + data)
        else:
            # commit the create table statement
            db.commit()
            print("Table ready for", tableName)

            # Prepare insert data SQL statement for data files
            for data in dataToLoad[tableName]:
                with open(dataDir + data, "r", encoding="utf-8") as dataFile:
                    insertions = prepareInsertStatement(tableName, dataFile, BATCH_SIZE, spec)

                # Execute the data insertion
                totalCount = 0
                for batch in insertions:
                    try:
                        # execute the insert data statement
                        cursor.execute(batch["statement"])
                    except Exception as e:
                        print()
                        print("Exception occured:{}".format(e))
                        print("Problem with creating table for data file", data, ", Skipping this file and rollbacking this file" )
                        problematicFile.append(dataDir + data)

                        # if some data input fail, rollback changes in this file
                        db.rollback()
                        totalCount = 0
                        break
                    else:
                        totalCount += batch["count"]
                        
                # commit the insertion
                db.commit()
                print(totalCount, "rows from", data, "are inserted")
                
            print()

    return problematicFile

# Check the specs and data dir to pre choose which files to use in the importing process
def getDataToLoad(rootDir):
    
    specDir = rootDir + "specs/"
    dataDir = rootDir + "data/"
    
    dataToLoad = dict()
    problematicFile = []
    
    # Get list of definition
    for (dirpath, dirnames, filenames) in walk(specDir):
        for file in filenames:
            dataToLoad[file[:-4]] = []

    # Get list of data and assign to spec
    for (dirpath, dirnames, filenames) in walk(dataDir):
        for file in filenames:
            tableName = file[:-15]

            # Ignore file without corresponding spec file
            if tableName in dataToLoad.keys():
                dataToLoad[tableName].append(file)
            else:
                print("Ignoring data file", file, "as no corresponding spec file exist")
                problematicFile.append(dataDir + file)

    # Preventing dictionary changed size during iteration error
    ignoredSpecList = []
    for dataFile in dataToLoad:
        if len(dataToLoad[dataFile]) == 0 :
            ignoredSpecList.append(dataFile)
            print("Ignoring spec file", dataFile + ".csv" , "as no corresponding data file exist")
            problematicFile.append(specDir + dataFile + ".csv")

    # Ignore spec file with no data file
    for dataFile in ignoredSpecList:
        del dataToLoad[dataFile]

    return dataToLoad, problematicFile

# Process spec file, returning the specification elements and create table statement
def prepareCreateTableStatement(tableName, specFile):
    spec = []
    
    # Skip the header
    next(specFile)

    # Prepare statement with id as primary key
    sqlStatement = "CREATE TABLE IF NOT EXISTS `" + tableName + "` ("
    
    # Naming primary key to tablename_id format to make working with it easier            
    sqlStatement += "`" + tableName + "_id` INT(11) NOT NULL auto_increment, "

    for line in specFile:
        specAttr = {}
        line = line.strip().split(',')

        # Formatting, personally I prefer all DB field name to be standardized
        # to lowercase and snake case
        colName = line[0].strip().lower().replace(" ", "_")
        length = line[1].strip()
        dataType = line[2].strip()

        specAttr["colName"] = colName
        specAttr["length"] = int(length)
        specAttr["type"] = dataType

        sqlStatement += "`" + colName + "` "

        if dataType == "TEXT":
            sqlStatement += "VARCHAR(" + length + ")"
        elif dataType == "FLOAT":
            sqlStatement += "DOUBLE"
        elif dataType == "DATETIME":
            sqlStatement += "VARCHAR(20)"
        elif dataType == "BOOLEAN":
            sqlStatement += "tinyint(1)"
        elif dataType == "INTEGER":
            # Might need to handle different number of digits in the future here                
            sqlStatement += "INT(" + length + ")"
        else:
            # Unknown Integer!
            raise Exception('unknown format in spec file ' + tableName)

        # Might need to add default value for table here  
        sqlStatement += ", "
        spec.append(specAttr)

    sqlStatement += "PRIMARY KEY (`" + tableName + "_id`) );"
    
    return spec, sqlStatement

# Process Data file input and prepare sql insert statements
# it will break down large file input into batch to prevent mysql error
def prepareInsertStatement(tableName, dataFile, batchSize, spec):
    
    # Prepare the header
    columnList = [] 
    for specAttr in spec:
        columnList.append(specAttr["colName"])
    insertColumnStatement = "INSERT INTO " + tableName + " (" + ", ".join(columnList) + ") VALUES"
    
    insertion = []
    
    count = 0
    rows = []

    for line in dataFile:  
        position = 0
        
        fieldList = []
        for field in spec:
            value = line[position : (position + field["length"])] 
            
            if field["type"] == "TEXT" or field["type"] == "DATETIME" :
                fieldList.append("'" + value.strip() + "'")
            else :
                fieldList.append(value.strip())
                
            position += field["length"] 

        rows.append("(" + ", ".join(fieldList) + ")")
        count += 1
        
        if count >= batchSize:
            # when the data get too big, separate it to different statements
            batch = {}
            batch["count"] = count
            batch["statement"] = insertColumnStatement + ", ".join(rows)
            insertion.append(batch)
            
            count = 0
            rows = []
    
    if count > 0 : 
        batch = {}
        batch["count"] = count
        batch["statement"] = insertColumnStatement + ", ".join(rows)
        insertion.append(batch)

    return insertion

In [53]:
########################################################################
# Utility
######################################################################## 

# A helper function that read the config file and load it to memory
def getConfig():
    
    # defaults
    dbConfig = {}
    dbConfig["host"] = "localhost"
    dbConfig["user"] = "root"
    dbConfig["password"] = ""
    dbConfig["port"] = 3306
    BATCH_SIZE=100
    
    try:
        with open("config.txt", "r", encoding="utf-8") as configFile:
            config = {}
            for line in configFile:
                line = line.strip().split("=")
                if line[0] != "":
                    config[line[0]] = line[1]
                    
        dbConfig["host"] = config["DB_HOST"]
        dbConfig["port"] = int(config["DB_PORT"])
        
        dbConfig["user"] = config["DB_USERNAME"]
        dbConfig["password"] = config["DB_PASSWORD"]
        
        dbConfig["dbName"] = config["DB_DBNAME"]
        BATCH_SIZE = int(config["DB_INSERT_BATCH_SIZE"])
        
    except Exception as e:
        print("Exception occured:{}".format(e))
        print("Config file not exist! Using default parameters" )
        
    return dbConfig, BATCH_SIZE


# a function that try to connec to the database
def connectToDatabaseAndGetCursor(dbConfig, dbName):
    db = None
    cursor = None
    fail = False
    
    try: 
        # try to conect to db
        db = pymysql.connect(host = dbConfig["host"], user = dbConfig["user"], password = dbConfig["password"], port = dbConfig["port"])
    except Exception as e:
        print("Exception occured:{}".format(e))
        fail = True
    else:
        cursor = db.cursor()
        # try to create a DB called 'dataDB'
        try:
            # use test database for testing
            cursor.execute("create database IF NOT EXISTS " + dbName)
            cursor.execute("use " + dbName)
                
        except Exception as e:
            print("Exception occured:{}".format(e))
            fail = True
    
    return db, cursor, fail

In [54]:
########################################################################
# Entry point
# Run by using `python3 dataLoader.py` in bash
########################################################################
def main():
    
    # CONFIG 
    dbConfig, BATCH_SIZE = getConfig()
    
    # Connect to Database
    db, cursor, fail = connectToDatabaseAndGetCursor(dbConfig, dbConfig["dbName"])
    
    if not fail:
        problematicFile = dataImports(db, cursor, BATCH_SIZE, "")
        db.close()
        
        if len(problematicFile)  == 0:
            print("All file loaded successfully")
        else:
            print("Some file/specs are not loaded, here is the list of unprocessed data")
            for file in problematicFile:
                print(file)
                
main()

Selecting data to load...
Ignoring data file testformat3_2015-06-28.txt as no corresponding spec file exist
Ignoring spec file dummy.csv as no corresponding data file exist

Loading data...
Table ready for testformat1
3 rows from testformat1_2015-06-29.txt are inserted
144 rows from testformat1_2015-06-28.txt are inserted

Table ready for claims
2 rows from claims_2015-06-29.txt are inserted
2 rows from claims_2015-06-28.txt are inserted

Some file/specs are not loaded, here is the list of unprocessed data
data/testformat3_2015-06-28.txt
specs/dummy.csv


In [65]:
########################################################################
# Test Files
########################################################################
def integrationTest():
    
    # TEST RESULT
    success = True
    
    # CONFIG 
    dbConfig, BATCH_SIZE = getConfig()
    
    success = runTestCase(happyFlow, dbConfig, 100) and success
    success = runTestCase(catchNonMatchingSpecAndDataFile, dbConfig, 100) and success
    success = runTestCase(rollbackWhenDataIsCorrupted, dbConfig, 100) and success
    
    if success :
        print("TEST CASE PASS")
        print("Yay! all test case passed")
    else:
        print("TEST CASE FAIL")
        print("Noooo! you broke something")

# Happy Flow test case
def happyFlow(testName, db, cursor, BATCH_SIZE):
    problematicFile = dataImports(db, cursor, 100, "testData/" + testName + "/")
    assert len(problematicFile) == 0
    
    # Baseline test
    cursor.execute("SELECT * from testformat1")
    rows = cursor.fetchall()
    assert str(rows) == "((1, 'Yooofrn', 1, 1), (2, 'Zneabar', 0, -12), (3, 'Uuietdxuq', 1, 103))", "testformat1"
    
    # test if it can success fully insert large amount of data (batch size is 100)
    cursor.execute("SELECT * from testformat2")
    rows = cursor.fetchall()
    assert len(rows) == 1152, "testformat2"
    
    # test if it can handle floating point and other data types
    cursor.execute("SELECT * from claims")
    rows = cursor.fetchall()
    assert str(rows) == "((1, '1234567890', 15.0, '2015-06-08T10:08:03Z', 0, 'Stephen'), (2, '1234567891', -15.0, '2015-06-18T10:08:03Z', 1, 'Curry'))", "claims"

# Happy Flow test case
def catchNonMatchingSpecAndDataFile(testName, db, cursor, BATCH_SIZE):
    problematicFile = dataImports(db, cursor, BATCH_SIZE, "testData/" + testName + "/")
    assert len(problematicFile) == 6, "there should be 6 ignored files"
    
    # No spec file
    assert problematicFile[0] == 'testData/catchNonMatchingSpecAndDataFile/data/testformat2_2015-06-28.txt',"problematicFile output"
    # No data file
    assert problematicFile[1] == 'testData/catchNonMatchingSpecAndDataFile/specs/claims.csv',"problematicFile output"
    # Spec file corrupt
    assert problematicFile[2] == 'testData/catchNonMatchingSpecAndDataFile/specs/testformat3.csv', "problematicFile output"
    assert problematicFile[3] == 'testData/catchNonMatchingSpecAndDataFile/data/testformat3_2015-06-29.txt', "problematicFile output"
    assert problematicFile[4] == 'testData/catchNonMatchingSpecAndDataFile/data/testformat3_2015-06-28.txt', "problematicFile output"
    assert problematicFile[5] == 'testData/catchNonMatchingSpecAndDataFile/data/testformat3_2015-06-30.txt', "problematicFile output"
        
    # Spec file corrupted
    cursor.execute("SELECT * from testformat3")
    rows = cursor.fetchall()
    print(rows)
    assert len(rows) == 0, "should have nothing as spec file is corrupted"
    
# Happy Flow test case
def rollbackWhenDataIsCorrupted(testName, db, cursor, BATCH_SIZE):
    problematicFile = dataImports(db, cursor, BATCH_SIZE, "testData/" + testName + "/")
    assert len(problematicFile) == 2, "problematicFile"
    assert problematicFile[0] == 'testData/rollbackWhenDataIsCorrupted/data/testformat1_2015-06-28.txt', "problematicFile"
    assert problematicFile[1] == 'testData/rollbackWhenDataIsCorrupted/data/testformat2_2015-06-28.txt', "problematicFile"
       
    # Line 177 is corrupted
    cursor.execute("SELECT * from testformat1")
    rows = cursor.fetchall()
    assert len(rows) == 0, "testformat1"
    
    # first line is corrupted
    cursor.execute("SELECT * from testformat2")
    rows = cursor.fetchall()
    assert len(rows) == 0, "testformat2"
    
# Helper function to run the test cases
def runTestCase(f, dbConfig, BATCH_SIZE) : 
    testName =  f.__name__
    # Connect to Database
    db, cursor, fail = connectToDatabaseAndGetCursor(dbConfig, "testDB")
    
    try:
        f(testName, db, cursor, BATCH_SIZE)
    except AssertionError as e:
        print("ASSERTION ERROR occured:{}".format(e))
        success = False
    except Exception as e:    
        print("Exception occured:{}".format(e))
        success = False
    else:
        success = True
    
    if success:
        print("PASS --- " + testName)
    else:
        print("FAIL --- " + testName)
        
    cursor.execute("DROP DATABASE `testDB`;")
    db.close()
    print()
    return success

integrationTest()

Selecting data to load...

Loading data...
Table ready for testformat1
3 rows from testformat1_2015-06-28.txt are inserted

Table ready for testformat2
1152 rows from testformat2_2015-06-28.txt are inserted

Table ready for claims
2 rows from claims_2015-06-28.txt are inserted

PASS --- happyFlow

Selecting data to load...
Ignoring data file testformat2_2015-06-28.txt as no corresponding spec file exist
Ignoring spec file claims.csv as no corresponding data file exist

Loading data...
Exception occured:unknown format in spec file testformat3
Problem with creating table for definition testformat3 , Skipping this specs
['testData/catchNonMatchingSpecAndDataFile/data/testformat2_2015-06-28.txt', 'testData/catchNonMatchingSpecAndDataFile/specs/claims.csv', 'testData/catchNonMatchingSpecAndDataFile/specs/testformat3.csv', 'testData/catchNonMatchingSpecAndDataFile/data/testformat3_2015-06-29.txt', 'testData/catchNonMatchingSpecAndDataFile/data/testformat3_2015-06-28.txt', 'testData/catchNonM