In [None]:
!pip install boto3
# !pip install s3fs

In [None]:
!pip install opendatasets --upgrade
!pip install mysql-connector-python==8.2.0

- [Hands-on Cloud S3 - Tutorial](https://hands-on.cloud/boto3-s3-tutorial/)
- [Analyticsvidhya S3 Tutorial](https://www.analyticsvidhya.com/blog/2022/12/using-aws-s3-with-python-boto3/)

In [None]:
# interact with aws
import boto3

# system manipulation
import os
import io
import pathlib
from glob import glob
import uuid
from getpass import getpass

# to handle  data retrieval
import urllib3
from urllib3 import request

# to handle certificate verification
import certifi

# to manage json data
import json
# import geopandas as gpd

# for pandas dataframes
import pandas as pd

# datetime, argparse
import time
import datetime
from math import ceil
import argparse

# a simple logging message
import logging

# using request library
import requests

import opendatasets as od

In [None]:
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

In [None]:
# config our logging to write to an output file
logging.basicConfig(level=logging.INFO,
                    # filename='log.log',
                    format="%(asctime)s - %(levelname)s - %(message)s",
                    # filemode='w',
                    handlers=[
                                logging.FileHandler("debug.log"),
                                logging.StreamHandler()
                            ],
                    force=True # logging.basicConfig can be run just once, we use "force=True" to reset any previous configuration
                    )

In [None]:
os.environ["AWS_DEFAULT_REGION"] = 'us-east-2' # change to your own region
os.environ["AWS_ACCESS_KEY_ID"] = getpass('Enter AWS Access Key ID: ') #'*********AHZ4IVO******'
os.environ["AWS_SECRET_ACCESS_KEY"] = getpass('Enter AWS Secret Access Key: ') #'****4W4*******QW1W*****************'

In [None]:
def create_bucket_name(bucket_prefix):
    """Return ``bucket_prefix`` followed by a freshly generated UUID4 string."""
    unique_suffix = str(uuid.uuid4())
    return bucket_prefix + unique_suffix

In [None]:
def create_bucket(S3_BUCKET_PREFIX, s3):
    """Create a uniquely named bucket in the default boto3 session's region.

    Args:
        S3_BUCKET_PREFIX: Prefix handed to ``create_bucket_name``.
        s3: Object exposing ``create_bucket(**kwargs)``.

    Returns:
        Tuple ``(bucket_name, response)`` where ``response`` is whatever
        ``s3.create_bucket`` returned.
    """
    AWS_REGION = boto3.session.Session().region_name
    S3_BUCKET_NAME = create_bucket_name(S3_BUCKET_PREFIX)

    # us-east-1 is created without a location constraint; every other region
    # is passed explicitly through CreateBucketConfiguration.
    create_kwargs = {'Bucket': S3_BUCKET_NAME}
    if AWS_REGION != 'us-east-1':
        create_kwargs['CreateBucketConfiguration'] = {'LocationConstraint': AWS_REGION}
    response = s3.create_bucket(**create_kwargs)

    print(f"Amazon S3 {S3_BUCKET_NAME} bucket has been created in {AWS_REGION}")
    return S3_BUCKET_NAME, response

In [None]:
AWS_REGION = boto3.session.Session().region_name

S3_BUCKET_PREFIX = "service-call-dc-"

BASE_DIR = os.getcwd() #pathlib.Path(__file__).parent.resolve()

s3_client = boto3.client("s3", region_name=AWS_REGION)

In [None]:
S3_BUCKET_NAME, response = create_bucket(S3_BUCKET_PREFIX, s3_client)

In [None]:
s3_resource = boto3.resource(
    service_name='s3',
    region_name=AWS_REGION,
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"]
)
# S3_BUCKET_NAME, response = create_bucket(S3_BUCKET_PREFIX, s3_resource)

## Listing Existing Buckets

### Listing S3 Buckets using Boto3 client

In [None]:
response = s3_client.list_buckets()
print("Listing Amazon S3 Buckets:")
for bucket in response['Buckets']:
    print(f"-- {bucket['Name']}")

### Listing S3 Buckets using Boto3 resource

In [None]:
# Print out bucket names
for bucket in s3_resource.buckets.all():
    print(bucket.name)

In [None]:
def bucket_exists_cli(bucketName):
    """Return True when the client's bucket listing contains ``bucketName``."""
    listed_names = (entry['Name'] for entry in s3_client.list_buckets()['Buckets'])
    return any(bucketName == name for name in listed_names)

def bucket_exists_res(bucket):
    """Return True when the resource's bucket collection contains ``bucket``."""
    candidate = s3_resource.Bucket(bucket)
    known_buckets = s3_resource.buckets.all()
    return candidate in known_buckets


def upload_path(local_directory, bucket, destination, certain_upload=False):
    """Recursively upload every file under ``local_directory`` to S3.

    Args:
        local_directory: Root folder that is walked with ``os.walk``.
        bucket: Target bucket name.
        destination: Key prefix; each file's path relative to
            ``local_directory`` is appended to it.
        certain_upload: When True every file is uploaded unconditionally.
            When False a file is uploaded only if ``head_object`` fails for
            its key (i.e. the key could not be confirmed to exist).
    """
    import posixpath  # local import: S3 keys always use forward slashes

    # enumerate local files recursively
    for root, _dirs, files in os.walk(local_directory):
        for filename in files:
            # construct the full local path
            local_path = os.path.join(root, filename)

            # construct the S3 key; normalise OS separators so Windows paths
            # do not produce keys containing backslashes
            relative_path = os.path.relpath(local_path, local_directory)
            s3_path = posixpath.join(destination, relative_path.replace(os.sep, '/'))

            if certain_upload:
                s3_client.upload_file(local_path, bucket, s3_path)
                # keep going: the previous `return` stopped after the first file
                continue

            print('Searching "%s" in "%s"' % (s3_path, bucket))
            try:
                s3_client.head_object(Bucket=bucket, Key=s3_path)
            except Exception:
                # Best effort: any failure to confirm the key triggers an upload.
                # `Exception` (not a bare except) lets KeyboardInterrupt/SystemExit through.
                print("Uploading %s..." % s3_path)
                s3_client.upload_file(local_path, bucket, s3_path)

### How to enable S3 Bucket versioning using Boto3?

In [None]:
# s3_resource = boto3.resource("s3", region_name=AWS_REGION)

def enable_version(bucket_name):
    """Enable versioning on ``bucket_name`` and print the status reported afterwards."""
    bucket_versioning = s3_resource.BucketVersioning(bucket_name)
    bucket_versioning.enable()
    current_status = bucket_versioning.status
    print(f'S3 Bucket versioning: {current_status}')

# Only touch buckets created by this notebook (they share S3_BUCKET_PREFIX).
# Iterating over every bucket in the account would silently change the
# versioning configuration of unrelated buckets.
buckets = [bucket for bucket in s3_resource.buckets.all()
           if bucket.name.startswith(S3_BUCKET_PREFIX)]

for mybucket in buckets:
    enable_version(mybucket.name)

## Retrieving From API And Normalizing Dict

In [None]:
def flat_items(d, key_separator='.'):
    """
    Yield ``(key, value)`` pairs of a nested dictionary with the nested keys
    joined by ``key_separator``, as described here:
    https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys

    Any ``dict`` instance (including subclasses such as ``OrderedDict``) is
    descended into; every other value, lists included, is yielded unchanged.
    Keys on a nested path are converted with ``str`` before joining, so
    non-string keys no longer break the join. An empty nested dict yields
    nothing, so its key does not appear in the output.

    >>> example = {'a': 1, 'c': {'a': 2, 'b': {'x': 5, 'y' : 10}}, 'd': [1, 2, 3]}
    >>> flat = dict(flat_items(example, key_separator='_'))
    >>> assert flat['c_b_y'] == 10
    """
    for k, v in d.items():
        if isinstance(v, dict):
            for k1, v1 in flat_items(v, key_separator=key_separator):
                yield key_separator.join((str(k), str(k1))), v1
        else:
            yield k, v

In [None]:
dataset_url = 'https://opendata.arcgis.com/api/v3/datasets/14faf3d4bfbe4ca4a713bf203a985151_0/downloads/data?format=geojson&spatialRefId=4326&where=1%3D1'
url_name = dataset_url.split('/')[-1]

od.download(dataset_url, 'data')

# Source: where the download is expected to land.
# NOTE(review): assumes opendatasets names the file after the last URL segment - confirm.
src = f'data/{url_name}'

# Destination: the stable file name the following cells read from
dest = 'data/All_311_City_Service_Requests_-_Last_30_Days.geojson'

# Move the download to the name the next cell reads; without this step `src`
# and `dest` were computed but never used. The existence guard keeps the cell
# harmless when the file has already been renamed (or landed elsewhere).
if os.path.exists(src):
    os.replace(src, dest)

In [None]:
# Load the GeoJSON through pandas and convert it back to nested dicts.
# The flattening cell below reads the list of features from data['features'][0].
# NOTE(review): presumably the orient='index' + transpose round-trip leaves each
# top-level GeoJSON member under row label 0 - confirm before swapping in json.load.
data = pd.read_json('./data/All_311_City_Service_Requests_-_Last_30_Days.geojson', orient='index').T.to_dict()

In [None]:
strt = time.time()

# Flatten each feature exactly once into a {dotted_key: value} record.
# Building the frame from records aligns values by key, so a feature with a
# missing or reordered key can no longer shift values into the wrong column
# (previously the column names were taken from the first feature only).
service_req_df_combine = [dict(flat_items(item, key_separator='.'))
                          for item in data['features'][0]]

service_req_df_json = pd.DataFrame(service_req_df_combine)
# kept (with its original spelling) in case later cells refer to it
serviec_req_cols = service_req_df_json.columns.tolist()

endt = time.time()
print('Time Taken: ', endt - strt)
service_req_df_json.head()

## How to upload file to S3 Bucket using Boto3?

#### Read the AWS S3 file to Pandas DataFrame


In [None]:
# Restrict to the buckets created by this notebook: later cells use
# working_buckets[0], which must not resolve to an unrelated bucket that
# merely happens to be listed first in the account.
working_buckets = [bucket for bucket in s3_resource.buckets.all()
                   if bucket.name.startswith(S3_BUCKET_PREFIX)]
working_buckets

In [None]:
S3_BUCKET_NAME = working_buckets[0].name
data_to_upload = {'All_311_City_Service_Requests_-_Last_30_Days.json' : service_req_df_json} #, 'Roadway_Block.geojson':road_block_df_json}

def json_stream(value, S3_BUCKET_NAME, FileName):
    """Serialise ``value`` as JSON records in memory and store it under ``FileName``.

    Args:
        value: Object with a pandas-style ``to_json(buffer, orient=...)`` method.
        S3_BUCKET_NAME: Target bucket.
        FileName: Object key to write.
    """
    with io.StringIO() as buffer:
        value.to_json(buffer, orient='records')
        payload = buffer.getvalue()

    s3_client.put_object(Bucket=S3_BUCKET_NAME, Key=FileName, Body=payload)

def csv_stream(value, S3_BUCKET_NAME, FileName):
    """Serialise ``value`` as CSV (without the index) in memory and store it under ``FileName``.

    Args:
        value: Object with a pandas-style ``to_csv(buffer, index=...)`` method.
        S3_BUCKET_NAME: Target bucket.
        FileName: Object key to write.
    """
    with io.StringIO() as buffer:
        value.to_csv(buffer, index=False)
        payload = buffer.getvalue()

    s3_client.put_object(Bucket=S3_BUCKET_NAME, Key=FileName, Body=payload)


def upload_to_s3(S3_BUCKET_NAME, data_to_upload, folder_prefix, format='csv'):
    """Upload every entry of ``data_to_upload`` into a dated folder of the bucket.

    Args:
        S3_BUCKET_NAME: Target bucket.
        data_to_upload: Mapping of file name -> object to serialise.
        folder_prefix: Folder prefix; ``_<today's date>/`` is appended to it.
        format: ``'csv'`` writes through ``csv_stream``; any other value
            writes through ``json_stream``.
    """
    for key, value in data_to_upload.items():
        stream = csv_stream if format == 'csv' else json_stream
        FileName = f'{folder_prefix}_{datetime.datetime.now().date()}/{key}'
        stream(value, S3_BUCKET_NAME, FileName)

    
def download_from_s3(S3_BUCKET_NAME, folder_prefix, format='csv'):
    """Download every object under ``folder_prefix`` into a dict of DataFrames.

    Args:
        S3_BUCKET_NAME: Bucket to read from.
        folder_prefix: Key prefix passed to ``list_objects``.
        format: ``'csv'`` parses objects with ``pd.read_csv``; any other value
            parses them with ``pd.read_json(orient='records')``.

    Returns:
        Dict keyed by the object's file name truncated at the first ``.`` and
        then at the first ``-``. Empty when no object matches the prefix.
        Objects whose truncated names collide overwrite each other.
    """
    my_dict = {}
    listing = s3_client.list_objects(Bucket=S3_BUCKET_NAME, Prefix=folder_prefix)

    # 'Contents' is missing from the response when nothing matches the prefix;
    # treat that as "no files" instead of raising KeyError.
    # NOTE(review): a single list_objects call returns one page of keys only - add pagination for large prefixes.
    for key in listing.get('Contents', []):
        print(key['Key'])
        df_name = key['Key'].split('/')[-1].split('.')[0].split('-')[0]

        obj = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=key['Key'])
        payload = io.BytesIO(obj['Body'].read())

        if format == 'csv':
            # infer_datetime_format is deprecated in pandas 2.x and was dropped here
            my_dict[df_name] = pd.read_csv(payload, parse_dates=True, encoding='utf8')
        else:
            my_dict[df_name] = pd.read_json(payload, orient='records', encoding='utf8')
        print(f"{df_name} downloaded successfully")
    return my_dict

In [None]:
folder_prefix='01_data_collection/01_raw_data_'
upload_to_s3(S3_BUCKET_NAME, data_to_upload, folder_prefix=folder_prefix, format='json')
my_dict = download_from_s3(S3_BUCKET_NAME, folder_prefix, format='json')

In [None]:
service_df = my_dict['All_311_City_Service_Requests_']

In [None]:
service_df.tail(10)

### Data Cleaning

In [None]:
service_df.head()

In [None]:
# Column Cleaning
service_df.columns = [col.split('.')[-1].lower() if 'geometry' not in col else col.replace('.', '_') for col in service_df.columns]

#### Data format Revision

In [None]:
service_df.info()

In [None]:
col_types = {'adddate': 'datetime64[ns]', 'resolutiondate':'datetime64[ns]', 'serviceduedate':'datetime64[ns]',
             'serviceorderdate':'datetime64[ns]', 'inspectionflag':'str', 'inspectiondate':'datetime64[ns]',
             'inspectorname':'str', 'status_code':'str', 'zipcode':'str', 'ward':'str', 'creator':'str',
             'created':'datetime64[ns]', 'editor':'str', 'edited':'datetime64[ns]'}

service_df = service_df.astype(col_types)

In [None]:
service_df.info()

In [None]:
dtype_dict = service_df.dtypes.to_dict()

In [None]:
dtype_dict

#### Address Parsing

In [None]:
# No need for address parsin, it has been taken care of already
service_df.loc[:,['streetaddress', 'xcoord', 'ycoord', 'latitude', 'longitude',
                  'city', 'state', 'zipcode', 'maraddressrepositoryid', 'ward']]

In [None]:
S3_BUCKET_NAME = working_buckets[0].name
data_to_upload = {'Staging_All_311_City_Service_Requests_-_Last_30_Days.csv': service_df}

folder_prefix='first_staging/02_staging_phase_'
upload_to_s3(S3_BUCKET_NAME, data_to_upload, folder_prefix=folder_prefix, format='csv')
my_dict = download_from_s3(S3_BUCKET_NAME, folder_prefix, format='csv')

In [None]:
staged_service_df = my_dict['Staging_All_311_City_Service_Requests_']
staged_service_df.head()

In [None]:
staged_service_df = staged_service_df.astype(dtype_dict)

In [None]:
staged_service_df.info()

#### Data Validation

These are removed since they contain no value
- ['status_code', 'inspectionflag', 'inspectiondate', 'inspectorname', 'details', 'gis_id']

No records of them in the MetaData
- globalid, creator, created, editor, edited,

Singular value
- ['type', 'city', 'state', 'geometry_type']

Important Notice:
1. We will not remove status_code, because "This field was replaced by SERVICEORDERSTATUS for requests resolved as of 6/15/19. Prior to this date, one or both fields may be used".
2. The details column contains information about the action expected to fulfill the request or otherwise address the information reported. This column might be important for text processing in the future, so it will not be removed.
3. city and state each contain a single value, but they might be important when working with service requests from all cities and states in the US.

In [None]:
drop_cols_service_df = staged_service_df.drop(['type', 'inspectionflag', 'inspectiondate', 'inspectorname', 'gis_id', 'globalid', 'creator', 'created', 'editor', 'edited', 'geometry_type'], axis=1)
drop_cols_service_df

#### Data De_duplication

Columns that duplicate latitude and longitude
- coordinates

In [None]:
de_duplicate_service_df = drop_cols_service_df.drop(['geometry_coordinates'], axis=1)
de_duplicate_service_df.head()

In [None]:
dtype_dict = de_duplicate_service_df.dtypes.to_dict()

In [None]:
S3_BUCKET_NAME = working_buckets[0].name
data_to_upload = {'Final_Staging_All_311_City_Service_Requests_-_Last_30_Days.csv': de_duplicate_service_df}

folder_prefix='final_staging/03_final_staging_phase_'
upload_to_s3(S3_BUCKET_NAME, data_to_upload, folder_prefix=folder_prefix, format='csv')
my_dict = download_from_s3(S3_BUCKET_NAME, folder_prefix, format='csv')

Creating Facts and Dimension Tables

In [None]:
final_staging_service_df = my_dict['Final_Staging_All_311_City_Service_Requests_']
final_staging_service_df = final_staging_service_df.astype(dtype_dict)

In [None]:
final_staging_service_df.info()

In [None]:
final_staging_service_df.head()

In [None]:
# Service-type dimension: one row per servicecode (first occurrence kept), fresh 0..n-1 index.
service_type_cols = ['servicecode', 'servicecodedescription', 'servicetypecodedescription', 'organizationacronym']
service_info = (
    final_staging_service_df.loc[:, service_type_cols]
    .drop_duplicates(subset="servicecode")
    .reset_index(drop=True)
)
service_info

In [None]:
service_info.info()

In [None]:
directory = 'output'
if not os.path.exists(directory):
    os.makedirs(directory)
service_info.to_csv('output/dimServiceType.csv', index=False)

In [None]:
def get_time(x):
    """Label an hour of the day.

    5-11 -> 'morning', 12-16 -> 'afternoon', 17-20 -> 'evening',
    anything else -> 'night'.
    """
    day_parts = (
        (5, 12, 'morning'),
        (12, 17, 'afternoon'),
        (17, 21, 'evening'),
    )
    for first_hour, stop_hour, label in day_parts:
        if first_hour <= x < stop_hour:
            return label
    return 'night'

get_time(21)

In [None]:
# Date dimension: one row per hour, from a year before the earliest request
# to a year after the latest due date (both ends normalised to midnight).
start_date = final_staging_service_df['adddate'].min() - pd.DateOffset(years= 1)
end_date = final_staging_service_df['serviceduedate'].max() + pd.DateOffset(years= 1)

# 'h' replaces the upper-case 'H' alias, which is deprecated in recent pandas releases
request_date = pd.DataFrame(pd.date_range(start=start_date,
                           end=end_date, freq='h', normalize=True), columns=['Date'])

request_date['year'] = request_date['Date'].dt.year
request_date['months'] = request_date['Date'].dt.month
request_date['day'] = request_date['Date'].dt.day
request_date['week'] = request_date['Date'].dt.isocalendar().week
request_date['dayofweek'] = request_date['Date'].dt.dayofweek
request_date['quarter'] = request_date['Date'].dt.quarter
# dayofweek counts Monday as 0, so 5 and 6 are Saturday and Sunday
request_date['is_weekend'] = request_date['dayofweek'].isin([5, 6])
request_date['hour'] = request_date['Date'].dt.hour
request_date['time_of_day'] = request_date['hour'].apply(get_time)

request_date

In [None]:
request_date.info()

In [None]:
request_date.to_csv('output/dimDate.csv', index=False)

In [None]:
status_col = ['priority', 'serviceorderstatus', 'status_code',]

# Status dimension: unique (priority, serviceorderstatus) pairs with a priority,
# numbered from 1 in a leading status_id column.
service_status = (
    final_staging_service_df.loc[:, status_col]
    .drop_duplicates(subset=["priority", "serviceorderstatus"])
    .dropna(subset=['priority'])
    .reset_index(drop=True)
    .reset_index()
    .rename(columns={'index':'status_id'})
)
service_status['status_id'] = service_status['status_id'] + 1
service_status

In [None]:
service_status.info()

In [None]:
service_status.to_csv('output/dimServiceStatus.csv', index=False)

In [None]:
address_col = ['streetaddress', 'xcoord', 'ycoord', 'latitude', 'longitude', 'city', 'state', 'zipcode', 'maraddressrepositoryid', 'ward']

# Location dimension: first row seen per streetaddress, numbered from 1 in a
# leading address_id column.
address_details = (
    final_staging_service_df.loc[:, address_col]
    .drop_duplicates(subset=["streetaddress"])
    .reset_index(drop=True)
    .reset_index()
    .rename(columns={'index':'address_id'})
)
address_details['address_id'] = address_details['address_id'] + 1
address_details

In [None]:
address_details.info()

In [None]:
address_details.to_csv('output/dimLocation.csv', index=False)

In [None]:
final_staging_service_df.shape

In [None]:
# Fact table: attach the surrogate keys of the status and location dimensions,
# snap the timestamps to the hour, and keep only the fact columns.
# Only the key columns of each dimension are merged in, so no *_x / *_y
# duplicates are created and `address_col` is no longer mutated mid-cell.
serviceFacts = (
    final_staging_service_df
    .merge(service_status[['status_id', 'serviceorderstatus', 'priority']],
           on=['serviceorderstatus', 'priority'])
    .merge(address_details[['address_id', 'streetaddress']], on='streetaddress')
)

# Round the time to the nearest hour.
# Series.dt.round is the datetime rounding API; plain Series.round is meant for
# numeric decimals and is not a reliable way to round timestamps.
for date_col in ['adddate', 'resolutiondate', 'serviceduedate', 'serviceorderdate']:
    serviceFacts[date_col] = serviceFacts[date_col].dt.round('h')

serviceFacts = serviceFacts[['servicerequestid','servicecode', 'status_id', 'address_id', 'adddate',
                             'resolutiondate','serviceduedate', 'serviceorderdate', 'details', 'servicecallcount']]
serviceFacts

In [None]:
serviceFacts.info()

In [None]:
serviceFacts.to_csv('output/serviceFacts.csv', index=False)

### Uploading multiple files to S3 bucket

In [None]:
files = glob(f"./output/*.csv")
files

In [None]:
# data_to_upload = {'All_311_City_Service_Requests_-_Last_30_Days.geojson' : service_req_df_json} #, 'Roadway_Block.geojson':road_block_df_json}

def json_stream(value, S3_BUCKET_NAME, FileName, has_date=True):
    """Serialise ``value`` to JSON in memory and store it under ``FileName``.

    Note: this definition replaces the earlier three-argument ``json_stream``.

    Args:
        value: Object with a pandas-style ``to_json`` method.
        S3_BUCKET_NAME: Target bucket.
        FileName: Object key to write.
        has_date: True -> record-oriented JSON with ISO dates at second
            precision; False -> index-oriented JSON including the index.
    """
    if has_date:
        to_json_options = dict(orient='records', date_format = 'iso', date_unit='s')
    else:
        to_json_options = dict(orient='index', index=True)

    json_buffer = io.StringIO()
    value.to_json(json_buffer, **to_json_options)

    s3_client.put_object(Bucket=S3_BUCKET_NAME,
                         Key=FileName,
                         Body=json_buffer.getvalue())

In [None]:
serviceFacts.head()

In [None]:
S3_BUCKET_NAME = working_buckets[0].name
data_to_upload = {'dimLocation.json' : address_details,
                  'dimDate.json' :  request_date,
                  'serviceFacts.json' : serviceFacts,
                  'dimServiceType.json' : service_info,
                  'dimServiceStatus.json' : service_status}

def upload_final_to_s3(S3_BUCKET_NAME, data_to_upload, folder_prefix, format='csv'):
    """Upload each frame plus a companion ``<name>Datatype.json`` describing its dtypes.

    Args:
        S3_BUCKET_NAME: Target bucket.
        data_to_upload: Mapping of file name -> DataFrame.
        folder_prefix: Folder prefix; ``_<today's date>/`` is appended to it.
        format: ``'csv'`` or ``'json'`` selects the writer for the data file;
            any other value uploads only the datatype file.
    """
    for key, value in data_to_upload.items():
        FileName = f'{folder_prefix}_{datetime.datetime.now().date()}/{key}'
        if format == 'json':
            json_stream(value, S3_BUCKET_NAME, FileName)
        elif format == 'csv':
            csv_stream(value, S3_BUCKET_NAME, FileName)

        # one-column frame: index = column names, values = dtype names without '|'
        dtype_names = {column: str(dtype).replace('|','') for column, dtype in value.dtypes.items()}
        dtt = pd.DataFrame([dtype_names]).T
        datatype_key = f"{FileName.split('.')[0]}Datatype.json"
        json_stream(dtt, S3_BUCKET_NAME, datatype_key, has_date=False)

def download_final_from_s3(S3_BUCKET_NAME, folder_prefix, format='csv'):
    """Download every ``.csv`` / ``.json`` object under ``folder_prefix`` into DataFrames.

    Args:
        S3_BUCKET_NAME: Bucket to read from.
        folder_prefix: Key prefix passed to ``list_objects``.
        format: Unused; the parser is chosen from each key's extension.
            Kept so existing callers keep working.

    Returns:
        Dict keyed by the object's file name truncated at the first ``.`` and
        then at the first ``-``. Objects with any other extension are skipped.
        Empty when no object matches the prefix.
    """
    my_dict = {}
    listing = s3_client.list_objects(Bucket=S3_BUCKET_NAME, Prefix=folder_prefix)

    # 'Contents' is missing from the response when nothing matches the prefix
    for key in listing.get('Contents', []):
        print(key['Key'])

        df_name = key['Key'].split('/')[-1].split('.')[0].split('-')[0]
        extension = key['Key'].split('.')[-1]

        obj = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=key['Key'])
        payload = io.BytesIO(obj['Body'].read())

        if extension == 'csv':
            # infer_datetime_format is deprecated in pandas 2.x and was dropped here
            my_dict[df_name] = pd.read_csv(payload, parse_dates=True, encoding='utf8')
        elif extension == 'json':
            my_dict[df_name] = pd.read_json(payload, orient='records', encoding='utf8')
    return my_dict

