In [1]:
# Load the dotenv IPython extension, then run its %dotenv magic.
# Presumably this reads a local .env file into the process environment
# (e.g. credentials for the S3 read below) — TODO confirm.
%load_ext dotenv
%dotenv

In [2]:
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.io as pio

from pyspark.sql import SparkSession
import pyspark.pandas as ps
from pyspark.sql import functions as F
from pyspark.sql import Window

# Imported only for its side effect; presumably it registers the "human"
# plotly template referenced below — TODO confirm.
# noinspection PyUnresolvedReferences
import human.plotly_template

# Default template for every figure in this notebook: "plotly" combined with "human".
pio.templates.default = "plotly+human"

In [3]:
df = pd.read_parquet('s3://merged-tweets/testing-sample/test.parquet')

# Dataset exploration

In [None]:
df = ps.from_pandas(df).to_spark()

In [None]:
# Print the first 10 rows as a quick sanity check of the loaded data.
df.show(10)

In [None]:
# Column names paired with their Spark data types.
df.dtypes

In [None]:
# Total number of rows in the dataset.
df.count()

In [None]:
# Per-column count of populated values. NaN is mapped to NULL first so that
# SQL count(col), which skips NULLs, ignores both kinds of missing value.
# NOTE(review): column names are interpolated unquoted into the SQL expression;
# fine for plain identifiers, but a name needing backticks would break this.
(df.replace(float('nan'), None)
 .agg(*[F.expr(f'count({col}) as {col}') for col in df.columns])
).show()

In [None]:
# Show every row whose `id` occurs more than once: attach the per-id row count
# through a window, keep rows where it exceeds 1, then drop the helper column.
w = Window.partitionBy('id')
(
    df.select('*', F.count('id').over(w).alias('dupeCount'))
    .where(F.col('dupeCount') > 1)
    .drop('dupeCount')
    .show()
)

In [None]:
# Row count per distinct `lang` value.
df.groupBy('lang').count().show()

In [None]:
# Row count per distinct `favorited` value.
df.groupBy('favorited').count().show()


In [None]:
# Row count per distinct `retweeted` value.
df.groupBy('retweeted').count().show()

In [None]:
# Row count per distinct `possibly_sensitive` value.
df.groupBy('possibly_sensitive').count().show()


# Descriptive analysis

## Daily Tweets

In [None]:
# Tweets per day: reduce `created_at` to a yyyy-MM-dd key and count rows per key.
df_plot = (
    df.select(F.date_format('created_at', 'yyyy-MM-dd').alias('created_at'))
    .groupBy('created_at')
    .agg(F.count('*').alias('size'))
)
# Sort on the driver; zero-padded yyyy-MM-dd strings order chronologically.
px.line(df_plot.toPandas().sort_values('created_at'), x='created_at', y='size')

## Monthly Tweets

In [None]:
# Tweets per month: reduce `created_at` to a yyyy-MM key and count rows per key.
df_plot = (
    df.select(F.date_format('created_at', 'yyyy-MM').alias('created_at'))
    .groupBy('created_at')
    .agg(F.count('*').alias('size'))
)
# Sort on the driver; zero-padded yyyy-MM strings order chronologically.
px.line(df_plot.toPandas().sort_values('created_at'), x='created_at', y='size')

## Retweets and favorites distribution

### Including 0

In [None]:
# Bucket retweet_count into ten 10-wide bins covering [0, 100].
# The edges must run up to and include 100: range(0, 100, 10) stops at 90, which
# dropped every value above 90 and left the last bar of the 0-100 axis empty.
# Values above 100 are still outside the buckets and therefore not counted.
limits, count = df.select('retweet_count').rdd.flatMap(lambda x: x).histogram(list(range(0, 101, 10)))
limits = np.array(limits)
# x holds the bin midpoints and y the precomputed counts.
# NOTE(review): the "+ 2" offset is kept from the original; presumably it nudges the
# midpoints so plotly's automatic binning puts each in its own bar — confirm before removing.
fig = px.histogram(x=(limits[:-1] + limits[1:]) / 2 + 2, y=count, nbins=10, range_x=[0, 100])
fig.update_xaxes(title='retweet_count').update_yaxes(title='count')

In [None]:
# Bucket favorite_count into ten 10-wide bins covering [0, 100].
# The edges must run up to and include 100: range(0, 100, 10) stops at 90, which
# dropped every value above 90 and left the last bar of the 0-100 axis empty.
# Values above 100 are still outside the buckets and therefore not counted.
limits, count = df.select('favorite_count').rdd.flatMap(lambda x: x).histogram(list(range(0, 101, 10)))
limits = np.array(limits)
# x holds the bin midpoints and y the precomputed counts.
# NOTE(review): the "+ 2" offset is kept from the original; presumably it nudges the
# midpoints so plotly's automatic binning puts each in its own bar — confirm before removing.
fig = px.histogram(x=(limits[:-1] + limits[1:]) / 2 + 2, y=count, nbins=10, range_x=[0, 100])
fig.update_xaxes(title='favorite_count').update_yaxes(title='count')

