# Distributed Computing - Group Project Task 2

* Group:
    - Gopi Maguluri
    - Nihal Karim
    - Pooja Baralu Umesh
    - Ricky Miura

In [3]:
import os
import boto3
from dotenv import load_dotenv
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, sum

load_dotenv()


True

In [4]:
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_REGION = 'us-west-1'

s3 = boto3.client('s3', aws_access_key_id=AWS_ACCESS_KEY_ID,
                                         aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                                         region_name=AWS_REGION)

In [6]:
bucket_name = 'msds-694-groupproject-12'
response = s3.list_objects_v2(Bucket=bucket_name, Prefix='')
for obj in response['Contents']:
    print('FILE NAME:', obj['Key'], '---> SIZE OF THE FILE:', obj['Size'])

FILE NAME: data/ ---> SIZE OF THE FILE: 0
FILE NAME: data/stackoverflow0to5.csv ---> SIZE OF THE FILE: 4873616231


In [None]:
object_key = 'stackoverflow0to5.csv'
local_file_path = './data/'+object_key

s3.download_file(bucket_name, object_key, local_file_path)

ClientError: An error occurred (404) when calling the HeadObject operation: Not Found

24/12/03 18:51:16 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 193989 ms exceeds timeout 120000 ms
24/12/03 18:51:16 WARN SparkContext: Killing executors is not supported by current scheduler.
24/12/03 18:51:19 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [7]:
sc = SparkContext(appName="StackOverflowPosts").getOrCreate()
spark = SparkSession.builder.getOrCreate()

24/12/03 17:13:04 WARN Utils: Your hostname, Rickys-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.169.165 instead (on interface en0)
24/12/03 17:13:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/03 17:13:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [9]:
data = sc.textFile(local_file_path)

In [21]:
full_df = spark.read.csv(local_file_path, header=True, inferSchema=True)

                                                                                

In [22]:
full_df.show(5)

+--------------------+----------+----------------+--------+-----+---------+--------------------+-----+--------------+-------------+------------+----------------+------------+----------------+-----------+----+
|                  Id|PostTypeId|AcceptedAnswerId|ParentId|Score|ViewCount|                Body|Title|ContentLicense|FavoriteCount|CreationDate|LastActivityDate|LastEditDate|LastEditorUserId|OwnerUserId|Tags|
+--------------------+----------+----------------+--------+-----+---------+--------------------+-----+--------------+-------------+------------+----------------+------------+----------------+-----------+----+
|                   7|         2|            NULL|     4.0|  522|     NULL|An explicit cast ...| NULL|          NULL|         NULL|        NULL|            NULL|        NULL|            NULL|       NULL|NULL|
|                 ```|      NULL|            NULL|    NULL| NULL|     NULL|                NULL| NULL|          NULL|         NULL|        NULL|            NULL|   

#### Checking the missing values

In [23]:
missing_values = full_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
missing_values.show()



+-----+----------+----------------+--------+--------+---------+--------+--------+--------------+-------------+------------+----------------+------------+----------------+-----------+--------+
|   Id|PostTypeId|AcceptedAnswerId|ParentId|   Score|ViewCount|    Body|   Title|ContentLicense|FavoriteCount|CreationDate|LastActivityDate|LastEditDate|LastEditorUserId|OwnerUserId|    Tags|
+-----+----------+----------------+--------+--------+---------+--------+--------+--------------+-------------+------------+----------------+------------+----------------+-----------+--------+
|16824|  55847375|        66431040|65980165|65030025| 69601005|65714391|71630387|      71745810|     71868890|    71989065|        72025836|    72041976|        72051047|   72056228|72065878|
+-----+----------+----------------+--------+--------+---------+--------+--------+--------------+-------------+------------+----------------+------------+----------------+-----------+--------+



                                                                                

#### If we just take a subset of the original dataset, the subset contains many null values. There are posts that may not have a body and questions are asked in the title so we will filter out rows where the whole row is null while keeping rows where only some columns are null.

In [24]:
columns_to_check = [c for c in df.columns if c != "Id"]
all_null_condition = " AND ".join([f"{col_name} IS NULL" for col_name in columns_to_check])
filtered_df = full_df.filter(f"NOT ({all_null_condition})")
subset_df = filtered_df.limit(5000)

#### Saving the subset data

In [25]:
subset_df.write.option("header", True).option("delimiter", ";").format("csv").mode("overwrite").save("data/stackoverflow0to5_subset.csv")

                                                                                

In [26]:
subset_df.show(5)

+---+----------+----------------+--------+-----+---------+--------------------+-----+--------------+-------------+------------+----------------+------------+----------------+-----------+----+
| Id|PostTypeId|AcceptedAnswerId|ParentId|Score|ViewCount|                Body|Title|ContentLicense|FavoriteCount|CreationDate|LastActivityDate|LastEditDate|LastEditorUserId|OwnerUserId|Tags|
+---+----------+----------------+--------+-----+---------+--------------------+-----+--------------+-------------+------------+----------------+------------+----------------+-----------+----+
|  7|         2|            NULL|     4.0|  522|     NULL|An explicit cast ...| NULL|          NULL|         NULL|        NULL|            NULL|        NULL|            NULL|       NULL|NULL|
|  9|         1|          1404.0|    NULL| 2199| 784860.0|Given a `DateTime...| NULL|          NULL|         NULL|        NULL|            NULL|        NULL|            NULL|       NULL|NULL|
| 11|         1|          1248.0|    NUL

