# Data Preparation

In [1]:
%load_ext lab_black
%load_ext autoreload
%autoreload 2

In [2]:
import os
import sys
import zipfile
from datetime import datetime
from glob import glob
from typing import Dict, List, Union

import boto3
import pandas as pd

In [3]:
PROJ_ROOT = os.path.join(os.pardir)
src_dir = os.path.join(PROJ_ROOT, "src")
sys.path.append(src_dir)

In [4]:
%aimport file_utils
from file_utils import create_zip_file

%aimport pandas_utils
from pandas_utils import read_parquet

%aimport s3_utils
from s3_utils import (
    download_file_from_s3,
    extract_zip_file,
    get_s3_bucket_file_list,
    upload_file_to_s3,
)

%aimport tweet_combine_utils
from tweet_combine_utils import (
    get_hourly_folders_per_day,
    get_hourly_files_names,
    process_files_per_hour,
)

## About

This notebook will combine all the streamed tweets per hour into a single `parquet` file that can be used in data processing to prepare the data for use in machine learning model development. Such a file will be created for every hour of every day that tweets were streamed.

Before exporting to a `.parquet` file, the combined hourly streamed tweets, including all collected and extracted metadata will be stored in a `pandas` DataFrame. Since tweets will be combined at the hourly level, the combined hourly `DataFrame`s **for this choices of subject** are small enough to fit in to single-node memory and so memory-bound tools such as `pandas` can be used to hold this hourly data before it is exported to the `.parquet` format.

## User Inputs

In [5]:
# raw data on S3
path_to_folder = "/datasets/twitter/kinesis-demo/"
years_wanted = [2021] + [2022]

# processed data
processed_data_dir = "../data/processed"  # (S3) "datasets/twitter/kinesis-demo/processed" or (locally) "data/processed"
proc_zip_fname = "combined_data.zip"

# List of headers for all streamed twitter attributes
headers = [
    "id",
    "geo",
    "coordinates",
    "place",
    "contributors",
    "is_quote_status",
    "quote_count",
    "reply_count",
    "retweet_count",
    "favorite_count",
    "favorited",
    "retweeted",
    "created_at",
    "source",
    "in_reply_to_user_id",
    "in_reply_to_screen_name",
    "source_text",
    "place_id",
    "place_url",
    "place_place_type",
    "place_name",
    "place_full_name",
    "place_country_code",
    "place_country",
    "place_bounding_box_type",
    "place_bounding_box_coordinates",
    "place_attributes",
    "coords_type",
    "coords_lon",
    "coords_lat",
    "geo_type",
    "geo_lon",
    "geo_lat",
    "user_name",
    "user_screen_name",
    "user_followers",
    "user_friends",
    "user_listed",
    "user_favourites",
    "user_statuses",
    "user_protected",
    "user_verified",
    "user_contributors_enabled",
    "user_joined",
    "user_location",
    "retweeted_tweet",
    "tweet_text_urls",
    "tweet_text_hashtags",
    "tweet_text_usernames",
    "num_urls_in_tweet_text",
    "num_users_in_tweet_text",
    "num_hashtags_in_tweet_text",
    "text",
]

# # keep tweets with text containing terms
# case_sensitive_tweet_search_terms = ["NASA", "ESA", "CSA", "Kepler"]
# tweet_search_terms = [
#     "aeronautics",
#     # "webb",  # picks up random astronomers unrelated to the mission
#     "goddard",
#     "propulsion",
#     # "telescope",  # picks up topics not related to the mission
#     "exoplanet",
#     # 'launch',
#     # 'astronomy',  # picks up random astronomers unrelated to the mission
#     # 'astrophysics',  # picks up random astrophysicists unrelated to the mission
#     # "laboratory",  # picks up random astronomers unrelated to the mission
#     "jwst",
#     # "exploration",  # picks up topics not related to space exploration
#     # " mission",  # picks up topics not related to space exploration
#     "spacecraft",
# ]
# joined_tweet_search_terms = [
#     "james webb",
#     "space telescope",
#     "webb space",
#     "webb telescope",
#     "jet propulsionlab",
#     "canadian space agency",
#     "european space agency",
#     "national aeronautics",
#     "shuttle launch",
#     "space shuttle",
#     "goddard space flightcenter",
#     "johnson space center",
#     "ames research center",
#     "marshall space flightcenter",
#     "glenn research center",
#     "ball aerospace",
#     "harris corporation",
#     "space telescope science institute",
#     "billochs",
#     "johnmather",
#     "northrop grumman",
#     "lockheed martin",
#     "stephen hawking",
#     "dark matter",
#     "dark energy",
#     "hubble space",
#     "hubble telescope",
# ]
# # remove tweets with text containing terms
# crypto_terms = [
#     "crypto",
#     # "token",
#     "koistarter",
#     "daostarter",
#     "decentralized",
#     # "services",
#     # "pancakeswap",
#     # "eraxnft",
#     # "browsing",
#     # "kommunitas",
#     # "hosting",
#     # "internet",
#     # "exipofficial",
#     # "servers",
#     # "wallet",
#     # "liquidity",
#     # "rewards",
#     # "floki",
#     # "10000000000000linkstelegram",
#     "dogecoin",
#     "czbinance",
#     # "watch",
#     "binance",
#     "dogelonmars",
#     "cryptocurrency",
#     # "money",
#     # "danheld",
#     # "cybersecurity",
#     "ethereum",
#     "bitcrush",
#     "vvsorigin",
# ]
# video_games_terms = [
#     # "gamejoin",
#     "arcade",
#     "dreamcast",
#     "sega",
#     "xbox",
#     "wii",
#     "ps4",
# ]
# non_english_terms = [
#     "webuye",
#     "bungoma",
#     "ethereum",
#     "pay someone",
#     "seungkwan",
#     "woozi",
#     "hoshi",
#     "kasama",
#     "nung",
#     "lahat",
#     "jinsoul",
#     "brunisoul",
#     "loona",
#     "taas",
#     "nung",
# ]
# misc_unwanted_terms = [
#     "nft",
#     "volcano detected",
#     "block-2",
#     "tanzanite",
#     "gemstonecarat",
#     "popescu",
#     "breeding",
#     "nairobi",
#     "pay someone",
#     "homeworkpay",
#     "homework",
#     "photocards",
#     "essay",
#     # "hbomax",
# ]
# religious_terms = [
#     "scriptures",
#     "methusealah",
#     "testament",
#     "yahweh",
#     "god",
#     "mullah",
#     "allah",
#     "clergy",
#     "mercy",
#     "morality",
#     "muslims,",
#     "hindus",
#     "buddhist",
#     "catholics",
#     "christians",
#     "atheist",
# ]
# inappropriate_terms = [
#     "prostitution",
#     "musembe",
#     "mo-greene",
#     "running scared2012",
#     "running scared 2012",
#     "massacres",
#     "eric ephriam chavez",
#     "drugs",
#     "bin laden",
#     "saddam",
#     "perished",
#     "whore",
#     "nasty",
#     "nazist",
#     "antifa",
#     "proud boys",
# ]
# min_num_words_tweet = 10

