In [None]:
%pip install kaggle

In [None]:
%pip install pyspark

In [None]:
%pip install nltk

In [None]:
%pip install pandas

In [None]:
# Mount Google Drive inside the Colab VM so that files kept there (the
# kaggle.json copied by the next cell) are reachable under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Copy the Kaggle credentials file from Drive into ~/.kaggle and restrict it
# to owner read/write.
!mkdir -p ~/.kaggle
!cp /content/drive/MyDrive/A_Progetti/AlgoForMassiveDatasets/kaggle.json ~/.kaggle
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Download the dataset archive; KAGGLE_DATASET (defined below) expects the zip
# in the current working directory.
!kaggle datasets download -d bwandowando/ukraine-russian-crisis-twitter-dataset-1-2-m-rows

# Preprocessing
We're going to preprocess the original Kaggle dataset to reduce its size and work only on the data that is meaningful for our analysis:
1. unzip the provided archive to work on individual CSV files;
2. keep only tweets written in English to build a coherent language base;
3. remove useless columns such as the account description or the number of retweets;
4. possibly remove some stop-words.

In [None]:
import time

# Wall-clock start of the run; the last cells of the notebook read it back to
# report the elapsed time.
start_time = time.time()

## Code imports and globals

In [None]:
import zipfile
import os
import pyspark
import regex
import shutil
import gzip
import nltk
from nltk.tokenize import word_tokenize
import string
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.types import Row
from pyspark.sql.functions import concat_ws
import multiprocessing
import pandas
from typing import List
from datetime import datetime
from nltk.corpus import stopwords
import json


# Directory that receives the members extracted from the archive and their decompressed CSVs.
KAGGLE_DATASET_DIRECTORY = "out/ukraine-russian-crisis-twitter-dataset-1-2-m-rows"
# Zip archive handed to dataset_extraction() (relative to the working directory).
KAGGLE_DATASET = "ukraine-russian-crisis-twitter-dataset-1-2-m-rows.zip"
# Number of local Spark worker threads requested in init_spark().
WORKERS_CORES = multiprocessing.cpu_count()
# Language code passed to read_dataframe() to filter on the `language` column.
FILTER_LANGUAGE = "en"

# Default hashtag vocabulary; clean_dataframe() replaces it when FIND_MOST_COMMON is true.
# NOTE(review): the literal contains repeated entries ('ukraine', 'standwithukraine',
# 'ukraineunderattack'), so the hashtag -> position index built from it has fewer
# keys than the list has positions.
TOP_HASHTAGS = ['ukraine', 'russia', 'standwithukraine', 'putin', 'russian', 'mariupol', 'news', 'azovstal', 'ukrainerussianwar', 'ukrainian', 'nato', 'ukrainewar', 'ukraine', 'business', 'ukrainerussiawar', 'kharkiv', 'usa', 'tigray', 'armukrainenow', 'kyiv', 'russianukrainianwar', 'stoprussia', 'slavaukraini', 'standwithukraine', 'stopputinnow', 'stopputin', 'war', 'russiaukrainewar', 'savemariupol', 'zelenskyy', 'anonymous', 'biden', 'russians', 'eu', 'ukraineunderattack', 'kherson', 'standwithukriane', 'scotus', 'helpukraine', 'russianwarcrimes', 'tigraygenocide', 'us', 'nft', 'china', 'zelenskiynft', 'zelensky', 'endtigraysiege', 'safeairliftukraine', 'azov', 'giveaway', 'europe', 'ukraineunderattack', 'oprussia', 'donbass', 'canada', 'bucha', 'moscow', 'nfts', 'syria', 'poland', 'uk', 'russiaukraine', 'breaking', 'india', 'odesa', 'germany', 'ukrainians', 'russianarmy', 'donetsk', 'putinwarcrimes', 'mykolaiv', 'ukrainerussia', 'belarus', 'warcrimes', 'kiev', 'healthy', 'healthyeating', 'donbas', 'ukraineinvasion', 'breakingnews', 'roevwade', 'united24', 'luhansk', 'peace', 'stoprussianaggression', 'supportukraine', 'severodonetsk', 'putinwarcriminal', 'odessa', 'crypto', 'irpin', 'bitcoin', 'noflyzone', 'moskva', 'trump', 'closethesky', 'product', 'freeshipping', 'russiaukraineconflict', 'warinukraine']


