In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram, Tokenizer
import pyspark.sql.functions as F

In [3]:
#Create spark session
spark =  (
    SparkSession.builder
                .appName("Stack Overflow Data Wrangling")
                .config("spark.jars", "jars\postgresql-42.2.8.jar") 
                .getOrCreate())


In [4]:
#Loading datasets

In [5]:
questions = spark.read.csv("questions.csv", header = True, inferSchema = True,escape='"', multiLine=True)

In [6]:
answers = spark.read.csv("answers.csv", header = True, inferSchema = True,escape='"', multiLine=True)

In [7]:
users = spark.read.csv("users.csv", header = True, inferSchema = True,escape='"', multiLine=True)

In [8]:
questions = questions.withColumnRenamed('id','qid')
questions = questions.withColumnRenamed('body', 'qbody')
questions = questions.withColumnRenamed('score', 'qscore')
questions = questions.withColumnRenamed('comment_count', 'qcomment_count')
questions = questions.withColumnRenamed('created_at', 'qcreated_at')
questions.columns


['qid',
 'user_id',
 'title',
 'qbody',
 'accepted_answer_id',
 'qscore',
 'view_count',
 'qcomment_count',
 'qcreated_at']

In [9]:
answers = answers.withColumnRenamed('created_at', 'acreated_at')
answers = answers.withColumnRenamed('id', 'aid')
answers = answers.withColumnRenamed('user_id', 'auser_aid')
answers.columns

['aid',
 'auser_aid',
 'question_id',
 'body',
 'score',
 'comment_count',
 'acreated_at']

In [10]:
users.columns

['id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'created_at',
 'updated_at']

In [11]:

#Join this with the questions and only pick questions with at least 20 views
#Join the answers to the results of (3)




In [12]:
#Extract the country and city into new columns
users = users\
                .withColumn('city', F.split(users['location'], ',')[1])\
                .withColumn('country', F.split(users['location'],',')[2])


In [13]:
#Select users from only one country of your choosing
users = users.filter(users.country == "India")


In [14]:
#joining the resulting users df with questions 
df = users.join(questions, users.id == questions.user_id)

In [15]:
df.columns

['id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'created_at',
 'updated_at',
 'city',
 'country',
 'qid',
 'user_id',
 'title',
 'qbody',
 'accepted_answer_id',
 'qscore',
 'view_count',
 'qcomment_count',
 'qcreated_at']

In [16]:
df.select('views','view_count').show()

+-----+----------+
|views|view_count|
+-----+----------+
+-----+----------+



In [17]:
df_1 = df.filter(df.views > 19)

In [18]:
df_1.select('views')

DataFrame[views: int]

In [19]:
df_1.select('views').take(50)

[]

In [20]:
#Joining answers dataset
df_2 = df_1.join(answers, df_1.id == answers.auser_aid)

In [21]:
df_2.columns

['id',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'created_at',
 'updated_at',
 'city',
 'country',
 'qid',
 'user_id',
 'title',
 'qbody',
 'accepted_answer_id',
 'qscore',
 'view_count',
 'qcomment_count',
 'qcreated_at',
 'aid',
 'auser_aid',
 'question_id',
 'body',
 'score',
 'comment_count',
 'acreated_at']

In [22]:
#Returning min updated time
df_2.registerTempTable('new_df')


In [23]:
 spark.sql("SELECT updated_at, views from new_df").show()

+----------+-----+
|updated_at|views|
+----------+-----+
+----------+-----+



In [26]:
df_2.write.format("jdbc").options( url='jdbc:postgresql://localhost/postgres', driver='org.postgresql.Driver', user='Adutwumwaah', password='devinvogue',dbtable='stackoverflow_filtered.results').save(mode='append')


Py4JJavaError: An error occurred while calling o176.save.
: java.lang.ClassNotFoundException: org.postgresql.Driver
	at java.net.URLClassLoader.findClass(Unknown Source)
	at java.lang.ClassLoader.loadClass(Unknown Source)
	at java.lang.ClassLoader.loadClass(Unknown Source)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$5.apply(JDBCOptions.scala:99)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$$anonfun$5.apply(JDBCOptions.scala:99)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:99)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:197)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:201)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
