In [1]:
import snowflake.connector
import pandas as pd
import numpy as np
from datetime import datetime

In [2]:
# Connect to Snowflake
connection = snowflake.connector.connect(
    user = 'brucewil',
    password = 'Psnolecy2!',
    account = 'mognruk-jlb64949'
)
cur = connection.cursor()

In [3]:
def create_database_table(dataframe, primary_key, database_name, table_name): 
    
 
    # Check if primary_key is a column or a list in the dataframe, composite key
    
    if isinstance(primary_key, str):
        if primary_key not in dataframe.columns:
            raise ValueError("Primary key column not found in the dataframe.")
    elif isinstance(primary_key, list):
        if not all(col in dataframe.columns for col in primary_key):
            raise ValueError("One or more primary key columns not found in the dataframe.")
    else:
        raise ValueError("Invalid primary key type. Expected string or list.")

        
   # Check if primary_key column(s) is/are unique
    if isinstance(primary_key, str):
        if not dataframe[primary_key].is_unique:
            raise ValueError("Primary key column is not unique.")
    elif isinstance(primary_key, list):
        if dataframe.duplicated(subset=primary_key).any():
            raise ValueError("Composite primary key values are not unique.")
    
    #check if the database exists, Connect the database or create it
    cur.execute(f"SHOW DATABASES LIKE '{database_name}'")
    exists = cur.fetchone()

    if exists:
        # Connect it
        cur.execute(f"USE DATABASE {database_name}")
    else:
        # Create it
        cur.execute(f"CREATE DATABASE {database_name}")

        
    # map the data type
    def map_dtype(dtype):
        if dtype == "object":
            return "STRING"
        elif dtype == "int64":
            return "INTEGER"
        elif dtype == "float64":
            return "FLOAT"
        elif dtype == "bool":
            return "BOOLEAN"
        elif dtype == "datetime64[ns]":
            return "TIMESTAMP"
        else:
            return "STRING"  # Default to STRING.

    
    # Generate the SQL CREATE or REPLACE command
    columns = []
    for column, dtype in dataframe.dtypes.items():
        if isinstance(primary_key, str) and column == primary_key:
            columns.append(f"{column} {map_dtype(dtype)} PRIMARY KEY")
        elif isinstance(primary_key, list) and column in primary_key:
            columns.append(f"{column} {map_dtype(dtype)}")
        else:
            columns.append(f"{column} {map_dtype(dtype)}")


    create_table = f"CREATE OR REPLACE TABLE {table_name} (\n"
    create_table += ",\n".join(columns)
    create_table += "\n)"
     
    cur.execute(create_table)

    # Generate the SQL INSERT command
    insert_statement = f"INSERT INTO {table_name} ("
    insert_statement += f", ".join(dataframe.columns) + f") VALUES ("

    # Create a placeholder for each column value in the INSERT statement
    value_placeholder = ', '.join(['%s'] * len(dataframe.columns))

    insert_statement = insert_statement + value_placeholder + ")"

  
    # Create a list to store all the values from the dataframe
        
    all_values = []
    
    # Iterate over each row in the dataframe
    for _, row in dataframe.iterrows():
        values = []
        for value in row.values:
            if pd.isna(value):
                values.append(None)
            elif isinstance(value, str):
                values.append(value)
            elif isinstance(value, pd.Timestamp):
                values.append(f"{value}")
            else:
                values.append(value)
        all_values.append(tuple(values))  # Convert to tuple and append to the list

    # Set batch size
    batch_size = 16000

    # Split the values into batches and insert each batch separately
    for i in range(0, len(all_values), batch_size):
        batch = all_values[i:i + batch_size] #slicing
        
        cur.executemany(insert_statement, batch)

    
    
    #cur.execute(insert_statement, all_values) execute can't run with all values at once
    
    #print(insert_statement, all_values)


#    connection.commit()
#    connection.autocommit(True)
# Define the batch size based on Snowflake's limitation (e.g., 16,000)