def update_top_hashtags():
    """Rebuild the two module-level lookup tables from ``TOP_HASHTAGS``.

    ``TOP_HASHTAGS_INDEX`` maps each hashtag to its position in the list and
    ``TOP_HASHTAGS_REVERSE_INDEX`` maps each position back to its hashtag.
    When a hashtag occurs more than once in the list, the forward table keeps
    the position of its last occurrence.
    """
    global TOP_HASHTAGS_INDEX, TOP_HASHTAGS_REVERSE_INDEX
    hashtag_to_position = {}
    position_to_hashtag = {}
    for position, hashtag in enumerate(TOP_HASHTAGS):
        hashtag_to_position[hashtag] = position
        position_to_hashtag[position] = hashtag
    TOP_HASHTAGS_INDEX = hashtag_to_position
    TOP_HASHTAGS_REVERSE_INDEX = position_to_hashtag

# Lookup tables derived from TOP_HASHTAGS: created empty, then filled right
# away by update_top_hashtags().
TOP_HASHTAGS_INDEX = dict()
TOP_HASHTAGS_REVERSE_INDEX = dict()
update_top_hashtags()

# Files where write_preprocessed() dumps the two lookup tables as JSON.
TOP_HASHTAGS_INDEX_FILENAME = "top_hashtags_index.json"
TOP_HASHTAGS_REVERSE_INDEX_FILENAME = "top_hashtags_reverse_index.json"

## Configuration

In [None]:
# Run preprocess() when the "Start preprocessing" cell is executed.
DO_PREPROCESS = True
# Let clean_dataframe() recompute TOP_HASHTAGS from the data instead of using the default list.
FIND_MOST_COMMON = True

## Stopwords list retrieval

In [None]:
def update_nltk():
    """Download the NLTK resources used by the preprocessing step.

    * ``stopwords`` feeds ``stopwords.words('english')``.
    * ``punkt`` / ``punkt_tab`` are the tokenizer models behind
      ``word_tokenize``. Recent NLTK releases load ``punkt_tab`` rather than
      ``punkt``; since the notebook installs NLTK unpinned, both are fetched
      so tokenization works whichever release gets installed.
    """
    for resource in ('stopwords', 'punkt', 'punkt_tab'):
        nltk.download(resource)

## Dataset extraction

In [None]:
def dataset_extraction(archive: str, output_directory: str) -> List[str]:
    """Extract and decompress the archive members that have no CSV yet.

    Every file member of ``archive`` is treated as a gzip-compressed file whose
    decompressed name is the member name without its last extension. Members
    whose decompressed file already exists in ``output_directory`` are skipped,
    which makes repeated calls incremental.

    Args:
        archive: Path of the zip archive to read.
        output_directory: Directory receiving both the extracted members and
            the decompressed files.

    Returns:
        Paths of the files decompressed by this call (empty when everything
        was already there).

    Raises:
        OSError / gzip.BadGzipFile: if a member cannot be read as gzip data.
    """
    new_files = list()
    with zipfile.ZipFile(archive, "r") as zip_ref:
        for member in zip_ref.infolist():
            # Directory entries carry no data: there is nothing to decompress.
            if member.is_dir():
                continue

            gzip_name = member.filename
            csv_name, _ = os.path.splitext(gzip_name)
            csv_path = os.path.join(output_directory, csv_name)
            if os.path.isfile(csv_path):
                continue

            gzip_path = os.path.join(output_directory, gzip_name)
            if not os.path.isfile(gzip_path):
                zip_ref.extract(member, path=output_directory)

            # Decompress into a temporary name and rename at the end: an
            # interrupted run must not leave a truncated CSV that a later run
            # would mistake for an already processed file.
            partial_path = csv_path + ".part"
            try:
                with gzip.open(gzip_path, 'rb') as gzip_file, open(partial_path, 'wb') as csv_file:
                    shutil.copyfileobj(gzip_file, csv_file)
                os.replace(partial_path, csv_path)
            except BaseException:
                if os.path.exists(partial_path):
                    os.remove(partial_path)
                raise
            new_files.append(csv_path)
    return new_files

## Spark initialization

