# Crisp Common

This notebook contains shared code used across all Crisp Blueprints notebooks. It’s meant to be imported into other notebooks, not run on its own. The goal is to avoid code duplication and simplify maintenance. We chose not to distribute it as a Python package to ensure it's accessible in any environment and user-friendly.

## Pre-requisites

Make sure you have the following variables set in your environment:

- `ACCOUNT_ID`: Your Crisp account ID
- `CONNECTOR_ID`: Your Crisp connector ID if using Databricks

## Detect the environment you are running in

In [None]:
from enum import Enum


class EnvironmentType(Enum):
    """Runtime environments that the shared notebook code branches on."""

    # Selected when the IPython shell description contains "google.colab".
    COLAB = "colab"
    # Selected when the IPython shell description contains "Databricks".
    DATABRICKS = "databricks"
    # Selected when the IPython shell description contains "ipykernel".
    LOCAL = "local"


# Identify the runtime from the textual description of the active IPython
# shell. Markers are tested in order and the first one found wins.
ipython_env = str(get_ipython())
environment_type = next(
    (
        candidate
        for marker, candidate in (
            ("google.colab", EnvironmentType.COLAB),
            ("Databricks", EnvironmentType.DATABRICKS),
            ("ipykernel", EnvironmentType.LOCAL),
        )
        if marker in ipython_env
    ),
    None,
)
if environment_type is None:
    # None of the known markers matched the shell description.
    raise ValueError("Unsupported environment")

print(f"Environment type: {environment_type.value}")

## Install dependencies


In [None]:
# Install the dependencies shared by every environment. tornado and matplotlib
# are pinned to exact versions; the other packages use ranges or minimums.
%pip install \
"tornado==6.4.1" \
"pandas>=2.2.2,<3.0.0" \
"matplotlib==3.9.1" \
"scikit-learn>=1.5.1,<2.0.0" \
"seaborn>=0.13.2,<0.14.0" \
"plotly>=5.23.0,<6.0.0" \
"openai>=1.44.1" \
"langchain-openai>=0.2.1" \
"folium>=0.17.0" \
"ipywidgets>=8.1.5"

#### Install environment-specific dependencies

In [None]:
# The Google Cloud client libraries are installed only for Colab and local runs.
if environment_type == EnvironmentType.COLAB or environment_type == EnvironmentType.LOCAL:
    %pip install "google-cloud-storage==2.18.0" \
            "google-cloud-bigquery[pandas,pyarrow]==3.25.0" \
            "google-cloud-bigquery-storage>=2.25.0,<3.0.0"

# python-dotenv is installed only for local runs, where a .env file is loaded.
if environment_type == EnvironmentType.LOCAL:
    %pip install "python-dotenv"

## Import dependencies

In [None]:
from IPython.core.magic import register_cell_magic

import os
import pandas as pd
import re

if environment_type == EnvironmentType.COLAB:
    from google.cloud import bigquery, exceptions
    from google.colab import auth
elif environment_type == EnvironmentType.DATABRICKS:
    from pyspark.sql import SparkSession
elif environment_type == EnvironmentType.LOCAL:
    from google.cloud import bigquery, exceptions
else:
    print("No extra imports")

## Set up environment variables

In [None]:
# Library-facing switches; presumably these limit gRPC logging to errors and
# hide the Tk deprecation notice -- confirm against the libraries' docs.
os.environ["GRPC_VERBOSITY"] = "ERROR"
os.environ["TK_SILENCE_DEPRECATION"] = "1"

if environment_type == EnvironmentType.LOCAL:
    from dotenv import load_dotenv, find_dotenv

    # find_dotenv raises when no .env file can be located (searching from the
    # current working directory), so below this line the file is known to exist.
    denv = find_dotenv(raise_error_if_not_found=True, usecwd=True)
    if load_dotenv(denv):
        print(f"Loaded .env file from {denv}")
    else:
        # The file was found but load_dotenv reported that it set nothing,
        # so say that rather than claiming the file is missing.
        print(f"No variables loaded from .env file {denv}")

