In [1]:
import time as t

In [2]:
start_time = t.time()

In [3]:
#! gsutil cp gs://23feb2023/tripadvisor/* ./

In [4]:
#! sudo apt-get install p7zip

In [5]:
#! p7zip -d hotel-20211115.7z

In [31]:
#! ls -l /hotel/tripadvisor/review/

In [6]:
#! gsutil -m cp -r /hotel/tripadvisor/review gs://23feb2023/tripadvisor/

In [8]:
sc

In [9]:
from pyspark.sql import functions as sparkf
from pyspark.sql.types import *

In [10]:
loaded_df = spark.read.json('gs://23feb2023/tripadvisor/review/*.json')

                                                                                

In [11]:
#loaded_df.count()

In [12]:
#loaded_df.rdd.getNumPartitions()

In [13]:
raw_df = loaded_df

In [14]:
analysis_df = raw_df.select('id','createdDate','additionalRatings'\
                            ,sparkf.col('location.additionalNames.geo').alias('location_geo')\
                            ,sparkf.col('location.placeType').alias('location_placeType')\
                            ,sparkf.col('location.name').alias('location_name')\
              ,'rating','userProfile.userId'\
                            ,'userProfile.hometown.location.name')\
.withColumnRenamed('rating','rating_col')\
.withColumnRenamed('name','userHometown')\
.withColumn('extracted_additionalRatings',sparkf.explode('additionalRatings'))\
.select(
  'id','createdDate','additionalRatings','extracted_additionalRatings'\
    , sparkf.col("extracted_additionalRatings")["rating"].alias("rating")\
    , sparkf.col("extracted_additionalRatings")["ratingLabel"].alias("ratingLabel")\
    ,'location_geo','location_name','rating_col','userId','userHometown','location_placeType'
)\
.groupBy('id','createdDate','rating_col','location_placeType','location_name','location_geo'\
         ,'userId','userHometown').pivot('ratingLabel').sum('rating')\
.orderBy('id', ascending= False)

                                                                                

In [15]:
#analysis_df.count()

In [16]:
sparkf_ReplaceNull = sparkf.udf(lambda x: "UNKNOWN" if x == None else x)

In [17]:
final_df = analysis_df\
.withColumnRenamed('Business service (e.g., internet access)','Business_service')\
.withColumn('userHometown',sparkf_ReplaceNull('userHometown'))\
.withColumn('Cleanliness',sparkf.col('Cleanliness').cast(IntegerType()))\
.withColumn('Location',sparkf.col('Location').cast(IntegerType()))\
.withColumn('Rooms',sparkf.col('Rooms').cast(IntegerType()))\
.withColumn('Service',sparkf.col('Service').cast(IntegerType()))\
.withColumn('Sleep Quality',sparkf.col('Sleep Quality').cast(IntegerType()))\
.withColumn('Value',sparkf.col('Value').cast(IntegerType()))\
.withColumn('Business_service'\
            ,sparkf.col('Business_service').cast(IntegerType()))\
.withColumn('Check in / front desk',sparkf.col('Check in / front desk').cast(IntegerType()))\
.withColumn('createdDate',sparkf.to_timestamp('createdDate',format='yyyy-MM-dd'))


In [18]:
final_df.groupBy('Sleep Quality').count().orderBy('Sleep Quality').show(100)



+-------------+-----+
|Sleep Quality|count|
+-------------+-----+
|         null|33953|
|            1| 1742|
|            2| 1533|
|            3| 4840|
|            4|10490|
|            5|19441|
|            6|   54|
|            8|  100|
|           10|  138|
+-------------+-----+



                                                                                

In [19]:
final_df.groupBy('Service').count().show(100)

