# EXTRACT CSV,EXCEL, DATABASE SALES DATA


In [42]:
#Import libraries needed
import psycopg2
import glob
import pandas as pd
from sqlalchemy import create_engine


    
# Function extracting from Excel
def consolidated_sales_excel():
    """Read every worksheet of the January-April 2019 workbook into one DataFrame.

    Returns:
        A DataFrame holding the rows of all sheets stacked vertically, with a
        fresh 0..n-1 index.
    """
    workbook_path = r'C:\Users\\Public\Sales_January_April_2019.xlsx'

    # sheet_name=None makes read_excel return a mapping of sheet name -> DataFrame
    sheets_by_name = pd.read_excel(workbook_path, sheet_name=None)

    return pd.concat(sheets_by_name.values(), ignore_index=True)

# Function extracting from CSV
def consolidated_sales_csv(new_folder=r'C:\Users\Public\sales_csv'):
    """Read every ``*.csv`` file in a folder and stack them into one DataFrame.

    Args:
        new_folder: Directory to scan for CSV files. Defaults to the folder
            this notebook was written against.

    Returns:
        A DataFrame with the rows of all CSV files concatenated (index reset),
        or ``None`` when the folder contains no CSV file; in that case a
        message is printed as well.
    """
    # glob returns files in an OS-dependent order; sorting keeps the row order
    # (and therefore which duplicate survives later de-duplication) stable.
    csv_files = sorted(glob.glob(f'{new_folder}/*.csv'))

    if not csv_files:
        print("There is no file in folder.")
        return None

    # Concatenate once, after all files are read, instead of re-concatenating
    # the growing list on every loop iteration.
    return pd.concat([pd.read_csv(csv_file) for csv_file in csv_files],
                     ignore_index=True)
        
# Function extracting from database
def consolidated_sales_db():
    """Read the whole ``sales_db`` table from the local PostgreSQL database.

    Returns:
        A DataFrame with every row and column of ``sales_db``.

    Raises:
        Whatever SQLAlchemy / pandas raise when the connection or the query
        fails; errors are not swallowed here.
    """
    # PostgreSQL connection
    # NOTE(review): credentials are hard-coded in source; consider moving them
    # to configuration or environment variables.
    pg_username = 'postgres'
    pg_password = 'posgres1234'
    pg_host = 'localhost'
    pg_port = '5432'
    pg_database = 'sales_sep_dec_2019'

    # Define PostgreSQL connection string
    pg_connection_string = f'postgresql://{pg_username}:{pg_password}@{pg_host}:{pg_port}/{pg_database}'

    # Create SQLAlchemy engine
    engine = create_engine(pg_connection_string)

    # SQL query to extract data from PostgreSQL table
    sql_query = 'SELECT * FROM sales_db'

    try:
        # Use pandas to read the SQL query result into a data frame
        pg_df = pd.read_sql(sql_query, engine)
    finally:
        # The engine is local to this call; release its pooled connections
        # whether or not the read succeeded.
        engine.dispose()

    return pg_df



def consolidated_data():
    """Combine the Excel, CSV and database extracts into one staging DataFrame.

    The Excel and CSV extracts are stacked first and their headers renamed to
    snake_case; the database extract is appended afterwards without renaming.

    Returns:
        The combined DataFrame with a fresh 0..n-1 index.
    """
    excel_frame = consolidated_sales_excel()
    csv_frame = consolidated_sales_csv()
    db_frame = consolidated_sales_db()

    # File headers -> snake_case column names
    column_map = {
        'Order ID': 'order_id',
        'Product': 'product',
        'Quantity Ordered': 'quantity_ordered',
        'Price Each': 'price_each',
        'Order Date': 'order_date',
        'Purchase Address': 'address',
    }

    # Stack the two file-based extracts, then rename their columns
    file_frame = pd.concat([excel_frame, csv_frame], ignore_index=True)
    file_frame = file_frame.rename(columns=column_map)

    return pd.concat([file_frame, db_frame], ignore_index=True)

# transform consolidated SALES DATA

