# File to STG Load

We load the latest file data into the stage table. Before loading the file, we truncate the stage table so that it holds only the fresh data. While loading, we also calculate a SHA-256 key over the columns below.

	[ID_REVIEW]
	[CAPTION]
	[RELATIVE_DATE]
	[RETRIEVAL_DATE]
	[RATING]
	[USERNAME]
	[N_REVIEW_USER]
	[N_PHOTO_USER]
	[URL_USER]

In [1]:
import pandas as pd
import pyodbc
import hashlib
from datetime import datetime

# Configuration variables
# NOTE(review): the database credentials are hard-coded here; move them to an
# environment variable or local config file before sharing this notebook.
file_path = 'D:\\MTech\\4-SEM\\ProjectCode\\newest_gm_reviews.csv'  # Update with your file path
# The backslash is escaped explicitly: a bare "\S" is an invalid escape sequence
# that newer Python versions warn about.
server = 'LAPTOP-MG2NCG2P\\SQLEXPRESS'       # SQL Server name
database = 'GoogleReview'   # Database name
username = 'sa'   # DB username
password = 'ermifu@AMT606'   # DB password
table_name = 'GoogleReviewSTG'  # Stage table name

# Create a connection to the SQL Server database
conn = pyodbc.connect(
    'DRIVER={SQL Server};'
    f'SERVER={server};'
    f'DATABASE={database};'
    f'UID={username};'
    f'PWD={password}'
)

# Function to truncate the stage table
def truncate_stage_table():
    """Remove every row from the stage table and commit the change.

    Works on the module-level ``conn`` connection and the table named by
    ``table_name``, then prints a confirmation message.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(f"TRUNCATE TABLE {table_name}")
        conn.commit()
    finally:
        # Release the cursor even when the statement fails
        cursor.close()
    print(f"Table {table_name} truncated.")

# Function to calculate SHA-256 hash
def calculate_sha256(*args):
    """Return the SHA-256 hex digest of the arguments' concatenated text.

    Each truthy argument is converted with ``str`` and UTF-8 encoded; the
    pieces are hashed back to back with no separator. Falsy arguments
    (``None``, ``0``, ``''`` ...) contribute no bytes at all.
    """
    payload = b''.join(str(value).encode('utf-8') for value in args if value)
    return hashlib.sha256(payload).hexdigest()

# Function to convert hex string to binary
def hex_to_binary(hex_string):
    """Decode a string of hexadecimal digits into the bytes it represents."""
    decoded = bytes.fromhex(hex_string)
    return decoded

# Function to load data from the file to the stage table
def load_data_to_stage():
    """Load the review CSV at ``file_path`` into the stage table.

    Column names are stripped and lower-cased, then checked against the
    required set. ``rating`` and ``n_review_user`` are coerced to numbers
    (unparseable values become NULL). Every row is inserted together with its
    SHA-256 key and the audit columns, and all rows are committed at the end.

    Values are sent as bound parameters rather than formatted into the SQL
    text, so quotes or other special characters in the review data cannot
    break or alter the INSERT statement.

    Raises:
        ValueError: If the CSV lacks any of the required columns.
    """
    # Read the file into a pandas DataFrame
    df = pd.read_csv(file_path)

    # Clean column names (strip spaces and lowercase)
    df.columns = df.columns.str.strip().str.lower()

    # Define the required columns (matching the actual CSV columns and SQL Server table)
    required_columns = ['id_review', 'caption', 'relative_date', 'retrieval_date',
                        'rating', 'username', 'n_review_user', 'n_photo_user', 'url_user']

    # Check if the cleaned file contains the expected columns
    if not set(required_columns).issubset(df.columns):
        raise ValueError(f"The CSV file does not contain the required columns. Found columns: {df.columns.tolist()}")

    # Ensure correct data types for SQL Server table
    df['rating'] = pd.to_numeric(df['rating'], errors='coerce', downcast='integer')  # Smallint in SQL Server
    df['n_review_user'] = pd.to_numeric(df['n_review_user'], errors='coerce', downcast='integer')  # Integer in SQL Server

    # One timestamp for the whole load, shared by CREATE_DATETIME and BUSINESS_DATE
    load_timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    def text_or_null(value):
        # Missing values (NaN/None) become SQL NULL; everything else is sent as text
        return str(value) if pd.notnull(value) else None

    def int_or_null(value):
        # Convert numpy numbers to a plain int, since pyodbc rejects numpy scalar types
        return int(value) if pd.notnull(value) else None

    # The table name cannot be a bound parameter; it comes from the module configuration
    insert_query = f"""
        INSERT INTO {table_name}
        (id_review, caption, relative_date, retrieval_date, rating,
        username, n_review_user, n_photo_user, url_user, SHA_256_KEY,
        CREATE_BY, CREATE_DATETIME, BUSINESS_DATE)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    cursor = conn.cursor()
    try:
        for _, row in df.iterrows():
            # Calculate SHA-256 hash for each row from the raw row values
            sha256_key = calculate_sha256(
                row['id_review'], row['caption'], row['relative_date'],
                row['retrieval_date'], row['rating'], row['username'],
                row['n_review_user'], row['n_photo_user'], row['url_user']
            )

            params = (
                str(row['id_review']),                # nvarchar(100)
                text_or_null(row['caption']),         # nvarchar(max)
                text_or_null(row['relative_date']),   # nvarchar(100)
                text_or_null(row['retrieval_date']),  # nvarchar(100)
                int_or_null(row['rating']),           # smallint
                text_or_null(row['username']),        # nvarchar(100)
                int_or_null(row['n_review_user']),    # int
                text_or_null(row['n_photo_user']),    # nvarchar(100)
                text_or_null(row['url_user']),        # nvarchar(100)
                hex_to_binary(sha256_key),            # SHA-256 key as binary
                'ETL_WEB',                            # CREATE_BY
                load_timestamp,                       # CREATE_DATETIME
                load_timestamp,                       # BUSINESS_DATE
            )
            cursor.execute(insert_query, params)

        # Commit the transaction to the database
        conn.commit()
    except Exception:
        # Discard the partially inserted rows before reporting the failure
        conn.rollback()
        raise
    finally:
        cursor.close()

    print(f"Data loaded into {table_name}.")