In [4]:
# Define function to create surrogate keys and initial surrogate key mapping table
def create_initial_surrogate_key_mapping_table(table, surrogate_key_name, start_date, end_date):
    # Add surrogate key to table
    num_rows = table.shape[0]
    surrogate_key = range(num_rows)
    table[surrogate_key_name] = surrogate_key
    table = table[table.columns[-1:].tolist() + table.columns[:-1].tolist()]
       
    # Create surrogate key mapping table
    surrogate_key_mapping_table = table.iloc[:, 0:2]
    surrogate_key_mapping_table['Start_Date'] = start_date
    surrogate_key_mapping_table['End_Date'] = None
    surrogate_key_mapping_table['Current_Flag'] = True
    
    return table, surrogate_key_mapping_table

#### 1)  Create the FILM_DIM table and surrogate key mapping table

In [5]:
#  Read the data from the sakila database into the dataframe FILM
cur.execute("SELECT f.film_id, f.rental_rate, f.title as FILM_TITLE, \
               f.rating as FILM_RATING, l.name as FILM_LANGUAGE \
               FROM sakila.public.film f \
               LEFT JOIN sakila.public.language l \
               WHERE f.language_id = l.language_id")
rows = cur.fetchall()
columns = [column[0] for column in cur.description]
FILM = pd.DataFrame(rows, columns = columns)
FILM, film_surrogate_key_mapping = create_initial_surrogate_key_mapping_table(table = FILM,
                                                                              surrogate_key_name = 'FILM_KEY', 
                                                                              start_date = datetime(2000,1,1),
                                                                              end_date = datetime(2099,12,31))

In [6]:
FILM

Unnamed: 0,FILM_KEY,FILM_ID,RENTAL_RATE,FILM_TITLE,FILM_RATING,FILM_LANGUAGE
0,0,1,0.99,ACADEMY DINOSAUR,PG,English
1,1,2,4.99,ACE GOLDFINGER,G,English
2,2,3,2.99,ADAPTATION HOLES,NC-17,English
3,3,4,2.99,AFFAIR PREJUDICE,G,English
4,4,5,2.99,AFRICAN EGG,G,English
...,...,...,...,...,...,...
995,995,996,0.99,YOUNG LANGUAGE,G,English
996,996,997,0.99,YOUTH KICK,NC-17,English
997,997,998,0.99,ZHIVAGO CORE,NC-17,English
998,998,999,2.99,ZOOLANDER FICTION,R,English


In [7]:
film_surrogate_key_mapping

Unnamed: 0,FILM_KEY,FILM_ID,Start_Date,End_Date,Current_Flag
0,0,1,2000-01-01,,True
1,1,2,2000-01-01,,True
2,2,3,2000-01-01,,True
3,3,4,2000-01-01,,True
4,4,5,2000-01-01,,True
...,...,...,...,...,...
995,995,996,2000-01-01,,True
996,996,997,2000-01-01,,True
997,997,998,2000-01-01,,True
998,998,999,2000-01-01,,True


In [8]:
create_database_table(FILM, 'FILM_KEY', 'sakila_data_warehouse_1', 'film_dim')

In [9]:
create_database_table(film_surrogate_key_mapping, 'FILM_KEY', 'sakila_data_warehouse_1', 'film_surrogate_mapping_table' )

#### 2)  Create the STORE_DIM table and surrogate key mapping table

In [10]:
#  Read the data from the sakila database into the dataframe STORE
cur.execute("SELECT s.store_id, ci.CITY AS store_city, co.COUNTRY as store_country\
            FROM sakila.public.STORE s\
            LEFT JOIN sakila.public.ADDRESS a ON a.ADDRESS_ID = s.ADDRESS_ID\
            LEFT JOIN sakila.public.CITY ci ON ci.CITY_ID = a.CITY_ID\
            LEFT JOIN sakila.public.COUNTRY co ON ci.COUNTRY_ID = co.COUNTRY_ID")
rows = cur.fetchall()
columns = [column[0] for column in cur.description]

columns = [column[0] for column in cur.description]
STORE = pd.DataFrame(rows, columns = columns)
STORE, store_surrogate_key_mapping = create_initial_surrogate_key_mapping_table(table = STORE,
                                                                              surrogate_key_name = 'STORE_KEY',
                                                                              start_date = datetime(2000,1,1),
                                                                              end_date = datetime(2099,12,31))

