In [1]:
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

In [2]:
class DataWarehouse:
    def __init__(self, db_url):
        self.db_url = db_url
        self.engine = create_engine(self.db_url)
        self.Session = sessionmaker(bind=self.engine)

    def connect(self):
        try:
            self.connection = self.engine.connect()
            print("Connected to the database successfully!")
        except Exception as e:
            print(f"Error connecting to the database: {e}")

    def init_schema(self, schema_file):
        try:
            with open(schema_file, 'r') as file:
                schema_sql = file.read()
            
            # Start a transaction
            with self.connection.begin() as transaction:
                self.connection.execute(text(schema_sql))
            
            # Check if tables are created
            result = self.connection.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='public';")
            tables = result.fetchall()
            
            print("Schema created successfully! Tables in the database:")
            for table in tables:
                print(table[0])
        except Exception as e:
            print(f"Error initializing schema: {e}")

    def migration(self, df_transactional):
        session = self.Session()
        
        # Map the transactional DataFrame to the Data Warehouse tables
        try:
            for index, row in df_transactional.iterrows():
                # Insert into Dimension_Region
                region_query = text("""
                    INSERT INTO Dimension_Region (Country, Town) 
                    VALUES (:Country, :Town)
                    ON CONFLICT DO NOTHING
                """)
                session.execute(region_query, {'Country': row['Country'], 'Town': row['Town']})

                # Insert into Dimension_Bank
                bank_query = text("""
                    INSERT INTO Dimension_Bank (Bank_Name, Bank_Phone_number, Bank_Address, Bank_Website)
                    VALUES (:Bank_Name, :Bank_Phone_number, :Bank_Address, :Bank_Website)
                    ON CONFLICT DO NOTHING
                """)
                session.execute(bank_query, {
                    'Bank_Name': row['Bank_Name'],
                    'Bank_Phone_number': row['Bank_Phone_number'],
                    'Bank_Address': row['Bank_Address'],
                    'Bank_Website': row['Bank_Website']
                })

                # Insert into Dimension_Reviewer
                reviewer_query = text("""
                    INSERT INTO Dimension_Reviewer (Reviewer_Name, Reviewer_Profil_Link)
                    VALUES (:Reviewer_Name, :Reviewer_Profil_Link)
                    ON CONFLICT DO NOTHING
                """)
                session.execute(reviewer_query, {
                    'Reviewer_Name': row['Reviewer_Nane'],
                    'Reviewer_Profil_Link': row['Reviewer_Profil_Link']
                })
                
                # Insert into Dimension_Time
                publish_date = pd.to_datetime(row['Reviewer_Publish_Date'])
                time_query = text("""
                    INSERT INTO Dimension_Time (Week, Month, Year)
                    VALUES (:Week, :Month, :Year)
                    ON CONFLICT DO NOTHING
                """)
                session.execute(time_query, {
                    'Week': publish_date.isocalendar()[1],
                    'Month': publish_date.month,
                    'Year': publish_date.year
                })
                
                # Insert into Dimension_Topic
                topic_query = text("""
                    INSERT INTO Dimension_Topic (Label)
                    VALUES (:Topic)
                    ON CONFLICT DO NOTHING
                """)
                session.execute(topic_query, {'Topic': row['Topic']})
                
                # Insert into Dimension_SubTopic
                sub_topic_query = text("""
                    INSERT INTO Dimension_SubTopic (Label)
                    VALUES (:Sub_Topic)
                    ON CONFLICT DO NOTHING
                """)
                session.execute(sub_topic_query, {'Sub_Topic': row['Sub_Topic']})
                
                # Insert into Dimension_Sentiment
                sentiment_query = text("""
                    INSERT INTO Dimension_Sentiment (Type)
                    VALUES (:Type)
                    ON CONFLICT DO NOTHING
                """)
                session.execute(sentiment_query, {
                    'Type': str(row['Sentiment']).lower() if str(row['Sentiment']).lower() in ['positive', 'negative', 'neutral'] else 'neutral',
                })
                
                # Insert into Fact_Reviews
                fact_query = text("""
                    INSERT INTO Fact_Reviews (Region_ID, Bank_ID, Reviewer_ID, Time_ID, Topic_ID, Sentiment_ID, SubTopic_ID, Count_Review)
                    VALUES (
                        (SELECT Region_ID FROM Dimension_Region WHERE Country = :Country AND Town = :Town),
                        (SELECT Bank_ID FROM Dimension_Bank WHERE Bank_Name = :Bank_Name AND 
                                                            Bank_Phone_number = :Bank_Phone_number AND
                                                            Bank_Address = :Bank_Address AND
                                                            Bank_Website = :Bank_Website),
                        (SELECT Reviewer_ID FROM Dimension_Reviewer WHERE Reviewer_Name = :Reviewer_Name AND
                                                            Reviewer_Profil_Link = :Reviewer_Profil_Link),
                        (SELECT Time_ID FROM Dimension_Time WHERE Week = :Week AND Month = :Month AND Year = :Year),
                        (SELECT Topic_ID FROM Dimension_Topic WHERE Label = :Topic),
                        (SELECT Sentiment_ID FROM Dimension_Sentiment WHERE Type = :Sentiment),
                        (SELECT SubTopic_ID FROM Dimension_SubTopic WHERE Label = :Sub_Topic),
                        1
                    )
                """)
                session.execute(fact_query, {
                    'Country': row['Country'],
                    'Town': row['Town'],
                    'Bank_Name': row['Bank_Name'],
                    'Bank_Phone_number': row['Bank_Phone_number'],
                    'Bank_Address': row['Bank_Address'],
                    'Bank_Website': row['Bank_Website'],
                    'Reviewer_Name': row['Reviewer_Nane'],
                    'Reviewer_Profil_Link': row['Reviewer_Profil_Link'],
                    'Week': publish_date.isocalendar()[1],
                    'Month': publish_date.month,
                    'Year': publish_date.year,
                    'Topic': row['Topic'],
                    'Sentiment': str(row['Sentiment']).lower() if str(row['Sentiment']).lower() in ['positive', 'negative', 'neutral'] else 'neutral',
                    'Sub_Topic': row['Sub_Topic']
                })
                
            session.commit()
            print("Data migrated successfully!")
        except Exception as e:
            session.rollback()
            print(f"Error migrating data: {e}")
        finally:
            session.close()

In [3]:
# Utilisation de la classe
db_url = 'postgresql://bri:bri@localhost:5432/bri_decesional'
schema_file = './decesional_schema.sql'
#df_transactional = pd.read_csv('Macro_table_LSA.csv')
df_transactional = pd.read_csv('Macro_table_LLAMA.csv')


dw = DataWarehouse(db_url)
dw.connect()
dw.init_schema(schema_file)
dw.migration(df_transactional)

Connected to the database successfully!
Schema created successfully! Tables in the database:
dimension_bank
fact_reviews
dimension_region
dimension_reviewer
dimension_time
dimension_topic
dimension_sentiment
dimension_subtopic


  result = self.connection.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='public';")


Data migrated successfully!
