In [2]:
#IMPORTING RELEVANT PACKAGES

import os
import re
from datetime import datetime
from getpass import getpass
from pathlib import Path

from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_extract
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

In [29]:
# Start (or reuse, if one already exists) the Spark session every later cell uses.
spark = SparkSession.builder.appName("Stack Overflow Data Wrangling").getOrCreate()

In [4]:
#LOADING OUR DATASETS

# Folder containing questions.csv, answers.csv and users.csv. Set the
# STACKOVERFLOW_DATA_DIR environment variable to point elsewhere instead of
# editing a machine-specific absolute path in three places.
DATA_DIR = Path(os.environ.get('STACKOVERFLOW_DATA_DIR',
                               '/home/eddie/Desktop/Blossom_Eddie/Stackoverflow'))


def read_stackoverflow_csv(name):
    """Load ``DATA_DIR/<name>.csv`` with the reader options shared by all three files.

    header=True takes column names from the first row, inferSchema=True lets
    Spark infer column types, and multiLine=True with escape='"' allows quoted
    fields that span several lines and contain doubled quote characters.
    """
    return spark.read.csv(str(DATA_DIR / f'{name}.csv'), header=True,
                          inferSchema=True, escape='"', multiLine=True)


questions = read_stackoverflow_csv('questions')
answers = read_stackoverflow_csv('answers')
users = read_stackoverflow_csv('users')

In [5]:
#RENAMING QUESTIONS COLUMNS THAT CLASH WITH ANSWERS/USERS BY ADDING '_questions'
# This cell must run BEFORE the answers and users rename cells. It compares
# against their raw column names, so every shared name (e.g. 'id', 'user_id',
# 'body') gets the '_questions' suffix here. The later cells then compare
# against these already-suffixed names.
taken_elsewhere = set(answers.columns) | set(users.columns)
for clash in [c for c in questions.columns if c in taken_elsewhere]:
    questions = questions.withColumnRenamed(clash, clash + '_questions')

In [6]:
# Peek at the renamed question id column (first 20 rows).
questions.select(col('id_questions')).show(20)

+------------+
|id_questions|
+------------+
|    54233315|
|    54233145|
|    54233331|
|    54233149|
|    54233337|
|    54233360|
|    54233254|
|    54233264|
|    54243261|
|    54313205|
|    54233168|
|    54233170|
|    54233177|
|    54233196|
|    54233211|
|    54233218|
|    54233270|
|    54027043|
|    54233393|
|    54233398|
+------------+
only showing top 20 rows



In [7]:
#RENAMING ANSWERS COLUMNS THAT CLASH WITH QUESTIONS/USERS BY ADDING '_answers'
# Compared against the questions columns as they are AFTER the questions rename
# cell. A name that questions already suffixed (e.g. 'user_id') therefore no
# longer counts as a clash here, and answers keeps it unsuffixed.
taken_elsewhere = set(questions.columns) | set(users.columns)
for clash in [c for c in answers.columns if c in taken_elsewhere]:
    answers = answers.withColumnRenamed(clash, clash + '_answers')

In [8]:
#RENAMING USERS COLUMNS THAT CLASH WITH ANSWERS/QUESTIONS BY ADDING '_users'
# Compared against the answers/questions columns AFTER their own rename cells,
# so names they already suffixed no longer clash. In the recorded run nothing
# was renamed here. Users kept 'id', 'created_at' and 'updated_at', which the
# join and 'updated_at' cells below rely on.
taken_elsewhere = set(answers.columns) | set(questions.columns)
for clash in [c for c in users.columns if c in taken_elsewhere]:
    users = users.withColumnRenamed(clash, clash + '_users')

In [9]:
#INSPECTING THE FREE-TEXT LOCATION COLUMN (50 ROWS, UNTRUNCATED)
users.select(col('location')).show(n=50, truncate=False)

