In [None]:
from azure.storage.blob import BlobServiceClient
import pandas as pd
import polars as pl
from gensim.models import Word2Vec
import json

# Storage credentials are read from a local JSON file instead of being
# embedded in the notebook.
with open("keys.json", "r") as f:
    data = json.load(f)

connection_string = data.get("Connection_string")
if not connection_string:
    # Without this check a missing key becomes None and only fails later,
    # inside the SDK, with an error that does not mention keys.json.
    raise KeyError('"Connection_string" is missing or empty in keys.json')

# Container and blob locations of the dataset and the Word2Vec artefacts.
container_name = "containerforregular"
csv_blob_name = "HPC_NEW/HateSpeechDatasetBalanced.csv"
model_blob_name = "HPC_NEW/word2vec.model"
vectors_blob_name = "HPC_NEW/word2vec.model.wv.vectors.npy"
syn1_blob_name = "HPC_NEW/word2vec.model.syn1neg.npy"

# Local file names the model artefacts are downloaded to.
local_model_file = "downloaded_model.model"
local_vectors_file = "downloaded_model.model.wv.vectors.npy"
local_syn1_file = "downloaded_model.model.syn1neg.npy"

blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)

def download_blob_to_dataframe(blob_name):
    """Read a CSV blob from the notebook's container into a pandas DataFrame.

    Args:
        blob_name: Path of the CSV blob inside the container.

    Returns:
        The parsed CSV as a ``pandas.DataFrame``.
    """
    stream = container_client.get_blob_client(blob_name).download_blob()
    return pd.read_csv(stream)

# Load the CSV blob into a pandas DataFrame and print its first three rows.
df = download_blob_to_dataframe(csv_blob_name)
print("CSV DataFrame:")
print(df.head(3))

def download_blob_to_file(blob_name, local_file_path):
    """Download a blob from the notebook's container into a local file.

    Args:
        blob_name: Path of the blob inside the container.
        local_file_path: Destination path; an existing file is overwritten.
    """
    blob_client = container_client.get_blob_client(blob_name)
    # Issue the download request before opening the destination, so a request
    # that fails up front (e.g. the blob does not exist) neither creates an
    # empty file nor truncates an existing one.
    download_stream = blob_client.download_blob()
    with open(local_file_path, "wb") as f:
        # Stream the content to disk instead of buffering the whole blob in memory.
        download_stream.readinto(f)

# Fetch the model file and its two companion .npy arrays under local names
# that share the "downloaded_model.model" prefix.
# NOTE(review): presumably gensim locates the .npy files next to the model by
# that shared prefix, so all three must be present before loading — confirm.
download_blob_to_file(model_blob_name, local_model_file)
download_blob_to_file(vectors_blob_name, local_vectors_file)
download_blob_to_file(syn1_blob_name, local_syn1_file)

# NOTE(review): gensim model loading is pickle-based; only load model files
# that come from a trusted source.
model = Word2Vec.load(local_model_file)
print("Model loaded successfully!")

# From this point on `df` is a polars DataFrame, no longer the pandas frame
# read from the CSV above.
df = pl.from_pandas(df)

CSV DataFrame:
                                             Content  Label
0  denial of normal the con be asked to comment o...      1
1  just by being able to tweet this insufferable ...      1
2  that is retarded you too cute to be single tha...      1
Model loaded successfully!


In [4]:
# Lower-case the Content column into a separate frame; `df` itself is not modified.
text = df.with_columns(pl.col('Content').str.to_lowercase())

# Show the type of the result.
type(text)

polars.dataframe.frame.DataFrame

In [6]:
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from typing import List
import time
import nltk

# Make sure the NLTK resources used by this notebook are available locally.
for _package in ('punkt_tab', 'stopwords', 'wordnet'):
    nltk.download(_package)


# English stop words, minus the negation words listed in `exception_words`,
# which are therefore kept by the stop-word filter.
exception_words = {'no', 'not', 'never'}
stop_words = set(stopwords.words('english'))
filtered_stopwords = stop_words.difference(exception_words)
lemmatizer = WordNetLemmatizer()
def lemmatize_word(text):
    """Return the verb lemma of every token in ``text``, preserving order.

    Args:
        text: Iterable of tokens.

    Returns:
        A list with one lemmatised token per input token.
    """
    lemmas = []
    for token in text:
        lemmas.append(lemmatizer.lemmatize(token, pos="v"))
    return lemmas

def remove_stopwords(text: List[str]) -> List[str]:
    """Drop stop words from a token list.

    Args:
        text: Tokens to filter. This must be a sequence of tokens, not a raw
            string: a plain string would be iterated character by character.

    Returns:
        The tokens that are not in ``filtered_stopwords``, in their original order.
    """
    return [word for word in text if word not in filtered_stopwords]


