In [1]:
from string import punctuation, digits
from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml import Pipeline
import pandas as pd

In [2]:
spark = (
    SparkSession.builder
        .appName("Stack Overflow Data Wrangling")
        .config("spark.jars",
    "/home/gifty_dovie/Documents/blossomData/jars/postgresql-42.2.8.jar")
        .getOrCreate()
)

In [3]:
questions = spark.read.format("jdbc").options(
    url='jdbc:postgresql://localhost:5432/stackoverflow',
    driver='org.postgresql.Driver',
    user='giftydovie',
    password='giftydovie',
    dbtable='stackoverflow.questions'
).load()

In [4]:
questions = questions\
    .withColumnRenamed('id', 'questions_id')\
    .withColumnRenamed('body', 'questions_body')\
    .withColumnRenamed('user_id', 'questions_user_id')\
    .withColumnRenamed('created_at', 'questions_created_at')\
    .withColumnRenamed('comment_count', 'questions_comment_count')\

In [5]:
questions.dtypes

[('questions_id', 'int'),
 ('questions_user_id', 'int'),
 ('title', 'string'),
 ('questions_body', 'string'),
 ('accepted_answer_id', 'int'),
 ('score', 'int'),
 ('view_count', 'int'),
 ('questions_comment_count', 'int'),
 ('questions_created_at', 'timestamp')]

In [6]:
answers = spark.read.format("jdbc").options(
    url='jdbc:postgresql://localhost:5432/stackoverflow',
    driver='org.postgresql.Driver',
    user='username',
    password='*******',
    dbtable='stackoverflow.answers'
).load()

In [7]:
answers.registerTempTable("answers")
answers = answers\
    .withColumnRenamed('id', 'answers_id')\
    .withColumnRenamed('body', 'answers_body')\
    .withColumnRenamed('user_id', 'answers_user_id')\
    .withColumnRenamed('question_id', 'answers_question_id')\
    .withColumnRenamed('score', 'answers_score')\
    .withColumnRenamed('comment_count', 'answers_comment_count')\
    .withColumnRenamed('created_at', 'answers_created_at')

In [8]:
answers.dtypes

[('answers_id', 'int'),
 ('answers_user_id', 'int'),
 ('answers_question_id', 'int'),
 ('answers_body', 'string'),
 ('answers_score', 'int'),
 ('answers_comment_count', 'int'),
 ('answers_created_at', 'timestamp')]

In [9]:
users = spark.read.format("jdbc").options(
    url='jdbc:postgresql://localhost:5432/stackoverflow',
    driver='org.postgresql.Driver',
    user='giftydovie',
    password='giftydovie',
    dbtable='stackoverflow.users'
).load()

In [10]:
users = users.withColumnRenamed('id', 'users_id')

In [11]:
users.dtypes

[('users_id', 'int'),
 ('display_name', 'string'),
 ('reputation', 'string'),
 ('website_url', 'string'),
 ('location', 'string'),
 ('about_me', 'string'),
 ('views', 'int'),
 ('up_votes', 'int'),
 ('down_votes', 'int'),
 ('image_url', 'string'),
 ('created_at', 'timestamp'),
 ('updated_at', 'timestamp')]

In [12]:
users.registerTempTable("users")

In [13]:
users_location = spark.sql("SELECT location, users_id, views, display_name, reputation, updated_at FROM users WHERE users.location = 'Geneva, Switzerland'")

In [14]:
users_location.show()

+-------------------+--------+-----+--------------------+----------+-------------------+
|           location|users_id|views|        display_name|reputation|         updated_at|
+-------------------+--------+-----+--------------------+----------+-------------------+
|Geneva, Switzerland|10899443|    6|Luis Manuel Quiro...|        23|2019-04-18 12:37:33|
|Geneva, Switzerland|10942263|    6|      Mehdi Mansouri|         3|2019-04-03 13:18:40|
|Geneva, Switzerland| 7379026|    0|    Anthony Levillon|         1|2019-10-11 15:24:48|
|Geneva, Switzerland|11516425|    2|Snezana Nektarijevic|         1|2019-10-11 12:45:40|
|Geneva, Switzerland|11185092|    3|                Fred|         1|2019-06-27 08:01:55|
|Geneva, Switzerland|10651242|   43|          hhaefliger|       297|2019-10-11 22:50:56|
|Geneva, Switzerland|11001751|   11|           Sebastian|        25|2019-08-03 09:01:22|
|Geneva, Switzerland| 3586678|   10|                 BCh|        60|2019-09-27 13:24:40|
|Geneva, Switzerland|

In [15]:
new_location = users_location\
    .withColumn('city', F.split(users_location['location'], ',')[0])\
    .withColumn('country', F.split(users_location['location'], ',')[1])

In [16]:
new_location.registerTempTable("new_location")
new_location = spark.sql("SELECT users_id, city, country, views, display_name, reputation, updated_at FROM new_location WHERE new_location.city = 'Geneva'")

In [17]:
new_location.show()

+--------+------+------------+-----+--------------------+----------+-------------------+
|users_id|  city|     country|views|        display_name|reputation|         updated_at|
+--------+------+------------+-----+--------------------+----------+-------------------+
|10942263|Geneva| Switzerland|    6|      Mehdi Mansouri|         3|2019-04-03 13:18:40|
|11001751|Geneva| Switzerland|   11|           Sebastian|        25|2019-08-03 09:01:22|
| 7379026|Geneva| Switzerland|    0|    Anthony Levillon|         1|2019-10-11 15:24:48|
|10899443|Geneva| Switzerland|    6|Luis Manuel Quiro...|        23|2019-04-18 12:37:33|
|11185092|Geneva| Switzerland|    3|                Fred|         1|2019-06-27 08:01:55|
|11516425|Geneva| Switzerland|    2|Snezana Nektarijevic|         1|2019-10-11 12:45:40|
| 8194488|Geneva| Switzerland|   12|              Jiizen|        26|2019-10-12 08:18:32|
| 3586678|Geneva| Switzerland|   10|                 BCh|        60|2019-09-27 13:24:40|
| 4063669|Geneva| Swi

