### Initialize pyspark

In [1]:
# Order matters here: findspark.init() has to run before `import pyspark`.
# Per findspark's documentation, init() locates the Spark installation and
# makes its bundled pyspark package importable.
import findspark
findspark.init()
import pyspark

### Initialize and create a spark session

In [2]:
from pyspark.sql import SparkSession
# Obtain the session used by every cell below; getOrCreate() hands back an
# already-running session when there is one instead of starting another.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

### Register temp table

In [3]:
# Load the (question id, tag) pairs from CSV. The first line of the file is a
# header and column types are inferred; toDF() then assigns the column names.
# The path uses forward slashes so it resolves on Windows as well as on
# Linux/macOS (a backslash-separated path only works on Windows).
dfTags = (spark
            .read
            .options(header=True, inferSchema=True)
            .csv("../Resources/question_tags_10K.csv")
            .toDF("id", "tag")
         )

In [4]:
# Expose dfTags to Spark SQL under the name "so_tags"; an existing temp view
# with that name is replaced, so re-running this cell is safe.
dfTags.createOrReplaceTempView("so_tags")

### List all tables in Spark's catalog

In [6]:
# Return the tables/views registered in this session's catalog. Left as the
# cell's last expression so the notebook displays the resulting list.
spark.catalog.listTables()

