In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Start (or reuse) a local Spark session that uses all local cores.
# The PostgreSQL JDBC jar is added to the driver classpath; the JDBC cells
# further down name 'org.postgresql.Driver' as their driver class.
working_directory = 'main/jars/'

spark = (
    SparkSession.builder
    .appName('data-public_local')
    .config('spark.driver.extraClassPath',
            working_directory + 'postgresql-42.2.5.jar')
    .master("local[*]")
    .getOrCreate()
)

In [2]:
# Load the Badges, Posts and Users parquet datasets from the output directory
# and register each DataFrame as a temporary view for the SQL cells below.
output_directory = 'main/outputdata'

badges = spark.read.parquet(output_directory + '/Badges/*')
posts = spark.read.parquet(output_directory + '/Posts/*')
users = spark.read.parquet(output_directory + '/Users/*')

for view_name, frame in (("badges_sql", badges),
                         ("posts_sql", posts),
                         ("users_sql", users)):
    frame.createOrReplaceTempView(view_name)

In [3]:
# Step 1: number of posts per user.
# The inner join with users_sql keeps only posts whose ownerUserId matches an
# existing user id; the result has one row per such user.
posts_number_query = (
    "SELECT tmp.ownerUserId, COUNT(*) as totalNumberOfPosts"
    " FROM (posts_sql join users_sql"
    "     on users_sql.id == posts_sql.ownerUserId) as tmp"
    " GROUP BY tmp.ownerUserId"
)
posts_number = spark.sql(posts_number_query)
posts_number.createOrReplaceTempView("posts_number_sql")

# Step 2: attach the count to every user. The LEFT OUTER JOIN keeps users
# without posts, and COALESCE turns their missing count into 0.
new_users_1_query = (
    " SELECT tmp.id, tmp.reputation, tmp.creationDate, tmp.displayName, tmp.emailHash,"
    " tmp.lastAccessDate, tmp.websiteUrl, tmp.location, tmp.age, tmp.aboutMe, tmp.views,"
    " tmp.upVotes, tmp.downVotes, tmp.profileImageUrl, tmp.accountId, COALESCE(tmp.totalNumberOfPosts, 0) as totalNumberOfPosts"
    " FROM (users_sql LEFT OUTER JOIN posts_number_sql"
    "  on users_sql.id == posts_number_sql.ownerUserId) as tmp"
)
new_users_1 = spark.sql(new_users_1_query)
new_users_1.createOrReplaceTempView("new_users_1_sql")

# Preview the first rows of the enriched users table.
new_users_1.show(3)


+---+----------+--------------------+-------------+---------+--------------------+--------------------+-------------+----+--------------------+-----+-------+---------+---------------+---------+------------------+
| id|reputation|        creationDate|  displayName|emailHash|      lastAccessDate|          websiteUrl|     location| age|             aboutMe|views|upVotes|downVotes|profileImageUrl|accountId|totalNumberOfPosts|
+---+----------+--------------------+-------------+---------+--------------------+--------------------+-------------+----+--------------------+-----+-------+---------+---------------+---------+------------------+
|148|       440|2011-05-25 13:13:...|        scdmb|     null|2016-08-14 12:04:...|                    |         null|null|                null|    4|     12|        0|           null|   383149|                 0|
|243|       101|2011-06-01 13:18:...|   Binarylife|     null|2018-07-06 07:21:...|                null|         null|null|                null|    0

In [4]:
# Table with critics: one row (userId, critic=1) per user holding the
# 'Critic' badge.
#
# * The WHERE clause already restricts rows to the 'Critic' badge, so the flag
#   is the constant 1; no conditional column is needed afterwards.
# * DISTINCT guarantees a single row per user. Without it, a user listed more
#   than once in badges_sql for this badge would be duplicated by the
#   LEFT OUTER JOIN below and end up several times in the users table.
critics = spark.sql("SELECT DISTINCT tmp.userId, 1 as critic"
                    " FROM (badges_sql join users_sql"
                    " on badges_sql.userId == users_sql.id) as tmp"
                    " WHERE tmp.name == 'Critic'"
                    )