+----------------------------------------------------+
|location                                            |
+----------------------------------------------------+
|Bangalore, Karnataka, India                         |
|Canada                                              |
|Pennsylvania, United States                         |
|null                                                |
|New Delhi, Delhi, India                             |
|null                                                |
|日本 Tōkyō                                          |
|null                                                |
|null                                                |
|Gharaunda, Haryana, India                           |
|null                                                |
|Passo Fundo, RS, Brasil                             |
|null                                                |
|New Delhi, India                                    |
|null                                                |
|Patras, Gre

In [10]:
#KEEPING ONLY USERS WHOSE LOCATION MENTIONS 'Netherlands'
# contains() is a case-sensitive substring match; rows with a null location
# evaluate to null and are dropped by the filter.
users = users.where(col('location').contains('Netherlands'))
users.show()

+--------+--------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+
|      id|        display_name|reputation|         website_url|            location|            about_me|views|up_votes|down_votes|           image_url|         created_at|         updated_at|
+--------+--------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+
| 4699799|        Niels Albers|         1|                null|Driebergen, Drieb...|                null|    1|       0|         0|https://graph.fac...|2015-03-22 13:13:32|2019-10-01 13:18:23|
| 4688933|         Joe Vrolijk|         7|                null|Amsterdam, Nether...|                null|    8|       0|         0|https://lh5.googl...|2015-03-19 09:06:50|2019-10-12 10:14:27|
| 8383926|       kumar navneet|    

In [11]:
#SPLITTING THE LOCATION COLUMN INTO 2 NEW COLUMNS 'city' & 'country'
# Locations are free text with a variable number of comma-separated parts,
# e.g. "Amsterdam, Netherlands" or "City, Region, Netherlands".
# - The country is the LAST part. Element [1] would pick the region for
#   three-part locations and is missing for a bare "Netherlands".
# - Splitting on the comma together with any surrounding whitespace (after
#   trimming the whole string) stops values from keeping a leading space.
location_parts = F.split(F.trim(col('location')), r'\s*,\s*')
users = (
    users
    # A single-part location (e.g. just "Netherlands") names no city -> null.
    .withColumn('city', F.when(F.size(location_parts) > 1, location_parts[0]))
    # element_at with a negative index counts from the end of the array.
    .withColumn('country', F.element_at(location_parts, -1))
)

In [12]:
#INNER-JOINING THE FILTERED USERS WITH QUESTIONS ON users.id = questions.user_id_questions
# Result columns are the users columns followed by the questions columns.
joint = users.join(
    questions,
    on=users['id'] == questions['user_id_questions'],
    how='inner',
)
joint.show()

+--------+--------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+---------------+----------------+------------+-----------------+--------------------+--------------------+------------------+---------------+----------+-----------------------+--------------------+
|      id|        display_name|reputation|         website_url|            location|            about_me|views|up_votes|down_votes|           image_url|         created_at|         updated_at|           city|         country|id_questions|user_id_questions|               title|      body_questions|accepted_answer_id|score_questions|view_count|comment_count_questions|created_at_questions|
+--------+--------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+---------------+------------

In [13]:
#COUNTING ROWS WHOSE 'view_count' IS NULL IN THE JOINED DATA
joint.filter(col('view_count').isNull()).count()

0

In [14]:
#PREVIEWING THE 'view_count' COLUMN
joint.select(col('view_count')).show()

+----------+
|view_count|
+----------+
|        46|
|       101|
|       489|
|        41|
|        32|
|       594|
|       131|
|      8303|
|       358|
|        39|
|       691|
|       108|
|       135|
|        50|
|        57|
|       392|
|      1554|
|        39|
|       931|
|        96|
+----------+
only showing top 20 rows



In [15]:
#FILTERING THE DATASET BY 'view_count' GREATER THAN OR EQUAL TO MIN_VIEW_COUNT
# Named threshold instead of a bare literal, so it is easy to find and tune.
# Re-running this cell is harmless: the filter keeps the same rows.
MIN_VIEW_COUNT = 20
joint = joint.filter(col('view_count') >= MIN_VIEW_COUNT)
joint.show()