In [11]:
STORE

Unnamed: 0,STORE_KEY,STORE_ID,STORE_CITY,STORE_COUNTRY
0,0,1,Lethbridge,Canada
1,1,2,Woodridge,Australia


In [12]:
store_surrogate_key_mapping

Unnamed: 0,STORE_KEY,STORE_ID,Start_Date,End_Date,Current_Flag
0,0,1,2000-01-01,,True
1,1,2,2000-01-01,,True


In [13]:
create_database_table(STORE, 'STORE_KEY', 'sakila_data_warehouse_1', 'store_dim')

In [14]:
create_database_table(store_surrogate_key_mapping, 'STORE_KEY', 'sakila_data_warehouse_1', 'store_surrogate_mapping_table')

#### 3)  Create the CUSTOMER_DIM table and surrogate key mapping table

In [15]:
# Read the data from the sakila database into the dataframe CUSTOMER
cur.execute("SELECT c.customer_id, \
                c.first_name AS customer_first_name, \
                c.last_name AS customer_last_name, \
                c.email AS customer_email, \
                ci.CITY AS customer_city, \
                co.COUNTRY AS customer_country\
             FROM sakila.public.customer c \
             LEFT JOIN sakila.public.address a ON c.ADDRESS_ID = a.ADDRESS_ID \
             LEFT JOIN sakila.public.city ci ON a.CITY_ID = ci.CITY_ID \
             LEFT JOIN sakila.public.country co ON ci.COUNTRY_ID = co.COUNTRY_ID")
rows = cur.fetchall( )
columns = [column[0] for column in cur.description]
CUSTOMER = pd.DataFrame(rows, columns = columns)
CUSTOMER, customer_surrogate_key_mapping = create_initial_surrogate_key_mapping_table(table = CUSTOMER,
                                                                                      surrogate_key_name = 'CUSTOMER_KEY',
                                                                                      start_date = datetime(2000,1,1),
                                                                                      end_date = datetime(2099 ,12,31))

In [16]:
CUSTOMER

Unnamed: 0,CUSTOMER_KEY,CUSTOMER_ID,CUSTOMER_FIRST_NAME,CUSTOMER_LAST_NAME,CUSTOMER_EMAIL,CUSTOMER_CITY,CUSTOMER_COUNTRY
0,0,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,Sasebo,Japan
1,1,2,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,San Bernardino,United States
2,2,3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,Athenai,Greece
3,3,4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,Myingyan,Myanmar
4,4,5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,Nantou,Taiwan
...,...,...,...,...,...,...,...
594,594,595,TERRENCE,GUNDERSON,TERRENCE.GUNDERSON@sakilacustomer.org,Jinzhou,China
595,595,596,ENRIQUE,FORSYTHE,ENRIQUE.FORSYTHE@sakilacustomer.org,Patras,Greece
596,596,597,FREDDIE,DUGGAN,FREDDIE.DUGGAN@sakilacustomer.org,Sullana,Peru
597,597,598,WADE,DELVALLE,WADE.DELVALLE@sakilacustomer.org,Lausanne,Switzerland


In [17]:
create_database_table(CUSTOMER, 'CUSTOMER_KEY', 'sakila_data_warehouse_1', 'customer_dim')

In [18]:
create_database_table(customer_surrogate_key_mapping, 'CUSTOMER_KEY', 'sakila_data_warehouse_1', 'customer_surrogate_mapping_table')

#### 4A) Create DATE_DIM table from spreadsheet

In [19]:
#DATES = pd.read_csv('Date_Lookup_Table.csv')

In [20]:
#DATES

In [21]:
#create_database_table(DATES, 'Date', 'sakila_data_warehouse_1', 'date_dim')

#### 4B) Create DATE_DIM table from rentals table

In [22]:
#  Read the date data from the RENTAL TABLE into the dataframe DATE
cur.execute("SELECT rental_date AS date\
            FROM sakila.public.RENTAL")
rows = cur.fetchall()
columns = [column[0] for column in cur.description]
DATES = pd.DataFrame(rows, columns = columns)
DATES