In [None]:
def init_spark():
    """Return a local Spark session using ``WORKERS_CORES`` worker threads.

    The session is obtained with ``getOrCreate()`` under the application name
    "Sparkiodi", and Spark logging is turned off before the session is returned.
    """
    print(f"Available CPU cores/workers: {WORKERS_CORES}")
    print("Initializing spark...", end=' ', flush=True)

    builder = pyspark.sql.SparkSession.builder
    builder = builder.master(f"local[{WORKERS_CORES}]")
    builder = builder.appName("Sparkiodi")
    session = builder.getOrCreate()

    session.sparkContext.setLogLevel("OFF")
    print("Spark initialized")
    return session

## Dataset reduction

In [None]:
def read_dataframe(path: List[str], spark, header: bool = True, language: str = None):
    """Read the CSV file(s) at ``path`` and return a DataFrame with only the ``text`` column.

    Args:
        path: Path(s) handed to Spark's CSV reader.
        spark: Active Spark session.
        header: Whether the first line of each file holds the column names.
            NOTE(review): the ``language`` filter and the final ``select("text")``
            rely on those names, so ``header=False`` presumably only works if
            the columns are named some other way - confirm before using it.
        language: When given, keep only the rows whose ``language`` column
            equals this value; ``None`` or an empty string disables the filter.

    Returns:
        A DataFrame containing the ``text`` column of the (optionally filtered) rows.
    """
    starting_df = (
        spark
        .read
        .option("header", header)
        # Tweets may contain line breaks inside a quoted field.
        .option("multiLine", True)
        .csv(path)
    )

    if language:
        # Filter on the value the caller asked for. The condition is built on
        # the driver, so a plain Python string is enough (no broadcast needed).
        starting_df = starting_df.where(starting_df.language == language)

    return starting_df.select("text")

### Entities removal and hashtags extraction

In [None]:
hashtag_regex_str = r"(?:\#+)([\w_]+)"  # hashtags
regex_str = [
    (r'(?:@[\w_]+)', ''),  # @-mentions
    (r'http[s]?://(?:[a-z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-f][0-9a-f]))+', ''), # URLs
    (r'[^\w\s]', ' '),  # punctuation
    (r'\s+', ' ')  # whitespaces
]

hashtag_regex = regex.compile(hashtag_regex_str)
# (compiled pattern, replacement) pairs applied in order by clean_text();
# hashtag_regex is kept as the first applied regex.
# The list has its own name on purpose: binding it to `regex` would shadow the
# imported module and make this cell fail when it is run a second time.
cleaning_regexes = [(hashtag_regex, '')] + [
    (regex.compile(pattern), replacement) for pattern, replacement in regex_str
]


def clean_text(text: str) -> List[str]:
    """Turn a raw tweet into the list of its distinct lower-case words.

    The text is lower-cased; hashtags, @-mentions and URLs are removed,
    punctuation is replaced by spaces and whitespace is collapsed. The result
    is tokenized with ``word_tokenize`` and tokens that are part of
    ``string.punctuation`` are dropped.

    Args:
        text: Raw tweet text. ``None`` or an empty string yields ``[]``.

    Returns:
        The distinct tokens, in order of first appearance (deterministic, so
        repeated runs produce the same output).
    """
    if not text:
        return []

    text = text.lower()
    for compiled, replacement in cleaning_regexes:
        text = compiled.sub(replacement, text)

    tokens = (
        word.rstrip() for word in word_tokenize(text)
        if word not in string.punctuation
    )
    # dict.fromkeys removes duplicates while keeping first-occurrence order.
    return list(dict.fromkeys(tokens))


def from_indices_to_boolean_vector(indices):
    """Expand a list of positions into a 0/1 vector as long as ``TOP_HASHTAGS``.

    Every position listed in ``indices`` is set to 1, all the others stay 0.
    """
    vector = [0] * len(TOP_HASHTAGS)
    for position in indices:
        vector[position] = 1
    return vector


def padding_hashtags(hashtags, indexes, compress=False):
    """Encode the hashtags of a tweet against a hashtag -> position index.

    Hashtags missing from ``indexes`` are ignored.

    Args:
        hashtags: Hashtags found in the tweet (may contain repetitions).
        indexes: Mapping from hashtag to its integer position.
        compress: When true, return the sorted list of distinct positions
            present in the tweet; otherwise return a dense 0/1 vector.

    Returns:
        ``compress=True``: sorted positions, each listed once (a repeated
        hashtag does not repeat its position, matching the dense form).
        ``compress=False``: a 0/1 vector. Its length is ``len(indexes)``
        unless some position lies beyond that (an index built from a list
        with repeated hashtags has gaps), in which case it is extended to the
        highest position + 1 instead of raising ``IndexError``.
    """
    positions = {indexes[tag] for tag in hashtags if tag in indexes}

    if compress:
        return sorted(positions)

    size = max(len(indexes), max(indexes.values(), default=-1) + 1)
    pads = [0] * size
    for position in positions:
        pads[position] = 1
    return pads


