**Intall Required Libraries**

In [1]:
!pip install pyspark
!pip install findspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=600ebb46f9ab556f73a5f14be14c662ed2014f8f5f705d7b86c70865b091dee7
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


**Intialize finspark and create Spark Session**

In [2]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName("Amazon_Reviews_Analysis").getOrCreate()


**Mount Google drive for path**

In [3]:
from google.colab import drive

# Mount your Google Drive
drive.mount('/content/drive')


Mounted at /content/drive


**Read csv and convert it into dataframe**

In [4]:
file_path = '/content/drive/My Drive/Colab Notebooks/interactions_validation.csv'
reviews_df = spark.read.csv(file_path, header=True, inferSchema=True)
reviews_df.show(5)


+-------+---------+----------+------+---+------+
|user_id|recipe_id|      date|rating|  u|     i|
+-------+---------+----------+------+---+------+
|  76535|    33627|2005-02-15|   4.0|  5|177317|
| 160497|    75307|2005-10-24|   4.0| 23|170785|
| 930021|   100961|2008-11-30|   4.0| 31|165555|
|  58439|   154105|2007-03-24|   4.0| 44|177453|
| 628951|    14525|2008-02-16|   5.0| 45|142367|
+-------+---------+----------+------+---+------+
only showing top 5 rows



**Find least, most and longest ratings and format date column**

In [5]:
from pyspark.sql.functions import col, length, to_date, date_format

# Group by 'recipe_id' and count the number of ratings
ratings_count = reviews_df.groupBy('recipe_id').count()

# Sort the DataFrame by count in ascending order to get the least rated items
least_rated_items = ratings_count.sort(col('count')).limit(1)

# Sort the DataFrame by count in descending order to get the most rated items
most_rated_items = ratings_count.sort(col('count').desc()).limit(1)
least_rated_items.show(5)
most_rated_items.show(5)

# Calculate the length of reviews ('i') for each item ('recipe_id')
reviews_length = reviews_df.groupBy('recipe_id').agg({'i': 'max'}).withColumnRenamed('max(i)', 'max_review_length')

# Find the item with the maximum review length
longest_review_item = reviews_length.sort(col('max_review_length').desc()).limit(1)
longest_review_item.show(5)

# Transformation: Change date format
reviews_df = reviews_df.withColumn("new_date", to_date(col("date"), "MM-dd-yyyy"))
reviews_df.show(5)

# DataFrame Operation example (Filtering)
desired_operation = reviews_df.filter(col("rating") > 4)
desired_operation.show(5)


+---------+-----+
|recipe_id|count|
+---------+-----+
|   301798|    1|
+---------+-----+

+---------+-----+
|recipe_id|count|
+---------+-----+
|    83928|    4|
+---------+-----+

+---------+-----------------+
|recipe_id|max_review_length|
+---------+-----------------+
|   241491|           178263|
+---------+-----------------+

+-------+---------+----------+------+---+------+----------+
|user_id|recipe_id|      date|rating|  u|     i|  new_date|
+-------+---------+----------+------+---+------+----------+
|  76535|    33627|2005-02-15|   4.0|  5|177317|2005-02-15|
| 160497|    75307|2005-10-24|   4.0| 23|170785|2005-10-24|
| 930021|   100961|2008-11-30|   4.0| 31|165555|2008-11-30|
|  58439|   154105|2007-03-24|   4.0| 44|177453|2007-03-24|
| 628951|    14525|2008-02-16|   5.0| 45|142367|2008-02-16|
+-------+---------+----------+------+---+------+----------+
only showing top 5 rows

+-------+---------+----------+------+---+------+----------+
|user_id|recipe_id|      date|rating|  u| 

**Convert PySpark DataFrame into Pandas DataFrame**

In [6]:
import pandas as pd

#converting PySpark DataFrame into Pandas DataFrame
pandas_df = reviews_df.toPandas()

# Convert 'date' column to datetime format
pandas_df['date'] = pd.to_datetime(pandas_df['date'])

# Convert the 'date' column to MM-DD-YYYY format
pandas_df['date'] = pandas_df['date'].dt.strftime('%m-%d-%Y')



**Insert DataFrame into a database**

In [7]:
#Storing DataFrame into database using sqlite
import sqlite3

sqlite_db = 'example.db'

# Create a connection to the SQLite database file
conn = sqlite3.connect(sqlite_db)

# Write Pandas DataFrame to SQLite database file
pandas_df.to_sql('reviews', conn, if_exists='replace', index=False)

# Close the connection
conn.close()


**Convert file into parquet file**

In [8]:
output_parquet_path = "/content/drive/My Drive/Colab Notebooks/validation_parquet_file"  # Replace with your desired output path
reviews_df.write.parquet(output_parquet_path)

**Stop the Spark Session**

In [9]:
spark.stop()