In [1]:
from pyspark.sql.functions import *
import time
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [2]:
# Load the comments XML dump from the bucket as an RDD with one element per text line.
text_file = sc.textFile("gs://stackoverflow-dataset-677/Comments.xml")

In [3]:
# Peek at the first three raw lines to see the layout of the file.
text_file.take(3)

['<?xml version="1.0" encoding="utf-8"?>',
 '<comments>',
 '  <row Id="13" PostId="23" Score="0" Text="Using /opt helps me keep track of the applications I\'ve installed myself." CreationDate="2010-07-28T19:36:59.773" UserId="10" ContentLicense="CC BY-SA 2.5" />']

In [4]:
# Keep only the lines that start with the two-space-indented "<row " tag;
# this drops the XML declaration and the <comments> wrapper line.
filteredRDD = text_file.filter(lambda line: line.startswith("  <row "))

In [5]:
# Show one line that passed the filter.
filteredRDD.take(1)

['  <row Id="13" PostId="23" Score="0" Text="Using /opt helps me keep track of the applications I\'ve installed myself." CreationDate="2010-07-28T19:36:59.773" UserId="10" ContentLicense="CC BY-SA 2.5" />']

In [6]:
# Drop the indentation in front of each row. str.lstrip treats its argument as
# a set of characters, so "  " removes every leading space, not exactly two.
cleanedRDD = filteredRDD.map(lambda x: x.lstrip("  "))

In [7]:
# Show one line after the leading spaces were stripped.
cleanedRDD.take(1)

['<row Id="13" PostId="23" Score="0" Text="Using /opt helps me keep track of the applications I\'ve installed myself." CreationDate="2010-07-28T19:36:59.773" UserId="10" ContentLicense="CC BY-SA 2.5" />']

In [8]:
import xml.etree.ElementTree as ET

def parse_xml(rdd):
    """
    Parse one ``<row ... />`` XML string into a flat list of comment fields.

    Despite its name, ``rdd`` is a single XML string (one element of the RDD
    this function is mapped over), not an RDD.

    Returns ``[PostId, Score, Text, CreationDate, UserId]``. PostId, Score and
    UserId are converted with ``int``; a missing numeric attribute becomes 0
    and a missing Text or CreationDate becomes the string "N/A".
    """
    attrs = ET.fromstring(rdd).attrib

    def as_int(key):
        # Absent numeric attributes fall back to 0.
        return int(attrs[key]) if key in attrs else 0

    def as_text(key):
        # Absent text attributes fall back to the "N/A" placeholder.
        return attrs[key] if key in attrs else "N/A"

    # The element order defines the column order of the resulting records.
    return [
        as_int("PostId"),
        as_int("Score"),
        as_text("Text"),
        as_text("CreationDate"),
        as_int("UserId"),
    ]

In [9]:
# Turn every cleaned row string into a [PostId, Score, Text, CreationDate, UserId] list.
records_rdd = cleanedRDD.map(parse_xml)

In [10]:
# Column names, in the same order as the list returned by parse_xml.
comments_data = ["postId","score","text","creationDate","userId"]
# Only names are supplied here, so the column types are inferred from the records.
comments_df = records_rdd.toDF(comments_data)

In [11]:
# Check the inferred column names and types.
comments_df.printSchema()

root
 |-- postId: long (nullable = true)
 |-- score: long (nullable = true)
 |-- text: string (nullable = true)
 |-- creationDate: string (nullable = true)
 |-- userId: long (nullable = true)



In [12]:
# Preview the first rows of the comments DataFrame.
comments_df.show()

+------+-----+--------------------+--------------------+------+
|postId|score|                text|        creationDate|userId|
+------+-----+--------------------+--------------------+------+
|    23|    0|Using /opt helps ...|2010-07-28T19:36:...|    10|
|    18|    0|but popping in a ...|2010-07-28T19:38:...|    10|
|    27|    0|That will revert ...|2010-07-28T19:39:...|    50|
|    31|    0|I think you meant...|2010-07-28T19:41:...|    12|
|    18|    0|@DLH apparently n...|2010-07-28T19:41:...|    63|
|    12|    2|"ssh -X <server> ...|2010-07-28T19:46:...|    96|
|    12|    0|@Suppressingfire:...|2010-07-28T19:48:...|    10|
|    50|    0|Can you please re...|2010-07-28T19:48:...|    56|
|    27|    0|It probably shoul...|2010-07-28T19:49:...|     5|
|    58|    0|Do you mean the c...|2010-07-28T19:50:...|     5|
|    47|    0|Have you checked ...|2010-07-28T19:50:...|     4|
|    47|    1|Might be related ...|2010-07-28T19:51:...|   104|
|    58|    0|Do you use Gnome ...|2010-