def preprocess(df: pl.DataFrame) -> pl.DataFrame:
    """Tokenise and normalise the ``Content`` column.

    Two list columns are added; the existing columns are left as they are:

    * ``Tokens_Content``: the lower-cased ``Content`` split on single spaces.
    * ``Processed_Content``: those tokens with stop words, purely numeric
      tokens and empty strings removed, each remaining token reduced to its
      verb lemma.

    The elapsed wall-clock time is printed before returning.

    Args:
        df: Frame with a string column named ``Content``.

    Returns:
        A new, collected ``pl.DataFrame`` with the two extra columns.
    """
    from functools import lru_cache

    # The same words recur across rows, so memoise the lemma of each distinct
    # token instead of querying WordNet again for every occurrence.
    @lru_cache(maxsize=None)
    def _lemmatize(word):
        return lemmatizer.lemmatize(word, pos="v")

    def _clean(tokens):
        # `word and ...` drops the empty strings that splitting on a single
        # space produces for leading, trailing or repeated spaces.
        return [
            _lemmatize(word)
            for word in tokens
            if word and word not in filtered_stopwords and not word.isdigit()
        ]

    start = time.perf_counter()
    tokens = pl.col("Content").str.to_lowercase().str.split(by=' ')
    xdf = (
        df.lazy()
        .with_columns(tokens.alias("Tokens_Content"))
        .with_columns(
            pl.col("Tokens_Content")
            .map_elements(_clean, return_dtype=pl.List(pl.Utf8))
            .alias("Processed_Content")
        )
        .collect()
    )
    end = time.perf_counter()
    print(f"Time taken : {(end-start):.3f}")
    return xdf