Unnamed: 0,DATE
0,2005-05-24 22:53:30
1,2005-05-24 22:54:33
2,2005-05-24 23:03:39
3,2005-05-24 23:04:41
4,2005-05-24 23:05:21
...,...
16039,2005-08-23 22:25:26
16040,2005-08-23 22:26:47
16041,2005-08-23 22:42:48
16042,2005-08-23 22:43:07


In [23]:
#pip install holidays

In [24]:
#  Use Python to add the other date attributes
from datetime import datetime
import holidays
from datetime import date

In [25]:
DATES['day_of_week'] = DATES['DATE'].dt.day_name()
DATES['week_of_year'] = DATES['DATE'].dt.isocalendar().week
DATES['month_of_year'] = DATES['DATE'].dt.month
DATES['year'] = DATES['DATE'].dt.year
DATES['weekend_flag'] = DATES['DATE'].dt.dayofweek >= 5

# Create a list of US holidays for the relevant years
us_holidays = holidays.US(years=[date.year for date in DATES['DATE'].dt.date.unique()])

# Create 'holiday_flag' column
DATES['holiday_flag'] = DATES['DATE'].dt.date.apply(lambda x: x in us_holidays)

In [26]:
# Drop any duplicate rows (transactions that occurred at the same date/time)
DATES = DATES.drop_duplicates()

In [27]:
create_database_table(DATES, 'DATE', 'sakila_data_warehouse_1', 'date_dim')

#### 5) Create initial fact table

In [28]:
#  Read the source rental table along with other required fields

cur.execute("SELECT r.rental_id, r.rental_date, i.film_ID, r.customer_ID, i.store_id, r.return_date,\
            p.amount, f.rental_rate, f.rental_duration\
            FROM sakila.public.RENTAL r\
            LEFT JOIN sakila.public.PAYMENT p ON r.rental_ID = p.payment_ID\
            LEFT JOIN sakila.public.INVENTORY i ON r.inventory_ID = i.inventory_ID\
            LEFT JOIN sakila.public.FILM f ON i.film_ID = f.film_ID")
rows = cur.fetchall()
columns = [column[0] for column in cur.description]
rentals_data = pd.DataFrame(rows, columns = columns)
rentals_data

Unnamed: 0,RENTAL_ID,RENTAL_DATE,FILM_ID,CUSTOMER_ID,STORE_ID,RETURN_DATE,AMOUNT,RENTAL_RATE,RENTAL_DURATION
0,1,2005-05-24 22:53:30,80,130,1,2005-05-26 22:04:30,2.99,2.99,7
1,2,2005-05-24 22:54:33,333,459,2,2005-05-28 19:40:33,0.99,2.99,7
2,3,2005-05-24 23:03:39,373,408,2,2005-06-01 22:12:39,5.99,2.99,7
3,4,2005-05-24 23:04:41,535,333,1,2005-06-03 01:43:41,0.99,0.99,6
4,5,2005-05-24 23:05:21,450,222,2,2005-06-02 04:33:21,9.99,2.99,5
...,...,...,...,...,...,...,...,...,...
16039,16045,2005-08-23 22:25:26,168,14,1,2005-08-25 23:54:26,4.99,0.99,5
16040,16046,2005-08-23 22:26:47,951,74,2,2005-08-27 18:02:47,1.99,0.99,6
16041,16047,2005-08-23 22:42:48,452,114,2,2005-08-25 02:48:48,8.99,0.99,4
16042,16048,2005-08-23 22:43:07,439,103,1,2005-08-31 21:33:07,2.99,4.99,4


In [29]:
#  Read the three surrogate key mapping tables

cur.execute("Select * FROM CUSTOMER_SURROGATE_MAPPING_TABLE")
rows = cur.fetchall()
columns = [column[0] for column in cur.description]
customer_surrogate_mapping_table = pd.DataFrame(rows, columns = columns)


cur.execute("Select * FROM FILM_SURROGATE_MAPPING_TABLE")
rows = cur.fetchall()
columns = [column[0] for column in cur.description]
film_surrogate_mapping_table = pd.DataFrame(rows, columns = columns)