upload_to_s3 = True
cleanup_local_files = True

# columns for EDA
vcols = [
    "is_quote_status",
    "quote_count",
    "reply_count",
    # "retweet_count",
    "favorite_count",
    "favorited",
    "retweeted",
    "source_text",
    "user_followers",
    "user_friends",
    "user_favourites",
    "user_verified",
    "retweeted_tweet",
]

In [6]:
s3_bucket_name = os.getenv("AWS_S3_BUCKET_NAME", "")

try:
    session = boto3.Session(profile_name="default")
    s3_client = session.client("s3")
    aws_region = session.region_name
    print("Retrieved AWS credentials from ~/.ssh/aws file")
except Exception as e:
    if str(e) == "The config profile (default) could not be found":
        aws_region = os.getenv("AWS_REGION")
        s3_client = boto3.client("s3", region_name=aws_region)
        print("Retrieved AWS credentials from .env file")

Retrieved AWS credentials from .env file


In [7]:
# joined_tweet_search_terms_no_spaces = [
#     t.replace(" ", "") for t in joined_tweet_search_terms
# ]
# unwanted_partial_strings_list = (
#     crypto_terms
#     + religious_terms
#     + inappropriate_terms
#     + video_games_terms
#     + misc_unwanted_terms
#     + non_english_terms
# )
dtypes_dict = {
    "id": pd.StringDtype(),
    "geo": pd.StringDtype(),
    "coordinates": pd.StringDtype(),
    "place": pd.StringDtype(),
    "contributors": pd.StringDtype(),  # pd.BooleanDtype(),
    "is_quote_status": pd.StringDtype(),  # pd.BooleanDtype(),
    "quote_count": pd.Int32Dtype(),
    "reply_count": pd.Int32Dtype(),
    "retweet_count": pd.Int32Dtype(),
    "favorite_count": pd.Int32Dtype(),
    "favorited": pd.StringDtype(),  # pd.BooleanDtype(),
    "retweeted": pd.StringDtype(),  # pd.BooleanDtype(),
    "source": pd.StringDtype(),
    "in_reply_to_user_id": pd.StringDtype(),
    "in_reply_to_screen_name": pd.StringDtype(),
    "source_text": pd.StringDtype(),
    "place_id": pd.StringDtype(),
    "place_url": pd.StringDtype(),
    "place_place_type": pd.StringDtype(),
    "place_name": pd.StringDtype(),
    "place_full_name": pd.StringDtype(),
    "place_country_code": pd.StringDtype(),
    "place_country": pd.StringDtype(),
    "place_bounding_box_type": pd.StringDtype(),
    "place_bounding_box_coordinates": pd.StringDtype(),
    "place_attributes": pd.StringDtype(),
    "coords_type": pd.StringDtype(),
    "coords_lon": pd.StringDtype(),
    "coords_lat": pd.StringDtype(),
    "geo_type": pd.StringDtype(),
    "geo_lon": pd.StringDtype(),
    "geo_lat": pd.StringDtype(),
    "user_name": pd.StringDtype(),
    "user_screen_name": pd.StringDtype(),
    "user_followers": pd.Int32Dtype(),
    "user_friends": pd.Int32Dtype(),
    "user_listed": pd.Int32Dtype(),
    "user_favourites": pd.Int32Dtype(),
    "user_statuses": pd.Int32Dtype(),
    "user_protected": pd.StringDtype(),  # pd.BooleanDtype(),
    "user_verified": pd.StringDtype(),  # pd.BooleanDtype(),
    "user_contributors_enabled": pd.StringDtype(),
    "user_location": pd.StringDtype(),
    "retweeted_tweet": pd.StringDtype(),
    "tweet_text_urls": pd.StringDtype(),
    "tweet_text_hashtags": pd.StringDtype(),
    "tweet_text_usernames": pd.StringDtype(),
    "num_urls_in_tweet_text": pd.Int32Dtype(),
    "num_users_in_tweet_text": pd.Int32Dtype(),
    "num_hashtags_in_tweet_text": pd.Int32Dtype(),
    "text": pd.StringDtype(),
    # "contains_wanted_text": pd.BooleanDtype(),
    # "contains_wanted_text_case_sensitive": pd.BooleanDtype(),
    # "contains_multi_word_wanted_text": pd.BooleanDtype(),
    # "contains_crypto_terms": pd.BooleanDtype(),
    # "contains_religious_terms": pd.BooleanDtype(),
    # "contains_inappropriate_terms": pd.BooleanDtype(),
    # "contains_video_games_terms": pd.BooleanDtype(),
    # "contains_misc_unwanted_terms": pd.BooleanDtype(),
    # "contains_non_english_terms": pd.BooleanDtype(),
}