#### Checking the missing values in the subset data

In [27]:
missing_values = subset_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in subset_df.columns])
missing_values.show()



+---+----------+----------------+--------+-----+---------+----+-----+--------------+-------------+------------+----------------+------------+----------------+-----------+----+
| Id|PostTypeId|AcceptedAnswerId|ParentId|Score|ViewCount|Body|Title|ContentLicense|FavoriteCount|CreationDate|LastActivityDate|LastEditDate|LastEditorUserId|OwnerUserId|Tags|
+---+----------+----------------+--------+-----+---------+----+-----+--------------+-------------+------------+----------------+------------+----------------+-----------+----+
|  2|         3|            3237|    2730| 2476|     4267|2700| 4843|          4893|         4937|        4973|            4988|        4991|            4992|       4995|4998|
+---+----------+----------------+--------+-----+---------+----+-----+--------------+-------------+------------+----------------+------------+----------------+-----------+----+



                                                                                

* After filtering the original dataset first and then taking a subset, we now see less null values.

In [28]:
subset_df.dtypes

[('Id', 'string'),
 ('PostTypeId', 'string'),
 ('AcceptedAnswerId', 'string'),
 ('ParentId', 'string'),
 ('Score', 'string'),
 ('ViewCount', 'string'),
 ('Body', 'string'),
 ('Title', 'string'),
 ('ContentLicense', 'string'),
 ('FavoriteCount', 'string'),
 ('CreationDate', 'string'),
 ('LastActivityDate', 'string'),
 ('LastEditDate', 'string'),
 ('LastEditorUserId', 'string'),
 ('OwnerUserId', 'string'),
 ('Tags', 'string')]

In [29]:
sorted_df = subset_df.orderBy(col("ViewCount").desc())
sorted_df.show()



+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+---------------+-------------+--------------------+----------------+------------+----------------+-----------+----+
|                  Id|          PostTypeId|    AcceptedAnswerId|            ParentId|               Score|           ViewCount|                Body|      Title| ContentLicense|FavoriteCount|        CreationDate|LastActivityDate|LastEditDate|LastEditorUserId|OwnerUserId|Tags|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------+---------------+-------------+--------------------+----------------+------------+----------------+-----------+----+
|     Knuth (volume 4|          fascicle 2| 7.2.1.3) tells u...|t)-combination is...|t)-combination is...|t)-combination in...| of length (s+t))...|       NULL|           N

                                                                                

* Looking at the data types, all of the columns contain strings.

* We want to use 'ViewCount' and 'Score' to calculate some aggregations on numerical columns so we want to cast the values in these columns to integers.

* The issue is that these columns contain some weird values as shown above.

* So we will filter out the original dataset to only include rows where the values in these two columns can be casted to integers and take a subset of this data.

In [30]:
filtered_df = filtered_df.filter(col("ViewCount").cast("int").isNotNull() & col("Score").cast("int").isNotNull())
subset_df = filtered_df.limit(5000)
subset_df = subset_df.withColumn("ViewCount", col("ViewCount").cast("int")).withColumn("Score", col("Score").cast("int"))

In [31]:
subset_df.show(5)