In [18]:
questions.registerTempTable("questions")

In [19]:
newUserLocation_questions = questions.join(new_location, questions.questions_user_id == new_location.users_id)

In [20]:
newUserLocation_questions.show()

+------------+-----------------+--------------------+--------------------+------------------+-----+----------+-----------------------+--------------------+--------+------+------------+-----+--------------------+----------+-------------------+
|questions_id|questions_user_id|               title|      questions_body|accepted_answer_id|score|view_count|questions_comment_count|questions_created_at|users_id|  city|     country|views|        display_name|reputation|         updated_at|
+------------+-----------------+--------------------+--------------------+------------------+-----+----------+-----------------------+--------------------+--------+------+------------+-----+--------------------+----------+-------------------+
|    55257298|           993592|Pinning icons to ...|<p>I have a video...|          55257852|    1|        68|                      0| 2019-03-20 09:20:42|  993592|Geneva| Switzerland|  325|          ardochhigh|      3204|2019-10-12 07:30:23|
|    55764766|           993

In [21]:
newUserLocation_questions.registerTempTable("newUserLocation_questions")
questions_viewCount = spark.sql("SELECT * FROM newUserLocation_questions WHERE view_count >= 20")

In [22]:
questions_viewCount.show()

+------------+-----------------+--------------------+--------------------+------------------+-----+----------+-----------------------+--------------------+--------+------+------------+-----+--------------------+----------+-------------------+
|questions_id|questions_user_id|               title|      questions_body|accepted_answer_id|score|view_count|questions_comment_count|questions_created_at|users_id|  city|     country|views|        display_name|reputation|         updated_at|
+------------+-----------------+--------------------+--------------------+------------------+-----+----------+-----------------------+--------------------+--------+------+------------+-----+--------------------+----------+-------------------+
|    55257298|           993592|Pinning icons to ...|<p>I have a video...|          55257852|    1|        68|                      0| 2019-03-20 09:20:42|  993592|Geneva| Switzerland|  325|          ardochhigh|      3204|2019-10-12 07:30:23|
|    55764766|           993

In [23]:
answers.registerTempTable("answers")
get_answers = spark.sql("SELECT * FROM answers")

In [24]:
get_answers.show()

+----------+---------------+-------------------+--------------------+-------------+---------------------+-------------------+
|answers_id|answers_user_id|answers_question_id|        answers_body|answers_score|answers_comment_count| answers_created_at|
+----------+---------------+-------------------+--------------------+-------------+---------------------+-------------------+
|  55729762|       10212443|           55713573|<p>Here is the co...|            0|                    0|2019-04-17 14:20:44|
|  55748103|        1177119|           55617273|<p>Here's an alte...|            0|                    1|2019-04-18 14:24:30|
|  55730420|       10545883|           55730331|<p>zip is just a ...|            0|                    0|2019-04-17 14:49:25|
|  55800049|        3233363|           55799087|<p>When a date co...|            0|                    2|2019-04-22 19:31:36|
|  55723275|        5157454|           55722988|<p>With lodash yo...|            0|                    0|2019-04-17 08

In [25]:
questions_viewCount.registerTempTable("questions_viewCount")
get_answers.registerTempTable("get_answers")

In [26]:
more_join = get_answers.join(questions_viewCount, get_answers.answers_question_id == questions_viewCount.questions_id)

In [27]:
more_join.dtypes

[('answers_id', 'int'),
 ('answers_user_id', 'int'),
 ('answers_question_id', 'int'),
 ('answers_body', 'string'),
 ('answers_score', 'int'),
 ('answers_comment_count', 'int'),
 ('answers_created_at', 'timestamp'),
 ('questions_id', 'int'),
 ('questions_user_id', 'int'),
 ('title', 'string'),
 ('questions_body', 'string'),
 ('accepted_answer_id', 'int'),
 ('score', 'int'),
 ('view_count', 'int'),
 ('questions_comment_count', 'int'),
 ('questions_created_at', 'timestamp'),
 ('users_id', 'int'),
 ('city', 'string'),
 ('country', 'string'),
 ('views', 'int'),
 ('display_name', 'string'),
 ('reputation', 'string'),
 ('updated_at', 'timestamp')]

In [28]:
more_join.count()

98

In [29]:
more_join.registerTempTable('find_time')
find_time = spark.sql("SELECT min(updated_at) FROM find_time")

In [30]:
find_time.show()

+-------------------+
|    min(updated_at)|
+-------------------+
|2019-04-03 13:18:40|
+-------------------+



In [None]:
more_join.write.format("jdbc").options(
    url='jdbc:postgresql://localhost:5432/stackoverflow',
    driver='org.postgresql.Driver',
    user='username',
    password='*******',
    dbtable='stackoverflow_filtered.results'
).save(mode='append')

# Difference Between Views and Materialized Views

Views are virtual projections created as a result of a query expression. They are not stored physically on a disk thus, do not require memory space. Material view are a non-virtual schema, that is, they are a physical copy of the base table and are stored physically on a disk, thus, utilizing memory space.

Views have slow processing speed while that of materialized view is fast.

