6 changes: 4 additions & 2 deletions analyzers/hashtags/interface.py
@@ -10,16 +10,17 @@
 COL_TIME = "time"
 COL_HASHTAGS = "hashtags"
 
+OUTPUT_COL_USERS = "users"
 OUTPUT_GINI = "gini_coef"
-OUTPUT_COL_TIMESPAN = "time_span"
+OUTPUT_COL_TIMESPAN = "timewindow_start"
 OUTPUT_COL_GINI = "gini"
 OUTPUT_COL_COUNT = "count"
 OUTPUT_COL_HASHTAGS = COL_HASHTAGS
 
 interface = AnalyzerInterface(
     id="hashtags",
     version="0.1.0",
-    name="hashtags",
+    name="Hashtags analysis",
     short_description="Computes the gini coefficient over hashtag usage",
     long_description="""
 Analysis of hashtags measures the extent of online coordination among social media users
@@ -81,6 +82,7 @@
name="Gini coefficient over time",
columns=[
OutputColumn(name=OUTPUT_COL_TIMESPAN, data_type="datetime"),
OutputColumn(name=OUTPUT_COL_USERS, data_type="text"),
OutputColumn(name=OUTPUT_COL_GINI, data_type="float"),
OutputColumn(name=OUTPUT_COL_COUNT, data_type="integer"),
OutputColumn(name=OUTPUT_COL_HASHTAGS, data_type="text"),
90 changes: 46 additions & 44 deletions analyzers/hashtags/main.py
@@ -1,4 +1,3 @@
-from collections import Counter
 from itertools import accumulate
 
 import polars as pl
@@ -12,6 +11,7 @@
     OUTPUT_COL_COUNT,
     OUTPUT_COL_GINI,
     OUTPUT_COL_HASHTAGS,
+    OUTPUT_COL_USERS,
     OUTPUT_GINI,
 )
 
@@ -21,72 +21,74 @@
 NULL_CHAR = "[]" # this is taken as the null character for hashtags
 
 
-def gini(x):
+def gini(x: pl.Series) -> float:
     """
     Parameters
     ----------
-    x : list[str]
-        List of values for which to compute the Gini coefficient
+    x : pl.Series
+        polars Series containing values for which to compute the Gini coefficient
 
     Returns
     -------
     float
-        Gini coefficient
+        Gini coefficient (between 0.0 and 1.0)
     """
-    x_counts = Counter(x).values()
+    sorted_x = x.value_counts().sort(by="count", descending=False)[:, 1].to_list()
 
-    sorted_x = sorted(x_counts)
     n = len(sorted_x)
     cumx = list(accumulate(sorted_x))
 
     return (n + 1 - 2 * sum(cumx) / cumx[-1]) / n
 
 
-def main(context: PrimaryAnalyzerContext):
-
-    input_reader = context.input()
-    df_input = input_reader.preprocess(pl.read_parquet(input_reader.parquet_path))
-
-    # assign None to messages with no hashtags
-    df_input = df_input.with_columns(
-        pl.when(pl.col(COL_HASHTAGS) == NULL_CHAR)
-        .then(None)
-        .otherwise(
-            pl.col(COL_HASHTAGS)
-            .str.strip_chars("[]")
-            .str.replace_all("'", "")
-            .str.replace_all(" ", "")
-            .str.split(",")
-        ) # split hashtags into List[str]
-        .name.keep()
+def hashtag_analysis(data_frame: pl.DataFrame, every="1h") -> pl.DataFrame:
+    # define the expressions
+    has_hashtag_symbol = pl.col(COL_HASHTAGS).str.contains("#").any()
+    extract_hashtags = pl.col(COL_HASHTAGS).str.extract_all(r"(#\S+)")
+    extract_hashtags_by_split = (
+        pl.col(COL_HASHTAGS)
+        .str.strip_chars("[]")
+        .str.replace_all("'", "")
+        .str.replace_all(" ", "")
+        .str.split(",")
     )
 
-    # select columns
-    df_input = df_input.select(pl.col(COLS_ALL))
-
-    df_agg = (
-        df_input.filter(pl.col(COL_HASHTAGS).is_not_null())
-        .select(
-            pl.col(COL_TIME),
-            pl.col(COL_HASHTAGS),
+    # if hashtag symbol is detected, extract with regex
+    if data_frame.select(has_hashtag_symbol).item():
+        df_input = data_frame.with_columns(extract_hashtags).filter(
+            pl.col(COL_HASHTAGS) != []
         )
-        .sort(COL_TIME)
-        .group_by_dynamic(COL_TIME, every="1h") # this could be a parameter
+    else: # otherwise, we assume str: "['hashtag1', 'hashtag2', ...]"
+        df_input = data_frame.filter(pl.col(COL_HASHTAGS) != "[]").with_columns(
+            extract_hashtags_by_split
+        )
+
+    # select columns and sort
+    df_input = df_input.select(pl.col(COLS_ALL)).sort(pl.col(COL_TIME))
+
+    df_out = (
+        df_input.explode(pl.col(COL_HASHTAGS))
+        .with_columns(window_start=pl.col(COL_TIME).dt.truncate(every))
+        .group_by("window_start")
         .agg(
-            pl.col(COL_HASHTAGS).explode().alias(OUTPUT_COL_HASHTAGS),
-            pl.col(COL_HASHTAGS).explode().count().alias(OUTPUT_COL_COUNT),
+            pl.col(COL_AUTHOR_ID).alias(OUTPUT_COL_USERS),
+            pl.col(COL_HASHTAGS).alias(OUTPUT_COL_HASHTAGS),
+            pl.col(COL_HASHTAGS).count().alias(OUTPUT_COL_COUNT),
             pl.col(COL_HASHTAGS)
-            .explode()
-            .map_elements(
-                lambda x: gini(x.to_list()),
-                return_dtype=pl.Float32,
-                returns_scalar=True,
-            )
+            .map_batches(gini, returns_scalar=True)
            .alias(OUTPUT_COL_GINI),
        )
    )
 
-    print("Output preview:")
-    print(df_agg.head())
+    return df_out
+
+
+def main(context: PrimaryAnalyzerContext):
+    input_reader = context.input()
+    df_input = input_reader.preprocess(pl.read_parquet(input_reader.parquet_path))
+
+    df_agg = hashtag_analysis(
+        data_frame=df_input,
+    )
 
     df_agg.write_parquet(context.output(OUTPUT_GINI).parquet_path)
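
Not part of the diff: a minimal sanity-check sketch for the reworked gini(), which now receives the per-window hashtag Series that map_batches passes it. It assumes a polars release where Series.value_counts() exposes a "count" column (the new code already relies on this) and that the package layout makes the import below resolve.

import polars as pl

from analyzers.hashtags.main import gini  # assumed import path

# Each window's exploded hashtag column arrives as a plain Series, so a toy
# Series is enough to exercise the function.
uniform = pl.Series("hashtags", ["#a", "#b", "#c", "#d"])  # every tag used once
skewed = pl.Series("hashtags", ["#a"] * 9 + ["#b"])        # one tag dominates

print(gini(uniform))  # 0.0 -> counts [1, 1, 1, 1], perfect equality
print(gini(skewed))   # 0.4 -> counts [1, 9], so cumx = [1, 10] and (3 - 2.2) / 2
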
3 changes: 3 additions & 0 deletions analyzers/hashtags/test_data/__init__.py
@@ -0,0 +1,3 @@
+import os
+
+test_data_dir = os.path.dirname(os.path.abspath(__file__))
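
Not part of the diff: a hypothetical example of how test_data_dir might be consumed from a test module; the fixture file name is illustrative only.

import os

import polars as pl

from analyzers.hashtags.test_data import test_data_dir  # assumed import path

# Read a parquet fixture stored next to the tests (file name is hypothetical).
df_fixture = pl.read_parquet(os.path.join(test_data_dir, "sample_posts.parquet"))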