+---+----------+----------------+--------+-----+---------+--------------------+-----+--------------+-------------+------------+----------------+------------+----------------+-----------+----+
| Id|PostTypeId|AcceptedAnswerId|ParentId|Score|ViewCount|                Body|Title|ContentLicense|FavoriteCount|CreationDate|LastActivityDate|LastEditDate|LastEditorUserId|OwnerUserId|Tags|
+---+----------+----------------+--------+-----+---------+--------------------+-----+--------------+-------------+------------+----------------+------------+----------------+-----------+----+
|  9|         1|          1404.0|    NULL| 2199|   784860|Given a `DateTime...| NULL|          NULL|         NULL|        NULL|            NULL|        NULL|            NULL|       NULL|NULL|
| 11|         1|          1248.0|    NULL| 1644|   197314|Given a specific ...| NULL|          NULL|         NULL|        NULL|            NULL|        NULL|            NULL|       NULL|NULL|
| 14|         1|            NULL|    NUL

* This is what our subset looks like which we can begin aggregating on.

In [32]:
subset_df.write.format("csv").option("header", "true").mode("overwrite").save("data/stackoverflow0to5_subset.csv")

                                                                                

## Summarization and aggregation

In [33]:
top_10_by_views = subset_df.orderBy(col("ViewCount").desc()).limit(10)
top_10_by_views.show()



+--------------------+----------+----------------+--------------------+----------+---------+--------------------+-----+--------------+-------------+------------+----------------+------------+----------------+-----------+----+
|                  Id|PostTypeId|AcceptedAnswerId|            ParentId|     Score|ViewCount|                Body|Title|ContentLicense|FavoriteCount|CreationDate|LastActivityDate|LastEditDate|LastEditorUserId|OwnerUserId|Tags|
+--------------------+----------+----------------+--------------------+----------+---------+--------------------+-----+--------------+-------------+------------+----------------+------------+----------------+-----------+----+
|               46155|         1|         46181.0|                NULL|      5478|  4413064|I'd like to check...| NULL|          NULL|         NULL|        NULL|            NULL|        NULL|            NULL|       NULL|NULL|
|               25969|         1|         25971.0|                NULL|      1756|  3184203|I am

                                                                                

In [34]:
top_10_by_views.select("Body").rdd.flatMap(lambda x: x).collect()

                                                                                

["I'd like to check if the user input is an email address in JavaScript, before sending it to a server or attempting to send an email to it, to prevent the most basic mistyping. How could I achieve this?",
 'I am trying to `INSERT INTO` a table using the input from another table. Although this is entirely feasible for many database engines, I always seem to struggle to remember the correct syntax for the `SQL` engine of the day ([MySQL](http://en.wikipedia.org/wiki/MySQL), [Oracle](http://en.wikipedia.org/wiki/Oracle_Database), [SQL Server](http://en.wikipedia.org/wiki/Microsoft_SQL_Server), [Informix](http://en.wikipedia.org/wiki/IBM_Informix), and [DB2](http://en.wikipedia.org/wiki/IBM_DB2)).',
 'How do I determine the size of my array in C? ',
 'I want to merge two dictionaries into a new dictionary.',
 'I have a JavaScript object. Is there a built-in or accepted best practice way to get the length of this object?',
 "I always thought Java uses . However, I read [a blog post](http:/

##### We are also interested in the highest scored posts in our subset.

In [35]:
top_10_by_score = subset_df.orderBy(col("Score").desc()).limit(10)
top_10_by_score.show()



+--------------------+----------+----------------+--------------------+----------+---------+--------------------+-----+--------------+-------------+------------+----------------+------------+----------------+-----------+----+
|                  Id|PostTypeId|AcceptedAnswerId|            ParentId|     Score|ViewCount|                Body|Title|ContentLicense|FavoriteCount|CreationDate|LastActivityDate|LastEditDate|LastEditorUserId|OwnerUserId|Tags|
+--------------------+----------+----------------+--------------------+----------+---------+--------------------+-----+--------------+-------------+------------+----------------+------------+----------------+-----------+----+
|[http://news.zdne...|1000000121|         2134207|00.htm](http://ne...|1000000121|  2134207|             00.htm)| NULL|          NULL|         NULL|        NULL|            NULL|        NULL|            NULL|       NULL|NULL|
|               40480|         1|            NULL|                NULL|      7645|  2574861|I al

                                                                                

In [36]:
top_10_by_score.select("Body").rdd.flatMap(lambda x: x).collect()

                                                                                

['00.htm)',
 "I always thought Java uses . However, I read [a blog post](http://javadude.com/articles/passbyvalue.htm) which claims that Java uses . I don't think I understand the distinction the author is making.",
 'What are the differences between these two and which one should I use?',
 'I want to merge two dictionaries into a new dictionary.',
 '> ',
 "I'd like to check if the user input is an email address in JavaScript, before sending it to a server or attempting to send an email to it, to prevent the most basic mistyping. How could I achieve this?",
 'Also, how do `LEFT OUTER JOIN`, `RIGHT OUTER JOIN`, and `FULL OUTER JOIN` fit in?',
 'What are the differences between a [HashMap](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/HashMap.html) and a [Hashtable](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/Hashtable.html) in Java?',
 'How do I `grep` and show the preceding and following 5 lines surrounding each matched line?',
 'How 

In [37]:
sc.stop()
spark.stop()