In [1]:
# Load and view data

import pandas as pd

fname = "data.csv"
# Peek at the raw text first: header line and first record.
with open(fname, 'r') as fh:
    lines = fh.readlines()

# readlines() keeps each trailing newline; strip it so print doesn't emit a blank line.
print("Labels: ", lines[0].rstrip("\n"))
print("Sample: ", lines[1].rstrip("\n"))
# len(lines) counts text lines (header + records), not columns.
print("Lines: ", len(lines))

df = pd.read_csv(fname)
print("Columns: ", len(df.columns))

Labels:  Order,Ticker,Type,Discreet?,YearMonthDayUTC,Address,State,Zip,Zip4,Cars,Notes,UTC,Time_Date,Time,Month,Day,Year,Spaces,Population,Male_Population,Female_Population,Population_Density,5_Year_Projected_Population_Growth,Average_ Family_Size,Family_Households,Households,Age_Below_18,Age_18_Plus,Age_65_Plus,Associate_Degree,Bachelors_Degree,High_Schl_ Grad,PPHouseholds_1,PPHouseholds_2,MT2PPHousehold_2Plus,Per_Capita_Income,Median_Disposable_Household_Income,PCTWhitePop,PCTAIANPop,PCTAPop,PCTBPop,PCTHPIPop,PCTHisPop,PCTMultiracePop,PCTOtherRacePop,PCTMinority,PCTNo_Vehicle,PCT_1_Vehicle,PCT_2_Vehicles,PCTMore_2_Vehicles,PCTCivilian_Unemp,Slot 1,Slot 2,Slot 3,Slot 4,Slot 5,Slot 6,Category,PrePost,DOW,Shifted Month,WMT-Q,Week End,Unique Location,Unique Location & Year,Duplicate?,Year Repeat?,Same Store in Two Years?,Region,Sub Region,Create Year

Sample:  15.0,CMG,CMG_1467,,11APR01185905,7175 W. Lake Mead  Las Vegas,NV,89128,,53.0,04/01/2011,,2011-04-01 11:59:00,11:59,4.0,1.0,2011.0

In [2]:
# Select relevant columns

iwant = ["Region", "Sub Region", "Address", "State", "Zip", "Cars", "YearMonthDayUTC", "PCTNo_Vehicle", "PCT_1_Vehicle","PCT_2_Vehicles","PCTMore_2_Vehicles", 'Median_Disposable_Household_Income', 'Per_Capita_Income', 'Average_ Family_Size', 'Population_Density', 'Male_Population']

# Fail fast on a typo in `iwant` (some source names contain odd spaces, e.g.
# 'Average_ Family_Size') instead of silently dropping that column.
missing_cols = [col for col in iwant if col not in df.columns]
assert not missing_cols, "columns not found in data: {}".format(missing_cols)

# Keep only the wanted columns, in the file's original column order. Selecting
# builds a new frame rather than deleting columns one at a time from the frame
# whose keys are being iterated.
df = df.loc[:, df.columns.isin(iwant)]

print(df)

       YearMonthDayUTC                                   Address State  \
0        11APR01185905              7175 W. Lake Mead  Las Vegas    NV   
1        11APR01172545      1715 North Town East Blvd.  Mesquite    TX   
2        11APR01190010         18951 Brookhurst  Fountain Valley    CA   
3        11APR01172543                       4901 W. Park  Plano    TX   
4        11APR01172545        7700 N. Central Expressway  Dallas    TX   
5        11APR01190024        12359 Seal Beach Blvd.  Seal Beach    CA   
6        11APR01174959                16680 N. 83rd Ave.  Peoria    AZ   
7        11APR01172543         1009 N. Central Expressway  Plano    TX   
8        11APR01190024       16241 Beach Blvd.  Huntington Beach    CA   
9        11APR01190023             5310 Lakewood Blvd.  Lakewood    CA   
10       11APR01175000         5880 W. Thunderbird Rd.  Glendale    AZ   
11       11APR01193126  990 Serramonte Boulevard  Suite D  Colma    CA   
12       11APR01190010             230

In [4]:
# Split data into chunks

import numpy as np

s = 900 # number of records per chunk

