In [154]:
import os
import polars as pl

# Load Data

Scan the reviews directory and list the review files it contains.

In [155]:
# Input and output locations, relative to the notebook's working directory.
path_metadata = "../data/raw/metadata4_processed.parquet"
reviews_path = '../data/raw/reviews'
processed_path = "../data/processed/processed.parquet"

# Collect the names of the review parquet files.
# os.scandir yields entries in arbitrary order, so the names are sorted to make
# the load order (and therefore the stacked row order) reproducible.
# Non-parquet files (e.g. OS metadata files) are skipped so they cannot break
# the parquet reader later on.
with os.scandir(reviews_path) as entries:
    reviewFiles = sorted(
        entry.name
        for entry in entries
        if entry.is_file() and entry.name.endswith(".parquet")
    )
print(reviewFiles)

['reviews40.parquet', 'reviews41.parquet', 'reviews42.parquet', 'reviews43.parquet', 'reviews44.parquet', 'reviews45.parquet', 'reviews46.parquet', 'reviews47.parquet', 'reviews48.parquet', 'reviews49.parquet']


Load the Metadata file and the review files

In [156]:
# Only the product id is read from the metadata file; it is used as the join key.
df_metadata = pl.read_parquet(path_metadata, columns=["asin"])

# Read each review file once, keeping only the columns used downstream, and
# concatenate them in a single call instead of growing a frame inside the loop.
review_columns = ['asin', 'reviewerID', 'overall', 'unixReviewTime']
review_frames = []
for f in reviewFiles:
    # Build the path from reviews_path so the directory is defined in one place.
    n = os.path.join(reviews_path, f)
    print("reading " + n)
    review_frames.append(pl.read_parquet(n, columns=review_columns))
    print("Adding..." + n)

# pl.concat rejects an empty list; fall back to an empty frame, which is what
# the loop-based version produced when no review files were found.
df_reviews = pl.concat(review_frames) if review_frames else pl.DataFrame()

reading ../data/raw/reviews/reviews40.parquet
Adding...../data/raw/reviews/reviews40.parquet
reading ../data/raw/reviews/reviews41.parquet
Adding...../data/raw/reviews/reviews41.parquet
reading ../data/raw/reviews/reviews42.parquet
Adding...../data/raw/reviews/reviews42.parquet
reading ../data/raw/reviews/reviews43.parquet
Adding...../data/raw/reviews/reviews43.parquet
reading ../data/raw/reviews/reviews44.parquet
Adding...../data/raw/reviews/reviews44.parquet
reading ../data/raw/reviews/reviews45.parquet
Adding...../data/raw/reviews/reviews45.parquet
reading ../data/raw/reviews/reviews46.parquet
Adding...../data/raw/reviews/reviews46.parquet
reading ../data/raw/reviews/reviews47.parquet
Adding...../data/raw/reviews/reviews47.parquet
reading ../data/raw/reviews/reviews48.parquet
Adding...../data/raw/reviews/reviews48.parquet
reading ../data/raw/reviews/reviews49.parquet
Adding...../data/raw/reviews/reviews49.parquet


In [157]:
# Report (rows, columns) for the metadata frame first, then the reviews frame.
for frame in (df_metadata, df_reviews):
    print(frame.shape)

(1000000, 1)
(26735974, 4)


# Join the two datasets

In [158]:
# Keep only the reviews whose product id also appears in the metadata frame
# (inner join on the shared "asin" column).
join_keys = ["asin"]
df_joined = df_metadata.join(
    df_reviews,
    on=join_keys,
    how="inner",
)

In [159]:
# Preview the first rows of the joined frame.
df_joined.head()

asin,reviewerID,overall,unixReviewTime
str,str,str,str
"""B00JC39W1Y""","""A88X07GEKZBJ5""","""5.0""","""1409184000"""
"""B00JC39W1Y""","""A16R5JWQN9BDQ4…","""3.0""","""1402531200"""
"""B00JC39W1Y""","""A1FHH3W45TY7XN…","""4.0""","""1401753600"""
"""B00JC39W1Y""","""AROFTQI5I04DB""","""5.0""","""1401235200"""
"""B00JC39W1Y""","""A3FR27C0VGOHQX…","""2.0""","""1400371200"""


In [160]:
# (rows, columns) of the joined frame.
df_joined.shape

(1465270, 4)

# Some preprocessing

Remove duplicate rows, if any exist

In [161]:
# Drop rows that are exact duplicates across all columns.
# NOTE(review): no ordering option is passed to unique(), so the row order
# after this step may differ from the joined frame — confirm nothing
# downstream relies on the original order.
df_joined = df_joined.unique()

Check for null values, if any exist

In [162]:
# Count the null entries in each column of the de-duplicated frame.
null_values = df_joined.null_count()

In [163]:
# Display the per-column null counts.
null_values

asin,reviewerID,overall,unixReviewTime
u32,u32,u32,u32
0,0,0,0


Convert the `overall` column (stored as text) into an integer `rating` column

In [164]:
# Ratings are stored as text such as "5.0". Parse them as numbers and keep them
# as a small integer column named "rating", replacing "overall".
#
# with_columns returns a new frame, so the result has to be assigned back;
# otherwise "rating" stays a string column and the string version is what gets
# written to disk.
#
# The numeric cast replaces the earlier regex, which excluded the digit 0 and
# therefore could not parse values such as "10.0" or "0.0".
# NOTE(review): the Float64 -> Int16 cast discards any fractional part —
# confirm that ratings are always whole numbers.
df_joined = df_joined.with_columns(
    pl.col("overall").cast(pl.Float64).cast(pl.Int16).alias("rating")
).drop("overall")

# Show the converted frame so the new dtype is visible in the cell output.
df_joined

asin,reviewerID,unixReviewTime,rating
str,str,str,i16
"""B00JC39W1Y""","""A1FHH3W45TY7XN…","""1401753600""",4
"""B00JC39W1Y""","""AROFTQI5I04DB""","""1401235200""",5
"""B00JC39W1Y""","""A2F2PEGBKO3OLY…","""1392940800""",2
"""B00JC39W1Y""","""A3MIPPR6CPQF48…","""1392508800""",5
"""B00JC39W1Y""","""A7C1PPBQQV4JU""","""1532908800""",2
"""B00JC39W1Y""","""ALHZXPXMADVKD""","""1532649600""",5
"""B00JC39W1Y""","""ABWZ0X880U8P6""","""1532217600""",4
"""B00JC39W1Y""","""A1IYT42FUYV0PP…","""1532217600""",3
"""B00JC39W1Y""","""A1N2YT63RMPFIW…","""1532131200""",5
"""B00JC39W1Y""","""A1ZGJLLW43ACI4…","""1531958400""",5


In [165]:
# (rows, columns) of the frame after preprocessing.
df_joined.shape

(1464459, 4)

# Save files

In [167]:
# Create the output directory if it does not exist yet, so the save cannot
# fail after all the loading and processing work has already been done.
os.makedirs(os.path.dirname(processed_path), exist_ok=True)
df_joined.write_parquet(processed_path)