In [1]:
from pyspark.sql import SparkSession


# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Create Database from JSON") \
    .getOrCreate()

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Read JSON file into DataFrame
df = spark.read.format("json") \
    .load(r"C:\Users\jose\Downloads\goodreads_reviews_fantasy_paranormal.json")

# Create a database
spark.sql("CREATE DATABASE IF NOT EXISTS goodreads_reviews")

# Use the database
spark.sql("USE goodreads_reviews")

# Save DataFrame as a table in the database
df.write.mode("overwrite").saveAsTable("reviews")

# Save DataFrame as a table in a database
df.write.mode("overwrite").saveAsTable("goodreads_reviews.reviews")


In [2]:
# Use the database
spark.sql("USE goodreads_reviews")

# Query the data from the table
result = spark.sql("SELECT * FROM reviews")

# Show the result
result.show()


+--------+--------------------+--------------------+----------+-------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
| book_id|          date_added|        date_updated|n_comments|n_votes|rating|             read_at|           review_id|         review_text|          started_at|             user_id|
+--------+--------------------+--------------------+----------+-------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|26079201|Sun Nov 27 20:03:...|Sat Feb 04 03:09:...|         0|      0|     4|Tue Nov 29 00:00:...|ccca1430b2b25b44c...|Thank you to Raye...|Sun Nov 27 00:00:...|d043e5bbcc1135529...|
|31207141|Tue Nov 22 03:41:...|Sat Jan 28 09:50:...|         0|      2|     4|Fri Jan 27 00:00:...|e31bd4dc601d662b4...|Review also poste...|Wed Jan 25 00:00:...|d043e5bbcc1135529...|
|25151797|Fri Nov 04 19:45:...|Tue Nov 22 01:20:...|         0|      0|     3|Mo

In [3]:
# Save DataFrame as parquet files (a columnar storage format) in a directory
df.write.mode("overwrite").parquet(r"C:\Users\jose\OneDrive - Dublin Business School (DBS)\Desktop\CA1_Integrated_Assesment_MSc_Data_Analytics_CCT_Semester_2\parquet_2")

# Register the DataFrame as a temporary view
df.createOrReplaceTempView("reviews")

In [1]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Create Database from Parquet") \
    .config("spark.driver.memory", "8g") \
    .getOrCreate()

# Create a database
spark.sql("CREATE DATABASE IF NOT EXISTS goodreads_reviews")

# Select the database
spark.sql("USE goodreads_reviews")

# Save DataFrame as a table in the database
spark.sql("CREATE TABLE IF NOT EXISTS reviews USING parquet OPTIONS (PATH 'C:/Users/jose/OneDrive - Dublin Business School (DBS)/Desktop/CA1_Integrated_Assesment_MSc_Data_Analytics_CCT_Semester_2/parquet_2')")


DataFrame[]

In [2]:
# Show tables in the current schema
spark.sql("SHOW TABLES").show()

+-----------------+---------+-----------+
|        namespace|tableName|isTemporary|
+-----------------+---------+-----------+
|goodreads_reviews|  reviews|      false|
+-----------------+---------+-----------+



In [3]:
# Use the database
spark.sql("USE goodreads_reviews")

# Query the data from the table
result = spark.sql("SELECT * FROM reviews")

# Show the result
result.show()


+--------+--------------------+--------------------+----------+-------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
| book_id|          date_added|        date_updated|n_comments|n_votes|rating|             read_at|           review_id|         review_text|          started_at|             user_id|
+--------+--------------------+--------------------+----------+-------+------+--------------------+--------------------+--------------------+--------------------+--------------------+
|26079201|Sun Nov 27 20:03:...|Sat Feb 04 03:09:...|         0|      0|     4|Tue Nov 29 00:00:...|ccca1430b2b25b44c...|Thank you to Raye...|Sun Nov 27 00:00:...|d043e5bbcc1135529...|
|31207141|Tue Nov 22 03:41:...|Sat Jan 28 09:50:...|         0|      2|     4|Fri Jan 27 00:00:...|e31bd4dc601d662b4...|Review also poste...|Wed Jan 25 00:00:...|d043e5bbcc1135529...|
|25151797|Fri Nov 04 19:45:...|Tue Nov 22 01:20:...|         0|      0|     3|Mo