[Table(name='so_tags', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

### List all tables in Spark's catalog using Spark SQL

In [7]:
# List the catalog's tables through a SQL statement and print the result.
(
    spark
    .sql("show tables")
    .show()
)

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |  so_tags|       true|
+--------+---------+-----------+



### Select columns

In [8]:
# dfTags.select("id", "tag").limit(10).show()

In [9]:
# Project two columns; the query caps the result at 10 rows and show(5)
# prints only the first 5 of them.
(
    spark
    .sql("select id, tag from so_tags limit 10")
    .show(5)
)

+---+---------------+
| id|            tag|
+---+---------------+
|  1|           data|
|  4|             c#|
|  4|       winforms|
|  4|type-conversion|
|  4|        decimal|
+---+---------------+
only showing top 5 rows



### Filter by column value

In [10]:
# dfTags.filter("tag == 'php'").show(10)

In [11]:
# Keep only the rows whose tag is exactly 'php' and print the first 5.
(
    spark
    .sql("select * from so_tags where tag = 'php'")
    .show(5)
)

+---+---+
| id|tag|
+---+---+
| 23|php|
| 42|php|
| 85|php|
|126|php|
|146|php|
+---+---+
only showing top 5 rows



### Count number of rows

In [12]:
# print("Number of php tags = {}".format(dfTags.filter("tag == 'php'").count()))

In [14]:
# Count the rows tagged 'php'; the aggregate yields a single row, aliased
# as php_count.
(
    spark
    .sql("""select count(*) as php_count from so_tags where tag='php'""")
    .show(5)
)

+---------+
|php_count|
+---------+
|      133|
+---------+



### SQL like

In [15]:
# dfTags.filter("tag like 's%'").show(10)

In [17]:
# Pattern match with LIKE: tags that start with the letter 's'.
(
    spark
    .sql("""select * from so_tags where tag like 's%'""")
    .show(5)
)

+---+-------------+
| id|          tag|
+---+-------------+
| 25|      sockets|
| 36|          sql|
| 36|   sql-server|
| 40| structuremap|
| 48|submit-button|
+---+-------------+
only showing top 5 rows



### SQL WHERE with AND clause

In [18]:
# dfTags.filter("tag like 's%'").filter("id == 25 or id == 108").show(10)

In [21]:
# Combine predicates: tag starts with 's' AND the id is either 25 or 108.
# The parentheses keep the OR grouped so AND applies to the whole id test.
(
    spark
    .sql("""select * from so_tags where tag like 's%' and (id = 25 or id = 108)""")
    .show(5)
)

+---+-------+
| id|    tag|
+---+-------+
| 25|sockets|
|108|    svn|
+---+-------+



### SQL IN clause

In [22]:
# dfTags.filter("id in (25, 108)").show(10)

In [23]:
# IN clause: every tag row belonging to question id 25 or 108.
(
    spark
    .sql("""select * from so_tags where id in (25, 108)""")
    .show(5)
)

+---+---------+
| id|      tag|
+---+---------+
| 25|      c++|
| 25|        c|
| 25|  sockets|
| 25|mainframe|
| 25|      zos|
+---+---------+
only showing top 5 rows



### SQL Group By

In [24]:
# dfTags.groupBy("tag").count().show(10)

In [25]:
# Number of rows per tag. No ORDER BY is given, so the row order of the
# printed sample is whatever Spark happens to produce.
(
    spark
    .sql("""select tag, count(*) as count from so_tags group by tag""")
    .show(5)
)

+-----------+-----+
|        tag|count|
+-----------+-----+
|type-safety|    4|
|    jbutton|    1|
|     iframe|    2|
|  svn-hooks|    2|
|  standards|    7|
+-----------+-----+
only showing top 5 rows



### SQL Group By with having clause

In [26]:
# dfTags.groupBy("tag").count().filter("count > 5").show(10)

In [28]:
# Per-tag row counts, keeping only tags that occur more than 5 times.
# HAVING filters on the aggregated `count` alias, i.e. after grouping.
(
    spark
    .sql("""select tag, count(*) as count from so_tags group by tag having count > 5""")
    .show(5)
)

+-------------+-----+
|          tag|count|
+-------------+-----+
|    standards|    7|
|     keyboard|    8|
|          rss|   12|
|documentation|   15|
|      session|    6|
+-------------+-----+
only showing top 5 rows



### SQL Order by

In [29]:
# dfTags.groupBy("tag").count().filter("count > 5").orderBy("tag").show(10)

In [31]:
# Same aggregation as the HAVING example, now sorted alphabetically by tag
# so the printed rows are deterministic.
(
    spark
    .sql("""select tag, count(*) as count from so_tags group by tag having count > 5 order by tag""")
    .show(5)
)

+--------------+-----+
|           tag|count|
+--------------+-----+
|          .net|  351|
|      .net-2.0|   14|
|      .net-3.5|   30|
|         64bit|    7|
|actionscript-3|   22|
+--------------+-----+
only showing top 5 rows



### Typed dataframe, filter and temp table

In [32]:
# Load the questions CSV (header row, inferred column types) and assign the
# column names with toDF(). The path uses forward slashes so it resolves on
# Windows as well as on Linux/macOS (a backslash-separated path only works on
# Windows).
# NOTE(review): Spark's CSV reader uses `dateFormat` for date columns and
# `timestampFormat` for timestamp columns — confirm this option has the
# intended effect on the timestamp-like fields. The columns are cast
# explicitly in a later cell either way.
dfQuestionsCSV = (spark
                    .read
                    .options(header=True, inferSchema=True, dateFormat="yyyy-MM-dd HH:mm:ss")
                    .csv("../Resources/questions_10K.csv")
                    .toDF("id", "creation_date", "closed_date", "deletion_date", "score", "owner_userid", "answer_count")
                 )

#### cast columns to data types

In [33]:
from pyspark.sql.functions import col

In [34]:
# Build the typed DataFrame: each (column, type) pair below becomes
# col(column).cast(type), selected in the listed order.
dfQuestions = dfQuestionsCSV.select(*[
    col(column_name).cast(target_type)
    for column_name, target_type in (
        ("id", "integer"),
        ("creation_date", "timestamp"),
        ("closed_date", "timestamp"),
        ("deletion_date", "date"),
        ("score", "integer"),
        ("owner_userid", "integer"),
        ("answer_count", "integer"),
    )
])

#### filter dataframe

In [36]:
# Narrow the questions to those scoring strictly between 400 and 410
# (both bounds exclusive). where() is an alias of filter().
dfQuestionsSubset = dfQuestions.where(
    "score > 400 and score < 410"
)

#### register temp table

In [37]:
# Expose the filtered questions to Spark SQL under the name "so_questions";
# an existing temp view with that name is replaced.
dfQuestionsSubset.createOrReplaceTempView("so_questions")

### SQL Inner Join

In [43]:
# dfQuestionsSubset.join(dfTags, "id", "inner").show(10)

In [39]:
# Inner join on id: only questions that have at least one tag row appear.
# Selecting t.* and q.* means the result carries an `id` column from each side.
(
    spark
    .sql("""select t.*, q.* from so_questions q inner join so_tags t on t.id = q.id""")
    .show(5)
)

+---+---------+---+-------------------+-------------------+-------------+-----+------------+------------+
| id|      tag| id|      creation_date|        closed_date|deletion_date|score|owner_userid|answer_count|
+---+---------+---+-------------------+-------------------+-------------+-----+------------+------------+
|888|   xdebug|888|2008-08-04 04:48:21|2016-08-04 14:52:00|         null|  405|         131|          30|
|888| phpstorm|888|2008-08-04 04:48:21|2016-08-04 14:52:00|         null|  405|         131|          30|
|888|debugging|888|2008-08-04 04:48:21|2016-08-04 14:52:00|         null|  405|         131|          30|
|888|  eclipse|888|2008-08-04 04:48:21|2016-08-04 14:52:00|         null|  405|         131|          30|
|888|      php|888|2008-08-04 04:48:21|2016-08-04 14:52:00|         null|  405|         131|          30|
+---+---------+---+-------------------+-------------------+-------------+-----+------------+------------+
only showing top 5 rows



### SQL Left Outer Join

In [44]:
# dfQuestionsSubset.join(dfTags, "id", "left_outer").show(10)

In [42]:
# Left outer join with so_questions on the left: every question row is kept,
# and the tag columns are null for a question that has no matching tag.
(
    spark
    .sql("""select t.*, q.* from so_questions q left outer join so_tags t on t.id = q.id""")
    .show(5)
)

+---+---------+---+-------------------+-------------------+-------------+-----+------------+------------+
| id|      tag| id|      creation_date|        closed_date|deletion_date|score|owner_userid|answer_count|
+---+---------+---+-------------------+-------------------+-------------+-----+------------+------------+
|888|   xdebug|888|2008-08-04 04:48:21|2016-08-04 14:52:00|         null|  405|         131|          30|
|888| phpstorm|888|2008-08-04 04:48:21|2016-08-04 14:52:00|         null|  405|         131|          30|
|888|debugging|888|2008-08-04 04:48:21|2016-08-04 14:52:00|         null|  405|         131|          30|
|888|  eclipse|888|2008-08-04 04:48:21|2016-08-04 14:52:00|         null|  405|         131|          30|
|888|      php|888|2008-08-04 04:48:21|2016-08-04 14:52:00|         null|  405|         131|          30|
+---+---------+---+-------------------+-------------------+-------------+-----+------------+------------+
only showing top 5 rows



### SQL Right Outer Join

In [45]:
# dfTags.join(dfQuestionsSubset, "id", "right_outer").show(10)

In [46]:
# Right outer join with so_tags on the right: every tag row is kept, and the
# question columns are null for a tag whose id is not in so_questions.
(
    spark
    .sql("""select t.*, q.* from so_questions q right outer join so_tags t on t.id = q.id""")
    .show(5)
)

+---+---------------+----+-------------+-----------+-------------+-----+------------+------------+
| id|            tag|  id|creation_date|closed_date|deletion_date|score|owner_userid|answer_count|
+---+---------------+----+-------------+-----------+-------------+-----+------------+------------+
|  1|           data|null|         null|       null|         null| null|        null|        null|
|  4|             c#|null|         null|       null|         null| null|        null|        null|
|  4|       winforms|null|         null|       null|         null| null|        null|        null|
|  4|type-conversion|null|         null|       null|         null| null|        null|        null|
|  4|        decimal|null|         null|       null|         null| null|        null|        null|
+---+---------------+----+-------------+-----------+-------------+-----+------------+------------+
only showing top 5 rows



### Register User Defined Function (UDF): a function to prefix a string with `so_` (short for StackOverflow)

In [47]:
def prefixStackoverflow(s):
    """Prefix a string with ``so_`` (short for StackOverflow).

    Parameters
    ----------
    s : str or None
        The value to prefix, e.g. a tag name. ``None`` is what a Python UDF
        receives for a SQL NULL.

    Returns
    -------
    str or None
        ``"so_" + s`` for a string input; ``None`` when ``s`` is ``None`` so
        that a NULL input yields a NULL output instead of raising
        ``TypeError`` from the string concatenation.
    """
    if s is None:
        return None
    return "so_" + s

#### Register User Defined Function (UDF)

In [49]:
# Make prefixStackoverflow callable from Spark SQL under the name prefix_so.
spark.udf.register(
    "prefix_so",
    prefixStackoverflow,
)

<function __main__.prefixStackoverflow(s)>

#### Use the UDF `prefix_so` to prefix each tag value with `so_`

In [50]:
# Apply the registered prefix_so UDF to every tag value from within SQL.
(
    spark
    .sql("""select id, prefix_so(tag) from so_tags""")
    .show(5)
)

+---+------------------+
| id|    prefix_so(tag)|
+---+------------------+
|  1|           so_data|
|  4|             so_c#|
|  4|       so_winforms|
|  4|so_type-conversion|
|  4|        so_decimal|
+---+------------------+
only showing top 5 rows



### SQL Distinct

In [52]:
# Unique tag values only; duplicates across questions are collapsed.
(
    spark
    .sql("""select distinct tag from so_tags""")
    .show(5)
)

+-----------+
|        tag|
+-----------+
|type-safety|
|    jbutton|
|     iframe|
|  svn-hooks|
|  standards|
+-----------+
only showing top 5 rows



### Closing Spark Session

In [53]:
# Shut the session down. After this, `spark` can no longer run queries; a new
# session has to be created (re-run the session cell) before any cell above
# can be executed again.
spark.stop()