folder_prefix='transformed_data/output'
upload_final_to_s3(S3_BUCKET_NAME, data_to_upload, folder_prefix=folder_prefix, format='json')
my_dict = download_final_from_s3(S3_BUCKET_NAME, folder_prefix, format='json')

In [None]:
my_dict.keys()

In [None]:
request_date = my_dict['dimDate']
request_date.head()

In [None]:
request_date.info()

In [None]:
# ['dimDate'], ['dimDateDatatype'], ['dimLocation'], ['dimLocationDatatype'], ['dimServiceStatus'], ['dimServiceStatusDatatype'], ['dimServiceType'], ['dimServiceTypeDatatype'], ['serviceFacts'], ['serviceFactsDatatype']

In [None]:
my_dict['serviceFactsDatatype'].T.to_dict()[0]

In [None]:
# Re-apply the stored dtypes to every data frame that has a '<name>Datatype'
# companion. Selecting by name instead of taking every second key removes the
# dependency on the order in which S3 happened to list the objects.
for key in [name for name in my_dict
            if not name.endswith('Datatype') and f'{name}Datatype' in my_dict]:
    my_dict[key] = my_dict[key].astype(my_dict[f'{key}Datatype'].T.to_dict()[0])

In [None]:
my_dict['serviceFacts'].info()