In [6]:
# Limiting the number of columns to display
result.limit(10).toPandas()


Unnamed: 0,book_id,date_added,date_updated,n_comments,n_votes,rating,read_at,review_id,review_text,started_at,user_id
0,26079201,Sun Nov 27 20:03:02 -0800 2016,Sat Feb 04 03:09:37 -0800 2017,0,0,4,Tue Nov 29 00:00:00 -0800 2016,ccca1430b2b25b44ce1baaf0d13849ee,Thank you to Raye Wagner for providing me with...,Sun Nov 27 00:00:00 -0800 2016,d043e5bbcc1135529327faf7260fecaa
1,31207141,Tue Nov 22 03:41:56 -0800 2016,Sat Jan 28 09:50:35 -0800 2017,0,2,4,Fri Jan 27 00:00:00 -0800 2017,e31bd4dc601d662b405ac00bcf834160,Review also posted at: http://underthebookcove...,Wed Jan 25 00:00:00 -0800 2017,d043e5bbcc1135529327faf7260fecaa
2,25151797,Fri Nov 04 19:45:39 -0700 2016,Tue Nov 22 01:20:27 -0800 2016,0,0,3,Mon Nov 21 00:00:00 -0800 2016,4583dd8f12dd00735f558d98019b95c8,Review also posted at: http://underthebookcove...,Fri Nov 18 00:00:00 -0800 2016,d043e5bbcc1135529327faf7260fecaa
3,31342605,Fri Nov 04 19:45:29 -0700 2016,Sat Nov 26 19:23:06 -0800 2016,0,0,4,Wed Nov 23 00:00:00 -0800 2016,556c7364116af51e25cecd39ea7998be,Review also posted at: http://underthebookcove...,Mon Nov 21 00:00:00 -0800 2016,d043e5bbcc1135529327faf7260fecaa
4,32608639,Sun Oct 30 20:45:17 -0700 2016,Sun Oct 30 20:46:31 -0700 2016,1,1,5,Sat Oct 29 00:00:00 -0700 2016,65005bc2b4bfc86bcb8a72e26099e790,Review also posted at: http://underthebookcove...,,d043e5bbcc1135529327faf7260fecaa
5,28114396,Wed Oct 12 17:06:19 -0700 2016,Sat Jan 21 11:43:12 -0800 2017,0,0,4,Fri Jan 20 00:00:00 -0800 2017,9cc90e511ff1108afc908bf33f258f31,Review also posted at: http://underthebookcove...,Wed Jan 18 00:00:00 -0800 2017,d043e5bbcc1135529327faf7260fecaa
6,31349010,Mon Sep 19 17:13:17 -0700 2016,Mon Sep 19 17:14:41 -0700 2016,0,1,5,Mon Sep 19 00:00:00 -0700 2016,a6bd36edd3c876b0edae0a5d250e4b67,Review also posted at: http://underthebookcove...,Mon Sep 19 00:00:00 -0700 2016,d043e5bbcc1135529327faf7260fecaa
7,26219455,Thu Sep 15 23:58:25 -0700 2016,Sat Apr 22 03:48:38 -0700 2017,0,0,5,Wed Apr 19 00:34:53 -0700 2017,33c6cfa7baaf44820765ed431e74d5b3,Review also posted at: http://underthebookcove...,Tue Apr 18 17:56:12 -0700 2017,d043e5bbcc1135529327faf7260fecaa
8,31849588,Wed Aug 24 11:07:20 -0700 2016,Sun Oct 30 06:10:16 -0700 2016,0,0,5,Tue Aug 23 00:00:00 -0700 2016,21404647c030da430b617cc91d98a7e7,Review also posted at: http://underthebookcove...,,d043e5bbcc1135529327faf7260fecaa
9,31849587,Fri Aug 19 10:10:54 -0700 2016,Sun Oct 30 06:12:17 -0700 2016,0,0,5,Thu Aug 18 00:00:00 -0700 2016,791ff07632260f9cfb162e24500b4561,Review also posted at: http://underthebookcove...,,d043e5bbcc1135529327faf7260fecaa


