# Preprocessing the Amazon Review Sentiment Analysis Dataset

In [1]:
import numpy as np 
import pandas as pd 
import polars as pl

import os
# Print the full path of every file found under /kaggle/input.
input_file_paths = (
    os.path.join(root, name)
    for root, _subdirs, names in os.walk('/kaggle/input')
    for name in names
)
for input_file_path in input_file_paths:
    print(input_file_path)

/kaggle/input/amazon-review-full-csv/amazon_review_full_csv/readme.txt
/kaggle/input/amazon-review-full-csv/amazon_review_full_csv/train.csv
/kaggle/input/amazon-review-full-csv/amazon_review_full_csv/test.csv


In [2]:
# Print the readme that ships with the dataset (origin and description of the splits).
!cat /kaggle/input/amazon-review-full-csv/amazon_review_full_csv/readme.txt

Amazon Review Full Score Dataset

Version 3, Updated 09/09/2015

ORIGIN

The Amazon reviews dataset consists of reviews from amazon. The data span a period of 18 years, including ~35 million reviews up to March 2013. Reviews include product and user information, ratings, and a plaintext review. For more information, please refer to the following paper: J. McAuley and J. Leskovec. Hidden factors and hidden topics: understanding rating dimensions with review text. RecSys, 2013.

The Amazon reviews full score dataset is constructed by Xiang Zhang (xiang.zhang@nyu.edu) from the above dataset. It is used as a text classification benchmark in the following paper: Xiang Zhang, Junbo Zhao, Yann LeCun. Character-level Convolutional Networks for Text Classification. Advances in Neural Information Processing Systems 28 (NIPS 2015).


DESCRIPTION

The Amazon reviews full score dataset is constructed by randomly taking 600,000 training samples and 130,000 testing samples for each review score from 

In [3]:
!pip install tiktoken pyspark

Collecting tiktoken
  Downloading tiktoken-0.5.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Downloading tiktoken-0.5.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m43.8 MB/s[0m eta [36m0:00:00[0m:00:01[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=5ab9068eb6cd4d78e016f79c331b969a822ace2959310baeb9554943c2931c98
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully 

### Imports and utils

In [142]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType
import tiktoken
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import size, max, monotonically_increasing_id

# utils and constants

# Both CSV splits sit in the same dataset folder; only the file name differs.
_DATASET_DIR = "/kaggle/input/amazon-review-full-csv/amazon_review_full_csv"
PATH_TRAIN_DATA = f"{_DATASET_DIR}/train.csv"
PATH_TEST_DATA = f"{_DATASET_DIR}/test.csv"

def as_udf(output_annotation):
    """Build a decorator that turns a plain Python function into a Spark UDF.

    Args:
        output_annotation: Return type passed to ``udf`` as its second argument.

    Returns:
        A one-argument callable; given a function ``func`` it returns
        ``udf(func, output_annotation)``.
    """
    def decorate(func):
        spark_udf = udf(func, output_annotation)
        return spark_udf

    return decorate


@as_udf(ArrayType(IntegerType()))
def encode_rating(rating: int):
    """One-hot encode a 1-5 rating as a five-element list.

    Args:
        rating: Integer rating; position ``rating - 1`` of the result is set
            to 1. ``None`` (a SQL NULL) is passed through unchanged.

    Returns:
        A list of five ints containing a single 1, or ``None`` when the
        rating is null.

    Raises:
        ValueError: If ``rating`` is outside 1..5. Without this check, 0 and
            negative values would wrap around through Python's negative
            indexing and silently produce the vector of a different class.
    """
    if rating is None:
        return None
    if not 1 <= rating <= 5:
        raise ValueError(f"rating must be between 1 and 5, got {rating!r}")
    one_hot = [0] * 5
    one_hot[rating - 1] = 1
    return one_hot

@as_udf(ArrayType(IntegerType()))
def encode_text(text):
    """Tokenize a review with tiktoken's GPT-2 encoding.

    Args:
        text: Review string. Its final character is dropped before encoding.
            ``None`` (a SQL NULL) is passed through unchanged.

    Returns:
        The list of token ids, or ``None`` when the text is null.
    """
    if text is None:
        return None
    # Looked up inside the function so the UDF carries no encoder object with
    # it; tiktoken keeps loaded encodings in a registry, so repeated lookups
    # are cheap.
    gpt2_encoder = tiktoken.get_encoding("gpt2")
    # NOTE(review): drops the last character of every review -- presumably a
    # leftover closing quote from the CSV parsing; confirm against the raw data.
    text = text[:-1]
    # disallowed_special=() makes text that happens to look like a special
    # token (e.g. "<|endoftext|>") encode as ordinary text instead of raising.
    return gpt2_encoder.encode(text, disallowed_special=())



### Creating the Spark session (10 GB driver memory, 10 GB executor memory)

In [8]:
# Local-mode session named "EDA" with 10g configured for both driver and executor.
# NOTE(review): with master "local[*]" the work presumably runs inside the
# driver JVM, so the executor memory setting may have no effect -- confirm.
session_builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("EDA")
    .config("spark.driver.memory", "10g")
    .config("spark.executor.memory", "10g")
)
spark = session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/03 13:04:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Reading and cleaning data

In [67]:
# Load the training CSV: no header row, fields split on the three-character
# sequence `","`, column types inferred by Spark.
train_reader = spark.read
raw_train = train_reader.csv(
    PATH_TRAIN_DATA,
    sep='","',
    header=False,
    inferSchema=True,
)

                                                                                

In [108]:
from pyspark.sql.functions import regexp_replace, col, substring,length, expr
from pyspark.sql.types import ShortType


def clean(data):
    """Reduce a raw three-column frame to a numeric rating and cleaned text.

    The columns are renamed to ``rating``, ``_c1`` and ``text``; ``_c1`` is
    not selected and therefore dropped (presumably the review title -- verify
    against the dataset readme).

    Args:
        data: DataFrame with exactly three columns, as ``toDF`` is given
            three names.

    Returns:
        DataFrame with ``rating`` cast to ``ShortType`` and ``text`` with
        every doubled double quote collapsed to a single one.
    """
    named = data.toDF("rating", "_c1", "text")

    # First pass: slice away the delimiter residue around the values.
    # rating -> two characters starting at position 2; text -> everything
    # except its last two characters.
    rating_slice = substring("rating", 2, 2).alias("rating")
    text_slice = expr("substring(text, 1, length(text) -2 )").alias("text")
    sliced = named.select(rating_slice, text_slice)

    # Second pass: numeric rating and un-doubled quotes.
    rating_as_short = col("rating").cast(ShortType())
    text_unquoted = regexp_replace("text", '""', '"').alias("text")
    return sliced.select(rating_as_short, text_unquoted)

def create_encoding(data):
    """Apply the rating and text UDFs to a cleaned frame.

    Args:
        data: DataFrame with ``rating`` and ``text`` columns.

    Returns:
        DataFrame with exactly two columns: ``rating`` produced by
        ``encode_rating`` and ``text`` produced by ``encode_text``.
    """
    rating_encoded = encode_rating("rating").alias("rating")
    text_encoded = encode_text("text").alias("text")
    return data.select(rating_encoded, text_encoded)


In [109]:
# Clean the raw training rows, encode them, and preview the first rows.
train_clean = clean(raw_train)
train_encoded = create_encoding(train_clean)
train_encoded.show()

[Stage 116:>                                                        (0 + 1) / 1]

+---------------+--------------------+
|         rating|                text|
+---------------+--------------------+
|[0, 0, 1, 0, 0]|[38, 1015, 428, 2...|
|[0, 0, 0, 0, 1]|[40, 2911, 257, 1...|
|[0, 0, 0, 0, 1]|[40, 1101, 3555, ...|
|[0, 0, 0, 1, 0]|[464, 2647, 286, ...|
|[0, 0, 0, 0, 1]|[34784, 262, 6000...|
|[0, 0, 0, 0, 1]|[1858, 338, 257, ...|
|[1, 0, 0, 0, 0]|[1212, 318, 257, ...|
|[0, 0, 0, 1, 0]|[40, 373, 257, 62...|
|[1, 0, 0, 0, 0]|[32, 1844, 7030, ...|
|[1, 0, 0, 0, 0]|[40, 4724, 345, 4...|
|[1, 0, 0, 0, 0]|[40, 1254, 314, 4...|
|[0, 0, 0, 1, 0]|[2215, 345, 3285,...|
|[0, 0, 0, 0, 1]|[45675, 4283, 654...|
|[0, 0, 1, 0, 0]|[1026, 1718, 2048...|
|[0, 1, 0, 0, 0]|[82, 4340, 389, 8...|
|[0, 0, 1, 0, 0]|[13295, 286, 3852...|
|[1, 0, 0, 0, 0]|[27202, 621, 3762...|
|[0, 0, 0, 1, 0]|[28292, 12150, 69...|
|[0, 0, 0, 1, 0]|[40, 2982, 257, 3...|
|[0, 0, 1, 0, 0]|[40, 2630, 257, 1...|
+---------------+--------------------+
only showing top 20 rows



                                                                                

## Determining necessary padding (max context window)

In [113]:
# Longest token sequence in the training split. `max` here is
# pyspark.sql.functions.max from the imports cell, not the Python builtin.
train_max_tokens = train_encoded.select(max(size("text")))
train_max_tokens.show()



+---------------+
|max(size(text))|
+---------------+
|            615|
+---------------+



                                                                                

In [114]:
# Same steps for the test split: read, clean, encode, then report the
# longest token sequence.
raw_test = spark.read.csv(
    PATH_TEST_DATA,
    sep='","',
    header=False,
    inferSchema=True,
)
test_clean = clean(raw_test)
test_encoded = create_encoding(test_clean)
test_max_tokens = test_encoded.select(max(size("text")))
test_max_tokens.show()

                                                                                

+---------------+
|max(size(text))|
+---------------+
|            624|
+---------------+



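The longest sequence is 615 tokens in the training set and 624 in the test set, so let's pad every sequence to 624 tokens.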

In [120]:
# Fixed sequence length: the larger of the two maxima measured above
# (615 tokens in train, 624 in test).
PADDED_LENGTH = 624
# NOTE(review): 0 also occurs as a real token id in the encoded reviews, so
# padding cannot be told apart from that token downstream -- confirm this is
# acceptable for the model that consumes these files.
PAD_TOKEN_ID = 0


@as_udf(ArrayType(IntegerType()))
def pad_text_encoding(encoded_text):
    """Right-pad a token id list to exactly ``PADDED_LENGTH`` entries.

    Args:
        encoded_text: List of token ids. ``None`` (a SQL NULL) is passed
            through unchanged.

    Returns:
        A list of exactly ``PADDED_LENGTH`` ints: the input followed by
        ``PAD_TOKEN_ID`` fill. Inputs longer than ``PADDED_LENGTH`` are
        truncated so that every row has the same width; previously they were
        returned at their original length.
    """
    if encoded_text is None:
        return None
    tokens = list(encoded_text)[:PADDED_LENGTH]
    return tokens + [PAD_TOKEN_ID] * (PADDED_LENGTH - len(tokens))

In [125]:
# Pull one padded row to eyeball the trailing fill values.
padded_preview = test_encoded.select("rating", pad_text_encoding("text"))
padded_preview.take(1)

                                                                                

[Row(rating=[1, 0, 0, 0, 0], pad_text_encoding(text)=[1212, 2746, 743, 307, 12876, 329, 10081, 48648, 3858, 11, 475, 314, 1101, 4075, 290, 651, 1088, 43158, 287, 616, 1693, 532, 9835, 1043, 777, 4283, 654, 11686, 510, 866, 416, 616, 42415, 0, 1892, 4599, 3228, 28186, 25, 467, 351, 262, 3210, 19794, 49774, 11, 1160, 12, 1270, 11, 4283, 1303, 1157, 3510, 1828, 13, 30932, 1104, 11, 14768, 510, 290, 3607, 502, 644, 314, 761, 13, 5747, 5166, 286, 777, 635, 26197, 355, 314, 11615, 284, 2834, 606, 510, 477, 262, 640, 13, 4599, 374, 1638, 590, 14, 14774, 4896, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 

### verifying padding

In [127]:
# Longest padded sequence in the test split.
# NOTE(review): this only reports the maximum length; it does not show that
# shorter rows were padded up to it.
test_padded_text = test_encoded.select(pad_text_encoding("text").alias("text"))
test_padded_text.select(max(size("text"))).show()



+---------------+
|max(size(text))|
+---------------+
|            624|
+---------------+



                                                                                

In [128]:
# Longest padded sequence in the training split.
# NOTE(review): this only reports the maximum length; it does not show that
# shorter rows were padded up to it.
train_padded_text = train_encoded.select(pad_text_encoding("text").alias("text"))
train_padded_text.select(max(size("text"))).show()



+---------------+
|max(size(text))|
+---------------+
|            624|
+---------------+



                                                                                

### saving data

In [131]:
train_encoded_padded = train_encoded.select("rating", pad_text_encoding("text").alias("text"))
# coalesce(1) yields a single part file. mode("overwrite") lets the cell be
# re-run: with the default save mode the write fails once the output path
# already exists.
(
    train_encoded_padded
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet("train2.parquet")
)

                                                                                

In [132]:
test_encoded_padded = test_encoded.select("rating", pad_text_encoding("text").alias("text"))
# coalesce(1) yields a single part file. mode("overwrite") lets the cell be
# re-run: with the default save mode the write fails once the output path
# already exists.
(
    test_encoded_padded
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet("test2.parquet")
)

                                                                                

In [134]:
# Read the parquet output directories rather than individual part files:
# part-file names embed an identifier that changes on every write, so a
# hard-coded file name breaks as soon as the data is written again.
test = spark.read.parquet("/kaggle/working/test2.parquet")
train = spark.read.parquet("/kaggle/working/train2.parquet")

In [140]:
# Take the first 100 reloaded test rows, skip the first of them, and print
# the top of what remains.
test_preview = test.limit(100).offset(1)
test_preview.show()



+---------------+--------------------+
|         rating|                text|
+---------------+--------------------+
|[0, 0, 0, 1, 0]|[1212, 318, 257, ...|
|[0, 1, 0, 0, 0]|[40, 5839, 530, 2...|
|[0, 1, 0, 0, 0]|[40, 373, 6568, 2...|
|[0, 1, 0, 0, 0]|[40, 716, 257, 12...|
|[0, 1, 0, 0, 0]|[40, 1842, 262, 3...|
|[0, 1, 0, 0, 0]|[40, 2314, 10743,...|
|[0, 0, 1, 0, 0]|[1212, 3807, 351,...|
|[0, 0, 0, 0, 1]|[40, 1043, 257, 4...|
|[0, 0, 1, 0, 0]|[464, 1492, 318, ...|
|[0, 0, 1, 0, 0]|[40, 3688, 257, 1...|
|[0, 0, 1, 0, 0]|[1026, 338, 257, ...|
|[0, 1, 0, 0, 0]|[464, 8689, 3918,...|
|[1, 0, 0, 0, 0]|[1212, 983, 4433,...|
|[0, 0, 0, 0, 1]|[40, 8155, 428, 3...|
|[0, 0, 0, 0, 1]|[3666, 4957, 5543...|
|[0, 1, 0, 0, 0]|[1212, 983, 318, ...|
|[1, 0, 0, 0, 0]|[45, 7156, 37045,...|
|[0, 0, 0, 1, 0]|[3666, 604, 1941,...|
|[0, 0, 1, 0, 0]|[40, 1752, 8155, ...|
|[1, 0, 0, 0, 0]|[1212, 40094, 318...|
+---------------+--------------------+
only showing top 20 rows



                                                                                

In [164]:
# The test split holds 650,000 rows (130,000 per review score, five scores).
TEST_ROW_COUNT = 650_000
# NOTE(review): 500_000 is assumed to be the target maximum rows per batch --
# confirm the intent.
MAX_ROWS_PER_BATCH = 500_000

# Ceiling division: floor division gave a single batch of 650,000 rows, which
# exceeds the per-batch target.
n_batches = -(-TEST_ROW_COUNT // MAX_ROWS_PER_BATCH)

# randomSplit expects one relative weight per split (normalized to sum to 1),
# not cumulative fractions, so equal weights give splits of equal expected
# size. Split sizes are approximate; the seed makes the assignment repeatable.
batches = test.randomSplit([1.0] * n_batches, seed=42)