In [1]:
import os
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
import requests
import sqlalchemy
import psycopg2
from sqlalchemy import create_engine
from sqlalchemy.types import Integer, Date, String

In [2]:
# GitHub location of the CSV files used to seed the database:
# repository owner, repository name, and folder inside the repository
repository_owner, repository_name, folder_path = (
    'AntonMiniazev',
    'online_retail_reporting',
    'initial_data_source',
)

In [3]:
# PostgreSQL connection parameters.
# Each value is read from the standard libpq environment variable when it is set,
# so credentials do not have to be hardcoded in the notebook; the fallbacks are
# the original local-development settings.
param_dic = {
    "host"      : os.environ.get("PGHOST", "localhost"),
    "database"  : os.environ.get("PGDATABASE", "fine_delivery"),
    "user"      : os.environ.get("PGUSER", "postgres"),
    "password"  : os.environ.get("PGPASSWORD", "postgres"),
    "port"      : os.environ.get("PGPORT", "5432"),
}
# Schema in which all tables of this notebook are created
schema_name = 'sales'

In [4]:
# Use the GitHub contents API to list the files in the repository folder
api_url = f'https://api.github.com/repos/{repository_owner}/{repository_name}/contents/{folder_path}'

# Defined up front so later cells see an empty list (instead of a NameError)
# when the request below fails.
csv_links = []

# Timeout (seconds) so a stalled connection cannot hang the notebook forever
response = requests.get(api_url, timeout=30)

if response.status_code == 200:
    contents = response.json()
    # Keep only the raw download links of CSV files (skip sub-folders and other files)
    csv_links = [
        item['download_url']
        for item in contents
        if item['type'] == 'file' and item['name'].endswith('.csv')
    ]
else:
    print(f"Failed to fetch directory contents. Status Code: {response.status_code}")

In [5]:
# Column-name fragments whose columns are stored as float in the database.
# create_table_from_csv matches them as substrings of the lower-cased column
# name, so e.g. 'cost' would also match a column such as 'delivery_cost'.
columns_type_float = ['total_value','quantity','selling_price','cost_of_sales','total_price','cost']

In [6]:
def table_exists(table_name, schema=None):
    """Return True if ``table_name`` exists in ``schema`` (default: module-level ``schema_name``).

    Uses the module-level ``engine``. Schema and table names are sent as bound
    parameters instead of being interpolated into the SQL string. The statement
    is wrapped in ``sqlalchemy.text`` because SQLAlchemy 2.x no longer accepts
    plain strings in ``Connection.execute``.
    """
    if schema is None:
        schema = schema_name
    query = sqlalchemy.text(
        """
        SELECT EXISTS (
            SELECT 1
            FROM information_schema.tables
            WHERE table_schema = :schema
            AND table_name = :table_name
        )
        """
    )
    with engine.connect() as connection:
        return connection.execute(query, {"schema": schema, "table_name": table_name}).scalar()

def create_table_from_csv(csv_path):
    """Load a ';'-delimited CSV file into a table in ``schema_name``.

    The table is named after the file (basename without extension). Any existing
    table with that name is dropped first with CASCADE, which also removes
    foreign keys pointing at it, so the notebook can be re-run.

    Column types are chosen from the lower-cased column name:
    - contains "_date" -> ``datetime.date`` parsed with format ``%Y-%m-%d``
    - contains "_id" -> nullable integer (pandas ``Int64``)
    - contains any entry of ``columns_type_float`` -> float
    All other columns stay as text (object).

    Uses the module-level ``engine``, ``schema_name`` and ``columns_type_float``.

    Parameters
    ----------
    csv_path : str
        Local path or URL of the CSV file.
    """
    table_name = os.path.splitext(os.path.basename(csv_path))[0]

    # Read every column as text; types are assigned explicitly below
    df = pd.read_csv(csv_path, dtype=object, delimiter=';', thousands=',')

    # DROP ... CASCADE is needed because to_sql(if_exists='replace') drops without
    # CASCADE and fails once other tables reference this one via foreign keys.
    # engine.begin() commits on success (a plain connect() does not commit in
    # SQLAlchemy 2.x). The name comes from a file name, so quote it if required.
    preparer = engine.dialect.identifier_preparer
    qualified_name = f"{preparer.quote(schema_name)}.{preparer.quote(table_name)}"
    with engine.begin() as connection:
        connection.execute(sqlalchemy.text(f"DROP TABLE IF EXISTS {qualified_name} CASCADE"))

    # Single pass over the columns; per column the conversions run in the same
    # order as before (date, then integer, then float).
    for column in df.columns:
        name = column.lower()
        if "_date" in name:
            df[column] = pd.to_datetime(df[column], format="%Y-%m-%d").dt.date
        if "_id" in name:
            # Nullable integer: plain astype(int) raises on missing values
            df[column] = pd.to_numeric(df[column]).astype("Int64")
        if any(col in name for col in columns_type_float):
            df[column] = df[column].astype(float)

    # Create the table with data types inferred from the DataFrame
    df.to_sql(table_name, con=engine, schema=schema_name, index=False, if_exists='replace')

    print(f"Table '{table_name}' created in schema '{schema_name}'.")

