In [1]:
!conda info


     active environment : base
    active env location : /Users/l1x/miniconda3
            shell level : 1
       user config file : /Users/l1x/.condarc
 populated config files : 
          conda version : 4.7.10
    conda-build version : not installed
         python version : 3.7.3.final.0
       virtual packages : 
       base environment : /Users/l1x/miniconda3  (writable)
           channel URLs : https://repo.anaconda.com/pkgs/main/osx-64
                          https://repo.anaconda.com/pkgs/main/noarch
                          https://repo.anaconda.com/pkgs/r/osx-64
                          https://repo.anaconda.com/pkgs/r/noarch
          package cache : /Users/l1x/miniconda3/pkgs
                          /Users/l1x/.conda/pkgs
       envs directories : /Users/l1x/miniconda3/envs
                          /Users/l1x/.conda/envs
               platform : osx-64
             user-agent : conda/4.7.10 requests/2.22.0 CPython/3.7.3 Darwin/18.7.0 OSX/10.14.6
                U

In [2]:
import datetime as dt
import hashlib
import pprint
import re
import os

import boto3
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pandas_profiling
import seaborn as sns

from botocore.exceptions import ClientError
from pyhive import presto
from scipy import stats

(pd.__version__,sns.__version__, np.__version__)

('0.25.1', '0.9.0', '1.17.0')

In [3]:
pp = pprint.PrettyPrinter(indent=2)

### Generating some example data

In [4]:
def date_range(start_date=dt.date(2019, 9, 1), end_date = dt.date(2019, 9, 30)):
    delta = end_date - start_date
    dates = [start_date + dt.timedelta(days=i) for i in range(delta.days + 1)]
    return [str(d) for d in dates]

In [5]:
def gen_data(sample_size=10_000_000):
    f0 = np.random.randint(2, size=sample_size)
    aa_milne_arr = ['pooh', 'rabbit', 'piglet', 'christopher']
    f1 = np.random.choice(aa_milne_arr, size=sample_size, p=[0.5, 0.1, 0.1, 0.3])
    f2 = np.random.randint(5_000, size=sample_size)
    ds_arr = date_range()
    f3 = np.random.choice(ds_arr, size=sample_size)
    dl = ['2019-08-23 16:24:55.000', '2019-08-23 15:51:07.000',
           '2019-08-23 15:41:30.000', '2019-08-23 15:36:45.000',
           '2019-08-06 13:54:36.000', '2019-08-02 11:32:04.000',
           '2019-08-23 11:15:20.000', '2019-08-23 11:07:46.000',
           '2019-08-12 10:51:20.000', '2019-08-23 10:51:19.000',
           '2019-08-29 10:33:24.000', '2019-08-23 09:50:18.000',
           '2019-08-23 08:21:51.000', '2019-08-11 11:50:55.000',
           '2019-08-22 11:36:52.000', '2019-08-22 11:31:24.000',
           '2019-08-22 09:56:18.000', '2019-08-21 21:42:50.000',
           '2019-08-21 21:11:08.000', '2019-08-15 21:09:18.000',
           '2019-08-17 21:04:52.000', '2019-08-21 20:57:57.000',
           '2019-08-21 20:27:09.000', '2019-08-21 20:06:50.000',
           '2019-08-12 20:01:00.000', '2019-08-07 19:50:41.000',
           '2019-08-21 17:53:00.000', '2019-08-21 17:38:45.000',
           '2019-08-01 16:37:32.000', '2019-08-15 14:04:18.000',
           '2019-08-15 13:42:21.000', '2019-08-15 13:36:25.000',
           '2019-08-15 13:27:50.000', '2019-08-20 13:22:55.000']
    dd = [np.datetime64(d) for d in dl]
    f4 = np.random.choice(dd, size=sample_size)
    f5 = np.random.randint(500_000, size=sample_size)
    sha_arr = [hashlib.sha224(bytes(s, 'UTF8')).hexdigest() for s in ds_arr]
    f6 = np.random.choice(sha_arr, size=sample_size)
    cities_arr = [
        'Seattle',
        'Los Angeles',
        'Portland',
        'San Diego',
        'Phoenix',
        'Dallas',
        'Houston',
        'Charlotte',
        'Detroit',
        'Nashville'
    ]
    f7 = np.random.choice(cities_arr, size=sample_size)
    f8 = np.random.random_sample((sample_size,))
    return (f0, f1, f2, f3, f4, f5, f6, f7, f8)

In [6]:
data = gen_data(sample_size=100)
df = pd.DataFrame({
    'bin_int': data[0], 'milne': data[1], 'rint_5_000': data[2],
    'ds': data[3], 'rdates': data[4], 'rint_500_000': data[5],
    'sha': data[6], 'cities': data[7], 'rfloat': data[8],
})
df.head()