cur.execute("Select * FROM STORE_SURROGATE_MAPPING_TABLE")
rows = cur.fetchall()
columns = [column[0] for column in cur.description]
store_surrogate_mapping_table = pd.DataFrame(rows, columns = columns)

In [30]:
# Calculate the appropriate foreign keys to the dimension tables

rentals_data['STORE_KEY'] = rentals_data['STORE_ID'].map(store_surrogate_mapping_table.set_index('STORE_ID')['STORE_KEY'])
rentals_data['CUSTOMER_KEY'] = rentals_data['CUSTOMER_ID'].map(customer_surrogate_mapping_table.set_index('CUSTOMER_ID')['CUSTOMER_KEY'])
rentals_data['FILM_KEY'] = rentals_data['FILM_ID'].map(film_surrogate_mapping_table.set_index('FILM_ID')['FILM_KEY'])

In [31]:
# Write out the columns needed for the fact table to a dataframe

rentals_data['rental_duration'] = rentals_data['RETURN_DATE'] - rentals_data['RENTAL_DATE']
rentals_data.rename(columns={'AMOUNT': 'AMOUNT_PAID'}, inplace=True)
fact_table = rentals_data[['FILM_KEY', 'CUSTOMER_KEY', 'STORE_KEY', 'RENTAL_DATE', 'RENTAL_DURATION', 'AMOUNT_PAID']]
fact_table

Unnamed: 0,FILM_KEY,CUSTOMER_KEY,STORE_KEY,RENTAL_DATE,RENTAL_DURATION,AMOUNT_PAID
0,79,129,0,2005-05-24 22:53:30,7,2.99
1,332,458,1,2005-05-24 22:54:33,7,0.99
2,372,407,1,2005-05-24 23:03:39,7,5.99
3,534,332,0,2005-05-24 23:04:41,6,0.99
4,449,221,1,2005-05-24 23:05:21,5,9.99
...,...,...,...,...,...,...
16039,167,13,0,2005-08-23 22:25:26,5,4.99
16040,950,73,1,2005-08-23 22:26:47,6,1.99
16041,451,113,1,2005-08-23 22:42:48,4,8.99
16042,438,102,0,2005-08-23 22:43:07,4,2.99


In [32]:
#  Write the fact table to the data warehouse
create_database_table(fact_table, ['FILM_KEY', 'CUSTOMER_KEY', 'STORE_KEY', 'RENTAL_DATE'], \
                                   "sakila_data_warehouse_1", "RETAIL_FACT_TABLE")

In [33]:
#def create_database_table(dataframe, primary_key, database_name, table_name):  

# Type 1 change

In [34]:
# Read the data from the sakila database into the dataframe CUSTOMER
cur.execute("SELECT c.customer_id, \
                c.first_name AS customer_first_name, \
                c.last_name AS customer_last_name, \
                c.email AS customer_email, \
                ci.CITY AS customer_city, \
                co.COUNTRY AS customer_country\
             FROM sakila.public.customer c \
             LEFT JOIN sakila.public.address a ON c.ADDRESS_ID = a.ADDRESS_ID \
             LEFT JOIN sakila.public.city ci ON a.CITY_ID = ci.CITY_ID \
             LEFT JOIN sakila.public.country co ON ci.COUNTRY_ID = co.COUNTRY_ID")
rows = cur.fetchall( )
columns = [column[0] for column in cur.description]
CUSTOMER = pd.DataFrame(rows, columns = columns)

# Function to perform Type 1 change on the CUSTOMER DataFrame
def perform_type1_change(df, new_data):
    for record in new_data:
        customer_id = record["CUSTOMER_ID"]
        new_values = record.copy()
        new_values.pop("CUSTOMER_ID")  # Remove CUSTOMER_ID from the new values
        df.loc[df["CUSTOMER_ID"] == customer_id, new_values.keys()] = list(new_values.values())