In [5]:
# Create the PostgreSQL engine using SQLAlchemy.
# User and password are URL-encoded so characters such as '@', ':' or '/'
# in them cannot corrupt the connection URL.
engine = create_engine(
    f'postgresql://{quote_plus(param_dic["user"])}:{quote_plus(param_dic["password"])}'
    f'@{param_dic["host"]}:{param_dic["port"]}/{param_dic["database"]}'
)

In [8]:
# Build one database table per CSV link collected from GitHub
for csv_url in csv_links:
    if not csv_url.endswith('.csv'):
        continue
    create_table_from_csv(csv_url)

Table 'assortment' created in schema 'sales'.
Table 'delivery_types' created in schema 'sales'.
Table 'orders' created in schema 'sales'.
Table 'products' created in schema 'sales'.
Table 'resource' created in schema 'sales'.
Table 'store' created in schema 'sales'.
Table 'zone' created in schema 'sales'.


In [9]:
# Primary-key column for each table, keyed by table name
primary_keys = dict(
    assortment='product_id',
    delivery_types='delivery_type',
    orders='order_id',
    resource='resource_id',
    store='store_id',
    zone='zone_id',
)

In [10]:
# Add a primary key to each table listed in `primary_keys`.
# engine.begin() commits on success (a plain connect() does not commit in
# SQLAlchemy 2.x) and sqlalchemy.text() is required there for raw SQL strings.
with engine.begin() as connection:
    for table, pk_column in primary_keys.items():
        connection.execute(sqlalchemy.text(
            f"ALTER TABLE {schema_name}.{table} ADD PRIMARY KEY ({pk_column})"
        ))

In [11]:
# Foreign keys per table, shaped as
#   {table: {constraint_name: (referenced_table, column)}}
# The same column name is used in the referencing and the referenced table.
constraints = {
    'products': {
        'fk_order_id': ('orders', 'order_id'),
        'fk_product_id': ('assortment', 'product_id'),
    },
    'orders': {
        'fk_delivery_type': ('delivery_types', 'delivery_type'),
        'fk_zone_id': ('zone', 'zone_id'),
    },
    'zone': {
        'fk_store_id': ('store', 'store_id'),
    },
    'resource': {
        'fk_store_id': ('store', 'store_id'),
    },
}

In [12]:
# (Re)create every foreign key listed in `constraints`.
# Each constraint is dropped first so the cell can be re-run. It is then added
# as NOT VALID, so PostgreSQL does not check rows that already exist.
# engine.begin() commits on success and sqlalchemy.text() is required for
# raw SQL strings in SQLAlchemy 2.x.
with engine.begin() as connection:
    for table, table_constraints in constraints.items():
        for constraint_name, (ref_table, column) in table_constraints.items():
            connection.execute(sqlalchemy.text(
                f"ALTER TABLE {schema_name}.{table} DROP CONSTRAINT IF EXISTS {constraint_name}"
            ))

            connection.execute(sqlalchemy.text(f"""
                ALTER TABLE {schema_name}.{table}
                ADD CONSTRAINT {constraint_name} FOREIGN KEY ({column})
                REFERENCES {schema_name}.{ref_table} ({column}) MATCH SIMPLE
                ON UPDATE NO ACTION
                ON DELETE NO ACTION
                NOT VALID
                """))

In [6]:
# Create the (empty) target table for zone economy in `schema_name`.
# The table is dropped first so the cell is re-runnable. Both statements run in
# one transaction that engine.begin() commits on success.
with engine.begin() as connection:
    connection.execute(sqlalchemy.text(
        f"DROP TABLE IF EXISTS {schema_name}.zone_economy CASCADE"
    ))
    connection.execute(sqlalchemy.text(f"""
    CREATE TABLE IF NOT EXISTS {schema_name}.zone_economy (
        delivery_zone TEXT,
        day DATE,
        revenue INT,
        cost_of_sales INT,
        delivery_cost INT,
        store_cost INT,
        gross_margin INT,
        orders INT,
        version timestamptz
    );
    """))

In [7]:
# Close the connections held in the engine's pool now that the last database step has run
engine.dispose()