# Beginning Spark

We will be analyzing clickstream data to learn how to use Spark.

Example:

{"session": "session_36", "domain": "youtube.com", "cost": 118, "user": "user_9", "campaign": "campaign_19", "ip": "ip_4", "action": "clicked", "timestamp": 1420070400000}
{"session": "session_96", "domain": "facebook.com", "cost": 5, "user": "user_5", "campaign": "campaign_12", "ip": "ip_3", "action": "blocked", "timestamp": 1420070400864}


In [25]:
clickstreamDF = spark.read.json("../../data/click-stream/clickstream.json")

In [26]:
clickstreamDF.printSchema()

root
 |-- action: string (nullable = true)
 |-- campaign: string (nullable = true)
 |-- cost: long (nullable = true)
 |-- domain: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- session: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- user: string (nullable = true)



In [27]:
clickstreamDF.show()

+-------+-----------+----+-----------------+----+----------+-------------+------+
| action|   campaign|cost|           domain|  ip|   session|    timestamp|  user|
+-------+-----------+----+-----------------+----+----------+-------------+------+
|clicked|campaign_19| 118|      youtube.com|ip_4|session_36|1420070400000|user_9|
|blocked|campaign_12|   5|     facebook.com|ip_3|session_96|1420070400864|user_5|
|clicked| campaign_3|  54|sf.craigslist.org|ip_9|session_61|1420070401728|user_8|
|blocked|campaign_18| 110|    wikipedia.org|ip_5|session_55|1420070402592|user_6|
|clicked| campaign_6|  15|comedycentral.com|ip_9|session_49|1420070403456|user_4|
|blocked| campaign_9| 139|          cnn.com|ip_8|session_13|1420070404320|user_7|
|blocked| campaign_4| 171|   funnyordie.com|ip_1|session_92|1420070405184|user_9|
|blocked|campaign_17|  20|       amazon.com|ip_4|session_13|1420070406048|user_1|
|blocked|campaign_20|  78|        zynga.com|ip_5|session_36|1420070406912|user_3|
|blocked|campaig

### Query and Filter Operations

Let us see how to do some query and filter operations

In [28]:
clickstreamDF.filter("cost > 100").show()

+-------+-----------+----+-----------------+----+----------+-------------+------+
| action|   campaign|cost|           domain|  ip|   session|    timestamp|  user|
+-------+-----------+----+-----------------+----+----------+-------------+------+
|clicked|campaign_19| 118|      youtube.com|ip_4|session_36|1420070400000|user_9|
|blocked|campaign_18| 110|    wikipedia.org|ip_5|session_55|1420070402592|user_6|
|blocked| campaign_9| 139|          cnn.com|ip_8|session_13|1420070404320|user_7|
|blocked| campaign_4| 171|   funnyordie.com|ip_1|session_92|1420070405184|user_9|
|blocked|campaign_19| 147|      nytimes.com|ip_2|session_65|1420070407776|user_6|
|clicked|campaign_16| 141|    wikipedia.org|ip_1|session_57|1420070409504|user_1|
|blocked| campaign_9| 153|       google.com|ip_3|session_22|1420070413824|user_9|
|blocked| campaign_8| 140|comedycentral.com|ip_8| session_4|1420070414688|user_2|
| viewed|campaign_20| 133|       google.com|ip_9|session_69|1420070416416|user_7|
+-------+-------

In [29]:
clickstreamDF.filter("action == 'clicked'").show()

+-------+-----------+----+-----------------+----+----------+-------------+------+
| action|   campaign|cost|           domain|  ip|   session|    timestamp|  user|
+-------+-----------+----+-----------------+----+----------+-------------+------+
|clicked|campaign_19| 118|      youtube.com|ip_4|session_36|1420070400000|user_9|
|clicked| campaign_3|  54|sf.craigslist.org|ip_9|session_61|1420070401728|user_8|
|clicked| campaign_6|  15|comedycentral.com|ip_9|session_49|1420070403456|user_4|
|clicked|campaign_16| 141|    wikipedia.org|ip_1|session_57|1420070409504|user_1|
|clicked| campaign_5|  74|       sfgate.com|ip_6|session_27|1420070412096|user_3|
+-------+-----------+----+-----------------+----+----------+-------------+------+



In [30]:
clickstreamDF.groupBy("domain").count().show()

