In [23]:
import time

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, size, split, when, concat_ws, substring
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


In [15]:
# Absolute locations of the Goodreads JSON inputs on the local machine.
authorsPath = "C:/Files/College/GP/UCSDBooks/Books/goodreads_book_authors.json"
json_path = "C:/Files/College/GP/UCSDBooks/Downloads/goodreads_reviews_dedup.json"

# Relative locations of the preprocessed CSV files.
bookidsPath = "../preprocessed_original/book_ids.csv"
csv_path = "../preprocessed_original/reviewsFull.csv"

# Review field names and an empty accumulator list.
# NOTE(review): neither is used in the cells visible here — confirm they are still needed.
columns = ["book_id", "user_id", "rating"]
json_data = list()

In [16]:
# Start (or reuse) the Spark session named "Read JSON"; the bare name on the
# last line makes the notebook display the session object.
session_builder = SparkSession.builder.appName("Read JSON")
spark = session_builder.getOrCreate()
spark

In [17]:
# Load the authors JSON file into a Spark DataFrame (no explicit schema is
# supplied, so Spark infers one) and preview the first five rows.
dfAuthors = spark.read.format("json").load(authorsPath)
dfAuthors.show(5)

+---------+--------------+----------------+-------------+------------------+
|author_id|average_rating|            name|ratings_count|text_reviews_count|
+---------+--------------+----------------+-------------+------------------+
|   604031|          3.98|Ronald J. Fields|           49|                 7|
|   626222|          4.08|   Anita Diamant|       546796|             28716|
|    10333|          3.92|  Barbara Hambly|       122118|              5075|
|     9212|          3.68| Jennifer Weiner|       888522|             36262|
|   149918|          3.82|   Nigel Pennick|         1740|                96|
+---------+--------------+----------------+-------------+------------------+
only showing top 5 rows



In [33]:
# Expose the raw "name" column as full_name and keep only the two columns
# needed downstream. withColumnRenamed is a no-op when "name" is already gone,
# so this cell tolerates being re-run.
dfAuthors = dfAuthors.withColumnRenamed("name", "full_name").select("author_id", "full_name")

# Tokenise the name on runs of whitespace after trimming surrounding spaces, so
# repeated or leading/trailing spaces never produce empty tokens.
# (F is pyspark.sql.functions.)
name_parts = split(F.trim(col('full_name')), r'\s+')
n_parts = size(name_parts)

# Derive the parts from the token COUNT instead of fixed positions:
#   first_name  = first token
#   last_name   = final token, only when there are at least two tokens
#   middle_name = every token strictly between first and last, space-joined
# Fixed positions (0, 1, 2) mis-handle names with four or more tokens, e.g.
# "J. R. R. Tolkien" would end up with last_name "R." and no middle name.
# when() without otherwise() yields null, which matches the previous output for
# one-, two- and three-token names.
dfAuthors = (
    dfAuthors
    .withColumn('first_name', name_parts.getItem(0))
    .withColumn(
        'middle_name',
        # slice() is 1-based: start at the 2nd token and take n_parts - 2 tokens.
        when(n_parts > 2, concat_ws(' ', F.slice(name_parts, 2, n_parts - 2))),
    )
    .withColumn('last_name', when(n_parts > 1, F.element_at(name_parts, -1)))
)
dfAuthors.show(5)


+---------+----------------+----------+-----------+---------+
|author_id|       full_name|first_name|middle_name|last_name|
+---------+----------------+----------+-----------+---------+
|   604031|Ronald J. Fields|    Ronald|         J.|   Fields|
|   626222|   Anita Diamant|     Anita|       null|  Diamant|
|    10333|  Barbara Hambly|   Barbara|       null|   Hambly|
|     9212| Jennifer Weiner|  Jennifer|       null|   Weiner|
|   149918|   Nigel Pennick|     Nigel|       null|  Pennick|
+---------+----------------+----------+-----------+---------+
only showing top 5 rows



In [34]:
# Print the first five rows of the authors DataFrame.
dfAuthors.show(n=5)

+---------+----------------+----------+-----------+---------+
|author_id|       full_name|first_name|middle_name|last_name|
+---------+----------------+----------+-----------+---------+
|   604031|Ronald J. Fields|    Ronald|         J.|   Fields|
|   626222|   Anita Diamant|     Anita|       null|  Diamant|
|    10333|  Barbara Hambly|   Barbara|       null|   Hambly|
|     9212| Jennifer Weiner|  Jennifer|       null|   Weiner|
|   149918|   Nigel Pennick|     Nigel|       null|  Pennick|
+---------+----------------+----------+-----------+---------+
only showing top 5 rows



In [None]:
# Target schema that the exported author records should fit
# (field definitions kept here for reference):
#     firstName: {
#         type: String,
#         required: true,
#         trim: true,
#         minlength: 1,
#         maxlength: 50
#     },
#     middleName: {
#         type: String,
#         required: false,
#         trim: true,
#         minlength: 1,
#         maxlength: 50
#     },
#     lastName: {
#         type: String,
#         required: false,
#         trim: true,
#         minlength: 1,
#         maxlength: 50
#     },
#     fullName: {
#         type: String,
#         required: true,
#         trim: true,
#         minlength: 1,
#         maxlength: 150
#     },
#     authorId: {
#         type: String,
#         required: true,
#         trim: true,
#         minlength: 1,
#         maxlength: 150
#     },

# Rename the snake_case working columns to the camelCase field names used by
# the target schema. Renames are applied one at a time, in the order listed.
schema_field_names = {
    "author_id": "authorId",
    "first_name": "firstName",
    "middle_name": "middleName",
    "last_name": "lastName",
    "full_name": "fullName",
}
for working_name, schema_name in schema_field_names.items():
    dfAuthors = dfAuthors.withColumnRenamed(working_name, schema_name)


In [None]:
# Collapse the DataFrame to a single partition so the JSON output consists of
# one part file. Spark writes it inside an output directory named
# 'authorsUp.json'; any existing output at that path is overwritten.
single_partition = dfAuthors.coalesce(1)
single_partition.write.mode('overwrite').json('authorsUp.json')