In [1]:
import os
from dotenv import load_dotenv
from data_extraction import DataExtractor
from database_utils import DatabaseConnector
from data_cleaning import DataCleaning


Create an instance of the DatabaseConnector.

Initialising the db engine and listing the tables: 
- 'legacy_store_details' 
- 'legacy_users'
- 'orders_table'

Then create instances of the data extractor and data cleaning objects.

In [2]:
# Connector object used for the database engine and for every upload in the cells below.
RDS_CONNECTOR = DatabaseConnector()
RDS_CONNECTOR.init_db_engine()
# Presumably this call is what emits the table list shown in the cell output — TODO confirm.
RDS_CONNECTOR.list_db_tables()

# Shared extractor and cleaner instances; the later cells call methods on these two objects.
lamb = DataExtractor()
test_clean = DataCleaning()

['legacy_store_details', 'legacy_users', 'orders_table']


Use read_rds_table to extract the legacy users table

Cleaning and uploading user data

In [3]:
# Tables listed by the setup cell above:
# ['legacy_store_details', 'legacy_users', 'orders_table']
# To inspect the raw table before cleaning: print(lamb.read_rds_table('legacy_users'))

## User Data
# Extract the 'legacy_users' table, clean it, and upload the result as 'dim_users'.
retrieved_user = lamb.read_rds_table('legacy_users')
cleaned_user = test_clean.clean_user_data(retrieved_user)
RDS_CONNECTOR.upload_to_db(cleaned_user, 'dim_users')

15284
UPLOAD SUCCESSFUL


Extract the card details from the PDF file using tabula

Cleaning and uploading card data

In [4]:
## Card Data
# Retrieve the raw card data from the hosted PDF, clean it, and upload it as 'dim_card_details'.
retrieved_data = lamb.retrieve_pdf_data('https://data-handling-public.s3.eu-west-1.amazonaws.com/card_details.pdf')
# Debug aid: print the raw extraction before cleaning.
# print(retrieved_data)
cleaned_card_data = test_clean.clean_card_data(retrieved_data)
RDS_CONNECTOR.upload_to_db(cleaned_card_data, 'dim_card_details')

Error importing jpype dependencies. Fallback to subprocess.
No module named 'jpype'


15299
UPLOAD SUCCESSFUL


Find the number of stores, which is 451 (retrieved from the API)

Then extract the store data

Cleaning and uploading store data

In [5]:
## Store data
# Read the API key from the environment (load_dotenv can populate it from a local .env file)
# so the secret is never hard-coded in the notebook.
load_dotenv()
api_key = os.getenv("API_KEY")
if not api_key:
    # os.getenv returns None when the variable is unset; stop here with a clear message
    # rather than building a header whose value is None.
    raise RuntimeError("API_KEY is not set; define it in the environment or in a .env file.")

endpoint = 'https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/number_stores'

headers = {'x-api-key': api_key}

# Ask the API how many stores exist; the count is passed to the retrieval call below.
number_of_stores = lamb.list_number_of_stores(endpoint, headers)

# Base URL for store details — presumably retrieve_stores_data appends each store number; TODO confirm.
other = 'https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/store_details/'
stores_data = lamb.retrieve_stores_data(other, headers, number_of_stores)
cleaned_stores_data = test_clean.clean_store_data(stores_data)
RDS_CONNECTOR.upload_to_db(cleaned_stores_data, 'dim_store_details')

Status Code: 200
API Response: {'statusCode': 200, 'number_stores': 451}
     index                                            address  longitude  \
0        0                                                N/A        NaN   
1        1  Flat 72W, Sally isle, East Deantown, E7B 8EB, ...   51.62907   
2        2        Heckerstraße 4/5, 50491 Säckingen, Landshut   48.52961   
3        3  5 Harrison tunnel, South Lydia, WC9 2BE, Westbury   51.26000   
4        4  Studio 6, Stephen landing, South Simon, B77 2W...   53.02330   
..     ...                                                ...        ...   
445    445  Flat 7, Stephanie lake, Morrisside, HP8 8LH, C...   50.76306   
446    446    Täschestraße 25, 39039 Nördlingen, Kirchlengern   52.20000   
448    448  Studio 8, Moss mall, West Linda, M0E 6XR, High...   51.62907   
449    449               Baumplatz 6, 80114 Kötzting, Bretten   49.03685   
450    450  Gotthilf-Rose-Straße 7/3, 45457 Feuchtwangen, ...   50.64336   

         local

Extract the products CSV from the S3 bucket

Cleaning and uploading product data

In [6]:
### Product Data
# Address of the products CSV, given in s3://<bucket>/<key> form.
s3_address_products = 's3://data-handling-public/products.csv'
# Extract the file, clean it, and upload the result as 'dim_products'.
extracted_product_data = lamb.extract_from_s3(s3_address_products)
cleaned_product_data = test_clean.clean_products_data(extracted_product_data)
RDS_CONNECTOR.upload_to_db(cleaned_product_data, "dim_products")

Downloading file from s3://data-handling-public/products.csv...
data-handling-public products.csv
data-handling-public products.csv
File has sucessfully retrieved.
Reading the CSV file into DataFrame...
UPLOAD SUCCESSFUL


Use read_rds_table to extract the orders_table

Cleaning and uploading orders data

In [7]:
## Order Data
# Extract the 'orders_table' table, clean it, and upload the result.
retrieved_orders = lamb.read_rds_table('orders_table')
cleaned_orders = test_clean.clean_orders_data(retrieved_orders)
# NOTE(review): the upload target has the same name as the source table. Confirm that
# upload_to_db writes to a different database than read_rds_table reads from.
RDS_CONNECTOR.upload_to_db(cleaned_orders, 'orders_table')

UPLOAD SUCCESSFUL


In [8]:
# Show the column names of `retrieved_orders` (the frame returned by read_rds_table above).
# A bare trailing expression is displayed as the cell's result, so print() is not needed.
retrieved_orders.columns


Index(['level_0', 'index', 'date_uuid', 'first_name', 'last_name', 'user_uuid',
       'card_number', 'store_code', 'product_code', '1', 'product_quantity'],
      dtype='object')


Extract the time data json file from the s3 bucket

Clean and upload the time data df to db

In [9]:
## Time Data
# NOTE(review): this address uses an https:// scheme with only the bucket name as host, unlike the
# s3://<bucket>/<key> form used for the products file. The cell output shows it is still split
# into bucket and key; confirm against extract_from_s3 before normalising it.
s3_address_date = 'https://data-handling-public/date_details.json'
# Extract the JSON file, clean it, and upload the result as 'dim_date_times'.
extracted_date_data = lamb.extract_from_s3(s3_address_date)
cleaned_date = test_clean.clean_date_data(extracted_date_data)
RDS_CONNECTOR.upload_to_db(cleaned_date, 'dim_date_times')

Downloading file from https://data-handling-public/date_details.json...
data-handling-public date_details.json
data-handling-public date_details.json
File has sucessfully retrieved.
Reading the JSON file into DataFrame...
UPLOAD SUCCESSFUL