critics.createOrReplaceTempView("critics_sql")
critics.show(3)


# Attach the critic flag to every user; users without the badge get 0
# through COALESCE.
new_users_2 = spark.sql(" SELECT tmp.id, tmp.reputation, tmp.creationDate, tmp.displayName, tmp.emailHash,"
                        " tmp.lastAccessDate, tmp.websiteUrl, tmp.location, tmp.age, tmp.aboutMe, tmp.views,"
                        " tmp.upVotes, tmp.downVotes, tmp.profileImageUrl, tmp.accountId, tmp.totalNumberOfPosts,"
                        " COALESCE(tmp.critic, 0) as critic"
                        " FROM (new_users_1_sql LEFT OUTER JOIN critics_sql"
                        "  on new_users_1_sql.id == critics_sql.userId) as tmp")

new_users_2.createOrReplaceTempView("new_users_2_sql")

new_users_2.show(3)


+------+------+
|userId|critic|
+------+------+
|     8|     1|
|     1|     1|
|     4|     1|
+------+------+
only showing top 3 rows



+---+----------+--------------------+-------------+---------+--------------------+--------------------+-------------+----+--------------------+-----+-------+---------+---------------+---------+------------------+------+
| id|reputation|        creationDate|  displayName|emailHash|      lastAccessDate|          websiteUrl|     location| age|             aboutMe|views|upVotes|downVotes|profileImageUrl|accountId|totalNumberOfPosts|critic|
+---+----------+--------------------+-------------+---------+--------------------+--------------------+-------------+----+--------------------+-----+-------+---------+---------------+---------+------------------+------+
|148|       440|2011-05-25 13:13:...|        scdmb|     null|2016-08-14 12:04:...|                    |         null|null|                null|    4|     12|        0|           null|   383149|                 0|     0|
|243|       101|2011-06-01 13:18:...|   Binarylife|     null|2018-07-06 07:21:...|                null|         null|nul

In [5]:
# Table with editors: one row (userId, editor=1) per user holding the
# 'Editor' badge.
#
# * The WHERE clause already restricts rows to the 'Editor' badge, so the flag
#   is the constant 1; no conditional column is needed afterwards.
# * DISTINCT guarantees a single row per user. Without it, a user listed more
#   than once in badges_sql for this badge would be duplicated by the
#   LEFT OUTER JOIN below and end up several times in the users table.
editors = spark.sql("SELECT DISTINCT tmp.userId, 1 as editor"
                    " FROM (badges_sql join users_sql"
                    " on badges_sql.userId == users_sql.id) as tmp"
                    " WHERE tmp.name == 'Editor'"
                    )
editors.createOrReplaceTempView("editors_sql")
editors.show(3)

# Attach the editor flag to every user; users without the badge get 0
# through COALESCE.
new_users_3 = spark.sql(" SELECT tmp.id, tmp.reputation, tmp.creationDate, tmp.displayName, tmp.emailHash,"
                        " tmp.lastAccessDate, tmp.websiteUrl, tmp.location, tmp.age, tmp.aboutMe, tmp.views,"
                        " tmp.upVotes, tmp.downVotes, tmp.profileImageUrl, tmp.accountId, tmp.totalNumberOfPosts,"
                        " tmp.critic, COALESCE(tmp.editor, 0) as editor"
                        " FROM (new_users_2_sql LEFT OUTER JOIN editors_sql"
                        "  on new_users_2_sql.id == editors_sql.userId) as tmp")

new_users_3.createOrReplaceTempView("new_users_3_sql")

new_users_3.show(3)

+------+------+
|userId|editor|
+------+------+
|     8|     1|
|     6|     1|
|     1|     1|
+------+------+
only showing top 3 rows



