In [165]:
import os
import pandas as pd
import pyodbc
from datetime import datetime
from decimal import Decimal

In [159]:
# Define MSSQL connection details
connection_string = 'DRIVER={SQL Server};SERVER=DESKTOP-CMTGLLQ;DATABASE=JLEARN;trusted_connection=YES'

# Connect to the database
conn = pyodbc.connect(connection_string)
cursor = conn.cursor()

In [160]:
# Create directories for the Medallion architecture
base_dir = "warehouse"

if not os.path.exists(base_dir):
    os.makedirs(base_dir)

def create_folders():
    layers = ['Bronze', 'Silver', 'Gold']
    for directory in layers:
        if not os.path.exists(base_dir + "/" + directory):
            os.makedirs(base_dir + "/" + directory)

create_folders()


In [89]:
# PREPARING DATABASE
def clean_db():
    schemas = ['raw','prep','mart']
    tables = ['user_courses','users','courses']
    def create_schema(schema):
        query = f"select schema_id('{schema}');"
        cursor.execute(query)
        res = tuple(cursor.fetchall()[0])[0]
        if(res == None):
            query = f"create schema {schema};"
            cursor.execute(query)
            print(f"{schema} schema created")
            conn.commit()

    def drop_tables(schema, table):
        query = f"DROP TABLE IF EXISTS {schema}.{table};"
        cursor.execute(query)
        conn.commit()

    def create_users_table(schema):
        query = f"""CREATE TABLE {schema}.users (
            UserID       INT            PRIMARY KEY,
            UserName     NVARCHAR(100),
            FullName     NVARCHAR(100),
            Email        NVARCHAR(255) UNIQUE,
            PasswordHash NVARCHAR(255),
            Role         NVARCHAR(50)   DEFAULT 'employee',
            RegisteredAt DATETIME       DEFAULT GETDATE()
        )"""
        cursor.execute(query)
        conn.commit()

    def create_courses_table(schema):
        query = f"""CREATE TABLE {schema}.courses (
            course_id           INT            PRIMARY KEY,
            course_title        NVARCHAR(100),
            num_subscribers     INT,
            num_reviews         SMALLINT,
            num_lectures        SMALLINT,
            level               NVARCHAR(50),
            content_duration    FLOAT,
            published_timestamp NVARCHAR(50),
            subject             NVARCHAR(50)
        )"""
        cursor.execute(query)
        conn.commit()

    def create_user_courses_table(schema):
        query = f"""CREATE TABLE {schema}.user_courses (
            user_course_id INT       PRIMARY KEY,
            user_id        INT,
            course_id      INT,
            status         VARCHAR(50),
            progress       DECIMAL(5, 2) DEFAULT 0.00,
            enrolled_at    DATETIME      DEFAULT GETDATE(),
            completed_at   DATETIME,
            score          DECIMAL(5, 2),
            CONSTRAINT FK_user_courses_users FOREIGN KEY (user_id) REFERENCES raw.users(UserID),
            CONSTRAINT FK_user_courses_courses FOREIGN KEY (course_id) REFERENCES raw.courses(course_id)
        )"""
        cursor.execute(query)
        conn.commit()

    for schema in schemas:
        create_schema(schema)
        for table in tables:
            drop_tables(schema, table)

    create_users_table('raw')
    create_courses_table('raw')
    create_user_courses_table('raw')

In [83]:
# Fetch data using pyodbc and convert to pandas DataFrame
def fetch_data(query, conn):
    cursor = conn.cursor()
    cursor.execute(query)
    columns = [column[0] for column in cursor.description]
    data = cursor.fetchall()
    df = pd.DataFrame([tuple(row) for row in data], columns=columns)
    return df


In [171]:
# Utility function to save data to both CSV and SQL Server

def save_data(df, table_name, stage, schema):

    # Save to CSV
    file_path = os.path.join(base_dir + "\\" +stage, f'{table_name}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv')
    print(file_path)
    df.to_csv(file_path, index=False)

    df = df.applymap(lambda x: x.strftime('%Y-%m-%d %H:%M:%S') if isinstance(x, pd.Timestamp) else x)
    df = df.applymap(lambda x: float(x) if isinstance(x, Decimal) else x)
    # df = df.applymap(lambda x: None if isinstance(x, pd.notna) else x)
    df = df.applymap(lambda x: None if (isinstance(x, Decimal) and (x.is_nan() or x == Decimal('NaN'))) else float(x) if isinstance(x, Decimal) else x)
    df = df.replace("'","", regex=True)

    # Save to SQL Server
    conn_str = f"INSERT INTO {schema}.{table_name} ({', '.join(df.columns)}) VALUES "
    values = ', '.join([str(tuple(row)) for row in df.values])
    query = conn_str + values
    print(values)
    with conn.cursor() as cursor:
        cursor.execute(query)
        conn.commit()

