# Importing essential libraries

In [1]:
import pyspark
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.functions import *

In [2]:
spark = (
    SparkSession.builder
                .appName("Stackoverflow")
                .config("spark.jars", "postgresql-42.2.8.jar") 
                .getOrCreate()
)

# Loading the StackOverflow questions, answers and users datasets with pyspark

In [3]:
questions = spark.read.csv(
            "questions.csv", 
            header=True, inferSchema=True)
    
answers = spark.read.csv(
            "answers.csv", 
            header=True, inferSchema=True)

users = spark.read.csv(
            "users.csv", 
            header=True, inferSchema=True)

---

# checking columns of each table

In [4]:
questions.columns

['id',
 'user_id',
 'title',
 'body',
 'accepted_answer_id',
 'score',
 'view_count',
 'comment_count',
 'created_at']

In [5]:
answers.columns

['id',
 'user_id',
 'question_id',
 'body',
 'score',
 'comment_count',
 'created_at']

In [6]:
users.columns

['id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'created_at',
 'updated_at']

---

# Renaming columns

In [7]:
questions = questions.withColumnRenamed('id', 'questions_id')
answers = answers.withColumnRenamed('id', 'answers_id')
users = users.withColumnRenamed('id', 'users_id')
users = users.withColumnRenamed('created_at', 'users_created_at')
questions = questions.withColumnRenamed('user_id', 'questions_user_id')
answers = answers.withColumnRenamed('user_id', 'answers_user_id')
answers = answers.withColumnRenamed('body', 'answers_body')
answers = answers.withColumnRenamed('score', 'answers_score')
answers = answers.withColumnRenamed('comment_count', 'answers_comment_count')
answers = answers.withColumnRenamed('created_at', 'answers_created_at')

# checking for some correct column rename changes

In [8]:
questions.select("questions_id", "questions_user_id", "created_at").show(10)

+--------------------+-----------------+----------+
|        questions_id|questions_user_id|created_at|
+--------------------+-----------------+----------+
|            54233315|          1118630|      null|
|<p>So in Python I...|             null|      null|
|<pre><code>for ph...|             null|      null|
|       </code></pre>|             null|      null|
|  <p>to do that.</p>|             null|      null|
|<p>But it gives m...|             null|      null|
|<pre><code>Syntax...|             null|      null|
|       </code></pre>|             null|      null|
|<p>And I changed ...|             null|      null|
|<pre><code>for ph...|             null|      null|
+--------------------+-----------------+----------+
only showing top 10 rows



In [9]:
answers.select("answers_id", "answers_user_id", "answers_created_at").show(10)

