In [None]:
import io
import logging
import re
from urllib.parse import quote_plus

import boto3
import pandas as pd
import pymysql
import requests
from botocore.exceptions import ClientError
from sqlalchemy import create_engine

In [None]:
# Configure root logging once for the notebook: timestamp, calling function,
# level and message on every record.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s:%(funcName)s:%(levelname)s:%(message)s')
# Named logger shared by the cells below.
logger = logging.getLogger("read_s3_upload_rds")

In [None]:
class GlobalVariables:
    """Central configuration for the S3 -> MySQL upload workflow.

    All values are class-level attributes, read as ``GlobalVariables.<name>``
    without instantiating the class. The angle-bracket values are placeholders
    that must be replaced before the notebook is run.
    """

    # --- S3 settings ---
    bucket_region = 'eu-central-1'
    # Public CSV that is downloaded and then stored in the bucket.
    data_url = 'https://raw.githubusercontent.com/dogukannulu/datasets/master/dirty_store_transactions.csv'
    bucket_name = '<unique_bucket_name>'
    bucket_key = '<datasets_path>/dirty_store_transactions.csv'

    # --- MySQL settings ---
    # NOTE(review): do not commit real credentials here; consider loading them
    # from environment variables or a secrets manager.
    database_name = '<database_name>'
    database_username = '<user_name>'
    database_password = '<password>'
    database_endpoint = '<database_endpoint>'
    database_port = 3306

    # Pin the client to the bucket's region so that creating the bucket with a
    # LocationConstraint does not depend on whatever default region the local
    # AWS configuration happens to provide.
    s3_client = boto3.client('s3', region_name=bucket_region)

    # URL-escape the credentials: characters such as '@', ':' or '/' in a
    # username/password would otherwise corrupt the connection URI.
    database_uri = (
        f"mysql+pymysql://{quote_plus(database_username)}:{quote_plus(database_password)}"
        f"@{database_endpoint}:{database_port}/{database_name}"
    )


In [None]:
class ModifyColumns:
    """Per-cell cleaning helpers, designed to be passed to ``Series.apply``.

    Each method accepts a single cell value. Non-string values (for example
    ``None`` or ``NaN`` from missing cells) are tolerated instead of raising,
    so one bad cell does not abort the whole column transformation.
    """

    def extract_city_name(self, string):
        """Strip punctuation and surrounding whitespace from a city name.

        Every character that is neither a word character nor whitespace is
        removed, then leading/trailing whitespace is trimmed.

        Args:
            string: Raw cell value.

        Returns:
            The cleaned string, or ``string`` unchanged if it is not a ``str``.
        """
        if not isinstance(string, str):
            return string
        cleaned_string = re.sub(r'[^\w\s]', '', string)
        return cleaned_string.strip()

    def extract_only_numbers(self, string):
        """Keep only the digits of ``string``, concatenated in order.

        Args:
            string: Raw cell value.

        Returns:
            A string made of the digits found in ``string`` (possibly empty),
            or ``string`` unchanged if it is not a ``str``.
        """
        if not isinstance(string, str):
            return string
        return ''.join(re.findall(r'\d+', string))

    def extract_floats_without_sign(self, string):
        """Convert a value such as ``'$12.5'`` to a float, dropping ``'$'``.

        Args:
            string: Raw cell value; usually a string, but already-numeric
                values and ``None`` are accepted as well.

        Returns:
            The parsed float; ``NaN`` when ``string`` is ``None``.

        Raises:
            ValueError: If the text left after removing ``'$'`` is not a
                valid float literal.
        """
        if string is None:
            return float('nan')
        if not isinstance(string, str):
            # Already numeric (including NaN): just normalise the type.
            return float(string)
        return float(string.replace('$', ''))

In [None]:
# Short module-level aliases for the settings used by the S3 cells below.
s3 = GlobalVariables.s3_client
name, key = GlobalVariables.bucket_name, GlobalVariables.bucket_key
region = GlobalVariables.bucket_region
url = GlobalVariables.data_url

In [None]:
# Create the target bucket. The cell is safe to re-run: a bucket that this
# account already owns is treated as success rather than an error.
try:
    if region == 'us-east-1':
        # us-east-1 is the default location and must not be passed as an
        # explicit LocationConstraint.
        s3.create_bucket(Bucket=name)
    else:
        s3.create_bucket(
            Bucket=name,
            CreateBucketConfiguration={'LocationConstraint': region},
        )
    logger.info("Created bucket %s in %s", name, region)
except ClientError as error:
    error_code = error.response.get('Error', {}).get('Code')
    if error_code == 'BucketAlreadyOwnedByYou':
        logger.info("Bucket %s already exists and is owned by this account", name)
    else:
        # Anything else (name taken by another account, permissions, ...) is
        # a real failure and must stop the notebook.
        logger.error("Could not create bucket %s: %s", name, error)
        raise

In [None]:
# Download the raw CSV. The timeout keeps the cell from hanging forever on a
# stalled connection.
response = requests.get(url, timeout=30)
# Fail fast on HTTP errors so an error page is never stored as the dataset.
response.raise_for_status()
# Store the downloaded bytes in the bucket under the configured key.
s3.put_object(Body=response.content, Bucket=name, Key=key)

In [None]:
# Fetch the stored object back from S3 and pull its whole body into memory.
get_response = s3.get_object(Key=key, Bucket=name)
file_content = get_response['Body'].read()

# Wrap the raw bytes in a file-like buffer so pandas can parse them as CSV.
csv_buffer = io.BytesIO(file_content)
df = pd.read_csv(csv_buffer)

In [None]:
modify_columns = ModifyColumns()

# Text columns: strip punctuation from the city, keep only digits of the id.
df['STORE_LOCATION'] = df['STORE_LOCATION'].apply(modify_columns.extract_city_name)
df['PRODUCT_ID'] = df['PRODUCT_ID'].apply(modify_columns.extract_only_numbers)

# Price columns: drop the '$' sign and convert to float.
# Casting to str first keeps this cell re-runnable: after the first run the
# columns already hold floats, which have no ``.replace`` method.
column_list = ['MRP', 'CP', 'DISCOUNT', 'SP']
for column in column_list:
    df[column] = df[column].astype(str).apply(modify_columns.extract_floats_without_sign)

In [None]:
# Connection string for the target database.
database_uri = GlobalVariables.database_uri

# Destination table and the query used afterwards to read it back.
table_name = 'clean_transaction'
sql_query = f"SELECT * FROM {table_name}"

In [None]:
# Build the SQLAlchemy engine from the configured connection string.
engine = create_engine(database_uri)

# Write the cleaned frame; an existing table of the same name is replaced and
# the DataFrame index is not stored as a column.
df.to_sql(name=table_name, con=engine, index=False, if_exists='replace')

In [None]:
# Read the table back to confirm the upload succeeded.
uploaded_df = pd.read_sql(sql_query, engine)
# Leave the preview as the cell's last expression so the notebook renders it
# as a rich table instead of plain printed text.
uploaded_df.head()