126 twitter data (#620)
* Added a script file to process json archive files into more unified parquet files focused on tweet reply rows for further processing.

* Added README file for Twitter data collection.

* Redid the code for processing json into standardized parquet files.

* Added file to process parquet files into a conversation tree jsonl file.

* Added requirements and ran pre-commit.
Jmete committed Jan 21, 2023
1 parent 5f3f077 commit d15d835
Showing 4 changed files with 457 additions and 0 deletions.
80 changes: 80 additions & 0 deletions scripts/data-collection/twitter/README.md
@@ -0,0 +1,80 @@
# Twitter data collection for Open Assistant

Conversations on Twitter can be an interesting and useful source of data for our
model to learn from. Certain Twitter threads may contain helpful prompts and
replies, much like the way we want our model to respond to prompts in a useful
way.

These scripts are therefore intended to take Twitter data from a variety of
sources, process it into cleaner and more useful formats, and then combine the
various outputs into a unified training set that can be fed to our model as a
conversation, or at least as a prompt with replies.

**Note: Based on issue #126**

## Possible Data Paths

- Twitterstream archive: https://archive.org/details/twitterstream These are
  large .tar files with compressed json files inside. However, some data points
  such as reply counts seem to always be 0 due to limitations when scraping the
  Twitter API.
- Alternative APIs such as snscrape, twint, etc. These are often harder to use
  than the official Twitter API but can bypass API limits, which makes them
  useful for larger-scale data collection. The downsides are potentially slower
  speed and fewer features (see the sketch after this list).
- The official Twitter API
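
As a rough illustration of the alternative-API path, here is a minimal sketch
(not part of this commit) of fetching the tweets of a conversation with
snscrape's Python interface. The `conversation_id:` search operator and the
exact attribute names (`rawContent` vs. `content`, `inReplyToTweetId`) vary by
snscrape version, so treat them as assumptions:

```python
# Hypothetical sketch: fetch the tweets of one conversation via snscrape.
# Field names differ between snscrape versions; adjust as needed.
import snscrape.modules.twitter as sntwitter


def fetch_conversation(conversation_id: int, limit: int = 200):
    """Yield (tweet_id, in_reply_to_id, text) for tweets in a conversation."""
    query = f"conversation_id:{conversation_id}"
    for i, tweet in enumerate(sntwitter.TwitterSearchScraper(query).get_items()):
        if i >= limit:
            break
        text = getattr(tweet, "rawContent", None) or getattr(tweet, "content", "")
        yield tweet.id, getattr(tweet, "inReplyToTweetId", None), text
```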

## Currently Completed Items

- Downloaded various archive files (both are .tar, but each uses a different
  json compression format: one .gz, the other .bz2). Each json file is roughly
  2000 rows of tweets, and there are thousands of these compressed json files.
  Managing the IO of opening lots of small files is one of the challenges, which
  is why future steps consolidate the data into larger, easier-to-process files.
- Wrote a script that loops through the compressed json files and cleans them up
  a bit by removing truncated tweets and tweets that aren't replies. The script
  then standardizes the columns and exports the Polars dataframes into parquet
  files for future processing (a minimal sketch of this step is shown after this
  list). Note: using Polars instead of pandas for performance reasons.
- Wrote scripts that process the large dump of tweets into conversation threads
  using a tree-and-node architecture. This results in around 17K conversation
  threads based on a dump of 90M tweets.
- Script can output the conversation threads into a jsonl file for further
filtering or use in models.
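
The sketch below illustrates the archive-processing step at a high level. The
actual script in this commit standardizes many more columns; the file and
column names here are illustrative only:

```python
# Illustrative sketch: read compressed JSON out of a twitterstream-style .tar
# archive and consolidate reply tweets into a single parquet file with Polars.
import bz2
import gzip
import json
import tarfile

import polars as pl


def iter_tweets(tar_path):
    """Yield tweet dicts from .json.gz / .json.bz2 members of the archive."""
    with tarfile.open(tar_path) as tar:
        for member in tar:
            if not member.isfile():
                continue
            raw = tar.extractfile(member).read()
            if member.name.endswith(".bz2"):
                data = bz2.decompress(raw)
            elif member.name.endswith(".gz"):
                data = gzip.decompress(raw)
            else:
                continue
            for line in data.splitlines():
                if line.strip():
                    yield json.loads(line)


def archive_to_parquet(tar_path, out_path):
    # Keep non-truncated reply tweets only, mirroring the cleanup described above.
    rows = [
        {
            "id": t["id"],
            "text": t.get("text", ""),
            "in_reply_to_status_id": t.get("in_reply_to_status_id"),
            "lang": t.get("lang"),
        }
        for t in iter_tweets(tar_path)
        if t.get("id") is not None
        and not t.get("truncated", False)
        and t.get("in_reply_to_status_id") is not None
    ]
    pl.from_dicts(rows).write_parquet(out_path)
```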

## Main Issue

- The issue is that we can easily scrape replies, but there is no guarantee the
  original tweet is in the archive file. Furthermore, the archives are large, so
  they would need to be kept completely in memory or in a database to reference.
  We still need to decide whether to mine the archive to piece the conversations
  together, or to take the list of replied-to tweets and use alternative APIs to
  fetch the original tweet text, then match it with the confirmed replies
  already in our archive to generate the prompt/reply data. Currently, my script
  can extract conversations from the dump, but they are a small percentage of
  the overall dump, and there is no guarantee of the quality of the tweets.
- Tweet quality is the other major issue. We can get conversations with the
  current scripts, but they most likely don't match a useful instruction ->
  fulfilment pattern. We are trying to filter the tweets through various means,
  such as matching useful hashtags or computing cosine similarity against known
  instructions (see the sketch after this list).
- The modern Twitter API has conversation_id as a field, which can be a way to
  gather all tweets in a thread more or less automatically, although there are
  pagination limits. The main issue is that it seems hard to search by this
  field using alternative APIs.
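
As a rough sketch of the cosine-similarity filtering idea (TF-IDF here is a
stand-in for whatever embedding model we end up using; scikit-learn is not in
this commit's requirements, and the example instructions and threshold are
placeholders):

```python
# Hypothetical sketch: keep tweets that look similar to known instructions.
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Placeholder list; in practice this would come from a curated instruction set.
known_instructions = [
    "Explain how photosynthesis works.",
    "Write a short poem about the sea.",
    "Summarize this article in two sentences.",
]


def filter_instruction_like(tweets, threshold=0.3):
    """Keep tweets whose best similarity to any known instruction passes the threshold."""
    vectorizer = TfidfVectorizer().fit(known_instructions + list(tweets))
    instruction_vecs = vectorizer.transform(known_instructions)
    tweet_vecs = vectorizer.transform(tweets)
    best_sims = cosine_similarity(tweet_vecs, instruction_vecs).max(axis=1)
    return [t for t, s in zip(tweets, best_sims) if s >= threshold]
```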

## TODO

- Write scripts to filter existing conversations into useful instructions ->
fulfilment with hashtags or cosine similarity.
- Train model to detect if text is a suitable instruction. This could then be
run through the conversations (or full tweet dump) to simplify the process.
Related to issue #143.
- Write script that matches the original tweets and their text with the archive
data to create the prompt/reply dataset. (Optional)
- Decide on the final output format and storage options for the dataset. It is
  currently JSONL with a tree/node architecture serialized from Python dicts,
  which I believe is acceptable (see the sketch below).
- Alternatively: store processed tweets in a database or another storage
  option. (Optional)
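
For reference, a small sketch (not part of the commit) of how the resulting
JSONL could be loaded back and walked; the field names (`root`, `children`,
`text`, `role`) mirror the attributes serialized by `twitter_create_convs.py`:

```python
# Sketch: load the conversation trees back from the JSONL output and
# count the messages in each tree by walking the nested children.
import json


def count_messages(node):
    """Count this node plus all of its reply descendants."""
    children = node.get("children") or []
    return 1 + sum(count_messages(c) for c in children)


with open("twitter-conv-trees.jsonl") as f:
    trees = [json.loads(line) for line in f if line.strip()]

print(f"{len(trees)} conversation trees loaded")
if trees:
    print("messages in the first tree:", count_messages(trees[0]["root"]))
```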
3 changes: 3 additions & 0 deletions scripts/data-collection/twitter/requirements.txt
@@ -0,0 +1,3 @@
numpy==1.21.5
polars==0.15.14
tqdm==4.64.0
141 changes: 141 additions & 0 deletions scripts/data-collection/twitter/twitter_create_convs.py
@@ -0,0 +1,141 @@
import json
from pathlib import Path

import polars as pl
from tqdm import tqdm

# Sets up paths
# TODO: Source paths from env file
path_string = "PUT THE PATH HERE TO WHERE YOU STORED THE PARQUET FILES"
folder_path = Path(path_string)
processed_folder_path = folder_path / "processed"
output_path = folder_path / "twitter-conv-trees.jsonl"

# Get parq files
parq_files = sorted(processed_folder_path.rglob("*.parquet"))

wanted_cols = [
"timestamp_ms",
"id",
"text",
"truncated",
"in_reply_to_status_id",
"in_reply_to_user_id",
"is_quote_status",
"quote_count",
"reply_count",
"retweet_count",
"favorite_count",
"filter_level",
"lang",
"possibly_sensitive",
"hashtags",
"user_id",
"user_verified",
"user_followers_count",
"user_statuses_count",
]

# Load parqs into list. Using Polars for performance reasons.
df_list = []
for p in parq_files:
    df_list.append(pl.read_parquet(p, columns=wanted_cols))

# Create major dataframe.
# This can be done incrementally if RAM is constrained by modifying the above code.
p_df = pl.concat(df_list)

# Clean up the reference just in case to help with memory if needed.
del df_list

# Get tweets that are replies to other tweets
p_df_replies_only = p_df.filter(pl.col("in_reply_to_status_id").is_null().is_not())

# Group by replied to status id to see the most replied to statuses. This can take some time.
p_df_group_reply_to_status = p_df_replies_only.groupby("in_reply_to_status_id").count().sort("count", reverse=True)

# Save output of grouping the top replied to statuses
group_reply_parq = folder_path / "group_reply_parq.parquet"
p_df_group_reply_to_status.write_parquet(group_reply_parq)

# Join the main dataframe with the top replies to find tweets that have replies.
p_join = p_df.join(p_df_group_reply_to_status, left_on="id", right_on="in_reply_to_status_id", how="inner")

# Save output of tweets that have replies
tweets_that_have_replies_path = folder_path / "tweets_that_have_replies.parquet"
p_join.write_parquet(tweets_that_have_replies_path)

# Save output of tweets that are replies to other tweets
tweets_that_are_replies_path = folder_path / "tweets_that_are_replies.parquet"
p_df_replies_only.write_parquet(tweets_that_are_replies_path)

# Filter the tweets that have replies to ones that aren't replies to others.
# Also filter for only english for now.
# This gives the root tweets that have replies but are the start of a conversation.
origin_tweets = p_join.filter((pl.col("in_reply_to_status_id").is_null()) & (pl.col("lang") == "en"))


# Helper functions and classes below for the next steps


# The author of the origin tweet is the prompter; anyone replying in the
# thread is treated as the assistant.
def role_decide(user_id, prompt_user):
    if user_id == prompt_user:
        return "prompter"
    else:
        return "assistant"


# A node in the conversation tree: wraps one tweet's text and metadata and
# recursively gathers its replies (rows of children_df) as child nodes.
class ConversationTreeNode:
    def __init__(self, tweet_id, prompt_user, from_df, children_df, metadata=None):

        if metadata:
            self.metadata = metadata
        else:
            self.metadata = from_df.filter(pl.col("id") == tweet_id).to_dicts()[0]

        self.metadata["prompt_user"] = prompt_user
        self.role = role_decide(self.metadata["user_id"], prompt_user)
        self.children = None
        self.text = self.metadata["text"]
        del self.metadata["text"]
        self.get_children(tweet_id=tweet_id, children_df=children_df)

    def get_children(self, tweet_id, children_df):
        children_dicts = children_df.filter(pl.col("in_reply_to_status_id") == tweet_id).to_dicts()
        if len(children_dicts) > 0:
            children = [
                ConversationTreeNode(
                    tweet_id=c["id"],
                    prompt_user=self.metadata["prompt_user"],
                    from_df=children_df,
                    children_df=children_df,
                    metadata=c,
                )
                for c in children_dicts
            ]
            self.children = children


# A conversation tree: the root node is an origin tweet; its replies hang off
# it as nested ConversationTreeNode children.
class ConversationTree:
    def __init__(self, tweet_id, prompt_user, from_df, children_df, r_metadata=None):

        self.root = ConversationTreeNode(
            tweet_id=tweet_id, prompt_user=prompt_user, from_df=from_df, children_df=children_df, metadata=r_metadata
        )
        self.metadata = None


# Create conversation trees
conv_tree_list = [
ConversationTree(
tweet_id=r["id"], prompt_user=r["user_id"], from_df=origin_tweets, children_df=p_df_replies_only, r_metadata=r
)
for r in tqdm(origin_tweets.to_dicts())
]

# Write conversation trees to jsonl file.
# Might need to clean up the last newline.
with open(output_path, "w") as output:
    for t in tqdm(conv_tree_list):
        json.dump(obj=t, fp=output, default=lambda x: x.__dict__)
        output.write("\n")