In [None]:
# def create_zip_file(
#     file_search_pattern: str, processed_data_dir: str, proc_data_zip_fname: str
# ):
#     """Create zipped file."""
#     os.chdir(processed_data_dir)
#     if not os.path.exists(proc_data_zip_fname):
#         ZipFile = zipfile.ZipFile(proc_data_zip_fname, "w")
#         for f in glob(file_search_pattern):
#             ZipFile.write(f, compress_type=zipfile.ZIP_DEFLATED)
#         ZipFile.close()
#         processed_data_fpath = os.path.join(processed_data_dir, proc_data_zip_fname)
#         print(f"Created zip file at {processed_data_fpath}")
#     os.chdir("../../")
#     os.chdir("notebooks")


# def upload_file_to_s3(
#     aws_region: str,
#     processed_data_dir: str,
#     fname: str,
#     s3_bucket_name: str,
#     s3_key: str,
# ) -> None:
#     """Upload file to key in S3 bucket."""
#     s3_resource = boto3.resource("s3", region_name=aws_region)
#     s3_resource.meta.client.upload_file(
#         f"{processed_data_dir}/{fname}",
#         s3_bucket_name,
#         s3_key,
#     )


# def download_file_from_s3(
#     s3_bucket_name: str,
#     path_to_folder: str,
#     data_dir: str,
#     fname: str,
#     aws_region: str,
# ) -> None:
#     """Download file from ."""
#     dest_filepath = os.path.join(data_dir, fname)
#     s3_filepath_key = s3_client.list_objects_v2(
#         Bucket=s3_bucket_name,
#         Delimiter="/",
#         Prefix=f"{path_to_folder[1:]}processed/",
#     )["Contents"][0]["Key"]
#     start = datetime.now()
#     print(
#         f"Started downloading processed data zip file from {s3_filepath_key} to "
#         f"{dest_filepath} at {start.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}..."
#     )
#     s3 = boto3.resource("s3", region_name=aws_region)
#     s3.meta.client.download_file(
#         s3_bucket_name,
#         s3_filepath_key,
#         dest_filepath,
#     )
#     duration = (datetime.now() - start).total_seconds()
#     print(f"Done downloading in {duration:.3f} seconds.")


# def extract_zip_file(dest_filepath: str, data_dir: str) -> None:
#     """."""
#     start = datetime.now()
#     print(
#         "Started extracting filtered data parquet files from "
#         f"processed data zip file to {data_dir} at "
#         f"{start.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}..."
#     )
#     zip_ref = zipfile.ZipFile(dest_filepath)
#     zip_ref.extractall(data_dir)
#     zip_ref.close()
#     duration = (datetime.now() - start).total_seconds()
#     print(f"Done extracting in {duration:.3f} seconds.")

In [None]:
# def drop_blank_tweets(df: pd.DataFrame, subset: List[str] = ["text"]) -> pd.DataFrame:
#     """Drop tweets with no text."""
#     df_no_nans = df.dropna(subset=subset)
#     num_rows_dropped = len(df) - len(df_no_nans)
#     print(f"Dropped {num_rows_dropped:,} tweets from raw data")
#     return df_no_nans


# def get_raw_masks(
#     df,
#     tweet_search_terms_list: List[str],
#     case_sensitive_tweet_search_terms_list: List[str],
#     joined_tweet_search_terms_no_spaces_list: List[str],
# ) -> List[pd.Series]:
#     """Get masks for tweets with types of wanted terms in text."""
#     lowercase_mask = (
#         df["text"].str.lower().str.contains("|".join(tweet_search_terms_list))
#     )
#     case_mask = df["text"].str.contains(
#         "|".join(case_sensitive_tweet_search_terms_list)
#     )
#     joined_case_mask = (
#         df["text"]
#         .str.lower()
#         .str.replace(" ", "")
#         .str.contains("|".join(joined_tweet_search_terms_no_spaces_list))
#     )
#     print("Created masks to filter raw data based on wanted text in tweets")
#     return [lowercase_mask, case_mask, joined_case_mask]