# Main function to truncate and load data
def main():
    """Refresh the stage table: empty it first, then reload it from the file."""
    for step in (truncate_stage_table, load_data_to_stage):
        step()

# Run the load when executed as a script. The connection is closed afterwards
# even if the load fails, so a failed run does not leave it open.
try:
    if __name__ == "__main__":
        main()
finally:
    # Close the connection
    conn.close()


Table GoogleReviewSTG truncated.
Data loaded into GoogleReviewSTG.


# STG To SCD2 Table

After loading the stage table, we load the SCD2 table. The SCD2 table keeps historical records, with valid-from and valid-to dates that identify the latest version of each record.

In [2]:
import pandas as pd
import pyodbc
from sqlalchemy import create_engine
from urllib.parse import quote_plus

# SQL Server connection details
# NOTE(review): the database credentials are hard-coded here; move them to an
# environment variable or local config file before sharing this notebook.
# The backslash is escaped explicitly: a bare "\S" is an invalid escape sequence
# that newer Python versions warn about.
server = 'LAPTOP-MG2NCG2P\\SQLEXPRESS'       # SQL Server name
database = 'GoogleReview'   # Database name
username = 'sa'   # DB username
password = 'ermifu@AMT606'   # DB password

# Raw pyodbc connection, used for the pandas reads
conn = pyodbc.connect(
    'DRIVER={SQL Server};'
    f'SERVER={server};'
    f'DATABASE={database};'
    f'UID={username};'
    f'PWD={password}'
)

# ODBC connection string for the SQLAlchemy engine, built from the settings
# above so the server and credentials are defined in one place only
connection_string = (
    "Driver={ODBC Driver 17 for SQL Server};"
    f"Server={server};"
    f"Database={database};"
    f"UID={username};"
    f"PWD={password};"
)

engine = create_engine(f"mssql+pyodbc:///?odbc_connect={quote_plus(connection_string)}")

