# Basic Extract Transform and Load(ETL) pipeline using spark and postgres
### Data is scrapped from Stack overflow into the following tables:
- Questions
- Answers
- Users

#### This script is divided into 4 steps to illustrate the process


### Step 1: Data Extraction

In [1]:
import pyspark

from pyspark.sql.types import BooleanType
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window

In [2]:
#initializing spark a spark session
spark = ( 
    SparkSession.builder
                .appName("Stack Overflow Data_Wrangling")
                .config("spark.jars", "../jars/postgresql-42.2.8.jar")
                .getOrCreate()
)

KeyboardInterrupt: 

In [6]:
#loading the data in dataframes
answers = spark.read.csv("../data/stackoverflow/answers.csv",  header=True, inferSchema=True,multiLine=True)
questions = spark.read.csv("../data/stackoverflow/questions.csv", header=True, inferSchema=True,multiLine=True)
users = spark.read.csv("../data/stackoverflow/users.csv", header=True, inferSchema=True,multiLine=True)

NameError: name 'spark' is not defined

In [4]:
answers.columns

['id',
 'user_id',
 'question_id',
 'body',
 'score',
 'comment_count',
 'created_at']

In [5]:
answers.take(2)

[Row(id='53999517', user_id='1771994', question_id='53999275', body='"<p>The <code>for..of</code> loop you have in your code isn\'t needed. Just use the code you already have and <code>num</code> as the <code>quotes</code> array index value. I added button to demonstrate how the function will only return a single value:</p>\n\n<p><div class=""snippet"" data-lang=""js"" data-hide=""false"" data-console=""true"" data-babel=""false"">', score=None, comment_count=None, created_at=None),
 Row(id='<div class=""snippet-code"">', user_id=None, question_id=None, body=None, score=None, comment_count=None, created_at=None)]

In [6]:
users.columns

['id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'created_at',
 'updated_at']

In [7]:
questions.columns

['id',
 'user_id',
 'title',
 'body',
 'accepted_answer_id',
 'score',
 'view_count',
 'comment_count',
 'created_at']

In [8]:
#transforming the id column names as well as the timestamps too to avoid the ambiguity
answers = answers.withColumnRenamed('id', 'answer_id').withColumnRenamed('created_at', 'answer_created_at').withColumnRenamed('body','answer_body').withColumnRenamed('score','answer_score').withColumnRenamed('comment_count','answer_comment_count')
questions = questions.withColumnRenamed('id', 'question_id').withColumnRenamed('created_at', 'question_created_at').withColumnRenamed('body','question_body').withColumnRenamed('score','question_score')
users = users.withColumnRenamed('id', 'user_id').withColumnRenamed('created_at', 'user_created_at').withColumnRenamed('updated_at', 'user_updated_at')

#### Step 2: Data Transformation

In [9]:
#getting only indian users..
india_users = users.filter(users.location.contains('India'))

In [10]:
india_users.select("location").show()

+--------------------+
|            location|
+--------------------+
|Bangalore, Karnat...|
|Jalandhar, Punjab...|
|Indore, Madhya Pr...|
|Chennai, Tamil Na...|
|Kolkata, West Ben...|
|Bangalore, Karnat...|
|Pune, Maharashtra...|
|Chandan Nagar, Pu...|
|Poonamallee, Chen...|
|Vellore, Tamil Na...|
|Bengaluru, Karnat...|
|Chennai, Tamil Na...|
|        Delhi, India|
|Coimbatore, Tamil...|
|Pune, Maharashtra...|
|Chennai, Tamil Na...|
|Mumbai, Maharasht...|
|Pune, Maharashtra...|
|Indore, Madhya Pr...|
|Coimbatore, Tamil...|
+--------------------+
only showing top 20 rows



In [11]:
#extracting the country and city into new columns
cols = F.split(india_users['location'], ',' )
india_users = india_users.withColumn('city', cols.getItem(0))
india_users = india_users.withColumn('country', cols.getItem(2))

