DAT535 | Data-intensive systems and algorithms | Project G7

---

# Big-data ETL with Amazon Reviews using Apache Spark and Hadoop
* Bawfeh Kingsley Kometa
* Nourin Mohammad Haider Ali Biswas 

---

## Setting Up

In [1]:
# Start from a clean namespace: delete every global whose name neither starts
# nor ends with an underscore (dunder/private names are kept).
# dir() builds its list once, before the loop, so deleting entries from
# globals() while iterating over that list is safe.
# NOTE(review): in an IPython kernel this presumably also removes injected
# helpers such as `In` / `Out` from the user namespace - confirm that is intended.
for name in dir():
    if not(name.endswith('_') or name.startswith('_')):
        del globals()[name]

In [2]:
# Basic python libraries
import os
import re
import random
from time import time
from os.path import join, getsize
from os import listdir
# import numpy as np
from datetime import datetime

# HDFS directory that holds the review files (one .jsonl.gz file per category)
PROJECT_DIR = '/user/ubuntu/project'

# Review year used as the filter in the ETL cells below
YEAR = 2023

In [3]:
   
def EndSession(spark=None, sc=None):
    """Shut down the Spark handles that were passed in.

    Args:
        spark: optional session. It is stopped, then the gateway of its
            context is shut down and the gateway process' stdin is closed.
        sc: optional context. It is stopped and its `_gateway` and `_jvm`
            attributes are reset to None.

    Returns:
        None. Calling it without arguments does nothing.
    """
    if spark is not None:
        spark.stop()
        gateway = spark._sc._gateway
        gateway.shutdown()
        gateway.proc.stdin.close()

    if sc is None:
        return
    sc.stop()
    # Drop the references so the stopped context no longer points at the JVM side
    sc._gateway = sc._jvm = None


In [4]:
# Customize available cluster resources

num_nodes = 3  # VM instances that host data


def spark_conf_init(
    num_cores_per_executor=1,
    RAM_per_node=8,  # GB
    num_cores_per_node=4,  # cpu cores
    verbose=False,
):
    """Sketch an executor layout for the cluster and describe it as text.

    Args:
        num_cores_per_executor: cores given to each executor.
        RAM_per_node: memory of one node in GB.
        num_cores_per_node: cpu cores of one node.
        verbose: when True the report is also printed.

    Returns:
        The report string: header, executor memory, executors per node and
        the per-node distribution list (length `num_nodes`).
    """
    header = "{}\nAssigning {} core(s) per executor\n{}\n".format(
        '=' * 33, num_cores_per_executor, '-' * 33)

    # Memory of one executor if a node's cores were split evenly among executors
    executors_by_cores = num_cores_per_node / num_cores_per_executor
    executor_memory = RAM_per_node / executors_by_cores

    # Use 80% of the cluster memory for executor tasks, leaving 20% for the
    # spark driver, yarn managers, and other cache files
    usable_memory = 0.8 * (num_nodes * RAM_per_node)
    executors_total = usable_memory / executor_memory
    num_executors_per_node = executors_total / num_nodes

    # Every node gets the whole part; the fractional part scaled by the node
    # count tells how many of the leading nodes receive one extra executor.
    whole = int(num_executors_per_node)
    extra = int((num_executors_per_node % 1) * num_nodes)
    distribution = [whole + 1 if node < extra else whole
                    for node in range(num_nodes)]

    details = ("Executor memory: {} GB\n"
               "Number of executors per node: {:g}\n"
               "Distribution: {}\n").format(
        executor_memory, num_executors_per_node, distribution)

    logs = header + details
    if verbose:
        print(logs)
    return logs

# for k in range(1,num_cores_per_node+1):
#       spark_conf_init(k, True)

In [5]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Timestamp of this run; it is embedded in the Spark application name and in
# the name of the log file written by copyInfo()
now = datetime.now()
ts_now = datetime.strftime(now, '%d-%m-%Y_%H_%M')

from pyspark.context import SparkContext

# NOTE(review): a context with master 'local' and app name 'test' is created
# here, before the session builder below runs. The recorded output of
# get_spark_conf() in this notebook reports "Master: local", so presumably
# getOrCreate() attaches to this context and the builder's appName and
# dynamic-allocation settings do not shape the running application - confirm.
sc = SparkContext('local', 'test')

# Only log errors from Spark
sc.setLogLevel("ERROR")

# logs = spark_conf_init(num_cores_per_executor=3)
# print(logs)

