In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext

In [2]:
import jaydebeapi, os

In [3]:
spark = (
SparkSession.builder
.appName("flock Data Wrangling")
.config("spark.jars","/home/girlytechgeek/Desktop/postgres/postgresql-42.2.8.jar")
.getOrCreate()
)

In [4]:
spark

In [5]:
question = spark.read.csv('questions.csv', header=True, escape='"', multiLine=True, inferSchema=True)
answer = spark.read.csv('answers5.csv', header=True, escape='"', multiLine=True, inferSchema=True)
user = spark.read.csv('users2.csv', header=True, inferSchema=True, escape='"', multiLine=True)

In [6]:
#rename individual conflicting column names using withColumnRenamed
question = question.withColumnRenamed('id', 'identity')
answer = answer.withColumnRenamed('id', 'identify')
user = user.withColumnRenamed('id', 'identical')

question = question.withColumnRenamed('created_at', 'questions_created_at')
answer = answer.withColumnRenamed('created_at', 'answers_created_at')
user = user.withColumnRenamed('created_at', 'users_created_at')

question = question.withColumnRenamed('user_id', 'questions_user_id')
answer = answer.withColumnRenamed('user_id', 'answers_user_id')

question = question.withColumnRenamed('score', 'questions_score')
answer = answer.withColumnRenamed('score', 'answers_score')

answer = answer.withColumnRenamed('body', 'answers_body')

In [7]:
question_drop = question.na.drop()

In [8]:
answer_drop = answer.na.drop()

In [9]:
user_drop = user.na.drop()

In [10]:
question_drop.count()

132620

In [11]:
answer_drop.count()

527544

In [12]:
user_drop.count()

17723

In [13]:
question_drop.show(5)
answer_drop.show(5)
user_drop.show(5)

+--------+-----------------+--------------------+--------------------+------------------+---------------+----------+-------------+--------------------+
|identity|questions_user_id|               title|                body|accepted_answer_id|questions_score|view_count|comment_count|questions_created_at|
+--------+-----------------+--------------------+--------------------+------------------+---------------+----------+-------------+--------------------+
|54233315|          1118630|XPath parent node...|<p>I'm trying to ...|          54233368|              1|       134|            4| 2019-01-17 09:59:47|
|54233145|          7984274| Is this a java BUG?|<p>why the follow...|          54234312|             -2|       132|            3| 2019-01-17 09:50:12|
|54233331|          1877002|Different results...|<p>I am new to li...|          54233375|             -1|        26|            0| 2019-01-17 10:00:17|
|54233149|         10927076|Using eval as pro...|<p>I know there a...|          54233257

In [14]:
user_drop.columns

['identical',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'users_created_at',
 'updated_at']

In [15]:
user_drop.select('location').show()

+--------------------+
|            location|
+--------------------+
|Jalandhar, Punjab...|
|San Francisco, CA...|
|Pune, Maharashtra...|
|             Germany|
|          Oxford, UK|
|Saint Albans, WV,...|
|     Berlin, Germany|
|          Madagascar|
|Medellín, Antioqu...|
|Madurai, Tamil Na...|
|Wellington, New Z...|
|              Turkey|
|             Moldova|
|           Argentina|
|Manjeshwar, Keral...|
|     Russia, Irkutsk|
|   Kingston, Jamaica|
|             Germany|
|      Minsk, Belarus|
|      Shanghai, Cina|
+--------------------+
only showing top 20 rows



In [16]:
answer_drop.select('question_id', 'answers_score').show()

+-----------+-------------+
|question_id|answers_score|
+-----------+-------------+
|   53994501|            0|
|   54001177|            0|
|   54004053|            0|
|   54381785|            0|
|   54001092|            0|
|   53995985|            0|
|   54000445|            0|
|   53993266|            0|
|   54004019|            0|
|   53996088|            0|
|   54000494|            0|
|   54008132|            0|
|   54008087|            0|
|   53994485|            0|
|   54001104|            0|
|   54330304|            0|
|   54007315|            0|
|   54000464|            0|
|   54008215|            0|
|   53996087|            0|
+-----------+-------------+
only showing top 20 rows