In [None]:
mergedf = pd.merge(my_dict['serviceFacts'], my_dict['dimServiceStatus'],on='status_id')
mergedf = pd.merge(mergedf, my_dict['dimServiceType'], on='servicecode' )
mergedf = pd.merge(mergedf, my_dict['dimLocation'], on='address_id' ) 
# mergedf = pd.merge(mergedf, request_date, left_on='serviceorderdate', right_on='Date' ) #, request_date
mergedf['priority'].value_counts()


In [None]:
import mysql.connector
import numpy as np
from sqlalchemy import create_engine
from sqlalchemy import text
from mysql.connector import connect, Error

In [None]:
client = boto3.client("rds", region_name=AWS_REGION)

response = client.describe_db_instances()
print(response)

In [None]:
# from google.colab import userdata
# host = userdata.get('planet_scale_host')
# password = userdata.get('planet_scale_pwd')
# user = userdata.get('planet_scale_username')
# database = "connectdatabase"

In [None]:
# Replace these with your actual values
dbinstance='servicedbmysql'
endpoint = f"{dbinstance}.***********.{AWS_REGION}.rds.amazonaws.com" #  Create a rds mysql instance on the AWS webpage and confirm the endpoint format of your rds
engine = "mysql"
engine_version = "8.0.28"
dbname = "servicecalls"
username = "root"
password = getpass('Enter MySQL Password: ')
host='127.0.0.1'
DBInstanceClass="db.t3.micro"
AllocatedStorage=20
# Strip the whitespace around each id ("sg-1, sg-2") and drop empty entries so
# a trailing comma or a space after a comma does not produce an invalid group id.
security_groups = [group.strip()
                   for group in getpass('Enter Security Groups separated by a comma: ').split(',')
                   if group.strip()] #e.g: fg-0b870chinume0by

