# Basics

## Spark Session

Start a session to create a single entry point for interacting with the underlying Spark engine. The session is how you work with Spark, and its builder lets you set the application name and custom configuration settings.

In [1]:
from pyspark.sql import SparkSession


spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Read & Show

Load a JSON file into a dataframe and check that it is what you expect.

In [25]:
comments_df = spark.read.json("./data/RC_2008-01.json")

In [26]:
comments_df.show(2)

+--------+---------+----------------------+-----------------+--------------------+----------------+-----------+-------------+-----+------+------+-------+--------+----------+----------+------------+-----+------------+----------+------------+---+
|archived|   author|author_flair_css_class|author_flair_text|                body|controversiality|created_utc|distinguished|downs|edited|gilded|     id| link_id|      name| parent_id|retrieved_on|score|score_hidden| subreddit|subreddit_id|ups|
+--------+---------+----------------------+-----------------+--------------------+----------------+-----------+-------------+-----+------+------+-------+--------+----------+----------+------------+-----+------------+----------+------------+---+
|    true|    Haven|                  null|             null|Wow, you're a buz...|               0| 1199145604|         null|    0| false|     0|c02s9s6|t3_648oh|t1_c02s9s6|t1_c02s9rv|  1425820157|    4|       false|reddit.com|        t5_6|  4|
|    true|[deleted]|

Print the schema of your new dataframe.

In [27]:
comments_df.printSchema()

root
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: string (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- downs: long (nullable = true)
 |-- edited: string (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- score_hidden: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- ups: long (nullable = true)



## Select

Reduce the fields to something more manageable, but still useful.

In [28]:
trimmed_comments = comments_df.select('author','body', 'created_utc', 'subreddit', 'subreddit_id', 'parent_id', 'link_id', 'id')

In [29]:
trimmed_comments.show()

+-------------------+--------------------+-----------+-----------+------------+----------+--------+-------+
|             author|                body|created_utc|  subreddit|subreddit_id| parent_id| link_id|     id|
+-------------------+--------------------+-----------+-----------+------------+----------+--------+-------+
|              Haven|Wow, you're a buz...| 1199145604| reddit.com|        t5_6|t1_c02s9rv|t3_648oh|c02s9s6|
|          [deleted]|Time to hang out ...| 1199145619| reddit.com|        t5_6|t1_c02s8c8|t3_647ht|c02s9s7|
|           lilmiss2|You can march for...| 1199145620| reddit.com|        t5_6|t1_c02s9rv|t3_648oh|c02s9s8|
|          [deleted]|           [deleted]| 1199145623| reddit.com|        t5_6|  t3_648oo|t3_648oo|c02s9s9|
|          [deleted]|           [deleted]| 1199145632| reddit.com|        t5_6|  t3_648et|t3_648et|c02s9sa|
|          [deleted]|           [deleted]| 1199145643|   politics|    t5_2cneq|  t3_648iy|t3_648iy|c02s9sb|
|EverybodysAnAsshole|Damn, I

## Filter and Where

Remove the deleted comments

In [30]:
valid_comments = trimmed_comments.filter(trimmed_comments['body'] != '[deleted]')

In [31]:
valid_comments.show()

+-------------------+--------------------+-----------+-----------+------------+----------+--------+-------+
|             author|                body|created_utc|  subreddit|subreddit_id| parent_id| link_id|     id|
+-------------------+--------------------+-----------+-----------+------------+----------+--------+-------+
|              Haven|Wow, you're a buz...| 1199145604| reddit.com|        t5_6|t1_c02s9rv|t3_648oh|c02s9s6|
|          [deleted]|Time to hang out ...| 1199145619| reddit.com|        t5_6|t1_c02s8c8|t3_647ht|c02s9s7|
|           lilmiss2|You can march for...| 1199145620| reddit.com|        t5_6|t1_c02s9rv|t3_648oh|c02s9s8|
|EverybodysAnAsshole|Damn, I'm really ...| 1199145644| reddit.com|        t5_6|t1_c02s976|t3_648et|c02s9sc|
|           generalk|Wait, what? 

You...| 1199145647|programming|     t5_2fwo|t1_c02s8md|t3_647yd|c02s9sd|
|          seeker135|And if you sincer...| 1199145650|   politics|    t5_2cneq|  t3_6483n|t3_6483n|c02s9se|
|              plehu|Not in 

## Count

Calculate how many comments you now have, and how many you deleted

In [32]:
valid_comments.count()

390424

In [33]:
comments_df.count()

452990

In [34]:
comments_df.count()-valid_comments.count()

62566
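
Each call to `count()` runs a Spark job, so the subtraction above computes both counts a second time. Below is a minimal sketch of the same arithmetic on stored values, using the counts printed above. In the notebook you would assign `comments_df.count()` and `valid_comments.count()` to the two variables instead of the literals. The variable names are illustrative.

```python
# Counts copied from the outputs above; in the notebook these would come from
# total = comments_df.count() and valid = valid_comments.count().
total = 452990
valid = 390424

removed = total - valid
removed_share = removed / total

print(removed)                  # 62566
print(f"{removed_share:.1%}")   # 13.8%
```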

Keep only the comments from the politics subreddit. `where` is an alias for `filter`.

In [35]:
politics = valid_comments.where(valid_comments['subreddit'] == 'politics')

In [36]:
politics.show()

+---------------+--------------------+-----------+---------+------------+----------+--------+-------+
|         author|                body|created_utc|subreddit|subreddit_id| parent_id| link_id|     id|
+---------------+--------------------+-----------+---------+------------+----------+--------+-------+
|      seeker135|And if you sincer...| 1199145650| politics|    t5_2cneq|  t3_6483n|t3_6483n|c02s9se|
|    elasticsoul|What organization...| 1199145728| politics|    t5_2cneq|t1_c02s9e2|t3_6483n|c02s9so|
|LiberalDemocrat|NO HIS PROBLEM IS...| 1199145817| politics|    t5_2cneq|  t3_648os|t3_648os|c02s9t4|
|LiberalDemocrat| **HILL BABY IN 08**| 1199145837| politics|    t5_2cneq|  t3_648lz|t3_648lz|c02s9t6|
| generic_handle|war. n.

     1. ...| 1199145849| politics|    t5_2cneq|t1_c02s9e2|t3_6483n|c02s9t7|
|   NoFixedAbode|Frankly the artic...| 1199145889| politics|    t5_2cneq|t1_c02s830|t3_6481i|c02s9te|
|      [deleted]|OK I educated mys...| 1199145909| politics|    t5_2cneq|t1_c02s9e

## SQL

Use SQL to see the comments of one user

In [37]:
politics.createOrReplaceTempView("politics")

In [38]:
DiscoWolf = spark.sql("select * from politics where author like 'DiscoWolf'")

In [39]:
DiscoWolf.show()

+---------+--------------------+-----------+---------+------------+---------+--------+-------+
|   author|                body|created_utc|subreddit|subreddit_id|parent_id| link_id|     id|
+---------+--------------------+-----------+---------+------------+---------+--------+-------+
|DiscoWolf|An anagram for th...| 1199146090| politics|    t5_2cneq| t3_648os|t3_648os|c02s9u6|
|DiscoWolf|Adding, "I never ...| 1199317859| politics|    t5_2cneq| t3_64c5v|t3_64c5v|c02srdg|
+---------+--------------------+-----------+---------+------------+---------+--------+-------+



In [40]:
zyzzogeton = spark.sql("select body from politics where author like 'zyzzogeton'")

In [41]:
zyzzogeton.show()

+--------------------+
|                body|
+--------------------+
|So who is respons...|
|1st, 10th, 100th....|
|This is a stupid ...|
|Well, the Great W...|
|The Berlin Wall w...|
|I bet Haliburton ...|
|He has 80 million...|
|That was English....|
|More like the evi...|
| I wanna be anarchy.|
|Back when Bush Sr...|
|DEMOCRATS:

Obama...|
|The exchange rate...|
|In more ways than...|
|When Kucinich and...|
|Those kinds of qu...|
|Tax them all and ...|
|Excellent use of ...|
|It is agreed.  On...|
|This was the grea...|
+--------------------+
only showing top 20 rows



# Aggregation

## GroupBy

Group by Subreddit

In [42]:
valid_comments.groupBy('subreddit').show()

AttributeError: 'GroupedData' object has no attribute 'show'

In [43]:
valid_comments.groupBy('subreddit')

<pyspark.sql.group.GroupedData at 0x7f8970eed278>

`groupBy` on its own returns a `GroupedData` object, not a dataframe. Add an aggregation such as `count()` to get a dataframe back.

In [44]:
valid_comments.groupBy('subreddit').count()

DataFrame[subreddit: string, count: bigint]

In [24]:
valid_comments.groupBy('subreddit').count().show()

+------------+-----+
|   subreddit|count|
+------------+-----+
|          sl|   23|
|          pl|    7|
|       arxiv|    4|
| programming| 7135|
|lipstick.com|   97|
|  reddit.com|65652|
|          tr|   30|
|          de|   24|
|          es|    2|
|        nsfw|    7|
|     request|   34|
|     science| 1888|
|          it|   11|
|          sv|    2|
|          nl|    2|
| freeculture|   96|
|          ru|   15|
|    features|  148|
|        joel|   53|
|          zh|    1|
+------------+-----+
only showing top 20 rows



## OrderBy & Sort

Find the number of comments by subreddit

In [45]:
valid_comments.groupBy('subreddit').count().show()

+------------+-----+
|   subreddit|count|
+------------+-----+
|       anime|    7|
|      iphone|    1|
|          vi|    9|
|productivity|    1|
|   AskReddit|  130|
|      videos|    4|
|        digg|    4|
|       4chan|    3|
|    browsers|    1|
|          sl|    1|
|     lolcats|   12|
|  web_design|    1|
|     haskell|    2|
|  philosophy|   24|
|     offbeat|   23|
|    torrents|   28|
|        l33t|   13|
|       apple|    4|
|        auto|    2|
|  technology|    9|
+------------+-----+
only showing top 20 rows



Find the most popular subreddits

In [46]:
from pyspark.sql.functions import desc

In [47]:
valid_comments.groupBy('subreddit').count().sort(desc("count")).show()

+-------------+------+
|    subreddit| count|
+-------------+------+
|   reddit.com|195845|
|     politics|131367|
|  programming| 35834|
|      science| 16304|
|entertainment|  3279|
|     business|  2262|
|       gaming|  1941|
|      gadgets|  1194|
|       sports|   535|
|       netsec|   244|
|         pics|   224|
|         nsfw|   209|
|         joel|   155|
|    AskReddit|   130|
|       cogsci|    71|
|         ruby|    65|
|           ja|    58|
|         math|    50|
|       comics|    41|
|      ronpaul|    38|
+-------------+------+
only showing top 20 rows



Find the most popular threads and their subreddits

In [48]:
valid_comments.groupBy('subreddit', 'parent_id').count().sort(desc("count")).show()

+-----------+---------+-----+
|  subreddit|parent_id|count|
+-----------+---------+-----+
| reddit.com| t3_675oj|  561|
| reddit.com| t3_66hta|  236|
| reddit.com| t3_64xuh|  177|
| reddit.com| t3_667q4|  166|
|   politics| t3_65s21|  153|
|   politics| t3_64ht4|  147|
| reddit.com| t3_649s2|  147|
|   politics| t3_674tn|  136|
| reddit.com| t3_66aa9|  135|
|   politics| t3_65t90|  133|
| reddit.com| t3_669yz|  132|
| reddit.com| t3_65o58|  127|
| reddit.com| t3_66xys|  124|
|   politics| t3_66mmu|  123|
| reddit.com| t3_648et|  119|
|   politics| t3_65ecx|  118|
| reddit.com| t3_66idx|  116|
| reddit.com| t3_66uxr|  115|
|programming| t3_666gt|  113|
| reddit.com| t3_65zjo|  112|
+-----------+---------+-----+
only showing top 20 rows



# Data Calculations

## User Defined Functions

In [63]:
import nltk
from textblob import TextBlob, Word
from pyspark.sql.functions import explode, split, lit, col, countDistinct

from nltk.corpus import stopwords

STOPWORDS = stopwords.words('english')

for w in ['http',"\'s","n\'t","\'re","\'m"]:
    STOPWORDS.append(w)

def tokenize(text):
    tokens = []
    blob = TextBlob(text.strip().lower())
    for word in blob.words:
        if word not in STOPWORDS:
            tokens.append(str(word))
    return tokens

In [64]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, IntegerType
tokenize_udf = udf(tokenize, ArrayType(StringType()))
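
Spark passes SQL nulls to a Python UDF as `None`, so `tokenize` as written would raise on a null `body`, because `None` has no `strip` method. The sketch below is a plain-Python stand-in that shows the guard and can be tested without Spark. It uses a regular expression instead of TextBlob and a tiny hypothetical stop-word set instead of the NLTK list; `tokenize_safe` and `DEMO_STOPWORDS` are names invented for illustration.

```python
import re

# Hypothetical, tiny stop-word set standing in for the NLTK list used above.
DEMO_STOPWORDS = {"the", "a", "is", "of", "and", "to"}


def tokenize_safe(text):
    """Return lowercase word tokens, or an empty list for null input."""
    if text is None:
        return []
    words = re.findall(r"[a-z0-9']+", text.strip().lower())
    return [w for w in words if w not in DEMO_STOPWORDS]


print(tokenize_safe("The price of Freedom!"))  # ['price', 'freedom']
print(tokenize_safe(None))                     # []
```

The same `if text is None` guard could be added at the top of `tokenize` before wrapping it with `udf`.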

In [65]:
tokenized = politics.select('body', tokenize_udf('body'))

In [66]:
words = tokenized.select(explode(tokenized[1]).alias('words'))

In [67]:
words_by_count = words.groupBy('words').count().orderBy(desc("count"))

In [68]:
words_by_count.show()

+----------+-----+
|     words|count|
+----------+-----+
|    people|24459|
|     would|23418|
|      like|18268|
|        gt|18078|
|      paul|17184|
|     think|15494|
|       one|14972|
|       get|12675|
|        us|10844|
|       ron|10744|
|      know| 9930|
|      even| 9620|
|government| 8944|
|     right| 8676|
|       see| 8326|
|      good| 8122|
|      time| 8096|
|      make| 8076|
|      much| 8035|
|    really| 8028|
+----------+-----+
only showing top 20 rows



## WithColumn

Add column for the polarity of a comment

In [72]:
def get_polarity(text):
    blob = TextBlob(text)
    return blob.sentiment.polarity

In [73]:
get_polarity_udf = udf(get_polarity, FloatType())

NameError: name 'FloatType' is not defined

Remember to import every type you use: `FloatType` was not part of the earlier `pyspark.sql.types` import.

In [76]:
from pyspark.sql.types import FloatType

In [77]:
get_polarity_udf = udf(get_polarity, FloatType())

In [78]:
valid_comments_with_polarity = valid_comments.withColumn("polarity",get_polarity_udf('body'))

In [79]:
valid_comments_with_polarity.show()

+-------------------+--------------------+-----------+-----------+------------+----------+--------+-------+------------+
|             author|                body|created_utc|  subreddit|subreddit_id| parent_id| link_id|     id|    polarity|
+-------------------+--------------------+-----------+-----------+------------+----------+--------+-------+------------+
|              Haven|Wow, you're a buz...| 1199145604| reddit.com|        t5_6|t1_c02s9rv|t3_648oh|c02s9s6|         0.1|
|          [deleted]|Time to hang out ...| 1199145619| reddit.com|        t5_6|t1_c02s8c8|t3_647ht|c02s9s7|         0.0|
|           lilmiss2|You can march for...| 1199145620| reddit.com|        t5_6|t1_c02s9rv|t3_648oh|c02s9s8|         0.3|
|EverybodysAnAsshole|Damn, I'm really ...| 1199145644| reddit.com|        t5_6|t1_c02s976|t3_648et|c02s9sc|  0.06666667|
|           generalk|Wait, what? 

You...| 1199145647|programming|     t5_2fwo|t1_c02s8md|t3_647yd|c02s9sd| 0.102678575|
|          seeker135|And if you 

## Pivot

Show the average polarity by subreddit

## Map

## FlatMap