In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType, DecimalType, IntegerType 
from pyspark.sql.functions import element_at, split, col, udf

In [2]:
spark = (
    SparkSession.builder
                .appName("Stack Overflow Data Wrangling")
                .config("spark.jars", "postgresql-42.2.8.jar") 
                .getOrCreate()
)

spark = (
    SparkSession.builder
                .appName("Stack Overflow Data Wrangling")
                .config("spark.jars", "C:/Users/USER/Desktop/Blossom_Academy/jars/postgresql-42.2.8.jar") 
                .getOrCreate())

spark = SparkSession.builder.master("local[*]").appName("ETL Pipeline").getOrCreate()

In [3]:
# Set Logging Level to WARN
spark.sparkContext.setLogLevel("WARN")

In [5]:
#Loading the questions dataset
answers = spark.read.csv('C:/Users/USER/Desktop/Blossom_Academy/stackoverflow/answers.csv', header = True, inferSchema =True, multiLine = True, escape='"')
users = spark.read.csv('C:/Users/USER/Desktop/Blossom_Academy/stackoverflow/users.csv', header = True, inferSchema =True, multiLine = True, escape='"')
questions = spark.read.csv('C:/Users/USER/Desktop/Blossom_Academy/stackoverflow/questions.csv', header = True, inferSchema =True, multiLine = True, escape='"')

In [6]:
answers.head(5)

[Row(id=53999517, user_id=1771994, question_id=53999275, body='<p>The <code>for..of</code> loop you have in your code isn\'t needed. Just use the code you already have and <code>num</code> as the <code>quotes</code> array index value. I added button to demonstrate how the function will only return a single value:</p>\n\n<p><div class="snippet" data-lang="js" data-hide="false" data-console="true" data-babel="false">\n<div class="snippet-code">\n<pre class="snippet-code-js lang-js prettyprint-override"><code>function randomQuote() {\n  const quotes = document.querySelectorAll("p");\n  const num = (Math.floor(Math.random() * Math.floor(quotes.length)));\n  return quotes[num].innerText;\n}\n\ndocument.querySelector(\'#buttonEl\').addEventListener(\'click\', () =&gt; {\n  document.querySelector(\'#quoteEl\').innerHTML = randomQuote();\n});</code></pre>\n<pre class="snippet-code-css lang-css prettyprint-override"><code>#quoteEl {\n  color: red;\n}</code></pre>\n<pre class="snippet-code-html 

In [7]:
#Rename the answers column to another name
answers = answers.withColumnRenamed('id', 'answer_id')
users = users.withColumnRenamed('id', 'user_id')

In [8]:
answers.columns

['answer_id',
 'user_id',
 'question_id',
 'body',
 'score',
 'comment_count',
 'created_at']

In [9]:
users.columns

['user_id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'created_at',
 'updated_at']

In [10]:
users.select('location').show(20)

+--------------------+
|            location|
+--------------------+
|Bangalore, Karnat...|
|      æ—¥æœ¬ TÅ�kyÅ�|
|      Patras, Greece|
|Jalandhar, Punjab...|
|Makassar, Kota Ma...|
|                null|
|                null|
|                null|
|                null|
|Indore, Madhya Pr...|
|       Kyiv, Ukraine|
|                null|
|                null|
|                null|
|                null|
|                null|
|Bandung, Bandung ...|
|                null|
|West New York, NJ...|
|                null|
+--------------------+
only showing top 20 rows



In [11]:
#Selecting users from only one country of your choice
users.select('display_name', users["location"] == "Canada").show(20)

+--------------------+-------------------+
|        display_name|(location = Canada)|
+--------------------+-------------------+
|              suryan|              false|
|  Hirotaka Nishimiya|              false|
|Giorgos Paraskevo...|              false|
|             Ali Mir|              false|
|Adjie Satrio Prabowo|              false|
|         Jordan Cyan|               null|
|               Sjobi|               null|
|         Varun gowda|               null|
|          krown_loki|               null|
|       shirsh shukla|              false|
|               Pavel|              false|
|           minjie li|               null|
|              S. Das|               null|
|Holland Property ...|               null|
|                kelo|               null|
|            Daniel N|               null|
|          socionomad|              false|
|               Nadia|               null|
|           Bradley M|              false|
|           Codezters|               null|
+----------

In [13]:
users.withColumn("Country", split(col("location"), "\\.").getItem(0)).show(20)

+--------+--------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+--------------------+
| user_id|        display_name|reputation|         website_url|            location|            about_me|views|up_votes|down_votes|           image_url|         created_at|         updated_at|             Country|
+--------+--------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+--------------------+
| 8357266|              suryan|         7|https://twitter.c...|Bangalore, Karnat...|                null|    8|       0|         0|https://www.grava...|2017-07-24 10:55:23|2019-06-19 05:00:16|Bangalore, Karnat...|
| 2602456|                 Avi|         1|https://avtechtoo...|              Canada|                null|    0|       0|         0|             

new_users.select(split(new_users.location, ' ')[-2].alias('Country')).distinct().show(30)

In [12]:
users.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- display_name: string (nullable = true)
 |-- reputation: integer (nullable = true)
 |-- website_url: string (nullable = true)
 |-- location: string (nullable = true)
 |-- about_me: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- up_votes: integer (nullable = true)
 |-- down_votes: integer (nullable = true)
 |-- image_url: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)



In [14]:
users.describe('location').show()

+-------+--------------------+
|summary|            location|
+-------+--------------------+
|  count|               27008|
|   mean|   6035.526315789473|
| stddev|  22361.197593570734|
|    min|           6.2)</li>|
|    max|ëŒ€í•œë¯¼êµ­ ë¶€ì...|
+-------+--------------------+