Unnamed: 0,bin_int,milne,rint_5_000,ds,rdates,rint_500_000,sha,cities,rfloat
0,1,piglet,1997,2019-09-23,2019-08-23 11:15:20,149107,c17e819192baba61a8a0c49cc82ba012e3a0361dacffc3...,Seattle,0.919322
1,0,pooh,2798,2019-09-23,2019-08-15 21:09:18,437040,39acca9365153f85fe5fba70776de7980b1328aa5fa4ab...,Los Angeles,0.048787
2,1,christopher,2039,2019-09-23,2019-08-21 20:27:09,458579,5daf27ca22eef377897f1917cf87a57d9c0c28b7dcef76...,Portland,0.095792
3,0,pooh,1273,2019-09-16,2019-08-11 11:50:55,485624,38ae948be43dd5bfdd0f5ab70646fcfd59c540f3405637...,Nashville,0.226544
4,0,rabbit,4484,2019-09-15,2019-08-20 13:22:55,489752,dc76ddec562456db644decc4d6b7a8419ae081a7e0dddc...,Phoenix,0.252698


In [7]:
#df_sample = df.sample(10)

In [8]:
if True == False:
    for i in range(20):
        data = gen_data(10_000_000)
        df = pd.DataFrame({
            'bin_int': data[0], 'milne': data[1],  'rint_5_000': data[2],
            'ds': data[3],      'rdates': data[4], 'rint_500_000': data[5],
            'sha': data[6],     'cities': data[7], 'rfloat': data[8],
        })
        file_name = "data_{0}.csv".format(i)
        df.to_csv(file_name, index=False, encoding='utf-8', header=False)

In [9]:
#if you ever store credentials in code you are doing it wrong
#either use instance profile to run inside AWS or use credential profile with local credential files
aws_region = 'eu-west-1'
aws_profile = 'li-istvan'
aws_credentials_file = '{0}/{1}'.format(os.environ['HOME'], '.aws/credentials')

if os.access(aws_credentials_file, os.R_OK):
    session = boto3.session.Session(profile_name=aws_profile, region_name=aws_region)
else:
    session = boto3.session.Session(region_name=aws_region)

s3_client = session.client('s3', )
s3_resource = session.resource('s3')

In [10]:
aws_s3_bucket = 'li-perf-test'
try:
    s3_client.create_bucket(Bucket=aws_s3_bucket, CreateBucketConfiguration={'LocationConstraint': aws_region})
except ClientError as e:
    if 'BucketAlreadyOwnedByYou' not in str(e):
        raise
    else:
        print("AWS S3 Bucket Already Created: ", aws_s3_bucket)

AWS S3 Bucket Already Created:  li-perf-test


In [11]:
[bucket.name for bucket in s3_resource.buckets.all() if re.match('.*perf.*', bucket.name) is not None]

['li-perf-test']

In [12]:
aws_s3_root_path='data'
def upload_file_to_aws_s3(file_name):
    aws_s3_object_name = '{aws_s3_root_path}/raw/{file_name}'.format(
        aws_s3_root_path=aws_s3_root_path, 
        file_name=file_name
    )
    try:
        s3_resource.Object(aws_s3_bucket, aws_s3_object_name).load()
    except ClientError as e:
        if e.response['Error']['Code'] == "404":
            pp.pprint('Uploading starting: {0}'.format(aws_s3_object_name))
            s3_client.upload_file(file_name, aws_s3_bucket, aws_s3_object_name)
            pp.pprint('Uploading finished...{0}'.format(aws_s3_object_name))
        else:
            raise
    else:
        pp.pprint('File has been uploaded previously: {0}'.format(aws_s3_object_name))
        
    return 'ok'



In [13]:
from multiprocessing import Pool
pool = Pool(5)
pool.map(upload_file_to_aws_s3, [x for x in os.listdir(".") if x.endswith(".csv")])

'File has been uploaded previously: data/raw/data_3.csv'
'File has been uploaded previously: data/raw/data_19.csv'
'File has been uploaded previously: data/raw/data_1.csv'
'File has been uploaded previously: data/raw/data_2.csv'
'File has been uploaded previously: data/raw/data_18.csv'
'File has been uploaded previously: data/raw/data_4.csv'
'File has been uploaded previously: data/raw/data_0.csv'
'File has been uploaded previously: data/raw/data_5.csv'
'File has been uploaded previously: data/raw/data_7.csv'
'File has been uploaded previously: data/raw/data_6.csv'
'File has been uploaded previously: data/raw/data_13.csv'
'File has been uploaded previously: data/raw/data_12.csv'
'File has been uploaded previously: data/raw/data_10.csv'
'File has been uploaded previously: data/raw/data_8.csv'
'File has been uploaded previously: data/raw/data_11.csv'
'File has been uploaded previously: data/raw/data_9.csv'
'File has been uploaded previously: data/raw/data_15.csv'
'File has been uploaded 