# def add_search_term_boolean_columns(
#     df: pd.DataFrame,
#     lowercase_mask: pd.Series,
#     case_mask: pd.Series,
#     joined_case_mask: pd.Series,
#     crypto_terms_list: List[str],
#     religious_terms_list: List[str],
#     inappropriate_terms_list: List[str],
#     video_games_terms_list: List[str],
#     misc_unwanted_terms_list: List[str],
#     non_english_terms_list: List[str],
# ) -> pd.DataFrame:
#     """Add boolean columns based on presence of wanted and unwanted terms in tweet text."""
#     df = (
#         df.assign(contains_wanted_text=lowercase_mask)
#         .assign(contains_wanted_text_case_sensitive=case_mask)
#         .assign(contains_multi_word_wanted_text=joined_case_mask)
#         .assign(
#             contains_crypto_terms=df["text"].str.contains("|".join(crypto_terms_list))
#         )
#         .assign(
#             contains_religious_terms=df["text"].str.contains(
#                 "|".join(religious_terms_list)
#             )
#         )
#         .assign(
#             contains_inappropriate_terms=df["text"].str.contains(
#                 "|".join(inappropriate_terms_list)
#             )
#         )
#         .assign(
#             contains_video_games_terms=df["text"].str.contains(
#                 "|".join(video_games_terms_list)
#             )
#         )
#         .assign(
#             contains_misc_unwanted_terms=df["text"].str.contains(
#                 "|".join(misc_unwanted_terms_list)
#             )
#         )
#         .assign(
#             contains_non_english_terms=df["text"].str.contains(
#                 "|".join(non_english_terms_list)
#             )
#         )
#     )
#     print("Created boolean columns to indicate presence of unwanted terms in tweets")
#     terms_str = []
#     pcts_total = []
#     for c in df.columns[df.columns.str.endswith("_terms")]:
#         pct_of_total = (df[c].sum() / len(df)) * 100
#         term_type = c.replace("contains_", "").replace("_terms", "")
#         term_str = f"{term_type}={pct_of_total:.3f}"
#         terms_str.append(term_str)
#         pcts_total.append(pct_of_total)
#     term_str_full = " | ".join(terms_str) + f" | total unwanted={sum(pcts_total):.3f}"
#     print(term_str_full)
#     return df


# def apply_masks(
#     df: pd.DataFrame,
#     case_mask: pd.Series,
#     lowercase_mask: pd.Series,
#     joined_case_mask: pd.Series,
#     unwanted_partial_strings_list: List[str],
# ) -> pd.DataFrame:
#     """Apply masks for only keeping tweets based on wanted terms in text."""
#     df = df.loc[case_mask | lowercase_mask | joined_case_mask]
#     unwanted_mask = df["text"].str.contains("|".join(unwanted_partial_strings_list))
#     df = df.loc[~unwanted_mask]
#     print(f"Kept {len(df):,} tweets after filtering raw data with masks")
#     return df


# def filter_by_num_words_in_tweet(
#     df: pd.DataFrame, min_num_tweet_words_wanted: int
# ) -> pd.DataFrame:
#     """Filter tweets based on number of words in text."""
#     min_num_words_mask = (
#         df["text"].str.split(" ").str.len() >= min_num_tweet_words_wanted
#     )
#     print(
#         f"Kept {len(df.loc[min_num_words_mask]):,} tweets with more than "
#         f"approximately {min_num_tweet_words_wanted:,} words per tweet"
#     )
#     df = df.loc[min_num_words_mask]
#     return df


# def filter_tweets_based_on_content(
#     df_raw: pd.DataFrame,
#     # tweet_search_terms: List[str],
#     # case_sensitive_tweet_search_terms: List[str],
#     # joined_tweet_search_terms_no_spaces: List[str],
#     # crypto_terms: List[str],
#     # religious_terms: List[str],
#     # inappropriate_terms: List[str],
#     # video_games_terms: List[str],
#     # misc_unwanted_terms: List[str],
#     # non_english_terms: List[str],
#     # min_num_words_tweet: int,
# ) -> pd.DataFrame:
#     """Filter tweets based on terms in text and approximate number in words."""
#     start = datetime.now()
#     print(
#         "Filtering Tweets - Starting time = "
#         f"{start.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}..."
#     )
#     df = df_raw.pipe(drop_blank_tweets, subset=["text"])
#     # lowercase_mask, case_mask, joined_case_mask = get_raw_masks(
#     #     df_raw,
#     #     tweet_search_terms,
#     #     case_sensitive_tweet_search_terms,
#     #     joined_tweet_search_terms_no_spaces,
#     # )
#     # df = (
#     #     df_raw.pipe(
#     #         add_search_term_boolean_columns,
#     #         lowercase_mask=lowercase_mask,
#     #         case_mask=case_mask,
#     #         joined_case_mask=joined_case_mask,
#     #         crypto_terms_list=crypto_terms,
#     #         religious_terms_list=religious_terms,
#     #         inappropriate_terms_list=inappropriate_terms,
#     #         video_games_terms_list=video_games_terms,
#     #         misc_unwanted_terms_list=misc_unwanted_terms,
#     #         non_english_terms_list=non_english_terms,
#     #     )
#     #     # .pipe(
#     #     #     apply_masks,
#     #     #     case_mask=case_mask,
#     #     #     lowercase_mask=lowercase_mask,
#     #     #     joined_case_mask=joined_case_mask,
#     #     #     unwanted_partial_strings_list=unwanted_partial_strings_list,
#     #     # )
#     #     # .pipe(
#     #     #     filter_by_num_words_in_tweet, min_num_tweet_words_wanted=min_num_words_tweet
#     #     # )
#     # )
#     end = datetime.now()
#     duration = (end - start).total_seconds()
#     print(
#         "Done filtering at "
#         f"{end.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]} ({duration:.3f} seconds)."
#     )
#     return df

In [None]:
# def get_hourly_folders_per_day(
#     s3_bucket_name: str,
#     path_to_folder: str,
#     years_wanted: List[int],
#     verbose: bool = False,
# ) -> List[str]:
#     """Get list of S3 hourly data folders, per day of streamed data."""
#     list_of_hourly_dirs = []
#     for year in years_wanted:
#         monthly_prefixes = s3_client.list_objects_v2(
#             Bucket=s3_bucket_name,
#             Prefix=f"{path_to_folder[1:]}{year}/",
#             Delimiter="/",
#         )["CommonPrefixes"]
#         # print(monthly_prefixes)

#         for monthly_prefix in monthly_prefixes:
#             daily_prefixes = s3_client.list_objects_v2(
#                 Bucket=s3_bucket_name,
#                 Prefix=monthly_prefix["Prefix"],
#                 Delimiter="/",
#             )["CommonPrefixes"]
#             # print(monthly_prefix, daily_prefixes)

