In [348]:
import os

import pyspark.sql.functions as f
from pyspark.sql.types import StringType
from pyspark.sql.window import Window
from textblob import TextBlob

In [349]:
from pyspark.sql import SparkSession

# Location of the PostgreSQL JDBC driver jar. Set the POSTGRES_JDBC_JAR
# environment variable to point elsewhere instead of editing this
# machine-specific absolute path; the default keeps the original location.
POSTGRES_JDBC_JAR = os.environ.get(
    'POSTGRES_JDBC_JAR',
    '/usr/lib/jvm/java-11-openjdk-amd64/lib/postgresql-42.6.0.jar',
)

# Initialize a Spark session with the JDBC driver on the driver classpath
spark = (
    SparkSession.builder.appName('FinalTask')
    .config('spark.driver.extraClassPath', POSTGRES_JDBC_JAR)
    .getOrCreate()
)


23/09/12 16:07:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Read the source tables from PostgreSQL

In [350]:
# Connection settings for the cleaned source database. Credentials may be
# supplied through environment variables so they need not live in the
# notebook; the defaults reproduce the original local setup.
SOURCE_DB_URL = 'jdbc:postgresql://localhost:5432/Cleaned_Data'
PG_USER = os.environ.get('POSTGRES_USER', 'postgres')
PG_PASSWORD = os.environ.get('POSTGRES_PASSWORD', 'postgres')


def read_postgres_table(table_name, url=SOURCE_DB_URL):
    """Load one PostgreSQL table into a Spark DataFrame over JDBC.

    Args:
        table_name: Table to read (passed as the JDBC ``dbtable`` option).
        url: JDBC URL of the database; defaults to the Cleaned_Data database.

    Returns:
        A Spark DataFrame backed by the table.
    """
    return (
        spark.read.format("jdbc")
        .options(url=url,
                 driver='org.postgresql.Driver',
                 dbtable=table_name,
                 user=PG_USER,
                 password=PG_PASSWORD)
        .load()
    )


reviews_df = read_postgres_table('reviews')
orders_df = read_postgres_table('orders')
order_item_df = read_postgres_table('order_item')
product_df = read_postgres_table('product')


### Compute the delivery time and delivery deviation (in days), and bucket each order's purchase hour into a time slot to study delivery and purchase trends

In [351]:
# Restrict to delivered orders; the delivery metrics computed later use this subset.
delivered_orders_df = orders_df.where(f.col("Order_status") == "delivered")

In [352]:
# Hour of day at which each order was purchased
purchase_hour = f.hour(f.col("Order_purchase_timestamp"))
delivered_orders_df = delivered_orders_df.withColumn("Order_purchase_hour", purchase_hour)
delivered_orders_df.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|Order_purchase_hour|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|                 10|
|53cdb2fc8bc7dce0b...|b0830fb4747a6c6d2...|   delivered|     2018-07-24 20:41:37|2018-07-26 03:24:27|         2018-07-26 14:31:00|          2018-08-07 15:27

In [353]:
SECONDS_PER_DAY = 24 * 3600


def _days_between(start_col, end_col):
    """Return (end_col - start_col) in days, rounded to 3 decimals, as a Column."""
    elapsed_seconds = f.unix_timestamp(end_col) - f.unix_timestamp(start_col)
    return f.round(elapsed_seconds / SECONDS_PER_DAY, 3)


# delivery_time_days: purchase timestamp -> delivery to customer.
# delivery_deviation_in_days: estimated date minus actual delivery date, so a
# positive value means the order arrived before its estimate.
delivered_orders_df = (
    delivered_orders_df
    .withColumn("delivery_time_days",
                _days_between("order_purchase_timestamp", "order_delivered_customer_date"))
    .withColumn("delivery_deviation_in_days",
                _days_between("order_delivered_customer_date", "order_estimated_delivery_date"))
)

delivered_orders_df.show()

