# Project4 - Basic End To End EXTRACT TRANSFORM LOAD(ETL) Pipeline

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
import pandas as pd

In [2]:
spark = (
    SparkSession.builder
                .appName('Stack Overflow Data Wrangling')
                .config('spark.jars','/Users/nanayaw/Desktop/blossom/blossomenv/project_4_ETL/jars/postgresql-42.2.8.jar')
                .getOrCreate()
)

# Load data into DataFrame

In [3]:
spark = SparkSession.builder.getOrCreate()
questions = spark.read.csv("questions.csv", header=True, inferSchema=True, escape='"', multiLine=True)
answers = spark.read.csv("answers.csv" , header=True, inferSchema=True, escape='"', multiLine=True)
users = spark.read.csv("users.csv", header=True, inferSchema=True, escape='"', multiLine=True)

# Rename Columns of dataframe

In [4]:
questions = questions.withColumnRenamed('id', 'questions_id')
answers = answers.withColumnRenamed('id', 'answers_id')
users = users.withColumnRenamed('id', 'users_id')
questions = questions.withColumnRenamed('created_at', 'questions_created_at')
answers = answers.withColumnRenamed('created_at', 'answers_created_at')
users = users.withColumnRenamed('created_at', 'users_created_at')
questions = questions.withColumnRenamed( 'body','questions_body')
answers = answers.withColumnRenamed( 'body','answers_body')
questions = questions.withColumnRenamed('user_id', 'questions_user_id')
answers = answers.withColumnRenamed('user_id', 'answers_user_id')
answers = answers.withColumnRenamed('question_id', 'answers_question_id')
questions = questions.withColumnRenamed( 'score','questions_score')
answers = answers.withColumnRenamed( 'score','answers_score')
questions = questions.withColumnRenamed( 'comment_count','questions_comment_count')
answers = answers.withColumnRenamed( 'comment_count','answers_comment_count')

# Select from User table where location is India

In [5]:
users.registerTempTable('n_users')
n_users = spark.sql("select * from n_users where location like '%India'")

# Remove middle towns from location columns with regular expression and split city from country

In [6]:
n_users = n_users.withColumn("location", F.regexp_replace('location', r"[,]\s*\w*\s*[,]", ','))

In [7]:
n_users_updated = n_users.withColumn('location', F.split(n_users.location, ',')).select('users_id', 'display_name', 'reputation', 'website_url', 
                                    'location', 'about_me', 'views', 'up_votes', 
                                    'down_votes', 'image_url', 'users_created_at', 
                                    'updated_at', F.element_at(F.col('location'),-2).alias('city')
        , F.element_at(F.col('location'), -1).alias('country'))

In [8]:
n_users_updated.select('country','city').show(60)

+-------+---------------+
|country|           city|
+-------+---------------+
|  India|      Bangalore|
|  India|      New Delhi|
|  India|      Gharaunda|
|  India|      New Delhi|
|  India|      Jalandhar|
|  India|          Surat|
|  India|    West Bengal|
|  India|           Pune|
|  India|         Mumbai|
|  India|      Bangalore|
|  India|         Mumbai|
|  India|      Bangalore|
|  India|         Mumbai|
|  India|      Hyderabad|
|  India| Madhya Pradesh|
|  India|           null|
|  India|      Bangalore|
|  India|    Naya Raipur|
|  India|      Bangalore|
|  India|     Tamil Nadu|
|  India|      Bangalore|
|  India|           Pune|
|  India|           Pune|
|  India|          Delhi|
|  India|      Bangalore|
|  India|         Mumbai|
|  India|    West Bengal|
|  India|      Bangalore|
|  India|     Tamil Nadu|
|  India|      Bangalore|
|  India|          Surat|
|  India|      Hyderabad|
|  India|      Hyderabad|
|  India|    West Bengal|
|  India|  Uttar Pradesh|
|  India|   

# Join users table and append with city and country to questions table

In [9]:
n_users_questions = n_users_updated.join(questions,n_users_updated.users_id == questions.questions_user_id)

# Filter new joint table where view_count column is 20 or more

In [10]:
n_users_questions.registerTempTable('n_users_questions_tmp')
n_users_questions_tmp = spark.sql('select * from n_users_questions_tmp where view_count > 20 ')

In [11]:
results = n_users_questions_tmp.join(answers, n_users_questions_tmp.users_id == answers.answers_user_id)

In [12]:
results.dtypes

[('users_id', 'int'),
 ('display_name', 'string'),
 ('reputation', 'int'),
 ('website_url', 'string'),
 ('location', 'array<string>'),
 ('about_me', 'string'),
 ('views', 'int'),
 ('up_votes', 'int'),
 ('down_votes', 'int'),
 ('image_url', 'string'),
 ('users_created_at', 'timestamp'),
 ('updated_at', 'timestamp'),
 ('city', 'string'),
 ('country', 'string'),
 ('questions_id', 'int'),
 ('questions_user_id', 'int'),
 ('title', 'string'),
 ('questions_body', 'string'),
 ('accepted_answer_id', 'int'),
 ('questions_score', 'int'),
 ('view_count', 'int'),
 ('questions_comment_count', 'int'),
 ('questions_created_at', 'timestamp'),
 ('answers_id', 'int'),
 ('answers_user_id', 'int'),
 ('answers_question_id', 'int'),
 ('answers_body', 'string'),
 ('answers_score', 'int'),
 ('answers_comment_count', 'int'),
 ('answers_created_at', 'timestamp')]

# Select minimum time from updated_at column

In [13]:
results.registerTempTable('tmp_results')
tmp_results = spark.sql('select min(updated_at) from tmp_results')

In [14]:
tmp_results.show()

+-------------------+
|    min(updated_at)|
+-------------------+
|2019-01-11 05:02:30|
+-------------------+



# Dataframe to results table in Schema stackoverflow_filtered

In [15]:
results.write.format("jdbc").options(
   url='jdbc:postgresql://localhost:5434/blossom',
   driver='org.postgresql.Driver',
   user='Admin',
   password='admin',
   dbtable='stackoverflow_filtered.results'
).save(mode='append')

DIFFERENCES BETWEEN VIEWS AND MATERIALIZED VIEWS:¶
- The difference between a View and a Materialized View is that, views are not stored physically on a disk. While, Materialized views are stored on a disc.
- View can be defined as a virtual table created as a result of the query expression. Whereby, Materialized view is a physical copy or snapshot of the base table.
- A view is updated always as the query creating view executes each time the view is used. Whereby a Materialized View is updated manually or by applying triggers to it.
- Materialized View responds faster than View as the Materialized View is precomputed.
- Materialized View utilizes the memory space as it is stored on the disk whereby, the View is just a display therefore it does not require memory space.

4. SQL DATA MANIPULATION

1. 142 cities appeared more than twice in my results table
2. There are 1,957 unique created_at dates in my results table
3. I will give an award to user name 'Bhuvanesh BS' because he has the highest answer_score of 85 from the results table