In [12]:
# rows where city was not quoted the country are being taken as None
india_users.select(['location','city','country']).show()

+--------------------+-------------+------------+
|            location|         city|     country|
+--------------------+-------------+------------+
|Bangalore, Karnat...|    Bangalore|       India|
|Jalandhar, Punjab...|    Jalandhar|       India|
|Indore, Madhya Pr...|       Indore|       India|
|Chennai, Tamil Na...|      Chennai|       India|
|Kolkata, West Ben...|      Kolkata|       India|
|Bangalore, Karnat...|    Bangalore|       India|
|Pune, Maharashtra...|         Pune|       India|
|Chandan Nagar, Pu...|Chandan Nagar| Maharashtra|
|Poonamallee, Chen...|  Poonamallee|  Tamil Nadu|
|Vellore, Tamil Na...|      Vellore|       India|
|Bengaluru, Karnat...|    Bengaluru|       India|
|Chennai, Tamil Na...|      Chennai|       India|
|        Delhi, India|        Delhi|        null|
|Coimbatore, Tamil...|   Coimbatore|       India|
|Pune, Maharashtra...|         Pune|       India|
|Chennai, Tamil Na...|      Chennai|       India|
|Mumbai, Maharasht...|       Mumbai|       India|


In [13]:
# an inner join of the filtered users df with the question df
df = india_users.join(questions, on='user_id', how='left')
df.columns

['user_id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'user_created_at',
 'user_updated_at',
 'city',
 'country',
 'question_id',
 'title',
 'question_body',
 'accepted_answer_id',
 'question_score',
 'view_count',
 'comment_count',
 'question_created_at']

In [14]:
# selecting only questions with at least 20 view counts
df = df.filter(df['view_count'] >= 20)

In [15]:
# joining the resultant table to the users answers table
df = df.join(answers, on=['question_id','user_id'], how='left')
df.columns

['question_id',
 'user_id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'user_created_at',
 'user_updated_at',
 'city',
 'country',
 'title',
 'question_body',
 'accepted_answer_id',
 'question_score',
 'view_count',
 'comment_count',
 'question_created_at',
 'answer_id',
 'answer_body',
 'answer_score',
 'answer_comment_count',
 'answer_created_at']

In [16]:
df.count()

26

In [23]:
df.dtypes

[('question_id', 'string'),
 ('user_id', 'string'),
 ('display_name', 'string'),
 ('reputation', 'string'),
 ('website_url', 'string'),
 ('location', 'string'),
 ('about_me', 'string'),
 ('views', 'string'),
 ('up_votes', 'string'),
 ('down_votes', 'string'),
 ('image_url', 'string'),
 ('user_created_at', 'string'),
 ('user_updated_at', 'string'),
 ('city', 'string'),
 ('country', 'string'),
 ('title', 'string'),
 ('question_body', 'string'),
 ('accepted_answer_id', 'string'),
 ('question_score', 'string'),
 ('view_count', 'string'),
 ('comment_count', 'string'),
 ('question_created_at', 'string'),
 ('answer_id', 'string'),
 ('answer_body', 'string'),
 ('answer_score', 'string'),
 ('answer_comment_count', 'string'),
 ('answer_created_at', 'string')]

### Step 3: Data Loading

In [24]:
#writing to the database
df.write.format("jdbc").options(
    url='jdbc:postgresql://localhost:5433/postgres',
    driver='org.postgresql.Driver',
    user='postgres',
    password='cl1f4d',
    dbtable='stackoverflow_filtered.results'
).save(mode='append')