In [172]:
# DON'T FORGET TO CREATE RESPECTIVE TABLES IN EACH SCHEMA


# ----------------------------------------------
# 1. Bronze Layer: Raw data ingestion from tables
# ----------------------------------------------
# Ingest raw data from SQL Server
def bronze_ingestion():
    user_query = "SELECT * FROM users"
    course_query = "SELECT * FROM courses"
    user_courses_query = "SELECT * FROM user_courses"
    
    # Fetching raw data
    # users_df = fetch_data(user_query, conn)
    # courses_df = fetch_data(course_query, conn)
    user_courses_df = fetch_data(user_courses_query, conn)
    
    # Save raw data
    # save_data(users_df, 'users', 'Bronze','raw')
    # save_data(courses_df, 'courses', 'Bronze','raw')
    save_data(user_courses_df, 'user_courses', 'Bronze','raw')

clean_db()
bronze_ingestion()

warehouse\Bronze\user_courses_20241005_203111.csv
(22, 1, 8325, 'undertaken', 41.0, '2024-10-03 12:04:08', '2024-10-05 10:09:00', nan), (23, 76, 1117796, 'undertaken', 100.0, '2024-03-01 15:44:00', '2024-05-10 15:44:00', 95.0), (24, 76, 606928, 'undertaken', 100.0, '2024-06-02 08:52:19', '2024-06-04 08:52:19', 56.0), (25, 76, 192870, 'undertaken', 100.0, '2024-03-28 03:53:42', '2024-07-04 03:53:42', 87.0), (26, 76, 1113822, 'assigned', 100.0, '2024-09-15 21:07:02', '2024-11-03 21:07:02', 65.0), (27, 45, 820194, 'undertaken', 100.0, '2024-04-22 13:42:49', '2024-05-03 13:42:49', 72.0), (28, 45, 595876, 'assigned', 100.0, '2023-10-23 08:49:26', '2023-12-08 08:49:26', 62.0), (29, 86, 801486, 'assigned', 100.0, '2024-04-29 22:03:01', '2024-05-17 22:03:01', 64.0), (30, 86, 1151326, 'undertaken', 100.0, '2024-01-23 16:23:51', '2024-03-20 16:23:51', 82.0), (33, 96, 1146014, 'undertaken', 100.0, '2024-03-26 07:01:16', '2024-05-15 07:01:16', 95.0), (34, 96, 837722, 'undertaken', 100.0, '2024-02-

ProgrammingError: ('42S22', "[42S22] [Microsoft][ODBC SQL Server Driver][SQL Server]Invalid column name 'nan'. (207) (SQLExecDirectW)")

In [35]:
# ----------------------------------------------
# 2. Silver Layer: Cleansing and Enrichment
# ----------------------------------------------
def silver_transformation():
    # Clean and join the data (Enrichment)
    join_query = """
    SELECT uc.user_course_id, u.FullName, c.course_title, uc.status, uc.progress, uc.enrolled_at
    FROM user_courses uc
    JOIN users u ON uc.user_id = u.UserID
    JOIN courses c ON uc.course_id = c.course_id
    WHERE uc.status IN ('assigned', 'undertaken')
    """
    
    # Fetching enriched data
    enriched_df = pd.read_sql(join_query, conn)
    
    # Save enriched data
    save_data(enriched_df, 'user_course_silver', 'Silver')

silver_transformation()

  enriched_df = pd.read_sql(join_query, conn)


TypeError: save_data() missing 1 required positional argument: 'schema'

In [None]:
# ----------------------------------------------
# 3. Gold Layer: Aggregation and Analysis
# ----------------------------------------------
def gold_transformation():
    # Aggregate course completion statistics
    gold_query = """
    SELECT c.course_title, COUNT(uc.user_course_id) as num_users, AVG(uc.progress) as avg_progress
    FROM user_courses uc
    JOIN courses c ON uc.course_id = c.course_id
    WHERE uc.progress = 100
    GROUP BY c.course_title
    """
    
    # Fetching aggregated data
    aggregated_df = pd.read_sql(gold_query, conn)
    
    # Save aggregated data
    save_data(aggregated_df, 'course_completion_gold', 'Gold')

gold_transformation()

# Close connection
conn.close()