In [1]:
!pip install psycopg2



In [15]:
!pip install python-dotenv

Collecting python-dotenv
  Downloading python_dotenv-1.0.0-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.0


In [51]:
## Importing libraries
import psycopg2 # library for working with Postgres database
from dotenv import load_dotenv # library for working with environment variables
import os
import csv
import io
from datetime import datetime

In [2]:
# Load the variables defined in the .env file into the process environment,
# so that the os.environ lookups in the following cell can find them.
load_dotenv()

True

In [3]:
## Connection settings and the CSV directory are taken from the environment,
## so no credentials or local paths are written in the notebook itself.
# Each value is None when the corresponding variable is not set.
host = os.getenv('DB_HOST')          # database server
dbname = os.getenv('DB_NAME')        # database name
user = os.getenv('DB_USER')          # login role
password = os.getenv('DB_PASS')      # login password
path = os.getenv('FILE_PATH')        # directory that is scanned for CSV files

In [4]:
## Table names, kept in one place and listed in the order the tables are created below
tbl_control = 'ControlFilesLoaded'                    # one row per CSV file already loaded
tbl_territories = 'Territories'
tbl_customers = 'Customers'
tbl_product_subcategories = 'Product_Subcategories'
tbl_products = 'Products'                             # references product subcategories
tbl_sales = 'Sales_2015'                              # references products, customers, territories


In [None]:
## Open a connection to the Postgres database and create the tables (if they do not exist yet).
# In psycopg2, `with conn:` only scopes a transaction: it commits when the block succeeds and
# rolls back when it raises. It does NOT close the connection, so that is done in `finally`.
conn = None
try:
    # Keyword arguments instead of a hand-built DSN string: a value containing spaces or
    # quotes (typically the password) would otherwise break the DSN parsing.
    conn = psycopg2.connect(host=host, dbname=dbname, user=user, password=password)
    with conn:
        with conn.cursor() as cursor:
            # Set the datestyle configuration
            cursor.execute("SET datestyle = 'ISO, MDY'")

            # Create tables with primary & foreign keys.
            # Foreign keys point at the tbl_* names so they always match the tables created here.
            create_control_table = f"CREATE TABLE IF NOT EXISTS {tbl_control} (FileName VARCHAR, TableName VARCHAR, TimeLoaded TIMESTAMP) "
            create_territory_table = f"CREATE TABLE IF NOT EXISTS {tbl_territories} (SalesTerritoryKey INT PRIMARY KEY, Region VARCHAR, Country VARCHAR, Continent VARCHAR)"
            create_customers_table = f"CREATE TABLE IF NOT EXISTS {tbl_customers} (CustomerKey INT PRIMARY KEY, Prefix VARCHAR, FirstName VARCHAR, LastName VARCHAR, BirthDate DATE, MaritalStatus CHAR(3), Gender CHAR(3), EmailAddress VARCHAR, AnnualIncome VARCHAR, TotalChildren INT, EducationLevel VARCHAR, Occupation VARCHAR, HomeOwner CHAR(3))"
            create_product_sub_table = f"CREATE TABLE IF NOT EXISTS {tbl_product_subcategories} (ProductSubcategoryKey INT PRIMARY KEY, SubcategoryName VARCHAR, ProductCategoryKey INT)"
            create_products_table = f"CREATE TABLE IF NOT EXISTS {tbl_products} (ProductKey INT PRIMARY KEY, ProductSubcategoryKey INT REFERENCES {tbl_product_subcategories}(ProductSubcategoryKey), ProductSKU VARCHAR, ProductName VARCHAR, ModelName VARCHAR, ProductDescription VARCHAR, ProductColor VARCHAR, ProductSize VARCHAR, ProductStyle VARCHAR, ProductCost NUMERIC(10,2), ProductPrice NUMERIC(10,2))"
            create_sales_table = f"CREATE TABLE IF NOT EXISTS {tbl_sales} (OrderDate DATE, StockDate DATE, OrderNumber VARCHAR PRIMARY KEY, ProductKey INT REFERENCES {tbl_products}(ProductKey), CustomerKey INT REFERENCES {tbl_customers}(CustomerKey), TerritoryKey INT REFERENCES {tbl_territories}(SalesTerritoryKey), OrderLineItem INT, OrderQuantity INT)"

            # Referenced tables are created before the tables that reference them.
            cursor.execute(create_control_table)
            cursor.execute(create_territory_table)
            cursor.execute(create_customers_table)
            cursor.execute(create_product_sub_table)
            cursor.execute(create_products_table)
            cursor.execute(create_sales_table)
    # No explicit commit needed: leaving `with conn:` without an error has already committed.
except psycopg2.OperationalError as e:
    print("Could not start a connection to the database")
    print(e)
except psycopg2.DatabaseError as e:
    # The transaction was already rolled back when `with conn:` was left with an error.
    # Calling conn.rollback() here is unsafe because `conn` may not exist if connect() failed.
    print("An error occur while working with the database, Rolling back!")
    print(e)
finally:
    if conn is not None:
        conn.close()

In [88]:
## Helper that lists the CSV files found in a directory
def get_filenames_from_path(path):
    """Return the names of the entries in `path` whose name ends with '.csv'.

    Only bare names are returned (as given by os.listdir), not full paths,
    and the extension check is case-sensitive.
    """
    return [entry for entry in os.listdir(path) if entry.endswith('.csv')]

In [115]:
## List the CSV files currently present in the source directory, one name per line
path_files = get_filenames_from_path(path)
if path_files:  # nothing is printed when the directory holds no CSV file
    print("\n".join(path_files))