+--------+--------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+---------------+----------------+------------+-----------------+--------------------+--------------------+------------------+---------------+----------+-----------------------+--------------------+
|      id|        display_name|reputation|         website_url|            location|            about_me|views|up_votes|down_votes|           image_url|         created_at|         updated_at|           city|         country|id_questions|user_id_questions|               title|      body_questions|accepted_answer_id|score_questions|view_count|comment_count_questions|created_at_questions|
+--------+--------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+---------------+------------

In [16]:
# Preview the non-null 'updated_at' values without truncating them.
joint.select(col('updated_at')).where(col('updated_at').isNotNull()).show(truncate=False)

+-------------------+
|updated_at         |
+-------------------+
|2019-10-10 07:57:22|
|2019-04-15 15:21:54|
|2019-10-04 21:42:18|
|2019-09-24 15:18:58|
|2019-01-10 09:15:31|
|2019-01-21 21:09:45|
|2019-10-08 09:04:34|
|2019-10-12 22:00:20|
|2019-10-12 15:06:42|
|2019-10-03 09:30:02|
|2019-09-24 06:30:48|
|2019-10-09 10:01:16|
|2019-10-12 15:50:37|
|2019-08-01 18:26:08|
|2019-10-12 06:35:21|
|2019-10-10 06:00:05|
|2019-01-20 18:42:56|
|2019-10-13 01:51:07|
|2019-09-01 06:08:43|
|2019-10-08 11:17:30|
+-------------------+
only showing top 20 rows



In [17]:
#PREVIEWING THE ANSWERS KEY COLUMNS
answers.select(col('id_answers'), col('user_id'), col('question_id')).show()

+----------+--------+-----------+
|id_answers| user_id|question_id|
+----------+--------+-----------+
|  53999517| 1771994|   53999275|
|  54005064|  948762|   54004882|
|  53995281| 5159168|   53995029|
|  54000208| 7964527|   54000128|
|  54005110| 9653876|   54003879|
|  53993534|10791196|   53993516|
|  54000209| 5195552|   53998312|
|  54000210| 8968602|   54000125|
|  54005119| 9832922|   54004920|
|  54005157| 6654100|   54004924|
|  54000215|10676716|   54000129|
|  54005202|10834092|   53998422|
|  54002921| 6387236|   54002865|
|  54052820| 3187033|   54052755|
|  54005239| 5173426|   54004875|
|  53999237| 2275482|   53999149|
|  54005286| 7515411|   54004875|
|  53995359|  539060|   53994232|
|  54000258| 1748716|   54000133|
|  53999338| 2390992|   53999153|
+----------+--------+-----------+
only showing top 20 rows



In [18]:
joint.columns

['id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'created_at',
 'updated_at',
 'city',
 'country',
 'id_questions',
 'user_id_questions',
 'title',
 'body_questions',
 'accepted_answer_id',
 'score_questions',
 'view_count',
 'comment_count_questions',
 'created_at_questions']

In [19]:
#INNER-JOINING WITH ANSWERS ON joint.id_questions = answers.question_id
# Each question row is repeated once per matching answer; questions without
# an answer are dropped by the inner join.
joints = joint.join(
    answers,
    on=joint['id_questions'] == answers['question_id'],
    how='inner',
)
joints.show()

+--------+-----------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+-----------+------------+------------+-----------------+--------------------+--------------------+------------------+---------------+----------+-----------------------+--------------------+----------+--------+-----------+--------------------+-----+-------------+-------------------+
|      id|     display_name|reputation|         website_url|            location|            about_me|views|up_votes|down_votes|           image_url|         created_at|         updated_at|       city|     country|id_questions|user_id_questions|               title|      body_questions|accepted_answer_id|score_questions|view_count|comment_count_questions|created_at_questions|id_answers| user_id|question_id|                body|score|comment_count| created_at_answers|
+--------+-----------------+----------+-----------------

In [20]:
# Preview the non-null 'updated_at' values after joining with answers.
joints.select(col('updated_at')).where(col('updated_at').isNotNull()).show()

