## Level 1: Data Ingestion to Raw

In this section, we'll get raw datasets from the "data" folder and save them to  Level 1.


In [198]:
import pandas as pd
import os

raw_folder = "C:/Users/Rauan/practice/scd/Raw/"
source_folder = "C:/Users/Rauan/practice/data/"

climdf = pd.read_csv("C:/Users/Rauan/practice/data/climate_change_indicators.csv")
popdf = pd.read_csv("C:/Users/Rauan/practice/data/World Population Dataset.csv")
gdpdf = pd.read_csv("C:/Users/Rauan/practice/data/world_gdp_data.csv")

climdf.to_csv(os.path.join(raw_folder, "climate_change_indicators.csv"), index=False)
popdf.to_csv(os.path.join(raw_folder, "World Population Dataset.csv"), index=False)
gdpdf.to_csv(os.path.join(raw_folder, "world_gdp_data.csv"), index=False)


## Level 2: Create Dims and Facts folder

In this section, we'll get Dimensions and Facts

In [199]:
base_dir = "C:/Users/Rauan/practice/"

source_folder_dims = os.path.join(base_dir, "content/Dim")
source_folder_facts = os.path.join(base_dir, "content/Fact")
refined_dims_folder = os.path.join(base_dir, "scd/Refined/Dims")
refined_facts_folder = os.path.join(base_dir, "scd/Refined/Facts")

dims_to_process = ["DimCountry", "DimRank", "DimArea", "DimCapital"]

## Level 2: Implementing SCD Type 2 for FactNormalizedPopulation

In this section, we will focus on implementing Slowly Changing Dimension (SCD) Type 2 for the FactNormalizedPopulation table. SCD Type 2 allows us to maintain historical data by creating new records for changes in dimension attributes over time, ensuring accurate historical analysis while keeping track of current information.

In this section, it prepares each dimension table by adding start date, end date, and current flag columns to implement SCD Type 2.

In [200]:
import hashlib

start_date = "01.01.1900"
end_date = "31.12.9999"
current_flag = 1

for dim_name in dims_to_process:
    dim_data = pd.read_csv(os.path.join(source_folder_dims, dim_name + ".csv"))
    
    dim_data['start'] = start_date
    dim_data['end'] = end_date
    dim_data['current'] = current_flag
    
    dim_data.to_csv(os.path.join(refined_dims_folder, dim_name + ".csv"), index=False)

This code reads the fact data, generates unique identifiers for each row using MD5 hashing, and then saves the updated data to a CSV in "Refined/Facts" folder.

In [201]:
fact_data = pd.read_csv(os.path.join(source_folder_facts, "FactNormalizedPopulation.csv"))

fact_data['unique_id'] = fact_data.apply(lambda x: hashlib.md5(str(x).encode()).hexdigest(), axis=1)

fact_data.to_csv(os.path.join(refined_facts_folder, "FactNormalizedPopulation.csv"), index=False)

This code reads the current data from the "DimCountry" dimension table, filters it to select only the active records(current = 1), and then saves the selected data to a CSV in "Produced/Dims" folder.

## Level 3: Ingestion to Produced


In [202]:
produced_dims_folder = os.path.join(base_dir, "scd/Produced/Dims")
produced_facts_folder = os.path.join(base_dir, "scd/Produced/Facts")

dim_country_current = pd.read_csv(os.path.join(refined_dims_folder,"DimCountry.csv", ))
dim_country_current = dim_country_current[dim_country_current['current'] == 1][['country_id', 'country']]

dim_country_current.to_csv(os.path.join(produced_dims_folder, "DimCountry.csv"), index=False)

This code filters each dimension table by the current = 1, selects specified columns, and saves the filtered data to CSV in the "Produced/Dims" folder.








In [203]:
base_dir = "C:/Users/Rauan/practice/"

refined_dims_folder = os.path.join(base_dir, "scd/Refined/Dims")
produced_dims_folder = os.path.join(base_dir, "scd/Produced/Dims")

def filter(dim_name, columns):
    dim_data = pd.read_csv(os.path.join(refined_dims_folder, dim_name + ".csv"))
    dim_data_current = dim_data[dim_data['current'] == 1][columns]
    dim_data_current.to_csv(os.path.join(produced_dims_folder, dim_name + ".csv"), index=False)

filter("DimArea", ['country_id', 'Area (km²)'])

filter("DimCapital", ['country_id', 'Capital'])

filter("DimRank", ['country_id', 'Rank'])

This code reads a CSV from Refined facts, then duplicates this process for the production data. After merging production data with the refined data based on specific columns, it drops the 'unique_id' column and saves the resulting DataFrame to CSV file

In [204]:
refined_fact_data = pd.read_csv(os.path.join(refined_facts_folder, "FactNormalizedPopulation.csv"))

produced_fact_data = pd.read_csv(os.path.join(refined_facts_folder, "FactNormalizedPopulation.csv"))

produced_fact_data = produced_fact_data.merge(refined_fact_data[['unique_id', 'country_id', 'date', 'population']])

produced_fact_data.drop(columns=['unique_id'], inplace=True)

produced_fact_data.to_csv(os.path.join(produced_facts_folder, "Produced_FactNorm.csv"), index=False)


Implementing new recording on Level 2, and add it to Level 3

Code replaces occurrences of 'Kazakhstan' with 'Qazaqstan' in the 'country' column and 'Nursultan' with 'Astana' in the 'Capital' column for both the refined and produced DimCountry and DimCapital datasets, and then saves the updated datasets

In [205]:
dim_country_data = pd.read_csv(os.path.join(refined_dims_folder, "DimCountry.csv"))
dim_capital_data = pd.read_csv(os.path.join(refined_dims_folder, "DimCapital.csv"))
produced_country_data = pd.read_csv(os.path.join(produced_dims_folder, "DimCountry.csv"))
produced_capital_data= pd.read_csv(os.path.join(produced_dims_folder, "DimCapital.csv"))

dim_country_data.loc[dim_country_data['country'] == 'Kazakhstan', 'country'] = 'Qazaqstan'
dim_capital_data.loc[dim_capital_data['Capital'] == 'Nursultan', 'Capital'] = 'Astana'
produced_country_data.loc[produced_country_data['country'] == 'Kazakhstan', 'country'] = 'Qazaqstan'
produced_capital_data.loc[produced_capital_data['Capital'] == 'Nursultan', 'Capital'] = 'Astana'

dim_country_data.to_csv(os.path.join(refined_dims_folder, "DimCountry.csv"), index=False)
dim_capital_data.to_csv(os.path.join(refined_dims_folder, "DimCapital.csv"), index=False)
produced_country_data.to_csv(os.path.join(produced_dims_folder, "DimCountry.csv"), index=False)
produced_capital_data.to_csv(os.path.join(produced_dims_folder, "DimCapital.csv"), index=False)
