-
Notifications
You must be signed in to change notification settings - Fork 0
/
s3-boto3.py
107 lines (90 loc) · 3.81 KB
/
s3-boto3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# Bari Arviv - s3 operation using boto3
import boto3
import logging
from botocore.client import Config
from botocore.exceptions import ClientError
## Global variables
# Parameters for connection to s3:
REGION = 'XXXXXX'
VERSION = 'XXXXXX'
ACCESS_KEY = 'XXXXXX'
SECRET_ACCESS_KEY = 'XXXXXX'
END_POINT_URL = 'XXXXXX'
# Objects to perform actions: client is swiss knife , resource has all sort of data:
s3_resource = boto3.resource('s3', endpoint_url=END_POINT_URL,
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_ACCESS_KEY,
config=Config(signature_version=VERSION),
region_name=REGION)
s3_client = boto3.client('s3', endpoint_url=END_POINT_URL,
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_ACCESS_KEY,
config=Config(signature_version=VERSION),
region_name=REGION)
""" Functions for bucket operations """
# The function prints the list of existing buckets.
def list_buckets():
buckets = s3_client.list_buckets()
if buckets['Buckets']:
for bucket in buckets['Buckets']:
print(bucket)
# The function performs the operation according to the parameter obtained. In case of
# an exception, print it. You can create, delete and print the list of existing buckets.
def bucket_operations(bucket_name=None, operation='list'):
if operation != 'list' and not bucket_name:
logging.error('The bucket name is %s missing!' % (bucket_name))
return False
try:
if operation == 'delete':
s3_client.delete_bucket(Bucket=bucket_name)
elif operation == 'create':
s3_client.create_bucket(Bucket=bucket_name)
elif operation == 'list':
list_buckets()
else:
logging.error('unknown bucket operation')
return False
except ClientError as err:
logging.error(err)
return False
return True
""" Functions for files operations """
# The function prints the list of files in the bucket.
def list_files(bucket_name):
current_bucket = s3_resource.Bucket(bucket_name)
print('The files in bucket %s:\n' % (bucket_name))
for obj in current_bucket.objects.all():
print(obj.meta.data)
# The function performs the operation according to the parameter obtained. In case of an exception,
# print it. You can upload, download and delete a file and print the list of files inside the bucket.
def file_operations(bucket_name, operation='list', file_name=None, file_location=None):
if not bucket_name:
logging.error('The bucket name is %s missing!' % (bucket_name))
return False
try:
if operation == 'list':
list_files(bucket_name)
elif operation == 'delete':
s3_client.delete_object(Bucket=bucket_name, Key=file_name)
elif operation == 'download':
s3_resource.Bucket(bucket_name).download_file(file_name, file_location)
elif operation == 'upload':
s3_resource.Bucket(bucket_name).upload_file(file_location, file_name)
else:
logging.error('unknown file operation')
return False
except ClientError as err:
logging.error(err)
return False
return True
def main():
bucket_name = 'bari'
# create a new bucket
if bucket_operations(bucket_name, 'create'):
print("Bucket creation completed successfully!")
# print the list of existing buckets
bucket_operations(operation='list')
# print the files in a bucket
file_operations(bucket_name, operation='list')
if __name__ == '__main__':
main()