#             for daily_prefix in daily_prefixes:
#                 hourly_prefixes = s3_client.list_objects_v2(
#                     Bucket=s3_bucket_name,
#                     Prefix=daily_prefix["Prefix"],
#                     Delimiter="/",
#                 )["CommonPrefixes"]
#                 # print(
#                 #     monthly_prefix,
#                 #     # daily_prefixes,
#                 #     hourly_prefixes,
#                 # )
#                 list_of_hourly_dirs.append(hourly_prefixes)
#     list_of_hourly_dirs_flat = [sl["Prefix"] for l in list_of_hourly_dirs for sl in l]
#     print(f"Found {len(list_of_hourly_dirs_flat):,} hourly folders")
#     if verbose:
#         for hourly_dirs in list_of_hourly_dirs_flat:
#             print(hourly_dirs)
#     return list_of_hourly_dirs_flat


# def get_hourly_file_name(first_object_key: Dict) -> str:
#     """Get name of hourly object on S3."""
#     fname = (
#         first_object_key["Key"]
#         .split("twitter_delivery_stream")[0]
#         .split("kinesis-demo")[1]
#         .replace("/", "")
#     )
#     return fname


# def get_hourly_files_names(s3_bucket_name: str, list_of_hourly_dirs_flat: List[str]):
#     """Get year, month, date and hour per hour for hourly S3 objects."""
#     keys_yyyymmdd = [
#         get_hourly_file_name(
#             first_object_key=s3_client.list_objects_v2(
#                 Bucket=s3_bucket_name, Prefix=list_of_hourly_dirs
#             )["Contents"][0]
#         )
#         for list_of_hourly_dirs in list_of_hourly_dirs_flat
#     ]
#     file_names = pd.Series(keys_yyyymmdd)
#     assert file_names.sort_values().equals(file_names)
#     assert len(file_names) == len(list_of_hourly_dirs_flat)
#     return file_names


# def save_to_parquet(
#     df: pd.DataFrame, filepath: str, storage_options: Union[None, Dict[str, str]] = None
# ) -> None:
#     """Save DataFrame to .parquet.gzip file."""
#     print(f"Saving to parquet file {os.path.basename(filepath)}...")
#     df.to_parquet(filepath, index=False, storage_options=storage_options)
#     print("Done.")


# def read_parquet(
#     filepath: str,
#     columns: Union[None, List[str]] = None,
#     storage_options: Union[None, Dict[str, str]] = None,
#     verbose: bool = False,
# ) -> pd.DataFrame:
#     """Read DataFrame from .parquet.gzip file."""
#     df = pd.read_parquet(filepath, columns=columns, storage_options=storage_options)
#     if verbose:
#         print(f"Read data from parquet file {os.path.basename(filepath)}")
#     return df


# def get_s3_bucket_file_list(s3_bucklet_name: str, processed_data_dir: str) -> List[str]:
#     """Get list of all filepaths in folder in S3 bucket."""
#     print(f"Getting list of files in {processed_data_dir} in S3 bucket...")
#     s3_processed_data_filepaths = [
#         f"s3://{s3_bucket_name}/{c['Key']}"
#         for c in s3_client.list_objects_v2(
#             Bucket=s3_bucket_name,
#             Delimiter="/",
#             Prefix=f"{processed_data_dir}/",
#         )["Contents"]
#     ]
#     print("Done.")
#     return s3_processed_data_filepaths


# def convert_file_contents_to_df(file_contents_all_flat: List) -> pd.DataFrame:
#     """Read contents of streamed file into single-row DataFrame."""
#     nested_list_of_records = []
#     for file_body in file_contents_all_flat:
#         list_of_records = file_body.decode("utf-8").split("\n")[:-1]
#         nested_list_of_records.append(list_of_records)
#     df = pd.DataFrame(
#         [record.split("\t")[:-1] for sl in nested_list_of_records for record in sl]
#     )
#     return df


# def add_column_headers(df: pd.DataFrame, headers: List[str]) -> pd.DataFrame:
#     """Add column headers to DataFrame."""
#     num_rows, num_cols = df.shape
#     print(f"Raw Data contains {num_rows:,} rows and {num_cols:,} columns")
#     # df_mismatched = df[~df.iloc[:, -2:].isna().all(axis=1)]
#     # mismatched_rows = len(df_mismatched)
#     # with pd.option_context("display.max_columns", None):
#     #     display(df_mismatched.style.set_caption(f"{mismatched_rows:,} Mismatched rows"))
#     if num_cols == 54:
#         df = df.loc[df.iloc[:, -1:].isna().all(axis=1)].drop(columns=[53])
#     elif num_cols == 55:
#         df = df.loc[df.iloc[:, -2:].isna().all(axis=1)].drop(columns=[53, 54])
#     num_new_rows = len(df)
#     print(f"Dropped {(num_rows - num_new_rows):,} mismatched rows out of {num_rows:,}")
#     assert df.shape[1] == len(headers)
#     df.columns = headers
#     return df