# Average delivery time (avg skips null delivery times)
average_delivery_time = (
    delivered_orders_df
    .agg(f.avg("delivery_time_days").alias("avg_delivery_time"))
    .first()["avg_delivery_time"]
)
print("Average Delivery Time (in days):", average_delivery_time)

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------------+------------------+--------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|Order_purchase_hour|delivery_time_days|delivery_deviation_in_days|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------------+------------------+--------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:25:13|          2017-10-18 00:00:00|                 10|             8.437|

In [354]:
# Bucket the purchase hour into a time-of-day slot (ranges are inclusive and
# together cover hours 0-23). A row with a null hour, e.g. from a missing
# purchase timestamp, gets a null slot instead of being counted as "Night".
purchase_hour = f.col("Order_purchase_hour")
delivered_orders_df = delivered_orders_df.withColumn(
    "Order_purchase_time_slot",
    f.when(purchase_hour.between(0, 6), "Dawn")
    .when(purchase_hour.between(7, 12), "Morning")
    .when(purchase_hour.between(13, 18), "Afternoon")
    .when(purchase_hour.between(19, 23), "Night"),
)
delivered_orders_df.show()

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------------+------------------+--------------------------+------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|Order_purchase_hour|delivery_time_days|delivery_deviation_in_days|Order_purchase_time_slot|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------------+------------------+--------------------------+------------------------+
|e481f51cbdc54678b...|9ef432eb625129730...|   delivered|     2017-10-02 10:56:33|2017-10-02 11:07:15|         2017-10-04 19:55:00|          2017-10-10 21:

In [355]:
# Attach to every row the number of delivered orders (non-null order_id)
# purchased in the same hour of day.
# NOTE(review): despite its name, "approved_number_of_order" does not look at
# order_approved_at; the name is kept because it is part of the output table.
window_spec = Window.partitionBy(f.col("order_purchase_hour"))
orders_in_same_hour = f.count("order_id").over(window_spec)
delivered_orders_df = delivered_orders_df.withColumn("approved_number_of_order", orders_in_same_hour)

delivered_orders_df.show()


[Stage 6:>                                                          (0 + 1) / 1]

+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------------+------------------+--------------------------+------------------------+------------------------+
|            order_id|         customer_id|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|Order_purchase_hour|delivery_time_days|delivery_deviation_in_days|Order_purchase_time_slot|approved_number_of_order|
+--------------------+--------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+-------------------+------------------+--------------------------+------------------------+------------------------+
|dd78f560c270f1909...|8b212b9525f9e74e8...|   delivered|     2018-03-12 01:50:2

                                                                                

In [356]:
# Columns written to the delivery_status_table output
delivery_status_columns = [
    "order_id",
    "order_purchase_timestamp",
    "order_delivered_customer_date",
    "delivery_deviation_in_days",
    "delivery_time_days",
    "Order_purchase_time_slot",
    "Order_purchase_hour",
    "approved_number_of_order",
]
delivery_status = delivered_orders_df.select(*delivery_status_columns)

delivery_status.show()

+--------------------+------------------------+-----------------------------+--------------------------+------------------+------------------------+-------------------+------------------------+
|            order_id|order_purchase_timestamp|order_delivered_customer_date|delivery_deviation_in_days|delivery_time_days|Order_purchase_time_slot|Order_purchase_hour|approved_number_of_order|
+--------------------+------------------------+-----------------------------+--------------------------+------------------+------------------------+-------------------+------------------------+
|dd78f560c270f1909...|     2018-03-12 01:50:26|          2018-03-21 14:41:50|                     6.388|             9.536|                    Dawn|                  1|                    1133|
|aa466a92af916f9f6...|     2018-07-03 01:17:32|          2018-07-11 22:52:20|                    27.047|             8.899|                    Dawn|                  1|                    1133|
|6b7c5703bdc03335b...|     201

                                                                                

