# Chronon Join Tester


This notebook allows you to create and compute a Join for a small set of data using sampled keys.


The output table will be written to the tmp namespace, so it won't interfere with production. You can use this to create a new join, test evolving an existing join and comparing with production.


How it works?
Cell#2 Contains scala functions to build a join that's similar to the original join, but with added filters on keys. The filters are deterministic (hash(key) % number = 0), the higher the modulo the bigger the filter. MetaData is highjacked to set outputNamespace to "tmp".


This notebook is cells are such as:


1. 
   Python setup (feel free to modify the conf name)
   
   
   
2. 
   Scala library (do not change)
   
   
   
3. 
   Python join config (modify as will)
   
   
   
4. 
   Runner (run the config for 3 or any other config in your branch)
   
   
   


<br>


In [1]:
"""
Define some global variables for the sampling run.
"""
from ai.chronon.repo.validator import ChrononRepoValidator
from ai.chronon.repo.compile import _write_obj
from mock import patch

import getpass
import sys
import os

# Global vars.
user = getpass.getuser()
home = f"/home/{user}/notebooks_home"
chronon_root = f"{home}/repo/chronon"
output_folder = "production"
output_root = os.path.join(chronon_root, output_folder)

# Config specific vars
team = "chronon_test"
conf_name = f"{user}_polynote_join_test.v0"
metadata_name = ".".join([team, conf_name])
conf = os.path.join(output_root, "joins", team, conf_name)

# Add chronon modules to the path.
sys.path.append(chronon_root)

# Utils for notebook.
class Stub:
    def __init__(self, team):
        self.filename = f"{team}/stub"

def stub(filename):
    return [None, Stub(filename)]

def write_tmp_config(obj):
    obj.metaData.name = metadata_name
    obj.metaData.team = team
    return _write_obj(
        output_root, 
        validator=ChrononRepoValidator(
            chronon_root_path=chronon_root, 
            output_root=output_folder), 
        name=metadata_name, 
        obj=obj, 
        log_level=None,
        force_compile=False,
        force_overwrite=True
    ) 

In [2]:
/**
 * Receive python set up and run a sampled join between start_partition and start_partition + days
 */
import ai.chronon.spark.Driver.parseConf
import ai.chronon.api.{Join, Source, QueryUtils, Constants, Builders, JoinPart, GroupBy, MetaData}
import ai.chronon.api.Extensions._
import ai.chronon.spark.Extensions._
import ai.chronon.spark.TableUtils
import ai.chronon.spark.{Join => JoinRunner}

import org.apache.spark.sql.DataFrame
import scala.collection.JavaConverters._

val tableUtils = TableUtils(spark)

// Sample a source keys based on a modulo value
def sampledSource(source: Source, keyFilter: Map[String, Int]): Source = {
    val ogQuery = source.query
    val newQuery = Builders.Query(
        selects = ogQuery.selects.asScala.toMap,
        startPartition = ogQuery.startPartition,
        endPartition = ogQuery.endPartition,
        setups = ogQuery.setups.asScala,
        wheres = Option(ogQuery.wheres).map(_.asScala).getOrElse(Seq.empty[String]) ++ ogQuery.selects.asScala.filter{
            case (key, value) => keyFilter.contains(key)
            }.map{ 
                case (key, value) => s"HASH($value) % ${keyFilter(key)} = 0" }.toSeq,
        timeColumn = ogQuery.timeColumn)
    if (source.isSetEntities) {
        Builders.Source.entities(query = newQuery, snapshotTable = source.table)
    } else {
        Builders.Source.events(query = newQuery, table = source.table)
    }
}

def joinKeys(join: Join): Seq[String] = join.joinParts.asScala.flatMap{ jp: JoinPart => jp.rightToLeft.keys }.distinct

def simpleFilter(join: Join, value: Int): Map[String, Int] = joinKeys(join).map(k => k -> value).toMap

def filteredGroupBy(groupBy: GroupBy, filterMap: Map[String, Int]): GroupBy = Builders.GroupBy(
    metaData = metadataToTmp(groupBy.metaData), 
    sources = groupBy.sources.asScala.map(sampledSource(_, filterMap)),
    keyColumns = groupBy.keyColumns.asScala,
    aggregations = groupBy.aggregations.asScala,
    accuracy = groupBy.accuracy)

def filteredJoin(join: Join, filterValue: Int): Join = {
    val filter = simpleFilter(join, filterValue)
    Builders.Join(
        metaData = metadataToTmp(join.metaData),
        left = sampledSource(join.left, filter), 
        joinParts = join.joinParts.asScala.map {
            jp => 
                Builders.JoinPart(
                    groupBy = filteredGroupBy(jp.groupBy, filter),
                    keyMapping = Option(jp.keyMapping).map(_.asScala.toMap).orNull,
                    selectors = jp.selectors.asScala,
                    prefix = jp.prefix)
            }
        )
}

