In [1]:
# update the pandas package - optional
# !pip install --upgrade pandas

In [None]:
import sqlite3
import sys
import pandas as pd

# Add the hamilton module to your path - optional
# project_dir = "### ADD PATH HERE ###"
# sys.path.append(project_dir)

from hamilton import base, driver
from hamilton.io.materialization import to

In [None]:
# We use the autoreload extension that comes with IPython to automatically reload modules when
# the code in them changes.

# Load the IPython extension into this kernel.
%load_ext autoreload
# Mode 1: only reload the modules that are registered with %aimport.
%autoreload 1

In [None]:
%%writefile spend_calculations.py
# Define your new Hamilton functions.
# The %%writefile magic command creates a new Python module with the functions below.
# We will import this later and pass it into our Driver.

import pandas as pd
 
# Look at the function signatures to see how these functions connect: each parameter
# is named after an input column or after another function in this module.
def avg_3wk_spend(spend: pd.Series) -> pd.Series:
    """Rolling average of spend over a trailing window of 3 entries (weeks).

    The first two entries of the result are NaN because their window is incomplete.
    """
    trailing_window = spend.rolling(3)
    return trailing_window.mean()


def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:
    """Cost of acquiring one signup: spend divided element-wise by signups."""
    cost_per_signup = spend / signups
    return cost_per_signup


def spend_mean(spend: pd.Series) -> float:
    """Example of a function that produces a scalar: the mean of the whole spend column."""
    column_average = spend.mean()
    return column_average


def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:
    """Example of a function that consumes a scalar: centers spend by subtracting its mean."""
    centered_spend = spend - spend_mean
    return centered_spend


def spend_std_dev(spend: pd.Series) -> float:
    """Standard deviation of the spend column, as computed by ``Series.std()``."""
    deviation = spend.std()
    return deviation


def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series:
    """One way to standardize spend: divide the zero-mean spend by its standard deviation."""
    standardized_spend = spend_zero_mean / spend_std_dev
    return standardized_spend

In [None]:
# Initial data used as input to the DAG -- load it from actuals or wherever.
# Note: the values don't all have to be series; a scalar works as well.
initial_columns = dict(
    signups=pd.Series([1, 10, 50, 100, 200, 400]),
    spend=pd.Series([10, 10, 20, 40, 40, 50]),
)

In [None]:
# Import spend_calculations and register it with autoreload, so edits to
# spend_calculations.py are picked up without restarting the kernel.
%aimport spend_calculations

# Result builder passed as `combine=` to the materializers below; it joins the
# requested outputs into a single pandas DataFrame.
df_builder = base.PandasDataFrameResult()
# Build the Driver from the functions defined in spend_calculations.
dr = driver.Driver({}, spend_calculations)  # can pass in multiple modules

In [None]:
# The columns we want in the final dataframe.
# Entries can be string names or references to the functions themselves.
output_columns = [
    # inputs, passed straight through
    "spend", "signups",
    # computed by the functions in spend_calculations.py
    "avg_3wk_spend", "spend_per_signup", "spend_zero_mean_unit_variance",
]

In [None]:
# Open (or create) the SQLite database file used by the SQL materializer below.
conn = sqlite3.connect("df.db")

# Remove any previous instance of the 'test' table, which will be created next.
conn.execute("DROP TABLE IF EXISTS test;")
conn.commit()

In [None]:
# Every materializer saves the same thing: the output columns combined into one
# dataframe by df_builder. Only the id and the destination differ.
shared_kwargs = dict(dependencies=output_columns, combine=df_builder)

materializers = [
    # materialize the dataframe to a pickle file
    to.pickle(id="df_to_pickle", path="./df.pkl", **shared_kwargs),
    # materialize the dataframe to a JSON file
    to.json(id="df_to_json", filepath_or_buffer="./df.json", **shared_kwargs),
    # materialize the dataframe to the 'test' table of the SQLite database
    to.sql(id="df_to_sql", table_name="test", db_connection=conn, **shared_kwargs),
    # materialize the dataframe to a XML file
    to.xml(id="df_to_xml", path_or_buffer="./df.xml", **shared_kwargs),
    # materialize the dataframe to an HTML file
    to.html(id="df_to_html", buf="./df.html", **shared_kwargs),
    # materialize the dataframe to a Stata file
    to.stata(id="df_to_stata", path="./df.dta", **shared_kwargs),
    # materialize the dataframe to a Feather file
    to.feather(id="df_to_feather", path="./df.feather", **shared_kwargs),
    # materialize the dataframe to an Excel file
    to.excel(id="df_to_excel", excel_writer="./df.xlsx", **shared_kwargs),
]

# Visualize what is happening; as the last expression of the cell, the result is displayed.
dr.visualize_materialization(
    *materializers,
    inputs=initial_columns,
    additional_vars=output_columns,
)

In [None]:
# Materialize a result, i.e. execute the DAG!
# Because `combine` is used, the combined result of each materializer can also be
# requested here, under the name "<materializer id>_build_result".
build_result_nodes = [
    "df_to_pickle_build_result",
    "df_to_json_build_result",
    "df_to_sql_build_result",
    "df_to_xml_build_result",
    "df_to_html_build_result",
    "df_to_stata_build_result",
    "df_to_feather_build_result",
    "df_to_excel_build_result",
]
materialization_results, additional_outputs = dr.materialize(
    *materializers,
    inputs=initial_columns,
    additional_vars=build_result_nodes,
)

In [None]:
# First value returned by dr.materialize; presumably what each materializer reported
# about what it saved -- check the displayed output.
materialization_results

In [None]:
# Combined result built for the pickle materializer, requested through additional_vars.
additional_outputs["df_to_pickle_build_result"]

In [None]:
# Combined result built for the JSON materializer, requested through additional_vars.
additional_outputs["df_to_json_build_result"]

In [None]:
# Combined result built for the SQL materializer. Leaving it as the last expression
# lets the notebook render it as a rich table.
additional_outputs["df_to_sql_build_result"]

In [None]:
# Combined result built for the XML materializer. Leaving it as the last expression
# lets the notebook render it as a rich table.
additional_outputs["df_to_xml_build_result"]

In [None]:
# Combined result built for the HTML materializer. Leaving it as the last expression
# lets the notebook render it as a rich table.
additional_outputs["df_to_html_build_result"]

In [None]:
# Combined result built for the Stata materializer. Leaving it as the last expression
# lets the notebook render it as a rich table.
additional_outputs["df_to_stata_build_result"]

In [None]:
# Combined result built for the Feather materializer. Leaving it as the last expression
# lets the notebook render it as a rich table.
additional_outputs["df_to_feather_build_result"]

In [None]:
# Combined result built for the Excel materializer. Leaving it as the last expression
# lets the notebook render it as a rich table.
additional_outputs["df_to_excel_build_result"]

In [None]:
# Close the SQLite connection that was opened for the SQL materializer.
conn.close()