
**What are the implications of loading additional features (that might not be necessary for analysis) into PySpark?**

Loading features that the analysis never uses has several costs in PySpark.

First, every extra column has to be read, parsed, held in memory and, in wide operations, shuffled between executors. This increases resource consumption and slows job execution. Wide rows also make pipelines harder to read and maintain.

Second, feeding unnecessary features into machine learning models raises the risk of overfitting and reduces generalization performance.

Finally, debugging and troubleshooting become harder because there are more columns to inspect when something goes wrong. Selecting only the features the analysis needs, as early as possible, keeps jobs fast and makes results easier to verify.
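
As a rough illustration of the first point, the sketch below uses only Python's standard library (not Spark) to parse a tiny, made-up CSV twice: once keeping every column and once keeping only the columns an analysis might need. The sample rows, the `load_columns` and `payload_chars` helpers, and the choice of `ReviewText` as the unneeded column are all assumptions for illustration. In PySpark the comparable step would be a `select` on the needed columns right after reading.

```python
import csv
import io

# Made-up sample in the same shape as the review data (values are illustrative only)
SAMPLE_CSV = """Rating,ReviewerID,ProductID,Timestamp,ReviewText
5,REVIEWER_A,PRODUCT_1,1420070400,A long free-text review that the analysis never reads
3,REVIEWER_B,PRODUCT_1,1420156800,Another long free-text review carried along for nothing
"""

def load_columns(csv_text, wanted=None):
    """Parse CSV text and keep only the `wanted` columns (all columns if None)."""
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = []
    for row in reader:
        rows.append(dict(row) if wanted is None else {name: row[name] for name in wanted})
    return rows

def payload_chars(rows):
    """Total number of characters held across all values, as a crude size measure."""
    return sum(len(value) for row in rows for value in row.values())

all_rows = load_columns(SAMPLE_CSV)
slim_rows = load_columns(SAMPLE_CSV, wanted=["Rating", "ProductID", "Timestamp"])

# The slim version carries far fewer characters per row than the full one
print(payload_chars(all_rows), payload_chars(slim_rows))
```

The character count is only a stand-in for memory and I/O cost, but the direction of the effect is the same at scale: free-text columns dominate the row size.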


**Create a SQL context from PySpark SQLContext**

In [25]:
!apt-get update # Refresh the apt package index.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # Install Java 8 (headless).
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz # Download Apache Spark.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz # Extract the archive.
!pip install -q findspark # Install findspark, which adds PySpark to sys.path at runtime.

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

!ls

# Initialize findspark
import findspark
findspark.init()

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:5 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
a3_am

In [26]:
# Create a PySpark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

**Load the Amazon Review Dataset into a PySpark RDD, then Convert the RDD to a PySpark DataFrame**

In [78]:
# Read the CSV with every column as a string (inferSchema=False) and assign the column names
df = spark.read.csv("/content/a3_amazon.csv", header=True, sep=',', inferSchema=False).toDF('Rating', 'ReviewerID', 'ProductID', 'Timestamp', 'ReviewText')

In [79]:
df.show()

+------+--------------+----------+----------+--------------------+
|Rating|    ReviewerID| ProductID| Timestamp|          ReviewText|
+------+--------------+----------+----------+--------------------+
|     1|A3SK6VNBQDNBJE|1118461304|1383264000|I have enjoyed Dr...|
|     3|A3SOFHUR27FO3K|1118461304|1381363200|Alan Gregerman be...|
|     1|A1HOG1PYCAE157|1118461304|1381276800|Alan Gregerman is...|
|     6|A26JGAM6GZMM4V|1118461304|1378512000|As I began to rea...|
|     5|A17K8WANMYHTX2|1118461304|1399766400|I teach a class o...|
|     3|A13IW3A6W43U0G|1118461304|1399593600|I so appreciated ...|
|     6|A1ECEGG1MP7J8J|1118461304|1399420800|As with Surrounde...|
|     1|A2D5X9G9S3A7RN|1118461304|1399161600|Alan Gregermans b...|
|     6| AP2F86JFRQ205|1118461304|1398988800|I am a believer i...|
|    10|A3VF3A5A3O04E1|1118461304|1397174400|A compelling book...|
|     7|A14DW5UMQ1M96O|1118461304|1395273600|Alan Gregerman's ...|
|     4|A2V7UVKOFG57IW|1118461304|1393459200|So refreshing to 

In [80]:
df.printSchema()

root
 |-- Rating: string (nullable = true)
 |-- ReviewerID: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- ReviewText: string (nullable = true)



In [82]:
# Convert the string epoch seconds in Timestamp to a date (yyyy-MM-dd) in a new column, Timestamp_date

from pyspark.sql import functions as F

# from_unixtime renders the epoch in the session time zone; to_date keeps only the date part
df = df.withColumn('Timestamp_date', F.to_date(F.from_unixtime(df.Timestamp)))

# Show the DataFrame with the new column
df.show()


+------+--------------+----------+----------+--------------------+--------------+
|Rating|    ReviewerID| ProductID| Timestamp|          ReviewText|Timestamp_date|
+------+--------------+----------+----------+--------------------+--------------+
|     1|A3SK6VNBQDNBJE|1118461304|1383264000|I have enjoyed Dr...|    2013-11-01|
|     3|A3SOFHUR27FO3K|1118461304|1381363200|Alan Gregerman be...|    2013-10-10|
|     1|A1HOG1PYCAE157|1118461304|1381276800|Alan Gregerman is...|    2013-10-09|
|     6|A26JGAM6GZMM4V|1118461304|1378512000|As I began to rea...|    2013-09-07|
|     5|A17K8WANMYHTX2|1118461304|1399766400|I teach a class o...|    2014-05-11|
|     3|A13IW3A6W43U0G|1118461304|1399593600|I so appreciated ...|    2014-05-09|
|     6|A1ECEGG1MP7J8J|1118461304|1399420800|As with Surrounde...|    2014-05-07|
|     1|A2D5X9G9S3A7RN|1118461304|1399161600|Alan Gregermans b...|    2014-05-04|
|     6| AP2F86JFRQ205|1118461304|1398988800|I am a believer i...|    2014-05-02|
|    10|A3VF3A5A

In [83]:
df.printSchema()


root
 |-- Rating: string (nullable = true)
 |-- ReviewerID: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- ReviewText: string (nullable = true)
 |-- Timestamp_date: date (nullable = true)
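
The conversion can be cross-checked without Spark. The sketch below is a plain-Python check, assuming the timestamps are epoch seconds interpreted in UTC; `epoch_to_date` is a hypothetical helper name, and the three input strings are copied from the `Timestamp` column shown earlier.

```python
from datetime import datetime, timezone

def epoch_to_date(epoch_text):
    """Convert epoch seconds given as a string to a UTC calendar date."""
    return datetime.fromtimestamp(int(epoch_text), tz=timezone.utc).date()

# Timestamp strings taken from rows of the DataFrame output shown earlier
for ts in ["1383264000", "1381363200", "1399766400"]:
    print(ts, epoch_to_date(ts).isoformat())
# 1383264000 2013-11-01
# 1381363200 2013-10-10
# 1399766400 2014-05-11
```

These dates agree with the `Timestamp_date` values in the Spark output, which is consistent with the session running in UTC. Because `from_unixtime` uses the session time zone, a session set to a zone west of UTC would be expected to show the previous day for these midnight timestamps.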



In [113]:
df1 = df.filter(F.year('Timestamp_date')==F.lit(2015))

In [115]:
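# Rating is still a string column, so this descending sort is lexicographic ("9" ranks above "10")
df1_sorted = df1.sort("Rating", ascending=False)
df1_sorted.show(20)

# Count the 2015 reviews per product and order by that count, highest first
Prod_20 = df1_sorted.groupBy('ProductID').agg(F.count("*").alias('Product_count'))
Prod_20 = Prod_20.sort('Product_count', ascending=False)
Prod_20.show()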


+------+--------------+----------+----------+--------------------+--------------+
|Rating|    ReviewerID| ProductID| Timestamp|          ReviewText|Timestamp_date|
+------+--------------+----------+----------+--------------------+--------------+
|     9|A2AQSV7K5FEXHK|B00006IV17|1427587200|Nice product-grea...|    2015-03-29|
|     9|A3N6SSJUXCROPI|B00009W3I4|1433116800|Our old dryer ven...|    2015-06-01|
|     9|A1465J3TJ3BKI5|B00006IV17|1425168000|Bed bash and beyo...|    2015-03-01|
|     9| AN63ZGJT5LO9B|B00004YWK2|1428710400|Kind of meh. Does...|    2015-04-11|
|     9|A150VHMFYQQ37E|B00006IV17|1423785600|It was exactly wh...|    2015-02-13|
|     9| A5POJ3A2LVFB1|B00004YWK2|1436832000|For the money it ...|    2015-07-14|
|     9|A2SJUILV7DFDEW|B00006IV17|1421020800|exactly as advert...|    2015-01-12|
|     9|A24EAX9PUWHNYA|B00004YWK2|1427328000|Works as expected...|    2015-03-26|
|     9|A3BBNA32BSRCAZ|B00006IV17|1420934400|Product arrived o...|    2015-01-11|
|     9|A17M9O2F
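
The sorted output above starts at 9 even though the data contains ratings of 10. That is what a text sort produces when the ratings are stored as strings. The sketch below reproduces the effect in plain Python with a handful of rating values; the list itself is illustrative, not a sample of the dataset.

```python
# Ratings held as text, the way the DataFrame schema stores them
ratings_as_text = ["1", "3", "10", "9", "6"]

# Text comparison looks at characters left to right, so "9" beats "10"
text_order = sorted(ratings_as_text, reverse=True)

# Converting to int for the comparison restores the numeric order
numeric_order = sorted(ratings_as_text, key=int, reverse=True)

print(text_order)     # ['9', '6', '3', '10', '1']
print(numeric_order)  # ['10', '9', '6', '3', '1']
```

In PySpark, one way to get the numeric order would be to cast before sorting, for example with `F.col("Rating").cast("int")`, or to read the file with a schema that declares `Rating` as an integer.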

In [116]:
prod_index = Prod_20.withColumn("ProdIndex", F.monotonically_increasing_id())
# Keep the filtered DataFrame in prod_index; show() returns None, so call it separately
prod_index = prod_index.where(F.col('ProdIndex') < 20)
prod_index.show()

+----------+-------------+---------+
| ProductID|Product_count|ProdIndex|
+----------+-------------+---------+
|B000AST3AK|         1487|        0|
|B004UB1O9Q|         1471|        1|
|B00KJ07SEM|         1415|        2|
|B004INUWX0|          715|        3|
|B00E37TQV0|          658|        4|
|B0045LLC7K|          655|        5|
|B0014CN8Y8|          578|        6|
|B0042ACZU2|          555|        7|
|B00SPYDSFC|          453|        8|
|B001B35APA|          443|        9|
|B00DT5746Q|          433|       10|
|B00NIZ0DV0|          426|       11|
|B00INXG9MY|          421|       12|
|B000VL060M|          396|       13|
|B00AHR3IG4|          387|       14|
|B000HE5DNI|          386|       15|
|B00E37TR6E|          380|       16|
|B0053F80JA|          372|       17|
|B00LLM6ZFK|          363|       18|
|B0009GVYNW|          362|       19|
+----------+-------------+---------+
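
One caveat about using `monotonically_increasing_id` as a row counter: Spark documents the IDs as increasing and unique, but not consecutive. The documented layout puts the partition ID in the upper bits and the row number within the partition in the lower 33 bits. The sketch below mimics that layout in plain Python (the function name and partition sizes are made up) to show how a `< 20` filter behaves when the rows span more than one partition.

```python
def simulated_monotonic_ids(rows_per_partition):
    """Mimic the documented ID layout: partition ID in the upper bits, row number in the lower 33 bits."""
    ids = []
    for partition_id, row_count in enumerate(rows_per_partition):
        for row_number in range(row_count):
            ids.append((partition_id << 33) + row_number)
    return ids

one_partition = simulated_monotonic_ids([20])      # all 20 rows in a single partition
two_partitions = simulated_monotonic_ids([12, 8])  # same 20 rows split across two partitions

print(sum(i < 20 for i in one_partition))   # 20
print(sum(i < 20 for i in two_partitions))  # 12
print(two_partitions[12])                   # 8589934592
```

The table above shows indices 0 to 19, presumably because the sorted result ended up in one partition. A filter on the generated ID is therefore not a dependable top-N; `limit(20)` on the sorted DataFrame does not depend on how the data is partitioned.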



**Show the top 20 reviewers and the products they reviewed in 2015**

In [101]:
# Count the 2015 reviews per reviewer, most active first
reviewers = df1_sorted.groupBy('ReviewerID').agg(F.count("*").alias('Reviewers_count'))
reviewers = reviewers.sort('Reviewers_count', ascending=False)
reviewers.show()

# Attach a running index so the first 20 rows can be selected in the next cell
reviewers_index = reviewers.withColumn("ReviewerIndex", F.monotonically_increasing_id())
reviewers_index.show(20)

+--------------+---------------+
|    ReviewerID|Reviewers_count|
+--------------+---------------+
|A2LDP3A4IE9T6T|            206|
| AFUVGAUNQVT0S|             40|
|A3H61AMBJ177DG|             29|
|A3RSUFA7CIL3TS|             23|
|A37C9W32E7WE1B|             18|
|A37DQO5LU8DXTV|             17|
|  AGADNX7MZ425|             16|
|A2SEK167AI6DZM|             16|
|A2CIEGHZ7L1WWR|             15|
|A3R9MYRHDWHTVL|             14|
|A2KRNK6HVD2DTY|             14|
| AV052V61HYMAE|             13|
| A41G22PCRD3E8|             12|
| AXYJOIFBWO3K3|             11|
|A13391AZAFJ67K|             11|
| A6FPI5CM4W5WB|             11|
|A17MK6PQZUDLXL|             10|
|A1AJW9DILZFTQI|             10|
|A1BTOP0JZS3YH8|              9|
|A1ELL3S7RMZ4YN|              9|
+--------------+---------------+
only showing top 20 rows

+--------------+---------------+-------------+
|    ReviewerID|Reviewers_count|ReviewerIndex|
+--------------+---------------+-------------+
|A2LDP3A4IE9T6T|            206|         

In [103]:
reviewers_index = reviewers_index.where(F.col('ReviewerIndex') < 20)
reviewers_index.show()

+--------------+---------------+-------------+
|    ReviewerID|Reviewers_count|ReviewerIndex|
+--------------+---------------+-------------+
|A2LDP3A4IE9T6T|            206|            0|
| AFUVGAUNQVT0S|             40|            1|
|A3H61AMBJ177DG|             29|            2|
|A3RSUFA7CIL3TS|             23|            3|
|A37C9W32E7WE1B|             18|            4|
|A37DQO5LU8DXTV|             17|            5|
|  AGADNX7MZ425|             16|            6|
|A2SEK167AI6DZM|             16|            7|
|A2CIEGHZ7L1WWR|             15|            8|
|A2KRNK6HVD2DTY|             14|            9|
|A3R9MYRHDWHTVL|             14|           10|
| AV052V61HYMAE|             13|           11|
| A41G22PCRD3E8|             12|           12|
| A6FPI5CM4W5WB|             11|           13|
| AXYJOIFBWO3K3|             11|           14|
|A13391AZAFJ67K|             11|           15|
|A17MK6PQZUDLXL|             10|           16|
|A1AJW9DILZFTQI|             10|           17|
|A1BTOP0JZS3Y
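
Reviewers with equal counts appear in a different order in the two tables above (for example, the two reviewers with 14 reviews swap places). Sorting on the count alone leaves the order of ties unspecified. The sketch below shows a deterministic tie-break in plain Python, using the tied rows from those tables; `rank_reviewers` is a hypothetical helper, and breaking ties by `ReviewerID` is an arbitrary but repeatable choice.

```python
# (ReviewerID, review count) pairs copied from the tied rows in the tables above
counts = [
    ("A3R9MYRHDWHTVL", 14),
    ("A2KRNK6HVD2DTY", 14),
    ("AXYJOIFBWO3K3", 11),
    ("A13391AZAFJ67K", 11),
    ("A6FPI5CM4W5WB", 11),
]

def rank_reviewers(pairs):
    """Order by count descending, then by ReviewerID ascending to break ties."""
    return sorted(pairs, key=lambda pair: (-pair[1], pair[0]))

ranked = rank_reviewers(counts)

# Feeding the rows in a different order must not change the ranking
assert rank_reviewers(list(reversed(counts))) == ranked

for reviewer_id, n in ranked:
    print(reviewer_id, n)
```

In PySpark the same idea would be a sort on two columns, such as `sort(F.desc('Reviewers_count'), 'ReviewerID')`, so that repeated runs produce the same top 20.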

In [129]:
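# Join the top-20 reviewers back to their 2015 reviews to list the products each one reviewed.
# Joining on a column expression keeps both ReviewerID columns in the result.
top_20 = reviewers_index.join(df1_sorted, reviewers_index['ReviewerID'] == df1_sorted['ReviewerID'])

top_20.sort('ReviewerIndex', ascending=True).show()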

+--------------+---------------+-------------+------+--------------+----------+----------+--------------------+--------------+
|    ReviewerID|Reviewers_count|ReviewerIndex|Rating|    ReviewerID| ProductID| Timestamp|          ReviewText|Timestamp_date|
+--------------+---------------+-------------+------+--------------+----------+----------+--------------------+--------------+
|A2LDP3A4IE9T6T|            206|            0|     9|A2LDP3A4IE9T6T|B0006GVNOA|1449273600|We have 24 foot o...|    2015-12-05|
|A2LDP3A4IE9T6T|            206|            0|     6|A2LDP3A4IE9T6T|B0006GVNOA|1449273600|We have 24 foot o...|    2015-12-05|
|A2LDP3A4IE9T6T|            206|            0|     1|A2LDP3A4IE9T6T|B0006GVNOA|1449273600|We have 24 foot o...|    2015-12-05|
|A2LDP3A4IE9T6T|            206|            0|     5|A2LDP3A4IE9T6T|B0006GVNOA|1449273600|We have 24 foot o...|    2015-12-05|
|A2LDP3A4IE9T6T|            206|            0|     4|A2LDP3A4IE9T6T|B0006GVNOA|1449273600|We have 24 foot o...|

**Determine whether the top 20 most reviewed products in 2015 are the same as the top 20 products with the highest average rating in 2015**

In [119]:
# Average rating per product in 2015; avg() casts the string Rating column to a number
Prod_rating_20 = df1_sorted.groupBy('ProductID').agg(F.avg('Rating').alias('avg_rating'))
Prod_rating_20 = Prod_rating_20.sort('avg_rating', ascending=False)


In [120]:
Prod_rating_20_index = Prod_rating_20.withColumn("RatingIndex",F.monotonically_increasing_id())

Prod_rating_20_index = Prod_rating_20_index.where(F.col('RatingIndex') < 20)
Prod_rating_20_index.show()

+----------+----------+-----------+
| ProductID|avg_rating|RatingIndex|
+----------+----------+-----------+
|B00C6OT7TS|      10.0|          0|
|B00VVMLH9E|      10.0|          1|
|B000LOAV4A|      10.0|          2|
|B0057P2H4K|      10.0|          3|
|B00KA36KFG|      10.0|          4|
|B00M3ECJG4|      10.0|          5|
|B006O7BPE8|      10.0|          6|
|B00BW6BZV4|      10.0|          7|
|B00XK5UPYS|      10.0|          8|
|B00C9YRSCS|      10.0|          9|
|B00EN8N5ES|      10.0|         10|
|B00LOX4DXW|      10.0|         11|
|B00Y1TSJF4|      10.0|         12|
|B00MHIXZD2|      10.0|         13|
|B00QZPA8HE|      10.0|         14|
|B004IMNTYK|      10.0|         15|
|B00B9DZKE8|      10.0|         16|
|B00LHRA5JQ|      10.0|         17|
|B00MOCZLP6|      10.0|         18|
|B00IASX0SI|      10.0|         19|
+----------+----------+-----------+
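
Every product in the table above averages exactly 10.0. A ranking by mean alone tends to look like this when some products have only a few reviews, although the review counts are not shown here, so that explanation is an assumption. The sketch below uses made-up ratings and a hypothetical `min_reviews` threshold to show how requiring a minimum number of reviews changes an average-rating ranking.

```python
from collections import defaultdict

# Hypothetical (ProductID, rating) pairs; not taken from the dataset
reviews = [("P1", 10), ("P2", 9), ("P2", 8), ("P2", 10), ("P3", 7), ("P3", 9)]

def average_ratings(pairs, min_reviews=1):
    """Return (product, mean rating) sorted by mean descending, skipping thinly reviewed products."""
    totals = defaultdict(lambda: [0, 0])  # product -> [sum of ratings, number of reviews]
    for product, rating in pairs:
        totals[product][0] += rating
        totals[product][1] += 1
    averages = {p: s / c for p, (s, c) in totals.items() if c >= min_reviews}
    return sorted(averages.items(), key=lambda item: (-item[1], item[0]))

print(average_ratings(reviews))                 # [('P1', 10.0), ('P2', 9.0), ('P3', 8.0)]
print(average_ratings(reviews, min_reviews=2))  # [('P2', 9.0), ('P3', 8.0)]
```

With no threshold, the single-review product leads the ranking; with a threshold of two reviews it drops out. Whether a threshold is appropriate depends on what "highest average rating" is meant to capture in the question.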



In [128]:
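# DataFrame.show() returns None, so a variable assigned from a chain ending in .show()
# cannot be joined (AttributeError: 'NoneType' object has no attribute 'join').
# Take the 20 most-reviewed products straight from the sorted counts instead.
top_reviewed_20 = Prod_20.limit(20)
compare_20 = top_reviewed_20.join(Prod_rating_20_index, on='ProductID', how='left_anti')
compare_20.show()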
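
The two top-20 tables printed earlier can also be compared directly. The sketch below copies the `ProductID` values from those two outputs into plain Python sets and checks their overlap. It is only as good as those two tables: the average-rating list consists entirely of ties at 10.0, so which 20 products appear in it is not uniquely determined.

```python
# ProductIDs copied from the "most reviewed in 2015" table
most_reviewed = {
    "B000AST3AK", "B004UB1O9Q", "B00KJ07SEM", "B004INUWX0", "B00E37TQV0",
    "B0045LLC7K", "B0014CN8Y8", "B0042ACZU2", "B00SPYDSFC", "B001B35APA",
    "B00DT5746Q", "B00NIZ0DV0", "B00INXG9MY", "B000VL060M", "B00AHR3IG4",
    "B000HE5DNI", "B00E37TR6E", "B0053F80JA", "B00LLM6ZFK", "B0009GVYNW",
}

# ProductIDs copied from the "highest average rating in 2015" table
highest_rated = {
    "B00C6OT7TS", "B00VVMLH9E", "B000LOAV4A", "B0057P2H4K", "B00KA36KFG",
    "B00M3ECJG4", "B006O7BPE8", "B00BW6BZV4", "B00XK5UPYS", "B00C9YRSCS",
    "B00EN8N5ES", "B00LOX4DXW", "B00Y1TSJF4", "B00MHIXZD2", "B00QZPA8HE",
    "B004IMNTYK", "B00B9DZKE8", "B00LHRA5JQ", "B00MOCZLP6", "B00IASX0SI",
}

overlap = most_reviewed & highest_rated
print(len(overlap), sorted(overlap))  # 0 []
```

For the tables shown, no product appears in both lists, so the most reviewed products in 2015 are not the same as the ones with the highest average rating.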