In [1]:
import pandas as pd
import pyodbc
import psycopg2
import os
from sqlalchemy import create_engine

#SQL server conecction
connection_data = (
    "Driver={ODBC Driver 17 for SQL Server};"
    "Server={ip};"
    "Database=AdventureWorks2022;"
    "UID=`{user}`;"  
    "PWD=`{password}`;"
)

# PostgreSQL connection
postgres_params = {
'host': '{ip}',
'database': 'AdventureWorksDW',
'user': '{user}',
'password': '{password}'
}

In [None]:
sql_files = [
'Tabelas_Base\Sales.SpecialOffer.sql',
'Tabelas_Base\Sales.Currency.sql',
'Tabelas_Base\HumanResources.Department.sql',
'Tabelas_Base\Person.CountryRegion.sql',
'Tabelas_Base\Sales.SalesTerritory.sql',
'Tabelas_Base\Person.StateProvince.sql',
'Tabelas_Base\Production.ProductCategory.sql',
'Tabelas_Base\Production.ProductSubcategory.sql',
'Tabelas_Base\Person.BusinessEntity.sql',
'Tabelas_Base\Person.Person.sql',
'Tabelas_Base\Production.Unitmeasure.sql',
'Tabelas_Base\HumanResources.Employee.sql',
'Tabelas_Base\Sales.SalesPerson.sql',
'Tabelas_Base\Sales.Store.sql',
'Tabelas_Base\Sales.Customer.sql',
'Tabelas_Base\Production.ProductModel.sql',
'Tabelas_Base\Production.Product.sql',
'Tabelas_Base\Person.Address.sql',
'Tabelas_Base\Person.ContactType.sql',
'Tabelas_Base\Person.BusinessEntityContact.sql',
'Tabelas_Base\Person.PhoneNumberType.sql',
'Tabelas_Base\Person.PersonPhone.sql',
'Tabelas_Base\Person.AddressType.sql',
'Tabelas_Base\Person.BusinessEntityAddress.sql',
'Tabelas_Base\Production.ProductPhoto.sql',
'Tabelas_Base\Production.ProductProductPhoto.sql',
'Tabelas_Base\Production.Culture.sql',
'Tabelas_Base\Production.ProductDescription.sql',
'Tabelas_Base\Production.ProductModelProductDescriptionCulture.sql',
'Tabelas_Base\Sales.SalesTerritoryHistory.sql',
'Tabelas_Base\Person.EmailAddress.sql',
'Tabelas_Base\HumanResources.Shift.sql',
'Tabelas_Base\HumanResources.EmployeeDepartmentHistory.sql',
'Tabelas_Base\HumanResources.EmployeePayHistory.sql',
'Tabelas_Base\Production.ProductListPriceHistory.sql'
]

# tabelas postgres
pg_table_names = [
'raw_zone.sales_specialoffer',
'raw_zone.sales_currency',
'raw_zone.humanresources_department',
'raw_zone.person_countryregion',
'raw_zone.sales_salesterritory',
'raw_zone.person_stateprovince',
'raw_zone.production_productcategory',
'raw_zone.production_productsubcategory',
'raw_zone.person_businessentity',
'raw_zone.person_person',
'raw_zone.production_unitmeasure',
'raw_zone.humanresources_employee',
'raw_zone.sales_salesperson',
'raw_zone.sales_store',
'raw_zone.sales_customer',
'raw_zone.production_productmodel',
'raw_zone.production_product',
'raw_zone.person_address',
'raw_zone.person_contacttype',
'raw_zone.person_businessentitycontact',
'raw_zone.person_phonenumbertype',
'raw_zone.person_personphone',
'raw_zone.person_addresstype',
'raw_zone.person_businessentityaddress',
'raw_zone.production_productphoto',
'raw_zone.production_productproductphoto',
'raw_zone.production_culture',
'raw_zone.production_productdescription',
'raw_zone.production_productmodelproductdescriptionculture',
'raw_zone.sales_salesterritoryhistory',
'raw_zone.person_emailaddress',
'raw_zone.humanresources_shift',
'raw_zone.humanresources_employeedepartmenthistory',
'raw_zone.humanresources_employeepayhistory',
'raw_zone.production_productlistpricehistory'
]