In [13]:
# Count comments per user, drop the userId 0 bucket (the value parse_xml
# assigns when a row has no UserId), and rank users by comment count.
trending_users = (
    comments_df
    .groupBy("userId")
    .agg(F.count("text").alias("count"))
    .filter(F.col("userId") > 0)
    .orderBy(F.desc("count"))
)

In [14]:
# Display the users with the highest comment counts.
trending_users.show()

+------+-----+
|userId|count|
+------+-----+
|167850|14677|
|  4272|12192|
|158442|12091|
| 15811|10505|
|175814| 8835|
|307523| 7861|
|469152| 7567|
|178692| 7523|
| 35795| 7515|
|126395| 7398|
|295286| 6757|
| 19421| 6679|
|344926| 6268|
| 19626| 6164|
| 22949| 6136|
| 10616| 6124|
|225694| 5736|
| 94914| 5692|
|459561| 5346|
| 72216| 5314|
+------+-----+
only showing top 20 rows



In [13]:
# Register comments_df as the temp view "comments" so it can be queried through spark.sql.
comments_df.createOrReplaceTempView("comments")

In [14]:
# Read every column of the "comments" view back into a DataFrame via SQL.
comments_sql_df = spark.sql("SELECT * FROM comments")

In [15]:
# Check the schema of the SQL result.
comments_sql_df.printSchema()

root
 |-- postId: long (nullable = true)
 |-- score: long (nullable = true)
 |-- text: string (nullable = true)
 |-- creationDate: string (nullable = true)
 |-- userId: long (nullable = true)



In [16]:
# Preview the first rows returned by the SQL query.
comments_sql_df.show()

+------+-----+--------------------+--------------------+------+
|postId|score|                text|        creationDate|userId|
+------+-----+--------------------+--------------------+------+
|    23|    0|Using /opt helps ...|2010-07-28T19:36:...|    10|
|    18|    0|but popping in a ...|2010-07-28T19:38:...|    10|
|    27|    0|That will revert ...|2010-07-28T19:39:...|    50|
|    31|    0|I think you meant...|2010-07-28T19:41:...|    12|
|    18|    0|@DLH apparently n...|2010-07-28T19:41:...|    63|
|    12|    2|"ssh -X <server> ...|2010-07-28T19:46:...|    96|
|    12|    0|@Suppressingfire:...|2010-07-28T19:48:...|    10|
|    50|    0|Can you please re...|2010-07-28T19:48:...|    56|
|    27|    0|It probably shoul...|2010-07-28T19:49:...|     5|
|    58|    0|Do you mean the c...|2010-07-28T19:50:...|     5|
|    47|    0|Have you checked ...|2010-07-28T19:50:...|     4|
|    47|    1|Might be related ...|2010-07-28T19:51:...|   104|
|    58|    0|Do you use Gnome ...|2010-

In [42]:
# Preview comments whose userId is above 0; parse_xml stores 0 when a row has no UserId attribute.
spark.sql("SELECT * FROM comments where userId > 0").show()

+------+-----+--------------------+--------------------+------+
|postId|score|                text|        creationDate|userId|
+------+-----+--------------------+--------------------+------+
|    23|    0|Using /opt helps ...|2010-07-28T19:36:...|    10|
|    18|    0|but popping in a ...|2010-07-28T19:38:...|    10|
|    27|    0|That will revert ...|2010-07-28T19:39:...|    50|
|    31|    0|I think you meant...|2010-07-28T19:41:...|    12|
|    18|    0|@DLH apparently n...|2010-07-28T19:41:...|    63|
|    12|    2|"ssh -X <server> ...|2010-07-28T19:46:...|    96|
|    12|    0|@Suppressingfire:...|2010-07-28T19:48:...|    10|
|    50|    0|Can you please re...|2010-07-28T19:48:...|    56|
|    27|    0|It probably shoul...|2010-07-28T19:49:...|     5|
|    58|    0|Do you mean the c...|2010-07-28T19:50:...|     5|
|    47|    0|Have you checked ...|2010-07-28T19:50:...|     4|
|    47|    1|Might be related ...|2010-07-28T19:51:...|   104|
|    58|    0|Do you use Gnome ...|2010-