In [8]:
# Assuming df is your Spark DataFrame
columns_to_drop = ["date_added", "date_updated","n_comments","n_votes","review_id","started_at","read_at"]  # List of columns to drop

# Drop the specified columns
result = result.drop(*columns_to_drop)

# Show the result
result.show()

+--------+------+--------------------+--------------------+
| book_id|rating|         review_text|             user_id|
+--------+------+--------------------+--------------------+
|26079201|     4|Thank you to Raye...|d043e5bbcc1135529...|
|31207141|     4|Review also poste...|d043e5bbcc1135529...|
|25151797|     3|Review also poste...|d043e5bbcc1135529...|
|31342605|     4|Review also poste...|d043e5bbcc1135529...|
|32608639|     5|Review also poste...|d043e5bbcc1135529...|
|28114396|     4|Review also poste...|d043e5bbcc1135529...|
|31349010|     5|Review also poste...|d043e5bbcc1135529...|
|26219455|     5|Review also poste...|d043e5bbcc1135529...|
|31849588|     5|Review also poste...|d043e5bbcc1135529...|
|31849587|     5|Review also poste...|d043e5bbcc1135529...|
|31835951|     4|Review also poste...|d043e5bbcc1135529...|
|23014836|     3|Review also poste...|d043e5bbcc1135529...|
|28235555|     5|Review also poste...|d043e5bbcc1135529...|
|26805518|     4|Review also poste...|d0

In [13]:
from pyspark.sql.functions import col, count

# Cast the review_text column to string type
result = result.withColumn("review_text", col("review_text").cast("string"))

# Assuming df is your Spark DataFrame
# Select each column and count the number of null values in each column
null_counts = result.select(*(count(col(c)).alias(c) for c in result.columns))

# Show the result
null_counts.show()


+-------+-------+-----------+-------+
|book_id| rating|review_text|user_id|
+-------+-------+-----------+-------+
|3424641|3424641|    3424641|3424641|
+-------+-------+-----------+-------+



In [14]:
# Assuming df is your Spark DataFrame
# Drop rows with any null values
result = result.dropna()

# Show the cleaned DataFrame
result.show()


+--------+------+--------------------+--------------------+
| book_id|rating|         review_text|             user_id|
+--------+------+--------------------+--------------------+
|26079201|     4|Thank you to Raye...|d043e5bbcc1135529...|
|31207141|     4|Review also poste...|d043e5bbcc1135529...|
|25151797|     3|Review also poste...|d043e5bbcc1135529...|
|31342605|     4|Review also poste...|d043e5bbcc1135529...|
|32608639|     5|Review also poste...|d043e5bbcc1135529...|
|28114396|     4|Review also poste...|d043e5bbcc1135529...|
|31349010|     5|Review also poste...|d043e5bbcc1135529...|
|26219455|     5|Review also poste...|d043e5bbcc1135529...|
|31849588|     5|Review also poste...|d043e5bbcc1135529...|
|31849587|     5|Review also poste...|d043e5bbcc1135529...|
|31835951|     4|Review also poste...|d043e5bbcc1135529...|
|23014836|     3|Review also poste...|d043e5bbcc1135529...|
|28235555|     5|Review also poste...|d043e5bbcc1135529...|
|26805518|     4|Review also poste...|d0

In [17]:
# Assuming df is your Spark DataFrame
columns_to_drop = ["review_text"]  # List of columns to drop

# Drop the specified columns
result = result.drop(*columns_to_drop)

# Show the result
result.show()