# Build (or fetch) the session: 2g driver memory, history clean-up after one
# day, dynamic allocation between 1 and 12 executors with shuffle tracking
spark = (SparkSession.builder
        #  .master("yarn")  # default
         .config("spark.driver.memory", "2g")
        #  .config("spark.executor.cores", "1")
        #  .config("spark.executor.memory", "2g")
        #  .config("spark.executor.instances", '3')
         .config("spark.history.fs.cleaner.enabled", 'true')
         .config("spark.history.fs.cleaner.maxAge", '1d')
        #  .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        #  .config("spark.sql.shuffle.partitions", "100")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
         .config("spark.dynamicAllocation.minExecutors", "1")
         .config("spark.dynamicAllocation.maxExecutors", "12")
         .appName(f"AmazonReviews-{ts_now}")
         .getOrCreate())

def copyInfo(info=''):
    """Append `info` plus a trailing newline to this run's log file.

    The file lives under amazonReviews/ and carries the run timestamp
    `ts_now` in its name; it is opened in append mode on every call.
    """
    log_path = f"amazonReviews/logs-data-{ts_now}.txt"
    with open(log_path, 'a') as log_file:
        log_file.write(info + '\n')


# First log entry: human-readable date of the run
copyInfo("Date: " + now.strftime('%d/%m/%Y %H:%M'))
# copyInfo(logs)

In [6]:
def get_spark_conf():
    """Read executor and driver settings from the active session.

    Returns:
        (logs, num_executors): `logs` is a five-line text summary;
        `num_executors` is spark.executor.instances as an int, or the
        string "NA" when that setting is not available.
    """
    def setting(key):
        # Every lookup falls back to "NA" when the key is not set
        return spark.conf.get(key, "NA")

    # Executor configuration
    num_executors = setting("spark.executor.instances")
    executor_cores = setting("spark.executor.cores")
    executor_memory = setting("spark.executor.memory")

    # Driver configuration
    driver_memory = setting("spark.driver.memory")
    spark_master = setting("spark.master")

    report = [
        f"Executors: {num_executors}",
        f"Executor Cores: {executor_cores}",
        f"Executor Memory: {executor_memory}",
        f"Driver Memory: {driver_memory}",
        f"Master: {spark_master}",
    ]
    logs = "\n".join(report) + "\n"

    if num_executors != "NA":
        num_executors = int(num_executors)
    return logs, num_executors


logs, num_executors = get_spark_conf()
print(logs)
copyInfo(logs)

Executors: NA
Executor Cores: NA
Executor Memory: 512m
Driver Memory: 2g
Master: local



In [7]:
# Load list of sales categories: one download URL per line, the category file
# name is the last path component. Lines are stripped so that stray
# whitespace (e.g. '\r' from Windows line endings) never ends up in a name.
with open('amazonReviews/reviews.txt', 'r') as fp:
    urls = [line.strip() for line in fp]

categories = [url.split('/')[-1] for url in urls if url]
print(f'Number of categories: {len(categories)}')

# Lines of info.txt that mention 'jsonl.gz' are taken to name, as their second
# whitespace-separated token, a file that was not copied
with open('amazonReviews/info.txt', 'r') as fp:
    categories_not_copied = [line.split()[1] for line in fp if 'jsonl.gz' in line]

# Filter instead of calling list.remove(): remove() raises ValueError as soon
# as info.txt mentions a file that is not part of reviews.txt
not_copied = set(categories_not_copied)
categories = [category for category in categories if category not in not_copied]

print(f'Number of categories copied: {len(categories)}')
categories[0]

Number of categories: 34
Number of categories copied: 29


'All_Beauty.jsonl.gz'

## Single category - ETL

In [8]:
# Read the reviews of a single sales category (the first one in `categories`);
# no schema is passed to the JSON reader here
file_path = join(PROJECT_DIR, categories[0])
df = spark.read.json(file_path)

# df.printSchema()
# df.show(4)

                                                                                

In [9]:
# Define schema for relevant attributes
from pyspark.sql.types import (StructType, StringType, DateType, StructField, IntegerType, FloatType)

# Target layout of the extracted reviews: product id, review date and rating.
# All three fields are nullable.
schema = StructType([
    StructField(name=field_name, dataType=field_type, nullable=True)
    for field_name, field_type in (
        ('asin', StringType()),
        ('date', DateType()),
        ('rating', FloatType()),
    )
])


