In [14]:
%load_ext autoreload
%autoreload 2
from datetime import date, timedelta
from functools import reduce

import altair as alt
import pandas as pd
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, lit

from generate_data import generate
from thundera.metadata import Table
from thundera.metrics import generate_table_metrics


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [15]:
def get_spark() -> SparkSession:
    """Build (or reuse, via ``getOrCreate``) a single-core local SparkSession.

    The session runs on ``local[1]`` under the app name ``local-tests``. It
    uses one shuffle partition and binds the driver to the loopback address.

    Returns:
        The active SparkSession.
    """
    # Applied in this order on the builder; values are strings, as Spark expects.
    settings = {
        "spark.executor.cores": "1",
        "spark.executor.instances": "1",
        "spark.sql.shuffle.partitions": "1",
        "spark.driver.bindAddress": "127.0.0.1",
    }
    builder = SparkSession.builder.master("local[1]").appName("local-tests")
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


spark = get_spark()

# Synthetic dataset: ROWS_PER_DAY rows for each of N_DAYS consecutive days.
# The generator's mu/sigma drift upward with the day number (1-based) so the
# per-day metrics computed later show a visible trend.
N_DAYS = 30
ROWS_PER_DAY = 20_000
START_DATE = date(2024, 1, 1)
assert N_DAYS >= 1, "need at least one day of data to union"

daily_frames = [
    generate(spark, ROWS_PER_DAY, mu=90 + day, sigma=10 + day / 10)
    # Date arithmetic (not string formatting) keeps dt_ref valid when
    # N_DAYS crosses a month boundary.
    .withColumn("dt_ref", lit((START_DATE + timedelta(days=day - 1)).isoformat()))
    for day in range(1, N_DAYS + 1)
]

# unionByName matches columns by name rather than position, so a schema
# mismatch between days raises instead of silently misaligning columns.
# Repartitioning on the day key uses N_DAYS partitions (tied to the constant
# rather than a second literal). Hash partitioning does not guarantee exactly
# one day per partition.
data = reduce(DataFrame.unionByName, daily_frames).repartition(N_DAYS, col("dt_ref"))
data.show()



+------------------+-------------------+------------------+------+----------+
|             var_a|              var_b|             var_c| var_d|    dt_ref|
+------------------+-------------------+------------------+------+----------+
|105.65995957992877|             -101.0| 9.865223567681806| -99.0|2024-01-19|
|            -999.0|             -101.0|19.689316672509857|-999.0|2024-01-19|
|101.59287694152663|             -101.0|10.924491189872091|-999.0|2024-01-19|
|116.73816562713517|-18.822586189094736| 9.588745669864165|  NULL|2024-01-19|
|112.63329024205859|             -101.0| 33.32887164021661|  NULL|2024-01-19|
|110.85667900049951|             -101.0|              -8.0|-999.0|2024-01-19|
|110.76980700850802|             -101.0|              NULL|-999.0|2024-01-19|
|105.98427777843428|             -101.0|16.908664599126308| -99.0|2024-01-19|
|    99.20913588101|             -101.0|              -8.0|-999.0|2024-01-19|
|119.87582110765938|  85.60685696364126|              -8.0|-999.

                                                                                

In [23]:
table = Table.from_yaml("./metadata.yml")

# Cache once before the per-day loop. generate_table_metrics presumably runs
# Spark actions (opaque helper; TODO confirm). Without a cache, each call
# would re-run the full generate -> union -> repartition lineage. If
# `generate` is unseeded (unverified), each call could also see different
# random rows than `data.show()` displayed.
data.cache()

# Derive the partition keys from the data instead of re-hardcoding the date
# range. ISO-formatted dates sort chronologically as strings.
dt_refs = [
    row["dt_ref"]
    for row in data.select("dt_ref").distinct().orderBy("dt_ref").collect()
]

# Maps each dt_ref string to the metrics computed on that day's rows only.
metric_data = {
    dt: generate_table_metrics(data.where(col("dt_ref") == lit(dt)), table)
    for dt in dt_refs
}

                                                                                

In [28]:
# Flatten the per-day metrics of one field into two long-format frames:
#   data_h: one row per histogram bin  -> dt_ref, domain, bin, center, count
#   data_c: one row per domain count   -> dt_ref, domain, count
# Assumes generate_table_metrics yields histogram items as (bin, center, count)
# tuples keyed by domain, and count items as (domain, count) tuples.
# TODO confirm against thundera.metrics.
field = "var_d"

hist_records = [
    (dt, domain, *item)
    for dt, m_data in metric_data.items()
    for domain, hist_data in m_data[field]["histogram"].items()
    for item in hist_data
]
count_records = [
    (dt, *item)
    for dt, m_data in metric_data.items()
    for item in m_data[field]["count"]
]

# Build each frame once from flat records rather than concatenating many tiny
# frames. This gives a clean RangeIndex, and an empty metric_data yields empty
# frames with the expected columns (pd.concat on an empty list raises).
data_h = pd.DataFrame.from_records(
    hist_records, columns=["dt_ref", "domain", "bin", "center", "count"]
)
data_c = pd.DataFrame.from_records(
    count_records, columns=["dt_ref", "domain", "count"]
)

In [29]:
# Linked views. Brushing a date range on the daily-count bars (left) keeps the
# matching days' histogram curves colored (right). Unselected days fade to
# light gray.
selection = alt.selection_interval(encodings=["x"])

counts = (
    alt.Chart(data_c)
    .mark_bar()
    .encode(
        x=alt.X("dt_ref:T", title="Reference date"),
        y=alt.Y("count:Q", title="Count"),
        color=alt.Color("domain:N", title="Domain"),
        opacity=alt.condition(selection, alt.value(1.0), alt.value(0.2)),
    )
    .add_params(selection)
    .properties(title=f"Daily counts per domain ({field})")
)

histogram = (
    alt.Chart(data_h)
    .mark_line()
    .encode(
        x=alt.X("center:Q", title="Bin center"),
        y=alt.Y("count:Q", title="Count"),
        color=alt.condition(selection, "domain:N", alt.value("lightgray")),
        opacity=alt.condition(selection, alt.value(0.7), alt.value(0.2)),
        # Group explicitly by day AND domain so each (day, domain) pair is its
        # own line, independent of how the conditional color is grouped.
        detail=["dt_ref:T", "domain:N"],
    )
    .properties(title=f"Per-day histograms ({field})")
)

counts | histogram

In [19]:
# Standalone check of a bar-style histogram on hand-written rows.
# Uses its own names so it does not clobber the Spark `data` frame (needed by
# the metrics cell) or the linked `histogram` chart defined earlier.
sample_bins = pd.DataFrame(
    [
        {"bin": 1, "center": 66.06343324348504, "count": 1.0},
        {"bin": 2, "center": 68.45591960684908, "count": 2.0},
        {"bin": 3, "center": 71.33356492685698, "count": 23.0},
        {"bin": 4, "center": 73.82702645212825, "count": 34.0},
        {"bin": 5, "center": 77.06390408174018, "count": 74.0},
    ]
)

sample_histogram = (
    alt.Chart(sample_bins)
    .mark_bar()
    .encode(
        x=alt.X("center:Q", title="Bin Center"),
        y=alt.Y("count:Q", title="Count"),
    )
    .properties(title="Histogram of Binned Data")
)

sample_histogram
