In [1]:
from datetime import datetime
from modelhub import ModelHub
from bach import display_sql_as_markdown

In [2]:
modelhub = ModelHub(time_aggregation='%Y-%m-%d')

In [3]:
df = modelhub.get_objectiv_dataframe(start_date='2022-02-02')
df['feature_nice_name'] = df.location_stack.ls.nice_name

In [4]:
# construct steps column which is a list of strings 
df_steps = df.groupby('session_id')['feature_nice_name'].to_json_array().reset_index()
df_steps = df_steps.rename(columns={'feature_nice_name': 'steps'})
df_steps = df.merge(df_steps, on='session_id')[['session_id', 'steps']].drop_duplicates()
df_steps.to_pandas()

Unnamed: 0_level_0,session_id,steps
event_id,Unnamed: 1_level_1,Unnamed: 2_level_1
6307e080-d6a4-4e7d-b47f-a029b90cf6e1,97,"[Root Location: blog, Media Player: objectiv-i..."
03bfb487-a6ae-4175-bf3a-db127a75a85f,1651,"[Root Location: home, Media Player: 2-minute-v..."
21629b85-c17a-458b-a5ab-ff4c074a963c,2872,"[Root Location: home, Media Player: 2-minute-v..."
0f3b8b89-e4ff-4256-9beb-c3f252216699,2116,[Content: post-meet-objectiv-open-source-produ...
02ad3934-665c-47e9-8d76-c2a7a79f2c60,3600,[Pressable: hamburger located at Root Location...
...,...,...
0f0fc271-a3ca-4952-bed7-90ca7656dbb1,3836,[Pressable: after located at Root Location: ho...
1ba7c155-0521-431d-8f5d-564aa3e8daa6,3852,"[Root Location: home, Link: docs located at Ro..."
0d74fe99-d796-423b-bccb-f7c64a155aa6,3720,[Expandable: Reference located at Root Locatio...
044ec938-81c0-496a-9a11-f35412f81614,3975,"[Root Location: home, Link: docs located at Ro..."


## My custom lambda func

In [5]:
func_ngram = lambda data, n: [data[i: i + n] for i in range(len(data) - n + 1)]

func_ngram(['a', 'b', 'c', 'd', 'e', 'f'], 3)

[['a', 'b', 'c'], ['b', 'c', 'd'], ['c', 'd', 'e'], ['d', 'e', 'f']]

In [6]:
x = ['a', 'b', 'c', 'd', 'e', 'f']
len(x) - 3 + 1

4

In [7]:
x[:4]

['a', 'b', 'c', 'd']

### For each row of `df_steps.steps` want to apply `func_ngram` func

In [8]:
df_steps_pandas = df_steps.to_pandas()
n_gram = 3 # this value will provide the user

df_steps_pandas['desired_col'] = df_steps_pandas['steps'].apply(lambda x: func_ngram(x, n_gram))

In [9]:
df_steps_pandas['steps'].iloc[0]

['Root Location: blog',
 'Media Player: objectiv-in-2-minutes located at Root Location: blog => Content: post-meet-objectiv-open-source-product-analytics-designed-for-data-sc']

In [10]:
df_steps_pandas['desired_col'].iloc[0]

[]

In [11]:
df_steps_pandas['steps'].apply(lambda x: func_ngram(x, n_gram))

event_id
6307e080-d6a4-4e7d-b47f-a029b90cf6e1                                                   []
03bfb487-a6ae-4175-bf3a-db127a75a85f    [[Root Location: home, Media Player: 2-minute-...
21629b85-c17a-458b-a5ab-ff4c074a963c    [[Root Location: home, Media Player: 2-minute-...
0f3b8b89-e4ff-4256-9beb-c3f252216699    [[Content: post-meet-objectiv-open-source-prod...
02ad3934-665c-47e9-8d76-c2a7a79f2c60    [[Pressable: hamburger located at Root Locatio...
                                                              ...                        
0f0fc271-a3ca-4952-bed7-90ca7656dbb1    [[Pressable: after located at Root Location: h...
1ba7c155-0521-431d-8f5d-564aa3e8daa6    [[Root Location: home, Link: docs located at R...
0d74fe99-d796-423b-bccb-f7c64a155aa6    [[Expandable: Reference located at Root Locati...
044ec938-81c0-496a-9a11-f35412f81614    [[Root Location: home, Link: docs located at R...
110f27ac-183f-44d6-8800-ad8655825699    [[Expandable: Reference located at Root Locati...
N

In [12]:
import bach
from sql_models.util import is_postgres, is_bigquery, DatabaseNotSupportedException
from abc import abstractmethod


class ListShifterOperation:
    FIRST_ELEMENT_SUBLIST_OFFSET = '__first_sublist_offset'
    GENERATED_OFFSET_SUBLIST = '__sublist_item_offset'

    def __init__(self, list_series: bach.SeriesJson, shifting_n: int) -> None:
        self._list_series = list_series
        self._shifting_n = shifting_n

    def r_shift(self) -> bach.SeriesJson:
        """
        Steps for generating shifted sublists:
            1. Cast list_series expression to array (JSON_QUERY_ARRAY)
            2. Generate index of items per sublist
            3. Iterate over generated indexes and extract items from original array
            4. Generate expression for unnesting original array and creating array
                with final sub-lists
        """
        # step 1. cast list string to correct db type
        arr_parsed_list_series = self._get_array_parsed_list_series()

        # step 2.
        # Example: n = 6 and current item_pos = 2. Expression will generate:
        # [2, 3, 4, 5]
        generated_offsets_expr = self._get_generated_sublist_offset_expr()

        # step 3.
        extracted_sub_list_expr = self._get_extract_items_sublist_expr(
            arr_parsed_list_series, generated_offsets_expr,
        )

        # step 4.
        r_shifted_lists = self._get_rshifted_sublists_series(
            extracted_sub_list_expr,
            arr_parsed_list_series,
        )

        return r_shifted_lists

    @abstractmethod
    def _get_array_parsed_list_series(self) -> bach.SeriesJson:
        raise NotImplemented()

    @abstractmethod
    def _get_generated_sublist_offset_expr(self) -> bach.expression.Expression:
        raise NotImplemented()

    @abstractmethod
    def _get_extract_items_sublist_expr(
        self,
        array_parsed_series: bach.SeriesJson,
        generated_offsets_expr: bach.expression.Expression,
    ) -> bach.expression.Expression:
        raise NotImplemented()

    @abstractmethod
    def _get_rshifted_sublists_series(
        self,
        extracted_sub_list_expr: bach.expression.Expression,
        arr_parsed_list_series: bach.SeriesJson,
    ) -> bach.SeriesJson:
        raise NotImplemented()


class PostgresListShifterOperation(ListShifterOperation):

    def _get_array_parsed_list_series(self) -> bach.SeriesJson:
        return self._list_series.copy_override(
            expression=bach.expression.Expression.construct(
                'jsonb_array_elements({} :: jsonb)',
                self._list_series
            ),
        )

    def _get_generated_sublist_offset_expr(self) -> bach.expression.Expression:
        start_stmt = f'{self.FIRST_ELEMENT_SUBLIST_OFFSET} - 1'
        end_stmt = f'{self.FIRST_ELEMENT_SUBLIST_OFFSET} + {self._shifting_n - 2}'

        return bach.expression.Expression.raw(f'GENERATE_SERIES({start_stmt}, {end_stmt})')

    def _get_extract_items_sublist_expr(
        self,
        array_parsed_series: bach.SeriesJson,
        generated_offsets_expr: bach.expression.Expression,
    ) -> bach.expression.Expression:
        return bach.expression.Expression.construct(
            f"""
            SELECT {{}} -> cast({self.GENERATED_OFFSET_SUBLIST} as int)
            FROM {{}} as {self.GENERATED_OFFSET_SUBLIST}
            """,
            array_parsed_series,
            generated_offsets_expr
        )

    def _get_rshifted_sublists_series(
            self,
            extracted_sub_list_expr: bach.expression.Expression,
            arr_parsed_list_series: bach.SeriesJson,
    ) -> bach.SeriesJson:
        _ARRAY_TO_STR_STMT = "ARRAY(({}))"
        sub_array_list_series = self._list_series.copy_override(
            expression=bach.expression.Expression.construct(
                (
                    f'SELECT {_ARRAY_TO_STR_STMT} FROM {{}} '
                    f'WITH ORDINALITY rshift(elem, {self.FIRST_ELEMENT_SUBLIST_OFFSET}) '
                    f'WHERE {self.FIRST_ELEMENT_SUBLIST_OFFSET} <= {{}} - {self._shifting_n} '
                ),
                extracted_sub_list_expr,
                arr_parsed_list_series,
                self._list_series.json.get_array_length(),
            ),
        )

        final_array_list_series = sub_array_list_series.copy_override(
            expression=bach.expression.Expression.construct(
                f'{_ARRAY_TO_STR_STMT} :: "text"',
                sub_array_list_series,
            )
        )
        return final_array_list_series


class BigQueryListShifterOperation(ListShifterOperation):

    def _get_array_parsed_list_series(self) -> bach.SeriesJson:
        return self._list_series.copy_override(
            expression=bach.expression.Expression.construct(
                'JSON_QUERY_ARRAY({})',
                self._list_series
            )
        )

    def _get_generated_sublist_offset_expr(self) -> bach.expression.Expression:
        start_stmt = f'{self.FIRST_ELEMENT_SUBLIST_OFFSET}'
        end_stmt = f'{self.FIRST_ELEMENT_SUBLIST_OFFSET} + {self._shifting_n - 1}'

        return bach.expression.Expression.raw(f'GENERATE_ARRAY({start_stmt}, {end_stmt})')

    def _get_extract_items_sublist_expr(
        self,
        array_parsed_series: bach.SeriesJson,
        generated_offsets_expr: bach.expression.Expression,
    ) -> bach.expression.Expression:
        return bach.expression.Expression.construct(
            f"""
            SELECT {{}}[OFFSET({self.GENERATED_OFFSET_SUBLIST})]
            FROM UNNEST({{}}) as {self.GENERATED_OFFSET_SUBLIST}
            """,
            array_parsed_series,
            generated_offsets_expr
        )

    def _get_rshifted_sublists_series(
            self,
            extracted_sub_list_expr: bach.expression.Expression,
            arr_parsed_list_series: bach.SeriesJson,
    ) -> bach.SeriesJson:
        _ARRAY_TO_STR_STMT = "'[' || ARRAY_TO_STRING(ARRAY({}), ', ') || ']'"
        sub_array_list_series = self._list_series.copy_override(
            expression=bach.expression.Expression.construct(
                (
                    f'SELECT {_ARRAY_TO_STR_STMT} FROM UNNEST({{}}) WITH OFFSET '
                    f'AS {self.FIRST_ELEMENT_SUBLIST_OFFSET} '
                    f'WHERE {self.FIRST_ELEMENT_SUBLIST_OFFSET} <= {{}} - {self._shifting_n} '
                ),
                extracted_sub_list_expr,
                arr_parsed_list_series,
                self._list_series.json.get_array_length(),
            ),
        )

        final_array_list_series = sub_array_list_series.copy_override(
            expression=bach.expression.Expression.construct(
                _ARRAY_TO_STR_STMT, sub_array_list_series,
            )
        )
        return final_array_list_series


In [13]:
def bach_func_gram(list_series: bach.SeriesJson, n: int) -> bach.SeriesJson:
    engine = list_series.engine
    if is_postgres(engine):
        return PostgresListShifterOperation(list_series=list_series, shifting_n=n).r_shift()
    
    if is_bigquery(engine):
        return BigQueryListShifterOperation(list_series=list_series, shifting_n=n).r_shift()
    
    raise DatabaseNotSupportedException(engine)


In [21]:
import os
def get_steps_df(db_dialect):
    if db_dialect == 'postgres':
        os.environ["DSN"] = "postgresql://objectiv:@localhost:5432/objectiv"
    else:
        os.environ["DSN"] = "bigquery://objectiv-production/snowplow"
        os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = (
            f"{os.getcwd().replace('notebooks', 'modelhub')}/.secrets/objectiv-production--bigquery-read-only.json"
        )
        
    df = modelhub.get_objectiv_dataframe(start_date='2022-02-02')
    df['feature_nice_name'] = df.location_stack.ls.nice_name
    df_steps = df.groupby('session_id')['feature_nice_name'].to_json_array().reset_index()
    df_steps = df_steps.rename(columns={'feature_nice_name': 'steps'})
    df_steps = df.merge(df_steps, on='session_id')[['session_id', 'steps']].drop_duplicates()
    return df_steps

In [22]:
pg_steps_df = get_steps_df('postgres')
bq_steps_df = get_steps_df('bigquery')

In [24]:
pg_func_gram = bach_func_gram(pg_steps_df['steps'], n=3)
bq_func_gram = bach_func_gram(bq_steps_df['steps'], n=3)

In [25]:
pg_func_gram.to_pandas()

event_id
6307e080-d6a4-4e7d-b47f-a029b90cf6e1                                                   {}
03bfb487-a6ae-4175-bf3a-db127a75a85f    {{"\"Root Location: home\"","\"Media Player: 2...
21629b85-c17a-458b-a5ab-ff4c074a963c    {{"\"Root Location: home\"","\"Media Player: 2...
0f3b8b89-e4ff-4256-9beb-c3f252216699                                                   {}
02ad3934-665c-47e9-8d76-c2a7a79f2c60    {{"\"Pressable: hamburger located at Root Loca...
                                                              ...                        
0f0fc271-a3ca-4952-bed7-90ca7656dbb1    {{"\"Pressable: after located at Root Location...
1ba7c155-0521-431d-8f5d-564aa3e8daa6    {{"\"Root Location: home\"","\"Link: docs loca...
0d74fe99-d796-423b-bccb-f7c64a155aa6    {{"\"Expandable: Reference located at Root Loc...
044ec938-81c0-496a-9a11-f35412f81614    {{"\"Root Location: home\"","\"Link: docs loca...
110f27ac-183f-44d6-8800-ad8655825699    {{"\"Expandable: Reference located at Root Loc...
N

In [27]:
print(bq_func_gram.view_sql())

with `from_table___0d78d70b3493ee1854302f20f2c99cbb` as (SELECT `contexts_io_objectiv_taxonomy_1_0_0`,`collector_tstamp` FROM `events`),
`bq_extra_processing___c73901c9a1ceb3db8227a83b8418c372` as (select `collector_tstamp` as `collector_tstamp`, `contexts_io_objectiv_taxonomy_1_0_0`[OFFSET(0)].`_type` as `event_type`, cast(`contexts_io_objectiv_taxonomy_1_0_0`[OFFSET(0)].`_types` as STRING) as `stack_event_types`, cast(`contexts_io_objectiv_taxonomy_1_0_0`[OFFSET(0)].`global_contexts` as STRING) as `global_contexts`, cast(`contexts_io_objectiv_taxonomy_1_0_0`[OFFSET(0)].`location_stack` as STRING) as `location_stack`, cast(cast(`contexts_io_objectiv_taxonomy_1_0_0`[OFFSET(0)].`time` as STRING) as INT64) as `time`, cast(`contexts_io_objectiv_taxonomy_1_0_0`[OFFSET(0)].`cookie_id` as STRING) as `user_id`, cast(`contexts_io_objectiv_taxonomy_1_0_0`[OFFSET(0)].`event_id` as STRING) as `event_id` 
from `from_table___0d78d70b3493ee1854302f20f2c99cbb` 
 
 
 
 
 
),
`drop_duplicates_event_id_