In [10]:
# Select relevant columns and clean up
from pyspark.sql import functions as F

df = (df 
    # convert timestamps to dates (the value is divided by 1000 first,
    # presumably because it is stored in milliseconds - TODO confirm)
    .withColumn('date', F.to_date(F.from_unixtime(df.timestamp / 1000), 'yyyy-MM-dd HH:mm:ss'))
    # drop columns (images, timestamp)
    .drop('images').drop('timestamp')
    # drop rows where both the review text and the title are null
    .na.drop(how='all', subset=['text', 'title']))

# Extract asin (product id), date, and rating for the given YEAR,
# then conform the result to `schema`
df = df.select(['asin', 'date', 'rating']).where(F.year(df.date)==YEAR).to(schema)
# Include product category (file name up to the first '.')
df = df.withColumn('prod_category', F.lit(categories[0].split('.')[0]))

df.show(4)


+----------+----------+------+-------------+
|      asin|      date|rating|prod_category|
+----------+----------+------+-------------+
|B0BFR5WF1R|2023-02-08|   1.0|   All_Beauty|
|B0BL3HSBZB|2023-01-22|   1.0|   All_Beauty|
|B0BSR6WK1Q|2023-03-11|   4.0|   All_Beauty|
|B07TT8JK51|2023-01-05|   4.0|   All_Beauty|
+----------+----------+------+-------------+
only showing top 4 rows



In [11]:
# Monthly reviews map: month number (1-12) -> three-letter column name
months = dict(enumerate(
    ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
     'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'],
    start=1,
))

# Custom function to compute a linear regression coefficient from the monthly
# review columns; it only builds column expressions, so it works on whole
# DataFrames (batch operations)
def linRegCoef(df, Sy=sum(months), n=12):
    """Return a column expression with the regression coefficient, rounded to 2 decimals.

    Args:
        df: DataFrame holding one numeric column per entry of `months`.
        Sy: defaults to the sum of the month numbers (78).
        n: number of observations (12 months).

    The value is (n*Sxy - Sx*Sy) / (n*Sxx - Sx**2) where Sx and Sxx are the
    sum and the sum of squares of the monthly columns, and Sxy is the sum of
    each monthly column multiplied by its month number.

    NOTE(review): with these roles the monthly counts act as x and the month
    numbers as y, i.e. this is the slope of month number regressed on review
    count rather than reviews gained per month - confirm the intended direction.
    """
    month_columns = [(number, df[name]) for number, name in months.items()]

    Sx = sum(column for _, column in month_columns)
    Sxx = sum(F.pow(column, 2) for _, column in month_columns)
    Sxy = sum(column * number for number, column in month_columns)

    numerator = n*Sxy - Sx*Sy
    denominator = n*Sxx - F.pow(Sx, 2)
    return F.round(numerator / denominator, 2)

def total_reviews(df):
    """Return a column expression: the sum of all month columns of `df`, rounded to 1 decimal."""
    monthly_columns = (df[name] for name in months.values())
    return F.round(sum(monthly_columns), 1)


In [12]:
# Map each month using one-hot encoding: add one integer column per month
# name, holding 1 when the review date falls in that month and 0 otherwise
for k, col in months.items():
    df = df.withColumn(col, (F.month(df.date) == k).cast(IntegerType()))

df.show(3)

+----------+----------+------+-------------+---+---+---+---+---+---+---+---+---+---+---+---+
|      asin|      date|rating|prod_category|Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec|
+----------+----------+------+-------------+---+---+---+---+---+---+---+---+---+---+---+---+
|B0BFR5WF1R|2023-02-08|   1.0|   All_Beauty|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|B0BL3HSBZB|2023-01-22|   1.0|   All_Beauty|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|B0BSR6WK1Q|2023-03-11|   4.0|   All_Beauty|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|
+----------+----------+------+-------------+---+---+---+---+---+---+---+---+---+---+---+---+
only showing top 3 rows



In [13]:
# Duplicate date attribute: the dict passed to agg() can hold only one
# aggregate per column name, so a copy is needed to take both the min and the
# max of the review date
df = df.withColumn('date_', df.date)

# Define map for each attribute to aggregate
agg_func = dict((month, 'sum') for month in months.values())
agg_func['rating'] = 'mean'
agg_func['date'] = 'min'
agg_func['date_'] = 'max'
agg_func['prod_category'] = 'max'