In [15]:
users.filter(users.location == 'Canada').count()

136

In [16]:
users.filter(users.location == 'Canada')

DataFrame[user_id: int, display_name: string, reputation: int, website_url: string, location: string, about_me: string, views: int, up_votes: int, down_votes: int, image_url: string, created_at: timestamp, updated_at: timestamp]

In [17]:
from pyspark.sql import Row, SQLContext, SparkSession
from pyspark.sql.functions import udf
from pyspark.sql import functions as F

In [19]:
users.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- display_name: string (nullable = true)
 |-- reputation: integer (nullable = true)
 |-- website_url: string (nullable = true)
 |-- location: string (nullable = true)
 |-- about_me: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- up_votes: integer (nullable = true)
 |-- down_votes: integer (nullable = true)
 |-- image_url: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)



In [20]:
answers.printSchema()

root
 |-- answer_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- question_id: integer (nullable = true)
 |-- body: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)



In [21]:
users = users.withColumnRenamed('created_at', 'users_created_at')
answers = answers.withColumnRenamed('created_at', 'answers_created_at')

In [22]:
new_users =users.select('user_id', 'display_name', 'reputation', 'website_url',
                                   'location', 'about_me', 'views', 'up_votes',
                                   'down_votes', 'image_url', 'users_created_at',
                                   'updated_at', F.split(users['location'], ',')[0].alias('city'),
                                  F.split(users['location'], ',')[2].alias('country'))

In [23]:
new_users.show(10)

+--------+------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+--------------+-------+
| user_id|      display_name|reputation|         website_url|            location|            about_me|views|up_votes|down_votes|           image_url|   users_created_at|         updated_at|          city|country|
+--------+------------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+--------------+-------+
| 8357266|            suryan|         7|https://twitter.c...|Bangalore, Karnat...|                null|    8|       0|         0|https://www.grava...|2017-07-24 10:55:23|2019-06-19 05:00:16|     Bangalore|  India|
| 2602456|               Avi|         1|https://avtechtoo...|              Canada|                null|    0|       0|         0|               

In [24]:
questions.printSchema()

root
 |-- id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- accepted_answer_id: integer (nullable = true)
 |-- score: integer (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)



In [25]:
new_users.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- display_name: string (nullable = true)
 |-- reputation: integer (nullable = true)
 |-- website_url: string (nullable = true)
 |-- location: string (nullable = true)
 |-- about_me: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- up_votes: integer (nullable = true)
 |-- down_votes: integer (nullable = true)
 |-- image_url: string (nullable = true)
 |-- users_created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)



In [26]:
#Joining the Dataset for Questions and Users
quest_user = new_users.join(questions.withColumnRenamed('id', 'question_id'), new_users['user_id'] == questions['user_id']).drop(questions.user_id)
#result = df.join(other, cond, 'inner').drop(df.a)

In [27]:
quest_user.columns

['user_id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'users_created_at',
 'updated_at',
 'city',
 'country',
 'question_id',
 'title',
 'body',
 'accepted_answer_id',
 'score',
 'view_count',
 'comment_count',
 'created_at']

In [None]:
quest_user.printSchema()

#Changing the columns view_count as integer
from pyspark.sql.types import IntegerType
quest_user = quest_user.withColumn("view_count", quest_user["view_count"].cast(IntegerType()))

In [28]:
#Filtering out columns with views greater than 20 and replacing it with the joined data for questions and users
quest_user = quest_user.filter(quest_user["view_count"] >= 20)

In [29]:
quest_user.show()

+-------+--------------+----------+--------------------+---------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+---------+-------+-----------+--------------------+--------------------+------------------+-----+----------+-------------+-------------------+
|user_id|  display_name|reputation|         website_url|       location|            about_me|views|up_votes|down_votes|           image_url|   users_created_at|         updated_at|     city|country|question_id|               title|                body|accepted_answer_id|score|view_count|comment_count|         created_at|
+-------+--------------+----------+--------------------+---------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+---------+-------+-----------+--------------------+--------------------+------------------+-----+----------+-------------+-------------------+
|   2122| Rune Jacobsen|      5

In [30]:
answers = (answers.withColumnRenamed('score', 'answer_score')
                    .withColumnRenamed('body', 'answer_body'))

In [31]:
answers.printSchema()

root
 |-- answer_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- question_id: integer (nullable = true)
 |-- answer_body: string (nullable = true)
 |-- answer_score: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- answers_created_at: timestamp (nullable = true)



