In [1]:

import pyspark
from pyspark.sql import SparkSession, types
from pyspark.conf import SparkConf
from pyspark.context import SparkContext 
from pyspark.sql import functions as F
from pyspark.sql.functions import to_date

In [None]:
# NOTE(review): this cell previously contained only "../", which is not valid
# Python and was never executed. It has been neutralised so that
# "Restart Kernel -> Run All" does not stop here. If a directory change was
# intended, use the "%cd .." magic instead.

In [None]:
import os

# Location of the GCP service-account key file.
# The standard GOOGLE_APPLICATION_CREDENTIALS environment variable takes
# precedence when it is set; otherwise the original hard-coded location is
# used, so existing setups keep working unchanged.
credentials_location = os.environ.get(
    'GOOGLE_APPLICATION_CREDENTIALS',
    '/home/alexey/.google/credentials/google_credentials.json',
)

# Spark configuration: local master using all cores, the GCS connector jar,
# and service-account authentication pointing at the key file above.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('test')
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar")
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
)

In [None]:
# Start the Spark context from the configuration built in the previous cell.
sc = SparkContext(conf=conf)

# Hadoop-level settings: the GCS filesystem implementation classes and the
# service-account key file / enable flag. Applied in this order.
gcs_hadoop_settings = {
    "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "fs.gs.auth.service.account.json.keyfile": credentials_location,
    "fs.gs.auth.service.account.enable": "true",
}

hadoop_conf = sc._jsc.hadoopConfiguration()
for setting_name, setting_value in gcs_hadoop_settings.items():
    hadoop_conf.set(setting_name, setting_value)

In [2]:
# Obtain the SparkSession for this notebook (local master, all cores).
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)


In [4]:

import os

# Expand "~" to the current user's home directory for both parquet paths.
fact_file_path, meta_file_path = map(os.path.expanduser, (
    '~/astro/include/data/All_Beauty.parquet',
    '~/astro/include/data/meta_All_Beauty.parquet',
))


'C:\\Users\\richg/astro/include/data/All_Beauty.parquet'

In [3]:
import os

# Root of the local data directory.
# Set the AMAZON_REVIEWS_DATA_DIR environment variable to run the notebook on
# another machine; when it is unset the original location is used, so the
# resulting paths are unchanged.
DATA_ROOT = os.environ.get(
    'AMAZON_REVIEWS_DATA_DIR',
    'C:/Users/richg/coding/de/AmazonProductReviewAnalysisPipeline/astro/include/data',
).rstrip('/')

# Gzipped JSON Lines inputs: the reviews and the product metadata.
review_file_path = DATA_ROOT + '/gz/review/All_Beauty.jsonl.gz'
meta_file_path = DATA_ROOT + '/gz/meta/meta_All_Beauty.jsonl.gz'

# Explicit schema for the review records; every column is nullable.
# The nested "images" array (attachment_type plus large/medium/small image
# URLs) is not part of this schema.
review_schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("rating", types.DoubleType()),
        ("title", types.StringType()),
        ("text", types.StringType()),
        ("asin", types.StringType()),
        ("parent_asin", types.StringType()),
        ("user_id", types.StringType()),
        ("timestamp", types.LongType()),
        ("verified_purchase", types.BooleanType()),
        ("helpful_vote", types.LongType()),
    )
])
# Explicit schema for the product metadata records; every column is nullable,
# and "description" / "features" are arrays of nullable strings.
meta_schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("average_rating", types.DoubleType()),
        ("description", types.ArrayType(types.StringType(), True)),
        ("features", types.ArrayType(types.StringType(), True)),
        ("main_category", types.StringType()),
        ("parent_asin", types.StringType()),
        ("price", types.DoubleType()),
        ("rating_number", types.LongType()),
        ("store", types.StringType()),
        ("title", types.StringType()),
    )
])


Read the gzipped JSON Lines source files

In [4]:
# Load the product metadata as newline-delimited JSON using the explicit schema.
df_meta = (
    spark.read
    .schema(meta_schema)
    .option("lineSep", "\n")
    .json(meta_file_path)
)

In [6]:
# Load the reviews as newline-delimited JSON using the explicit schema.
df_review = (
    spark.read
    .schema(review_schema)
    .option("lineSep", "\n")
    .json(review_file_path)
)

In [7]:

import os

# Root of the local data directory.
# Set the AMAZON_REVIEWS_DATA_DIR environment variable to run the notebook on
# another machine; when it is unset the original location is used, so the
# output path is unchanged.
DATA_ROOT = os.environ.get(
    'AMAZON_REVIEWS_DATA_DIR',
    'C:/Users/richg/coding/de/AmazonProductReviewAnalysisPipeline/astro/include/data',
).rstrip('/')

review_output_file_path = DATA_ROOT + '/parquet/review'