# Aggregate data using groupBy and agg_func (one row per product)
df = df.groupBy('asin').agg(agg_func)

# Rename columns: the aggregated columns are named like "sum(Jan)"
for col in df.columns[1:]:
    # extract string enclosed in brackets
    newcol = col[col.find('(')+1:col.find(')')]
    # rename column
    df = df.withColumnRenamed(col, newcol)

df = (df.withColumnRenamed('rating', 'avg_rating')
        .withColumnRenamed('date', 'first_review_date')
        .withColumnRenamed('date_', 'last_review_date'))

# Sort columns
df = df.select(
    ['asin', 'avg_rating', 'prod_category', 'first_review_date', 
     'last_review_date']+list(months.values())
)

# df.show(3)
# Compute 
# - total reviews (Jan + Feb + ... + Dec) 
# - linear regression coefficient (slope) on monthly reviews
df = (df.withColumn('linRegCoeff', linRegCoef(df))
        .withColumn('total_reviews', total_reviews(df)))

df = df.withColumn('avg_rating', F.round(df.avg_rating, 1))
# Keep products with at least 12 reviews - the same threshold (>= 12) that
# transformation() applies in the multi-category pipeline - and sort descending
df = df.filter(df.total_reviews >= 12).orderBy('total_reviews', ascending=False)

df.show(5)



[Stage 3:>                                                          (0 + 1) / 1]

+----------+----------+-------------+-----------------+----------------+---+---+---+---+---+---+---+---+---+---+---+---+-----------+-------------+
|      asin|avg_rating|prod_category|first_review_date|last_review_date|Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec|linRegCoeff|total_reviews|
+----------+----------+-------------+-----------------+----------------+---+---+---+---+---+---+---+---+---+---+---+---+-----------+-------------+
|B09XBSDCXP|       3.4|   All_Beauty|       2023-01-02|      2023-06-14| 83| 36| 44| 13|  9|  1|  0|  0|  0|  0|  0|  0|      -0.11|          186|
|B0BFWBKRSG|       4.4|   All_Beauty|       2023-01-03|      2023-05-04| 26| 60| 30|  8|  1|  0|  0|  0|  0|  0|  0|  0|      -0.14|          125|
|B0BNWXRQ18|       4.0|   All_Beauty|       2023-01-04|      2023-07-18| 15| 33| 53| 12|  4|  0|  1|  0|  0|  0|  0|  0|      -0.14|          118|
|B0BF9TRWV1|       4.6|   All_Beauty|       2023-01-16|      2023-08-29| 13| 11| 43| 10|  9|  2|  7|  1|  0|  0|  0|  

                                                                                

## Multiple categories - ETL

In [14]:
# Extraction step for one sales category or a list of categories
def extraction(category, year=None, df=None):
    """Produce review rows (asin, date, rating) conformed to `schema`.

    Args:
        category: a category file name, or a list of file names, relative to
            PROJECT_DIR.
        year: when given, only reviews dated in that year are kept.
        df: optional DataFrame that already has a `date` column; when given,
            nothing is read and the cleaning step is skipped.

    Returns:
        DataFrame with asin, date and rating; a `prod_category` column (file
        name up to the first '.') is added only when `category` is a string.
    """
    # One path, or one path per file when a list of categories is given
    file_path = (
        [join(PROJECT_DIR, file) for file in category]
        if isinstance(category, list)
        else join(PROJECT_DIR, category)
    )

    if df is None:
        raw = spark.read.json(file_path)
        # convert timestamps to dates
        review_date = F.to_date(F.from_unixtime(raw.timestamp / 1000), 'yyyy-MM-dd HH:mm:ss')
        df = (raw.withColumn('date', review_date)
                 # drop columns (images, timestamp)
                 .drop('images').drop('timestamp')
                 # drop rows where both text and title are null
                 .na.drop(how='all', subset=['text', 'title']))

    # Keep asin (product id), date and rating, optionally for one year only
    selected = df.select(['asin', 'date', 'rating'])
    if year is not None:
        selected = selected.where(F.year(df.date)==year)
    result = selected.to(schema)

    # Include product category
    if isinstance(category, str):
        result = result.withColumn('prod_category', F.lit(category.split('.')[0]))

    return result