In [357]:
# Persist the per-order delivery metrics to the Output database, overwriting
# the table's existing contents. Credentials are read from the environment
# when set; the defaults match the original local setup.
delivery_status.write.format('jdbc').options(
    url='jdbc:postgresql://localhost:5432/Output',
    driver='org.postgresql.Driver',
    dbtable='delivery_status_table',
    user=os.environ.get('POSTGRES_USER', 'postgres'),
    password=os.environ.get('POSTGRES_PASSWORD', 'postgres'),
).mode('overwrite').save()


                                                                                

### Question 2

### Run sentiment analysis on the reviews and find its correlation with the review score

In [358]:
# Count placeholder comments. The match ignores case and leading/trailing
# spaces, so variants such as "No comment" are counted too. This is consistent
# with the lower-cased "no comment" check used when scoring sentiment.
normalized_comment = f.lower(f.trim(f.col("review_comment_message")))
count_no_comment = reviews_df.filter(normalized_comment == "no comment").count()

print("Count of 'No comment':", count_no_comment)

Count of 'No comment': 58274


### Perform sentiment analysis on the review comments

In [359]:
def analyze_sentiment(text):
    """Return the TextBlob polarity of a review comment, or None for placeholders.

    Args:
        text: The raw ``review_comment_message`` value. May be None when the
            column is null in the source table.

    Returns:
        The polarity score as a float. Returns None when the comment is
        missing, is not a string, or is the "no comment" placeholder
        (compared case-insensitively, ignoring surrounding whitespace).
    """
    # Guard against nulls: calling .lower() on None would raise AttributeError
    # inside the UDF and fail the whole Spark job.
    if not isinstance(text, str) or text.strip().lower() == "no comment":
        return None
    return float(TextBlob(text).sentiment.polarity)

In [360]:
# Spark UDF wrapper around analyze_sentiment; its output is declared as a
# string and cast to float in the following cell.
sentiment_udf = f.udf(analyze_sentiment, returnType=StringType())

In [361]:
# Score each comment and turn the UDF's string output into a float column.
reviews_df = reviews_df.withColumn(
    "sentiment_score",
    sentiment_udf(f.col("review_comment_message")).cast("float"),
)

# Placeholder comments score as null, so keeping non-null scores keeps only
# real comments.
filtered_reviews_df = reviews_df.where(f.col("sentiment_score").isNotNull())

# Show the filtered DataFrame with sentiment scores
filtered_reviews_df.show()


[Stage 18:>                                                         (0 + 1) / 1]

+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+---------------+
|           review_id|            order_id|review_score|review_comment_title|review_comment_message|review_creation_date|review_answer_timestamp|sentiment_score|
+--------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+---------------+
|736c499ee6f254cef...|a1b9ef764d1de3a28...|           5|          no comment|  The post office w...| 2017-10-17 00:00:00|    2017-10-17 23:58:09|           -0.3|
|abb4e4b1fd981095b...|b53c76841e4d01912...|           3|          no comment|  I was in doubt if...| 2017-03-30 00:00:00|    2017-04-03 11:13:18|         0.1875|
|8c00ed9f01a667397...|6112465ce26bd647d...|           3|          no comment|  two products were...| 2018-03-10 00:00:00|    2018-03-12 17:22:36|            0.0|
|4d9738ab27699a838...|e8373f

                                                                                

In [362]:
# Correlation (Spark's corr) between the sentiment polarity and the review score
correlation = filtered_reviews_df.select(f.corr("sentiment_score", "review_score")).first()[0]

# Store the result in a one-row DataFrame. The schema is given explicitly
# because corr() can yield null (e.g. on empty input), and Spark cannot infer
# a column type from a lone None.
correlation_df = spark.createDataFrame([(correlation,)], "Correlation double")

# Show the correlation result
correlation_df.show()
print("Correlation between sentiment_score and review_score:", correlation)

[Stage 22:>                                                         (0 + 1) / 1]

+------------------+
|       Correlation|
+------------------+
|0.5368841750490808|
+------------------+

Correlation between sentiment_score and review_score: 0.5368841750490808


                                                                                

