# BASIC END TO END EXTRACT TRANSFORM LOAD(ETL) PIPELINE

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd

In [3]:
spark = (
    SparkSession.builder
                .appName("Stack Overflow Data Wrangling")
                .config("spark.jars","C:\\Users\\Z\\Documents\\blossom\\postgresql-42.2.8.jar") 
                .getOrCreate()
)

##                      Load data into DataFrame

In [4]:
#spark = SparkSession.builder.getOrCreate()
questions = spark.read.csv("questions.csv", header=True, inferSchema=True, escape='"', multiLine=True)
answers = spark.read.csv("answers.csv" , header=True, inferSchema=True, escape='"', multiLine=True)
users = spark.read.csv("users.csv", header=True, inferSchema=True, escape='"', multiLine=True)

##  Rename Columns of dataframe to   prevent Ambiguity

In [5]:
questions = questions.withColumnRenamed('id', 'questions_id')
answers = answers.withColumnRenamed('id', 'answers_id')
users = users.withColumnRenamed('id', 'users_id')
questions = questions.withColumnRenamed('created_at', 'questions_created_at')
answers = answers.withColumnRenamed('created_at', 'answers_created_at')
users = users.withColumnRenamed('created_at', 'users_created_at')
questions = questions.withColumnRenamed( 'body','questions_body')
answers = answers.withColumnRenamed( 'body','answers_body')
questions = questions.withColumnRenamed('user_id', 'questions_user_id')
answers = answers.withColumnRenamed('user_id', 'answers_user_id')
answers = answers.withColumnRenamed('question_id', 'answers_question_id')
questions = questions.withColumnRenamed( 'score','questions_score')
answers = answers.withColumnRenamed( 'score','answers_score')
questions = questions.withColumnRenamed( 'comment_count','questions_comment_count')
answers = answers.withColumnRenamed( 'comment_count','answers_comment_count')

### Select from User table where location is "India"

In [6]:
users.registerTempTable('new_users')
new_users = spark.sql("select * from new_users where location like '%India%' ")


### Remove middle towns from location columns with regular expression and split city from country

In [7]:
new_users = new_users.withColumn("location", F.regexp_replace('location', r"[,]\s*\w*\s*[,]", ','))


In [8]:
new_users_updated = new_users.withColumn('location', F.split(new_users.location, ',')).select('users_id', 'display_name', 'reputation', 'website_url', 
                                    'location', 'about_me', 'views', 'up_votes', 
                                    'down_votes', 'image_url', 'users_created_at', 
                                    'updated_at', F.element_at(F.col('location'),-2).alias('city')
        , F.element_at(F.col('location'), -1).alias('country'))


In [9]:
new_users_updated.select('city','country').show(100)

+------------------+-------+
|              city|country|
+------------------+-------+
|         Bangalore|  India|
|         Jalandhar|  India|
|    Madhya Pradesh|  India|
|        Tamil Nadu|  India|
|       West Bengal|  India|
|         Bangalore|  India|
|              Pune|  India|
|       Maharashtra|  India|
|        Tamil Nadu|  India|
|        Tamil Nadu|  India|
|         Bengaluru|  India|
|        Tamil Nadu|  India|
|             Delhi|  India|
|        Tamil Nadu|  India|
|              Pune|  India|
|        Tamil Nadu|  India|
|            Mumbai|  India|
|              Pune|  India|
|    Madhya Pradesh|  India|
|        Tamil Nadu|  India|
|         Bangalore|  India|
|         Ahmedabad|  India|
|         Hyderabad|  India|
|        Tamil Nadu|  India|
|            Mumbai|  India|
|        Tamil Nadu|  India|
|         Bangalore|  India|
|         Ahmedabad|  India|
|     Uttar Pradesh|  India|
|       Gandhinagar|  India|
|         New Delhi|  India|
|        Tamil

### Join  users table appended with city and country to questions table

In [10]:
new_users_questions = new_users_updated.join(questions, new_users_updated.users_id == questions.questions_user_id)

### Filter new joint table where view_count column is 20 or more

In [11]:
new_users_questions.registerTempTable('new_users_questions_temp')
new_users_questions_temp = spark.sql("select * from new_users_questions_temp where view_count > 20")

In [12]:
results = new_users_questions_temp.join(answers, new_users_questions_temp.users_id == answers.answers_user_id)

In [13]:
results.dtypes

[('users_id', 'int'),
 ('display_name', 'string'),
 ('reputation', 'int'),
 ('website_url', 'string'),
 ('location', 'array<string>'),
 ('about_me', 'string'),
 ('views', 'int'),
 ('up_votes', 'int'),
 ('down_votes', 'int'),
 ('image_url', 'string'),
 ('users_created_at', 'timestamp'),
 ('updated_at', 'timestamp'),
 ('city', 'string'),
 ('country', 'string'),
 ('questions_id', 'int'),
 ('questions_user_id', 'int'),
 ('title', 'string'),
 ('questions_body', 'string'),
 ('accepted_answer_id', 'int'),
 ('questions_score', 'int'),
 ('view_count', 'int'),
 ('questions_comment_count', 'int'),
 ('questions_created_at', 'timestamp'),
 ('answers_id', 'int'),
 ('answers_user_id', 'int'),
 ('answers_question_id', 'int'),
 ('answers_body', 'string'),
 ('answers_score', 'int'),
 ('answers_comment_count', 'int'),
 ('answers_created_at', 'timestamp')]

### Select minimum time from updated_at column

In [14]:
results.registerTempTable('temp_results')
temp_results = spark.sql("select min(updated_at) from temp_results")

In [15]:
temp_results.show()

+-------------------+
|    min(updated_at)|
+-------------------+
|2019-02-02 02:07:00|
+-------------------+



### Write dataframe to results table in  Schema stackoverflow_filtered  

In [19]:
results.write.format("jdbc").options(
   url='jdbc:postgresql://localhost:5432/postgres',
   driver='org.postgresql.Driver',
   user='postgres',
   password='postgres',
   dbtable='stackoverflow_filtered.results'
).save(mode='append')

### DIFFERENCES BETWEEN VIEWS AND MATERIALIZED VIEWS:

#### -  The basic difference between View and Materialized View is that Views are not stored physically on the disk.  On the other hands, Materialized Views are stored on the disc.

#### -  View can be defined as a virtual table created as a result of the query expression. However, Materialized View is a physical copy, picture or snapshot of the base table.

#### -  A view is always updated as the query creating View executes each time the View is used. On the other hands, Materialized View is updated manually or by applying triggers to it.

#### -  Materialized View responds faster than View as the Materialized View is precomputed.

#### -  Materialized View utilizes the memory space as it stored on the disk whereas, the View is just a display hence it do not require memory space.

## 4. SQL DATA MANIPULATION

### 1.  142 cities appeared more than twice in my results table
### 2.  There are 1,957 unique created_at dates in my results table
### 3. I will give an award to user name 'Bhuvanesh BS' because he has the highest answer_score of 85 from the results table