# def process_single_hour_files(
#     q: int,
#     list_of_hourly_dirs: List[str],
#     headers: List[str],
#     file_name: str,
#     len_flat_list_of_hourly_dirs: List[str],
#     # tweet_search_terms: List[str],
#     # case_sensitive_tweet_search_terms: List[str],
#     # joined_tweet_search_terms_no_spaces: List[str],
#     # crypto_terms: List[str],
#     # religious_terms: List[str],
#     # inappropriate_terms: List[str],
#     # video_games_terms: List[str],
#     # misc_unwanted_terms: List[str],
#     # non_english_terms: List[str],
#     # min_num_words_tweet: int,
#     s3_bucket_name: str,
#     processed_data_dir: str,
#     aws_region: str,
#     dtypes_dict: Dict,
#     verbose: bool = False,
# ) -> None:
#     """Process single hour of file objects on S3."""
#     # extract
#     objects_hourly_all = s3_client.list_objects_v2(
#         Bucket=s3_bucket_name, Prefix=list_of_hourly_dirs
#     )
#     file_contents_list = []
#     for k, file_obj_dict in enumerate(objects_hourly_all["Contents"], 1):
#         file_body = s3_client.get_object(
#             Bucket=s3_bucket_name, Key=file_obj_dict.get("Key")
#         )["Body"].read()
#         if verbose:
#             print(
#                 f"Dir {q}/{len_flat_list_of_hourly_dirs} - "
#                 f"{list_of_hourly_dirs}, reading object "
#                 f"{k}/{len(objects_hourly_all['Contents'])}"
#             )
#         file_contents_list.append(file_body)
#     print(
#         f"Dir {q}/{len_flat_list_of_hourly_dirs} - {list_of_hourly_dirs} contains "
#         f"{len(file_contents_list):,} file objects"
#     )
#     # transform
#     df = (
#         convert_file_contents_to_df(file_contents_list)
#         .pipe(add_column_headers, headers=headers)
#         # convert to datetime
#         .assign(created_at=lambda df: pd.to_datetime(df["created_at"]))
#         .assign(user_joined=lambda df: pd.to_datetime(df["user_joined"]))
#         # drop blank tweets
#         .pipe(drop_blank_tweets, subset=["text"])
#         # # filter based on text in tweet
#         # .pipe(
#         #     filter_tweets_based_on_content,
#         #     tweet_search_terms=tweet_search_terms,
#         #     case_sensitive_tweet_search_terms=case_sensitive_tweet_search_terms,
#         #     joined_tweet_search_terms_no_spaces=joined_tweet_search_terms_no_spaces,
#         #     crypto_terms=crypto_terms,
#         #     religious_terms=religious_terms,
#         #     inappropriate_terms=inappropriate_terms,
#         #     video_games_terms=video_games_terms,
#         #     misc_unwanted_terms=misc_unwanted_terms,
#         #     non_english_terms=non_english_terms,
#         #     # min_num_words_tweet=min_num_words_tweet,
#         # )
#         # change datatypes
#         .astype(dtypes_dict)
#     )
#     # load
#     if "data/" in processed_data_dir:
#         # save to .parquet.gzip
#         filepath = f"{processed_data_dir}/{file_name}.parquet.gzip"
#         storage_options = None
#     else:
#         filepath = (
#             f"s3://{s3_bucket_name}/{processed_data_dir}/{file_name}.parquet.gzip"
#         )
#         storage_options = {
#             "key": os.getenv("AWS_ACCESS_KEY_ID"),
#             "secret": os.getenv("AWS_SECRET_ACCESS_KEY"),
#         }
#     save_to_parquet(df, filepath, storage_options)


# def process_files_per_hour(
#     s3_bucket_name: str,
#     flat_list_of_hourly_dirs: List[str],
#     headers: List[str],
#     file_names: List[str],
#     # tweet_search_terms: List[str],
#     # case_sensitive_tweet_search_terms: List[str],
#     # joined_tweet_search_terms_no_spaces: List[str],
#     # crypto_terms: List[str],
#     # religious_terms: List[str],
#     # inappropriate_terms: List[str],
#     # video_games_terms: List[str],
#     # misc_unwanted_terms: List[str],
#     # non_english_terms: List[str],
#     # min_num_words_tweet: int,
#     processed_data_dir: str,
#     proc_zip_fname: str,
#     path_to_folder: str,
#     aws_region: str,
#     dtypes_dict: Dict,
#     cleanup_local_files: bool = False,
#     upload_to_s3: bool = False,
#     verbose: bool = False,
# ) -> None:
#     """Run ETL workflow to process hourly streamed tweets into separate parquet file."""
#     for q, (file_name, list_of_hourly_dirs) in enumerate(
#         zip(file_names, flat_list_of_hourly_dirs), 1
#     ):
#         process_single_hour_files(
#             q,
#             list_of_hourly_dirs,
#             headers,
#             file_name,
#             len(flat_list_of_hourly_dirs),
#             # tweet_search_terms,
#             # case_sensitive_tweet_search_terms,
#             # joined_tweet_search_terms_no_spaces,
#             # crypto_terms,
#             # religious_terms,
#             # inappropriate_terms,
#             # video_games_terms,
#             # misc_unwanted_terms,
#             # non_english_terms,
#             # min_num_words_tweet,
#             s3_bucket_name,
#             processed_data_dir,
#             aws_region,
#             dtypes_dict,
#             verbose,
#         )
#         if q < len(flat_list_of_hourly_dirs):
#             print()

#     # (if saved locally) zip all processed data files, upload to S3, delete local files
#     if "data/" in processed_data_dir:
#         if upload_to_s3:
#             # create zip of all .parquet.gzip processed data files
#             curr_dir = os.getcwd()
#             create_zip_file("*.parquet.gzip", processed_data_dir, proc_zip_fname)
#             # upload zip file to S3 bucket
#             try:
#                 assert os.getcwd() == curr_dir
#                 upload_file_to_s3(
#                     aws_region,
#                     processed_data_dir,
#                     proc_zip_fname,
#                     s3_bucket_name,
#                     f"{path_to_folder[1:-1]}/processed/{proc_zip_fname}",
#                 )
#                 print("\nUploaded zipped file to S3 bucket")
#             except AssertionError as e:
#                 print(
#                     f"\n{str(e)}: Incorrect working directory. "
#                     "Did not upload zipped file to S3 bucket."
#                 )

