# Description of the whole notebook's operation

### Import Packages

In [1]:
import os
from datetime import datetime
from urllib.parse import quote_plus

import pandas as pd
import sqlalchemy

### Global Variables

In [2]:
# Names of the source objects processed by every stage of this notebook.
# Each name is passed to the standardised-path lookup and used as the target
# table name when loading into Postgres.
objects_list = [
    'categories', 'customer', 'employee_territories', 'employees', 'order_details',
    'orders', 'products', 'regions', 'shippers', 'suppliers',
]

# Identifier of this run, e.g. 2023-01-08_18_44_36_905140.
# strftime always emits the six-digit microsecond field; str(datetime) drops it
# when microsecond == 0, which would silently change the identifier's shape.
load_date = datetime.now().strftime('%Y-%m-%d_%H_%M_%S_%f')

### Object loading from dropzone to standardised

In [4]:
for i in objects_list:
    try:
        object_name = i 
        print('Loading is starting for:', i)
        %run "C:\PScripts\DEV\LoadToStandardised.ipynb"
        print('Loading is complited for:', i)
        print('--------------------------------------------------------------------')
    except:
        print('ETL for', object_name, 'is not implemented')
        


Loading is starting for: categories
-------------------------
load_date:  2023-01-08_18_44_36_905140
file_name:  categories_2023-01-08_18_44_36_905140
path_dropzone:  C:\Users\1\Documents\filesystem\dropzone\categories.csv
path_raw:  C:\Users\1\Documents\filesystem\raw\categories_2023-01-08_18_44_36_905140.csv
path_standardised:  C:\Users\1\Documents\filesystem\standardised\categories
-------------------------
  categoryName  categoryID                                        description  \
0    Beverages           1            Soft drinks coffees teas beers and ales   
1   Condiments           2  Sweet and savory sauces relishes spreads and s...   
2  Confections           3                  Desserts candies and sweet breads   

                                             picture  
0  0x151C2F00020000000D000E0014002100FFFFFFFF4269...  
1  0x151C2F00020000000D000E0014002100FFFFFFFF4269...  
2  0x151C2F00020000000D000E0014002100FFFFFFFF4269...  
Loading is complited for: categories
----

Loading is complited for: suppliers
--------------------------------------------------------------------


### Target database connection setup

In [None]:
# Schema placed on the search_path of every connection created by `engine`.
dbschema = 'stage'

# The password is read from the environment so that it never ends up in the
# notebook file, its outputs or version control.
# NOTE(review): a password was previously hardcoded in this cell — rotate it.
db_password = os.environ.get('REPORTING_DB_PASSWORD')
if not db_password:
    raise RuntimeError('Set the REPORTING_DB_PASSWORD environment variable before running this cell')

engine = sqlalchemy.create_engine(
    # quote_plus keeps characters such as '@', ':' or '/' in the password from
    # being parsed as URL structure.
    'postgresql+psycopg2://postgres:{}@localhost:5432/reporting_db'.format(quote_plus(db_password)),
    # Passed through to the driver: sets search_path for each new connection.
    connect_args={'options': '-csearch_path={}'.format(dbschema)})

### Truncate stage tables

In [5]:
# Empty the listed tables in a single statement; unqualified names resolve
# through the search_path configured on `engine`.
truncate_query = sqlalchemy.text('''TRUNCATE TABLE categories, customer,employee_territories, employees,
                                 order_details, orders, products, regions, shippers, suppliers;''')

# engine.begin() commits on success, rolls back on error and always releases
# the connection. It replaces execution_options(autocommit=True), which is
# deprecated in SQLAlchemy 1.4 and removed in 2.0.
with engine.begin() as connection:
    connection.execute(truncate_query)

### Object loading from standardised to stage tables

In [21]:
# Append each object's standardised parquet partition for this run
# (directory `loadDate=<load_date>`) to the table named after the object.
# NOTE(review): `categories_config` is not defined anywhere in this notebook; it
# presumably leaks in from the notebook executed via %run earlier — confirm and
# import/define it explicitly so the cell survives Restart & Run All.
for object_name in objects_list:
    try:
        standardised_path = categories_config.get_standardised_path(object_name)
        # Backslash separator kept as in the original (Windows paths).
        partition_path = standardised_path + '\\' + 'loadDate=' + load_date
        stage_frame = pd.read_parquet(partition_path, engine='fastparquet')
        # if_exists='append' adds rows to an existing table instead of replacing it.
        stage_frame.to_sql(object_name, engine, if_exists='append', index=False)
        print("Store loading successful:", object_name)
    except Exception as load_error:
        # Best-effort: continue with the remaining objects, but report why this
        # one failed (and let KeyboardInterrupt propagate).
        print("Store loading failed:", object_name,)
        print("  cause:", repr(load_error))

Store loading successful: categories
Store loading successful: customer
Store loading successful: employee_territories
Store loading successful: employees
Store loading successful: order_details
Store loading successful: orders
Store loading successful: products
Store loading successful: regions
Store loading successful: shippers
Store loading successful: suppliers


### Store tables loading (SCD Type 1)

In [19]:
# Stored procedures in schema "store", called one by one in this order.
procedure_list = ['''CALL store."pMergeCategories"()''',
                  '''CALL store."pMergeCustomers"()''',
                  '''CALL store."pMergeEmployee_territories"()''',
                  '''CALL store."pMergeEmployees"()''',
                  '''CALL store."pMergeOrder_details"()''',
                  '''CALL store."pMergeOrders"()''',
                  '''CALL store."pMergeProducts"()''',
                  '''CALL store."pMergeRegions"()''',
                  '''CALL store."pMergeShippers"()''',
                  '''CALL store."pMergeSuppliers"()''']

# The context manager releases the connection even if the loop is interrupted.
with engine.connect() as connection:
    for procedure_call in procedure_list:
        try:
            # One explicit transaction per procedure: committed on success,
            # rolled back on failure, so a failed call does not affect the
            # others. Replaces execution_options(autocommit=True), which is
            # deprecated in SQLAlchemy 1.4 and removed in 2.0.
            with connection.begin():
                connection.execute(sqlalchemy.text(procedure_call))
            print("Store loading successful:", procedure_call)
        except Exception as merge_error:
            # Best-effort: continue with the remaining procedures, but report
            # the cause instead of hiding it.
            print("Store loading failed:", procedure_call)
            print("  cause:", repr(merge_error))

Store loading successful: CALL store."pMergeCategories"()
Store loading successful: CALL store."pMergeCustomers"()
Store loading successful: CALL store."pMergeEmployee_territories"()
Store loading successful: CALL store."pMergeEmployees"()
Store loading successful: CALL store."pMergeOrder_details"()
Store loading successful: CALL store."pMergeOrders"()
Store loading successful: CALL store."pMergeProducts"()
Store loading successful: CALL store."pMergeRegions"()
Store loading successful: CALL store."pMergeShippers"()
Store loading successful: CALL store."pMergeSuppliers"()


### END