def clean_dataframe(df, spark, top_n: int = 100):
    """Tokenize the tweets, encode their hashtags and drop stop-words.

    When ``FIND_MOST_COMMON`` is true, the global ``TOP_HASHTAGS`` is first
    replaced by the ``top_n`` most frequent (lower-cased) hashtags found in
    ``df`` and the lookup tables are rebuilt with ``update_top_hashtags()``.

    Args:
        df: DataFrame with a ``text`` column holding the raw tweets.
        spark: Active Spark session (used to broadcast the hashtag index).
        top_n: How many hashtags to keep when recomputing ``TOP_HASHTAGS``.

    Returns:
        A DataFrame with two string columns: ``filtered_tweet`` (distinct
        words of the tweet without English stop-words, space separated) and
        ``hashtags`` (space-separated positions of the top hashtags used in
        the tweet). Rows whose ``filtered_tweet`` is empty are removed.
    """
    global TOP_HASHTAGS

    remover = StopWordsRemover(stopWords=stopwords.words('english'))
    remover.setInputCol("tweet")
    remover.setOutputCol("filtered_tweet")

    # A null text would make the string/regex calls below fail on the workers.
    text_rdd = df.where(df["text"].isNotNull()).rdd

    if FIND_MOST_COMMON:
        # The counting pass only extracts hashtags: running the full
        # tokenization here as well would do the expensive work twice.
        hashtag_counts = (
            text_rdd
            .flatMap(lambda row: [ht.lower() for ht in hashtag_regex.findall(row.text)])
            .map(lambda hashtag: (hashtag, 1))
            .reduceByKey(lambda x, y: x + y)
        )
        # takeOrdered avoids sorting every distinct hashtag; ties on the count
        # are broken alphabetically so the selection is reproducible.
        most_common = hashtag_counts.takeOrdered(top_n, key=lambda pair: (-pair[1], pair[0]))
        TOP_HASHTAGS = [hashtag for hashtag, _ in most_common]
        update_top_hashtags()

    broadcast_top_hashtags_index = spark.sparkContext.broadcast(TOP_HASHTAGS_INDEX)

    tweets_df = (
        text_rdd
        .map(lambda row: (
            clean_text(row.text),
            padding_hashtags(
                [ht.lower() for ht in hashtag_regex.findall(row.text)],
                broadcast_top_hashtags_index.value,
                True,
            ),
        ))
        .toDF(["tweet", "hashtags"])
    )
    cleaned_df = (
        remover
        .transform(tweets_df)
        .select("filtered_tweet", "hashtags")
        .withColumn("filtered_tweet", concat_ws(" ", "filtered_tweet"))
        .withColumn("hashtags", concat_ws(" ", "hashtags"))
    )
    # Tweets made only of stop-words, mentions, URLs or hashtags end up empty.
    return cleaned_df.where(cleaned_df.filtered_tweet != "")

## Preprocessed dataset storing

In [None]:
def write_preprocessed(df, output_directory: str) -> str:
    """Persist the hashtag lookup tables and the preprocessed tweets.

    The two lookup tables are dumped as JSON to
    ``TOP_HASHTAGS_INDEX_FILENAME`` / ``TOP_HASHTAGS_REVERSE_INDEX_FILENAME``,
    then ``df`` is written as header-less CSV files into ``output_directory``
    (overwriting it). Finally every file in that directory tree whose
    extension is not ``.csv`` is deleted.

    Args:
        df: DataFrame to write.
        output_directory: Destination directory of the CSV part files.

    Returns:
        ``output_directory``, so the call can be chained with the next step.
    """
    print("Writing top hashtags...")
    with open(TOP_HASHTAGS_INDEX_FILENAME, "w") as fp:
        json.dump(TOP_HASHTAGS_INDEX, fp)
    with open(TOP_HASHTAGS_REVERSE_INDEX_FILENAME, "w") as fp:
        json.dump(TOP_HASHTAGS_REVERSE_INDEX, fp)

    print("Writing all the CSVs...")
    (df
        .write
        .mode("overwrite")
        .option("header", False)
        .csv(output_directory)
     )

    # Keep only the CSV part files.
    for dirpath, _, filenames in os.walk(output_directory):
        for filename in filenames:
            if os.path.splitext(filename)[1] == ".csv":
                continue
            filepath = os.path.join(dirpath, filename)
            try:
                os.remove(filepath)
            except OSError as error:
                # Cleanup is best effort, but a failure should be visible.
                print(f"Could not remove {filepath}: {error}")

    return output_directory