In [17]:
user_drop.select('identical','location').show()

+---------+--------------------+
|identical|            location|
+---------+--------------------+
| 10262756|Jalandhar, Punjab...|
|  6513406|San Francisco, CA...|
|  8391844|Pune, Maharashtra...|
| 10254253|             Germany|
|  8377832|          Oxford, UK|
|  6509967|Saint Albans, WV,...|
|  8386892|     Berlin, Germany|
|  2627166|          Madagascar|
| 10274636|Medellín, Antioqu...|
|  6505596|Madurai, Tamil Na...|
| 10278210|Wellington, New Z...|
| 10264747|              Turkey|
| 10299702|             Moldova|
| 10297198|           Argentina|
|  6543693|Manjeshwar, Keral...|
|  8421111|     Russia, Irkutsk|
|  2646801|   Kingston, Jamaica|
|  8421992|             Germany|
|  6546560|      Minsk, Belarus|
|  8425233|      Shanghai, Cina|
+---------+--------------------+
only showing top 20 rows



In [18]:
user_drop.registerTempTable('used')
new_user = spark.sql("select * from used where location like '%Germany' ")

In [19]:
new_user.show()

+---------+-------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+
|identical| display_name|reputation|         website_url|            location|            about_me|views|up_votes|down_votes|           image_url|   users_created_at|         updated_at|
+---------+-------------+----------+--------------------+--------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+
| 10254253|  HACKER 3000|         1|http://hacker3000.tk|             Germany|<p>i dont have a ...|    5|       0|         0|https://lh3.googl...|2018-08-21 10:25:54|2019-09-21 09:29:45|
|  8386892|    xtristrix|         1| http://cyberiade.de|     Berlin, Germany|<p>Linux user and...|    2|       0|         0|https://www.grava...|2017-07-29 17:08:47|2019-07-19 11:28:49|
|  8421992|     Peter K.|         1|  http://kappelt.net|        

In [20]:
import pyspark.sql.functions as f

In [21]:
split_col = f.split(new_user['location'], ',')
new_split = new_user.withColumn('city', split_col.getItem(0))
split = new_split.withColumn('country', split_col.getItem(1))

In [22]:
split.columns

['identical',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'users_created_at',
 'updated_at',
 'city',
 'country']

In [23]:
type(split)

pyspark.sql.dataframe.DataFrame

In [24]:
split.select('location', 'city', 'country').show()