+-----------------+-----+
|           domain|count|
+-----------------+-----+
|      nytimes.com|    1|
|      youtube.com|    2|
|        zynga.com|    1|
|       google.com|    2|
|        yahoo.com|    1|
|     facebook.com|    1|
|          cnn.com|    1|
|    wikipedia.org|    3|
|       sfgate.com|    1|
|       amazon.com|    2|
|   funnyordie.com|    1|
|sf.craigslist.org|    2|
|comedycentral.com|    2|
+-----------------+-----+




## STEP 7 : Joining Dataframes

Let's load another data set `domain info`
The data is in   `data/click-stream/domain-info.json`
The data looks like this:

```console

    {"domain":"amazon.com","category":"SHOPPING"}
    {"domain":"bbc.co.uk","category":"NEWS"}
    {"domain":"facebook.com","category":"SOCIAL"}
    ...
```

In [31]:
domainsDF = spark.read.json("../../data/click-stream/domain-info.json")
domainsDF.show()

+-----------+-----------------+
|   category|           domain|
+-----------+-----------------+
|   SHOPPING|       amazon.com|
|       NEWS|        bbc.co.uk|
|     SOCIAL|     facebook.com|
|      PHOTO|       flickr.com|
|       NEWS|      foxnews.com|
|     SEARCH|       google.com|
|       NEWS|          npr.org|
|       NEWS|      nytimes.com|
|CLASSIFIEDS|sf.craigslist.org|
|       NEWS|       sfgate.com|
|     SOCIAL|      twitter.com|
|       NEWS|     usatoday.com|
|  REFERENCE|    wikipedia.org|
|     SEARCH|        yahoo.com|
|      VIDEO|      youtube.com|
+-----------+-----------------+



In [32]:
# Let's do an inner join

from pyspark.sql.functions import *

joined = clickstreamDF.join(domainsDF, clickstreamDF['domain'] == domainsDF['domain'])
joined.show()

+-------+-----------+----+-----------------+----+----------+-------------+------+-----------+-----------------+
| action|   campaign|cost|           domain|  ip|   session|    timestamp|  user|   category|           domain|
+-------+-----------+----+-----------------+----+----------+-------------+------+-----------+-----------------+
|clicked|campaign_19| 118|      youtube.com|ip_4|session_36|1420070400000|user_9|      VIDEO|      youtube.com|
|blocked|campaign_12|   5|     facebook.com|ip_3|session_96|1420070400864|user_5|     SOCIAL|     facebook.com|
|clicked| campaign_3|  54|sf.craigslist.org|ip_9|session_61|1420070401728|user_8|CLASSIFIEDS|sf.craigslist.org|
|blocked|campaign_18| 110|    wikipedia.org|ip_5|session_55|1420070402592|user_6|  REFERENCE|    wikipedia.org|
|blocked|campaign_17|  20|       amazon.com|ip_4|session_13|1420070406048|user_1|   SHOPPING|       amazon.com|
|blocked|campaign_19| 147|      nytimes.com|ip_2|session_65|1420070407776|user_6|       NEWS|      nytim

In [33]:
# Let's do an outer join.

joinedOuter = clickstreamDF.join(domainsDF, clickstreamDF['domain'] == domainsDF['domain'], 'outer')
joinedOuter.show()

+-------+-----------+----+-------------+----+----------+-------------+------+---------+-------------+
| action|   campaign|cost|       domain|  ip|   session|    timestamp|  user| category|       domain|
+-------+-----------+----+-------------+----+----------+-------------+------+---------+-------------+
|blocked|campaign_19| 147|  nytimes.com|ip_2|session_65|1420070407776|user_6|     NEWS|  nytimes.com|
|clicked|campaign_19| 118|  youtube.com|ip_4|session_36|1420070400000|user_9|    VIDEO|  youtube.com|
|blocked| campaign_2|   7|  youtube.com|ip_2|session_93|1420070412960|user_1|    VIDEO|  youtube.com|
|blocked|campaign_20|  78|    zynga.com|ip_5|session_36|1420070406912|user_3|     null|         null|
|blocked| campaign_9| 153|   google.com|ip_3|session_22|1420070413824|user_9|   SEARCH|   google.com|
| viewed|campaign_20| 133|   google.com|ip_9|session_69|1420070416416|user_7|   SEARCH|   google.com|
|   null|       null|null|         null|null|      null|         null|  null|     