/
deleteUserRecords.py
99 lines (91 loc) · 2.99 KB
/
deleteUserRecords.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python
"""
Below are the 3rd parties this script includes
1. pg8000
(Github: https://github.com/mfenniak/pg8000,
License: https://github.com/mfenniak/pg8000/blob/master/LICENSE)
Lines no 28 to 33 use pg8000 to connect to RDS Postgres.
Lines no 43 to 54 use pg8000 to query to RDS Postgres.
"""
import os
import boto3
import pg8000
import botocore
from botocore.exceptions import NoCredentialsError, ClientError
from urllib.parse import urlparse
def get_connection():
    """Open a pg8000 connection to RDS Postgres using IAM authentication.

    Connection parameters come from the DBEndPoint, DatabaseName and
    DBUserName environment variables; the password is a short-lived RDS
    IAM auth token generated via the boto3 RDS client.

    :return: an open pg8000 connection, or None if anything failed.
    """
    try:
        print ("Connecting to database")
        rds = boto3.client("rds")
        endpoint = os.environ.get("DBEndPoint")
        db_name = os.environ.get("DatabaseName")
        db_user = os.environ.get("DBUserName")
        # Short-lived IAM auth token used in place of a static password.
        token = rds.generate_db_auth_token(
            DBHostname=endpoint, Port=5432, DBUsername=db_user
        )
        return pg8000.connect(
            host=endpoint,
            user=db_user,
            database=db_name,
            password=token,
            ssl={"sslmode": "verify-full"},
        )
    except Exception as e:
        # Callers treat None as "no connection available".
        print ("While connecting failed due to :{0}".format(str(e)))
        return None
def get_user_files(userids):
    """Return (s3path, recordline-array) rows for the given user ids.

    Queries the user_objects table, aggregating the record line numbers
    of each user's rows per S3 object that holds them.

    :param userids: sequence of user id values to look up.
    :return: list of (s3path, [recordline, ...]) tuples; [] on any error.
    """
    conn = get_connection()
    if conn is None:
        # get_connection already printed the reason for the failure;
        # previously a None connection crashed with AttributeError here.
        return []
    try:
        cur = conn.cursor()
        # One bound placeholder per user id; values are passed to the
        # driver separately, never interpolated into the SQL text.
        placeholders = ','.join('%s' for _ in userids)
        sql = """
            SELECT s3path,
                   ARRAY_AGG(recordline)
            FROM user_objects
            WHERE userid in ({0})
            GROUP BY 1
            ;
        """.format(placeholders)
        cur.execute(sql, userids)
        conn.commit()
        return cur.fetchall()
    except Exception as e:
        print ("While querying failed due to :{0}".format(str(e)))
        return []
    finally:
        # Always release the connection: Lambda containers are reused,
        # so a leaked connection lingers across invocations.
        conn.close()
def uploadToS3(client, content, bucket, key):
    """Upload content to s3://bucket/key with the given S3 client.

    :param client: boto3 S3 client (anything exposing put_object).
    :param content: object body (bytes).
    :param bucket: destination bucket name.
    :param key: destination object key.
    :return: True on success, False on a handled failure.
        (Previously success fell through and returned None, making the
        boolean contract inconsistent.)
    """
    try:
        client.put_object(Body=content, Bucket=bucket, Key=key)
        return True
    except FileNotFoundError:
        # NOTE(review): put_object of in-memory bytes should not raise
        # this; kept for backward compatibility with the original.
        print("The file was not found")
        return False
    except NoCredentialsError:
        print("Credentials not available")
        return False
def updateFile(client, bucket, object_name, indexList):
    """Blank out the given line numbers of an S3 object and re-upload it.

    Downloads s3://bucket/object_name, replaces every line whose 0-based
    index is in indexList with b'{}\n' (an empty JSON record, so the line
    numbering seen by other consumers is preserved), and uploads the
    result to the DestinationBucket environment bucket under the same key.

    :param client: boto3 S3 client used for the upload.
    :param bucket: source bucket name.
    :param object_name: source (and destination) object key.
    :param indexList: iterable of 0-based line indices to blank out.
    """
    s3session = boto3.Session(
    ).resource('s3')
    body = s3session.Object(bucket_name=bucket, key=object_name).get()['Body']
    # Read the whole object and split on line boundaries explicitly.
    # The previous code iterated the private `_raw_stream`, which yields
    # transport chunks rather than guaranteed lines, so redaction indices
    # could miss their target rows.
    lines = body.read().splitlines(keepends=True)
    redact = set(indexList)  # O(1) membership per line instead of O(n)
    content = b''.join(
        b'{}\n' if i in redact else line
        for i, line in enumerate(lines)
    )
    uploadToS3(client, content, os.environ.get("DestinationBucket"), object_name)
def lambda_handler(event, context):
    """Lambda entry point: redact the requested users' rows from S3 files.

    Expects event['userids'] to be a comma-separated string of user ids.
    Looks up every S3 object containing rows for those users and rewrites
    each file with the matching record lines blanked out.
    """
    s3 = boto3.client('s3')
    userids = event['userids']
    useridsList = userids.split(",")
    for s3path, indexList in get_user_files(useridsList):
        parsed = urlparse(s3path)
        print('Updating file: ' + s3path + '\n')
        # netloc is the bucket; the path (minus leading '/') is the key.
        updateFile(s3, parsed.netloc, parsed.path.lstrip('/'), indexList)
        print('File updated')