+---+----------+--------------------+-------------+---------+--------------------+--------------------+-------------+----+--------------------+-----+-------+---------+---------------+---------+------------------+------+------+
| id|reputation|        creationDate|  displayName|emailHash|      lastAccessDate|          websiteUrl|     location| age|             aboutMe|views|upVotes|downVotes|profileImageUrl|accountId|totalNumberOfPosts|critic|editor|
+---+----------+--------------------+-------------+---------+--------------------+--------------------+-------------+----+--------------------+-----+-------+---------+---------------+---------+------------------+------+------+
|148|       440|2011-05-25 13:13:...|        scdmb|     null|2016-08-14 12:04:...|                    |         null|null|                null|    4|     12|        0|           null|   383149|                 0|     0|     0|
|243|       101|2011-06-01 13:18:...|   Binarylife|     null|2018-07-06 07:21:...|          

In [6]:
# Table with the number of posts each user created in the last 30 days.
# NOTE(review): the window is relative to CURRENT_TIMESTAMP, so the counts
# depend on when this cell is executed.
number_posts_30_days_query = (
    "SELECT p.ownerUserId, COUNT(*) as lastMonthNumberOfPosts"
    " FROM posts_sql as p"
    " WHERE (CURRENT_TIMESTAMP - INTERVAL 30 days) <= p.creationDate"
    " GROUP BY p.ownerUserId"
)
number_posts_30_days = spark.sql(number_posts_30_days_query)
number_posts_30_days.createOrReplaceTempView("number_posts_30_days_sql")

# Final users table: everything gathered so far plus the 30-day post count,
# with 0 (via COALESCE) for users that have no row in the 30-day table.
final_users_query = (
    " SELECT tmp.id, tmp.reputation, tmp.creationDate, tmp.displayName, tmp.emailHash,"
    " tmp.lastAccessDate, tmp.websiteUrl, tmp.location, tmp.age, tmp.aboutMe, tmp.views,"
    " tmp.upVotes, tmp.downVotes, tmp.profileImageUrl, tmp.accountId, tmp.totalNumberOfPosts,"
    " tmp.critic, tmp.editor, COALESCE(tmp.lastMonthNumberOfPosts,0) as lastMonthNumberOfPosts"
    " FROM (new_users_3_sql LEFT OUTER JOIN number_posts_30_days_sql"
    "  on new_users_3_sql.id == number_posts_30_days_sql.ownerUserId) as tmp"
)
final_users = spark.sql(final_users_query)
final_users.createOrReplaceTempView("final_users_sql")

final_users.show(3)

+---+----------+--------------------+-------------+---------+--------------------+--------------------+-------------+----+--------------------+-----+-------+---------+---------------+---------+------------------+------+------+----------------------+
| id|reputation|        creationDate|  displayName|emailHash|      lastAccessDate|          websiteUrl|     location| age|             aboutMe|views|upVotes|downVotes|profileImageUrl|accountId|totalNumberOfPosts|critic|editor|lastMonthNumberOfPosts|
+---+----------+--------------------+-------------+---------+--------------------+--------------------+-------------+----+--------------------+-----+-------+---------+---------------+---------+------------------+------+------+----------------------+
|148|       440|2011-05-25 13:13:...|        scdmb|     null|2016-08-14 12:04:...|                    |         null|null|                null|    4|     12|        0|           null|   383149|                 0|     0|     0|                     0|


In [7]:
# writing to local database

# TODO pass your database properties here
# NOTE(review): the credentials are hardcoded in the notebook; consider
# loading them from the environment before sharing or committing this file.
properties = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql:stackoverflow',
    'user': 'postgres',
    'password': 'testpassword',
    'dbtable': 'users',
}

# Write final_users to the table named in properties['dbtable'].
# 'user' is passed explicitly, as in the read cell and the remote write cell;
# without it the connection would not use the account configured above.
# NOTE: mode('append') adds rows on every execution, so re-running this cell
# inserts the same users again.
final_users.write \
    .format('jdbc') \
    .option('driver', properties['driver']) \
    .option('url', properties['url']) \
    .option('user', properties['user']) \
    .option('password', properties['password']) \
    .option('dbtable', properties['dbtable']) \
    .mode('append') \
    .save()


In [8]:
# Load the table named in properties['dbtable'] back from the local database
# over JDBC, preview three rows and display the total row count.
local_reader = spark.read.format('jdbc')
for option_name in ('driver', 'url', 'user', 'password', 'dbtable'):
    local_reader = local_reader.option(option_name, properties[option_name])
