# Finalise twitter
Finalises the Twitter data and persists it to S3.
See docs for more information.

In [None]:
# Load IPython's autoreload extension; mode 2 reloads imported modules
# before code is executed, so edits to the phoenix package are picked up.
%load_ext autoreload
%autoreload 2

In [None]:
import datetime

import pandas as pd
import tentaclio

from phoenix.common import artifacts, run_datetime
from phoenix.common import utils
from phoenix.tag import export
from phoenix.tag import finalise
from phoenix.tag import object_filters

In [None]:
# Run the project's notebook setup helpers (phoenix.common.utils) for output
# and logging before any other work is done.
utils.setup_notebook_output()
utils.setup_notebook_logging()

In [None]:
# Parameters
# See phoenix/common/run_datetime.py for the expected format of this parameter.
# A falsy value (e.g. None) makes the notebook call
# run_datetime.create_run_datetime_now() instead of parsing a string.
RUN_DATETIME = None

# See phoenix/common/artifacts/registry_environment.py for the expected format
# of this parameter.
ARTIFACTS_ENVIRONMENT_KEY = "local"

# Filters selecting the batch to process; they are passed to every artifact
# URL lookup below.
YEAR_FILTER = 2021
# Month number without zero padding
MONTH_FILTER = 8
OBJECT_TYPE = "tweets"

# OUTPUT
# When left falsy, the URL is looked up from the artifact registry under the
# "final-tweets" key.
FINAL_URL_TWEETS = None

In [None]:
# Resolve the run datetime: parse the supplied parameter when one was given,
# otherwise create one for the current moment.
run_dt = (
    run_datetime.from_file_safe_str(RUN_DATETIME)
    if RUN_DATETIME
    else run_datetime.create_run_datetime_now()
)

# Batch filter values handed to every registry URL lookup below.
url_config = {
    "YEAR_FILTER": int(YEAR_FILTER),
    "MONTH_FILTER": int(MONTH_FILTER),
    "OBJECT_TYPE": OBJECT_TYPE,
}

# Registry that maps artifact keys to URLs for this run and environment.
art_url_reg = artifacts.registry.ArtifactURLRegistry(
    run_dt,
    ARTIFACTS_ENVIRONMENT_KEY,
    artifacts.registry_mappers.get_default_mappers(),
)

# Inputs and outputs of the tagging run.
TAGGING_RUNS_URL_TWEETS_PULLED = art_url_reg.get_url(
    "tagging_runs-tweets_pulled", url_config
)
TAGGING_RUNS_URL_PIPELINE_BASE = art_url_reg.get_url(
    "tagging_runs-pipeline_base", url_config
)
TAGGING_RUNS_URL_OBJECTS_TENSIONS = art_url_reg.get_url(
    "tagging_runs-objects_tensions", url_config
)
TAGGING_RUNS_URL_LANGUAGE_SENTIMENT_OBJECTS = art_url_reg.get_url(
    "tagging_runs-language_sentiment_objects", url_config
)
TAGGING_RUNS_URL_TWEETS_FINAL = art_url_reg.get_url(
    "tagging_runs-tweets_final", url_config
)

# An explicitly supplied output URL wins; the registry is only consulted
# when the parameter was left falsy.
FINAL_URL_TWEETS = FINAL_URL_TWEETS or art_url_reg.get_url("final-tweets", url_config)

In [None]:
# Show the resolved artifact URLs and the run datetime, one value per line.
print(
    "\n".join(
        str(param)
        for param in (
            TAGGING_RUNS_URL_TWEETS_PULLED,
            TAGGING_RUNS_URL_PIPELINE_BASE,
            TAGGING_RUNS_URL_OBJECTS_TENSIONS,
            TAGGING_RUNS_URL_LANGUAGE_SENTIMENT_OBJECTS,
            TAGGING_RUNS_URL_TWEETS_FINAL,
            FINAL_URL_TWEETS,
            run_dt.dt,
        )
    )
)

In [None]:
# %env DASK_CLUSTER_IP=

In [None]:
# Initialise Dask through the project helper before any dataframe is loaded.
# NOTE(review): presumably this honours the DASK_CLUSTER_IP environment
# variable hinted at in the previous cell — confirm in phoenix.common.utils.
utils.dask_global_init()

In [None]:
# Load the pulled-tweets artifact for this batch and keep its dataframe.
tweets_df = artifacts.dataframes.get(TAGGING_RUNS_URL_TWEETS_PULLED).dataframe

In [None]:
# Preview the head of the pulled tweets dataframe as a sanity check.
tweets_df.head()

In [None]:
# Load the objects-tensions artifact for this batch and keep its dataframe.
objects = artifacts.dataframes.get(TAGGING_RUNS_URL_OBJECTS_TENSIONS).dataframe

In [None]:
# Preview the head of the objects-tensions dataframe as a sanity check.
objects.head()

In [None]:
# Load the language-sentiment objects artifact for this batch, keep its
# dataframe, and preview the head of it.
language_sentiment_objects = artifacts.dataframes.get(TAGGING_RUNS_URL_LANGUAGE_SENTIMENT_OBJECTS).dataframe
language_sentiment_objects.head()

In [None]:
# Combine the objects-tensions, language-sentiment and pulled-tweets
# dataframes into the final tweets dataframe (see phoenix.tag.finalise).
tweets_final = finalise.join_objects_to_tweets(objects, language_sentiment_objects, tweets_df)

In [None]:
# Preview the head of the final tweets dataframe before persisting it.
tweets_final.head()

In [None]:
# Inspect the column dtypes of the final tweets dataframe before persisting it.
tweets_final.dtypes

In [None]:
# Persist the final tweets dataframe to the final output URL. The return
# value is bound to `_` so the cell does not display it.
_ = artifacts.dataframes.persist(FINAL_URL_TWEETS, tweets_final)

In [None]:
# Persist the same dataframe a second time, under the tagging-run URL for this
# run. The return value is bound to `_` so the cell does not display it.
_ = artifacts.dataframes.persist(TAGGING_RUNS_URL_TWEETS_FINAL, tweets_final)

In [None]:
# Read back and display the schema at the tagging-run URL that was just
# persisted.
artifacts.dataframes.read_schema(TAGGING_RUNS_URL_TWEETS_FINAL)