In [53]:
def transform_sales(staging_table: pd.DataFrame) -> pd.DataFrame:
    """Clean the consolidated sales rows and reshape them for loading.

    Steps: drop rows with any missing value, drop repeated header rows, coerce
    the numeric and date columns, derive year/month/day from the order date,
    split the address into street/city/state/zip, and keep one row per
    ``order_id``.

    Args:
        staging_table: Frame with the columns ``order_id``, ``product``,
            ``quantity_ordered``, ``price_each``, ``order_date`` and
            ``address``. It is not modified.

    Returns:
        A new DataFrame with the columns ``order_id``, ``order_year``,
        ``order_month``, ``order_day``, ``product``, ``quantity_ordered``,
        ``price_each``, ``street``, ``city``, ``state``, ``zip``, ``country``.
        Address components that are missing come back as NaN instead of
        raising, and an empty input yields an empty frame with these columns.
    """
    transformed_table = staging_table.dropna(how='any')

    # Drop repeated header rows (their 'order_date' cell holds the header text).
    # .copy() makes the filtered result an independent frame, so the column
    # assignments below do not write into a slice.
    is_header_row = transformed_table['order_date'] == 'Order Date'
    transformed_table = transformed_table[~is_header_row].copy()

    # Convert 'order_id' to numeric; rows whose id is not numeric are dropped
    transformed_table['order_id'] = pd.to_numeric(transformed_table['order_id'], errors='coerce')
    transformed_table = transformed_table.dropna(subset=['order_id'])
    transformed_table['order_id'] = transformed_table['order_id'].astype(int)

    # Ensure 'product' and 'address' columns are of string type
    transformed_table['product'] = transformed_table['product'].astype(str)
    transformed_table['address'] = transformed_table['address'].astype(str)

    # Convert to numeric, replacing non-numeric values with NaN
    transformed_table['quantity_ordered'] = pd.to_numeric(transformed_table['quantity_ordered'], errors='coerce')
    transformed_table['price_each'] = pd.to_numeric(transformed_table['price_each'], errors='coerce')

    # Convert 'order_date' to datetime once and derive the parts from it
    order_date = pd.to_datetime(transformed_table['order_date'], errors='coerce')
    transformed_table['order_date'] = order_date
    transformed_table['order_year'] = order_date.dt.year
    transformed_table['order_month'] = order_date.dt.strftime('%b')
    transformed_table['order_day'] = order_date.dt.day

    # "street, city, ST zip": split from the right so a comma inside the
    # street part does not shift city/state. reindex guarantees columns 0..2
    # exist even when no row has three parts (or there are no rows at all);
    # astype(object) keeps the .str accessor usable on all-NaN columns.
    address_parts = (transformed_table['address']
                     .str.rsplit(', ', n=2, expand=True)
                     .reindex(columns=range(3))
                     .astype(object))
    transformed_table['street'] = address_parts[0]
    transformed_table['city'] = address_parts[1]

    # "ST zip" -> state, zip (same guard against a missing zip column)
    state_zip = (address_parts[2]
                 .str.split(' ', expand=True)
                 .reindex(columns=range(2))
                 .astype(object))
    transformed_table['state'] = state_zip[0]
    transformed_table['zip'] = state_zip[1]

    transformed_table['country'] = "United States"

    # Rearrange columns
    output_columns = ['order_id',
                      'order_year',
                      'order_month',
                      'order_day',
                      'product',
                      'quantity_ordered',
                      'price_each',
                      'street',
                      'city',
                      'state',
                      'zip',
                      'country']

    # Remove duplicates (first occurrence of each order_id wins); done without
    # inplace=True so no slice of another frame is mutated.
    return transformed_table[output_columns].drop_duplicates(subset=['order_id'])


In [41]:
# Run the extract and transform steps, then print a column/dtype summary.
staging_table = consolidated_data()
transformed_table = transform_sales(staging_table)
# DataFrame.info() prints its report itself and returns None, which is why
# the captured output ends with a line reading "None".
print(transformed_table.info())

<class 'pandas.core.frame.DataFrame'>
Int64Index: 185686 entries, 0 to 186849
Data columns (total 12 columns):
 #   Column            Non-Null Count   Dtype  
---  ------            --------------   -----  
 0   order_id          185686 non-null  int32  
 1   order_year        185686 non-null  int64  
 2   order_month       185686 non-null  object 
 3   order_day         185686 non-null  int64  
 4   product           185686 non-null  object 
 5   quantity_ordered  185686 non-null  int64  
 6   price_each        185686 non-null  float64
 7   street            185686 non-null  object 
 8   city              185686 non-null  object 
 9   state             185686 non-null  object 
 10  zip               185686 non-null  object 
 11  country           185686 non-null  object 
dtypes: float64(1), int32(1), int64(3), object(7)
memory usage: 17.7+ MB
None


# LOAD transformed_table TO DATABASE

In [54]:
import pandas as pd
from sqlalchemy import create_engine,exc

# PostgreSQL connection settings read by get_engine() below.
# These repeat the values defined locally inside consolidated_sales_db().
# NOTE(review): the password is hard-coded in source; consider moving it to
# configuration or an environment variable.
pg_username = 'postgres'
pg_password = 'posgres1234'
pg_host = 'localhost'
pg_port = '5432'
pg_database = 'sales_sep_dec_2019'

def get_engine():
    """Create a SQLAlchemy engine from the module-level ``pg_*`` settings.

    Returns:
        The engine produced by ``create_engine`` for the PostgreSQL URL built
        from ``pg_username``, ``pg_password``, ``pg_host``, ``pg_port`` and
        ``pg_database``.
    """
    return create_engine(
        f'postgresql://{pg_username}:{pg_password}@{pg_host}:{pg_port}/{pg_database}'
    )

def load_to_db(transformed_table: pd.DataFrame, table_name: str):
    """Append the transformed sales rows to a database table.

    Args:
        transformed_table: Frame to write; its index is not written.
        table_name: Target table. Rows are appended if the table already
            exists.

    Returns:
        None. Success or failure is reported by printing a message;
        SQLAlchemy errors are caught and printed rather than raised.
    """
    engine = get_engine()

    try:
        transformed_table.to_sql(table_name, con=engine, if_exists="append", index=False)
    except exc.SQLAlchemyError as e:
        print(f'Error loading data into {table_name} table: {e}')
    else:
        print(f'sales data successfully loaded into {table_name} table.')
    finally:
        # The engine is created per call, so release its pooled connections
        # once the write has finished or failed.
        engine.dispose()
    

In [55]:
# Re-run extract and transform, then append the result to 'annual_sales_2019'.
staging_table = consolidated_data()
transformed_table = transform_sales(staging_table)
# load_to_db has no return statement, so `load` is None and the print below
# shows "None" after the status message printed inside load_to_db.
load = load_to_db(transformed_table, table_name='annual_sales_2019')
print(load)

sales data successfully loaded into annual_sales_2019 table.
None
