In [1]:
import pandas as pd
import boto3
import re
import io

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


# Read Data From S3 Bucket

In [2]:
s3_client = boto3.client('s3')

In [3]:
obj = s3_client.get_object(Bucket="storetransactiondemo1", Key="dirty_store_transactions.csv")

In [4]:
df = pd.read_csv(obj['Body'])

In [5]:
df.head()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date
0,YR7220,New York(,Electronics,12254943,$31,$20.77,$1.86,$29.14,2019-11-26
1,YR7220,New York+,Furniture,72619323C,$15,$9.75,$1.5,$13.5,2019-11-26
2,YR7220,New York,Electronics,34161682B,$88,$62.48,$4.4,$83.6,2019-11-26
3,YR7220,New York!,Kitchen,79411621,$91,$58.24,$3.64,$87.36,2019-11-26
4,YR7220,New York,Fashion,39520263T,$85,$51,$2.55,$82.45,2019-11-26


# Transform and Clean Data

In [6]:
def clean_store_location(st_loc):
        return re.sub(r'[^\w\s]', '', st_loc).strip()

In [7]:
df['STORE_LOCATION'] = df['STORE_LOCATION'].map(lambda x: clean_store_location(x))

In [8]:
df.head()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date
0,YR7220,New York,Electronics,12254943,$31,$20.77,$1.86,$29.14,2019-11-26
1,YR7220,New York,Furniture,72619323C,$15,$9.75,$1.5,$13.5,2019-11-26
2,YR7220,New York,Electronics,34161682B,$88,$62.48,$4.4,$83.6,2019-11-26
3,YR7220,New York,Kitchen,79411621,$91,$58.24,$3.64,$87.36,2019-11-26
4,YR7220,New York,Fashion,39520263T,$85,$51,$2.55,$82.45,2019-11-26


In [9]:
def clean_product_id(pd_id):
        matches = re.findall(r'\d+', pd_id)
        if matches:
            return matches[0]
        return pd_id

In [10]:
df['PRODUCT_ID'] = df['PRODUCT_ID'].map(lambda x: clean_product_id(x))

In [11]:
df.head()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date
0,YR7220,New York,Electronics,12254943,$31,$20.77,$1.86,$29.14,2019-11-26
1,YR7220,New York,Furniture,72619323,$15,$9.75,$1.5,$13.5,2019-11-26
2,YR7220,New York,Electronics,34161682,$88,$62.48,$4.4,$83.6,2019-11-26
3,YR7220,New York,Kitchen,79411621,$91,$58.24,$3.64,$87.36,2019-11-26
4,YR7220,New York,Fashion,39520263,$85,$51,$2.55,$82.45,2019-11-26


In [12]:
def remove_dollar(amount):
    return float(amount.replace('$', ''))

In [13]:
for to_clean in ['MRP', 'CP', 'DISCOUNT', 'SP']:
        df[to_clean] = df[to_clean].map(lambda x: remove_dollar(x))

In [14]:
df.head()

Unnamed: 0,STORE_ID,STORE_LOCATION,PRODUCT_CATEGORY,PRODUCT_ID,MRP,CP,DISCOUNT,SP,Date
0,YR7220,New York,Electronics,12254943,31.0,20.77,1.86,29.14,2019-11-26
1,YR7220,New York,Furniture,72619323,15.0,9.75,1.5,13.5,2019-11-26
2,YR7220,New York,Electronics,34161682,88.0,62.48,4.4,83.6,2019-11-26
3,YR7220,New York,Kitchen,79411621,91.0,58.24,3.64,87.36,2019-11-26
4,YR7220,New York,Fashion,39520263,85.0,51.0,2.55,82.45,2019-11-26


In [16]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 37853 entries, 0 to 37852
Data columns (total 9 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   STORE_ID          37853 non-null  object 
 1   STORE_LOCATION    37853 non-null  object 
 2   PRODUCT_CATEGORY  37853 non-null  object 
 3   PRODUCT_ID        37853 non-null  object 
 4   MRP               37853 non-null  float64
 5   CP                37853 non-null  float64
 6   DISCOUNT          37853 non-null  float64
 7   SP                37853 non-null  float64
 8   Date              37853 non-null  object 
dtypes: float64(4), object(5)
memory usage: 2.6+ MB


# Save Clean Data to S3 Bucket

In [15]:
s3_resource = boto3.resource('s3')

In [16]:
csv_buffer = io.StringIO()

In [17]:
df.to_csv(csv_buffer, index=False)

In [18]:
s3_resource.Object("storetransactiondemo1","clean_transaction/clean_store_transactions_1.csv").put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'QJPMEPXF6100R1KY',
  'HostId': 'Z3mOe/YawrxRYIlv7H4xRBcD/uDsAk+Ua+ZCZmkST+B00faiOjjTiHPjMO5flPphHKzcJ84YCq+/7u6uqDizbg==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'Z3mOe/YawrxRYIlv7H4xRBcD/uDsAk+Ua+ZCZmkST+B00faiOjjTiHPjMO5flPphHKzcJ84YCq+/7u6uqDizbg==',
   'x-amz-request-id': 'QJPMEPXF6100R1KY',
   'date': 'Thu, 25 Jan 2024 10:33:00 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"46e8c787e9e1883b09e3c39ff9473825"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 1},
 'ETag': '"46e8c787e9e1883b09e3c39ff9473825"',
 'ServerSideEncryption': 'AES256'}

# Create a SQLModel Model

In [19]:
!pip install sqlmodel

Defaulting to user installation because normal site-packages is not writeable


In [20]:
!pip install pymysql


Defaulting to user installation because normal site-packages is not writeable


In [21]:
# from typing import Optional
#from sqlmodel import Field, SQLModel
#from sqlmodel import create_engine

#username = "admin"
#password = "Ankara06"
#host = "clean-store-transacation-db.cjs0c0aucs2c.eu-central-1.rds.amazonaws.com"
#port = 3306
#database = "clean-store-transacation-db"

import pandas as pd
from sqlalchemy import create_engine

# Replace the following with your RDS MySQL credentials and updated database name
username = "admin"
password = "Ankara06"
host = "awsdataops1.cjs0c0aucs2c.eu-central-1.rds.amazonaws.com"
port = 3306
database = "cleantransacation"  # Updated database name

# Create SQLAlchemy engine
engine = create_engine(f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}", pool_pre_ping=True)


In [29]:
#engine = create_engine(f"mysql+pymysql://admin:Ankara06@clean-store-transacation-db.cjs0c0aucs2c.eu-central-1.rds.amazonaws.com:3306/clean-store-transacation-db")

In [22]:
# Assuming 'your_table_name' is the name of the table you want to create or append data to
table_name = "clean_store_table"

# Write DataFrame to MySQL
df.to_sql(table_name, con=engine, if_exists='replace', index=False)


37853