df_local = local_reader.load()

df_local.select('*').show(3)
df_local.count()


+---+----------+--------------------+-----------+---------+--------------------+----------+--------+----+-------+-----+-------+---------+---------------+---------+------------------+------+------+----------------------+
| id|reputation|        creationDate|displayName|emailHash|      lastAccessDate|websiteUrl|location| age|aboutMe|views|upVotes|downVotes|profileImageUrl|accountId|totalNumberOfPosts|critic|editor|lastMonthNumberOfPosts|
+---+----------+--------------------+-----------+---------+--------------------+----------+--------+----+-------+-----+-------+---------+---------------+---------+------------------+------+------+----------------------+
|243|       101|2011-06-01 13:18:...| Binarylife|     null|2018-07-06 07:21:...|      null|    null|null|   null|    0|      0|        0|           null|   263377|                 0|     0|     0|                     0|
| 85|         1|2011-05-24 23:26:...|      Marco|     null|2011-05-24 23:26:...|      null|    null|null|   null|    0| 

238

In [54]:
# writing to remote database

# TODO pass your database properties here
# NOTE(review): these values are identical to the local `properties` in this
# notebook; confirm the URL really points at the remote database.
properties_remote = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql:stackoverflow',
    'user': 'postgres',
    'password': 'testpassword',
    'dbtable': 'users',
}

# Append final_users to the table named in properties_remote['dbtable'].
remote_writer = final_users.write.format('jdbc')
for option_name in ('driver', 'url', 'user', 'password', 'dbtable'):
    remote_writer = remote_writer.option(option_name,
                                         properties_remote[option_name])
remote_writer.mode('append').save()


In [59]:
# read from remote database

# Load the table named in properties_remote['dbtable'] over JDBC.
remote_reader = spark.read.format('jdbc')
for option_name in ('driver', 'url', 'user', 'password', 'dbtable'):
    remote_reader = remote_reader.option(option_name,
                                         properties_remote[option_name])
df_remote = remote_reader.load()


# Preview three rows, print the row count, then show the user with id 8.
df_remote.select('*').show(3)
print(df_remote.count())
df_remote.select('*').where(col('id') == 8).show()

+---+----------+--------------------+-------------+---------+--------------------+--------------------+-------------+----+--------------------+-----+-------+---------+---------------+---------+------------------+------+------+----------------------+
| id|reputation|        creationDate|  displayName|emailHash|      lastAccessDate|          websiteUrl|     location| age|             aboutMe|views|upVotes|downVotes|profileImageUrl|accountId|totalNumberOfPosts|critic|editor|lastMonthNumberOfPosts|
+---+----------+--------------------+-------------+---------+--------------------+--------------------+-------------+----+--------------------+-----+-------+---------+---------------+---------+------------------+------+------+----------------------+
|148|       440|2011-05-25 13:13:...|        scdmb|     null|2016-08-14 12:04:...|                    |         null|null|                null|    4|     12|        0|           null|   383149|                 0|     0|     0|                     0|


238


+---+----------+--------------------+-----------+---------+--------------------+----------------+--------------------+----+--------------------+-----+-------+---------+--------------------+---------+------------------+------+------+----------------------+
| id|reputation|        creationDate|displayName|emailHash|      lastAccessDate|      websiteUrl|            location| age|             aboutMe|views|upVotes|downVotes|     profileImageUrl|accountId|totalNumberOfPosts|critic|editor|lastMonthNumberOfPosts|
+---+----------+--------------------+-----------+---------+--------------------+----------------+--------------------+----+--------------------+-----+-------+---------+--------------------+---------+------------------+------+------+----------------------+
|  8|      4889|2011-05-24 19:27:...|   Pekka 웃|     null|2018-07-06 08:12:...|http://pekka.net|Tabayesco, Lanzar...|null|<p>Web developer,...|  366|    206|        1|https://i.stack.i...|    63368|                 7|     1|     1| 