In [15]:
# Transformation step
def transformation(df):
    """Aggregate review rows into one row per product.

    Args:
        df: DataFrame with columns asin, date and rating, and optionally
            prod_category.

    Returns:
        DataFrame with columns asin, avg_rating (rounded to 1 decimal),
        prod_category (only if present in the input), first_review_date,
        last_review_date, one review-count column per entry of `months`,
        linReg and total_reviews. Only products with at least 12 reviews are
        kept, sorted by total_reviews in descending order.

    The aggregates are declared explicitly with aliases, so the function no
    longer depends on the notebook-global `agg_func` dict defined in the
    single-category demo cell, no longer shadows the builtin `map`, and does
    not need to parse generated names such as "sum(Jan)".
    """
    review_month = F.month(F.col('date'))

    aggregates = [F.round(F.mean('rating'), 1).alias('avg_rating')]
    if 'prod_category' in df.columns:
        aggregates.append(F.max('prod_category').alias('prod_category'))
    aggregates += [
        F.min('date').alias('first_review_date'),
        F.max('date').alias('last_review_date'),
    ]
    # Monthly review counts: sum of a 0/1 indicator per month (one-hot encoding)
    aggregates += [
        F.sum((review_month == k).cast(IntegerType())).alias(name)
        for k, name in months.items()
    ]

    # A single aggregation replaces the chain of per-column withColumn calls
    df = df.groupBy('asin').agg(*aggregates)

    # Compute
    # - linear regression coefficient (slope) on monthly reviews
    # - total reviews over all months
    df = (df.withColumn('linReg', linRegCoef(df))
            .withColumn('total_reviews', total_reviews(df)))

    # Keep rows with at least 12 total reviews, and sort in descending order
    return (df.filter(df.total_reviews >= 12)
              .orderBy('total_reviews', ascending=False))

### Extract

In [16]:
# Disabled cell (guard is False): times the extraction of all categories on its own
if False:
    # NOTE: StorageLevel is imported here but not used in this cell
    from pyspark.storagelevel import StorageLevel

    clock = time()

    # Extract the first category, then append every other category with union()
    df = extraction(categories[0], year=YEAR)

    for category in categories[1:]:
        df = df.union( extraction(category, year=YEAR) )

    # show() is the action inside the timed section
    df.show()
        
    duration = time() - clock

    logs = f"Total execution time: ___{duration/60:.2f}__ minutes\n{'--'*33}\n"
    print(logs)
    copyInfo("Extraction: "+logs)
    
# df.cache()
# numPartitions = df.rdd.getNumPartitions()
# logs = f"Number of partitions after extraction: {numPartitions}\n" \
#         + f"Total number of rows: {df.count()}"
# print(logs)
# copyInfo(logs)


### Transform

In [17]:
# # Decrease/increase the number of partitions in extracted df, 
# # in proportion to available number of executors
# # df = df.repartition(num_cores_per_node*num_nodes*10) 

# df = df.coalesce(num_executors*num_nodes*2) 
# df.rdd.getNumPartitions()

# Disabled cell (guard is False): times the transformation step on its own.
# It expects `df` to hold extracted review rows (asin, date, rating, ...).
if False:
    clock = time()

    df = transformation(df)

    # show() is the action inside the timed section
    df.show()
    duration  = time()-clock

    logs = f"Total execution time: ___{duration/60:.2f}___ minutes\n" #\
            # + f"Total number of rows: {df.count()}"
    print(logs)
    copyInfo("Transformation: "+logs)
    
# numPartitions = df.rdd.getNumPartitions()

# logs = f"Number of partions after transformation: {numPartitions}"
# print(logs)
# copyInfo(logs)


### Load

In [18]:
# Save data 
# Disabled cell (guard is False): times the CSV write of `df` on its own
if False:
    clock = time()

    # if numPartitions < num_nodes:
    #     # To enforce data balance when writing data 
    #     df.repartition(num_nodes).write.csv(f"amazonReviews{YEAR}.csv", mode="overwrite")
    # else:
    #     df.write.csv(f"amazonReviews{YEAR}.csv", mode="overwrite")

    # Existing output at this path is overwritten; no header option is passed
    df.write.csv(f"amazonReviews{YEAR}.csv", mode="overwrite")

    logs = f"Total execution time: ___{(time()-clock)/60:.2f}__ minutes"
    print(logs)
    copyInfo("Load: "+logs)

In [19]:
# # Read data
# clock = time()

# df2 = spark.read.csv(f"amazonReviews{YEAR}.csv", sep = ',', header = True, schema=df.schema)
# df2.show()