# Fake data for Type 1 change
new_data_fake_1 = [
    {
        "CUSTOMER_ID": 1,
        "CUSTOMER_FIRST_NAME": "MARY",
        "CUSTOMER_LAST_NAME": "JOHN",
        "CUSTOMER_EMAIL": "MARY.SMITH@sakilacustomer.org",
        "CUSTOMER_CITY": "Sasebo",
        "CUSTOMER_COUNTRY": "Japan"
    },
    {
        "CUSTOMER_ID": 2,
        "CUSTOMER_FIRST_NAME": "PATRICIA",
        "CUSTOMER_LAST_NAME": "JOHNSON",
        "CUSTOMER_EMAIL": "PATRICIA@sakilacustomer.org",
        "CUSTOMER_CITY": "San Bernardino",
        "CUSTOMER_COUNTRY": "United States"
    }
]

# Perform Type 1 change on the CUSTOMER DataFrame
perform_type1_change(CUSTOMER, new_data_fake_1)

# Write the CUSTOMER DataFrame to Snowflake as a dimension table
customer_surrogate_mapping_table = create_initial_surrogate_key_mapping_table(
    table=CUSTOMER,
    surrogate_key_name='CUSTOMER_KEY',
    start_date=datetime(2000, 1, 1),
    end_date=datetime(2099, 12, 31)
)[1]

create_database_table(CUSTOMER, 'CUSTOMER_KEY', 'sakila_data_warehouse_1', 'customer_dim')
create_database_table(customer_surrogate_mapping_table, 'CUSTOMER_KEY', 'sakila_data_warehouse_1', 'customer_surrogate_mapping_table')


# Type 2 change 

In [35]:
from datetime import datetime
cur.execute('USE DATABASE sakila_data_warehouse_1')
cur.execute("SELECT * FROM sakila_data_warehouse_1.public.customer_dim")
rows = cur.fetchall( )
columns = [column[0] for column in cur.description]
CUSTOMER = pd.DataFrame(rows, columns = columns)
CUSTOMER

Unnamed: 0,CUSTOMER_ID,CUSTOMER_FIRST_NAME,CUSTOMER_LAST_NAME,CUSTOMER_EMAIL,CUSTOMER_CITY,CUSTOMER_COUNTRY,CUSTOMER_KEY
0,1,MARY,JOHN,MARY.SMITH@sakilacustomer.org,Sasebo,Japan,0
1,2,PATRICIA,JOHNSON,PATRICIA@sakilacustomer.org,San Bernardino,United States,1
2,3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,Athenai,Greece,2
3,4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,Myingyan,Myanmar,3
4,5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,Nantou,Taiwan,4
...,...,...,...,...,...,...,...
594,595,TERRENCE,GUNDERSON,TERRENCE.GUNDERSON@sakilacustomer.org,Jinzhou,China,594
595,596,ENRIQUE,FORSYTHE,ENRIQUE.FORSYTHE@sakilacustomer.org,Patras,Greece,595
596,597,FREDDIE,DUGGAN,FREDDIE.DUGGAN@sakilacustomer.org,Sullana,Peru,596
597,598,WADE,DELVALLE,WADE.DELVALLE@sakilacustomer.org,Lausanne,Switzerland,597


In [36]:
cur.execute("SELECT * FROM sakila_data_warehouse_1.public.customer_surrogate_mapping_table")
rows = cur.fetchall( )
columns = [column[0] for column in cur.description]
customer_surrogate_key_mapping = pd.DataFrame(rows, columns = columns)
customer_surrogate_key_mapping

Unnamed: 0,CUSTOMER_KEY,CUSTOMER_ID,START_DATE,END_DATE,CURRENT_FLAG
0,0,1,2000-01-01,,True
1,1,2,2000-01-01,,True
2,2,3,2000-01-01,,True
3,3,4,2000-01-01,,True
4,4,5,2000-01-01,,True
...,...,...,...,...,...
594,594,595,2000-01-01,,True
595,595,596,2000-01-01,,True
596,596,597,2000-01-01,,True
597,597,598,2000-01-01,,True