Customers.csv
Products.csv
Product_Subcategories.csv
Sales_2015.csv
Territories.csv


In [107]:
## Read the names of the files already recorded in the control table.
# In psycopg2, `with conn:` only scopes a transaction (commit on success, rollback on error)
# and does NOT close the connection, so the connection is closed explicitly in `finally`.
# NOTE: when the query fails, control_filenames is not (re)assigned by this cell.
conn = None
try:
    # Keyword arguments instead of a hand-built DSN string: a value containing spaces or
    # quotes (typically the password) would otherwise break the DSN parsing.
    conn = psycopg2.connect(host=host, dbname=dbname, user=user, password=password)
    with conn:
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT filename FROM {tbl_control}")
            rows = cursor.fetchall()
            # Each row is a 1-tuple holding the filename column
            control_filenames = [row[0] for row in rows]

except psycopg2.OperationalError as e:
    print("Could not start a connection to the database")
    print(e)
except psycopg2.DatabaseError as e:
    # The transaction was already rolled back when `with conn:` was left with an error.
    # Calling conn.rollback() here is unsafe because `conn` may not exist if connect() failed.
    print("An error occur while working with the database, Rolling back!")
    print(e)
finally:
    if conn is not None:
        conn.close()


In [116]:
# Show the file names read from the control table
print(control_filenames)

['Territories.csv', 'Product_Subcategories.csv', 'Customers.csv', 'Products.csv', 'Sales_2015.csv']


In [117]:
## New files = CSV files found in the directory that are not yet listed in the control table
# The order of path_files is preserved.
new_files = [candidate for candidate in path_files if candidate not in control_filenames]

print(new_files)

[]


In [112]:
# Sort the new files into loading order: for every keyword of order_list, in turn,
# pick the new files whose lower-cased name contains that keyword.
order_list = ["territories", "product_subcategories", "customers", "products", 'sales']
loading_order = [
    candidate
    for keyword in order_list
    for candidate in new_files
    if keyword in candidate.lower()
]

print(loading_order)
    

[]


In [114]:
## Load every new CSV file into its Postgres table and register it in the control table.
# All files are loaded inside one transaction: either every file (and its control row)
# is committed, or nothing is.

# One entry per supported file, tried in this order against the file name:
#   (marker searched in the file name,
#    label used in the "loaded successfully" message,
#    encoding passed to open() (None = platform default),
#    target table of a CSV-format COPY, or None for a text-format copy_from
#    into the table named after the file)
_LOAD_SPECS = (
    ("Territories", "Territories", None, None),
    ("Product_Subcategories", "Product_Subcategories", None, None),
    ("Products", "Product", None, tbl_products.lower()),
    ("Customers", "Customers", "iso-8859-1", tbl_customers.lower()),
    ("Sales", "Sales", None, tbl_sales.lower()),
)


def _load_new_file(cursor, file_name):
    """Copy one CSV file into its table and record the load in the control table.

    cursor    -- open psycopg2 cursor; the caller owns the transaction.
    file_name -- bare name of a CSV file located in the `path` directory.

    Prints "Table not identified" and does nothing else when no marker of
    _LOAD_SPECS occurs in file_name.
    """
    spec = next((s for s in _LOAD_SPECS if s[0] in file_name), None)
    if spec is None:
        print("Table not identified")
        return

    marker, loaded_label, encoding, csv_target = spec
    # Table name derived from the file name; also the value stored in the control table
    derived_table = file_name.lower().replace(".csv", "")

    print(f"Starting to read {marker} files!")
    with open(f'{path}/{file_name}', 'r', encoding=encoding) as file:
        if csv_target is None:
            # Text-format COPY: quoted fields are not interpreted, the header is skipped by hand
            next(file)
            cursor.copy_from(file, derived_table, sep=',')
        else:
            # CSV-format COPY handles quoted fields and skips the header itself.
            # The file is streamed directly instead of being read into memory first.
            cursor.copy_expert(f"COPY {csv_target} FROM STDIN WITH (FORMAT CSV, DELIMITER ',', HEADER TRUE, QUOTE '\"', ESCAPE '\"')", file)
        print(f"{loaded_label} table loaded successfully!")
        timestamp = datetime.now()
        cursor.execute(f"INSERT INTO {tbl_control.lower()} (filename, tablename, timeloaded) VALUES (%s, %s, %s)", (file_name, derived_table, timestamp))
        print(f"{marker} table saved in Control table!")


# In psycopg2, `with conn:` only scopes a transaction (commit on success, rollback on error)
# and does NOT close the connection, so the connection is closed explicitly in `finally`.
conn = None
try:
    # Keyword arguments instead of a hand-built DSN string: a value containing spaces or
    # quotes (typically the password) would otherwise break the DSN parsing.
    conn = psycopg2.connect(host=host, dbname=dbname, user=user, password=password)
    with conn:
        with conn.cursor() as cursor:
            # Set the datestyle configuration
            cursor.execute("SET datestyle = 'ISO, MDY'")

            for new_file in loading_order:
                _load_new_file(cursor, new_file)
    # No explicit commit needed: leaving `with conn:` without an error has already committed.
except psycopg2.OperationalError as e:
    print("Could not start a connection to the database")
    print(e)
except psycopg2.DatabaseError as e:
    # The transaction was already rolled back when `with conn:` was left with an error.
    # Calling conn.rollback() here is unsafe because `conn` may not exist if connect() failed.
    print("An error occur while working with the database, Rolling back!")
    print(e)
finally:
    if conn is not None:
        conn.close()
        

hola