def create_rds_database():
    """Request an RDS instance through the module-level ``client`` using the notebook's settings.

    All values (``dbinstance``, ``engine``, credentials, security groups, ...)
    are read from module-level variables; deletion protection is switched on.
    """
    instance_settings = {
        'DBInstanceIdentifier': dbinstance,
        'DBInstanceClass': DBInstanceClass,
        'Engine': engine,
        'EngineVersion': engine_version,
        'DBName': dbname,
        'AllocatedStorage': AllocatedStorage,
        'MasterUsername': username,
        'MasterUserPassword': password,
        'Port': 3306,
        'VpcSecurityGroupIds': security_groups,
        'DeletionProtection': True,
    }
    client.create_db_instance(**instance_settings)
    print("RDS MySQL database created!")

def check_and_create_rds_database(dbinstance='servicemysql',
                        endpoint = f"**.amazonaws.com",
                        engine = "mysql",
                        engine_version = "8.0.28",
                        dbname = "databasename",
                        username = "root",
                        password = "admin0",
                        DBInstanceClass="db.t3.micro",
                        AllocatedStorage=20
                       ):
    """Create the RDS instance ``dbinstance`` unless one with that identifier is already listed.

    The instance is created from the arguments passed to this function.
    Previously creation was delegated to ``create_rds_database()``, which reads
    module-level variables, so the identifier that was checked and the instance
    that was created could differ.

    Args:
        dbinstance: DB instance identifier to look for / create.
        endpoint: Unused; kept for backward compatibility.
        engine, engine_version: Database engine and version.
        dbname: Name of the initial database.
        username, password: Master credentials.
        DBInstanceClass: Instance class.
        AllocatedStorage: Allocated storage passed to RDS.

    The VPC security groups are read from the module-level ``security_groups``.
    """
    client = boto3.client("rds", region_name=AWS_REGION)

    response = client.describe_db_instances()
    existing_ids = {instance['DBInstanceIdentifier'] for instance in response["DBInstances"]}
    if dbinstance in existing_ids:
        print('DB Instance Already Exists')
        return

    client.create_db_instance(
            DBInstanceIdentifier=dbinstance,
            DBInstanceClass=DBInstanceClass,
            Engine=engine,
            EngineVersion=engine_version,
            DBName=dbname,
            AllocatedStorage=AllocatedStorage,
            MasterUsername=username,
            MasterUserPassword=password,
            Port=3306,
            VpcSecurityGroupIds=security_groups,
            DeletionProtection=True
        )
    print("RDS MySQL database created!")

        

        
