
## Overview

This notebook shows how to create and query a table or DataFrame from a file that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is the Databricks File System, which lets you store data for querying inside Databricks. The notebook assumes that the file you want to read is already in DBFS.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

I initialized the Spark session because it is required for distributed data processing with Apache Spark.

In [0]:
from pyspark.sql import SparkSession

# Initializing the Spark session
spark = SparkSession.builder.appName("DataIntegration").getOrCreate()
print("Spark session created")

In [0]:
# Loading the dataset
file_path = "/FileStore/tables/Reviews/Reviews.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)
print("Dataset loaded")
df.show(5)

Dataset loaded
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|
|  2|B00813GRG4|A1D87F6ZCVE5NK|              dll pa|                   0|                     0|    1|1346976000|   Not as Advertised|"Product arrived ...|
|  3|B000LQOCH0| ABXLMWJIXXAIN|"Natalia Corres "...|                   1|                     1|    4|1219017600|"""Delight"" says...|"This is a confec...|
|  4|B000UA0QIQ|A395BORC6FGVXV|                Ka
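
The "Summary" and "Text" values above still show literal quote characters (for example `"""Delight"" says...`), which suggests the file escapes quotes by doubling them, while Spark's CSV reader uses a backslash as its default escape character. The sketch below uses Python's standard `csv` module, which handles doubled quotes by default, on a made-up line (not taken from Reviews.csv) to show how such a field is meant to be parsed. In Spark, setting the reader options `quote` and `escape` both to `"` is a common way to get similar behavior, though whether that is needed here depends on the actual file.

```python
import csv
import io

# Hypothetical CSV line: the second field is quoted and contains doubled quotes.
raw_line = '7,"A ""great"" snack",5\n'

# csv.reader treats "" inside a quoted field as one literal quote character.
parsed_row = next(csv.reader(io.StringIO(raw_line)))
print(parsed_row)
```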

I then dropped rows with missing values to ensure data quality and selected the columns needed for the analysis.

In [0]:
# Data Preprocessing: Dropping the rows with missing values and selecting the relevant columns
df = df.dropna()
df = df.select("Id", "ProductId", "UserId", "ProfileName", "HelpfulnessNumerator", "HelpfulnessDenominator", "Score", "Time", "Summary", "Text")
print("Data preprocessing completed")
df.show(5)

Data preprocessing completed
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|
|  2|B00813GRG4|A1D87F6ZCVE5NK|              dll pa|                   0|                     0|    1|1346976000|   Not as Advertised|"Product arrived ...|
|  3|B000LQOCH0| ABXLMWJIXXAIN|"Natalia Corres "...|                   1|                     1|    4|1219017600|"""Delight"" says...|"This is a confec...|
|  4|B000UA0QIQ|A395BORC6FGVXV|    
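
As a rough illustration of what `dropna()` does with its default settings, the plain-Python sketch below drops every row in which at least one field is missing. The rows are invented for the example, and `None` stands in for a Spark null.

```python
# Made-up rows; None plays the role of a null value.
sample_rows = [
    {"Id": 1, "ProfileName": "user_a", "Score": 5},
    {"Id": 2, "ProfileName": None, "Score": 1},
    {"Id": 3, "ProfileName": "user_c", "Score": None},
]

# Keep a row only if none of its fields is missing.
complete_rows = [
    row for row in sample_rows
    if all(value is not None for value in row.values())
]
kept_ids = [row["Id"] for row in complete_rows]
print(kept_ids)
```

Comparing `df.count()` before and after the call shows how many rows were discarded on the real dataset.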

The "withColumn" method is used to cast the columns "HelpfulnessNumerator", "HelpfulnessDenominator", and "Score" to the integer data type and "Time" to the long data type, to ensure accurate numerical operations and calculations.

In [0]:
from pyspark.sql.functions import col

# Converting the columns to appropriate data types
df = df.withColumn("HelpfulnessNumerator", col("HelpfulnessNumerator").cast("integer"))
df = df.withColumn("HelpfulnessDenominator", col("HelpfulnessDenominator").cast("integer"))
df = df.withColumn("Score", col("Score").cast("integer"))
df = df.withColumn("Time", col("Time").cast("long"))

print("Columns converted to appropriate data types")
df.show(5)

Columns converted to appropriate data types
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
| Id| ProductId|        UserId|         ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|      Time|             Summary|                Text|
+---+----------+--------------+--------------------+--------------------+----------------------+-----+----------+--------------------+--------------------+
|  1|B001E4KFG0|A3SGXH7AUHU8GW|          delmartian|                   1|                     1|    5|1303862400|Good Quality Dog ...|I have bought sev...|
|  2|B00813GRG4|A1D87F6ZCVE5NK|              dll pa|                   0|                     0|    1|1346976000|   Not as Advertised|"Product arrived ...|
|  3|B000LQOCH0| ABXLMWJIXXAIN|"Natalia Corres "...|                   1|                     1|    4|1219017600|"""Delight"" says...|"This is a confec...|
|  4|B000UA0QIQ|A395
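
The "Time" values look like Unix timestamps in seconds, although the notebook does not say so. Under that assumption, the sketch below converts the three values visible in the output to UTC dates using only the standard library. It also checks that they still fit in a 32-bit signed integer, the range of Spark's integer type; casting to long leaves headroom for larger timestamps.

```python
from datetime import datetime, timezone

# Time values copied from the first three rows shown above.
time_values = [1303862400, 1346976000, 1219017600]

# Assumption: the values are seconds since 1970-01-01 UTC.
review_dates = [
    datetime.fromtimestamp(t, tz=timezone.utc).date().isoformat()
    for t in time_values
]
print(review_dates)

# Largest value a 32-bit signed integer can hold.
INT32_MAX = 2**31 - 1
fits_in_int32 = all(t <= INT32_MAX for t in time_values)
print(fits_in_int32)
```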

I then simulated a second dataset containing product information. The "product_info_data" list holds the sample rows, and the "product_info_schema" list defines the column names. The "createDataFrame" method creates a Spark DataFrame from the simulated data.

In [0]:
# Simulating another dataset containing a sample product information
product_info_data = [
    ("B001", "Product A", "Category A"),
    ("B002", "Product B", "Category B"),
    ("B003", "Product C", "Category C")
]
product_info_schema = ["ProductId", "ProductName", "Category"]
product_info = spark.createDataFrame(product_info_data, product_info_schema)
print("Simulated product information dataset created")
product_info.show()

Simulated product information dataset created
+---------+-----------+----------+
|ProductId|ProductName|  Category|
+---------+-----------+----------+
|     B001|  Product A|Category A|
|     B002|  Product B|Category B|
|     B003|  Product C|Category C|
+---------+-----------+----------+



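I then joined the main dataset "df" with the simulated product information dataset "product_info" on the "ProductId" column using an inner join, which keeps only rows whose "ProductId" value appears in both DataFrames. Because the simulated IDs ("B001", "B002", "B003") do not match any "ProductId" in the reviews data, the result below is empty.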

In [0]:
# Integrating the data using Spark
integrated_data = df.join(product_info, on="ProductId", how="inner")
print("Data integration completed")
integrated_data.show(5)

Data integration completed
+---------+---+------+-----------+--------------------+----------------------+-----+----+-------+----+-----------+--------+
|ProductId| Id|UserId|ProfileName|HelpfulnessNumerator|HelpfulnessDenominator|Score|Time|Summary|Text|ProductName|Category|
+---------+---+------+-----------+--------------------+----------------------+-----+----+-------+----+-----------+--------+
+---------+---+------+-----------+--------------------+----------------------+-----+----+-------+----+-----------+--------+
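
The empty result above follows from how an inner join works: a row survives only if its key appears on both sides. The sketch below checks this with plain Python sets, using the "ProductId" values visible in the earlier outputs; the variable names are illustrative only.

```python
# ProductId values visible in the first rows of the reviews output.
review_product_ids = {"B001E4KFG0", "B00813GRG4", "B000LQOCH0", "B000UA0QIQ"}

# ProductId values from the simulated product_info dataset.
simulated_product_ids = {"B001", "B002", "B003"}

# An inner join can only produce rows for keys present in both sets.
shared_ids = review_product_ids & simulated_product_ids
print(len(shared_ids))
```

Matching is on the exact string, so "B001" does not match "B001E4KFG0" even though it is a prefix of it.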



In [0]:
# Aggregation: Calculating the average score for each product
aggregated_data = integrated_data.groupBy("ProductId").agg({"Score": "avg"})
aggregated_data = aggregated_data.withColumnRenamed("avg(Score)", "average_score")
print("Aggregation completed")
aggregated_data.show(5)

Aggregation completed
+---------+-------------+
|ProductId|average_score|
+---------+-------------+
+---------+-------------+
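
For reference, grouping by "ProductId" and averaging "Score" amounts to the following plain-Python computation. The (product, score) pairs are invented, since the joined dataset in this run is empty.

```python
from collections import defaultdict

# Invented (ProductId, Score) pairs for illustration.
scored_reviews = [("B001", 5), ("B001", 3), ("B002", 4)]

# Collect the scores belonging to each product.
scores_by_product = defaultdict(list)
for product_id, score in scored_reviews:
    scores_by_product[product_id].append(score)

# Average the scores within each group.
average_score = {
    product_id: sum(scores) / len(scores)
    for product_id, scores in scores_by_product.items()
}
print(average_score)
```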



I then saved the aggregated data as a CSV file, first removing any previous output at the same path.

In [0]:
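import os

# Defining the output path
output_path = "/FileStore/tables/aggregated_data"

# Removing the existing output path if it already exists
def remove_existing_path(path):
    parent, name = os.path.split(path.rstrip("/"))
    # dbutils.fs.ls returns paths with a "dbfs:" prefix and directory names
    # with a trailing "/", so compare on the entry name instead of the path
    if any(f.name.rstrip("/") == name for f in dbutils.fs.ls(parent)):
        dbutils.fs.rm(path, True)
        print(f"Removed existing directory: {path}")

remove_existing_path(output_path)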

# Saving the result to a CSV file
aggregated_data.coalesce(1).write.option("header", "true").csv(output_path)
print("Aggregated data saved to CSV")

Aggregated data saved to CSV
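
The remove-then-write pattern above can be tried outside Databricks with the standard library. The sketch below is a local-filesystem analogue, not the DBFS API: it uses a temporary directory, an invented `write_fresh_csv` helper, and made-up rows. On Spark itself, `write.mode("overwrite")` is a commonly used alternative to deleting the path by hand.

```python
import csv
import shutil
import tempfile
from pathlib import Path


def write_fresh_csv(output_dir, header, rows):
    """Hypothetical helper: replace output_dir with a single CSV file."""
    output_dir = Path(output_dir)
    # Remove any earlier output so stale files cannot linger.
    if output_dir.exists():
        shutil.rmtree(output_dir)
    output_dir.mkdir(parents=True)
    csv_path = output_dir / "part-00000.csv"
    with csv_path.open("w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(header)
        writer.writerows(rows)
    return csv_path


base_dir = Path(tempfile.mkdtemp())
target_dir = base_dir / "aggregated_data"
header = ["ProductId", "average_score"]

# Writing twice shows that the second call replaces the first output.
write_fresh_csv(target_dir, header, [("B001", 4.0)])
saved_path = write_fresh_csv(target_dir, header, [("B002", 3.5)])
saved_lines = saved_path.read_text().splitlines()
print(saved_lines)

# Clean up the temporary directory.
shutil.rmtree(base_dir)
```

As in the Spark version, the output path is a directory that contains the data file rather than a single file.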


In [0]:
# Stopping the Spark session to free up resources and avoid memory leaks
spark.stop()
print("Spark session stopped")