# logs = f"Total execution time: ___{(time()-clock)/60:.2f}__ minutes"
# print(logs)
# copyInfo("ReLoad: "+logs)

In [20]:
# clock = time()
# numPartitions = df2.rdd.getNumPartitions()
# print(f'Time to compute numPartitions: ___{(time()-clock)/60:.2f}___ minutes')

# logs = f"Number of partitions in re-loaded data: {numPartitions}"
# print(logs)
# copyInfo(logs)

### Extract - Transform - Load

In [21]:
# Full pipeline, timed end to end: extract every category for YEAR, transform
# to one row per product, write the result as CSV
if True:
    clock = time()

    # Extract the first category, then append every other category with union()
    df = extraction(categories[0], year=YEAR)
    for category in categories[1:]:
        df = df.union( extraction(category, year=YEAR) )


    df = transformation(df)

    # Existing output at this path is overwritten; no header option is passed
    df.write.csv(f"amazonReviews{YEAR}.csv", mode="overwrite")
        
    duration = time() - clock

    logs = f"Total execution time: ___{duration/60:.2f}__ minutes\n{'--'*33}\n"
    print(logs)
    copyInfo("Extraction+Transform+Load: "+logs)

# Display the first rows and time that separately (printed only, not logged).
# NOTE(review): `df` is not cached in this cell, so show() presumably
# re-executes the pipeline instead of reusing the written result - confirm.
if True:
    clock = time()
    df.show()
    logs = f"Total execution time: ___{(time()-clock)/60:.2f}__ minutes"
    print(logs)

                                                                                

Total execution time: ___58.18__ minutes
------------------------------------------------------------------





+----------+----------+--------------------+-----------------+----------------+----+---+---+---+---+---+---+---+---+---+---+---+------+-------------+
|      asin|avg_rating|       prod_category|first_review_date|last_review_date| Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec|linReg|total_reviews|
+----------+----------+--------------------+-----------------+----------------+----+---+---+---+---+---+---+---+---+---+---+---+------+-------------+
|B0BMBC1FJX|       4.8|        Pet_Supplies|       2023-01-01|      2023-09-10| 402|365|782|937|667|454|356|188|  4|  0|  0|  0| -0.01|         4155|
|B00T0C9XRK|       3.5|Beauty_and_Person...|       2023-01-01|      2023-09-07| 663|730|723|370|205|104|117| 62|  2|  0|  0|  0| -0.01|         2976|
|B09C6LW4XW|       4.3|        Kindle_Store|       2023-01-01|      2023-09-06| 951|651|563|272|112| 87| 75| 16|  2|  0|  0|  0| -0.01|         2729|
|B08DVFZTTG|       4.5|Health_and_Household|       2023-01-01|      2023-09-11| 801|655|516|251|144|

                                                                                

In [22]:
# Evaluate how ETL scales with data volumes, 
# by adding one category at a time, year fixed at 2023

if False: 
    # Collect sizes
    category_sizes = []
    for category in categories:
        size = !hdfs dfs -du -h /user/ubuntu/project/{category}
        size1, unit1 = size[0].split()[0], size[0].split()[1]
        # convert into GB
        GB = float(size1) / 1e3 if unit1 == 'M' else float(size1)
        category_sizes.append(GB)

    # iterate chunks of categories to increase data by about 5G each time
    durations = []
    checkpoints = [6, 9, 15, 17, 21, 24, 27, 29]
    for k in checkpoints:
        clock = time()
        
        df = extraction(categories[:k], year=YEAR)
        df = transformation(df)
        df.write.csv(f"amazonReviews{YEAR}.csv", mode="overwrite")
        
        durations.append((time() - clock) / 60)
        logs = f"Exec. time for first {k} categories: {durations[-1]}"
        print(logs)
        copyInfo(logs)

    copyInfo("# Evaluate how ETL scales with data volumes, \n" + \
    f"# by adding one category at a time, year fixed at {YEAR}")

    logs = f"Execution times: {durations}\n" \
            + f"Data sizes: {[sum(category_sizes[:k]) for k in checkpoints]}"
    print(logs)
    copyInfo(logs)

## Multiple categories & years

