38 changes: 19 additions & 19 deletions analyzers/hashtags_web/app.py
@@ -2,6 +2,7 @@

import plotly.graph_objects as go
import polars as pl
from dateutil import parser as dateutil_parser
from shiny import reactive, render, ui
from shinywidgets import output_widget, render_widget

@@ -26,12 +27,17 @@
df_global = None
context_global = None
df_raw = None
TZ_UTC = "UTC"  # normalize to UTC before stripping the timezone


def set_df_global_state(df_input, df_output):
global df_global, df_raw
df_global = df_output
df_raw = df_input # Will be loaded from context when needed
df_raw = df_input.with_columns(
pl.col(COL_TIME)
.dt.convert_time_zone(TZ_UTC) # normalize to UTC
.dt.replace_time_zone(None) # strip to tz naive format
) # Will be loaded from context when needed


@lru_cache(maxsize=32)
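
For reference, a minimal sketch of what the UTC normalization in set_df_global_state does, assuming COL_TIME refers to a tz-aware datetime column (the column name "time" and the values below are illustrative, not taken from the project data):

from datetime import datetime

import polars as pl

# Hypothetical tz-aware input column (illustrative values).
df = pl.DataFrame(
    {"time": [datetime(2024, 1, 1, 12, 0), datetime(2024, 1, 2, 12, 0)]}
).with_columns(pl.col("time").dt.replace_time_zone("America/New_York"))

# Same pattern as above: normalize to UTC, then strip the timezone so the
# column becomes naive and comparable with other naive datetimes.
df_naive = df.with_columns(
    pl.col("time").dt.convert_time_zone("UTC").dt.replace_time_zone(None)
)
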
@@ -192,34 +198,28 @@ def populate_date_choices():
session=session,
)

@lru_cache(maxsize=100)
def get_selected_datetime_cached(selected_formatted):
"""Convert selected formatted date back to datetime with caching"""
def get_midpoint_datetime():
"""Default - middle of dataset instead of first point"""
df = get_df()
# Find the datetime that matches the formatted string
for dt in df["timewindow_start"].to_list():
if dt.strftime("%Y-%m-%d %H:%M") == selected_formatted:
return dt
return df["timewindow_start"].to_list()[0] # fallback
middle_index = len(df) // 2
return df["timewindow_start"][middle_index]

# this will store line plot values when clicked
clicked_data = reactive.value(None)

def get_selected_datetime():
"""Get date value from when a line plot is clicked on"""
"""Get datetime from plot click, converting string to datetime"""
click_data = clicked_data.get()
if click_data and hasattr(click_data, "xs") and len(click_data.xs) > 0:
# Convert the clicked datetime to the format expected by get_selected_datetime_cached
clicked_datetime = click_data.xs[0]
if hasattr(clicked_datetime, "strftime"):
formatted_datetime = clicked_datetime.strftime("%Y-%m-%d %H:%M")
return get_selected_datetime_cached(formatted_datetime)
clicked_value = click_data.xs[0]

# If it's a string, parse it to datetime
if isinstance(clicked_value, str):
return dateutil_parser.parse(clicked_value)
else:
# If it's already a string, use it directly
return get_selected_datetime_cached(clicked_datetime)
return clicked_value # Already datetime
else:
# Return the first datetime as default
return get_df()["timewindow_start"][0]
return get_midpoint_datetime()

@reactive.calc
def secondary_analysis():
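
A minimal sketch of the string-vs-datetime handling in get_selected_datetime, since Plotly click payloads often deliver the x value of a datetime axis as a string (the payload below is illustrative):

from dateutil import parser as dateutil_parser

clicked_value = "2024-03-15 14:30"  # illustrative click payload from a datetime axis

if isinstance(clicked_value, str):
    selected = dateutil_parser.parse(clicked_value)  # -> datetime(2024, 3, 15, 14, 30)
else:
    selected = clicked_value  # already a datetime object
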
59 changes: 46 additions & 13 deletions analyzers/hashtags_web/factory.py
@@ -7,9 +7,10 @@
ShinyContext,
WebPresenterContext,
)
from app.project_context import _get_columns_with_semantic
from app.shiny import page_dependencies

from ..hashtags.interface import COL_AUTHOR_ID, COL_POST, COL_TIME, OUTPUT_GINI
from ..hashtags.interface import COL_TIME, OUTPUT_GINI
from .app import (
analysis_panel,
hashtag_plot_panel,
@@ -20,29 +21,61 @@
)


def load_and_transform_input_data(
web_context: WebPresenterContext, column_names: list[str]
) -> pl.DataFrame:
"""Load input dataset and apply analysis preprocessing to specified columns"""
project_id = web_context.base.analysis.project_id
df_raw = web_context.store.load_project_input(project_id)

# Get semantic info for all columns
columns_with_semantic = _get_columns_with_semantic(df_raw)
semantic_dict = {col.name: col for col in columns_with_semantic}

# Apply transformations to selected columns
transformed_columns = {}
for col_name in column_names:
if col_name in semantic_dict:
# Apply semantic transformation
transformed_columns[col_name] = semantic_dict[
col_name
].apply_semantic_transform()
else:
# Keep original if no semantic transformation
transformed_columns[col_name] = df_raw[col_name]

return df_raw.with_columns(
[transformed_columns[col_name].alias(col_name) for col_name in column_names]
)


def factory(
web_context: WebPresenterContext,
) -> FactoryOutputContext:
# Load the primary output associated with this project
df_hashtags = pl.read_parquet(web_context.base.table(OUTPUT_GINI).parquet_path)

# load the raw input data used for this project
project_id = web_context.base.analysis.project_id
df_raw = web_context.store.load_project_input(project_id)

# flip mapping to point from raw data to analyzer input schema
column_mapping_inv = {
# Get user-selected column names
orig2inputschema = {
v: k for k, v in web_context.base.analysis.column_mapping.items()
}
df_raw = df_raw.rename(mapping=column_mapping_inv)

if not isinstance(df_raw.schema[COL_TIME], pl.Datetime):
df_raw = df_raw.with_columns(pl.col(COL_TIME).str.to_datetime().alias(COL_TIME))
orig_schema_names = list(orig2inputschema.keys())

df_raw = df_raw.select(pl.col([COL_AUTHOR_ID, COL_TIME, COL_POST])).sort(
pl.col(COL_TIME)
# Load input data and apply semantic transformations to the selected columns
df_raw = load_and_transform_input_data(
web_context=web_context, column_names=orig_schema_names
)

# Rename columns to follow input schema names and select
column_mapping = web_context.base.analysis.column_mapping
df_raw = df_raw.select(
[
pl.col(orig_col).alias(schema_col)
for schema_col, orig_col in column_mapping.items()
]
).sort(pl.col(COL_TIME))

set_df_global_state(
df_input=df_raw,
df_output=df_hashtags,
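
A minimal sketch of the select-and-rename step in factory(), using a made-up column_mapping; the column names here are assumptions, not the analyzer's real input schema:

import polars as pl

# Hypothetical raw input and mapping: input-schema name -> original column name.
df_raw = pl.DataFrame(
    {"created_at": ["2024-01-01 00:00:00"], "text": ["hello #world"], "user": ["u1"]}
)
column_mapping = {"time": "created_at", "post": "text", "author_id": "user"}

# Same pattern as above: keep only the mapped columns and rename them to the
# input schema names in a single select.
df_input = df_raw.select(
    [
        pl.col(orig_col).alias(schema_col)
        for schema_col, orig_col in column_mapping.items()
    ]
)
# df_input.columns == ["time", "post", "author_id"]
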
85 changes: 84 additions & 1 deletion preprocessing/series_semantic.py
@@ -34,13 +34,92 @@ def check_type(self, series: pl.Series):
return self.column_type(series.dtype)


def parse_datetime_with_tz(s: pl.Series) -> pl.Series:
"""Parse datetime strings with timezone info (both abbreviations and offsets)"""
import warnings

# Handle timezone abbreviations like "UTC", "EST"
tz_abbrev_regex = r" ([A-Z]{3,4})$" # UTC, EST, etc.

# Handle timezone offsets like "-05:00", "+00:00"
tz_offset_regex = r"[+-]\d{2}:\d{2}$" # -05:00, +00:00, etc.

# Check for multiple different timezones
abbrev_matches = s.str.extract_all(tz_abbrev_regex)
offset_matches = s.str.extract_all(tz_offset_regex)

# Get unique timezone abbreviations
unique_abbrevs = set()
if not abbrev_matches.is_empty():
for match_list in abbrev_matches.to_list():
if match_list: # Not empty
unique_abbrevs.update(match_list)

# Get unique timezone offsets
unique_offsets = set()
if not offset_matches.is_empty():
for match_list in offset_matches.to_list():
if match_list: # Not empty
unique_offsets.update(match_list)

# Warn if multiple different timezones found
total_unique_tz = len(unique_abbrevs) + len(unique_offsets)
if total_unique_tz > 1:
all_tz = list(unique_abbrevs) + list(unique_offsets)
warnings.warn(
f"Multiple timezones found in datetime column: {all_tz}. "
f"Assuming all timestamps represent the same timezone for analysis purposes.",
UserWarning,
)

# Try to remove timezone abbreviations first
result = s.str.replace(tz_abbrev_regex, "")

# Then remove timezone offsets
result = result.str.replace(tz_offset_regex, "")

return result.str.strptime(pl.Datetime(), strict=False)
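
A minimal usage sketch for parse_datetime_with_tz, assuming it is called with a string Series that mixes timezone markers (the values are illustrative):

import polars as pl

s = pl.Series(
    "ts",
    ["2024-01-01 12:00:00 UTC", "2024-01-01 07:00:00 EST", "2024-01-01 05:00:00-05:00"],
)

# Mixing "UTC", "EST" and an explicit offset should trigger the UserWarning
# above; the markers are then stripped and the remainder is parsed as a naive
# Datetime (rows that fail to parse become null because strict=False).
parsed = parse_datetime_with_tz(s)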


native_date = SeriesSemantic(
semantic_name="native_date",
column_type=pl.Date, # Matches native Date columns
try_convert=lambda s: s, # No conversion needed
validate_result=lambda s: constant_series(s, True), # Always valid
data_type="datetime",
)

native_datetime = SeriesSemantic(
semantic_name="native_datetime",
column_type=pl.Datetime, # Matches native DateTime columns
try_convert=lambda s: s, # No conversion needed
validate_result=lambda s: constant_series(s, True), # Always valid
data_type="datetime",
)

datetime_string = SeriesSemantic(
semantic_name="datetime",
column_type=pl.String,
try_convert=lambda s: s.str.strptime(pl.Datetime, strict=False),
try_convert=parse_datetime_with_tz,
validate_result=lambda s: s.is_not_null(),
data_type="datetime",
)

date_string = SeriesSemantic(
semantic_name="date",
column_type=pl.String,
try_convert=lambda s: s.str.strptime(pl.Date, strict=False), # Convert to pl.Date
validate_result=lambda s: s.is_not_null(),
data_type="datetime", # Still maps to datetime for analyzer compatibility
)

time_string = SeriesSemantic(
semantic_name="time",
column_type=pl.String,
try_convert=lambda s: s.str.strptime(pl.Time, strict=False), # Convert to pl.Time
validate_result=lambda s: s.is_not_null(),
data_type="time",
)

timestamp_seconds = SeriesSemantic(
semantic_name="timestamp_seconds",
@@ -107,7 +186,11 @@ def check_type(self, series: pl.Series):
)

all_semantics = [
native_datetime,
native_date,
datetime_string,
date_string,
time_string,
timestamp_seconds,
timestamp_milliseconds,
url,
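
A minimal sketch of exercising the new date_string and time_string semantics directly, assuming SeriesSemantic stores try_convert and validate_result as plain callables as shown above:

import polars as pl

dates = pl.Series("d", ["2024-01-01", "not a date"])
times = pl.Series("t", ["12:30:00", "oops"])

# try_convert parses leniently (strict=False); validate_result marks which
# rows actually converted.
converted_dates = date_string.try_convert(dates)          # Date column, null on failure
dates_ok = date_string.validate_result(converted_dates)   # [True, False]

converted_times = time_string.try_convert(times)          # Time column, null on failure
times_ok = time_string.validate_result(converted_times)   # [True, False]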