+--------------------+---------------+------------------+
|          answers_id|answers_user_id|answers_created_at|
+--------------------+---------------+------------------+
|            53999517|        1771994|              null|
|<p><div class=""s...|           null|              null|
|<div class=""snip...|           null|              null|
|<pre class=""snip...|           null|              null|
|  const quotes = ...|           null|              null|
|  const num = (Ma...|           null|              null|
|  return quotes[n...|           null|              null|
|                   }|           null|              null|
|document.querySel...|     () =&gt; {|              null|
|  document.queryS...|           null|              null|
+--------------------+---------------+------------------+
only showing top 10 rows



In [10]:
users.select("users_id", "location", "views").show(10)

+--------+--------------------+-----+
|users_id|            location|views|
+--------+--------------------+-----+
| 8357266|Bangalore, Karnat...|    8|
| 2602456|              Canada|    0|
| 8360277|Pennsylvania, Uni...|    0|
|10255746|                null|   18|
| 6504306|New Delhi, Delhi,...|   10|
| 8370096|                null|    3|
| 8370890|          日本 Tōkyō|    5|
| 4695700|                null|    2|
|10260660|                null|    2|
|10260743|Gharaunda, Haryan...| null|
+--------+--------------------+-----+
only showing top 10 rows



# filtering the data 

In [11]:
users = users.filter(users.location.isNotNull())

In [12]:
users.count()

133759

In [13]:
print(users.schema)

StructType(List(StructField(users_id,StringType,true),StructField(display_name,StringType,true),StructField(reputation,StringType,true),StructField(website_url,StringType,true),StructField(location,StringType,true),StructField(about_me,StringType,true),StructField(views,StringType,true),StructField(up_votes,StringType,true),StructField(down_votes,StringType,true),StructField(image_url,StringType,true),StructField(users_created_at,StringType,true),StructField(updated_at,StringType,true)))


# lowercasing the location column in users table

In [14]:
users = users.withColumn('location', lower(col('location')))

---

---

# TASKS

In [15]:
users.filter(users.location.contains("india")).count()

21824

# Choosing India as country of choice and removing unwanted locations

In [16]:
indiaData = users.filter(users.location.contains("india"))
indiaData = indiaData.withColumn("location", regexp_replace('location', r"[,]\s*\w*\s*[,]", ','))
indiaData.show(10)

+--------+------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+
|users_id|      display_name|reputation|         website_url|            location|            about_me|views|up_votes|down_votes|           image_url|   users_created_at|         updated_at|
+--------+------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+
| 8357266|            suryan|         7|https://twitter.c...|    bangalore, india|                null|    8|       0|         0|https://www.grava...|2017-07-24 10:55:23|2019-06-19 05:00:16|
| 6504306|             A.Raw|         4|                null|    new delhi, india|                null|   10|       0|         0|https://i.stack.i...|2016-06-23 12:58:03|2019-10-12 06:59:32|
|10260743|     Kartik Juneja|         3|     

---

# Extract the country and city into new columns and creating a new dataframe of city and country from location

In [17]:
indiaDataLocation = indiaData.withColumn('location', split(indiaData.location, ',')) \
  .select('users_id', 'display_name', 'views', 'reputation', 'updated_at', 'location', element_at(col('location'),-2).alias('city')
         , element_at(col('location'), -1).alias('country'))
indiaDataLocation.show()

+--------+------------------+-----+----------+-------------------+--------------------+---------------+-------+
|users_id|      display_name|views|reputation|         updated_at|            location|           city|country|
+--------+------------------+-----+----------+-------------------+--------------------+---------------+-------+
| 8357266|            suryan|    8|         7|2019-06-19 05:00:16| [bangalore,  india]|      bangalore|  india|
| 6504306|             A.Raw|   10|         4|2019-10-12 06:59:32| [new delhi,  india]|      new delhi|  india|
|10260743|     Kartik Juneja| null|         3|               null| [gharaunda,  india]|      gharaunda|  india|
| 4689205|            sd5869|    5|         1|2019-09-18 14:36:03| [new delhi,  india]|      new delhi|  india|
|10262756|           Ali Mir| null|         5|               null| [jalandhar,  india]|      jalandhar|  india|
|10262983|     Nikunj Hapani| null|         1|               null|     [surat,  india]|          surat| 

# Joining the above with the questions and only pick questions with at least 20 view_counts.

In [18]:
indiaDataLocation.registerTempTable('indiaDataLocation')
questions.registerTempTable('questions')

indiaTJoinquestions = spark.sql("SELECT * FROM indiaDataLocation LEFT JOIN questions ON (indiaDataLocation.users_id = questions.questions_user_id) WHERE questions.view_count >= 20")

In [19]:
indiaTJoinquestions.show(5)

+--------+------------+-----+----------+-------------------+-------------------+---------+-------+------------+-----------------+-----+----+------------------+-------+----------+-------------+----------+
|users_id|display_name|views|reputation|         updated_at|           location|     city|country|questions_id|questions_user_id|title|body|accepted_answer_id|  score|view_count|comment_count|created_at|
+--------+------------+-----+----------+-------------------+-------------------+---------+-------+------------+-----------------+-----+----+------------------+-------+----------+-------------+----------+
| 3273751| user3273751|    5|        16|2019-07-09 07:35:59|[bangalore,  india]|bangalore|  india|           R|          3273751|46609|6452|              3447|1810631|     45933|         6382|      3447|
+--------+------------+-----+----------+-------------------+-------------------+---------+-------+------------+-----------------+-----+----+------------------+-------+----------+------

In [20]:
indiaTJoinquestions.count()

1

In [21]:
indiaDataLocation.count()

21824

In [22]:
questions.count()

7150127

# Join the answers table to the results of joint above

In [23]:
results = indiaTJoinquestions.join(answers, indiaTJoinquestions.questions_id == answers.question_id, how='left')

In [24]:
results.columns

['users_id',
 'display_name',
 'views',
 'reputation',
 'updated_at',
 'location',
 'city',
 'country',
 'questions_id',
 'questions_user_id',
 'title',
 'body',
 'accepted_answer_id',
 'score',
 'view_count',
 'comment_count',
 'created_at',
 'answers_id',
 'answers_user_id',
 'question_id',
 'answers_body',
 'answers_score',
 'answers_comment_count',
 'answers_created_at']

In [25]:
results.count()

3

---

# minimum updated_at time in the results table

In [26]:
minimum_updated_at_time = results.select([min('updated_at')]).show()

+-------------------+
|    min(updated_at)|
+-------------------+
|2019-07-09 07:35:59|
+-------------------+



---

In [27]:
results.printSchema()

root
 |-- users_id: string (nullable = true)
 |-- display_name: string (nullable = true)
 |-- views: string (nullable = true)
 |-- reputation: string (nullable = true)
 |-- updated_at: string (nullable = true)
 |-- location: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- questions_id: string (nullable = true)
 |-- questions_user_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- accepted_answer_id: string (nullable = true)
 |-- score: string (nullable = true)
 |-- view_count: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- answers_id: string (nullable = true)
 |-- answers_user_id: string (nullable = true)
 |-- question_id: string (nullable = true)
 |-- answers_body: string (nullable = true)
 |-- answers_score: string (nullable = true)
 |-- answers_comment_coun

---

# Writing the results file to postgresql

In [33]:
results.write.format("jdbc").options(
    url='jdbc:postgresql://localhost/postgres',
    driver='org.postgresql.Driver',
    user='postgres',
    password='gOSPEL2215',
    dbtable='stackoverflow_filtered.results'
).save(mode='append')

---

# Difference between views and materialized views in postgressql

# Views
It is the result of a stored query
It can be accessed as a virtual table
It does not store data physically

# Materialized views
They are disc-stored views that can be refreshed
Their underlying query is not executed every time an access is required
Used when view query is slow, and you can’t tolerate the slowness