In [1]:
sc

In [2]:
from pyspark.sql import functions as sparkf
from pyspark.sql.types import *

In [3]:
loaded_df = spark.read.json('gs://usjqbewjtps/json/*.json')

In [4]:
#loaded_df.count()

In [5]:
#loaded_df.rdd.getNumPartitions()

In [6]:
raw_df = loaded_df

In [7]:
analysis_df = raw_df.select('id','createdDate','additionalRatings'\
                            ,sparkf.col('location.additionalNames.geo').alias('location_geo')\
                            ,sparkf.col('location.placeType').alias('location_placeType')\
                            ,sparkf.col('location.name').alias('location_name')\
              ,'rating','userProfile.userId'\
                            ,'userProfile.hometown.location.name')\
.withColumnRenamed('rating','rating_col')\
.withColumnRenamed('name','userHometown')\
.withColumn('extracted_additionalRatings',sparkf.explode('additionalRatings'))\
.select(
  'id','createdDate','additionalRatings','extracted_additionalRatings'\
    , sparkf.col("extracted_additionalRatings")["rating"].alias("rating")\
    , sparkf.col("extracted_additionalRatings")["ratingLabel"].alias("ratingLabel")\
    ,'location_geo','location_name','rating_col','userId','userHometown','location_placeType'
)\
.groupBy('id','createdDate','rating_col','location_placeType','location_name','location_geo'\
         ,'userId','userHometown').pivot('ratingLabel').sum('rating')\
.orderBy('id', ascending= False)

In [8]:
#analysis_df.count()

In [9]:
sparkf_ReplaceNull = sparkf.udf(lambda x: "UNKNOWN" if x == None else x)

In [10]:
final_df = analysis_df\
.withColumnRenamed('Business service (e.g., internet access)','Business_service')\
.withColumn('userHometown',sparkf_ReplaceNull('userHometown'))\
.withColumn('Cleanliness',sparkf.col('Cleanliness').cast(IntegerType()))\
.withColumn('Location',sparkf.col('Location').cast(IntegerType()))\
.withColumn('Rooms',sparkf.col('Rooms').cast(IntegerType()))\
.withColumn('Service',sparkf.col('Service').cast(IntegerType()))\
.withColumn('Sleep Quality',sparkf.col('Sleep Quality').cast(IntegerType()))\
.withColumn('Value',sparkf.col('Value').cast(IntegerType()))\
.withColumn('Business_service'\
            ,sparkf.col('Business_service').cast(IntegerType()))\
.withColumn('Check in / front desk',sparkf.col('Check in / front desk').cast(IntegerType()))\
.withColumn('createdDate',sparkf.to_timestamp('createdDate',format='yyyy-MM-dd'))


In [14]:
final_df.groupBy('Sleep Quality').count().orderBy('Sleep Quality').show(100)

+-------------+-----+
|Sleep Quality|count|
+-------------+-----+
|         null|33953|
|            1| 1742|
|            2| 1533|
|            3| 4840|
|            4|10490|
|            5|19441|
|            6|   54|
|            8|  100|
|           10|  138|
+-------------+-----+



In [12]:
final_df.groupBy('Service').count().show(100)

+-------+-----+
|Service|count|
+-------+-----+
|   null|  375|
|      1| 4065|
|      6|  101|
|      3| 8033|
|      5|39081|
|      4|17131|
|      8|  143|
|     10|  307|
|      2| 3055|
+-------+-----+



In [15]:
final_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- createdDate: timestamp (nullable = true)
 |-- rating_col: long (nullable = true)
 |-- location_placeType: string (nullable = true)
 |-- location_name: string (nullable = true)
 |-- location_geo: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- userHometown: string (nullable = true)
 |-- Business_service: integer (nullable = true)
 |-- Check in / front desk: integer (nullable = true)
 |-- Cleanliness: integer (nullable = true)
 |-- Location: integer (nullable = true)
 |-- Rooms: integer (nullable = true)
 |-- Service: integer (nullable = true)
 |-- Sleep Quality: integer (nullable = true)
 |-- Value: integer (nullable = true)



In [16]:
# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)


In [17]:
bucket

'dataproc-staging-us-central1-883550361886-6am8uglv'

In [18]:
project

'shining-haiku-318402'

In [25]:
# Output Parameters.
output_dataset = 'tourism_dataset'
output_table = 'tourism_table'

In [26]:
# Stage data formatted as newline-delimited JSON in Google Cloud Storage.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
#partitions = range(word_counts.getNumPartitions())
output_files = output_directory + '/part-*'


output_files

final_df.write.option("header",True).mode('overwrite').format('csv').save(output_directory)


In [27]:
output_files

'gs://dataproc-staging-us-central1-883550361886-6am8uglv/hadoop/tmp/bigquery/pyspark_output/part-*'

In [28]:
#! gsutil ls gs://dataproc-staging-us-central1-883550361886-6am8uglv/hadoop/tmp/bigquery/pyspark_output/

In [29]:
import subprocess

In [30]:
# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
    'bq load --source_format=CSV  '
    '--replace '
    '--autodetect '
    '{dataset}.{table} {files} '.format(
        dataset=output_dataset, table=output_table, files=output_files
    ).split())

0