# <font color='blue'>Project 3 - Data Manipulation  with Apache Spark and Postgres</font>

## <font color='purple'>Import all Dependencies</font>

In [1]:
import pyspark
from pyspark.sql.functions import split,col, explode, regexp_replace
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import SQLContext


## <font color='red'>Initiate Spark Session</font>

In [2]:
spark = (
    SparkSession.builder
                .appName("Stack Overflow Data Wrangling")
                .config("spark.jars", "postgresql-42.2.8.jar") 
                .getOrCreate()
)


## <font color='green'>Read Questions Dataset</font>

In [4]:
questions = spark.read.csv(
    "/home/priscilla/Desktop/blossom_academy/stackoverflow/questions.csv", header=True, inferSchema=True, multiLine=True, escape='"')

## <font color='brown'>Read Users Dataset</font>

In [6]:
users = spark.read.csv(
    "/home/priscilla/Desktop/blossom_academy/stackoverflow/users.csv", header=True, inferSchema=True, multiLine=True, escape='"')

## <font color='teal'>Rename Columns in Dataset</font>

In [7]:
questions = questions.withColumnRenamed('id', 'question_id') 
users = users.withColumnRenamed('id', 'user_id') 
questions = questions.withColumnRenamed('created_at', 'question_created_at') 


## <font color='amber'>Split "Location" Column into Country, City, State</font>

In [8]:
users = users.withColumn('country', split(users['location'], ',')[2])
users = users.withColumn('state', split(users['location'],',')[1])
users = users.withColumn('city', split(users['location'], ',')[0])

## <font color='teal'>Filter Country by selecting India</font>

In [13]:
users_india = users.filter(users['country'] == 'India')
users_india.show()

+--------+-----------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+-------+-----------+--------------------+
| user_id|     display_name|reputation|         website_url|            location|            about_me|views|up_votes|down_votes|           image_url|         created_at|         updated_at|country|      state|                city|
+--------+-----------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+-------+-----------+--------------------+
| 2685825| Akshay Champavat|        33|                null|Ahmedabad ,Gujara...|<p>-Total 5 years...|   35|       8|         0|https://i.stack.i...|2013-08-15 12:25:17|2019-09-24 12:17:00|  India|    Gujarat|          Ahmedabad |
| 4845514|         Dhinakar|        68|                null|chennai,Tamil na

## <font color='red'>Filter views greater than 20</font>

In [14]:
greater_questions = questions.filter(questions.view_count >= 20)
greater_questions.show()

+-----------+--------+--------------------+--------------------+------------------+-----+----------+-------------+-------------------+
|question_id| user_id|               title|                body|accepted_answer_id|score|view_count|comment_count|question_created_at|
+-----------+--------+--------------------+--------------------+------------------+-----+----------+-------------+-------------------+
|   54233315| 1118630|XPath parent node...|<p>I'm trying to ...|          54233368|    1|       134|            4|2019-01-17 09:59:47|
|   54233145| 7984274| Is this a java BUG?|<p>why the follow...|          54234312|   -2|       132|            3|2019-01-17 09:50:12|
|   54233331| 1877002|Different results...|<p>I am new to li...|          54233375|   -1|        26|            0|2019-01-17 10:00:17|
|   54233149|10927076|Using eval as pro...|<p>I know there a...|          54233257|    1|        49|            2|2019-01-17 09:50:30|
|   54233337| 8171766|Can't run ng serv...|<p>So I am t

## <font color='peach'>Join "greater_questions" && "users_india"</font>

In [15]:
a = users_india.join(greater_questions,'user_id')
a.show()

+--------+-----------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+-------+----------+--------------------+-----------+--------------------+--------------------+------------------+-----+----------+-------------+-------------------+
| user_id|     display_name|reputation|         website_url|            location|            about_me|views|up_votes|down_votes|           image_url|         created_at|         updated_at|country|     state|                city|question_id|               title|                body|accepted_answer_id|score|view_count|comment_count|question_created_at|
+--------+-----------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+-------+----------+--------------------+-----------+--------------------+--------------------+---------

## <font color='turquoise'>Create a cre</font>

In [17]:
a.registerTempTable('joined_df')
spark.sql("SELECT min(updated_at) from joined_df").show()


+-------------------+
|    min(updated_at)|
+-------------------+
|2019-03-27 09:55:22|
+-------------------+



In [19]:
a.write.format("jdbc").options(
    url='jdbc:postgresql://localhost/postgres',
    driver='org.postgresql.Driver',
    user='postgres',
    password='postgres',
    dbtable='stackoverflow_filtered.results'
).save(mode='append')
