### Patch: make pandas `SQLTable` insert all rows with a single multi-row INSERT

In [1]:
from pandas.io.sql import SQLTable

def _execute_insert(self, conn, keys, data_iter):
    #print("Using monkey-patched _execute_insert")
    data = [dict((k, v) for k, v in zip(keys, row)) for row in data_iter]
    conn.execute(self.insert_statement().values(data))

# Monkey-patch: replace pandas' SQLTable._execute_insert with the function defined above.
# Presumably this is what DataFrame.to_sql ends up calling - verify against the installed pandas version.
SQLTable._execute_insert = _execute_insert

In [28]:
import pandas as pd
import json
import pymysql.cursors
import os
import time

from sqlalchemy import create_engine

In [6]:
# MySQL parameters. Each value can be overridden with an environment variable
# so that credentials do not have to be edited into the notebook; the defaults
# are the original local-development values.
DB_HOST = os.environ.get('HMDA_DB_HOST', 'localhost')
DB_USER = os.environ.get('HMDA_DB_USER', 'root')
DB_PASS = os.environ.get('HMDA_DB_PASS', '')
DB_NAME = os.environ.get('HMDA_DB_NAME', 'hmda')

# Files location. Must end with a slash: file names are appended to it directly.
folder = "c:/data/HMDA/"

In [3]:
# Create column format dictionary: one record per field with the keys
# NAME, START, STOP and LENGTH. The spec file is read from the shared data
# folder rather than from a second hard-coded copy of the path.
coldf = pd.read_csv(folder + 'HMDA_columns_1990_2004.csv')
# Round-trip through JSON so the records are plain Python dicts, strings and numbers.
cols = json.loads(coldf.to_json(orient='records'))

In [4]:
cols