## Authenticate (if needed)

In [None]:
# Only the Colab environment calls auth.authenticate_user(); nothing is done
# in this cell for the other environments.
if environment_type == EnvironmentType.COLAB:
    auth.authenticate_user()

## Define utility functions

In [None]:
def eval_python_expression(match):
    """Evaluate the Python expression captured in group 1 of ``match``.

    Intended as a ``re.sub`` replacement callback: the captured text is
    evaluated against this module's globals and the result is returned as a
    string.

    NOTE(review): this runs ``eval`` on the captured text, so it should only
    ever see text written by the notebook author.
    """
    return str(eval(match.group(1), globals()))


def transform_sql_to_databricks(sql: str) -> str:
    """Rewrite constructs in ``sql`` into the form used for Databricks.

    Each rule is a ``(pattern, template, extras)`` triple. The pattern is
    matched case-insensitively; every match is replaced by ``template``
    formatted with the pattern's named groups plus the fixed ``extras``.
    Rules are applied in order, each on the output of the previous one.

    Currently the only rule turns ``DATE_TRUNC(DATE(<column>), <unit>)`` into
    ``TRUNC(DATE(<column>), '<unit>')``.
    """
    rules = (
        # DATE_TRUNC -> TRUNC, with the time unit quoted.
        (
            r"DATE_TRUNC\(DATE\((?P<column>.*?)\),\s*(?P<time_unit>\w+)\)",
            "TRUNC(DATE({column}), '{time_unit}')",
            {},
        ),
        # Add more rules here as needed.
    )

    def apply_rule(text, regex, template, extras):
        # Fill the template from the named groups of each individual match.
        return re.sub(
            regex,
            lambda found: template.format(**found.groupdict(), **extras),
            text,
            flags=re.IGNORECASE,
        )

    for regex, template, extras in rules:
        sql = apply_rule(sql, regex, template, extras)
    return sql

## Define a magic for loading data into a DataFrame

The magic cell accepts the query as the cell input and the dataframe name as the argument. The query can contain variables that are defined in the global scope.

Example usage:
```
%%load df
SELECT * FROM `{project}`.`{dataset}`.`table`
```

In [None]:
@register_cell_magic
def load(line, cell):
    """Cell magic that runs the query in the cell and yields a DataFrame.

    ``{expr}`` placeholders in the cell are replaced with the evaluated
    expression before the query runs. On Colab and local environments the
    query goes through a BigQuery client; on Databricks it is first rewritten
    for Databricks and run through Spark.

    Args:
        line: Name of the global variable to store the DataFrame in. When
            empty, the DataFrame is returned instead.
        cell: The query text.

    Raises:
        ValueError: If the environment is not one of the supported ones.
    """
    query = re.sub(r"\{(.*?)\}", eval_python_expression, cell)

    if environment_type == EnvironmentType.DATABRICKS:
        # Exported tables have the `exp_` prefix dropped.
        query = re.sub(r"`exp_", "`", transform_sql_to_databricks(query))
        result = SparkSession.builder.getOrCreate().sql(query).toPandas()
    elif environment_type == EnvironmentType.COLAB:
        # Colab passes the notebook-level project explicitly.
        bq = bigquery.Client(project=project)
        result = bq.query(query).result().to_dataframe()
    elif environment_type == EnvironmentType.LOCAL:
        bq = bigquery.Client()
        result = bq.query(query).result().to_dataframe()
    else:
        raise ValueError("Unsupported environment")

    if not line:
        return result
    # Publish the result under the requested name in the notebook globals.
    globals()[line.strip()] = result

## Define a magic for saving a query or DataFrame as a table

The cell magic accepts a table name as the argument and a DataFrame or query as the cell input. The table name should be in the format `project.dataset.table`.

Example usage:
```
%%save project.dataset.table 
SELECT * FROM `{project}`.`{dataset}`.`table`
```
```
%%save project.dataset.table
df
```