In [37]:
def update_customer_dim_and_mapping_table(df, mapping_table, new_data):
    max_surrogate_key = df["CUSTOMER_KEY"].max()

    for record in new_data:
        customer_id = record["CUSTOMER_ID"]
        # Use loc or iloc to get a reference to the row
        original_row = df.loc[df["CUSTOMER_ID"] == customer_id]

        # Use at or iat to change the value of the row
        mapping_table.loc[mapping_table["CUSTOMER_KEY"] == original_row["CUSTOMER_KEY"].values[0], "END_DATE"] = datetime.now()
        mapping_table.loc[mapping_table["CUSTOMER_KEY"] == original_row["CUSTOMER_KEY"].values[0], "CURRENT_FLAG"] = False

        # Increase the surrogate key by 1
        max_surrogate_key += 1
        original_row.at[original_row.index[0], "CUSTOMER_KEY"] = max_surrogate_key

        for key, value in record.items():
            original_row.at[original_row.index[0], key] = value

        #df = df.append(original_row,ignore_index=True)
        df = pd.concat([df, original_row], ignore_index=True)
        new_mapping_row = pd.Series({
            "CUSTOMER_KEY": max_surrogate_key,
            "CUSTOMER_ID": customer_id,
            "START_DATE": datetime.now(),
            "END_DATE": None,
            "CURRENT_FLAG": True
        }, name='x')
        #mapping_table = mapping_table.append(new_mapping_row)
        mapping_table = pd.concat([mapping_table, new_mapping_row], ignore_index=True)  # Use concat to append the row to the DataFrame


    return df, mapping_table

# Usage example
new_data = [
    {
        "CUSTOMER_ID": 2,
        "CUSTOMER_FIRST_NAME": "PATRICIA",
        "CUSTOMER_LAST_NAME": "JOHNSON",
        "CUSTOMER_EMAIL": "PATRICIA.JOHNSON@sakilacustomer.org",
        "CUSTOMER_CITY": "New York",
        "CUSTOMER_COUNTRY": "United States"
    }
]

CUSTOMER, customer_surrogate_key_mapping = update_customer_dim_and_mapping_table(
    df=CUSTOMER,
    mapping_table=customer_surrogate_key_mapping,
    new_data=new_data
)


In [38]:
CUSTOMER

Unnamed: 0,CUSTOMER_ID,CUSTOMER_FIRST_NAME,CUSTOMER_LAST_NAME,CUSTOMER_EMAIL,CUSTOMER_CITY,CUSTOMER_COUNTRY,CUSTOMER_KEY
0,1,MARY,JOHN,MARY.SMITH@sakilacustomer.org,Sasebo,Japan,0
1,2,PATRICIA,JOHNSON,PATRICIA@sakilacustomer.org,San Bernardino,United States,1
2,3,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,Athenai,Greece,2
3,4,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,Myingyan,Myanmar,3
4,5,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,Nantou,Taiwan,4
...,...,...,...,...,...,...,...
595,596,ENRIQUE,FORSYTHE,ENRIQUE.FORSYTHE@sakilacustomer.org,Patras,Greece,595
596,597,FREDDIE,DUGGAN,FREDDIE.DUGGAN@sakilacustomer.org,Sullana,Peru,596
597,598,WADE,DELVALLE,WADE.DELVALLE@sakilacustomer.org,Lausanne,Switzerland,597
598,599,AUSTIN,CINTRON,AUSTIN.CINTRON@sakilacustomer.org,Tieli,China,598


In [39]:
customer_surrogate_key_mapping

Unnamed: 0,CUSTOMER_KEY,CUSTOMER_ID,START_DATE,END_DATE,CURRENT_FLAG,0
0,0.0,1.0,2000-01-01,,True,
1,1.0,2.0,2000-01-01,2023-08-07 16:44:03.878948,False,
2,2.0,3.0,2000-01-01,,True,
3,3.0,4.0,2000-01-01,,True,
4,4.0,5.0,2000-01-01,,True,
...,...,...,...,...,...,...
599,,,NaT,,,599
600,,,NaT,,,2
601,,,NaT,,,2023-08-07 16:44:03.881304
602,,,NaT,,,


In [40]:
create_database_table(CUSTOMER, 'CUSTOMER_KEY', 'sakila_data_warehouse_1', 'customer_dim')

In [41]:
create_database_table(customer_surrogate_key_mapping, 'CUSTOMER_KEY', 'sakila_data_warehouse_1', 'customer_surrogate_mapping_table')

ValueError: Primary key column is not unique.