def create_database(host='127.0.0.1', dbname='postdb', user='user', password='password'):
    """Create the MySQL database ``dbname`` on ``host`` if it does not exist yet.

    Connection and cursor are always closed, including when the statement
    fails. Connector errors (``Error``) are printed rather than raised.

    Args:
        host: Server host name; port 3306 is used.
        dbname: Database name, interpolated into the statement as-is, so it
            must come from trusted configuration.
        user, password: Credentials used to connect.
    """
    conn = None
    try:
        # connect to the server without selecting a database
        conn = mysql.connector.connect(host=host, user=user, port='3306', password=password)
        cur = conn.cursor()
        try:
            # no character set is specified, so the server default applies
            cur.execute(f"CREATE DATABASE IF NOT EXISTS {dbname}")
        finally:
            cur.close()

    except Error as e:
        print(e)
    finally:
        # release the connection even when execute() raised
        if conn is not None:
            conn.close()
    

def create_db(localmachine=True):
    """Create the database locally or make sure the RDS instance exists.

    Args:
        localmachine: Truthy -> ``create_database`` against the configured
            ``host``; falsy -> ``check_and_create_rds_database``. Both branches
            are fed from the notebook's module-level settings.
    """
    if not localmachine:
        check_and_create_rds_database(
            dbinstance=dbinstance,
            endpoint=endpoint,
            engine=engine,
            engine_version=engine_version,
            dbname=dbname,
            username=username,
            password=password,
            DBInstanceClass=DBInstanceClass,
            AllocatedStorage=AllocatedStorage,
        )
        return

    create_database(host=host, dbname=dbname, user=username, password=password)