In [363]:
# Attach each scored review's product category.
# NOTE(review): presumably order_item has one row per item, so a review on a
# multi-item order is repeated once per item here. Confirm this is intended.
results_df = filtered_reviews_df.join(order_item_df, on="order_id", how="inner")
joined_df = (
    results_df
    .join(product_df, on="product_id", how="inner")
    .select(
        "order_id",
        "review_id",
        "review_score",
        "review_comment_message",
        "product_category_name",
        "sentiment_score",
    )
)

joined_df.show()


[Stage 24:>                                                         (0 + 1) / 1]

+--------------------+--------------------+------------+----------------------+---------------------+---------------+
|            order_id|           review_id|review_score|review_comment_message|product_category_name|sentiment_score|
+--------------------+--------------------+------------+----------------------+---------------------+---------------+
|184f17bb701af22b8...|8dc6ed948a1806c9a...|           3|  Very good. ..I re...|        fashion shoes|           0.91|
|8a98ad89f447011bd...|8165a35c97adca2e1...|           5|  Excellent product...|           stationery|           0.75|
|d579ab4fe27352cb0...|129e42c8d3e47ea56...|           1|  Beautiful watch, ...|                 null|     0.11666667|
|70fdb98b88f9b34c5...|b1172723acc2f1d5c...|           1|  in addition to th...|       sports leisure|           -0.2|
|c1cb3668980881dd3...|7030ebd12401ee187...|           1|  In addition to be...|        health beauty|           -0.4|
|db67f190728e6864b...|907209bec7fb03d05...|           5|

                                                                                

In [364]:
# Map the polarity score to a coarse label. joined_df is derived from
# filtered_reviews_df, whose scores are non-null, so "negative" covers the
# scores below zero.
sentiment_score_col = f.col("sentiment_score")
sentiment_label = (
    f.when(sentiment_score_col > 0, "positive")
    .when(sentiment_score_col == 0, "neutral")
    .otherwise("negative")
)
joined_df = joined_df.withColumn("sentiment", sentiment_label)

joined_df.show()



[Stage 33:>                                                         (0 + 1) / 1]

+--------------------+--------------------+------------+----------------------+---------------------+---------------+---------+
|            order_id|           review_id|review_score|review_comment_message|product_category_name|sentiment_score|sentiment|
+--------------------+--------------------+------------+----------------------+---------------------+---------------+---------+
|184f17bb701af22b8...|8dc6ed948a1806c9a...|           3|  Very good. ..I re...|        fashion shoes|           0.91| positive|
|8a98ad89f447011bd...|8165a35c97adca2e1...|           5|  Excellent product...|           stationery|           0.75| positive|
|d579ab4fe27352cb0...|129e42c8d3e47ea56...|           1|  Beautiful watch, ...|                 null|     0.11666667| positive|
|70fdb98b88f9b34c5...|b1172723acc2f1d5c...|           1|  in addition to th...|       sports leisure|           -0.2| negative|
|c1cb3668980881dd3...|7030ebd12401ee187...|           1|  In addition to be...|        health beauty|   

                                                                                

In [365]:


# Write the review-level sentiment table and the one-row correlation table to
# the Output database, overwriting existing tables of the same name.
# Credentials are read from the environment when set; the defaults match the
# original local setup.
output_tables = {
    'review_sentiment_analysis': joined_df,
    'correlation_of_reviewscore_and_sentiment': correlation_df,
}
for table_name, output_df in output_tables.items():
    output_df.write.format('jdbc').options(
        url='jdbc:postgresql://localhost:5432/Output',
        driver='org.postgresql.Driver',
        dbtable=table_name,
        user=os.environ.get('POSTGRES_USER', 'postgres'),
        password=os.environ.get('POSTGRES_PASSWORD', 'postgres'),
    ).mode('overwrite').save()



                                                                                

In [366]:
# Shut down the Spark session created at the start of the notebook.
spark.stop()