['ok',
 'ok',
 'ok',
 'ok',
 'ok',
 'ok',
 'ok',
 'ok',
 'ok',
 'ok',
 'ok',
 'ok',
 'ok',
 'ok',
 'ok',
 'ok',
 'ok',
 'ok',
 'ok',
 'ok']

In [14]:
def list_objectin_aws_s3(aws_s3_bucket, aws_s3_object_prefix):
    paginator = s3_client.get_paginator('list_objects')
    filters = { 'Bucket': aws_s3_bucket,
                'Prefix': aws_s3_object_prefix }
    page_iterator = paginator.paginate(**filters)
    return [p['Key'] for page in page_iterator for p in page['Contents']]


In [15]:
list_objectin_aws_s3(aws_s3_bucket, 'data/')

['data/raw/data_0.csv',
 'data/raw/data_1.csv',
 'data/raw/data_10.csv',
 'data/raw/data_11.csv',
 'data/raw/data_12.csv',
 'data/raw/data_13.csv',
 'data/raw/data_14.csv',
 'data/raw/data_15.csv',
 'data/raw/data_16.csv',
 'data/raw/data_17.csv',
 'data/raw/data_18.csv',
 'data/raw/data_19.csv',
 'data/raw/data_2.csv',
 'data/raw/data_3.csv',
 'data/raw/data_4.csv',
 'data/raw/data_5.csv',
 'data/raw/data_6.csv',
 'data/raw/data_7.csv',
 'data/raw/data_8.csv',
 'data/raw/data_9.csv']

In [16]:
df.columns

Index(['bin_int', 'milne', 'rint_5_000', 'ds', 'rdates', 'rint_500_000', 'sha',
       'cities', 'rfloat'],
      dtype='object')

In [17]:
'''
CREATE DATABASE IF NOT EXISTS `perf`;
'''

'\nCREATE DATABASE IF NOT EXISTS `perf`;\n'

In [18]:
print('''CREATE EXTERNAL TABLE `perf`.`csv_test0`( 
   `bin_int` INT
   , `milne` STRING
   , `rint_5_000` INT
   , `rint_100` INT
   , `rdates` STRING
   , `rint_500_000` INT
   , `sha` STRING
   , `cities` STRING
   , `rfloat` FLOAT
) 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',' 
LINES TERMINATED BY '\\n' 
STORED AS TEXTFILE 
LOCATION 's3://{aws_s3_bucket}/{aws_s3_root_path}/raw/';
'''.format(aws_s3_bucket=aws_s3_bucket, aws_s3_root_path=aws_s3_root_path))

CREATE EXTERNAL TABLE `perf`.`csv_test0`( 
   `bin_int` INT
   , `milne` STRING
   , `rint_5_000` INT
   , `rint_100` INT
   , `rdates` STRING
   , `rint_500_000` INT
   , `sha` STRING
   , `cities` STRING
   , `rfloat` FLOAT
) 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',' 
LINES TERMINATED BY '\n' 
STORED AS TEXTFILE 
LOCATION 's3://li-perf-test/data/raw/';



In [19]:
pp.pprint('''CREATE EXTERNAL TABLE `perf`.`orc_test1`( 
   `bin_int` INT
   , `milne` STRING
   , `rint_5_000` INT
   , `rint_100` INT
   , `rdates` STRING
   , `rint_500_000` INT
   , `sha` STRING
   , `cities` STRING
   , `rfloat` FLOAT
) 
STORED AS ORC
LOCATION 's3://{aws_s3_bucket}/{aws_s3_root_path}/orc_test1/';
'''.format(aws_s3_bucket=aws_s3_bucket, aws_s3_root_path=aws_s3_root_path))

('CREATE EXTERNAL TABLE `perf`.`orc_test1`( \n'
 '   `bin_int` INT\n'
 '   , `milne` STRING\n'
 '   , `rint_5_000` INT\n'
 '   , `rint_100` INT\n'
 '   , `rdates` STRING\n'
 '   , `rint_500_000` INT\n'
 '   , `sha` STRING\n'
 '   , `cities` STRING\n'
 '   , `rfloat` FLOAT\n'
 ') \n'
 'STORED AS ORC\n'
 "LOCATION 's3://li-perf-test/data/orc_test1/';\n")


In [20]:
'''INSERT INTO `perf`.`orc_test1` SELECT * FROM `perf`.`orc_test0`;'''

'INSERT INTO `perf`.`orc_test1` SELECT * FROM `perf`.`orc_test0`;'