In [None]:
@register_cell_magic
def save(line, cell):
    """Cell magic that writes a DataFrame or a query result to a table.

    Args:
        line: Destination table as ``project.dataset.table``. ``{expr}``
            placeholders are evaluated against the notebook globals.
        cell: Either the name of a global pandas DataFrame (on the first
            line of the cell) or a SQL query. ``{expr}`` placeholders in a
            query are evaluated the same way as in ``%%load``.

    Raises:
        ValueError: If the table name does not consist of exactly three
            dot-separated parts, or the environment is not supported.

    The destination is overwritten (``WRITE_TRUNCATE`` on BigQuery,
    ``overwrite`` mode on Databricks). On BigQuery the destination dataset is
    created first when it does not exist.
    """
    # A cell whose first line names a global DataFrame saves that frame;
    # anything else is treated as a SQL query.
    first_line = cell.strip().split("\n")[0].strip()
    df = globals().get(first_line)
    is_dataframe = isinstance(df, pd.DataFrame)

    table_id = re.sub(r"\{(.*?)\}", eval_python_expression, line.strip())
    parts = table_id.split(".")
    if len(parts) != 3:
        raise ValueError("Table name should be in the format project.dataset.table")
    project, dataset, table = parts

    def render_query():
        # Use the same {expr} substitution as %%load and the table name above,
        # so a query behaves identically under both magics.
        return re.sub(r"\{(.*?)\}", eval_python_expression, cell)

    if environment_type in (EnvironmentType.COLAB, EnvironmentType.LOCAL):
        client = bigquery.Client(project=project)
        dest_dataset = client.dataset(project=project, dataset_id=dataset)
        try:
            dest_dataset = client.get_dataset(dest_dataset)
        except exceptions.NotFound:
            dest_dataset = client.create_dataset(dest_dataset)
        table_ref = dest_dataset.table(table)

        if is_dataframe:
            job_config = bigquery.LoadJobConfig()
            job_config.write_disposition = "WRITE_TRUNCATE"
            job = client.load_table_from_dataframe(
                df, destination=table_ref, job_config=job_config
            )
        else:
            job_config = bigquery.QueryJobConfig(
                destination=table_ref,
                write_disposition="WRITE_TRUNCATE",
            )
            job = client.query(render_query(), job_config=job_config)
        # Block until the load/query job has finished.
        job.result()

    elif environment_type == EnvironmentType.DATABRICKS:
        spark = SparkSession.builder.getOrCreate()
        if is_dataframe:
            frame = spark.createDataFrame(df)
        else:
            # Exported tables have the `exp_` prefix dropped.
            formatted_query = re.sub(
                r"`exp_", "`", transform_sql_to_databricks(render_query())
            )
            frame = spark.sql(formatted_query)
        frame.write.mode("overwrite").saveAsTable(table_id)
    else:
        raise ValueError("Unsupported environment")

### Define source dataset

In [None]:
# Resolve the account plus the project/dataset names that queries refer to
# through the {project} and {dataset} placeholders.
account_id = os.getenv("ACCOUNT_ID")

# "[your-account-id]" is presumably the unedited placeholder value; treat it
# the same as a missing variable.
if not account_id or account_id == "[your-account-id]":
    raise ValueError("Please set your ACCOUNT_ID")

if environment_type in (EnvironmentType.COLAB, EnvironmentType.LOCAL):
    project = "crisp-frontier-dev"
    dataset = f"analytics_blueprints_{account_id}"
elif environment_type == EnvironmentType.DATABRICKS:
    project = "prod"
    # The connector ID is only required on Databricks, where it is part of
    # the schema name.
    connector_id = os.getenv("CONNECTOR_ID")
    if not connector_id or connector_id == "[your-connector-id]":
        raise ValueError("Please set your CONNECTOR_ID")
    dataset = f"schema_{account_id}_{connector_id}"
else:
    # Fail loudly, as the other cells do, instead of printing a message and
    # leaving project/dataset undefined for later cells.
    raise ValueError("Unsupported environment")