def execute_sql_and_insert_to_pg(connection_data, postgres_params, sql_file, pg_table_name):
    try:

        sql_conn = pyodbc.connect(connection_data)
        sql_cursor = sql_conn.cursor()

        with open(sql_file, 'r') as file:
            sql_query = file.read()

       
        sql_cursor.execute(sql_query)

        
        rows = sql_cursor.fetchall()

        
        pg_conn = psycopg2.connect(**postgres_params)
        pg_cursor = pg_conn.cursor()

        
        for row in rows:
            
            row_data = tuple(row)

            pg_cursor.execute(f"INSERT INTO {pg_table_name} VALUES %s", (row_data,))

        
        pg_conn.commit()

        
        sql_cursor.close()
        sql_conn.close()
        pg_cursor.close()
        pg_conn.close()

        print(f'Dados do {sql_file} inserido na tabela {pg_table_name} com sucesso.')

    except Exception as e:
        print(f'Ocorreu um erro:', str(e))



for sql_file, pg_table_name in zip(sql_files, pg_table_names):
    execute_sql_and_insert_to_pg(connection_data, postgres_params, sql_file, pg_table_name)


In [None]:
import traceback

dim_files = [
    'Tabelas_Base\ETL_DIM\DimPromotion.sql',
    'Tabelas_Base\ETL_DIM\DimSalesTerritory.sql',
    'Tabelas_Base\ETL_DIM\DimCurrency.sql',
    'Tabelas_Base\ETL_DIM\DimDepartmentGroup.sql',
    'Tabelas_Base\ETL_DIM\DimGeography.sql',
    'Tabelas_Base\ETL_DIM\DimProductCategory.sql',
    'Tabelas_Base\ETL_DIM\DimProductSubcategory.sql',
    'Tabelas_Base\ETL_DIM\DimProduct.sql',
    'Tabelas_Base\ETL_DIM\DimCustomer.sql',
    'Tabelas_Base\ETL_DIM\DimEmployee.sql',
    'Tabelas_Base\ETL_DIM\DimReseller.sql',
#   'DimDate'_
    'Tabelas_Base\ETL_DIM\DimScenario.sql'
]

pg_dim_table = [
    'public.dimpromotion',
    'public.dimsalesterritory',
    'public.dimcurrency',
    'public.dimdepartmentgroup',
    'public.dimgeography',
    'public.dimproductcategory',
    'public.dimproductsubcategory',
    'public.dimproduct',
    'public.dimcustomer',
    'public.dimemployee',
#   'public.dimdate',
    'public.dimreseller',
    'public.dimscenario',
]


def execute_dim(pg_params, dim_files, pg_dim_table):
    try:
        pg_conn = psycopg2.connect(**pg_params)
        pg_cursor = pg_conn.cursor()

        
        with open(dim_files, 'r') as file:
            sql_pg_query = file.read()

        
        pg_cursor.execute(sql_pg_query)

        
        rows = pg_cursor.fetchall()

        
        placeholders = ','.join(['%s'] * len(rows[0]))
        insert_query = f"INSERT INTO {pg_dim_table} VALUES ({placeholders})"

        
        pg_cursor.executemany(insert_query, rows)

        
        pg_conn.commit()

    except Exception as e:
        print(f'Ocorreu um erro ao processar {dim_files}:', str(e))
        traceback.print_exc()  
    finally:
        
        pg_cursor.close()
        pg_conn.close()

        print(f'Dados do {dim_files} inserido na tabela {pg_dim_table} com sucesso.')

s
for sql_pg_file, pg_dim_name in zip(dim_files, pg_dim_table):
    execute_dim(postgres_params, sql_pg_file, pg_dim_name)

def load_dim_date(postgres_params):
    try:
        df_date = pd.read_csv('DimDate.csv')
        
        pg_conn = psycopg2.connect(**postgres_params)
        pg_cursor = pg_conn.cursor()
        
        rows = [tuple(x) for x in df_date.to_numpy()]
        for row in rows:
            row_data = tuple(row)
            pg_cursor.execute(f"INSERT INTO public.dimdate VALUES %s", (row_data,))
        
        pg_conn.commit()
        pg_cursor.close()
        pg_conn.close()

        print(f'Dados do CSV inseridos com sucesso.')
    except Exception as e:
            print(f'Ocorreu um erro:', str(e))


load_dim_date(postgres_params)