### Excluding 0

In [None]:
# Same histogram restricted to rows with retweet_count > 0.
# The edges must run up to and include 100: range(0, 100, 10) stops at 90, which
# dropped every value above 90 and left the last bar of the 0-100 axis empty.
# Values above 100 are still outside the buckets and therefore not counted.
limits, count = df.filter(df.retweet_count > 0).select("retweet_count").rdd.flatMap(lambda x: x).histogram(list(range(0, 101, 10)))
limits = np.array(limits)
# x holds the bin midpoints and y the precomputed counts.
# NOTE(review): the "+ 2" offset is kept from the original; presumably it nudges the
# midpoints so plotly's automatic binning puts each in its own bar — confirm before removing.
fig = px.histogram(x=(limits[:-1] + limits[1:]) / 2 + 2, y=count, nbins=10, range_x=[0, 100])
fig.update_xaxes(title='retweet_count').update_yaxes(title='count')

In [None]:
# Same histogram restricted to rows with favorite_count > 0.
# The edges must run up to and include 100: range(0, 100, 10) stops at 90, which
# dropped every value above 90 and left the last bar of the 0-100 axis empty.
# Values above 100 are still outside the buckets and therefore not counted.
limits, count = df.filter(df.favorite_count > 0).select("favorite_count").rdd.flatMap(lambda x: x).histogram(list(range(0, 101, 10)))
limits = np.array(limits)
# x holds the bin midpoints and y the precomputed counts.
# NOTE(review): the "+ 2" offset is kept from the original; presumably it nudges the
# midpoints so plotly's automatic binning puts each in its own bar — confirm before removing.
fig = px.histogram(x=(limits[:-1] + limits[1:]) / 2 + 2, y=count, nbins=10, range_x=[0, 100])
fig.update_xaxes(title='favorite_count').update_yaxes(title='count')

# Text Analysis

In [None]:
import spacy
from collections import Counter

In [None]:
# Load the spaCy pipeline named 'en_core_web_sm' (the model package must be installed).
nlp = spacy.load('en_core_web_sm')

In [None]:
# Bring the tweet texts to the driver as a plain list of str for spaCy.
# Rows are unpacked directly instead of going through a NumPy array, which would
# build a fixed-width unicode array padded to the longest tweet. NULL texts are
# filtered out in Spark because spaCy cannot process None.
text_array = [
    row['full_text']
    for row in df.select('full_text').where(F.col('full_text').isNotNull()).collect()
]

In [None]:
# Run the texts through the spaCy pipeline with 4 worker processes.
# NOTE(review): the result appears to be a one-shot stream — `docs` is rebuilt before
# the lemmatized pass later in the notebook — so a consumer cell re-run without this
# cell would see no documents. Confirm against the spaCy docs.
docs = nlp.pipe(text_array, n_process=4) # This will require a better handling -> https://spacy.io/usage/processing-pipelines

In [None]:
# This preprocessing can be done in the dataframe (enabling the comparison between dask and pyspark)
def is_token_allowed(token, pos_tag=None):
    """Decide whether a token should be counted.

    Args:
        token: Object exposing ``text``, ``is_stop``, ``is_punct`` and,
            when ``pos_tag`` is given, ``pos_``.
        pos_tag: Optional part-of-speech tag the token must carry. With
            ``None`` no part-of-speech constraint is applied.

    Returns:
        ``False`` if the token is falsy, is whitespace-only, is a stop word,
        is punctuation, or does not carry ``pos_tag``; ``True`` otherwise.
    """
    # The part-of-speech comparison is evaluated up front, before the other checks.
    if pos_tag is None:
        pos_ok = True
    else:
        pos_ok = token.pos_ == pos_tag

    if not token:
        return False
    if not token.text.strip():
        return False
    if token.is_stop or token.is_punct:
        return False
    if not pos_ok:
        return False
    return True


def preprocess_token(token, lemma=False):
    """Return the normalized string form of a token.

    Args:
        token: Object exposing ``text`` and, when ``lemma`` is true, ``lemma_``.
        lemma: If true, use the token's lemma instead of its surface text.

    Returns:
        The chosen string, stripped of surrounding whitespace and lower-cased.
    """
    raw = token.lemma_ if lemma else token.text
    return raw.strip().lower()