[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\SIDDHARTH\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\SIDDHARTH\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\SIDDHARTH\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [7]:
# Run the full preprocessing pass; the recorded run below took about 157 s.
processed_df = preprocess(df)

Time taken : 156.791


In [8]:
# Display the preprocessed frame with its token columns.
processed_df

Content,Label,Tokens_Content,Processed_Content
str,i64,list[str],list[str]
"""denial of normal the con be as…",1,"[""denial"", ""of"", … ""retard""]","[""denial"", ""normal"", … ""retard""]"
"""just by being able to tweet th…",1,"[""just"", ""by"", … ""vagina""]","[""able"", ""tweet"", … ""vagina""]"
"""that is retarded you too cute …",1,"[""that"", ""is"", … ""life""]","[""retard"", ""cute"", … ""life""]"
"""thought of a real badass mongo…",1,"[""thought"", ""of"", … ""be""]","[""think"", ""real"", … ""soon""]"
"""afro american basho""",1,"[""afro"", ""american"", ""basho""]","[""afro"", ""american"", ""basho""]"
…,…,…,…
"""i mute this telecasting and pl…",1,"[""i"", ""mute"", … ""cunt""]","[""mute"", ""telecast"", … ""cunt""]"
"""but hell yeah he s not a bache…",1,"[""but"", ""hell"", … ""singer""]","[""hell"", ""yeah"", … ""singer""]"
"""great video musician but s not…",1,"[""great"", ""video"", … ""professional""]","[""great"", ""video"", … ""professional""]"
"""not great pop video yeah he s …",1,"[""not"", ""great"", … ""singer""]","[""not"", ""great"", … ""singer""]"


In [None]:
from gensim.models import Word2Vec
import numpy as np

# Embedding width and vocabulary of the loaded Word2Vec model; the set gives
# constant-time membership tests inside embed_word.
vector_size = model.vector_size
vocab = set(model.wv.index_to_key)

def embed_word(text, vector_size=vector_size, vocab=vocab):
    """Map each token to its Word2Vec vector.

    Args:
        text: Iterable of tokens.
        vector_size: Length of the zero vector used for tokens that are not
            in ``vocab``. Defaults to the model's vector size as captured
            when this function was defined.
        vocab: Container of tokens the model knows. Defaults to the model
            vocabulary as captured when this function was defined.

    Returns:
        A list with one float32 NumPy array per token, in input order.
        Tokens missing from ``vocab`` map to an all-zero vector.
    """
    return [
        model.wv[word].astype(np.float32)
        if word in vocab
        else np.zeros(vector_size, dtype=np.float32)
        for word in text
    ]

In [None]:
import polars as pl
import numpy as np

# Target schema of the final frame. Vector_Content holds one fixed-width
# float32 vector per processed token.
# NOTE(review): the width 300 is assumed to equal the Word2Vec model's
# vector_size — confirm.
schema = {
    'Content': pl.Utf8,
    'Label': pl.Int64,
    'Tokens_Content': pl.List(pl.Utf8),
    'Processed_Content': pl.List(pl.Utf8),
    'Vector_Content': pl.List(pl.Array(pl.Float32, 300))
}

# Build the empty accumulator straight from the schema. This replaces the
# dummy-row-then-slice construction, which left `schema` unused and rebound
# `df` (the dataset) to a one-row placeholder frame.
empty_df = pl.DataFrame(schema=schema)
print(empty_df.schema)


Schema({'Content': String, 'Label': Int64, 'Tokens_Content': List(String), 'Processed_Content': List(String), 'Vector_Content': List(Array(Float32, shape=(300,)))})


In [17]:
import gc
# Run a full garbage-collection pass; the displayed value is the number of
# unreachable objects the collector found.
gc.collect()

1482

In [22]:
import gc
import psutil
import polars as pl

def print_memory_usage(message=""):
    """Print ``message`` followed by the current system memory usage in percent.

    Args:
        message: Label placed in front of the usage figure.
    """
    used_percent = psutil.virtual_memory().percent
    print(f"{message} - Memory Usage: {used_percent}%")

# Number of equally sized batches; leftover rows are handled after the loop.
N_BATCHES = 5
# Column dtype of the embeddings: one 300-wide float32 vector per token.
VECTOR_DTYPE = pl.List(pl.Array(pl.Float32, 300))


def _embed_batch(batch: pl.DataFrame) -> pl.DataFrame:
    """Return ``batch`` with a ``Vector_Content`` column of per-token embeddings."""
    return (
        batch.lazy()
        .with_columns(
            pl.col("Processed_Content")
            .map_elements(embed_word, return_dtype=VECTOR_DTYPE)
            .alias("Vector_Content")
        )
        .collect()
    )


total_rows = len(processed_df)
batch_size = total_rows // N_BATCHES
embedded_batches = []

print_memory_usage("Before batch processing")

for i in range(N_BATCHES):
    print_memory_usage(f"Before batch {i+1}")

    embedded_batches.append(
        _embed_batch(processed_df.slice(batch_size * i, batch_size))
    )
    print_memory_usage(f"After processing batch {i+1}")
    gc.collect()

    print_memory_usage(f"After garbage collection for batch {i+1}")
    print(f'Round {i+1} processed')
gc.collect()
print_memory_usage("After all batches processed")

# Rows left over when the row count is not a multiple of N_BATCHES.
remainder = total_rows - batch_size * N_BATCHES
if remainder:
    embedded_batches.append(
        _embed_batch(processed_df.slice(batch_size * N_BATCHES, remainder))
    )

# Assemble the result from a cleared copy of the accumulator rather than
# stacking onto it in place, so re-running this cell does not duplicate rows.
# The cleared frame still enforces the expected schema on every batch.
empty_df = pl.concat([empty_df.clear(), *embedded_batches], rechunk=False)

Before batch processing - Memory Usage: 36.5%
Before batch 1 - Memory Usage: 36.5%
After processing batch 1 - Memory Usage: 55.7%
After garbage collection for batch 1 - Memory Usage: 55.7%
Round 1 processed
Before batch 2 - Memory Usage: 55.7%
After processing batch 2 - Memory Usage: 59.7%
After garbage collection for batch 2 - Memory Usage: 59.4%
Round 2 processed
Before batch 3 - Memory Usage: 59.4%
After processing batch 3 - Memory Usage: 59.5%
After garbage collection for batch 3 - Memory Usage: 59.7%
Round 3 processed
Before batch 4 - Memory Usage: 59.7%
After processing batch 4 - Memory Usage: 66.3%
After garbage collection for batch 4 - Memory Usage: 66.1%
Round 4 processed
Before batch 5 - Memory Usage: 66.1%
After processing batch 5 - Memory Usage: 51.9%
After garbage collection for batch 5 - Memory Usage: 52.2%
Round 5 processed
After all batches processed - Memory Usage: 52.1%


In [None]:
import pathlib

# Destination of the vectorised frame, relative to the notebook's working
# directory. The value is now a real Path, matching its annotation.
path: pathlib.Path = pathlib.Path("vectorized_df.parquet")

empty_df.write_parquet(path)


In [None]:
from azure.storage.blob import BlobServiceClient
import json
import pathlib

with open("keys.json", "r") as f:
    data = json.load(f)

connection_string = data.get("Connection_string")
if not connection_string:
    # Fail here with a clear message instead of deep inside the SDK.
    raise KeyError('"Connection_string" is missing or empty in keys.json')

# Azure container names cannot contain "/" or spaces, so the "Parquet Files"
# folder belongs in the blob name as a prefix. The blob lands at the same
# location as before: <container>/Parquet Files/<file name>.
container_name = "containerforregular"
blob_folder = "Parquet Files"
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)

import os
local_file_path: pathlib.Path = pathlib.Path("vectorized_df.parquet")
blob_name = f"{blob_folder}/{os.path.basename(local_file_path)}"

blob_client = container_client.get_blob_client(blob_name)

# overwrite=False makes the upload fail rather than replace a blob that
# already exists under this name. The file handle gets its own name so it
# no longer shadows the `data` dict loaded from keys.json.
with open(local_file_path, "rb") as parquet_file:
    blob_client.upload_blob(parquet_file, overwrite=False)

print(f"File {blob_name} uploaded to container {container_name}.")

File vectorized_df.parquet uploaded to container containerforregular/Parquet Files.