# Open and release one SQLAlchemy connection to confirm the engine can reach the server
with engine.connect() as connection:
    print("Connection successful")

# Pull the full contents of the stage table into a DataFrame
query_staging = "SELECT * FROM [DBO].GoogleReviewSTG"
df_staging = pd.read_sql(query_staging, conn)

# Count the SCD2 rows; is_scd2_empty is True when the table holds none
query_scd2_check = "SELECT COUNT(*) FROM [dbo].[GoogleReviewSCD2]"
df_scd2_check = pd.read_sql(query_scd2_check, conn)
scd2_row_count = df_scd2_check.iat[0, 0]
is_scd2_empty = scd2_row_count == 0

# conn is deliberately left open: the incremental branch below still reads through it

# text() is needed to run raw SQL through a SQLAlchemy connection
from sqlalchemy import text

# Valid_To value that marks the currently active version of a review
OPEN_END_DATE = '9999-12-31'

# One timestamp for the whole run, so the Valid_To of a closed version and the
# Valid_From of its replacement are identical
load_timestamp = pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')

# If SCD2 table is empty, insert all records from staging
if is_scd2_empty:
    df_staging['Valid_From'] = load_timestamp
    df_staging['Valid_To'] = OPEN_END_DATE

    # Start a new transaction
    with engine.begin() as connection:
        # Insert all records into SCD2 table
        df_staging.to_sql('GoogleReviewSCD2', connection, if_exists='append', index=False)

    print("All records successfully inserted into the SCD2 table.")

else:
    # Read the currently active rows from the SCD2 table
    query_scd2 = "SELECT * FROM [dbo].[GoogleReviewSCD2] WHERE Valid_To = '9999-12-31'"
    df_scd2 = pd.read_sql(query_scd2, conn)

    # Bring only the active hash across from SCD2. Merging the full SCD2 frame
    # would suffix every shared column with "_staging", and those names do not
    # exist in the target table when the rows are inserted.
    staging_columns = list(df_staging.columns)
    current_hashes = df_scd2[['ID_REVIEW', 'SHA_256_KEY']].rename(
        columns={'SHA_256_KEY': 'SHA_256_KEY_scd2'}
    )
    merged = df_staging.merge(current_hashes, on='ID_REVIEW', how='left')

    # New: no active SCD2 row. Changed: active row exists with a different hash.
    is_new = merged['SHA_256_KEY_scd2'].isna()
    is_changed = ~is_new & (merged['SHA_256_KEY'] != merged['SHA_256_KEY_scd2'])

    # Rows to insert keep the staging column names and open a new validity window
    new_records = merged.loc[is_new, staging_columns].assign(
        Valid_From=load_timestamp, Valid_To=OPEN_END_DATE
    )
    changed_records = merged.loc[is_changed, staging_columns].assign(
        Valid_From=load_timestamp, Valid_To=OPEN_END_DATE
    )

    # Active SCD2 rows that are superseded by a changed record; copy() so the
    # assignment below does not write into a slice of df_scd2
    update_records = df_scd2[df_scd2['ID_REVIEW'].isin(changed_records['ID_REVIEW'])].copy()
    update_records['Valid_To'] = load_timestamp

    # Values are bound as parameters instead of being formatted into the SQL text
    close_statement = text("""
        UPDATE [dbo].[GoogleReviewSCD2]
        SET Valid_To = :valid_to
        WHERE ID_REVIEW = :id_review AND Valid_To = '9999-12-31'
    """)

    # Start a new transaction: close old versions and insert new ones together
    with engine.begin() as connection:
        # Close the active version of every changed review
        for review_id in update_records['ID_REVIEW'].drop_duplicates().tolist():
            connection.execute(
                close_statement,
                {"valid_to": load_timestamp, "id_review": review_id},
            )

        # Insert new records
        new_records.to_sql('GoogleReviewSCD2', connection, if_exists='append', index=False)

        # Insert changed records
        changed_records.to_sql('GoogleReviewSCD2', connection, if_exists='append', index=False)

    print("Data successfully updated in the SCD2 table.")


Connection successful




All records successfully inserted into the SCD2 table.