+-------------------+
|         updated_at|
+-------------------+
|2019-10-10 17:18:18|
|2019-10-10 17:18:18|
|2019-10-10 17:18:18|
|2019-09-26 10:57:47|
|2019-09-26 10:57:47|
|2019-09-26 10:57:47|
|2019-10-12 18:19:02|
|2019-10-12 18:19:02|
|2019-10-02 09:35:20|
|2019-10-02 09:35:20|
|2019-10-11 09:43:06|
|2019-10-11 09:43:06|
|2019-10-10 17:18:18|
|2019-10-10 17:18:18|
|2019-08-27 12:54:11|
|2019-08-27 12:54:11|
|2019-08-27 12:54:11|
|2019-08-27 12:54:11|
|2019-10-08 10:36:30|
|2019-10-08 10:36:30|
+-------------------+
only showing top 20 rows



In [21]:
#EARLIEST 'updated_at' IN THE FINAL JOINED DATA
# 'updated_at' is a string column, so min() compares lexicographically. That
# matches chronological order for the 'YYYY-MM-DD HH:MM:SS' values seen here.
new = joints.agg(F.min('updated_at'))
new.show()

+-------------------+
|    min(updated_at)|
+-------------------+
|2019-01-10 09:15:31|
+-------------------+



In [22]:
# (column name, Spark SQL type) pairs for the final joined DataFrame.
[(field.name, field.dataType.simpleString()) for field in joints.schema.fields]

[('id', 'int'),
 ('display_name', 'string'),
 ('reputation', 'int'),
 ('website_url', 'string'),
 ('location', 'string'),
 ('about_me', 'string'),
 ('views', 'int'),
 ('up_votes', 'int'),
 ('down_votes', 'int'),
 ('image_url', 'string'),
 ('created_at', 'string'),
 ('updated_at', 'string'),
 ('city', 'string'),
 ('country', 'string'),
 ('id_questions', 'int'),
 ('user_id_questions', 'int'),
 ('title', 'string'),
 ('body_questions', 'string'),
 ('accepted_answer_id', 'int'),
 ('score_questions', 'int'),
 ('view_count', 'int'),
 ('comment_count_questions', 'int'),
 ('created_at_questions', 'string'),
 ('id_answers', 'int'),
 ('user_id', 'int'),
 ('question_id', 'int'),
 ('body', 'string'),
 ('score', 'int'),
 ('comment_count', 'int'),
 ('created_at_answers', 'string')]

In [31]:
#WRITING THE FINAL DATASET TO POSTGRESQL (THE TABLE CAN THEN BE BROWSED IN DBEAVER)
# Connection settings come from the environment, so no credentials live in the
# notebook or its outputs. The URL and user defaults match the original local
# setup. The password is never defaulted: set PGPASSWORD or enter it at the
# prompt.
# NOTE: mode='append' inserts rows on every run, so re-running this cell (or
# the whole notebook) writes the same rows into the table again.
jdbc_password = os.environ.get('PGPASSWORD') or getpass('PostgreSQL password: ')
joints.write.format("jdbc").options(
    url=os.environ.get('PG_JDBC_URL', 'jdbc:postgresql://localhost/postgres'),
    driver='org.postgresql.Driver',
    user=os.environ.get('PGUSER', 'postgres'),
    password=jdbc_password,
    dbtable='stackoverflow_filtered.results'
).save(mode='append')

In [24]:
# Final column names of the questions DataFrame after renaming.
questions.schema.names

['id_questions',
 'user_id_questions',
 'title',
 'body_questions',
 'accepted_answer_id',
 'score_questions',
 'view_count',
 'comment_count_questions',
 'created_at_questions']

In [25]:
# Final column names of the answers DataFrame after renaming.
answers.schema.names

['id_answers',
 'user_id',
 'question_id',
 'body',
 'score',
 'comment_count',
 'created_at_answers']

In [26]:
# Final column names of the filtered users DataFrame, including city/country.
users.schema.names

['id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'created_at',
 'updated_at',
 'city',
 'country']