23/02/26 13:43:03 WARN org.apache.spark.deploy.yarn.YarnAllocator: Container from a bad node: container_1677412593959_0005_01_000001 on host: cluster-dbca-m.us-central1-a.c.tidal-eon-374408.internal. Exit status: 137. Diagnostics: [2023-02-26 13:43:03.445]Container killed on request. Exit code is 137
[2023-02-26 13:43:03.445]Container exited with a non-zero exit code 137. 
[2023-02-26 13:43:03.446]Killed by external signal
.
23/02/26 13:43:03 WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 1 for reason Container from a bad node: container_1677412593959_0005_01_000001 on host: cluster-dbca-m.us-central1-a.c.tidal-eon-374408.internal. Exit status: 137. Diagnostics: [2023-02-26 13:43:03.445]Container killed on request. Exit code is 137
[2023-02-26 13:43:03.445]Container exited with a non-zero exit code 137. 
[2023-02-26 13:43:03.446]Killed by external signal
.
23/02/26 13:43:03 ERROR org.apache.spark.scheduler.cluste

+-------+-----+
|Service|count|
+-------+-----+
|   null|  375|
|      1| 4065|
|      6|  101|
|      3| 8033|
|      5|39081|
|      4|17131|
|      8|  143|
|     10|  307|
|      2| 3055|
+-------+-----+



                                                                                

In [20]:
final_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- createdDate: timestamp (nullable = true)
 |-- rating_col: long (nullable = true)
 |-- location_placeType: string (nullable = true)
 |-- location_name: string (nullable = true)
 |-- location_geo: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- userHometown: string (nullable = true)
 |-- Business_service: integer (nullable = true)
 |-- Check in / front desk: integer (nullable = true)
 |-- Cleanliness: integer (nullable = true)
 |-- Location: integer (nullable = true)
 |-- Rooms: integer (nullable = true)
 |-- Service: integer (nullable = true)
 |-- Sleep Quality: integer (nullable = true)
 |-- Value: integer (nullable = true)



In [21]:
# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)


In [22]:
bucket

'23feb2023'

In [23]:
project

'tidal-eon-374408'

In [24]:
# Output Parameters.
output_dataset = 'tourism_dataset'
output_table = 'tourism_table'

In [25]:
# Stage data formatted as newline-delimited JSON in Google Cloud Storage.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
#partitions = range(word_counts.getNumPartitions())
output_files = output_directory + '/part-*'


output_files

final_df.write.option("header",True).mode('overwrite').format('csv').save(output_directory)


                                                                                

In [26]:
output_files

'gs://23feb2023/hadoop/tmp/bigquery/pyspark_output/part-*'

In [27]:
! gsutil ls -l gs://23feb2023/hadoop/tmp/bigquery/pyspark_output/part-*

   2665588  2023-02-26T13:48:18Z  gs://23feb2023/hadoop/tmp/bigquery/pyspark_output/part-00000-720f16c5-afc7-49b4-8971-ac58195a635c-c000.csv
   2520200  2023-02-26T13:48:18Z  gs://23feb2023/hadoop/tmp/bigquery/pyspark_output/part-00001-720f16c5-afc7-49b4-8971-ac58195a635c-c000.csv
   2529990  2023-02-26T13:48:18Z  gs://23feb2023/hadoop/tmp/bigquery/pyspark_output/part-00002-720f16c5-afc7-49b4-8971-ac58195a635c-c000.csv
   2566194  2023-02-26T13:48:18Z  gs://23feb2023/hadoop/tmp/bigquery/pyspark_output/part-00003-720f16c5-afc7-49b4-8971-ac58195a635c-c000.csv
     98674  2023-02-26T13:48:20Z  gs://23feb2023/hadoop/tmp/bigquery/pyspark_output/part-00004-720f16c5-afc7-49b4-8971-ac58195a635c-c000.csv
TOTAL: 5 objects, 10380646 bytes (9.9 MiB)


In [28]:
t.time()-start_time

1036.0935039520264

In [29]:
import subprocess

In [30]:
# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
    'bq load --source_format=CSV  '
    '--replace '
    '--autodetect '
    '{dataset}.{table} {files} '.format(
        dataset=output_dataset, table=output_table, files=output_files
    ).split())

Waiting on bqjob_rd3ffca51837e3dd_000001868dfb5c37_1 ... (2s) Current status: DONE   


0