Py4JJavaError: An error occurred while calling o408.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 33.0 failed 1 times, most recent failure: Lost task 2.0 in stage 33.0 (TID 1045, localhost, executor driver): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO stackoverflow_filtered.results ("question_id","user_id","display_name","reputation","website_url","location","about_me","views","up_votes","down_votes","image_url","user_created_at","user_updated_at","city","country","title","question_body","accepted_answer_id","question_score","view_count","comment_count","question_created_at","answer_id","answer_body","answer_score","answer_comment_count","answer_created_at") VALUES ('54543956','10361602','Rakesh','158',NULL,'India',NULL,'103','48','0','https://www.gravatar.com/avatar/b903ec19f73f93f17d68df77e7106e41?s=128&d=identicon&r=PG&f=1','2018-09-14 03:21:33','2019-08-25 01:06:03','India',NULL,'finding prime number using the square root method','<p>I was able to write a function for the prime number using this way</p>

<pre><code>def isprime(num):
    if num &gt; 1:
        for i in range(2, num):
            if num % i == 0:
                return False
        return True

%timeit [i for i in range(1000) if isprime(i)]
7.94 ms Â± 273 Âµs per loop (mean Â± std. dev. of 7 runs, 100 loops each)
</code></pre>

<p>Then I found that there''s an even faster way to write this using the square root but I couldn''t understand the working of this. <strong>Can anyone explain this code in easier terms and why it works?</strong></p>

<pre><code>def isprime(num):
    if num &gt; 1:
        for i in range(2, int(num ** 0.5) + 1):
            if num % i == 0:
                return False
        return True

%timeit [i for i in range(1000) if isprime(i)]    
1.94 ms Â± 54.7 Âµs per loop (mean Â± std. dev. of 7 runs, 1000 loops each)
</code></pre>

<p>If this is a duplicate please let me know I will delete it instantly.</p>
','54544012','0','638','8','2019-02-05 22:21:21',NULL,NULL,NULL,NULL,NULL) was aborted: ERROR: column "user_created_at" is of type timestamp without time zone but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 471  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:837)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: ERROR: column "user_created_at" is of type timestamp without time zone but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 471
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2497)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2233)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:834)
	... 16 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO stackoverflow_filtered.results ("question_id","user_id","display_name","reputation","website_url","location","about_me","views","up_votes","down_votes","image_url","user_created_at","user_updated_at","city","country","title","question_body","accepted_answer_id","question_score","view_count","comment_count","question_created_at","answer_id","answer_body","answer_score","answer_comment_count","answer_created_at") VALUES ('54543956','10361602','Rakesh','158',NULL,'India',NULL,'103','48','0','https://www.gravatar.com/avatar/b903ec19f73f93f17d68df77e7106e41?s=128&d=identicon&r=PG&f=1','2018-09-14 03:21:33','2019-08-25 01:06:03','India',NULL,'finding prime number using the square root method','<p>I was able to write a function for the prime number using this way</p>

<pre><code>def isprime(num):
    if num &gt; 1:
        for i in range(2, num):
            if num % i == 0:
                return False
        return True

%timeit [i for i in range(1000) if isprime(i)]
7.94 ms Â± 273 Âµs per loop (mean Â± std. dev. of 7 runs, 100 loops each)
</code></pre>

<p>Then I found that there''s an even faster way to write this using the square root but I couldn''t understand the working of this. <strong>Can anyone explain this code in easier terms and why it works?</strong></p>

<pre><code>def isprime(num):
    if num &gt; 1:
        for i in range(2, int(num ** 0.5) + 1):
            if num % i == 0:
                return False
        return True

%timeit [i for i in range(1000) if isprime(i)]    
1.94 ms Â± 54.7 Âµs per loop (mean Â± std. dev. of 7 runs, 1000 loops each)
</code></pre>

<p>If this is a duplicate please let me know I will delete it instantly.</p>
','54544012','0','638','8','2019-02-05 22:21:21',NULL,NULL,NULL,NULL,NULL) was aborted: ERROR: column "user_created_at" is of type timestamp without time zone but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 471  Call getNextException to see other errors in the batch.
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:837)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:672)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:834)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.postgresql.util.PSQLException: ERROR: column "user_created_at" is of type timestamp without time zone but expression is of type character varying
  Hint: You will need to rewrite or cast the expression.
  Position: 471
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2497)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2233)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:834)
	... 16 more
