In [None]:
# DO NOT DELETE THIS CELL
# ADD YOUR PARAMETER DEFAULT VALUES HERE

# When truthy, the result DataFrame is written to the sink table near the end of the notebook.
write_to_sql = False

# When truthy, debug_value() hands its argument back instead of returning None.
debug = True

# Example input payload; not read by any of the visible cells.
data = {
    "example_dataset": [
        {"name": "Mario", "age": 34},
        {"name": "Luigi", "age": 34},
    ],
}

# Free-form extra parameters; not read by any of the visible cells.
kwargs = {"anything_else_you_could_use": 42}

In [None]:
import os
from dotenv import load_dotenv
import pandas as pd
import pyodbc
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

def debug_value(var):
    """
    Return `var` unchanged while the module-level `debug` flag is truthy,
    otherwise return None.

    Nothing is printed here; the notebook calls this as the last expression
    of a cell, so the cell's value is `var` in debug mode and None otherwise.
    """
    return var if debug else None

def get_available_sql_driver():
    """
    Pick the first entry of pyodbc.drivers() whose name ends with
    ' for SQL Server'.

    Returns:
        The matching driver name, exactly as reported by pyodbc.

    Raises:
        ValueError: if no reported driver name has that suffix.
    """
    for candidate in pyodbc.drivers():
        # Stop at the first match; the order is whatever pyodbc reports.
        if candidate.endswith(' for SQL Server'):
            return candidate
    raise ValueError("Cannot connect. No suitable driver found.\nInstall driver from here: https://learn.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server?view=sql-server-ver16\n\n")

def create_sql_connection_string_with_driver(odcb_conn_str: str, driver: str):
    """
    Prefix an ODBC connection string with a `Driver={...};` clause.

    The driver-less ODBC connection string can be copied from here: https://portal.azure.com/#@dolderag.onmicrosoft.com/resource/subscriptions/888af663-7438-4b1b-940a-87795e06ba4d/resourceGroups/rg-prod-sql/providers/Microsoft.Sql/servers/sds-prod-dolder/databases/DolderDB/connectionStrings

    Args:
        odcb_conn_str: connection string without the Driver clause; it is
            appended verbatim after the clause.
        driver: ODBC driver name, placed between the braces.

    Returns:
        A string in this format: Driver={ODBC Driver 18 for SQL Server};Server=tcp:sds-prod-dolder.database.windows.net,1433;Database=DolderDB;Uid={your_user_name};Pwd={your_user_password};Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;
    """
    # Doubled braces are literal braces in str.format templates.
    return "Driver={{{}}};{}".format(driver, odcb_conn_str)

def initialize_sql_connection(odcb_conn_str: str):
    """
    Build a SQLAlchemy engine for the given ODBC connection string and open
    one connection on it.

    Steps:
    1. look up an installed SQL Server ODBC driver (ValueError if none is found)
    2. prepend the Driver clause to the ODBC connection string
    3. wrap the result in a mssql+pyodbc URL and create the engine

    Returns:
        A `(conn, engine)` tuple. `conn` is an already opened connection that
        the caller is responsible for closing.
    """
    full_odbc_str = create_sql_connection_string_with_driver(
        odcb_conn_str,
        get_available_sql_driver(),
    )
    # The whole ODBC string is handed to pyodbc through the odbc_connect query parameter.
    engine = create_engine(
        URL.create("mssql+pyodbc", query={"odbc_connect": full_odbc_str})
    )
    return engine.connect(), engine

# Load environment variables via python-dotenv before reading the connection string.
load_dotenv()

# Raises KeyError when SQL_CONNECTION_STRING is not set.
ODBC_CONN_STR = os.environ["SQL_CONNECTION_STRING"]

# initialize_sql_connection() opens a connection, which doubles as a connectivity
# check. Only the engine is used by the following cells, so close that connection
# right away instead of leaving it checked out for the whole notebook session.
_startup_conn, engine = initialize_sql_connection(ODBC_CONN_STR)
_startup_conn.close()

In [None]:
# GET DATA
# Fetch every column of etl.SalesTransactions for rows whose posting year is after 2020.
sales_transactions = pd.read_sql(
    "SELECT * FROM etl.SalesTransactions st WHERE YEAR(st.PostingDate) > 2020",
    engine,
)

In [None]:
# Cell value is the raw query result while `debug` is truthy, otherwise None.
debug_value(sales_transactions)

In [None]:
# Parse the posting date once, derive the calendar year and month from it,
# then total the measures per company, year and month.
posting_dates = pd.to_datetime(sales_transactions["PostingDate"])
sales_transactions["PostingDate"] = posting_dates
sales_transactions["Year"] = posting_dates.dt.year
sales_transactions["Month"] = posting_dates.dt.month

measure_columns = ["Quantity", "NetSalesEUR", "GrossProfitEUR"]
per_company_month = sales_transactions.groupby(by=["CompanyCode", "Year", "Month"])
grouped_sales = per_company_month[measure_columns].sum()

In [None]:
# Cell value is the aggregated frame while `debug` is truthy, otherwise None.
debug_value(grouped_sales)

In [None]:
# DO NOT DELETE THIS CELL
# ASSIGN YOUR RETURN VALUE HERE
# reset_index() moves the group keys (CompanyCode, Year, Month) out of the index
# and back into regular columns, so they appear in the written table and the records.
return_value: pd.DataFrame = grouped_sales.reset_index()

In [None]:
# EDIT THESE VARIABLES TO DEFINE THE SINK TABLE
# Target schema and table for the optional SQL write (used only when write_to_sql is truthy).
# NOTE(review): the table name says "ByCompanyYear" but the data is grouped by
# company, year AND month - confirm the name is intended.
sink_schema_name = "etl"
sink_table_name = "SalesYTDByCompanyYear"

In [None]:
# DO NOT DELETE OR EDIT THIS CELL
# Persist the result to <sink_schema_name>.<sink_table_name> only when write_to_sql is truthy.
# if_exists="replace" means an existing table of that name is replaced, not appended to.
# NOTE(review): `index` is left at its pandas default, so the frame's index is
# presumably written as an extra column - confirm this is wanted.
if write_to_sql:
    return_value.to_sql(name=sink_table_name, schema=sink_schema_name, if_exists="replace", con=engine)

In [None]:
# DO NOT EDIT OR DELETE THIS CELL
# Final cell value: the result as a list of dicts, one per row, keyed by column name.
return_value.to_dict(orient="records")