# Project 3


In [None]:
#Importing modules

import pandas as pd
import tabula
import yaml
from sqlalchemy import create_engine
from sqlalchemy import inspect
import requests
from database_utils import DatabaseConnector
from data_extraction import DataExtractor
from data_cleaning import DataCleaning
import importlib
import json
import boto3
import re
import numpy as np


# --- Local (target) database ---
# `uploader` and `uploading` are reused by every upload_to_db(...) call later in the notebook.

# Read the local database credentials from 'ml_dbs.yaml'.
uploader = DatabaseConnector()
ml_yaml = 'ml_dbs.yaml'
uploader.read_db_creds(ml_yaml)

# Initialise and return an SQLAlchemy database engine for the local database
# (presumably PostgreSQL, going by the method name — confirm in database_utils).

uploading = uploader.init_db_engine_postgresql()



## RDS Database ETL


### 1. Getting data from the RDS database


In [None]:

## Read the credentials for the source RDS database from 'db_creds.yaml'.
connector_rds = DatabaseConnector()
ai_core_yaml = 'db_creds.yaml'
connector_rds.read_db_creds(ai_core_yaml)

## Initialise and return an SQLAlchemy database engine for the RDS source.
connection_rds = connector_rds.init_db_engine()

## Ask the connector for the tables in the RDS database. The result is only stored,
## not displayed; presumably it is where 'legacy_users' and 'orders_table' were found.
connector_tables_rds = connector_rds.list_db_tables()





### 2. Extracting the table to a pandas DataFrame.

In [None]:
# Name of the RDS table that is read into the user DataFrame.
user_table_rds = 'legacy_users'

# The extractor is built around the RDS engine (`connection_rds`).
extractor_rds = DataExtractor(connection_rds)

user_df_rds = extractor_rds.read_rds_table(user_table_rds)
print(user_df_rds.head(5))
# DataFrame.info() writes its summary to stdout itself and returns None, so
# wrapping it in print() only appended a stray "None" line to the output.
user_df_rds.info()




### 3. Cleaning data

In [None]:

# Clean the raw user DataFrame. The `clean_data_rds` instance is reused later
# in the notebook for the orders table.
clean_data_rds = DataCleaning()
clean_df_rds = clean_data_rds.clean_user_data(user_df_rds)



### 4. Loading the data
#### uploading the cleaned dataframe into the local database


In [None]:
# The local database connection (`uploader` / `uploading`) was already created
# in the first cell, so nothing is re-connected here.



# Load the cleaned RDS user DataFrame into the local database as a new table, 'dim_users'.
uploader.upload_to_db(uploading, clean_df_rds, 'dim_users')





## PDF data: ETL


### 1. Getting the data from the PDF and loading it into a DataFrame

In [None]:

connector_pdf = DatabaseConnector()
# NOTE(review): the RDS extractor is built from an engine, whereas this one
# receives a DatabaseConnector — confirm what DataExtractor expects.
extractor_pdf = DataExtractor(connector_pdf)

# URL of the card-details PDF that is read into a DataFrame.
link = "https://data-handling-public.s3.eu-west-1.amazonaws.com/card_details.pdf"
extraction_pdf = extractor_pdf.retrieve_pdf_data(link)
print(extraction_pdf.head(5))
# DataFrame.info() prints its own summary and returns None; calling it directly
# avoids the extra "None" line that print(...) produced.
extraction_pdf.info()

### 2. Cleaning the PDF DataFrame

In [None]:
# Clean the card details extracted from the PDF.
clean_pdf = DataCleaning()
clean_df_pdf = clean_pdf.clean_card_data(extraction_pdf)


### 3. Loading into the local database

In [None]:


# Upload the cleaned card DataFrame to the local database as 'dim_card_details'.
uploader.upload_to_db(uploading, clean_df_pdf, 'dim_card_details')

## API data: ETL







### 1. Getting Data from API


In [None]:
import os

# NOTE(security): this API key is committed in plain text. The literal is kept
# only as a fallback so the notebook keeps working unchanged; prefer exporting
# STORES_API_KEY (and rotating/removing the literal), in line with the database
# credentials, which are loaded from YAML files instead of being hard-coded.
headers = {
    'x-api-key': os.environ.get(
        'STORES_API_KEY',
        'yFBQbwXe9J3sd6zWVAMrK6lcxxr0q1lr2PT6DDMX',
    ),
}

number_of_stores_endpoint = 'https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/number_stores'

connector_api = DatabaseConnector()

extractor_api = DataExtractor(connector_api)

# Ask the API how many stores exist and report the count.
number_of_stores = extractor_api.list_number_of_stores(number_of_stores_endpoint, headers)
print(f"Number of stores: {number_of_stores}")



### 2. Extracting the table to a pandas DataFrame


In [None]:
# NOTE(review): the endpoint path ends in '/2' and the third argument is the
# literal 2, although `number_of_stores` is fetched in the previous step —
# confirm whether every store is meant to be retrieved here.
retrieve_store_endpoint = 'https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/store_details/2'

stores_df = extractor_api.retrieve_stores_data(retrieve_store_endpoint, headers, 2)



### 3. Cleaning data


In [None]:

# Clean the store details returned by the API.
clean_stores_api = DataCleaning()
clean_storesdf_api = clean_stores_api.called_clean_store_data(stores_df)

In [None]:
# Development helper: reload the project modules and re-import their classes
# so that the cells below use the latest code on disk.
import importlib

import data_extraction
import data_cleaning
import database_utils

# Reload in the same order as before: extraction, utils, cleaning.
for _module in (data_extraction, database_utils, data_cleaning):
    importlib.reload(_module)

# Re-bind the class names to the reloaded definitions.
from data_extraction import DataExtractor
from database_utils import DatabaseConnector
from data_cleaning import DataCleaning

### 4. Loading the data



In [None]:
# Upload the cleaned store DataFrame to the local database as 'dim_store_details'.
uploader.upload_to_db(uploading, clean_storesdf_api, 'dim_store_details')

## S3 data


### 1. Getting data

In [None]:
# Extractor used for the S3 download in the next step.
connector_s3 = DatabaseConnector()
extractor_s3 = DataExtractor(connector_s3)




### 2. Loading into a pandas DataFrame

In [None]:
bucket = 'data-handling-public'
object_key = "products.csv"
# NOTE(review): absolute, machine-specific path — adjust when running elsewhere.
pathway = '/Users/student/AICORE/AWS/Project_3/products.csv'

extraction_s3 = extractor_s3.extract_from_s3(bucket, object_key, pathway)
print(extraction_s3.head(20))
# DataFrame.info() prints its own summary and returns None; calling it directly
# avoids the extra "None" line that print(...) produced.
extraction_s3.info()

### 3. Cleaning data

In [None]:
clean_s3 = DataCleaning()

# NOTE(review): this applies the *store* cleaning routine to the products
# data — confirm that is intended.
# The call used to be repeated verbatim on the same input; one pass is kept.
clean_df_s3 = clean_s3.called_clean_store_data(extraction_s3)

# Convert the product weights before the final products clean-up step.
converted_df_s3 = clean_s3.convert_product_weights(clean_df_s3)
print(converted_df_s3.head())



In [None]:
# Final products clean-up; note this rebinds `clean_df_s3`, replacing the
# intermediate result of the previous cell.
clean_df_s3 = clean_s3.clean_products_data(converted_df_s3)


### 4. Loading into local db

In [None]:
# Upload the cleaned products DataFrame to the local database as 'dim_products'.
uploader.upload_to_db(uploading, clean_df_s3, 'dim_products')

## Orders - AWS DB

In [None]:
# List the RDS tables again (same connector as in the users section).
connector_tables_rds = connector_rds.list_db_tables()

# Table of interest for this section: 'orders_table'

In [None]:
# Name of the RDS table that is read into the orders DataFrame.
orders_table_rds = 'orders_table'

extractor_rds = DataExtractor(connection_rds)

orders_df_rds = extractor_rds.read_rds_table(orders_table_rds)
print(orders_df_rds.head(5))
# DataFrame.info() prints its own summary and returns None; calling it directly
# avoids the extra "None" line that print(...) produced.
orders_df_rds.info()

In [None]:
# NOTE(review): the orders table is passed through the *user* cleaning routine —
# confirm that clean_user_data is appropriate for order rows.
clean_ordersdf_rds = clean_data_rds.clean_user_data(orders_df_rds)



In [21]:
## Dropping the columns that are not uploaded with the orders.
ordersdf_rds = clean_ordersdf_rds.drop(columns=['first_name', 'last_name', '1'])

# DataFrame.info() prints its own summary and returns None; wrapping it in
# print() is what produced the stray "None" line after the summary.
ordersdf_rds.info()

uploader.upload_to_db(uploading, ordersdf_rds, 'orders_table')

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 120123 entries, 0 to 120122
Data columns (total 8 columns):
 #   Column            Non-Null Count   Dtype 
---  ------            --------------   ----- 
 0   level_0           120123 non-null  int64 
 1   index             120123 non-null  int64 
 2   date_uuid         120123 non-null  object
 3   user_uuid         120123 non-null  object
 4   card_number       120123 non-null  int64 
 5   store_code        120123 non-null  object
 6   product_code      120123 non-null  object
 7   product_quantity  120123 non-null  int64 
dtypes: int64(4), object(4)
memory usage: 7.3+ MB
None
Data uploaded successfully to orders_table table.


## JSON upload

In [22]:
## JSON

connector_json = DatabaseConnector()
extractor_json = DataExtractor(connector_json)
bucket = 'data-handling-public'
object_key_json = "date_details.json"
# NOTE(review): absolute, machine-specific path — adjust when running elsewhere.
pathway_json = '/Users/student/AICORE/AWS/Project_3/date_details.json'

extraction_json = extractor_json.extract_from_s3(bucket, object_key_json, pathway_json)
print(extraction_json.head(20))
# DataFrame.info() prints its own summary and returns None; calling it directly
# avoids the extra "None" line that print(...) produced.
extraction_json.info()



In [None]:
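# TODO: clean `extraction_json` and upload the result as 'dim_date_times'.
# NOTE: the draft below was copied from the products step and still refers to
# the products DataFrame (`clean_df_s3`); swap in the cleaned date DataFrame
# before enabling it.
#clean_df_s3 = clean_s3.clean_products_data(converted_df_s3)

#dim_date_times

#uploader.upload_to_db(uploading, clean_df_s3, 'dim_date_times')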