def get_word_lists(docs, lemma=False):
    """Collect the normalized allowed tokens from an iterable of documents.

    Args:
        docs: Iterable of documents, each itself iterable over tokens. It is
            walked exactly once.
        lemma: Forwarded to ``preprocess_token``; if true, lemmas are
            collected instead of surface forms.

    Returns:
        A ``(words, nouns, verbs)`` tuple of lists: every allowed token, and
        the allowed tokens that additionally pass the 'NOUN' and 'VERB'
        part-of-speech filters. Duplicates are kept, in encounter order.
    """
    words, nouns, verbs = [], [], []
    # Part-of-speech specific buckets, checked only for tokens that pass the general filter.
    pos_buckets = (('NOUN', nouns), ('VERB', verbs))

    for doc in docs:
        for token in doc:
            if not is_token_allowed(token):
                continue

            words.append(preprocess_token(token, lemma=lemma))

            for pos_tag, bucket in pos_buckets:
                if is_token_allowed(token, pos_tag=pos_tag):
                    bucket.append(preprocess_token(token, lemma=lemma))

    return words, nouns, verbs

## No lemmatization

In [None]:
# Surface forms (lemma defaults to False). This iterates over `docs` once.
words, nouns, verbs = get_word_lists(docs)

In [None]:
# Frequency tables for all words, nouns and verbs, then the 50 most frequent entries of each.
word_freq, noun_freq, verb_freq = (Counter(tokens) for tokens in (words, nouns, verbs))
common_words, common_nouns, common_verbs = (
    freq.most_common(50) for freq in (word_freq, noun_freq, verb_freq)
)

### Word Count

In [None]:
# Bar chart of the most common words (count on x, word on y).
# Sorted ascending by count — presumably so the most frequent word is drawn at the top; confirm on the rendered figure.
df_plot = pd.DataFrame(common_words, columns=['word', 'count'])
px.bar(df_plot.sort_values('count'), x='count', y='word', text_auto=True, height=1000)

### Noun count

In [None]:
# Bar chart of the most common nouns (count on x, word on y).
# Sorted ascending by count — presumably so the most frequent noun is drawn at the top; confirm on the rendered figure.
df_plot = pd.DataFrame(common_nouns, columns=['word', 'count'])
px.bar(df_plot.sort_values('count'), x='count', y='word', text_auto=True, height=1000)

### Verb count

In [None]:
# Bar chart of the most common verbs (count on x, word on y).
# Sorted ascending by count — presumably so the most frequent verb is drawn at the top; confirm on the rendered figure.
df_plot = pd.DataFrame(common_verbs, columns=['word', 'count'])
px.bar(df_plot.sort_values('count'), x='count', y='word', text_auto=True, height=1000)

## Lemmatization

In [None]:
# Rebuild the document stream for the lemmatized pass.
# NOTE(review): presumably needed because the earlier `docs` stream was already consumed — confirm against the spaCy docs.
docs = nlp.pipe(text_array, n_process=4)

In [None]:
# Lemmatized forms; overwrites the word lists produced by the non-lemmatized pass.
words, nouns, verbs = get_word_lists(docs, lemma=True)

In [None]:
# Frequency tables for the lemmatized words, nouns and verbs, then the 50 most frequent entries of each.
word_freq, noun_freq, verb_freq = (Counter(tokens) for tokens in (words, nouns, verbs))
common_words, common_nouns, common_verbs = (
    freq.most_common(50) for freq in (word_freq, noun_freq, verb_freq)
)

### Word Count

In [None]:
# Bar chart of the most common lemmatized words (count on x, word on y).
# Sorted ascending by count — presumably so the most frequent word is drawn at the top; confirm on the rendered figure.
df_plot = pd.DataFrame(common_words, columns=['word', 'count'])
px.bar(df_plot.sort_values('count'), x='count', y='word', text_auto=True, height=1000)

### Noun count

In [None]:
# Bar chart of the most common lemmatized nouns (count on x, word on y).
# Sorted ascending by count — presumably so the most frequent noun is drawn at the top; confirm on the rendered figure.
df_plot = pd.DataFrame(common_nouns, columns=['word', 'count'])
px.bar(df_plot.sort_values('count'), x='count', y='word', text_auto=True, height=1000)

### Verb count

In [None]:
# Bar chart of the most common lemmatized verbs (count on x, word on y).
# Sorted ascending by count — presumably so the most frequent verb is drawn at the top; confirm on the rendered figure.
df_plot = pd.DataFrame(common_verbs, columns=['word', 'count'])
px.bar(df_plot.sort_values('count'), x='count', y='word', text_auto=True, height=1000)