- Extract and clean datasets from different sources

In [None]:
#%% import all classes/methods developed for data manipulation
from data_extraction import DataExtractor
from data_cleaning import DataCleaning
from database_utils import DatabaseConnector

The historical user data is currently stored in an AWS RDS database in the cloud.
You will now create methods in your DataExtractor and DatabaseConnector classes which help extract the information from that database.
Follow these steps:

Step 1:

Create a db_creds.yaml file containing the database credentials.

You should add your db_creds.yaml file to the .gitignore file in your repository, so that the database credentials are not uploaded to your public GitHub repository.
If you don't currently have a .gitignore file, you can create one by typing touch .gitignore in the terminal. Then just add the names of any files you don't want git to track.


Now you will need to develop methods in your DatabaseConnector class to extract the data from the database.


Step 2:

Create a method read_db_creds that reads the credentials YAML file and returns a dictionary of the credentials.
You will need to pip install PyYAML and import yaml to do this.
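
A minimal sketch of this step, written as a standalone function rather than a class method. The default file name comes from Step 1; everything else about the file layout is left to you, and yaml.safe_load is used here as the conservative way to parse plain YAML.

```python
import yaml  # provided by the PyYAML package


def read_db_creds(path="db_creds.yaml"):
    """Read the credentials YAML file and return its contents as a dict."""
    with open(path, "r", encoding="utf-8") as stream:
        # safe_load parses plain YAML without constructing arbitrary objects
        return yaml.safe_load(stream)
```

Inside DatabaseConnector the same body would work as a method with self as the first parameter.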


Step 3:

Now create a method init_db_engine that reads the credentials returned by read_db_creds, then initialises and returns a SQLAlchemy database engine.
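
One possible shape for this step is sketched below. It assumes a PostgreSQL database reached through the psycopg2 driver, and the dictionary keys (RDS_USER, RDS_PASSWORD, RDS_HOST, RDS_PORT, RDS_DATABASE) are hypothetical names, so rename them to match your own db_creds.yaml. The credentials are passed in as a parameter here to keep the sketch self-contained.

```python
from sqlalchemy import create_engine
from sqlalchemy.engine import URL


def build_db_url(creds):
    """Assemble a connection URL from a credentials dictionary."""
    # The key names below are hypothetical; match them to your db_creds.yaml.
    return URL.create(
        drivername="postgresql+psycopg2",  # assumption: PostgreSQL via psycopg2
        username=creds["RDS_USER"],
        password=creds["RDS_PASSWORD"],
        host=creds["RDS_HOST"],
        port=creds["RDS_PORT"],
        database=creds["RDS_DATABASE"],
    )


def init_db_engine(creds):
    """Create and return a SQLAlchemy engine for the given credentials."""
    return create_engine(build_db_url(creds))
```

Building the URL with URL.create avoids hand-formatting a connection string, which matters when a password contains characters such as @ or /.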


Step 4:

Using the engine from init_db_engine, create a method list_db_tables that lists all the tables in the database so you know which tables you can extract data from.
Develop a method inside your DataExtractor class to read the data from the RDS database.
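
The table listing in Step 4 can be sketched with SQLAlchemy's inspector. The in-memory SQLite engine and the example_users table below are stand-ins used only so the sketch runs without an RDS connection.

```python
from sqlalchemy import create_engine, inspect, text


def list_db_tables(engine):
    """Return the names of all tables visible through the engine."""
    return inspect(engine).get_table_names()


# Demonstration against an in-memory SQLite database (a stand-in for RDS).
demo_engine = create_engine("sqlite://")
with demo_engine.begin() as connection:
    connection.execute(text("CREATE TABLE example_users (id INTEGER)"))
print(list_db_tables(demo_engine))
```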


Step 5:

Develop a method called read_rds_table in your DataExtractor class which will extract the database table to a pandas DataFrame.

It will take an instance of your DatabaseConnector class and the table name as arguments and return a pandas DataFrame.
Use your list_db_tables method to get the name of the table containing user data.
Use the read_rds_table method to extract the table containing user data and return a pandas DataFrame.
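
A rough sketch of read_rds_table under the signature described in Step 5. InMemoryConnector is a hypothetical stand-in for DatabaseConnector, included only so the example can run against SQLite instead of RDS.

```python
import pandas as pd
from sqlalchemy import create_engine


def read_rds_table(connector, table_name):
    """Read a whole database table into a pandas DataFrame."""
    # `connector` is expected to expose init_db_engine(), as in Step 3.
    engine = connector.init_db_engine()
    return pd.read_sql_table(table_name, engine)


class InMemoryConnector:
    """Hypothetical stand-in for DatabaseConnector, backed by SQLite."""

    def __init__(self):
        self._engine = create_engine("sqlite://")

    def init_db_engine(self):
        return self._engine
```

Passing the connector in, rather than creating it inside the method, keeps the extractor independent of where the credentials live.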


Step 6:

Create a method called clean_user_data in the DataCleaning class which will perform the cleaning of the user data.

You will need to clean the user data. Look out for NULL values, errors with dates, incorrectly typed values and rows filled with the wrong information.
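
The sketch below covers two of these checks: literal "NULL" strings and unparseable dates. The column name date_of_birth is an assumption made for illustration, and real user data will likely need further rules (for example, for incorrectly typed values).

```python
import pandas as pd


def clean_user_data(users, date_columns=("date_of_birth",)):
    """Remove rows with NULL markers or invalid dates in the date columns."""
    cleaned = users.copy()
    # Treat the literal string "NULL" as a missing value.
    cleaned = cleaned.mask(cleaned == "NULL")
    for column in date_columns:
        # Unparseable dates become NaT instead of raising an error.
        cleaned[column] = pd.to_datetime(cleaned[column], errors="coerce")
    # Drop rows whose date columns are missing or invalid.
    return cleaned.dropna(subset=list(date_columns)).reset_index(drop=True)
```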


Step 7:

Now create a method in your DatabaseConnector class called upload_to_db. This method will take a pandas DataFrame and the name of the table to upload to as arguments.
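
A minimal sketch of the upload using DataFrame.to_sql. The engine is passed in explicitly here, which is an addition to the two arguments described above; in your class it would more likely come from the connector itself. Replacing an existing table is also an assumption.

```python
import pandas as pd
from sqlalchemy import create_engine


def upload_to_db(df, table_name, engine):
    """Write a DataFrame to the named table, replacing any existing table."""
    # if_exists="replace" is an assumption; use "append" to keep existing rows.
    df.to_sql(table_name, engine, if_exists="replace", index=False)
```

With index=False the DataFrame index is not written as an extra column, which keeps the uploaded table limited to the cleaned columns.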


Step 8:

Once the data is extracted and cleaned, use the upload_to_db method to store it in your sales_data database in a table named dim_users.


In [None]:
###### import table data from the database
get_db = DataExtractor()
#link to the database
engine = get_db.init_db_engine()
#list the tables in the database
list_table = get_db.list_db_tables()
print(list_table)
#read the user table (index 1 in the list) into a pandas DataFrame
selected_table = get_db.read_rds_table(list_table[1])
#%%clean table
clean_data = DataCleaning()
cleaned_table = clean_data.clean_user_data(selected_table)
#%%upload the cleaned table to my sales database
update_db = DatabaseConnector()
update_db.upload_to_db(cleaned_table,'dim_users')

The users' card details are stored in a PDF document in an AWS S3 bucket.

Step 1:
Install the Python package tabula-py; this will help you extract data from a PDF document.


Step 2:
Create a method in your DataExtractor class called retrieve_pdf_data, which takes a link as an argument and returns a pandas DataFrame.
Use the tabula-py package, imported as tabula, to extract all pages from the PDF document at the link passed in.
Then return a DataFrame of the extracted data.
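
A hedged sketch of this step. tabula.read_pdf with pages="all" returns a list of DataFrames, so they are concatenated into one; combine_pages is a hypothetical helper name introduced here to keep that step separate.

```python
import pandas as pd


def combine_pages(page_frames):
    """Stack the extracted DataFrames into one DataFrame with a fresh index."""
    return pd.concat(page_frames, ignore_index=True)


def retrieve_pdf_data(link):
    """Extract every page of the PDF at `link` into a single DataFrame."""
    # Imported here so the module still loads where tabula-py (and the Java
    # runtime it depends on) is not installed.
    import tabula

    # With pages="all", read_pdf returns a list of DataFrames.
    page_frames = tabula.read_pdf(link, pages="all")
    return combine_pages(page_frames)
```

ignore_index=True matters because each extracted frame starts its own index at 0, which would otherwise leave duplicate index values in the combined table.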


Step 3:
Create a method called clean_card_data in your DataCleaning class to clean the data to remove any erroneous values, NULL values or errors with formatting.


Step 4:
Once cleaned, upload the table with your upload_to_db method to the database in a table called dim_card_details.

In [None]:

###### import table data from the PDF document
get_db = DataExtractor()
url = "https://data-handling-public.s3.eu-west-1.amazonaws.com/card_details.pdf"
selected_table = get_db.retrieve_pdf_data(url)
#%%clean table
clean_data = DataCleaning()
cleaned_table = clean_data.clean_card_data(selected_table)
#%%upload the cleaned table to my sales database
update_db = DatabaseConnector()
update_db.upload_to_db(cleaned_table,'dim_card_details')

The store data can be retrieved through the use of an API.

The API has two GET methods: one returns the number of stores in the business and the other retrieves a store given a store number.

To connect to the API, you will need to include the API key in the request header.

Create a dictionary to store the header details; it will have a key x-api-key.

Step 1:
Create a method in your DataExtractor class called list_number_of_stores which returns the number of stores to extract. It should take the number-of-stores endpoint and the header dictionary as arguments.
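
A sketch of this method using the requests package. The response key number_stores is a hypothetical name, so inspect the real response before relying on it. The get parameter is an addition that lets the HTTP call be swapped out when testing.

```python
import requests


def list_number_of_stores(endpoint, headers, get=requests.get):
    """Return the number of stores reported by the API."""
    response = get(endpoint, headers=headers, timeout=30)
    response.raise_for_status()  # fail loudly on a 4xx/5xx response
    # "number_stores" is a hypothetical key; check the real response body.
    return response.json()["number_stores"]
```

The header dictionary would look like {"x-api-key": "<your key>"}, with the key value kept out of version control in the same way as the database credentials.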


Step 2:
You now know how many stores need to be extracted from the API.


Step 3:
Create another method retrieve_stores_data which takes the retrieve-a-store endpoint as an argument and extracts all the stores from the API, saving them in a pandas DataFrame.
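
One way to sketch the extraction loop is shown below. Several details are assumptions: the endpoint is passed as a template containing a {store_number} placeholder, store numbers are taken to start at 0, and the header dictionary and store count are extra parameters beyond the single argument described above.

```python
import pandas as pd
import requests


def retrieve_stores_data(endpoint_template, headers, number_of_stores, get=requests.get):
    """Fetch every store from the API and return them as one DataFrame."""
    records = []
    # Store numbers are assumed to start at 0; adjust the range if they do not.
    for store_number in range(number_of_stores):
        url = endpoint_template.format(store_number=store_number)
        response = get(url, headers=headers, timeout=30)
        response.raise_for_status()
        records.append(response.json())  # one JSON object per store
    return pd.DataFrame(records)
```

Collecting the records in a list and building the DataFrame once at the end avoids repeatedly concatenating DataFrames inside the loop.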


Step 4:
Create a method in the DataCleaning class called clean_store_data which cleans the data retrieved from the API and returns a pandas DataFrame.


Step 5:
Upload your DataFrame to the database using the upload_to_db method storing it in the table dim_store_details.



In [None]:
###### retrieve the store data from the API
get_db = DataExtractor()
#get the stores and convert them to a pandas DataFrame
selected_table = get_db.retrieve_stores_data()
#%%clean table
clean_data = DataCleaning()
cleaned_table = clean_data.clean_store_data(selected_table)
#%%upload the cleaned table to my sales database
update_db = DatabaseConnector()
update_db.upload_to_db(cleaned_table,'dim_store_details')

The information for each product the company currently sells is stored in CSV format in an S3 bucket on AWS.

Step 1:
Create a method in DataExtractor called extract_from_s3 which uses the boto3 package to download and extract the information, returning a pandas DataFrame.

The S3 address for the products data is s3://data-handling-public/products.csv. The method will take this address as an argument and return the pandas DataFrame.


You will need to be logged into the AWS CLI before you retrieve the data from the bucket.
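
A sketch of extract_from_s3 under these requirements. parse_s3_address is a hypothetical helper, and the optional client parameter is an addition so the function can be exercised without AWS access; by default it creates a boto3 S3 client, which relies on the credentials configured through the AWS CLI.

```python
import io
from urllib.parse import urlparse

import pandas as pd


def parse_s3_address(address):
    """Split an s3://bucket/key address into (bucket, key)."""
    parsed = urlparse(address)
    return parsed.netloc, parsed.path.lstrip("/")


def extract_from_s3(address, client=None):
    """Download a CSV object from S3 and return it as a DataFrame."""
    if client is None:
        import boto3  # imported lazily; only needed for the real download

        client = boto3.client("s3")
    bucket, key = parse_s3_address(address)
    response = client.get_object(Bucket=bucket, Key=key)
    # The object body is read fully into memory before parsing.
    return pd.read_csv(io.BytesIO(response["Body"].read()))
```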

Step 2:
Create a method in the DataCleaning class called convert_product_weights. It will take the products DataFrame as an argument and return the products DataFrame.
If you check the weight column in the DataFrame, you will see that the weights use different units.
Convert them all to a decimal value representing their weight in kg. Use a 1:1 ratio of ml to g as a rough estimate for the rows containing ml.
Develop the method to clean up the weight column by removing all excess characters, then represent the weights as floats.
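
The conversion can be sketched as below. It handles only the kg, g and ml units; any other format (other units, multipacks, stray characters inside the number) is an open case here and is mapped to NaN so it can be reviewed later. weight_to_kg is a hypothetical helper name.

```python
import re

import pandas as pd

# Divisors that turn each supported unit into kilograms (1 ml is treated as 1 g).
UNIT_DIVISORS = {"kg": 1, "g": 1000, "ml": 1000}
WEIGHT_PATTERN = re.compile(r"^\s*(\d+(?:\.\d+)?)\s*(kg|g|ml)\s*$", re.IGNORECASE)


def weight_to_kg(raw):
    """Convert a string such as '500g' to a float weight in kg, or NaN."""
    match = WEIGHT_PATTERN.match(str(raw))
    if match is None:
        return float("nan")  # unsupported format, left for later review
    value, unit = match.groups()
    return float(value) / UNIT_DIVISORS[unit.lower()]


def convert_product_weights(products):
    """Return a copy of the products DataFrame with weight expressed in kg."""
    converted = products.copy()
    converted["weight"] = converted["weight"].apply(weight_to_kg).astype(float)
    return converted
```

For example, "500g" and "250ml" become 0.5 and 0.25 respectively, while "1.5kg" stays 1.5.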


Step 3:
Now create another method called clean_products_data. This method will clean the DataFrame of any additional erroneous values.


Step 4:
Once complete, insert the data into the sales_data database using your upload_to_db method, storing it in a table named dim_products.

In [None]:
#%% milestone2 task6
###### retrieve the products data from S3
get_db = DataExtractor()
#get the table and convert it to a pandas DataFrame
selected_table = get_db.extract_from_s3('s3://data-handling-public/products.csv')
#%%clean table
clean_data = DataCleaning()
cleaned_table = clean_data.clean_products_data(selected_table)
#%%upload the cleaned table to my sales database
update_db = DatabaseConnector()
update_db.upload_to_db(cleaned_table,'dim_products')

This table, which acts as the single source of truth for all orders the company has made in the past, is stored in a database on AWS RDS.


Step 1:
Using the table-listing method you created earlier, list_db_tables, list all the tables in the database to get the name of the table containing all information about the product orders.


Step 2:
Extract the orders data using the read_rds_table method you created earlier, returning a pandas DataFrame.


Step 3:
Create a method in DataCleaning called clean_orders_data which will clean the orders table data.

You should remove the columns first_name, last_name and 1 so that the table is in the correct form before uploading to the database.

You will see that the orders data contains column headers which also appear in other tables.

This table will act as the source of truth for your sales data and will be at the center of your star-based database schema.
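
Dropping the three columns can be sketched as follows. It assumes the column labelled 1 is stored under the string "1"; if it is stored as the integer 1 in your DataFrame, adjust the list accordingly.

```python
import pandas as pd


def clean_orders_data(orders):
    """Drop the columns that should not reach the orders table."""
    # The column labelled 1 is assumed to be stored under the string "1".
    unwanted = ["first_name", "last_name", "1"]
    # errors="ignore" skips any listed column that is not present.
    return orders.drop(columns=unwanted, errors="ignore")
```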


Step 4:
Once cleaned, upload the data using the upload_to_db method and store it in a table called orders_table.

In [None]:
#extract data
get_db = DataExtractor()
engine = get_db.init_db_engine()
list_table = get_db.list_db_tables()
#list_table[2] contains the info regarding orders
selected_table = get_db.read_rds_table(list_table[2])
#clean table
clean_data = DataCleaning()
cleaned_table = clean_data.clean_orders_data(selected_table)
#upload the cleaned table, not the raw extract
update_db = DatabaseConnector()
update_db.upload_to_db(cleaned_table,'orders_table')

The final source of data is a JSON file containing the details of when each sale happened, as well as related attributes.

The file is currently stored on S3 and can be found at the following link https://data-handling-public.s3.eu-west-1.amazonaws.com/date_details.json.

Extract the file and perform any necessary cleaning, then upload the data to the database, naming the table dim_date_times.
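
The extraction part can be sketched with pandas.read_json, which accepts either a URL or a local path. extract_json_data is a hypothetical name, and the sketch assumes the file has a layout that read_json can parse with its default settings.

```python
import pandas as pd


def extract_json_data(source):
    """Load a JSON document from a URL or local path into a DataFrame."""
    return pd.read_json(source)
```

Calling it with the link above would return the date details as a DataFrame ready for cleaning.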

In [None]:
#extract data
get_db = DataExtractor()
selected_table = get_db.extract_datetime_url()
#clean table
clean_data = DataCleaning()
cleaned_table = clean_data.clean_datetime_date(selected_table)
#upload the cleaned table, not the raw extract
update_db = DatabaseConnector()
update_db.upload_to_db(cleaned_table,'dim_date_times')