In [None]:
def compress(path: str) -> str:
    """Zip a file or a directory and return the path of the archive.

    A regular file is stored, deflated, in an archive named after the file
    with its extension replaced by ``.zip``. Anything else is handed to
    ``shutil.make_archive`` with ``path`` as both base name and root directory.
    """
    print("Creating archive directory...")

    if not os.path.isfile(path):
        return shutil.make_archive(path, 'zip', path)

    archive_path = os.path.splitext(path)[0] + ".zip"
    with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        archive.write(path, os.path.basename(path))
    return archive_path

In [None]:
def create_single_csv(folders: List[str], overwrite: bool = False) -> str:
    """Concatenate every ``.csv`` file found under ``folders`` into ``out/dataset.csv``.

    Args:
        folders: Directories walked recursively, in the given order.
        overwrite: Truncate the destination first when true; otherwise the
            new content is appended to whatever the file already holds.

    Returns:
        The path of the destination file (``'out/dataset.csv'``).
    """
    single_filename = 'out/dataset.csv'
    filemode = 'wb' if overwrite else 'ab'
    # The destination directory may not exist yet.
    os.makedirs(os.path.dirname(single_filename), exist_ok=True)
    destination = os.path.abspath(single_filename)

    with open(single_filename, filemode) as output_file:
        for directory in folders:
            for dirpath, dirnames, filenames in os.walk(directory):
                # os.walk yields entries in arbitrary order: sort both levels
                # so the concatenation is the same on every run.
                dirnames.sort()
                for filename in sorted(filenames):
                    if os.path.splitext(filename)[1] != ".csv":
                        continue
                    filepath = os.path.join(dirpath, filename)
                    # Never feed the destination file into itself.
                    if os.path.abspath(filepath) == destination:
                        continue
                    with open(filepath, 'rb') as part_file:
                        shutil.copyfileobj(part_file, output_file)
    return single_filename

## Start preprocessing

In [None]:
def preprocess() -> None:
    """Run the whole preprocessing pipeline on the files not processed yet.

    Steps: extract/decompress the new archive members, download the NLTK
    resources, read and clean the tweets with Spark, write the result into a
    time-stamped directory under ``out/``, append it to the single dataset CSV
    and zip that CSV. Returns early when there is nothing new to process.
    """
    new_files = dataset_extraction(KAGGLE_DATASET, KAGGLE_DATASET_DIRECTORY)
    if not new_files:
        print("No new files to process")
        return

    update_nltk()
    # init_spark() builds the session with getOrCreate(), so calling it
    # unconditionally is safe. The previous guard on locals()/globals() left
    # the local name unassigned whenever a global `spark` existed, which
    # raised UnboundLocalError on the next line.
    spark = init_spark()
    df = read_dataframe(new_files, spark, language=FILTER_LANGUAGE)
    df = clean_dataframe(df, spark)
    output_directory = f"out/preprocessed_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}"
    write_preprocessed(df, output_directory)
    dataset_csv = create_single_csv([output_directory])
    compress(dataset_csv)

# Run the pipeline only when enabled in the Configuration cell.
if DO_PREPROCESS:
    preprocess()

In [None]:
# Report the wall-clock duration since the cell that set start_time.
end_time = time.time()
print(f"Start time: {start_time}")
print(f"End time: {end_time}")
# NOTE: formatting through gmtime/%H wraps around after 24 hours.
print(f"Elapsed time: {time.strftime('%H:%M:%S', time.gmtime(end_time - start_time))}")

In [None]:
# Preview the first lines of the concatenated dataset written by create_single_csv().
!head out/dataset.csv