In [44]:
# Comment count per userId, highest first. There is no userId > 0 filter here,
# so the 0 bucket (rows without a UserId attribute) is included in the result.
spark.sql("SELECT userId,count(*) as count FROM comments group by userId order by count desc").show()

+------+-----+
|userId|count|
+------+-----+
|     0|27141|
|167850|14677|
|  4272|12192|
|158442|12091|
| 15811|10505|
|175814| 8835|
|307523| 7861|
|469152| 7567|
|178692| 7523|
| 35795| 7515|
|126395| 7398|
|295286| 6757|
| 19421| 6679|
|344926| 6268|
| 19626| 6164|
| 22949| 6136|
| 10616| 6124|
|225694| 5736|
| 94914| 5692|
|459561| 5346|
+------+-----+
only showing top 20 rows



In [15]:
# Register the per-user counts as the temp view "trending_users" for SQL queries.
trending_users.createOrReplaceTempView("trending_users")

In [16]:
# Confirm the view is queryable by selecting from it.
spark.sql("SELECT * FROM trending_users").show()

+------+-----+
|userId|count|
+------+-----+
|167850|14677|
|  4272|12192|
|158442|12091|
| 15811|10505|
|175814| 8835|
|307523| 7861|
|469152| 7567|
|178692| 7523|
| 35795| 7515|
|126395| 7398|
|295286| 6757|
| 19421| 6679|
|344926| 6268|
| 19626| 6164|
| 22949| 6136|
| 10616| 6124|
|225694| 5736|
| 94914| 5692|
|459561| 5346|
| 72216| 5314|
+------+-----+
only showing top 20 rows



In [17]:
# Load every *.csv file under users_out/ as an RDD of text lines.
users_data = sc.textFile("gs://stackoverflow-dataset-677/users_out/*.csv")

In [18]:
# Peek at the first three user lines.
users_data.take(3)

['1,Community', '2,Geoff Dalgas', '3,Jarrod Dixon']

In [19]:
def create_user(rdd):
    """
    Convert one ``id,username`` text line into ``[id, username]``.

    Despite its name, ``rdd`` is a single line of text (one element of the
    RDD this function is mapped over), not an RDD.

    Returns a two-element list: the id converted with ``int`` and the
    username as a string.

    Raises ValueError if the text before the first comma is not an integer,
    and IndexError if the line contains no comma at all.
    """
    # Split on the first comma only. The id cannot contain a comma, so every
    # later comma belongs to the username; splitting on all commas would
    # silently truncate names such as "Smith, John" to "Smith".
    # NOTE(review): if the file wraps such names in CSV double quotes, the
    # quotes are kept as part of the username -- confirm how it was written.
    user_fields = rdd.split(",", 1)
    return [int(user_fields[0]), user_fields[1]]

In [20]:
# Turn every user line into an [id, username] list.
users_rdd = users_data.map(create_user)

In [21]:
# Show the first three parsed user records.
users_rdd.take(3)

[[1, 'Community'], [2, 'Geoff Dalgas'], [3, 'Jarrod Dixon']]

In [22]:
# Column names, in the same order as the list returned by create_user.
user_data = ["id","username"]
# Only names are supplied here, so the column types are inferred from the records.
user_df = users_rdd.toDF(user_data)

In [23]:
# Check the inferred column names and types.
user_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- username: string (nullable = true)



In [24]:
# Preview the first rows of the users DataFrame.
user_df.show()