In [None]:
create_db(localmachine=False)

In [None]:
# Establish a connection to the database
def create_connection(instance='dbmysql', user='admin', password=password, dbname='database', localmachine=True, endpoint=endpoint):
    """Open a MySQL connection to the RDS endpoint or to the local host.

    Args:
        instance: Not used by the function body.
        user, password: Credentials; ``password`` defaults to the value the
            module-level ``password`` had when this function was defined.
        dbname: Database to select.
        localmachine: ``False`` connects to ``endpoint``; any other value
            connects to the module-level ``host``.
        endpoint: Remote host name used when ``localmachine`` is ``False``.

    Returns:
        The connection returned by ``mysql.connector.connect``.
    """
    # both targets take the same arguments; only the host differs
    target_host = endpoint if localmachine == False else host
    return mysql.connector.connect(
        host=target_host,
        user=user,
        password=password,
        database=dbname,
        port='3306',
    )

In [None]:
connection = create_connection(instance=dbinstance, user=username, dbname=dbname, localmachine=False, endpoint=endpoint)
connection

In [None]:
# Keep the data frames and leave out their '<name>Datatype' companions.
# Selecting by name instead of every second key does not depend on listing order.
dict_df = {key: frame for key, frame in my_dict.items() if not key.endswith('Datatype')}

In [None]:
dict_df['serviceFacts'][dict_df['serviceFacts']['resolutiondate'].isna()] # View NaN values

In [None]:
dict_df = {key: value.fillna(np.nan).replace([np.nan], [None]) for key, value in dict_df.items()} #Change NaN values to None

In [None]:
dict_df['serviceFacts'][dict_df['serviceFacts']['resolutiondate'].isna()]

In [None]:
# Connect to the database
# conn = create_engine(f'mysql+mysqlconnector://{user}:{password}@{host}:3306/{database}')

In [None]:
def get_indices(x: list, value: int) -> list:
    """
    Return every position in ``x`` whose element is ``value``, in ascending order.

    An element matches when it is the same object as ``value`` or compares
    equal to it (the same rule ``list.index`` applies).
    """
    return [position for position, element in enumerate(x)
            if element is value or element == value]

n = [1, 2, 3, -50, -60, 0, 6, 9, -60, -60]
print(get_indices(n, -60))

In [None]:

def create_tables(conn):
    """(Re)create every table named in ``dict_df`` and load its rows.

    For each ``name -> DataFrame`` entry of the module-level ``dict_df`` the
    table is dropped if present, created from the matching DDL below and
    filled through ``insert_into_table``.

    All work runs inside ``conn.begin()`` so the inserts are committed when the
    block finishes and rolled back if it raises; a plain ``conn.connect()``
    does not commit on SQLAlchemy 2.x.

    Args:
        conn: SQLAlchemy engine.

    Raises:
        KeyError: if ``dict_df`` holds a name without a DDL statement.
    """
    str_dt = 'VARCHAR'
    texttype = 'TEXT'
    int_dt = 'INTEGER'
    dec = 'DECIMAL'
    bool_ = 'BOOL'
    date_dt = 'TIMESTAMP'

    # DDL per table, looked up by name (replaces the former eval() on a variable name)
    table_queries = {
        'serviceFacts': f"""
                CREATE TABLE serviceFacts(
                    servicerequestid {str_dt}(25) NOT NULL PRIMARY KEY,
                    servicecode {str_dt}(50),
                    status_id {int_dt},
                    address_id {int_dt},
                    adddate {date_dt},
                    resolutiondate {date_dt},
                    serviceduedate {date_dt},
                    serviceorderdate {date_dt},
                    details {texttype},
                    servicecallcount {int_dt}
                )
                """,
        # NOTE(review): xcoord/ycoord use DECIMAL without precision/scale - confirm
        # this keeps the fractional part you need.
        'dimLocation': f"""CREATE TABLE dimLocation (
                                        address_id {int_dt} NOT NULL PRIMARY KEY,
                                        streetaddress {str_dt}(100),
                                        xcoord {dec},
                                        ycoord {dec},
                                        latitude {dec}(12,7),
                                        longitude {dec}(12,7),
                                        city {str_dt}(25),
                                        state {str_dt}(25),
                                        zipcode {str_dt}(20),
                                        maraddressrepositoryid {int_dt},
                                        ward {str_dt}(10)
                                        )
                                        """,
        'dimDate': f"""CREATE TABLE dimDate (
                                        Date {date_dt} NOT NULL PRIMARY KEY,
                                        year {int_dt},
                                        months {int_dt},
                                        day {int_dt},
                                        week {int_dt},
                                        dayofweek {int_dt},
                                        quarter {int_dt},
                                        is_weekend {bool_},
                                        hour {int_dt},
                                        time_of_day {str_dt}(20)
                                        )
                                        """,
        'dimServiceType': f"""CREATE TABLE dimServiceType (
                                        servicecode {str_dt}(20) NOT NULL PRIMARY KEY,
                                        servicecodedescription {str_dt}(255),
                                        servicetypecodedescription {str_dt}(255),
                                        organizationacronym {str_dt}(50)
                                        )
                                        """,
        'dimServiceStatus': f"""CREATE TABLE dimServiceStatus (
                                    status_id {int_dt} NOT NULL PRIMARY KEY,
                                    priority {str_dt}(20),
                                    serviceorderstatus {str_dt}(25),
                                    status_code {str_dt}(10)
                                    )
                                    """,
    }

    with conn.begin() as connection:
        for key, values in dict_df.items():
            tblname = key.split('.')[0]
            column_list = ",".join(str(i) for i in values.columns.tolist())
            print(values.shape)
            connection.execute(text(f"DROP Table IF EXISTS {tblname}"))
            connection.execute(text(table_queries[tblname]))
            print(f'{tblname} created successfully')
            logging.info(f"{tblname} with a defined datatype and constraint created successfully")

            insert_into_table(table_name=tblname, df=values, cols=column_list, connection=connection)

        print('All Tables Created and Data Inserted Successfully')

        
def remove_null_col_value(cols, row):
    """
    Drop every position whose row value is None from both the column list and the row.

    ``cols`` is a comma-separated string of column names aligned with ``row``.
    Returns the remaining column names (comma-joined again) and the remaining
    values as a list.
    """
    names = cols.split(',')
    values = list(row)
    # delete from the back so earlier positions keep their index
    for position in reversed(get_indices(list(row), None)):
        del names[position]
        del values[position]
    return ','.join(names), values


def _to_db_value(value):
    """Convert one pandas/numpy cell into a plain Python value a DB driver can bind."""
    if pd.api.types.is_scalar(value):
        if pd.isna(value):
            return None  # None / NaN / NaT / pd.NA all become SQL NULL
        if isinstance(value, pd.Timestamp):
            return value.to_pydatetime()
        if hasattr(value, 'item'):
            return value.item()  # numpy scalar -> Python scalar
    return value


def insert_into_table(table_name, df, cols, connection, chunk_size=1000):
    """Insert every row of ``df`` into ``table_name`` using bound parameters.

    Values are passed as bind parameters instead of being formatted into the
    SQL text, so quotes, timestamps and single-column rows are handled by the
    driver, and missing values are sent as NULL.

    Args:
        table_name: Target table (trusted identifier, interpolated as-is).
        df: DataFrame whose column order matches ``cols``.
        cols: Comma-separated column names (trusted identifiers).
        connection: SQLAlchemy connection exposing ``execute(statement, params)``.
        chunk_size: Number of rows sent per ``execute`` call.
    """
    logging.info(f'Inserting Records into {table_name} table')

    columns = cols.split(',')
    placeholders = ", ".join(f":p{position}" for position in range(len(columns)))
    statement = text(f"INSERT INTO {table_name} ({cols}) VALUES ({placeholders})")

    records = [
        {f"p{position}": _to_db_value(value) for position, value in enumerate(row)}
        for row in df.itertuples(index=False, name=None)
    ]

    # a list of parameter dicts makes the driver run the statement once per row (executemany)
    for start in range(0, len(records), chunk_size):
        connection.execute(statement, records[start:start + chunk_size])

    # count rows explicitly; the former `i + 1` was undefined for an empty frame
    inserted = len(records)
    logging.info(f'{inserted} Records successfully Inserted into {table_name} table')
    print(f'{inserted} Records successfully Inserted')



def save_to_mysql(username, password, host, database, port=3306):
    """
    Create a SQLAlchemy engine for the given MySQL database and build the tables.

    Parameters
    ----------
    username, password : str
        Database credentials. Both are URL-escaped before being placed in the
        connection URL, so characters such as '@', ':' or '/' in a password do not
        corrupt the URL.
    host : str
        Database server host name or address.
    database : str
        Name of the database (schema) to connect to.
    port : int, optional
        Server port; defaults to 3306, the value that was previously hard-coded.

    The engine is handed to `create_tables` and disposed afterwards, whether or not
    table creation succeeds.
    """
    from urllib.parse import quote_plus

    url = (
        f'mysql+mysqlconnector://{quote_plus(username)}:{quote_plus(password)}'
        f'@{host}:{port}/{database}'
    )

    # Connect to the database
    conn = create_engine(url)
    logging.info('Connected to mySQL Engine')

    try:
        print('Creating Tables...')
        create_tables(conn)
    finally:
        # Release pooled connections once the load has finished or failed.
        conn.dispose()

