27 changes: 27 additions & 0 deletions analyzers/hashtags/interface.py
@@ -2,14 +2,18 @@
AnalyzerInput,
AnalyzerInterface,
AnalyzerOutput,
AnalyzerParam,
InputColumn,
OutputColumn,
)
from analyzer_interface.params import TimeBinningParam

COL_AUTHOR_ID = "user_id"
COL_TIME = "time"
COL_POST = "text"

PARAM_TIME_WINDOW = "time_window"

OUTPUT_GINI = "hashtag_analysis"
OUTPUT_COL_USERS = "users"
OUTPUT_COL_TIMESPAN = "timewindow_start"
@@ -76,6 +80,29 @@
),
]
),
params=[
AnalyzerParam(
id=PARAM_TIME_WINDOW,
human_readable_name="Time window of analysis",
description="""
The duration over which to compute the Gini coefficient. After
selecting a time window (e.g. 12 hours), the dataset is chunked into
equal-duration (e.g. 12-hour) time windows, and the Gini coefficient
is computed over the hashtags found in each chunk.

This value determines the time granularity of the analysis. The optimal
value depends on the time span of the dataset. Generally, choose a time
window long enough that each window contains a meaningful number of
hashtags (if a window contains none, its Gini coefficient will be 0).

For example, if the full dataset spans multiple years, it makes sense
to choose a time window of several days or even months. Conversely, if
a dataset spans a few weeks, a time window of a few hours or a day is
more appropriate.
""",
type=TimeBinningParam(),
)
],
outputs=[
AnalyzerOutput(
id=OUTPUT_GINI,
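Note: the parameter description above explains the windowing behaviour in prose. As a rough illustration (not the analyzer's actual implementation), the sketch below bins timestamps with Polars' `dt.truncate` and computes a Gini coefficient over hashtag counts in each bin; the `gini` helper, the `hashtag` column, and the assumption that hashtags are already one-per-row are all hypothetical.

```python
# Sketch only: bin posts into fixed-duration windows and compute a Gini
# coefficient over per-hashtag counts in each window. Assumes hashtags have
# already been extracted into one (time, hashtag) row per occurrence.
import numpy as np
import polars as pl


def gini(counts: np.ndarray) -> float:
    """Gini coefficient of non-negative counts; 0.0 for empty or all-zero input."""
    if counts.size == 0 or counts.sum() == 0:
        return 0.0
    sorted_counts = np.sort(counts)
    n = sorted_counts.size
    cum = np.cumsum(sorted_counts)
    return float((n + 1 - 2 * cum.sum() / cum[-1]) / n)


def gini_per_window(df: pl.DataFrame, every: str = "12h") -> pl.DataFrame:
    """Truncate timestamps to `every`-sized bins and compute one Gini value per bin."""
    per_tag = (
        df.with_columns(pl.col("time").dt.truncate(every).alias("timewindow_start"))
        .group_by("timewindow_start", "hashtag")
        .agg(pl.len().alias("count"))
    )
    return (
        per_tag.group_by("timewindow_start")
        .agg(pl.col("count"))
        .with_columns(
            pl.col("count")
            .map_elements(lambda c: gini(np.asarray(c)), return_dtype=pl.Float64)
            .alias("gini")
        )
        .drop("count")
        .sort("timewindow_start")
    )
```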
5 changes: 4 additions & 1 deletion analyzers/hashtags/main.py
@@ -14,6 +14,7 @@
OUTPUT_COL_TIMESPAN,
OUTPUT_COL_USERS,
OUTPUT_GINI,
PARAM_TIME_WINDOW,
)


@@ -88,10 +89,12 @@ def main(context: PrimaryAnalyzerContext):
input_reader = context.input()
df_input = input_reader.preprocess(pl.read_parquet(input_reader.parquet_path))

time_window_param = context.params.get(PARAM_TIME_WINDOW)

# time window now comes from the analyzer parameter (previously hard-coded to 12h)
df_out = hashtag_analysis(
data_frame=df_input,
every="12h",
every=time_window_param.to_polars_truncate_spec(), # returns '12h', '5d' etc.
)

df_out.write_parquet(context.output(OUTPUT_GINI).parquet_path)
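Note: `TimeBinningValue.to_polars_truncate_spec()` is not shown in this diff; per the inline comment above it returns Polars duration strings such as '12h' or '5d'. A minimal sketch of what such a mapping could look like, assuming a simple unit/amount value object (the real class in `analyzer_interface.params` may be implemented differently):

```python
# Illustrative sketch only; the real TimeBinningValue in
# analyzer_interface.params may implement this differently.
from dataclasses import dataclass

# Polars duration suffixes for the units we assume the parameter supports.
_UNIT_SUFFIX = {
    "minute": "m",
    "hour": "h",
    "day": "d",
    "week": "w",
    "month": "mo",
    "year": "y",
}


@dataclass
class TimeBinningValueSketch:
    unit: str
    amount: int

    def to_polars_truncate_spec(self) -> str:
        """Return a duration string Polars understands, e.g. '12h' or '5d'."""
        return f"{self.amount}{_UNIT_SUFFIX[self.unit]}"


# The value used in the test below yields "12h", the same window that was
# previously hard-coded in main.py.
assert TimeBinningValueSketch(unit="hour", amount=12).to_polars_truncate_spec() == "12h"
```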
2 changes: 2 additions & 0 deletions analyzers/hashtags/test_hashtags_analyzer.py
@@ -3,6 +3,7 @@
import numpy as np
import polars as pl

from analyzer_interface.params import TimeBinningValue
from preprocessing.series_semantic import datetime_string, identifier, text_catch_all
from testing import CsvTestData, JsonTestData, test_primary_analyzer

@@ -88,6 +89,7 @@ def test_hashtag_analyzer():
COL_POST: text_catch_all,
},
),
params={"time_window": TimeBinningValue(unit="hour", amount=12)},
outputs={
OUTPUT_GINI: JsonTestData(
os.path.join(test_data_dir, "hashtag_test_output.json")