+---+----------------+
| id|        username|
+---+----------------+
|  1|       Community|
|  2|    Geoff Dalgas|
|  3|    Jarrod Dixon|
|  4|      txwikinger|
|  5|    Nathan Osman|
|  6|          Emmett|
|  7|           Helix|
|  8| mechanical_meat|
|  9|          Andrew|
| 10|             DLH|
| 11|   hannes.koller|
| 12|   Michael Terry|
| 13|   Keith Maurino|
| 14|          Jweede|
| 16|        Jeremy L|
| 17|          tutuca|
| 18|          excid3|
| 20|   ParanoiaPuppy|
| 21|            GeoD|
| 22|Alan Featherston|
+---+----------------+
only showing top 20 rows



In [25]:
# Register user_df as the temp view "users" so it can be joined in SQL.
user_df.createOrReplaceTempView("users")

In [26]:
# Prints the user_df schema again; the same call already appears earlier in the notebook.
user_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- username: string (nullable = true)



In [58]:
# Spot-check the "users" view by looking up a single id.
spark.sql("SELECT * FROM users where id=26").show()

+---+--------+
| id|username|
+---+--------+
| 26| njpatel|
+---+--------+



In [27]:
# Attach usernames to the per-user comment counts (users.id = trending_users.userId)
# and keep the ten rows with the highest counts.
trending_usernames = spark.sql("SELECT count,username FROM trending_users tu JOIN users u ON u.id = tu.userId order by count desc limit 10")
trending_usernames.show()

+-----+----------------+
|count|        username|
+-----+----------------+
|14677|          Pilot6|
|12192|        heynnema|
|12091|            muru|
|10505|        Rinzwind|
| 8835|  David Foerster|
| 7861|WinEunuuchs2Unix|
| 7567|         guiverc|
| 7523|     steeldriver|
| 7515|         Panther|
| 7398|         oldfred|
+-----+----------------+



In [29]:
# Check the schema of the joined result.
trending_usernames.printSchema()

root
 |-- count: long (nullable = false)
 |-- username: string (nullable = true)



In [None]:
# Write the top-ten result to the bucket as comma-separated CSV, after
# repartitioning to a single partition.
# NOTE(review): this cell has no execution count (In [None]) and passes no save
# mode -- confirm whether a re-run against an existing path should overwrite or fail.
trending_usernames.repartition(1).write.csv("gs://stackoverflow-dataset-677/trending_usernames", sep=',')

In [118]:
# Join every comment to its author's row in "users". SELECT * keeps the columns of
# both sides, so the result carries both id (from users) and userId (from comments).
comments_users_sql_df = spark.sql("SELECT * FROM users u JOIN comments c ON u.id = c.UserId")
comments_users_sql_df.show()

+----+-------------------+------+-----+--------------------+--------------------+------+
|  id|           username|postId|score|                text|        creationDate|userId|
+----+-------------------+------+-----+--------------------+--------------------+------+
| 964|Hendrik Brummermann|  4602|    0|I can confirm thi...|2010-10-13T21:37:...|   964|
| 964|Hendrik Brummermann|118087|    0|They took it in d...|2012-04-28T06:17:...|   964|
| 964|Hendrik Brummermann|638027|    0|I have the same i...|2015-08-03T13:26:...|   964|
|1677|         eslambasha| 84949|    0|@fossfreedom i do...|2011-12-03T21:56:...|  1677|
|1697|           Frxstrem| 16683|    0|@Marco, I know, I...|2010-12-08T22:36:...|  1697|
|1697|           Frxstrem| 16784|    0|This seems to be ...|2010-12-09T19:05:...|  1697|
|1697|           Frxstrem| 16886|    1|I only want to di...|2010-12-10T22:26:...|  1697|
|1697|           Frxstrem| 16892|    1|This is not an ac...|2010-12-10T22:28:...|  1697|
|1697|           Frxs

In [None]:
# NOTE(review): these two statements repeat the earlier user_data / user_df cell
# verbatim and the cell shows no execution count (In [None]); it looks redundant.
user_data = ["id","username"]
user_df = users_rdd.toDF(user_data)

In [119]:
# Register the joined comments/users result as the temp view "comments_users".
comments_users_sql_df.createOrReplaceTempView("comments_users")

In [121]:
# Print the user_df schema once more.
user_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- username: string (nullable = true)