# Create the tables and load the data with the credentials collected earlier in the notebook.
# Alternative kept from the original cell: save_to_mysql(username, password, endpoint, dbname)
save_to_mysql(username=username, password=password, host=host, database=dbname)

In [None]:
# Select records
def select_records(connection, tablename, limit=5):
    """
    Print the first `limit` rows of `tablename`.

    Parameters
    ----------
    connection
        Connection object exposing `cursor()`; the cursor must provide
        `execute`, `fetchall` and `close`.
    tablename : str
        Table to read from; interpolated into the SQL text, so it must be a
        trusted identifier.
    limit : int, optional
        Maximum number of rows to fetch (default 5, the previously hard-coded value).

    The fetched rows are printed once as a list and then one row per line.
    The cursor is closed even when the query raises.
    """
    cursor = connection.cursor()
    try:
        cursor.execute(f"SELECT * FROM {tablename} LIMIT {int(limit)}")
        rows = cursor.fetchall()
    finally:
        # Always release the cursor, including on a failed query.
        cursor.close()

    print(rows)
    for row in rows:
        print(row)

In [None]:
# Open a connection with the notebook's connection helper and show what was returned.
connection = create_connection(
    instance=dbinstance,
    user=username,
    dbname=dbname,
    localmachine=True,
)
print(connection)

# Preview a few rows of the loaded fact table.
select_records(connection=connection, tablename='serviceFacts')

In [None]:
# if __name__=="__main__":

#     import argparse
#     parser = argparse.ArgumentParser()

#     #database = "connectdatabase"
#     #os.environ['DB_HOST'] = host # input('Enter MySQL Host Name: ')
#     #os.environ['DB_USERNAME'] = username # input('Enter MySQL Database  Username: ')
#     #os.environ['DB_PASSWORD'] = password #  input('Enter MySQL Database Password: ')

#     # Create another group for authentication
#     auth_group = parser.add_argument_group('Authentication', 'Login credentials')

#     auth_group.add_argument("-u", "--username", help="Username to connect to a database server")
#     auth_group.add_argument("-p", "--password", help="Password to connect to a database server")
#     auth_group.add_argument("-ho", "--host", help="Database server host")
#     auth_group.add_argument("-db", "--database", help="Database name to be connected to")
#     parser.add_argument('-f', '--file', help="Normalize Database", default='Signal_Blog_posts.csv')
#     parser.add_argument('-dn', '--denormalize', type=lambda s: s.lower() in ['true', 't', 'yes', '1'], help="Normalize Database", default=True)
#     parser.add_argument('-ff', '--first_five', type=lambda s: s.lower() in ['true', 't', 'yes', '1'], help="Prints first five records.", default=False)

#     args = parser.parse_args()

#     args = vars(args)
    # save_to_mysql(args['username'], args['password'], args['host'], args['database'], file=args['file'], denormalize=args['denormalize'], first_five=args['first_five'])


## Cleaning Up: Deleting the RDS Instance and S3 Buckets

### Deleting the RDS MySQL instance using the Boto3 client

In [None]:
# Interactively delete RDS instances in AWS_REGION.
# Each instance is listed and only deleted after the user answers "y".
client = boto3.client('rds', AWS_REGION)

# describe_db_instances returns results in pages; walk every page so that no
# instance is silently skipped.
paginator = client.get_paginator('describe_db_instances')

for page in paginator.paginate():
    for instance in page["DBInstances"]:
        instance_id = instance['DBInstanceIdentifier']
        print ("About to delete %s" %(instance_id))
        if input('Enter Y/N to continue: ').strip().lower() != 'y':
            continue

        # Deletion protection has to be switched off before the delete call is made.
        if instance['DeletionProtection']:
            client.modify_db_instance(
                DBInstanceIdentifier=instance_id,
                DeletionProtection=False
            )

        response = client.delete_db_instance(
            DBInstanceIdentifier=instance_id,
            SkipFinalSnapshot=True,
            DeleteAutomatedBackups=True
        )
        # The API call only starts the deletion; report the status it returns
        # rather than claiming the instance is already gone.
        status = response.get('DBInstance', {}).get('DBInstanceStatus', 'unknown')
        print(f"{instance_id} deletion requested (current status: {status})")

### Deleting non-empty S3 Bucket using Boto3

In [None]:
# Collect every bucket visible to these credentials whose name contains S3_BUCKET_PREFIX.
s3_resource = boto3.resource("s3", region_name=AWS_REGION)
all_buckets = s3_resource.buckets.all()
buckets = [candidate for candidate in all_buckets if S3_BUCKET_PREFIX in candidate.name]
print(buckets)

def cleanup_s3_bucket(s3_bucket):
    """
    Empty an S3 bucket so that it can be deleted afterwards.

    Parameters
    ----------
    s3_bucket
        Bucket resource exposing `objects.all()`, `object_versions.all()` and `name`.

    Every current object is deleted first, then every object version (relevant when
    versioning is enabled). The confirmation message uses the name of the bucket
    that was passed in, so the function does not depend on any global variable.
    """
    # Deleting objects
    for s3_object in s3_bucket.objects.all():
        s3_object.delete()
    # Deleting objects versions if S3 versioning enabled
    for s3_object_ver in s3_bucket.object_versions.all():
        s3_object_ver.delete()
    print(f"{s3_bucket.name} S3 Bucket cleaned up")

# Empty and then delete every bucket that was selected into `buckets`.
for mybucket in buckets:
    # Get a Bucket handle for this name from the S3 resource.
    s3_bucket = s3_resource.Bucket(mybucket.name)
   
    # Remove all objects (and object versions) first, then delete the bucket itself.
    cleanup_s3_bucket(s3_bucket)
    s3_bucket.delete()
    print(f"{mybucket.name} S3 Bucket deleted")