To start, set up the connection strings for both the source (AdventureWorks) and target databases. Replace placeholders with actual connection details.

In [None]:
# Placeholder ODBC connection strings; one key=value segment per line so each
# setting can be edited on its own. Adjacent literals are joined by Python.
source_conn_str = (
    "DRIVER={SQL Server};"
    "SERVER=localhost;"
    "DATABASE=AdventureWorks;"
    "UID=user;"
    "PWD=password"
)
target_conn_str = (
    "DRIVER={SQL Server};"
    "SERVER=localhost;"
    "DATABASE=TargetDB;"
    "UID=user;"
    "PWD=password"
)


## Define the ETL Class
Define an `AdventureWorksETL` class whose `__init__` method accepts the connection strings for the source and target databases.


In [None]:
import pyodbc
import pandas as pd

class AdventureWorksETL:
    """ETL helper configured with a source and a target connection string."""

    def __init__(self, source_conn_str: str, target_conn_str: str) -> None:
        """Store both connection strings; no connection is opened here.

        Args:
            source_conn_str: Connection string for the database to read from.
            target_conn_str: Connection string for the database to write to.
        """
        self.target_conn_str = target_conn_str
        self.source_conn_str = source_conn_str


## Extract Data
Define the `extract_data` method to retrieve data from the AdventureWorks database. It takes a SQL query as input and returns the result as a DataFrame.

In [None]:
    def extract_data(self, query):
        """Run ``query`` against the source database and return the rows.

        Args:
            query: SQL text passed unchanged to ``pandas.read_sql``.

        Returns:
            pandas.DataFrame: The result set produced by ``query``.
        """
        conn = pyodbc.connect(self.source_conn_str)
        try:
            # A pyodbc connection used as a context manager only ends the
            # transaction (commit on success, rollback on error); it does not
            # close the connection, so close it explicitly to avoid a leak.
            with conn:
                return pd.read_sql(query, conn)
        finally:
            conn.close()


## Transform Data
Define the `transform_data` method to process the extracted DataFrame. This example keeps only the rows whose `SalesAmount` is greater than 1000 and renames the `SalesAmount` column to `Amount`.

In [None]:
    def transform_data(self, df):
        """Keep rows with ``SalesAmount`` above 1000 and rename it to ``Amount``.

        Args:
            df: DataFrame containing a ``SalesAmount`` column.

        Returns:
            pandas.DataFrame: A new frame with the filtered rows; ``df`` itself
            is left unchanged.
        """
        above_threshold = df["SalesAmount"] > 1000
        return df.loc[above_threshold].rename(columns={"SalesAmount": "Amount"})


## Load Data
Define the `load_data` method to insert the transformed data into the target database. It writes the `CustomerID` and `Amount` columns of the DataFrame to the table you name.

In [None]:
    def load_data(self, df, target_table):
        """Insert the ``CustomerID`` and ``Amount`` columns of ``df`` into a table.

        Args:
            df: DataFrame with ``CustomerID`` and ``Amount`` columns.
            target_table: Name of the destination table. It is interpolated
                into the SQL text (identifiers cannot be bound as ``?``
                parameters), so it must come from trusted code, never from
                user input.

        Returns:
            None. A frame with no rows is a no-op and opens no connection.
        """
        # executemany() rejects an empty parameter list, and there is nothing
        # to write anyway.
        if len(df) == 0:
            return

        # Column-wise tolist() yields native Python scalars and keeps each
        # column's own type; iterrows() would upcast a mixed int/float row to
        # float and hand NumPy scalars to the driver.
        rows = list(zip(df["CustomerID"].tolist(), df["Amount"].tolist()))
        insert_sql = f"INSERT INTO {target_table} (CustomerID, Amount) VALUES (?, ?)"

        conn = pyodbc.connect(self.target_conn_str)
        try:
            with conn:
                cursor = conn.cursor()
                # One batched call instead of one execute() per row.
                cursor.executemany(insert_sql, rows)
                conn.commit()
        finally:
            # Leaving the ``with`` block does not close a pyodbc connection.
            conn.close()


## Run ETL Process
Define a `run_etl` method that chains the `extract_data`, `transform_data`, and `load_data` steps into a complete ETL pipeline and returns the transformed DataFrame.

In [None]:
    def run_etl(self, query, target_table):
        """Extract with ``query``, transform the rows, and load them.

        Args:
            query: SQL text handed to ``extract_data``.
            target_table: Table name handed to ``load_data``.

        Returns:
            The transformed DataFrame that was passed to ``load_data``.
        """
        result = self.transform_data(self.extract_data(query))
        self.load_data(result, target_table)
        return result


## Initialize and Execute ETL
Now create an instance of `AdventureWorksETL` and run the ETL process with your query and target table.

In [None]:
etl = AdventureWorksETL(source_conn_str, target_conn_str)
# Sales.SalesOrderHeader in the AdventureWorks OLTP schema has no SalesAmount
# column; the order total is TotalDue. Alias it so the extracted DataFrame
# carries the SalesAmount column that transform_data filters on.
query = "SELECT CustomerID, TotalDue AS SalesAmount FROM Sales.SalesOrderHeader"
transformed_data = etl.run_etl(query, 'TargetTable')


## Inspect Transformed Data
Finally, view the first rows of the transformed data to confirm that it meets the transformation criteria.

In [None]:
# Preview the first rows of the transformed frame. As the last expression in a
# notebook cell its value is rendered; in a plain script it is discarded.
transformed_data.head()