# Repartition the reviews into 4 partitions and write them as parquet,
# replacing any previous output at the same location.
df_review.repartition(4).write.parquet(review_output_file_path, mode='overwrite')
    

In [10]:
# df_all_beauty = df_all_beauty.withColumn("date", F.to_date(F.col("timestamp")))
# df_all_beauty = df_all_beauty.withColumn("timestamp", (F.col("timestamp") / 1000).cast(types.TimestampType()))
# ts = df_all_beauty.selectExpr("make_timestamp(YEAR, MONTH, DAY, HOUR, MINUTE, SECOND) as MAKE_TIMESTAMP")

# Rename the review's own "title" and "rating" columns to
# "review_title" and "user_rating".
df_review = (
    df_review
    .withColumnRenamed("title", "review_title")
    .withColumnRenamed("rating", "user_rating")
)



In [59]:
# Preview the metadata: first 20 rows, long cell values truncated.
df_meta.show(n=20, truncate=True)

+--------------+--------------------+--------------------+-------------+-----------+-----+-------------+--------------------+--------------------+
|average_rating|         description|            features|main_category|parent_asin|price|rating_number|               store|               title|
+--------------+--------------------+--------------------+-------------+-----------+-----+-------------+--------------------+--------------------+
|           4.8|                  []|                  []|   All Beauty| B01CUPMQZE| null|           10|     Howard Products|Howard LC0008 Lea...|
|           4.5|                  []|                  []|   All Beauty| B076WQZGPM| null|            3|              Yes To|Yes to Tomatoes D...|
|           4.4|                  []|                  []|   All Beauty| B000B658RI| null|           26|Levine Health Pro...|Eye Patch Black A...|
|           3.1|                  []|                  []|   All Beauty| B088FKY3VD| null|          102|            Ch

In [94]:
# Average number of helpful votes per product (asin).
# The reviews live in df_review; the previously referenced df_all_beauty is
# not defined anywhere in this notebook and fails on a fresh kernel.
df_avg_helpful = (
    df_review
    .groupBy("asin")
    .agg(F.avg("helpful_vote").alias("avg_helpful_vote"))
)
df_avg_helpful.show()

+----------+-------------------+
|      asin|   avg_helpful_vote|
+----------+-------------------+
|B08ZSR9HLF|0.39285714285714285|
|B094VG2GKR|                0.0|
|B07K1LGWYB| 0.2857142857142857|
|B0B8DB17VT|0.22727272727272727|
|B085M95TX2| 0.5818181818181818|
|B07YS9W97B| 0.4444444444444444|
|B00TK0VV68|                0.0|
|B00VUIVWA8|                0.0|
|B07MMWXPLJ| 0.7225433526011561|
|B07HCR8QDN|                0.0|
|B07M6LSZVF|              0.125|
|B07Y9KVKFK|                0.0|
|B07SPDDDZT|                1.5|
|B07K1DRCYR|  1.464071856287425|
|B000FEF1V4| 1.0928338762214984|
|B087GD4BJ1|               0.15|
|B08CZD385F|                0.0|
|B01G77ISJE| 0.6666666666666666|
|B0779PMNDY|                1.0|
|B072WRK6T3|0.29411764705882354|
+----------+-------------------+
only showing top 20 rows



In [24]:
from pyspark.sql.functions import broadcast

# Attach product metadata to every review by parent_asin.
# df_meta is marked for broadcast, its duplicate parent_asin column is dropped
# after the join, and exact duplicate rows are removed.
# The chain is parenthesised so that every step, including the final
# dropDuplicates(), belongs to the same expression.
df_review_product = (
    df_review
    .join(broadcast(df_meta), df_meta.parent_asin == df_review.parent_asin)
    .drop(df_meta.parent_asin)
    .dropDuplicates()
)


In [58]:
# Compare df_meta's row count before and after removing exact duplicates.
n1 = df_meta.count()                    # rows as loaded
n2 = df_meta.dropDuplicates().count()   # rows remaining after de-duplication
n3 = n1 - n2                            # rows that were duplicates

print("number of original data rows: ", n1)
print("number of data rows after deleting duplicated data: ", n2)
print("number of duplicated data: ", n3)

number of original data rows:  112590
number of data rows after deleting duplicated data:  112590
number of duplicated data:  0


In [27]:
# De-duplicate the joined frame, then drop rows whose user_rating or text is null.
df_review_product_dedup = df_review_product.dropDuplicates()
df_review_productNoMissingValue = df_review_product_dedup.dropna(
    how="any", subset=["user_rating", "text"])

# Rows removed by dropna: compare the SAME frame before and after the filter.
# (The previous formula subtracted the joined row count from n1, which is the
# row count of df_meta, and never looked at the filtered frame.)
numberOfMissingValueAny = (
    df_review_product_dedup.count() - df_review_productNoMissingValue.count()
)
print("number of missing value rows: ", numberOfMissingValueAny)

number of missing value rows:  0