#         if cleanup_local_files:
#             # delete locally exported parquet files
#             list(
#                 map(
#                     os.remove,
#                     [
#                         os.path.join(processed_data_dir, f"{f}.parquet.gzip")
#                         for f in file_names
#                     ],
#                 )
#             )
#     print("Deleted local .parquet.gzip files with filtered data.")
#     # delete local zip file
#     os.remove(os.path.join(processed_data_dir, proc_zip_fname))
#     print("Deleted local .zip file created from all filtered data files.")

## Get Data

Get a nested list of hourly S3 object prefixes per day for which tweets were streamed into the bucket

In [8]:
%%time
list_of_hourly_dirs_flat = get_hourly_folders_per_day(
    s3_client, s3_bucket_name, path_to_folder, years_wanted
)

Found 225 hourly folders
CPU times: user 103 ms, sys: 11.8 ms, total: 115 ms
Wall time: 962 ms


**Notes**
1. In this nested list, each sublist consists of hourly S3 object prefixes per day. Each prefix has the format `datasets/twitter/kinesis-demo/<yyyy>/<mm>/<dd>/`.

Get a flat list of hourly object prefixes covering all the days of streamed tweets in the S3 bucket

In [9]:
%%time
file_names =  get_hourly_files_names(s3_client, s3_bucket_name, list_of_hourly_dirs_flat)

CPU times: user 4.5 s, sys: 52.4 ms, total: 4.55 s
Wall time: 14.5 s


## Combine Data Per Hour

Run an ETL workflow to process hourly file objects in the S3 bucket
- extract all hourly file objects into a single `pandas` `DataFrame`
- process the hourly data
  - convert to `datetime`, as required
  - add column names
  - drop rows in the data where the tweet does not contain text (i.e. drop blank tweets)
- export the processed `DataFrame` to a `.parquet` file

This ETL workflow will give a single `.parquet` file for every hour of every day on which tweets were streamed. All `.parquet` files are then
- combined into a single `.zip` file, which is then
  - uploaded to the `datasets/twitter/kinesis-demo/processed` prefix in the S3 bucket
  - deleted locally
- deleted locally

In [10]:
%%time
process_files_per_hour(
    s3_client,
    s3_bucket_name,
    list_of_hourly_dirs_flat,
    headers,
    file_names.tolist(),
    # tweet_search_terms,
    # case_sensitive_tweet_search_terms,
    # joined_tweet_search_terms_no_spaces,
    # crypto_terms,
    # religious_terms,
    # inappropriate_terms,
    # video_games_terms,
    # misc_unwanted_terms,
    # non_english_terms,
    # min_num_words_tweet,
    processed_data_dir,
    proc_zip_fname,
    path_to_folder,
    aws_region,
    dtypes_dict,
    cleanup_local_files,
    upload_to_s3,
    verbose=True
)

Dir 1/225 - datasets/twitter/kinesis-demo/2021/12/30/17/, reading object 1/15
Dir 1/225 - datasets/twitter/kinesis-demo/2021/12/30/17/, reading object 2/15
Dir 1/225 - datasets/twitter/kinesis-demo/2021/12/30/17/, reading object 3/15
Dir 1/225 - datasets/twitter/kinesis-demo/2021/12/30/17/, reading object 4/15
Dir 1/225 - datasets/twitter/kinesis-demo/2021/12/30/17/, reading object 5/15
Dir 1/225 - datasets/twitter/kinesis-demo/2021/12/30/17/, reading object 6/15
Dir 1/225 - datasets/twitter/kinesis-demo/2021/12/30/17/, reading object 7/15
Dir 1/225 - datasets/twitter/kinesis-demo/2021/12/30/17/, reading object 8/15
Dir 1/225 - datasets/twitter/kinesis-demo/2021/12/30/17/, reading object 9/15
Dir 1/225 - datasets/twitter/kinesis-demo/2021/12/30/17/, reading object 10/15
Dir 1/225 - datasets/twitter/kinesis-demo/2021/12/30/17/, reading object 11/15
Dir 1/225 - datasets/twitter/kinesis-demo/2021/12/30/17/, reading object 12/15
Dir 1/225 - datasets/twitter/kinesis-demo/2021/12/30/17/, rea

**Notes**
1. Since file objects per hour are being combined into a single `DataFrame`, it is possible to use `pandas` (in-memory) for processing hourly data as only a few thousand file objects were created per hour (based on the search filters set during streaming).

## (OPTIONAL) Retrieve `.zip` File, Extract and Load into Memory

The `.zip` file created above is now downloaded from S3 and extracted. Its contents, the hourly `.parquet` files are then read into separate `pandas` DataFrame and combined into a single `pandas` DataFrame

In [11]:
%%time
if "data/" in processed_data_dir:
    proc_zip_fpath = os.path.join(processed_data_dir, proc_zip_fname)
    if not os.path.exists(proc_zip_fpath):
        download_file_from_s3(
            s3_client,
            s3_bucket_name,
            processed_data_dir,
            proc_zip_fname,
            aws_region,
            f"{path_to_folder[1:]}processed/",
        )
        extract_zip_file(proc_zip_fpath, processed_data_dir)
    proc_files = glob(f"{processed_data_dir}/*.parquet.gzip")
    storage_options = None
