-
Notifications
You must be signed in to change notification settings - Fork 860
/
load_s3.py
81 lines (61 loc) · 2.75 KB
/
load_s3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import boto3, os, pathlib, logging
from . import load_pd
from .. import s3_utils
logger = logging.getLogger(__name__)
def list_bucket_s3(bucket):
    """Return the keys of every object in an S3 bucket.

    Each key is also logged at level 15 as it is collected.

    Args:
        bucket: Name of the S3 bucket to list.

    Returns:
        A list of object keys, in the order the bucket listing yields them.
    """
    logger.log(15, 'Listing s3 bucket: '+str(bucket))
    bucket_handle = boto3.resource('s3').Bucket(bucket)
    keys = []
    # `entry` rather than `object`, so the builtin is not shadowed.
    for entry in bucket_handle.objects.all():
        key = entry.key
        keys.append(key)
        logger.log(15, str(key))
    return keys
def download(input_bucket, input_prefix, local_path):
    """Download a single S3 object to a local file.

    The parent directory of ``local_path`` is created first (including any
    missing intermediate directories) so the download has somewhere to land.

    Args:
        input_bucket: Name of the S3 bucket holding the object.
        input_prefix: Key of the object to download.
        local_path: Destination file path on the local filesystem.
    """
    # Create the destination directory before touching S3.
    target_dir = pathlib.Path(os.path.dirname(local_path))
    target_dir.mkdir(parents=True, exist_ok=True)

    source_bucket = boto3.resource('s3').Bucket(input_bucket)
    source_bucket.download_file(input_prefix, local_path)
def list_bucket_prefix_s3(bucket, prefix):
    """Return the keys in ``bucket`` that are listed under ``prefix``.

    Delegates to ``list_bucket_prefix_suffix_s3`` with its default
    (i.e. no) suffix and banned-suffix filtering.

    Args:
        bucket: Name of the S3 bucket.
        prefix: Key prefix to list.

    Returns:
        A list of matching object keys.
    """
    keys = list_bucket_prefix_suffix_s3(bucket, prefix)
    return keys
def list_bucket_prefix_suffix_s3(bucket, prefix, suffix=None, banned_suffixes=None):
    """List the keys under ``prefix`` in an S3 bucket, with optional filtering.

    For each object listed with ``Prefix=prefix``, the part of the key that
    follows ``prefix`` (the "remainder") is checked against the filters.

    Args:
        bucket: Name of the S3 bucket.
        prefix: Key prefix to list. An empty string lists the whole bucket.
        suffix: If not None, only keys whose remainder contains this string
            are kept. Note this is a substring test on the remainder, not a
            strict "ends with" test.
        banned_suffixes: Optional collection of strings; a key is dropped if
            its remainder contains any of them. Defaults to no banned strings.

    Returns:
        A list of the full object keys that passed the filters, in listing
        order.
    """
    if banned_suffixes is None:
        banned_suffixes = []
    my_bucket = boto3.resource('s3').Bucket(bucket)
    prefix_len = len(prefix)
    files = []
    for object_summary in my_bucket.objects.filter(Prefix=prefix):
        key = object_summary.key
        # Keys listed with Prefix=prefix start with `prefix`, so slicing is
        # equivalent to `key.split(prefix, 1)[1]` -- but unlike split() it
        # does not raise ValueError when `prefix` is the empty string.
        suffix_full = key[prefix_len:]
        if any(banned_suffix in suffix_full for banned_suffix in banned_suffixes):
            continue
        if suffix is None or suffix in suffix_full:
            files.append(key)
    return files
def list_bucket_prefix_suffix_contains_s3(bucket, prefix, suffix=None, banned_suffixes=None, contains=None):
    """List the keys under ``prefix`` in an S3 bucket, with optional filtering.

    For each object listed with ``Prefix=prefix``, the part of the key that
    follows ``prefix`` (the "remainder") is checked against the filters.

    Args:
        bucket: Name of the S3 bucket.
        prefix: Key prefix to list. An empty string lists the whole bucket.
        suffix: If not None, only keys whose remainder contains this string
            are kept. Note this is a substring test on the remainder, not a
            strict "ends with" test.
        banned_suffixes: Optional collection of strings; a key is dropped if
            its remainder contains any of them. Defaults to no banned strings.
        contains: If not None, only keys whose remainder contains this string
            are kept. None (the default) disables this filter.

    Returns:
        A list of the full object keys that passed the filters, in listing
        order.
    """
    if banned_suffixes is None:
        banned_suffixes = []
    my_bucket = boto3.resource('s3').Bucket(bucket)
    prefix_len = len(prefix)
    files = []
    for object_summary in my_bucket.objects.filter(Prefix=prefix):
        key = object_summary.key
        # Keys listed with Prefix=prefix start with `prefix`, so slicing is
        # equivalent to `key.split(prefix, 1)[1]` -- but unlike split() it
        # does not raise ValueError when `prefix` is the empty string.
        suffix_full = key[prefix_len:]
        if any(banned_suffix in suffix_full for banned_suffix in banned_suffixes):
            continue
        if suffix is not None and suffix not in suffix_full:
            continue
        # `None in some_str` raises TypeError, so the default contains=None
        # must be treated explicitly as "no contains filter".
        if contains is not None and contains not in suffix_full:
            continue
        files.append(key)
    return files
def load_multipart_s3(bucket, prefix, columns_to_keep=None, dtype=None, sample_count=None):
    """Load the ``part-*`` files stored under an S3 prefix via ``load_pd.load``.

    Keys under ``prefix`` are listed, reduced to those containing
    ``prefix + '/part-'``, converted to paths with
    ``s3_utils.s3_bucket_prefix_to_path(..., version='s3')`` and passed to
    ``load_pd.load`` as a list.

    Args:
        bucket: Name of the S3 bucket.
        prefix: Key prefix under which the part files live.
        columns_to_keep: Forwarded unchanged to ``load_pd.load``.
        dtype: Forwarded unchanged to ``load_pd.load``.
        sample_count: If not None, only the first ``sample_count`` part
            files (in listing order) are loaded.

    Returns:
        Whatever ``load_pd.load`` returns for the selected paths.
    """
    part_marker = prefix + '/part-'
    part_keys = [key for key in list_bucket_prefix_s3(bucket, prefix) if part_marker in key]
    paths_full = [
        s3_utils.s3_bucket_prefix_to_path(bucket=bucket, prefix=key, version='s3')
        for key in part_keys
    ]
    if sample_count is not None:
        logger.log(15, 'Taking sample of '+str(sample_count)+' of '+str(len(paths_full))+' s3 files to load')
        paths_full = paths_full[:sample_count]
    return load_pd.load(path=paths_full, columns_to_keep=columns_to_keep, dtype=dtype)