[{'NAME': 'ASOF_DATE', 'START': 0, 'STOP': 4, 'LENGTH': 4},
 {'NAME': 'RESP_ID', 'START': 4, 'STOP': 14, 'LENGTH': 10},
 {'NAME': 'AGENCY_CODE', 'START': 14, 'STOP': 15, 'LENGTH': 1},
 {'NAME': 'LOAN_TYPE', 'START': 15, 'STOP': 16, 'LENGTH': 1},
 {'NAME': 'LOAN_PURPOSE', 'START': 16, 'STOP': 17, 'LENGTH': 1},
 {'NAME': 'OCCUPANCY', 'START': 17, 'STOP': 18, 'LENGTH': 1},
 {'NAME': 'LOAN_AMOUNT', 'START': 18, 'STOP': 23, 'LENGTH': 5},
 {'NAME': 'ACTION_TYPE', 'START': 23, 'STOP': 24, 'LENGTH': 1},
 {'NAME': 'PROPERTY_MSA', 'START': 24, 'STOP': 28, 'LENGTH': 4},
 {'NAME': 'STATE_CODE', 'START': 28, 'STOP': 30, 'LENGTH': 2},
 {'NAME': 'COUNTY_CODE', 'START': 30, 'STOP': 33, 'LENGTH': 3},
 {'NAME': 'CENSUS_TRACT_NUMBER', 'START': 33, 'STOP': 40, 'LENGTH': 7},
 {'NAME': 'RACE_APPLICANT', 'START': 40, 'STOP': 41, 'LENGTH': 1},
 {'NAME': 'RACE_COAPPLICANT', 'START': 41, 'STOP': 42, 'LENGTH': 1},
 {'NAME': 'SEX_APPLICANT', 'START': 42, 'STOP': 43, 'LENGTH': 1},
 {'NAME': 'SEX_COAPPLICANT', 'STA

In [31]:
# Years to process: 1991 through 2004 inclusive.
# Alternative range kept for reference: list(range(1990, 2001))
datarange = list(range(1991, 2005))
datarange

[1991,
 1992,
 1993,
 1994,
 1995,
 1996,
 1997,
 1998,
 1999,
 2000,
 2001,
 2002,
 2003,
 2004]

In [7]:

def UploadDatatoDB(df):
    """Insert every row of ``df`` into the ``lars`` table, one INSERT per row.

    Both the column list and the ``%s`` placeholder list of the INSERT
    statement are derived from the module-level ``cols`` records, so the
    number of placeholders always matches the number of columns.

    Parameters
    ----------
    df : pandas.DataFrame
        Must have one column per ``cols`` record (looked up by ``NAME``).
        Every value must support ``.strip()``.

    Returns
    -------
    str
        The literal ``"Upload ok"`` once all rows have been inserted.

    Notes
    -----
    Driver exceptions propagate to the caller; the connection is closed in
    all cases. After the inserts, one row of ``lars`` is fetched and printed
    as a quick sanity check.
    """
    # Connect to the database
    connection = pymysql.connect(host=DB_HOST,
                                 user=DB_USER,
                                 password=DB_PASS,
                                 db=DB_NAME,
                                 charset='utf8mb4',
                                 cursorclass=pymysql.cursors.DictCursor)

    try:
        column_names = [col["NAME"] for col in cols]
        insert_sql = "INSERT INTO lars ({}) VALUES ({})".format(
            ", ".join(column_names),
            ", ".join(["%s"] * len(column_names)),
        )

        # One cursor is reused for all rows instead of creating one per row.
        with connection.cursor() as cursor:
            for index, row in df.iterrows():
                values = tuple(row[name].strip() for name in column_names)
                cursor.execute(insert_sql, values)
                # Commit after every row so that rows inserted before a
                # failure stay in the table.
                connection.commit()

                # Progress indicator: print the index label every 10000 rows.
                if index % 10000 == 0:
                    print(index)

        with connection.cursor() as cursor:
            # Read a single record
            cursor.execute("SELECT * FROM lars LIMIT 1")
            print(cursor.fetchone())
    finally:
        connection.close()

    return "Upload ok"


In [26]:
def readDataFile(filename):
    """Read a fixed-width text file and cut each line into the ``cols`` fields.

    The file is loaded into a single temporary ``original`` column (tab is
    used as the delimiter, presumably so that lines are not split). For every
    record in the module-level ``cols`` list, the characters from ``START``
    up to (not including) ``STOP`` are copied into a column called ``NAME``.
    The temporary column is removed before the frame is returned.
    """
    frame = pd.read_csv(filename, header=None, delimiter="\t", names=["original"])

    for spec in cols:
        begin, end = spec["START"], spec["STOP"]
        frame[spec["NAME"]] = frame["original"].map(lambda line: line[begin:end])

    return frame.drop(columns=["original"])


In [30]:
def manualUpload(filename):
    """Bulk-load a CSV file into ``hmda.lars`` using ``LOAD DATA INFILE``.

    The CSV is expected to be comma separated, optionally double-quoted,
    newline terminated, and to start with one header row (which is skipped).
    After the load is committed, the total row count of ``lars`` is printed.

    Parameters
    ----------
    filename : str
        Path of the CSV file. The statement does not use ``LOCAL``, so the
        path is resolved by the MySQL server, not by this Python process.

    Notes
    -----
    Driver exceptions propagate to the caller; the connection is closed in
    all cases.
    """
    connection = pymysql.connect(host=DB_HOST,
                                 user=DB_USER,
                                 password=DB_PASS,
                                 db=DB_NAME,
                                 charset='utf8mb4',
                                 cursorclass=pymysql.cursors.DictCursor)

    # The file name is passed as a query parameter so that the driver quotes
    # and escapes it; concatenating it into the statement breaks as soon as
    # the path contains a quote or a backslash.
    load_sql = """
        LOAD DATA INFILE %s
        INTO TABLE hmda.lars
        FIELDS TERMINATED BY ','
        ENCLOSED BY '"'
        LINES TERMINATED BY '\\n'
        IGNORE 1 ROWS
        (ASOF_DATE,RESP_ID,AGENCY_CODE,LOAN_TYPE,LOAN_PURPOSE,OCCUPANCY,
         LOAN_AMOUNT,ACTION_TYPE,PROPERTY_MSA,STATE_CODE,COUNTY_CODE,
         CENSUS_TRACT_NUMBER,RACE_APPLICANT,RACE_COAPPLICANT,SEX_APPLICANT,
         SEX_COAPPLICANT,APPLICANT_INCOME,PURCHASER_TYPE,DENIAL_REASON_1,
         DENIAL_REASON_2,DENIAL_REASON_3,EDIT_STATUS,SEQUENCE_NUMBER);
    """

    try:
        with connection.cursor() as cursor:
            cursor.execute(load_sql, (filename,))
            connection.commit()

        with connection.cursor() as cursor:
            # Report the table size after the load
            cursor.execute("SELECT count(*) FROM lars")
            print(cursor.fetchone())
    finally:
        connection.close()


# Read file, parse, save, upload, delete

In [32]:
# For every year: parse the raw fixed-width file, save it as CSV, bulk-load
# the CSV into MySQL, then delete the CSV. Timings are printed per stage.
for i in datarange:

    totalstart = time.time()
    print(i)

    # Locate the raw file for this year. 2001 and 2004 use different names.
    # All files live in `folder`; the 2004 name previously lacked the folder
    # prefix, which produced a relative path for the read, the CSV export and
    # the server-side LOAD DATA INFILE.
    if i == 2004:
        filename = folder + "u2004lar.public.dat"
    elif i == 2001:
        filename = folder + "HMS.U" + str(i) + ".LARS" + ".PUBLIC.DATA"
    else:
        filename = folder + "HMS.U" + str(i) + ".LARS"

    # Read and parse
    print("Reading and parsing file...", end='')
    parsestart = time.time()
    df = readDataFile(filename)
    print("done.")
    print("Total lines:")
    print(len(df.index))
    parsedone = time.time()
    elapsed = parsedone - parsestart
    print("Parse:", str(elapsed), " sec")

    # save
    print("Saving file...", end='')
    df.to_csv(filename + '.csv', index=False)
    print("done.")

    # upload
    print("Uploading file...", end='')
    uploadstart = time.time()
    manualUpload(filename + '.csv')
    uploaddone = time.time()
    elapsed = uploaddone - uploadstart
    print("done.")
    print("Upload:", str(elapsed), " sec")

    # remove file (only reached after a successful upload, so a failed
    # upload leaves the CSV on disk)
    os.remove(filename + '.csv')

    totaldone = time.time()
    elapsed = totaldone - totalstart
    print("Total:", str(elapsed), " sec")
    

1991
Reading and parsing file...done.
Total lines:
7940024
Parse: 77.38726234436035  sec
Saving file...done.
Uploading file...{'count(*)': 14647674}
done.
Upload: 159.0787913799286  sec
Total: 265.1264474391937  sec
1992
Reading and parsing file...done.
Total lines:
12026809
Parse: 177.12148809432983  sec
Saving file...done.
Uploading file...{'count(*)': 26674483}
done.
Upload: 252.4763104915619  sec
Total: 473.73953652381897  sec
1993
Reading and parsing file...done.
Total lines:
15477323
Parse: 289.920250415802  sec
Saving file...done.
Uploading file...{'count(*)': 42151806}
done.
Upload: 320.33869194984436  sec
Total: 675.7708311080933  sec
1994
Reading and parsing file...done.
Total lines:
12215807
Parse: 193.00198483467102  sec
Saving file...done.
Uploading file...{'count(*)': 54367613}
done.
Upload: 260.0399136543274  sec
Total: 496.7594048976898  sec
1995
Reading and parsing file...done.
Total lines:
11271664
Parse: 145.1650252342224  sec
Saving file...done.
Uploading file...{'c

KeyboardInterrupt: 

# Connect to MySQL

### Parse file

In [26]:
df.head()

Unnamed: 0,original,ASOF_DATE,RESP_ID,AGENCY_CODE,LOAN_TYPE,LOAN_PURPOSE,OCCUPANCY,LOAN_AMOUNT,ACTION_TYPE,PROPERTY_MSA,...,RACE_COAPPLICANT,SEX_APPLICANT,SEX_COAPPLICANT,APPLICANT_INCOME,PURCHASER_TYPE,DENIAL_REASON_1,DENIAL_REASON_2,DENIAL_REASON_3,EDIT_STATUS,SEQUENCE_NUMBER
0,19900000035301B121000101NA NANA NA 582400...,1990,35301,B,1,2,1,10,1,,...,8,2,4,39,0,,,,,1
1,19900000035301B121000041NA NANA NA 581400...,1990,35301,B,1,2,1,4,1,,...,8,1,4,32,0,,,,,2
2,19900000035301B121000081NA NANA NA 582400...,1990,35301,B,1,2,1,8,1,,...,8,2,4,35,0,,,,,3
3,19900000035301B121000051NA NANA NA 581400...,1990,35301,B,1,2,1,5,1,,...,8,1,4,30,0,,,,,4
4,19900000035301B121000111NA NANA NA 551201...,1990,35301,B,1,2,1,11,1,,...,5,1,2,130,0,,,,,5


In [28]:
# Copy of df without the raw 'original' line column.
df2 = df.drop(columns=['original'])

In [29]:
df2.head()

Unnamed: 0,ASOF_DATE,RESP_ID,AGENCY_CODE,LOAN_TYPE,LOAN_PURPOSE,OCCUPANCY,LOAN_AMOUNT,ACTION_TYPE,PROPERTY_MSA,STATE_CODE,...,RACE_COAPPLICANT,SEX_APPLICANT,SEX_COAPPLICANT,APPLICANT_INCOME,PURCHASER_TYPE,DENIAL_REASON_1,DENIAL_REASON_2,DENIAL_REASON_3,EDIT_STATUS,SEQUENCE_NUMBER
0,1990,35301,B,1,2,1,10,1,,,...,8,2,4,39,0,,,,,1
1,1990,35301,B,1,2,1,4,1,,,...,8,1,4,32,0,,,,,2
2,1990,35301,B,1,2,1,8,1,,,...,8,2,4,35,0,,,,,3
3,1990,35301,B,1,2,1,5,1,,,...,8,1,4,30,0,,,,,4
4,1990,35301,B,1,2,1,11,1,,,...,5,1,2,130,0,,,,,5


In [49]:
len(df.index)

7940024

In [9]:
import MySQLdb


In [10]:
# Remove the raw 'original' line column if it is still present.
# errors='ignore' keeps this cell safe to re-run and safe for frames that
# no longer have the column (readDataFile already drops it).
df = df.drop(columns=['original'], errors='ignore')

In [16]:
from tqdm import tqdm

def chunker(seq, size):
    """Return a generator over consecutive slices of ``seq``, each at most ``size`` long.

    The final slice is shorter when ``len(seq)`` is not a multiple of ``size``.
    Adapted from http://stackoverflow.com/a/434328
    """
    offsets = range(0, len(seq), size)
    return (seq[start:start + size] for start in offsets)

def insert_with_progress(df, engine):
    """Append ``df`` to the ``lars`` table in chunks while showing a progress bar.

    The frame is split into roughly 1000 chunks with ``chunker``; each chunk
    is written with ``DataFrame.to_sql(..., if_exists='append')``.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to insert. An empty frame results in no inserts.
    engine
        Connection or engine object accepted by ``DataFrame.to_sql`` as ``con``.
    """
    total_rows = len(df)
    # Never let the chunk size reach zero: for frames with fewer than 1000
    # rows a zero step would make range() inside chunker raise ValueError.
    chunksize = max(1, total_rows // 1000)
    with tqdm(total=total_rows) as pbar:
        for cdf in chunker(df, chunksize):
            cdf.to_sql(name='lars', con=engine, if_exists='append', index=False)
            # Advance by the rows actually written so that the last, shorter
            # chunk does not push the bar past the total.
            pbar.update(len(cdf))

# ---
# Build the SQLAlchemy URL from the shared DB_* settings instead of repeating
# them as a literal. With the default settings this is exactly
# 'mysql+mysqldb://root:@localhost:3306/hmda'.
# NOTE: the password is inserted verbatim; URL-quote it if it ever contains
# characters such as '@', ':' or '/'.
engine = create_engine(
    'mysql+mysqldb://{}:{}@{}:3306/{}'.format(DB_USER, DB_PASS, DB_HOST, DB_NAME),
    echo=False,
)

insert_with_progress(df, engine)

 39%|███████████████████████████▎                                          | 2615730/6707650 [11:34<18:00, 3787.70it/s]Exception ignored in: <function WeakKeyDictionary.__init__.<locals>.remove at 0x0000026D100AB840>
Traceback (most recent call last):
  File "C:\Users\Matt\AppData\Local\conda\conda\envs\lsci-gpu\lib\weakref.py", line 357, in remove
    self = selfref()
KeyboardInterrupt
 45%|███████████████████████████████▊                                      | 3044978/6707650 [13:30<15:58, 3822.80it/s]


KeyboardInterrupt: 

In [17]:
# Save df as CSV at filename + '.csv', without writing the index column.
df.to_csv(filename + '.csv', index=False)

### Upload file directly

In [None]:

# Bulk-load the 1990 CSV into MySQL with manualUpload
# (presumably the file written by the to_csv cell above - confirm the path before running).
manualUpload('c:/data/HMDA/HMS.U1990.LARS.csv')