+--------+------+--------------------+
| book_id|rating|             user_id|
+--------+------+--------------------+
|26079201|     4|d043e5bbcc1135529...|
|31207141|     4|d043e5bbcc1135529...|
|25151797|     3|d043e5bbcc1135529...|
|31342605|     4|d043e5bbcc1135529...|
|32608639|     5|d043e5bbcc1135529...|
|28114396|     4|d043e5bbcc1135529...|
|31349010|     5|d043e5bbcc1135529...|
|26219455|     5|d043e5bbcc1135529...|
|31849588|     5|d043e5bbcc1135529...|
|31849587|     5|d043e5bbcc1135529...|
|31835951|     4|d043e5bbcc1135529...|
|23014836|     3|d043e5bbcc1135529...|
|28235555|     5|d043e5bbcc1135529...|
|26805518|     4|d043e5bbcc1135529...|
|29546138|     3|d043e5bbcc1135529...|
|26114202|     4|d043e5bbcc1135529...|
|18584855|     5|d043e5bbcc1135529...|
|29569157|     4|d043e5bbcc1135529...|
|27396942|     3|d043e5bbcc1135529...|
|26114621|     5|d043e5bbcc1135529...|
+--------+------+--------------------+
only showing top 20 rows



In [18]:
from pyspark.sql import *  
from pyspark.sql.functions import *  
from pyspark.sql.types import *  
import numpy as np    
import pandas as pd


reviews_pandas_df= result.select("*").toPandas()

In [19]:
reviews_pandas_df

Unnamed: 0,book_id,rating,user_id
0,26079201,4,d043e5bbcc1135529327faf7260fecaa
1,31207141,4,d043e5bbcc1135529327faf7260fecaa
2,25151797,3,d043e5bbcc1135529327faf7260fecaa
3,31342605,4,d043e5bbcc1135529327faf7260fecaa
4,32608639,5,d043e5bbcc1135529327faf7260fecaa
...,...,...,...
3424636,5907,5,9a1e8a72fadaa8f3038aa783114e12cc
3424637,6137154,5,a13e16a58266ffa4a0851c417c4cda8f
3424638,41804,4,007c41115862813421ff17dde43728b3
3424639,14740456,4,fc5aea6994fc754daefa27cbea10ac79


In [4]:
import pandas as pd

# Define the number of rows per chunk
chunk_size = 1000  # Adjust the chunk size as needed

# Convert Spark DataFrame to Pandas DataFrame in chunks
reviews_pandas_chunks = []
for chunk in result.toPandas().to_numpy().reshape(-1, chunk_size):
    pandas_chunk = pd.DataFrame(chunk, columns=result.columns)
    reviews_pandas_chunks.append(pandas_chunk)

# Concatenate chunks into a single Pandas DataFrame
reviews_pandas_df = pd.concat(reviews_pandas_chunks, ignore_index=True)


Py4JJavaError: An error occurred while calling o49.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 11 tasks (1145.7 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4148)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4145)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)


In [6]:
reviews_pandas_df

NameError: name 'reviews_pandas_df' is not defined

In [5]:
from pyspark.sql import *  
from pyspark.sql.functions import *  
from pyspark.sql.types import *  
import numpy as np    
import pandas as pd


reviews_pandas_df= result.select("*").toPandas()

Py4JJavaError: An error occurred while calling o56.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 11 tasks (1145.7 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4148)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4322)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4320)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4320)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4145)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)


In [None]:
# Save DataFrame as parquet files (a columnar storage format) in a directory
df.write.mode("overwrite").parquet(r"C:\Users\jose\OneDrive - Dublin Business School (DBS)\Desktop\CA1_Integrated_Assesment_MSc_Data_Analytics_CCT_Semester_2\parquet_2")

# Register the DataFrame as a temporary view
df.createOrReplaceTempView("reviews")

In [None]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Create Database from CSV") \
    .getOrCreate()

# Create a database
spark.sql("CREATE DATABASE IF NOT EXISTS amz_reviews")

# Save DataFrame as a table in the database
spark.sql("CREATE TABLE IF NOT EXISTS reviews USING parquet OPTIONS (PATH 'C:/Users/jose/OneDrive - Dublin Business School (DBS)/Desktop/CA1_Integrated_Assesment_MSc_Data_Analytics_CCT_Semester_2/parquet_2')")


In [15]:
# Stop SparkSession
spark.stop()
