In [1]:
#! gsutil cp gs://3-7oct2022/review/review.zip ./

In [2]:
#! unzip review.zip

In [3]:
#! rm -rf __MACOSX

In [4]:
#! hdfs dfs -put review /user/

In [5]:
sc

In [6]:
from pyspark.sql import functions as sparkf
from pyspark.sql.types import *

In [7]:
loaded_df = spark.read.json('/user/review/*.json')

In [8]:
raw_df = loaded_df

In [9]:
raw_df

DataFrame[__typename: string, absoluteUrl: string, additionalRatings: array<struct<rating:bigint,ratingLabel:string>>, alertStatus: boolean, attribution: string, connectionToSubject: string, createdDate: string, helpfulVotes: bigint, id: bigint, labels: array<string>, language: string, location: struct<__typename:string,additionalNames:struct<abbreviated:string,abbreviatedRaw:string,abbreviatedStateTerritory:string,abbreviatedStateTerritoryRaw:string,geo:string,long:string,longOnlyParent:string,longOnlyParentAbbreviated:string,longOnlyParentStateAbbreviated:string,longParentAbbreviated:string,longParentStateAbbreviated:string,normal:string>,locationId:bigint,name:string,parent:struct<additionalNames:struct<abbreviated:string,abbreviatedRaw:string,abbreviatedStateTerritory:string,abbreviatedStateTerritoryRaw:string,geo:string,long:string,longOnlyParent:string,longOnlyParentAbbreviated:string,longOnlyParentStateAbbreviated:string,longParentAbbreviated:string,longParentStateAbbreviated:st

In [10]:
analysis_df = raw_df.select('id','createdDate','additionalRatings'\
                            ,sparkf.col('location.additionalNames.geo').alias('location_geo')\
                            ,sparkf.col('location.placeType').alias('location_placeType')\
                            ,sparkf.col('location.name').alias('location_name')\
              ,'rating','userProfile.userId'\
                            ,'userProfile.hometown.location.name')\
.withColumnRenamed('rating','rating_col')\
.withColumnRenamed('name','userHometown')\
.withColumn('extracted_additionalRatings',sparkf.explode('additionalRatings'))\
.select(
  'id','createdDate','additionalRatings','extracted_additionalRatings'\
    , sparkf.col("extracted_additionalRatings")["rating"].alias("rating")\
    , sparkf.col("extracted_additionalRatings")["ratingLabel"].alias("ratingLabel")\
    ,'location_geo','location_name','rating_col','userId','userHometown','location_placeType'
)\
.groupBy('id','createdDate','rating_col','location_placeType','location_name','location_geo'\
         ,'userId','userHometown').pivot('ratingLabel').sum('rating')\
.orderBy('id', ascending= False)

In [11]:
analysis_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- createdDate: string (nullable = true)
 |-- rating_col: long (nullable = true)
 |-- location_placeType: string (nullable = true)
 |-- location_name: string (nullable = true)
 |-- location_geo: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- userHometown: string (nullable = true)
 |-- Business service (e.g., internet access): long (nullable = true)
 |-- Check in / front desk: long (nullable = true)
 |-- Cleanliness: long (nullable = true)
 |-- Location: long (nullable = true)
 |-- Rooms: long (nullable = true)
 |-- Service: long (nullable = true)
 |-- Sleep Quality: long (nullable = true)
 |-- Value: long (nullable = true)



In [12]:
sparkf_ReplaceNull = sparkf.udf(lambda x: "UNKNOWN" if x == None else x)

In [13]:
final_df = analysis_df\
.withColumnRenamed('Business service (e.g., internet access)','Business_service')\
.withColumn('userHometown',sparkf_ReplaceNull('userHometown'))\
.withColumn('Cleanliness',sparkf.col('Cleanliness').cast(IntegerType()))\
.withColumn('Location',sparkf.col('Location').cast(IntegerType()))\
.withColumn('Rooms',sparkf.col('Rooms').cast(IntegerType()))\
.withColumn('Service',sparkf.col('Service').cast(IntegerType()))\
.withColumn('Sleep Quality',sparkf.col('Sleep Quality').cast(IntegerType()))\
.withColumn('Value',sparkf.col('Value').cast(IntegerType()))\
.withColumn('Business_service'\
            ,sparkf.col('Business_service').cast(IntegerType()))\
.withColumn('Check in / front desk',sparkf.col('Check in / front desk').cast(IntegerType()))\
.withColumn('createdDate',sparkf.to_timestamp('createdDate',format='yyyy-MM-dd'))


In [14]:
final_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- createdDate: timestamp (nullable = true)
 |-- rating_col: long (nullable = true)
 |-- location_placeType: string (nullable = true)
 |-- location_name: string (nullable = true)
 |-- location_geo: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- userHometown: string (nullable = true)
 |-- Business_service: integer (nullable = true)
 |-- Check in / front desk: integer (nullable = true)
 |-- Cleanliness: integer (nullable = true)
 |-- Location: integer (nullable = true)
 |-- Rooms: integer (nullable = true)
 |-- Service: integer (nullable = true)
 |-- Sleep Quality: integer (nullable = true)
 |-- Value: integer (nullable = true)



In [26]:
final_df.summary().toPandas().transpose()

Unnamed: 0,0,1,2,3,4,5,6,7
summary,count,mean,stddev,min,25%,50%,75%,max
id,43183,4.0402640215809464E8,2.2605525133336526E8,3306595,194357470,365483294,619624130,814517704
rating_col,43183,4.069332839311766,1.1614037736179492,1,4,4,5,5
location_placeType,43183,,,ACCOMMODATION,,,,ACCOMMODATION
location_name,43183,,,12month Hostel,,,,๋Jao Sua Residence
location_geo,43183,,,Ban Kata,,,,Wichit
userId,42361,,,0006174239E9D91C0937746EC0FE4169,,,,FFFE91E09AAA87C7778FBC516C23E760
userHometown,43183,,,1 Deciembre,,,,Östersund
Business_service,243,3.3621399176954734,1.3759019389590954,1,3,3,4,8
Check in / front desk,323,3.7275541795665634,1.5643790741324202,1,3,4,5,10


In [33]:
final_df.groupBy('location_name').count().orderBy(sparkf.col('count').desc()).show(truncate=False)

+---------------------------------------+-----+
|location_name                          |count|
+---------------------------------------+-----+
|Kata Beach Center Hotel                |104  |
|The Marq                               |97   |
|Baan Suk-Kho Boutique Inn              |97   |
|Tenta Nakara                           |94   |
|Beshert Guesthouse                     |91   |
|Sea Pearl Beach Tropical Oasis         |88   |
|Baan Puri Serviced Apartments          |87   |
|DDC House                              |87   |
|Tanamas House                          |86   |
|Bhukitta Boutique Hotel                |86   |
|Phuket Heritage Hotel                  |86   |
|Holiday Inn Resort Phuket, an IHG hotel|86   |
|The Bird Cage                          |86   |
|Be My Guest Boutique Hotel             |84   |
|SoleLuna Hotel                         |81   |
|Rome Place Hotel                       |81   |
|Salathai Resort                        |81   |
|Baan Chayna Hotel                      

In [15]:
# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)


In [16]:
bucket

'22july2022'

In [17]:
project

'bigdatainpractice1'

In [18]:
# Output Parameters.
output_dataset = 'tourism_dataset'
output_table = 'tourism_table'

In [19]:
# Stage data formatted as newline-delimited JSON in Google Cloud Storage.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
#partitions = range(word_counts.getNumPartitions())
output_files = output_directory + '/part-*'


output_files

final_df.write.option("header",True).mode('overwrite').format('csv').save(output_directory)


In [20]:
output_files

'gs://22july2022/hadoop/tmp/bigquery/pyspark_output/part-*'

In [21]:
#! gsutil ls gs://dataproc-staging-us-central1-883550361886-6am8uglv/hadoop/tmp/bigquery/pyspark_output/

In [22]:
import subprocess

In [23]:
# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
    'bq load --source_format=CSV  '
    '--replace '
    '--autodetect '
    '{dataset}.{table} {files} '.format(
        dataset=output_dataset, table=output_table, files=output_files
    ).split())

0

+---------------------------------------+-----+
|location_name                          |count|
+---------------------------------------+-----+
|Kata Beach Center Hotel                |104  |
|The Marq                               |97   |
|Baan Suk-Kho Boutique Inn              |97   |
|Tenta Nakara                           |94   |
|Beshert Guesthouse                     |91   |
|Sea Pearl Beach Tropical Oasis         |88   |
|Baan Puri Serviced Apartments          |87   |
|DDC House                              |87   |
|Phuket Heritage Hotel                  |86   |
|Holiday Inn Resort Phuket, an IHG hotel|86   |
|Tanamas House                          |86   |
|The Bird Cage                          |86   |
|Bhukitta Boutique Hotel                |86   |
|Be My Guest Boutique Hotel             |84   |
|SoleLuna Hotel                         |81   |
|Salathai Resort                        |81   |
|Rome Place Hotel                       |81   |
|WE Hotel at Sansabai                   