else:
    storage_options = {
        "key": os.getenv("AWS_ACCESS_KEY_ID"),
        "secret": os.getenv("AWS_SECRET_ACCESS_KEY"),
    }
    proc_files = get_s3_bucket_file_list(s3_bucket_name, processed_data_dir)
dfs_processed = [read_parquet(f, None, storage_options, False) for f in proc_files]
df = pd.concat(dfs_processed, ignore_index=True)
display(
    df[["id", "text", "user_joined", "created_at"]]
    .isna()
    .sum()
    .to_frame()
    .T.add_suffix("__nans")
    .assign(num_rows=len(df))
)
with pd.option_context("display.max_rows", None):
    display(
        df.isna()
        .sum()
        .rename("nans")
        .to_frame()
        .merge(
            df.dtypes.rename("dtype").to_frame(),
            left_index=True,
            right_index=True,
            how="left",
        )
    )
with pd.option_context("display.max_columns", None):
    display(
        df[
            [
                "id",
                "created_at",
                "source_text",
                "user_name",
                "user_screen_name",
                "user_joined",
                "text",
            ]
        ].head().style.set_caption(f"Loaded {len(df):,} rows of processed data")
    )

Started downloading processed data zip file from datasets/twitter/kinesis-demo/processed/combined_data.zip to ../data/processed/combined_data.zip at 2022-10-25 23:47:01.847...
Done downloading in 5.319 seconds.
Started extracting filtered data parquet files from processed data zip file to ../data/processed at 2022-10-25 23:47:07.170...
Done extracting in 1.019 seconds.


Unnamed: 0,id__nans,text__nans,user_joined__nans,created_at__nans,num_rows
0,0,0,0,0,1155499


Unnamed: 0,nans,dtype
id,0,string
geo,0,string
coordinates,0,string
place,0,string
contributors,0,string
is_quote_status,0,string
quote_count,0,Int32
reply_count,0,Int32
retweet_count,0,Int32
favorite_count,0,Int32


Unnamed: 0,id,created_at,source_text,user_name,user_screen_name,user_joined,text
0,1478924374472200194,2022-01-06 03:00:22+00:00,Mars Mission Tweeter,Mars Mission Images Bot 🤖,MarsMissionImgs,2018-12-07 05:41:37+00:00,image taken on at 3:50:12.268 PM with
1,1478924376565329923,2022-01-06 03:00:23+00:00,Twitter for iPhone,Jonathan Procknow,jon_procknow,2017-05-02 18:29:09+00:00,"We just finished deploying our sunshield today, but wait, there's more! secondary mirror is planned to be unfolded tomorrow, Jan. 5th, in the morning (Eastern time). Read more at the blog:"
2,1478924383175458816,2022-01-06 03:00:24+00:00,Twitter for iPhone,kaitlyn,kapricornkait,2009-08-24 04:02:30+00:00,Transiting Pisces Moon conjunct Jupiter tonight
3,1478924386451218432,2022-01-06 03:00:25+00:00,Twitter Web App,Poyo,PoyoDaBoyo,2016-06-14 17:54:30+00:00,"I find it really funny that by nerfing a character (KF2 MK Shuttle Loop limit) that was barely winning tourneys cause only two top players mained him, one of which banned (me)We made the char that everyone hated from day 1 the best again ROFLLLLLLLLLLLLLLLL"
4,1478924386551820291,2022-01-06 03:00:25+00:00,Twitter for Android,VIP (Very Important Plant) 🇲🇾,PuffyCurry,2010-06-11 09:51:14+00:00,Neptune's War Council learns of Roman Emperor Caligula's impending invasion


CPU times: user 11.2 s, sys: 1.35 s, total: 12.6 s
Wall time: 13.8 s


**Notes**
1. For the tweets meeting the required subject for this project, combining all the hourly `.parquet` files into a **single** in-memory `pandas` DataFrame is sufficient. The ability to use a memory-bound framework to hold this (hourly) data depends on the
   - particular **subject chosen for this project**
   - filters applied to remove tweets with irrelevant subjects (crypto, video games, etc.)

   For a different choice of subject or filters, this may not always be possible and out-of-memory `DataFrame`s will be needed to store the hourly data before it is exported to disk (in a `.parquet` file). In a later notebook (`5-process-data.ipynb`), `PySpark` will be used to load all the `.parquet` files into a single PySpark DataFrame, which will be used for text processing.

Export a sample (approximately 0.12%) of all the streamed data to a `*.xlsx` file in order to manually label the subject of each tweet. The labeled data will be used to build up a list that can efficiently capture tweets that are relevant to this project

In [12]:
df["created_at"] = df["created_at"].dt.tz_localize(None)

In [13]:
# 1348 / 1155499 = 0.11667%
(
    df[["id", "created_at", "text"]]
    .sample(n=1348)
    .to_excel(f"{processed_data_dir}/sample_to_label_tweet_subject.xlsx", index=False)
)

## Links

1. JWST
   - [Countries that built the JWST](https://www.webb.nasa.gov/content/about/faqs/faq.html#countries)
   - [JWST Team (space agencies and NASA centers)](https://www.nasa.gov/mission_pages/webb/team/index.html)
   - [JWST could prove Stephen Hawking's theory](https://www.republicworld.com/science/space/james-webb-space-telescope-may-provide-data-to-prove-stephen-hawkings-dark-matter-theory-articleshow.html)
   - [team of project managers and project scientists](https://jwst.nasa.gov/content/meetTheTeam/index.html)

---

<span style="float:left;">
    &#169; 2021 | <a href="https://github.com/elsdes3">@elsdes3</a> (MIT)
</span>

<span style="float:right;">
    <a href="./4-filter-data/notebooks/4_filter_data.ipynb">4 - Filter Data >></a>
</span>