+--------------------+-------------------+--------+
|            location|               city| country|
+--------------------+-------------------+--------+
|             Germany|            Germany|    null|
|     Berlin, Germany|             Berlin| Germany|
|             Germany|            Germany|    null|
|  Wuppertal, Germany|          Wuppertal| Germany|
|             Germany|            Germany|    null|
|        Ulm, Germany|                Ulm| Germany|
|             Germany|            Germany|    null|
| Düsseldorf, Germany|         Düsseldorf| Germany|
|             Germany|            Germany|    null|
|Rosenheim (Bavari...|Rosenheim (Bavaria)| Germany|
|             Germany|            Germany|    null|
|             Germany|            Germany|    null|
|     Bremen, Germany|             Bremen| Germany|
|             Germany|            Germany|    null|
|             Germany|            Germany|    null|
|  Stuttgart, Germany|          Stuttgart| Germany|
|    Hamburg

In [25]:
user = split.join(question, split['identical'] == question.questions_user_id)

In [26]:
user.show()

+---------+------------+----------+--------------------+-----------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+--------+--------+--------+-----------------+--------------------+--------------------+------------------+---------------+----------+-------------+--------------------+
|identical|display_name|reputation|         website_url|         location|            about_me|views|up_votes|down_votes|           image_url|   users_created_at|         updated_at|    city| country|identity|questions_user_id|               title|                body|accepted_answer_id|questions_score|view_count|comment_count|questions_created_at|
+---------+------------+----------+--------------------+-----------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+--------+--------+--------+-----------------+--------------------+--------------------+------------------+----------

In [27]:
user.columns

['identical',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'users_created_at',
 'updated_at',
 'city',
 'country',
 'identity',
 'questions_user_id',
 'title',
 'body',
 'accepted_answer_id',
 'questions_score',
 'view_count',
 'comment_count',
 'questions_created_at']

In [28]:
type(user)

pyspark.sql.dataframe.DataFrame

In [29]:
result = user.join(answer, user['questions_created_at'] == answer.answers_created_at)

In [30]:
result.show()

+---------+------------------+----------+--------------------+-------------------+--------------------+-----+--------+----------+--------------------+-------------------+-------------------+----------+--------+--------+-----------------+--------------------+--------------------+------------------+---------------+----------+-------------+--------------------+--------+---------------+-----------+--------------------+-------------+----------+-------------------+
|identical|      display_name|reputation|         website_url|           location|            about_me|views|up_votes|down_votes|           image_url|   users_created_at|         updated_at|      city| country|identity|questions_user_id|               title|                body|accepted_answer_id|questions_score|view_count|comment_count|questions_created_at|identify|answers_user_id|question_id|        answers_body|answers_score|comment_at| answers_created_at|
+---------+------------------+----------+--------------------+----------

In [31]:
result.select('country', 'city').show()

+--------+----------+
| country|      city|
+--------+----------+
|    null|   Germany|
| Germany| Pforzheim|
|    null|   Germany|
|    null|   Germany|
| Germany|    Berlin|
| Germany|    Berlin|
| Germany|    Berlin|
| Germany|    Berlin|
| Germany|Heidelberg|
|    null|   Germany|
|    null|   Germany|
| Germany|    Berlin|
| Germany|    Aachen|
|    null|   Germany|
| Germany|    Berlin|
|    null|   Germany|
| Germany|    Munich|
| Germany|    Munich|
| Germany|    Munich|
| Germany| Bielefeld|
+--------+----------+
only showing top 20 rows



In [32]:
result = result.filter((result.city !='Germany')& (result.country !='null'))

In [33]:
result.select('country','city','location').show()

+--------+----------+-------------------+
| country|      city|           location|
+--------+----------+-------------------+
| Germany| Pforzheim| Pforzheim, Germany|
| Germany|    Berlin|    Berlin, Germany|
| Germany|    Berlin|    Berlin, Germany|
| Germany|    Berlin|    Berlin, Germany|
| Germany|    Berlin|    Berlin, Germany|
| Germany|Heidelberg|Heidelberg, Germany|
| Germany|    Berlin|    Berlin, Germany|
| Germany|    Aachen|    Aachen, Germany|
| Germany|    Berlin|    Berlin, Germany|
| Germany|    Munich|    Munich, Germany|
| Germany|    Munich|    Munich, Germany|
| Germany|    Munich|    Munich, Germany|
| Germany| Bielefeld| Bielefeld, Germany|
| Germany|  Chemnitz|  Chemnitz, Germany|
| Germany|  Chemnitz|  Chemnitz, Germany|
| Germany|   Leipzig|   Leipzig, Germany|
| Germany|    Berlin|    Berlin, Germany|
| Germany|  Chemnitz|  Chemnitz, Germany|
| Germany|    Berlin|    Berlin, Germany|
| Germany|    Berlin|    Berlin, Germany|
+--------+----------+-------------

In [34]:
type(result)

pyspark.sql.dataframe.DataFrame

In [35]:
answer.columns

['identify',
 'answers_user_id',
 'question_id',
 'answers_body',
 'answers_score',
 'comment_at',
 'answers_created_at']

In [36]:
result.columns

['identical',
 'display_name',
 'reputation',
 'website_url',
 'location',
 'about_me',
 'views',
 'up_votes',
 'down_votes',
 'image_url',
 'users_created_at',
 'updated_at',
 'city',
 'country',
 'identity',
 'questions_user_id',
 'title',
 'body',
 'accepted_answer_id',
 'questions_score',
 'view_count',
 'comment_count',
 'questions_created_at',
 'identify',
 'answers_user_id',
 'question_id',
 'answers_body',
 'answers_score',
 'comment_at',
 'answers_created_at']

In [37]:
result.dtypes

[('identical', 'int'),
 ('display_name', 'string'),
 ('reputation', 'int'),
 ('website_url', 'string'),
 ('location', 'string'),
 ('about_me', 'string'),
 ('views', 'int'),
 ('up_votes', 'int'),
 ('down_votes', 'int'),
 ('image_url', 'string'),
 ('users_created_at', 'timestamp'),
 ('updated_at', 'timestamp'),
 ('city', 'string'),
 ('country', 'string'),
 ('identity', 'int'),
 ('questions_user_id', 'int'),
 ('title', 'string'),
 ('body', 'string'),
 ('accepted_answer_id', 'int'),
 ('questions_score', 'int'),
 ('view_count', 'int'),
 ('comment_count', 'int'),
 ('questions_created_at', 'timestamp'),
 ('identify', 'int'),
 ('answers_user_id', 'int'),
 ('question_id', 'int'),
 ('answers_body', 'string'),
 ('answers_score', 'int'),
 ('comment_at', 'int'),
 ('answers_created_at', 'timestamp')]

In [38]:
result.select( 'identity', 'display_name', 'reputation', 'country', 'city', 'accepted_answer_id').show()

+--------+------------------+----------+--------+----------+------------------+
|identity|      display_name|reputation| country|      city|accepted_answer_id|
+--------+------------------+----------+--------+----------+------------------+
|55115319|     Tom el Safadi|      1682| Germany| Pforzheim|          55115371|
|56061793|              Adam|      5655| Germany|    Berlin|          56061862|
|55686943|            Antony|      1141| Germany|    Berlin|          55686944|
|56486345|  Lukas Würzburger|      3690| Germany|    Berlin|          56486346|
|56361125|            Alex44|      1817| Germany|    Berlin|          56373408|
|56479912|            DrWhat|       838| Germany|Heidelberg|          57181198|
|54537837|      Joerg Krause|       840| Germany|    Berlin|              null|
|55593426|        J. Hesters|      1715| Germany|    Aachen|          55597200|
|56045322|              Adam|      5655| Germany|    Berlin|          56049393|
|54588493|          gsamaras|     55594|

In [39]:
result.registerTempTable('new_df')
new_df = spark.sql('select min (updated_at) from new_df')

In [40]:
new_df.show()

+-------------------+
|    min(updated_at)|
+-------------------+
|2019-10-02 08:53:59|
+-------------------+



In [41]:
spark.sql("SELECT city,location,updated_at, country from new_df").show()

+----------+-------------------+-------------------+--------+
|      city|           location|         updated_at| country|
+----------+-------------------+-------------------+--------+
| Pforzheim| Pforzheim, Germany|2019-10-13 00:01:21| Germany|
|    Berlin|    Berlin, Germany|2019-10-13 00:28:06| Germany|
|    Berlin|    Berlin, Germany|2019-10-12 17:27:12| Germany|
|    Berlin|    Berlin, Germany|2019-10-12 21:55:38| Germany|
|    Berlin|    Berlin, Germany|2019-10-11 16:39:07| Germany|
|Heidelberg|Heidelberg, Germany|2019-10-09 12:31:20| Germany|
|    Berlin|    Berlin, Germany|2019-10-11 07:58:55| Germany|
|    Aachen|    Aachen, Germany|2019-10-12 18:31:33| Germany|
|    Berlin|    Berlin, Germany|2019-10-13 00:28:06| Germany|
|    Munich|    Munich, Germany|2019-10-12 19:19:48| Germany|
|    Munich|    Munich, Germany|2019-10-10 08:07:15| Germany|
|    Munich|    Munich, Germany|2019-10-10 08:07:15| Germany|
| Bielefeld| Bielefeld, Germany|2019-10-12 21:31:42| Germany|
|  Chemn

In [42]:
result.write.format("jdbc").options(
   name="PostgreSQL-postgres",
   url="jdbc:postgresql://localhost:5432/postgres",
   driver="org.postgresql.Driver",
   user="anna",
   password='anna',
   ssl='true',
   dbtable='stackoverflow_filtered.result',
)

<pyspark.sql.readwriter.DataFrameWriter at 0x7fe2ae241fd0>