#Dropping duplicated columns before joining the data
answers = answers.drop('comment_count','user_id', 'question_id')

In [32]:
answers = answers.withColumnRenamed('user_id', 'user_id_1')
answers = answers.withColumnRenamed('comment_count', 'comment_count_1')

In [33]:
answers.printSchema()

root
 |-- answer_id: integer (nullable = true)
 |-- user_id_1: integer (nullable = true)
 |-- question_id: integer (nullable = true)
 |-- answer_body: string (nullable = true)
 |-- answer_score: integer (nullable = true)
 |-- comment_count_1: integer (nullable = true)
 |-- answers_created_at: timestamp (nullable = true)



In [34]:
quest_user.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- display_name: string (nullable = true)
 |-- reputation: integer (nullable = true)
 |-- website_url: string (nullable = true)
 |-- location: string (nullable = true)
 |-- about_me: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- up_votes: integer (nullable = true)
 |-- down_votes: integer (nullable = true)
 |-- image_url: string (nullable = true)
 |-- users_created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- question_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- accepted_answer_id: integer (nullable = true)
 |-- score: integer (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- created_at: timestamp (nullable = true)



In [35]:
final_join = answers.join(quest_user.withColumnRenamed('comment_count', 'question_comment_count'), answers['user_id_1'] == quest_user['user_id'])

In [36]:
final_join.show(5)

+---------+---------+-----------+--------------------+------------+---------------+-------------------+-------+-------------+----------+--------------------+---------------+--------------------+-----+--------+----------+---------+-------------------+-------------------+-------+-------+-----------+--------------------+--------------------+------------------+-----+----------+----------------------+-------------------+
|answer_id|user_id_1|question_id|         answer_body|answer_score|comment_count_1| answers_created_at|user_id| display_name|reputation|         website_url|       location|            about_me|views|up_votes|down_votes|image_url|   users_created_at|         updated_at|   city|country|question_id|               title|                body|accepted_answer_id|score|view_count|question_comment_count|         created_at|
+---------+---------+-----------+--------------------+------------+---------------+-------------------+-------+-------------+----------+--------------------+---

In [37]:
final_join.printSchema()

root
 |-- answer_id: integer (nullable = true)
 |-- user_id_1: integer (nullable = true)
 |-- question_id: integer (nullable = true)
 |-- answer_body: string (nullable = true)
 |-- answer_score: integer (nullable = true)
 |-- comment_count_1: integer (nullable = true)
 |-- answers_created_at: timestamp (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- display_name: string (nullable = true)
 |-- reputation: integer (nullable = true)
 |-- website_url: string (nullable = true)
 |-- location: string (nullable = true)
 |-- about_me: string (nullable = true)
 |-- views: integer (nullable = true)
 |-- up_votes: integer (nullable = true)
 |-- down_votes: integer (nullable = true)
 |-- image_url: string (nullable = true)
 |-- users_created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- question_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- body: st

In [38]:
#Creating a temp table for SQL queries to generate python output
final_join.registerTempTable("finaljoin_sql")

In [39]:
spark.sql("select * from finaljoin_sql").show()

+---------+---------+-----------+--------------------+------------+---------------+-------------------+-------+-------------+----------+--------------------+---------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+---------+-------+-----------+--------------------+--------------------+------------------+-----+----------+----------------------+-------------------+
|answer_id|user_id_1|question_id|         answer_body|answer_score|comment_count_1| answers_created_at|user_id| display_name|reputation|         website_url|       location|            about_me|views|up_votes|down_votes|           image_url|   users_created_at|         updated_at|     city|country|question_id|               title|                body|accepted_answer_id|score|view_count|question_comment_count|         created_at|
+---------+---------+-----------+--------------------+------------+---------------+-------------------+-------+-------------+---------

In [40]:
#SQL funtion to print out the minimum value in the column updated_at
spark.sql("select MIN(created_at) from finaljoin_sql").show()

+-------------------+
|    min(created_at)|
+-------------------+
|2019-01-01 00:15:23|
+-------------------+



In [42]:
final_join.write.format("jdbc").options(
    url='jdbc:postgresql://localhost/postgres',
    driver='org.postgresql.driver',
    user='postgres',
    password='Mickey@374',
    dbtable='stackoverflow_filtered.results'
).save(mode='append')

Py4JJavaError: An error occurred while calling o227.save.
: java.lang.ClassNotFoundException: org.postgresql.driver
	at java.net.URLClassLoader$1.run(Unknown Source)
	at java.net.URLClassLoader$1.run(Unknown Source)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(Unknown Source)
	at java.lang.ClassLoader.loadClass(Unknown Source)
	at java.lang.ClassLoader.loadClass(Unknown Source)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$5.apply(JDBCOptions.scala:99)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$5.apply(JDBCOptions.scala:99)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:99)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:197)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:201)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)


In [None]:
a.write.format("jdbc").options(
    url='jdbc:postgresql://localhost/postgres',
    driver='org.postgresql.Driver',
    user='postgres',
    password='postgres',
    dbtable='stackoverflow_filtered.results'
).save(mode='append')