In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType, ArrayType, LongType
import pyspark.sql.functions as f
import re
import requests
from bs4 import BeautifulSoup

In [2]:
spark = SparkSession \
    .builder \
    .appName("Load Review Files") \
    .config("spark.jars", "/opt/spark/jars/spark-bigquery-with-dependencies_2.12-0.20.0.jar") \
    .getOrCreate()

In [3]:
# Update to your GCS bucket
gcs_bucket = f'amazon_reviews_bucket'
gcs_filepath = f'gs://amazon_reviews_bucket'

In [4]:
# Define schema of files to parse
schema = StructType([ 
    StructField("asin",StringType(),True), 
    StructField("image",ArrayType(StringType()),True), 
    StructField("overall",DoubleType(),True),
    StructField("reviewText",StringType(),True),
    StructField("reviewTime",StringType(),True),
    StructField("reviewerID",StringType(),True),
    StructField("reviewerName",StringType(),True),
    StructField("summary",StringType(),True),
    StructField("unixReviewTime",LongType(),True),
    StructField("verified",BooleanType(),True),
    StructField("vote",StringType(),True)
  ])

In [13]:
files

['Books_5.json.gz',
 'CDs_and_Vinyl_5.json.gz',
 'Cell_Phones_and_Accessories_5.json.gz',
 'Clothing_Shoes_and_Jewelry_5.json.gz',
 'Digital_Music_5.json.gz',
 'Electronics_5.json.gz',
 'Gift_Cards_5.json.gz',
 'Grocery_and_Gourmet_Food_5.json.gz',
 'Home_and_Kitchen_5.json.gz',
 'Industrial_and_Scientific_5.json.gz',
 'Kindle_Store_5.json.gz',
 'Luxury_Beauty_5.json.gz',
 'Magazine_Subscriptions_5.json.gz',
 'Movies_and_TV_5.json.gz',
 'Musical_Instruments_5.json.gz',
 'Office_Products_5.json.gz',
 'Patio_Lawn_and_Garden_5.json.gz',
 'Pet_Supplies_5.json.gz',
 'Prime_Pantry_5.json.gz',
 'Software_5.json.gz',
 'Sports_and_Outdoors_5.json.gz',
 'Tools_and_Home_Improvement_5.json.gz',
 'Toys_and_Games_5.json.gz',
 'Video_Games_5.json.gz']

In [5]:
# URL to scrape to get files to download
url = "https://nijianmo.github.io/amazon/index.html"
html = requests.get(url)

if html.ok:
    soup = BeautifulSoup(html.content, 'html.parser')  

output_final = []
files = []
links = soup.find_all('a',string='5-core')#.find('5-core')#.find_all('td', id='5-core')
for link in links:
    url = link.get('href')
    file = url.split('/')[-1]
    print(url)
    print(url.split('/')[-1])
    spark.sparkContext.addFile(url)
    files.append(file)

http://deepyeti.ucsd.edu/jianmo/amazon/categoryFilesSmall/AMAZON_FASHION_5.json.gz
AMAZON_FASHION_5.json.gz
http://deepyeti.ucsd.edu/jianmo/amazon/categoryFilesSmall/All_Beauty_5.json.gz
All_Beauty_5.json.gz
http://deepyeti.ucsd.edu/jianmo/amazon/categoryFilesSmall/Appliances_5.json.gz
Appliances_5.json.gz
http://deepyeti.ucsd.edu/jianmo/amazon/categoryFilesSmall/Arts_Crafts_and_Sewing_5.json.gz
Arts_Crafts_and_Sewing_5.json.gz
http://deepyeti.ucsd.edu/jianmo/amazon/categoryFilesSmall/Automotive_5.json.gz
Automotive_5.json.gz
http://deepyeti.ucsd.edu/jianmo/amazon/categoryFilesSmall/Books_5.json.gz
Books_5.json.gz
http://deepyeti.ucsd.edu/jianmo/amazon/categoryFilesSmall/CDs_and_Vinyl_5.json.gz
CDs_and_Vinyl_5.json.gz
http://deepyeti.ucsd.edu/jianmo/amazon/categoryFilesSmall/Cell_Phones_and_Accessories_5.json.gz
Cell_Phones_and_Accessories_5.json.gz
http://deepyeti.ucsd.edu/jianmo/amazon/categoryFilesSmall/Clothing_Shoes_and_Jewelry_5.json.gz
Clothing_Shoes_and_Jewelry_5.json.gz
http:/

In [9]:
del files[0]

In [10]:
files

['Books_5.json.gz',
 'CDs_and_Vinyl_5.json.gz',
 'Cell_Phones_and_Accessories_5.json.gz',
 'Clothing_Shoes_and_Jewelry_5.json.gz',
 'Digital_Music_5.json.gz',
 'Electronics_5.json.gz',
 'Gift_Cards_5.json.gz',
 'Grocery_and_Gourmet_Food_5.json.gz',
 'Home_and_Kitchen_5.json.gz',
 'Industrial_and_Scientific_5.json.gz',
 'Kindle_Store_5.json.gz',
 'Luxury_Beauty_5.json.gz',
 'Magazine_Subscriptions_5.json.gz',
 'Movies_and_TV_5.json.gz',
 'Musical_Instruments_5.json.gz',
 'Office_Products_5.json.gz',
 'Patio_Lawn_and_Garden_5.json.gz',
 'Pet_Supplies_5.json.gz',
 'Prime_Pantry_5.json.gz',
 'Software_5.json.gz',
 'Sports_and_Outdoors_5.json.gz',
 'Tools_and_Home_Improvement_5.json.gz',
 'Toys_and_Games_5.json.gz',
 'Video_Games_5.json.gz']

In [11]:
for file in files:    
    df = spark.read.json("file://"+SparkFiles.get(file),schema)
    df = df.dropDuplicates() 
    df = df.withColumn('review_wordCount', f.size(f.split(f.col('reviewText'), ' ')))
    df.registerTempTable("dataframe")
    sql_script = f"""select 
              '{file}' as category,
              asin || '-' || reviewerID || row_number() OVER (PARTITION BY asin, reviewerID ORDER BY unixReviewTime asc) as review_ID,
              asin as product_ID,
              reviewerID as reviewer_ID,
              overall as rating_out_of_5,
              summary as review_summary,
              reviewText as review_text,
              review_wordCount as review_word_count,     
              '{url}' as source_url
            from dataframe"""
    output = spark.sql(sql_script)
    output.write \
      .format("bigquery") \
      .option("temporaryGcsBucket",gcs_bucket) \
      .mode("append") \
      .save("amazon_reviews.categoryFilesSmall")
    output.unpersist()

Py4JJavaError: An error occurred while calling o82.save.
: java.lang.RuntimeException: Failed to write to BigQuery
	at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:94)
	at com.google.cloud.spark.bigquery.BigQueryInsertableRelation.insert(BigQueryInsertableRelation.scala:43)
	at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:112)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at com.google.cloud.spark.bigquery.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.scala:89)
	... 34 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 2.0 failed 1 times, most recent failure: Lost task 1.0 in stage 2.0 (TID 202) (84305616fa01 executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
	... 56 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	... 1 more
Caused by: java.lang.OutOfMemoryError: Java heap space