# Row k goes to chunk k // s, so each chunk holds s consecutive rows
# (the last chunk may be shorter).
chunks = df.groupby(np.arange(len(df)) // s)

# Number of chunks (one per server), derived from the data instead of a
# hard-coded row count; equals ceil(len(df) / s).
t = len(chunks)
print(df.shape, "=>", t, "x", "({}, {})".format(s, df.shape[1]))

# Starting row offset of each chunk.
indices = [k * s for k in range(t)]
# list of (start_index, (chunk_number, df_chunk)) tuples: iterating a GroupBy
# yields (group_key, sub_frame) pairs, and the group key is the chunk number.
parcels = list(zip(indices, chunks))

(296702, 16) => 330 x (900, 16)


In [23]:
import boto3
import os
import requests
from time import sleep

### Config ###
# One server per data chunk (t chunks, from the split cell), launched
# servers_per_batch at a time -- aws permits max 20 servers running at a time.
total_servers, servers_per_batch = t, 16


# Create tarball for server from tuple
#
# Result:
#     <#>/
#       - src.tar.gz
#       - src/
#           - runscript.sh
#           - scraper.py
#           - data.csv
#

def create_tarball(tup):
    """Stage one chunk's files under ``<ind>/src/`` and pack them into ``<ind>/src.tar.gz``.

    Parameters
    ----------
    tup : tuple
        ``(ind, dfc)``: ``ind`` names the chunk directory and ``dfc`` is the
        DataFrame chunk written to ``<ind>/src/data.csv`` (index included).

    Raises
    ------
    OSError
        If ``runscript.sh`` or ``scraper.py`` cannot be copied from the current
        directory, or a directory/file/archive cannot be written. Failing here
        prevents shipping an incomplete tarball to a server.
    """
    import shutil
    import tarfile

    ind, dfc = tup
    workdir = str(ind)
    srcdir = os.path.join(workdir, "src")
    os.makedirs(srcdir, exist_ok=True)

    # 1. Copy the runscript and scraper program (shutil.copy keeps permission
    #    bits, so an executable runscript.sh stays executable).
    for name in ("runscript.sh", "scraper.py"):
        shutil.copy(name, srcdir)

    # 2. Create the datafile for this chunk.
    dfc.to_csv(path_or_buf=os.path.join(srcdir, "data.csv"))

    # 3. Compress runscript.sh, data.csv, scraper.py; members are stored under
    #    the relative prefix "src/" so the archive unpacks to ./src.
    with tarfile.open(os.path.join(workdir, "src.tar.gz"), "w:gz") as tar:
        tar.add(srcdir, arcname="src")


### Batches ###

# initialize aws client (default boto3 session)
ec2 = boto3.resource('ec2')

start_inds = range(0, total_servers, servers_per_batch)

# Chunk directories (<n>/) live under the notebook's working directory; nothing
# below changes it (each `cd` runs in its own os.system subshell).
wdir = os.getcwd()

# Upload command for one chunk's tarball; formatted with (chunk_number, ip).
scp_template = ("scp -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null "
                "-i cluster.pem {}/src.tar.gz ec2-user@{}:src.tar.gz")

# create servers in batches
for ind in start_inds:
    # This batch covers chunks ind .. ind+num_servers-1; the final batch is
    # short when total_servers is not a multiple of servers_per_batch.
    num_servers = min(servers_per_batch, total_servers - ind)

    # compress tarball for each chunk in the batch
    for i in range(ind, ind + num_servers):
        create_tarball(parcels[i][1])

    # start servers for chunks ind .. ind+num_servers-1 (printed as a half-open range)
    print("starting servers: {}-{}".format(ind, ind+num_servers))
    batch = ec2.create_instances(
        ImageId="ami-9be6f38c",
        InstanceInitiatedShutdownBehavior="terminate",
        InstanceType="t2.nano",
        KeyName="cluster",
        MinCount=num_servers,
        MaxCount=num_servers)

    # Everything after launch runs under try/finally so the instances are
    # terminated even if a step raises or the cell is interrupted.
    try:
        # wait for all servers to start and collect ip addresses
        names = []
        for instance in batch:
            print("server #{} not up yet...".format(instance.id))
            instance.wait_until_running()
            instance.reload()
            print((instance.id, instance.state, instance.public_dns_name, instance.public_ip_address))
            names.append(instance.public_ip_address)
        print("all batch servers up")
        print("servers: {}".format(names))

        print("waiting for initialization...")
        # send processing tasks via scp; the k-th instance gets chunk ind+k
        for offset, instance in enumerate(batch):
            cmd = scp_template.format(ind + offset, instance.public_ip_address)
            # retry every minute while scp fails (non-zero status), e.g. while
            # the new instance is still initializing
            while os.system(cmd):
                print("waiting...")
                sleep(60)
        print("uploads complete")

        for instance in batch:
            # ssh in and start task
            print("starting task...")
            cmd = 'echo "{}" | ./remoteCmd.sh'.format(instance.public_ip_address)
            os.system(cmd)
            print("task started")
        print("batch initialization complete!")

        # collect completed tasks: retry the download every 3 minutes until
        # wget succeeds (presumably once the server has produced its output)
        for offset, instance in enumerate(batch):
            print("collecting from: {}".format(instance.public_ip_address))
            url = "http://" + instance.public_ip_address + ":8000/output.tar.gz"
            cmd = "cd {}/{}/ && wget {}".format(wdir, ind + offset, url)
            counter = 0
            while os.system(cmd):
                print("waiting... {}".format(counter))
                counter = counter + 1
                sleep(180)
            # chunk number of the server that just finished
            print("server #{} complete".format(ind + offset))

        print("terminating batch in 15...")
        sleep(15)
    finally:
        for instance in batch:
            instance.terminate()

    print("wait for termination to complete...")
    sleep(100)
    print("batch complete!")

starting servers: 320-330
server #i-078e4f82f516f3af8 not up yet...
('i-078e4f82f516f3af8', {'Name': 'running', 'Code': 16}, 'ec2-54-152-249-156.compute-1.amazonaws.com', '54.152.249.156')
server #i-0b3e456ace12f0fad not up yet...
('i-0b3e456ace12f0fad', {'Name': 'running', 'Code': 16}, 'ec2-54-89-153-7.compute-1.amazonaws.com', '54.89.153.7')
server #i-040d8cc149d4fcc04 not up yet...
('i-040d8cc149d4fcc04', {'Name': 'running', 'Code': 16}, 'ec2-52-206-248-74.compute-1.amazonaws.com', '52.206.248.74')
server #i-0603c555040480e48 not up yet...
('i-0603c555040480e48', {'Name': 'running', 'Code': 16}, 'ec2-54-88-13-5.compute-1.amazonaws.com', '54.88.13.5')
server #i-074a9a4c00642f5fd not up yet...
('i-074a9a4c00642f5fd', {'Name': 'running', 'Code': 16}, 'ec2-52-91-47-53.compute-1.amazonaws.com', '52.91.47.53')
server #i-0bf2750fd5b65bda5 not up yet...
('i-0bf2750fd5b65bda5', {'Name': 'running', 'Code': 16}, 'ec2-52-90-122-188.compute-1.amazonaws.com', '52.90.122.188')
server #i-016a4a2d64

In [25]:
# merge output files

# Directory that collects one results CSV per chunk. exist_ok makes re-running
# the cell a no-op instead of a shell "File exists" error.
os.makedirs("output", exist_ok=True)

def collect(num):
    """Unpack chunk ``num``'s downloaded results into ``output/<num>.csv``.

    Extracts ``<num>/output.tar.gz`` inside ``<num>/`` and moves the resulting
    ``<num>/output.csv`` to ``output/<num>.csv``.

    Raises
    ------
    subprocess.CalledProcessError
        If ``tar`` exits non-zero (e.g. the archive was never downloaded).
    OSError
        If ``<num>/output.csv`` is missing after extraction, or the move fails.
    """
    import shutil
    import subprocess

    # Run tar inside the chunk dir (like `cd <num>/ && tar xzf ...`) and fail
    # loudly on a non-zero exit instead of continuing with missing data.
    subprocess.run(["tar", "xzf", "output.tar.gz"], cwd=str(num), check=True)
    shutil.move(os.path.join(str(num), "output.csv"),
                os.path.join("output", "{}.csv".format(num)))
    

def merge(num):
    """Load chunk ``num``'s collected results.

    Reads ``output/<num>.csv`` with default ``pd.read_csv`` options and
    returns the resulting DataFrame.
    """
    csv_path = "output/{}.csv".format(num)
    chunk_results = pd.read_csv(csv_path)
    return chunk_results
    
# Unpack and load every chunk's results (one per server / chunk, t in total).
frames = []
for i in range(t):
    collect(i)
    frames.append(merge(i))

# Each chunk frame comes back from read_csv with its own 0-based index;
# ignore_index gives the combined frame a unique 0..N-1 index instead of
# repeating the same labels once per chunk.
result = pd.concat(frames, ignore_index=True)
print(result)

datafile = "combined.csv"
result.to_csv(path_or_buf=datafile)
print("done!")

     Unnamed: 0 YearMonthDayUTC                                    Address  \
0           532   12APR20180625               8500 Firestone Blvd.  Downey   
1           310   12APR03173938                       635 W Ina Rd  Tucson   
2           408   12APR10164900           9540 Mason-Montgomery Rd.  Mason   
3           277   12APR01171809                    4170 Lavon Dr.  Garland   
4           662   12APR04171147             293 W. Campbell Rd. Richardson   
5           877   12APR16162810                 4930 W. Broad St. Richmond   
6           215   11APR26160017   435 Walt Whitman Rd.  Huntington Station   
7           648   12APR03174450           3040 Excelsior Blvd. Minneapolis   
8            11   11APR01193126   990 Serramonte Boulevard  Suite D  Colma   
9           725   12APR06173627             4151 Sterling Ave. Kansas City   
10          758   12APR09172510              7644 Lyndale Ave. S Richfield   
11          102   11APR12192814                      1050 Gilman

In [42]:
# write csv for map

# Columns the map needs. Selecting with [] raises KeyError if any is missing,
# whereas pd.DataFrame(result, columns=...) reindexes and would silently write
# an all-NaN column instead.
columns = ["Address", "State", "Lat", "Lng"]
df1 = result[columns]
datafile = "resultData.csv"
df1.to_csv(path_or_buf=datafile)
print("{} written!".format(datafile))

resultData.csv written!