In [23]:
# Measure data volume in terms of record count for each year
if False:  # change to True to run (once)
    years = range(2017, 2024)
    years_desc = list(reversed(years))  # processing order: newest year first

    # Extract the raw review rows explicitly. The notebook-global `df` cannot
    # be relied on here: after the ETL cell it holds one aggregated row per
    # product and has no `date` / `rating` columns any more.
    reviews = extraction(categories)

    data_sizes = []
    for year in years_desc:
        print(f" Years == {year}")
        # A single count() per year; caching a frame that is used by only
        # one action would add work without any reuse
        data_sizes.append(reviews.where(F.year(reviews.date) == year).count())

    # Log the years in the same order as the counts
    logs = f"Data sizes: {data_sizes}\nYears: {years_desc}"
    print(logs)
    copyInfo(logs)
    
# Estimate extraction-transformation cost as function of data volumes
if False:  # change to True to run (once)
    durations = []
    years = range(2017, 2024)
    years_desc = list(reversed(years))  # processing order: newest year first

    for year in years_desc:
        print(f" Years >= {year}")
        clock = time()

        # Extraction is repeated inside the timed section on purpose, so that
        # each measurement covers extraction plus transformation
        df = extraction(categories)
        dfk = df.select(['asin', 'date', 'rating']) \
                .where(F.year(df.date)>=year)
        transformation(dfk).show(1)

        durations.append((time() - clock) / 60)
        logs = f"Exec. year {year}: {durations[-1]}"
        print(logs)
        copyInfo(logs)

    # The years are logged in processing order so that they line up with
    # `durations` (previously the list was ascending while the timings were
    # collected newest-first)
    logs = f"Execution times: {durations}\n" \
    + f"Years: {years_desc}"
    print(logs)
    copyInfo(logs)

### Measure seasonality using correlations between two years

In [24]:
# Row-wise similarity between the monthly review counts of two years
def rowwise_corr(df, year1, year2):
    """Return a column expression comparing the month columns of two years.

    `df` must contain, for every month name in `months`, one column suffixed
    with `year1` and one suffixed with `year2`. The result is the inner
    product of the two 12-month vectors divided by the square root of the
    product of their sums of squares, rounded to 2 decimals. No mean is
    subtracted, i.e. this is an uncentered (cosine-style) measure.
    """
    first = [df[name + str(year1)] for name in months.values()]
    second = [df[name + str(year2)] for name in months.values()]

    innerprod = sum(a * b for a, b in zip(first, second))
    squares1 = sum(a * a for a in first)
    squares2 = sum(b * b for b in second)

    return F.round(innerprod / F.sqrt(squares1 * squares2), 2)

def extraction_transformation(year=YEAR, Df=None):
    """Run extraction and transformation for one year and tag the columns with it.

    Args:
        year: review year to keep.
        Df: optional, already extracted DataFrame handed on to extraction().

    Returns:
        The transformed DataFrame in which every column except the first one
        has `year` appended to its name.
    """
    suffix = str(year)
    result = transformation(extraction(categories, year, Df))

    # The column list is taken once, before any renaming happens
    for name in result.columns[1:]:
        result = result.withColumnRenamed(name, name + suffix)

    return result

In [25]:
# Disabled cell (guard is False): join the per-product monthly counts of two
# years and compute their row-wise similarity, timed end to end
if False:
    clock = time()

    year1 = 2022
    year2 = 2023

    # Extract once, then filter/transform the same frame for each year
    df = extraction(categories)

    df1 = extraction_transformation(year1, df)
    df2 = extraction_transformation(year2, df)

    parts1 = df1.rdd.getNumPartitions()
    parts2 = df2.rdd.getNumPartitions()
    if parts1 != parts2:
        print("Enforce co-partioning before applying join operator!")
        # get_spark_conf() yields the string "NA" when spark.executor.instances
        # is not set; int("NA") raises ValueError. Fall back to the smaller of
        # the two partition counts, which both frames can be coalesced to.
        try:
            target = num_nodes*int(num_executors)
        except (TypeError, ValueError):
            target = min(parts1, parts2)
        df1 = df1.coalesce(target)
        df2 = df2.coalesce(target)
        
    df = df1.join(df2, on='asin', how='inner')

    df = df.withColumn(f'corr({year1}, {year2})', rowwise_corr(df, year1, year2))

    df.show()

    logs = f"Total execution time: ___{(time()-clock)/60:.2f}__ minutes"
    print(logs)
    # The f prefix was missing, so "{year1}, {year2}" was logged literally
    copyInfo(f"Extraction+Transformation ({year1}, {year2}) | "+logs)

In [26]:
# Stop the Spark session and shut down its gateway (see EndSession)
EndSession(spark=spark)