def metadataToTmp(metaData: MetaData): MetaData = {
    metaData.setOutputNamespace("tmp")
    metaData
}

def startDate(join: Join): String = {
    Option(join.left.query.startPartition).getOrElse(tableUtils.firstAvailablePartition(join.left.table).get)
}

def computeSampledJoin(join: Join, modulo: Int = 256, days: Int = 10, steps: Int = 30): Unit = {

    // Sample the left side
    join.validate()
    val testJoin: Join = filteredJoin(join, modulo)
    val start = startDate(testJoin)
    val end = Constants.Partition.shift(start, days)
    val runner = new JoinRunner(testJoin, end, tableUtils)
    runner.computeJoin(Some(steps))
}

In [3]:
"""
Sample Online Join on a small dataset.
"""
from airbnb import test_sources
from ai.chronon.join import Join, JoinPart
from group_bys.chronon_test import test_online_group_by_small
from airbnb.data_sources_2 import HiveEventSource
from ai.chronon.query import Query, select

v1 = Join(
    left=HiveEventSource(
    namespace='global',
    table="<tablename>",
    query=Query(
        selects=select(
            listing="id_product",
            m_guests="m_guests",
            m_dated_<metric>="m_dated_<metric>"
        ),
        wheres=["<dimension> = 'VALUE'"],
        time_column="UNIX_TIMESTAMP(ts) * 1000",
        start_partition="2022-01-14",
        end_partition="2022-01-14",
    )),
    right_parts=[
        JoinPart(group_by=test_online_group_by_small.v1),
    ],
    online=True,
)
print(v1)
write_tmp_config(v1)

Join(metaData=MetaData(name=None, online=True, production=False, customJson='{"check_consistency": false, "lag": 0}', dependencies=['{"name": "wait_for_<tablename>_ds", "spec": "<tablename>/ds={{ ds }}", "start": "2022-01-14", "end": "2022-01-14"}', '{"name": "wait_for_<tablename>_ds", "spec": "<tablename>/ds={{ ds }}", "start": "2022-01-14", "end": null}'], tableProperties=None, outputNamespace=None, team=None, modeToEnvMap=None, consistencyCheck=None, samplePercent=None), left=Source(events=EventSource(table='<tablename>', topic=None, query=Query(selects={'listing': 'id_product', 'm_guests': 'm_guests', 'm_dated_<metric>': 'm_dated_<metric>', 'ts': 'UNIX_TIMESTAMP(ts) * 1000'}, wheres=["<dimension> = 'VALUE'"], startPartition='2022-01-14', endPartition='2022-01-14', timeColumn='UNIX_TIMESTAMP(ts) * 1000', setups=[], mutationTimeColumn=None, reversalColumn=None), isCumulative=False), entities=None), joinParts=[JoinPart(groupBy=GroupBy(metaData=MetaData(name='chronon_test.test_online_g

true

In [4]:
// Example: Run a sampled join for a specific conf.
val join: Join = parseConf[Join](conf)
computeSampledJoin(join, modulo = 256, days = 5, steps = 10)
val outputDf = spark.sql(s"SELECT * FROM ${join.metaData.outputTable}")
outputDf.show()
outputDf[""]

Comparing Hashes:
New: Map(tmp.chronon_test_USER_polynote_join_test_v0_chronon_test_test_online_group_by_small_v1 -> bogrhLBZnZ, left_source -> FSqyOf2XaM),
Old: Map(left_source -> 2Rrw98dJUM, tmp.chronon_test_USER_polynote_join_test_v0_chronon_test_test_online_group_by_small_v1 -> bogrhLBZnZ)
Dropping table with command: DROP TABLE IF EXISTS tmp.chronon_test_USER_polynote_join_test_v0_chronon_test_test_online_group_by_small_v1

----[Running query]----
DROP TABLE IF EXISTS tmp.chronon_test_USER_polynote_join_test_v0_chronon_test_test_online_group_by_small_v1
----[End of Query]----

Dropping table with command: DROP TABLE IF EXISTS tmp.chronon_test_USER_polynote_join_test_v0

----[Running query]----
DROP TABLE IF EXISTS tmp.chronon_test_USER_polynote_join_test_v0
----[End of Query]----

Join range to fill PartitionRange(2022-01-14,2022-01-14)

Earliest hole at 2022-01-14 in output table tmp.chronon_test_USER_polynote_join_test_v0, relative to <tablename>
Input Parts   : Array(2022-01-14