In [None]:
%load_ext lab_black

In [None]:
import os

# Every data path in this notebook is relative to the directory that holds
# resources/data (presumably the project root, one level above the notebook).
# Only step up when that directory is not already visible, so re-running this
# cell (or starting the kernel at the root) does not keep climbing the tree.
if not os.path.isdir(os.path.join("resources", "data")):
    os.chdir("..")

In [None]:
import numpy as np
import pandas as pd
import pickle as pkl
import shutil

from scipy.sparse import csr_array

from tqdm import tqdm
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import monotonically_increasing_id, col, max

In [None]:
dataset_names = ["kos", "nips", "nytimes", "enron", "pubmed"]

# All per-dataset inputs live under resources/data. The three lists are
# filled in lockstep, so index i in each of them refers to dataset_names[i].
data_dir = os.path.join("resources", "data")
docword_paths, vocab_paths, vocab_mapping_paths = [], [], []
for name in dataset_names:
    docword_paths.append(os.path.join(data_dir, f"docword.{name}.txt.gz"))
    vocab_paths.append(os.path.join(data_dir, f"vocab.{name}.txt"))
    vocab_mapping_paths.append(os.path.join(data_dir, f"{name}_map.pkl"))

In [None]:
# Choose the dataset by name. Every derived path is looked up with the same
# index, so they cannot drift out of sync the way three repeated literal
# indices could.
dataset_name = "nytimes"
dataset_index = dataset_names.index(dataset_name)
docword_path = docword_paths[dataset_index]
vocab_mapping_path = vocab_mapping_paths[dataset_index]

In [None]:
# csv segmentation: split the gzipped docword file into CSV buckets keyed by
# doc_id % n, so each bucket can later be loaded into pandas on its own.

docs_per_partition = 10000  # rough number of distinct docs per bucket

spark = SparkSession.builder.appName("spark").getOrCreate()
schema = StructType(
    [
        StructField("doc_id", IntegerType(), True),
        StructField("word_id", IntegerType(), True),
        StructField("count", IntegerType(), True),
    ]
)
df = (
    spark.read.format("csv")
    .option("header", False)
    .option("delimiter", " ")
    .schema(schema)
    .load(docword_path)
)
# The UCI docword format begins with three single-number header lines
# (D, W, NNZ). Those parse with word_id/count left null, so dropping null rows
# removes them without relying on row order or a single input partition.
df = df.dropna(subset=["word_id", "count"])

# Floor division gives 0 for datasets with fewer than docs_per_partition docs
# (e.g. kos, nips), and `% 0` is null or an error depending on Spark's ANSI
# mode. Keep at least one bucket. `or 1` is used because `max` is shadowed
# by pyspark.sql.functions.max in this notebook's imports.
n = df.select("doc_id").distinct().count() // docs_per_partition or 1
df = df.withColumn("part_col", col("doc_id") % n)
temp_path = "temp"
# overwrite so the cell can be re-run after an interrupted earlier run.
df.write.format("csv").mode("overwrite").partitionBy("part_col").save(temp_path)

In [None]:
# Number of rows in the unified vocabulary CSV (header excluded), minus one.
# NOTE(review): flatten_group allocates vectors of exactly this length and
# writes at position word_id - 1, so the largest usable word_id is
# rows - 1. Confirm this "- 1" is intended and is not an off-by-one.
unified_vocab_path = "resources/data/vocab_unified.csv"
max_word_id = len(pd.read_csv(unified_vocab_path)) - 1

In [None]:
# Converts all (word_id, count) records of a given doc to a sparse vector.
def flatten_group(group, vocab_size=None):
    """Turn the rows of one document into a sparse bag-of-words row.

    Parameters
    ----------
    group : pandas.DataFrame
        Rows belonging to a single document, with ``word_id`` and ``count``
        columns. ``word_id`` is used as a 1-based position: its count goes
        to index ``word_id - 1`` of the vector.
    vocab_size : int, optional
        Length of the vector. Defaults to the notebook-level ``max_word_id``,
        so ``groupby(...).apply(flatten_group)`` keeps working unchanged.

    Returns
    -------
    scipy.sparse.csr_array
        The per-word counts as floats, converted from a dense vector of
        length ``vocab_size``.
    """
    if vocab_size is None:
        vocab_size = max_word_id
    ids = group["word_id"].values
    counts = group["count"].values
    bow_vector = np.zeros(vocab_size)
    # np.add.at accumulates repeated indices. Plain fancy assignment would keep
    # only the last count if several source words map to the same word id.
    np.add.at(bow_vector, ids - 1, counts)
    return csr_array(bow_vector)

In [None]:
# Load the word-id mapping for the selected dataset. It is later passed to
# Series.map on word_id, presumably mapping dataset ids to unified-vocab ids
# (verify). pickle can run arbitrary code on load, so only use trusted files.
with open(vocab_mapping_path, mode="rb") as mapping_file:
    mapping = pkl.load(mapping_file)

In [None]:
def _read_partition(partition_dir):
    """Read and concatenate every Spark part-file in one ``part_col=`` dir.

    Spark may write several ``part-*.csv`` files into the same partition
    directory (one per write task), so all of them are read rather than only
    the first. Returns an empty frame when the directory holds no CSV files.
    """
    csv_names = sorted(
        name for name in os.listdir(partition_dir) if name.endswith(".csv")
    )
    frames = [
        pd.read_csv(
            os.path.join(partition_dir, name),
            header=None,
            names=["doc_id", "word_id", "count"],
        )
        for name in csv_names
    ]
    if not frames:
        return pd.DataFrame(columns=["doc_id", "word_id", "count"])
    return pd.concat(frames, ignore_index=True)


# Convert each doc_id bucket written by Spark into (doc_id, bow_vector) rows.
# Per-bucket results go into a list and are concatenated once at the end,
# instead of re-copying an ever-growing frame on every iteration. Sorting
# makes the processing (and output row) order deterministic.
partition_dirs = sorted(
    d for d in os.listdir(temp_path) if d.startswith("part_col=")
)
converted_parts = []
for d in tqdm(partition_dirs):
    temp_df = _read_partition(os.path.join(temp_path, d))
    if temp_df.empty:
        continue
    temp_df["word_id"] = temp_df["word_id"].map(mapping)
    temp_df = (
        temp_df.groupby("doc_id", as_index=False)
        .apply(flatten_group)
        .reset_index(drop=True)
    )
    temp_df.columns = ["doc_id", "bow_vector"]
    converted_parts.append(temp_df)

out_df = (
    pd.concat(converted_parts, ignore_index=True)
    if converted_parts
    else pd.DataFrame(columns=["doc_id", "bow_vector"])
)

In [None]:
# Persist the (doc_id, bow_vector) table for the selected dataset.
converted_path = f"resources/data/converted.{dataset_name}.pkl"
out_df.to_pickle(converted_path)

In [None]:
# Remove the Spark scratch directory. The existence check keeps this cell
# safe to re-run after the directory is already gone.
if os.path.isdir(temp_path):
    shutil.rmtree(temp_path)