In [19]:
# Import libraries
import pyodbc
import sqlalchemy
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.engine.url import URL
from sqlalchemy.exc import SQLAlchemyError
import logging
from datetime import datetime

import warnings
warnings.filterwarnings('ignore')

## Database Connection

In [2]:
# Database connection parameters
driver_name = 'ODBC Driver 18 for SQL Server'
server_name = 'Tommie\\SQLEXPRESS'
database_name = 'AdventureWorks2022'
connection_string = f"mssql+pyodbc://{server_name}/{database_name}?driver={driver_name}&Trusted_Connection=yes&TrustServerCertificate=yes&Encrypt=yes"


In [3]:
logging.basicConfig(level=logging.INFO)

# Function to connect to database
def connect_to_database(connection_string):
    try:
        engine = create_engine(connection_string)
        connection = engine.connect()
        print("Connection successful")
        return connection
    except Exception as e:
        print(f"Connection failed: {e}")
        return None


In [4]:
# Set up logging
log_filename = f"log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
logging.basicConfig(filename=log_filename, level=logging.INFO, 
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Also set up logging to print to console
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logging.getLogger().addHandler(console_handler)

# Log the start of the script
logging.info("Script started.")


INFO:root:Script started.
2024-06-07 00:36:09,594 - INFO - Script started.


In [5]:
# connect to database
connect_to_database(connection_string)

Connection successful


<sqlalchemy.engine.base.Connection at 0x23c5b916610>

## Exercise 1: Executing SQL Commands

Objective: Understand how to execute SQL queries and commands from Python.

Skills to cover:
- > Connecting to a SQL database from Python.
- > Executing DDL commands (CREATE, ALTER).
- > Executing DML commands (INSERT, UPDATE, DELETE)

In [6]:
# Function to execute query
def execute_query(query, server_name, database_name):
    """
    Execute a SQL query on the provided database connection.

    Args:
    query (str): The SQL query to be executed.
    server_name (str): The name of the SQL server.
    database_name (str): The name of the database.

    Returns:
    result_proxy: The result of the executed query.
    """
    try:
        # Format the connection string
        connection_string = f"mssql+pyodbc://{server_name}/{database_name}?Encrypt=yes&TrustServerCertificate=yes&driver=ODBC+Driver+18+for+SQL+Server&trusted_connection=yes"
        
        # Create engine
        engine = create_engine(connection_string)
        
        # Connect to the database and execute query
        with engine.connect() as connection:
            result_proxy = connection.execute(text(query))
            print("Query executed successfully")
            return result_proxy
    except SQLAlchemyError as e:
        print(f"Error executing query: {e}")
        return None


In [7]:
# Function to create a new table
def create_table(table_name, schema, server_name, database_name):
    """
    Create a table in the specified database with the given schema.

    Args:
    table_name (str): The name of the table to be created.
    schema (str): The schema definition for the table.
    server_name (str): The name of the SQL server.
    database_name (str): The name of the database.

    Returns:
    bool: True if the table creation was successful, False otherwise.
    """
    query = f"CREATE TABLE {table_name} ({schema})"

    try:
        execute_query(query, server_name, database_name)
        print(f"Table {table_name} created successfully")
        return True
    except SQLAlchemyError as e:
        print(f"Error creating table: {e}")
        return False


In [8]:
# Create a new table in AdventureWorks2022 Database
if __name__ == "__main__":
    table_name = "CustomerFeedback"
    schema = """
        FeedBackID INT PRIMARY KEY,
        CustomerID INT,
        FeedBackDate DATE,
        Comments NVARCHAR(100)
    """
    server_name = "Tommie\\SQLEXPRESS"
    database_name = "AdventureWorks2022"
    
    success = create_table(table_name, schema, server_name, database_name)
    
    if success:
        print("Table creation completed successfully.")
    else:
        print("Table creation failed.")


Query executed successfully
Table CustomerFeedback created successfully
Table creation completed successfully.


In [9]:
# Function to insert data into the created table
def insert_data(table_name, data_list, server_name, database_name):
    """
    Insert data_list into the specified table in the database.

    Args:
    table_name (str): The name of the table where data_list will be inserted.
    data_list (dict): A dictionary of data_list to be inserted. Keys are column names and values are the corresponding values to insert.
    server_name (str): The name of the SQL server.
    database_name (str): The name of the database.

    Returns:
    bool: True if the insertion was successful, False otherwise.
    """
    # Create a list of column names and a list of values
    columns = ', '.join(data_list.keys())
    values = ', '.join(f":{key}" for key in data_list.keys())

    # Create the SQL query for insertion
    query = text(f"INSERT INTO {table_name} ({columns}) VALUES ({values})")
    
    engine = connect_to_database(connection_string)
    if engine is None:
        return False

    try:
        # Execute the query
        with engine.connect() as connection:
            connection.execute(query, data_list)
            print("Data inserted successfully")
        return True
    except SQLAlchemyError as e:
        print(f"Error inserting data_list: {e}")
        return False

In [10]:
if __name__ == "__main__":
    table_name = "CustomerFeedback"
    data_list =   [
        {
            "FeedbackID": 1,
            "CustomerID": 10,
            "FeedbackDate": '2023-01-15',
            "Comments": 'Great service!'
        },
        {
            "FeedbackID": 2,
            "CustomerID": 23,
            "FeedbackDate": '2023-02-20',
            "Comments": 'Very satisfied with the product.'
        },
        {
            "FeedbackID": 3,
            "CustomerID": 35,
            "FeedbackDate": '2023-03-10',
            "Comments": 'Delivery was on time.'
        },
        {
            "FeedbackID": 4,
            "CustomerID": 42,
            "FeedbackDate": '2023-04-05',
            "Comments": 'Excellent customer support.'
        },
        {
            "FeedbackID": 5,
            "CustomerID": 56,
            "FeedbackDate": '2023-05-12',
            "Comments": 'Product quality exceeded expectations.'
        },
        {
            "FeedbackID": 6,
            "CustomerID": 78,
            "FeedbackDate": '2023-06-25',
            "Comments": 'Will definitely purchase again.'
        },
        {
            "FeedbackID": 7,
            "CustomerID": 84,
            "FeedbackDate": '2023-07-30',
            "Comments": 'Good value for money.'
        },
        {
            "FeedbackID": 8,
            "CustomerID": 92,
            "FeedbackDate": '2023-08-14',
            "Comments": 'Website was easy to navigate.'
        },
        {
            "FeedbackID": 9,
            "CustomerID": 101,
            "FeedbackDate": '2023-09-18',
            "Comments": 'Had an issue but it was resolved quickly.'
        },
        {
            "FeedbackID": 10,
            "CustomerID": 115,
            "FeedbackDate": '2023-10-22',
            "Comments": 'Happy with the purchase experience.'
        },
        {
            "FeedbackID": 11,
            "CustomerID": 123,
            "FeedbackDate": '2023-11-05',
            "Comments": 'User-friendly interface.'
        },
        {
            "FeedbackID": 12,
            "CustomerID": 134,
            "FeedbackDate": '2023-11-15',
            "Comments": 'Highly recommend to friends.'
        },
        {
            "FeedbackID": 13,
            "CustomerID": 145,
            "FeedbackDate": '2023-11-25',
            "Comments": 'Fast and reliable service.'
        },
        {
            "FeedbackID": 14,
            "CustomerID": 157,
            "FeedbackDate": '2023-12-01',
            "Comments": 'Customer support was very helpful.'
        },
        {
            "FeedbackID": 15,
            "CustomerID": 168,
            "FeedbackDate": '2023-12-12',
            "Comments": 'Products arrived in good condition.'
        },
        {
            "FeedbackID": 16,
            "CustomerID": 179,
            "FeedbackDate": '2023-12-20',
            "Comments": 'Easy to place orders.'
        },
        {
            "FeedbackID": 17,
            "CustomerID": 189,
            "FeedbackDate": '2023-12-25',
            "Comments": 'Received prompt assistance.'
        },
        {
            "FeedbackID": 18,
            "CustomerID": 194,
            "FeedbackDate": '2024-01-02',
            "Comments": 'Good range of products.'
        },
        {
            "FeedbackID": 19,
            "CustomerID": 205,
            "FeedbackDate": '2024-01-10',
            "Comments": 'Very convenient shopping experience.'
        },
        {
            "FeedbackID": 20,
            "CustomerID": 215,
            "FeedbackDate": '2024-01-18',
            "Comments": 'Excellent return policy.'
        },
    ]
    server_name = "Tommie\\SQLEXPRESS"
    database_name = "AdventureWorks2022"
    
    for data in data_list:
        success = insert_data(table_name, data, server_name, database_name)
        
        if success:
            print(f"Data insertion for FeedbackID {data['FeedbackID']} completed successfully.")
        else:
            print(f"Data insertion for FeedbackID {data['FeedbackID']} failed.")

Connection successful
Data inserted successfully
Data insertion for FeedbackID 1 completed successfully.
Connection successful
Data inserted successfully
Data insertion for FeedbackID 2 completed successfully.
Connection successful
Data inserted successfully
Data insertion for FeedbackID 3 completed successfully.
Connection successful
Data inserted successfully
Data insertion for FeedbackID 4 completed successfully.
Connection successful
Data inserted successfully
Data insertion for FeedbackID 5 completed successfully.
Connection successful
Data inserted successfully
Data insertion for FeedbackID 6 completed successfully.
Connection successful
Data inserted successfully
Data insertion for FeedbackID 7 completed successfully.
Connection successful
Data inserted successfully
Data insertion for FeedbackID 8 completed successfully.
Connection successful
Data inserted successfully
Data insertion for FeedbackID 9 completed successfully.
Connection successful
Data inserted successfully
Data i

In [11]:
# Function to update table
def update_data(table_name, set_clause, condition, server_name, database_name):
    """
    Update data in the specified table.

    Args:
    table_name (str): The name of the table to update.
    set_clause (str): The SET clause defining the columns and values to be updated.
    condition (str): The condition for the rows to be updated.
    server_name (str): The name of the SQL server.
    database_name (str): The name of the database.

    Returns:
    bool: True if the update was successful, False otherwise.
    """
    # Create the SQL query for updating the data
    query = f"UPDATE {table_name} SET {set_clause} WHERE {condition}"
    
    try:
        execute_query(query, server_name, database_name)
        print(f"Data in table {table_name} updated successfully")
        return True
    except SQLAlchemyError as e:
        print(f"Error updating data: {e}")
        return False

In [12]:
# Update the CustomerID for FeedbackID 2
if __name__ == "__main__":
    table_name = "CustomerFeedback"
    set_clause = "CustomerID = 9900"
    condition = "FeedbackID = 2"
    server_name = "Tommie\\SQLEXPRESS"
    database_name = "AdventureWorksDW2022"
    success = update_data(table_name, set_clause, condition, server_name, database_name)

    if success:
        print("Data update completed successfully.")
    else:
        print("Data update failed.")

Query executed successfully
Data in table CustomerFeedback updated successfully
Data update completed successfully.


In [13]:
def delete_data(table_name, condition, server_name, database_name):
    """
    Delete data from the specified table.

    Args:
    table_name (str): The name of the table to delete from.
    condition (str): The condition for the rows to be deleted.
    server_name (str): The name of the SQL server.
    database_name (str): The name of the database.

    Returns:
    bool: True if the deletion was successful, False otherwise.
    """
    # Create the SQL query for deleting the data
    query = f"DELETE FROM {table_name} WHERE {condition}"
    
    try:
        execute_query(query, server_name, database_name)
        print(f"Data from table {table_name} deleted successfully")
        return True
    except SQLAlchemyError as e:
        print(f"Error deleting data: {e}")
        return False



In [14]:
# Delete record for FeedbackID 1 from CustomerFeedback table
if __name__ == "__main__":
    table_name = "CustomerFeedback"
    condition = "FeedbackID = 1"
    server_name = "Tommie\\SQLEXPRESS"
    database_name = "AdventureWorks2022"
    success = delete_data(table_name, condition, server_name, database_name)

    if success:
        print("Data deletion completed successfully.")
    else:
        print("Data deletion failed.")

Query executed successfully
Data from table CustomerFeedback deleted successfully
Data deletion completed successfully.


## Exercise 2: Uploading Data

Objective: Learn to upload data from a CSV file to the SQL Server database.

Skills to Cover:
- > Reading files into Python
- > Bulk data upload from Python to SQL Server
- > Handling file paths and exceptions

In [15]:
def read_csv_and_insert(file_path, table_name, server_name, database_name):
    """
    Read a CSV file and insert its contents into the specified database table.

    Args:
    file_path (str): The path to the CSV file.
    table_name (str): The name of the table where data will be inserted.
    server_name (str): The name of the SQL server.
    database_name (str): The name of the database.

    Returns:
    bool: True if the insertion was successful, False otherwise.
    """
    try:
        # Read the CSV file into a DataFrame
        df = pd.read_csv(file_path)

        # Connect to the database
        engine = connect_to_database(server_name, database_name)
        
        if engine is None:
            return False

        # Insert the DataFrame into the database table
        df.to_sql(name=table_name, con=engine, if_exists='append', index=False)
        print(f"Data from {file_path} inserted successfully into {table_name}")
        return True
    except Exception as e:
        print(f"Error inserting data from {file_path} into {table_name}: {e}")
        return False


In [16]:
if __name__ == "__main__":
    file_path = r"C:\Users\HP\OneDrive\Documents\GitHub\Data Engineering\Week_4_First_Project\ProductReviews.csv"
    table_name = "ProductReviews"
    server_name = "Tommie\\SQLEXPRESS"
    database_name = "AdventureWorks2022"
    
    try:
        success = read_csv_and_insert(file_path, table_name, server_name, database_name)
        if success:
            print("CSV data insertion completed successfully.")
        else:
            print("CSV data insertion failed.")
    except Exception as e:
        print(f"An error occurred: {e}")


Error inserting data from C:\Users\HP\OneDrive\Documents\GitHub\Data Engineering\Week_4_First_Project\ProductReviews.csv into ProductReviews: connect_to_database() takes 1 positional argument but 2 were given
CSV data insertion failed.


## Exercise 3: Extracting Data

Objective: Extract data from SQL Server using queries and store it in a structured format in
Python.

Skills to cover
- > Writing Complex SQL queries
- > Using pandas for data manipulation
- > Exporting data to CSV

In [17]:
# Function to read data from the database
def fetch_data(query, server_name, database_name):

    try:
        # Create engine and connect to the database
        engine = create_engine(connection_string)

        # Execute the query and fetch the result
        result = engine.execute(query)

        # Convert the result to a Pandas DataFrame
        df = pd.DataFrame(result.fetchall(), columns=result.keys())

        return df
    except Exception as e:
        print(f"An error occurred: {e}")
        return pd.DataFrame()

In [18]:
# SQL query to fetch data for the last year and calculate total sales per product
sales_query = f"""
SELECT 
    ProductID, 
    SUM(OrderQty) AS TotalOrder,
    SUM(LineTotal) AS TotalSales
FROM 
    Sales.SalesOrderDetail
WHERE 
    YEAR(ModifiedDate) = 2013
GROUP BY 
    ProductID
ORDER BY
    TotalSales DESC
"""


# Execute the query and retrieve data
df = fetch_data(sales_query, server_name, database_name)

# Display the first few rows of the dataframe
print(df.head())

# Export the DataFrame to a CSV file
csv_file_path = "lastYearSales.csv"
df.to_csv(csv_file_path, index=False)
logging.info(f"Data exported successfully to {csv_file_path}.")

# Log the end of the script
logging.info("Script ended.")

INFO:root:Data exported successfully to lastYearSales.csv.
2024-06-07 00:37:33,263 - INFO - Data exported successfully to lastYearSales.csv.
INFO:root:Script ended.
2024-06-07 00:37:33,274 - INFO - Script ended.


   ProductID  TotalOrder      TotalSales
0        782        1470  2212974.782652
1        783        1262  1932388.290685
2        779        1164  1815673.093232
3        784        1036  1666660.023576
4        781        1033  1657616.282084
