18 changes: 10 additions & 8 deletions analyzers/hashtags/interface.py
@@ -8,18 +8,19 @@

 COL_AUTHOR_ID = "user_id"
 COL_TIME = "time"
-COL_HASHTAGS = "hashtags"
+COL_POST = "text"
 
-OUTPUT_GINI = "gini_coef"
-OUTPUT_COL_TIMESPAN = "time_span"
+OUTPUT_GINI = "hashtag_analysis"
+OUTPUT_COL_USERS = "users"
+OUTPUT_COL_TIMESPAN = "timewindow_start"
 OUTPUT_COL_GINI = "gini"
 OUTPUT_COL_COUNT = "count"
-OUTPUT_COL_HASHTAGS = COL_HASHTAGS
+OUTPUT_COL_HASHTAGS = "hashtags"
 
 interface = AnalyzerInterface(
     id="hashtags",
     version="0.1.0",
-    name="hashtags",
+    name="Hashtag analysis",
     short_description="Computes the gini coefficient over hashtag usage",
     long_description="""
 Analysis of hashtags measures the extent of online coordination among social media users
@@ -55,10 +56,10 @@
                 ],
             ),
             InputColumn(
-                name=COL_HASHTAGS,
+                name=COL_POST,
                 data_type="text",
-                description="The column containing the hashtags associated with the message",
-                name_hints=["hashtags", "tags", "topics", "keywords"],
+                description="The column containing the tweet and hashtags associated with the message",
+                name_hints=["text", "tweet", "post", "tweet_post", "message"],
             ),
             InputColumn(
                 name=COL_TIME,
@@ -82,6 +83,7 @@
             columns=[
                 OutputColumn(name=OUTPUT_COL_TIMESPAN, data_type="datetime"),
                 OutputColumn(name=OUTPUT_COL_GINI, data_type="float"),
+                OutputColumn(name=OUTPUT_COL_USERS, data_type="text"),
                 OutputColumn(name=OUTPUT_COL_COUNT, data_type="integer"),
                 OutputColumn(name=OUTPUT_COL_HASHTAGS, data_type="text"),
             ],
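
With the input column switched from a pre-parsed hashtags field to the raw post text, the analyzer now pulls hashtags out of the text itself with a regex. A minimal sketch of that extraction, separate from the PR code and assuming a recent polars release:

import polars as pl

posts = pl.DataFrame({"text": ["breaking #news today", "no tags here"]})

# str.extract_all returns a list column; rows without "#" yield an empty list,
# which main.py filters out before aggregating
print(posts.with_columns(pl.col("text").str.extract_all(r"(#\S+)").alias("hashtags")))
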
107 changes: 56 additions & 51 deletions analyzers/hashtags/main.py
@@ -1,4 +1,3 @@
-from collections import Counter
 from itertools import accumulate
 
 import polars as pl
@@ -7,86 +6,92 @@

 from .interface import (
     COL_AUTHOR_ID,
-    COL_HASHTAGS,
+    COL_POST,
     COL_TIME,
     OUTPUT_COL_COUNT,
     OUTPUT_COL_GINI,
     OUTPUT_COL_HASHTAGS,
     OUTPUT_COL_TIMESPAN,
+    OUTPUT_COL_USERS,
     OUTPUT_GINI,
 )
 
-# let's look at the hashtags column
-COLS_ALL = [COL_AUTHOR_ID, COL_TIME, COL_HASHTAGS]
-
-NULL_CHAR = "[]"  # this is taken as the null character for hashtags
-
 
-def gini(x):
+def gini(x: pl.Series) -> float:
     """
     Parameters
     ----------
-    x : list[str]
-        List of values for which to compute the Gini coefficient
+    x : pl.Series
+        polars Series containing values for which to compute the Gini coefficient
 
     Returns
     -------
     float
-        Gini coefficient
+        Gini coefficient (between 0.0 and 1.0)
     """
-    x_counts = Counter(x).values()
+    sorted_x = x.value_counts().sort(by="count", descending=False)[:, 1].to_list()
 
-    sorted_x = sorted(x_counts)
     n = len(sorted_x)
     cumx = list(accumulate(sorted_x))
 
     return (n + 1 - 2 * sum(cumx) / cumx[-1]) / n
 
 
-def main(context: PrimaryAnalyzerContext):
+def hashtag_analysis(data_frame: pl.DataFrame, every="1h") -> pl.DataFrame:
+    if not isinstance(data_frame.schema[COL_TIME], pl.Datetime):
+        data_frame = data_frame.with_columns(
+            pl.col(COL_TIME).str.to_datetime().alias(COL_TIME)
+        )
 
-    input_reader = context.input()
-    df_input = input_reader.preprocess(pl.read_parquet(input_reader.parquet_path))
+    # define the expressions
+    has_hashtag_symbols = pl.col(COL_POST).str.contains("#").any()
+    extract_hashtags = pl.col(COL_POST).str.extract_all(r"(#\S+)")
 
-    # assign None to messages with no hashtags
-    df_input = df_input.with_columns(
-        pl.when(pl.col(COL_HASHTAGS) == NULL_CHAR)
-        .then(None)
-        .otherwise(
-            pl.col(COL_HASHTAGS)
-            .str.strip_chars("[]")
-            .str.replace_all("'", "")
-            .str.replace_all(" ", "")
-            .str.split(",")
-        )  # split hashtags into List[str]
-        .name.keep()
-    )
+    # if hashtag symbol is detected, extract with regex
+    if data_frame.select(has_hashtag_symbols).item():
+        df_input = data_frame.with_columns(extract_hashtags).filter(
+            pl.col(COL_POST) != []
+        )
 
-    # select columns
-    df_input = df_input.select(pl.col(COLS_ALL))
+    else:  # otherwise, we assume str: "['hashtag1', 'hashtag2', ...]"
+        raise ValueError(f"The data in {COL_POST} column appear to have no hashtags.")
 
-    df_agg = (
-        df_input.filter(pl.col(COL_HASHTAGS).is_not_null())
-        .select(
-            pl.col(COL_TIME),
-            pl.col(COL_HASHTAGS),
-        )
-        .sort(COL_TIME)
-        .group_by_dynamic(COL_TIME, every="1h")  # this could be a parameter
+    # select columns and sort
+    df_input = df_input.select(pl.col([COL_AUTHOR_ID, COL_TIME, COL_POST])).sort(
+        pl.col(COL_TIME)
+    )
+
+    # compute gini per timewindow
+    df_out = (
+        df_input.explode(pl.col(COL_POST))
+        .with_columns(pl.col(COL_TIME).dt.truncate(every).alias(OUTPUT_COL_TIMESPAN))
+        .group_by(OUTPUT_COL_TIMESPAN)
         .agg(
-            pl.col(COL_HASHTAGS).explode().alias(OUTPUT_COL_HASHTAGS),
-            pl.col(COL_HASHTAGS).explode().count().alias(OUTPUT_COL_COUNT),
-            pl.col(COL_HASHTAGS)
-            .explode()
-            .map_elements(
-                lambda x: gini(x.to_list()),
-                return_dtype=pl.Float32,
-                returns_scalar=True,
-            )
+            pl.col(COL_AUTHOR_ID).alias(OUTPUT_COL_USERS),
+            pl.col(COL_POST).alias(OUTPUT_COL_HASHTAGS),
+            pl.col(COL_POST).count().alias(OUTPUT_COL_COUNT),
+            pl.col(COL_POST)
+            .map_batches(gini, returns_scalar=True, return_dtype=pl.Float64)
             .alias(OUTPUT_COL_GINI),
         )
     )
 
-    print("Output preview:")
-    print(df_agg.head())
+    # convert datetime back to string
+    df_out = df_out.with_columns(
+        pl.col(OUTPUT_COL_TIMESPAN).dt.to_string("%Y-%m-%d %H:%M:%S")
+    )
+
+    return df_out
+
+
+def main(context: PrimaryAnalyzerContext):
+    input_reader = context.input()
+    df_input = input_reader.preprocess(pl.read_parquet(input_reader.parquet_path))
+
+    # window hard-coded to 12h for now
+    df_out = hashtag_analysis(
+        data_frame=df_input,
+        every="12h",
+    )
 
-    df_agg.write_parquet(context.output(OUTPUT_GINI).parquet_path)
+    df_out.write_parquet(context.output(OUTPUT_GINI).parquet_path)
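
Taken together, the refactored hashtag_analysis step explodes the extracted hashtags, truncates each timestamp to the window given by every, and emits one Gini coefficient per window alongside the users, hashtags, and counts. The following is a self-contained sketch of those steps on made-up data, not the analyzer itself; column names follow the interface constants, and the map_batches aggregation mirrors the PR on a recent polars release:

from itertools import accumulate

import polars as pl


def gini(x: pl.Series) -> float:
    # hashtag counts sorted ascending, then the discrete Gini formula
    # over the cumulative sums
    sorted_x = x.value_counts().sort(by="count", descending=False)[:, 1].to_list()
    n = len(sorted_x)
    cumx = list(accumulate(sorted_x))
    return (n + 1 - 2 * sum(cumx) / cumx[-1]) / n


df = pl.DataFrame(
    {
        "user_id": ["a", "b", "a", "c"],
        "time": [
            "2024-01-01 00:05:00",
            "2024-01-01 00:20:00",
            "2024-01-01 13:00:00",
            "2024-01-01 13:30:00",
        ],
        "text": [
            "intro post #hello",
            "more #hello #world",
            "afternoon #news",
            "#news again #news",
        ],
    }
).with_columns(pl.col("time").str.to_datetime())

out = (
    df.with_columns(pl.col("text").str.extract_all(r"(#\S+)"))
    .explode("text")
    .with_columns(pl.col("time").dt.truncate("12h").alias("timewindow_start"))
    .group_by("timewindow_start")
    .agg(
        pl.col("user_id").alias("users"),
        pl.col("text").alias("hashtags"),
        pl.col("text").count().alias("count"),
        pl.col("text")
        .map_batches(gini, returns_scalar=True, return_dtype=pl.Float64)
        .alias("gini"),
    )
    .sort("timewindow_start")
)
print(out)

In the first 12-hour window the counts are uneven (#hello twice, #world once), so the Gini lands above zero; the second window contains only #news and comes out at 0.0.
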
3 changes: 3 additions & 0 deletions analyzers/hashtags/test_data/__init__.py
@@ -0,0 +1,3 @@
+import os
+
+test_data_dir = os.path.dirname(os.path.abspath(__file__))
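
The new test_data package simply exposes its own directory, so fixtures can be located relative to the package from tests. A hypothetical usage, with the fixture filename made up for illustration and assuming the analyzers package is importable:

import os

import polars as pl

from analyzers.hashtags.test_data import test_data_dir

# "sample_posts.parquet" is a placeholder name; any fixture dropped into
# analyzers/hashtags/test_data/ can be resolved the same way
fixture = pl.read_parquet(os.path.join(test_data_dir, "sample_posts.parquet"))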