# Spark Session

Spark can work with data stored on HDFS or on a regular, non-distributed filesystem. For cluster resource management it can use Hadoop YARN, [Mesos](https://mesos.apache.org/), or its own standalone resource manager.

All distributed operations in Spark go through a so-called Spark Session (`SparkSession`). Usually one has already been created for you by the environment your cluster administrator set up:

In [2]:
spark

# Reading Data

Spark can read data in a variety of formats, such as JSON. For this example we use the [Yelp Dataset](https://www.yelp.com/dataset), which is easy to obtain and free to use for education and research.

In [1]:
!hdfs dfs -ls /user/qlr

Found 1 items
drwxr-xr-x   - root hadoop          0 2020-11-26 13:18 /user/qlr/data


In [None]:
# -p creates missing parent directories (such as /user/qlr) and does not fail if the directory already exists
!hdfs dfs -mkdir -p /user/qlr/data

In [4]:
!hdfs dfs -put file:///home/user/Downloads/yelp_academic_dataset_review.json /user/qlr/data

put: `/user/qlr/data/yelp_academic_dataset_review.json': File exists


In [2]:
!hdfs dfs -du -h /user/qlr/data

5.9 G  /user/qlr/data/yelp_academic_dataset_review.json


In [5]:
reviews_on_hdfs = "/user/qlr/data/yelp_academic_dataset_review.json"

In [7]:
%%time

spark.read.text(reviews_on_hdfs).count()

CPU times: user 0 ns, sys: 6.66 ms, total: 6.66 ms
Wall time: 9.92 s


8021122

This code reads the JSON file as plain text, line by line, and counts the lines. Let's compare its speed with the `wc` tool:

In [8]:
%%time

!wc -l /home/user/Downloads/yelp_academic_dataset_review.json

8021122 /home/user/Downloads/yelp_academic_dataset_review.json
CPU times: user 38.4 ms, sys: 19.6 ms, total: 58 ms
Wall time: 1.54 s


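In this run `wc` is actually faster (1.54 s versus 9.92 s of wall time): it is implemented in C, reads a local file, and avoids the overhead of launching a distributed job. However, `wc` uses a single CPU, so on large enough inputs a Spark job that spreads the work over many executors can finish first. Note also that `%%time` reports CPU time only for the notebook's own Python process; that is why the Spark cells show just a few milliseconds of CPU time while the real work runs in the JVMs on the cluster.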
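To illustrate why a distributed job can beat a single-threaded `wc` on big enough inputs, here is a minimal plain-Python sketch of the split-count-sum pattern: the file is cut into byte ranges (a stand-in for Spark's input partitions), newlines are counted in each range independently, and the partial counts are added up. The function names are hypothetical, and threads are used only to keep the sketch self-contained; they do not give Spark-like scaling.

```python
import os
from concurrent.futures import ThreadPoolExecutor


def count_newlines_in_range(path, start, end, block=1 << 20):
    """Count b'\\n' bytes in the half-open byte range [start, end) of a file."""
    count = 0
    with open(path, "rb") as f:
        f.seek(start)
        remaining = end - start
        while remaining > 0:
            chunk = f.read(min(block, remaining))
            if not chunk:
                break
            count += chunk.count(b"\n")
            remaining -= len(chunk)
    return count


def parallel_line_count(path, n_partitions=4):
    """Count lines the way `wc -l` does (newline bytes), one byte range per worker."""
    size = os.path.getsize(path)
    step = max(1, -(-size // n_partitions))  # ceiling division
    ranges = [(s, min(s + step, size)) for s in range(0, size, step)]
    with ThreadPoolExecutor(max_workers=n_partitions) as pool:
        # "map" phase: count each range independently; "reduce" phase: add them up
        return sum(pool.map(lambda r: count_newlines_in_range(path, *r), ranges))
```

Counting raw newline bytes gives the same total no matter where the ranges are cut. Spark's text reader, which returns whole lines, additionally has to align each split to line boundaries.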

Parsing JSON in Spark is really simple:

In [6]:
reviews = spark.read.json(reviews_on_hdfs)
reviews.show(n=5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|-MhfebM0QIsKt87iD...|   0|2015-04-15 05:21:16|    0|xQY8N_XvtGbearJ5X...|  2.0|As someone who ha...|     5|OwjRMXRC0KyPrIlcj...|
|lbrU8StCq3yDfr-QM...|   0|2013-12-07 03:16:52|    1|UmFMZ8PyXZTY2Qcwz...|  1.0|I am actually hor...|     1|nIJD_7ZXHq-FX8byP...|
|HQl28KMwrEKHqhFrr...|   0|2015-12-05 03:18:11|    0|LG2ZaYiOgpr2DK_90...|  5.0|I love Deagan's. ...|     1|V34qejxNsCbcgD8C0...|
|5JxlZaqCnk1MnbgRi...|   0|2011-05-27 05:30:52|    0|i6g_oA9Yf9Y31qt0w...|  1.0|Dismal, lukewarm,...|     0|ofKDkJKXSKZXu5xJN...|
|IS4cv902ykd8wj1TR...|   0|2017-01-14 21:56:57|    0|6TdNDKywdbjoTkize...|  4.0|Oh happy d
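By default `spark.read.json` expects JSON Lines (one JSON object per line, as in this file) and infers the schema by scanning the data. Note that the columns come out in alphabetical order rather than in the order they appear in the file. The following is a much-simplified plain-Python sketch of that inference, under stated assumptions: the type names mimic Spark SQL's, integers and floats are widened to `double`, fields that are always null or have conflicting types fall back to `string`, and nested objects or arrays are simply reported as `string` (Spark would infer struct and array types for them).

```python
import json

# Simplified mapping from JSON scalar types to Spark SQL type names.
# Assumption: nested objects/arrays fall back to "string" in this sketch.
TYPE_NAMES = {bool: "boolean", int: "bigint", float: "double", str: "string"}


def infer_schema(lines):
    """Return [(field, type_name), ...] sorted by field name, for JSON Lines input."""
    seen = {}  # field name -> set of type names observed
    for line in lines:
        if not line.strip():
            continue  # skip blank lines
        for key, value in json.loads(line).items():
            types = seen.setdefault(key, set())
            if value is not None:  # nulls do not constrain the type
                types.add(TYPE_NAMES.get(type(value), "string"))
    schema = []
    for key in sorted(seen):  # inferred fields are ordered by name
        types = seen[key]
        if len(types) == 1:
            type_name = next(iter(types))
        elif types == {"bigint", "double"}:
            type_name = "double"  # widen mixed integers and floats
        else:
            type_name = "string"  # only nulls, or conflicting types
        schema.append((key, type_name))
    return schema
```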

In [10]:
# The DataFrame API can be used much like pandas
from pyspark.sql import functions as sf

(
    reviews
    .groupby(sf.date_trunc("day", "date").alias("day"))
    .agg(sf.sum("cool").alias("total_cool"))
    .sort(sf.desc("total_cool"))
    .show(n=5)
)

+-------------------+----------+
|                day|total_cool|
+-------------------+----------+
|2019-01-06 00:00:00|      3133|
|2019-01-02 00:00:00|      3017|
|2018-04-09 00:00:00|      2897|
|2018-03-13 00:00:00|      2810|
|2018-04-23 00:00:00|      2709|
+-------------------+----------+
only showing top 5 rows
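To make clear what this query computes, here is a plain-Python equivalent for a handful of records given as dicts (an assumption for illustration; Spark works on the whole DataFrame). The timestamp format `%Y-%m-%d %H:%M:%S` matches the `date` values shown above, and the function name is hypothetical. Ties may be ordered differently than in Spark.

```python
from collections import Counter
from datetime import datetime


def top_days_by_cool(reviews, n=5):
    """Sum `cool` per calendar day and return the n largest totals."""
    totals = Counter()
    for review in reviews:
        ts = datetime.strptime(review["date"], "%Y-%m-%d %H:%M:%S")
        day = ts.replace(hour=0, minute=0, second=0)  # like date_trunc("day", ...)
        totals[day] += review["cool"]  # like sum("cool") per group
    return totals.most_common(n)  # sorted by total, descending, then limited to n
```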



In [11]:
# or you can use Spark as an SQL engine

reviews.createOrReplaceTempView("reviews")
spark.sql("""
SELECT
    date_trunc('day', date) AS day,
    SUM(cool) AS total_cool
FROM reviews
GROUP BY
    day
ORDER BY
    total_cool DESC
LIMIT 5
""").show()

+-------------------+----------+
|                day|total_cool|
+-------------------+----------+
|2019-01-06 00:00:00|      3133|
|2019-01-02 00:00:00|      3017|
|2018-04-09 00:00:00|      2897|
|2018-03-13 00:00:00|      2810|
|2018-04-23 00:00:00|      2709|
+-------------------+----------+
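Because the query is ordinary SQL, it can be prototyped on a small local sample with Python's built-in `sqlite3` module before running it on the cluster. This is only a sketch: the sample rows are invented, and SQLite has no `date_trunc`, so `date()` (which keeps the `YYYY-MM-DD` part) is used instead.

```python
import sqlite3

# A tiny in-memory table with the same column names as the `reviews` view (rows are made up).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE reviews (date TEXT, cool INTEGER)")
conn.executemany(
    "INSERT INTO reviews VALUES (?, ?)",
    [("2019-01-06 10:00:00", 2), ("2019-01-06 23:59:59", 3), ("2019-01-02 08:30:00", 4)],
)

# Same query shape as above; SQLite's date() replaces Spark's date_trunc('day', ...).
rows = conn.execute("""
SELECT
    date(date) AS day,
    SUM(cool) AS total_cool
FROM reviews
GROUP BY
    day
ORDER BY
    total_cool DESC
LIMIT 5
""").fetchall()
print(rows)
conn.close()
```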



# Do It Yourself

[Spark Manual](http://spark.apache.org/docs/2.4.3/api/python/index.html) is your best friend!

* count the number of users and businesses
* compute the average number of reviews and the average star rating per business and per user
* compute histograms of the `cool`, `funny`, and `useful` columns
* find the ten most frequent words in the review texts
* save the results to disk
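For checking your Spark answers on a small sample, here are plain-Python reference sketches for two of the tasks. The names `histogram` and `top_words` are hypothetical. `histogram` follows the bucket convention documented for PySpark's `RDD.histogram` with an explicit bucket list: every bucket is open on the right except the last, which is closed, and out-of-range values are ignored. `top_words` assumes a simple tokenization (lower-case letters and apostrophes); in Spark the same idea can be expressed with `split`, `explode`, and `groupBy(...).count()`.

```python
import re
from bisect import bisect_right
from collections import Counter


def histogram(values, buckets):
    """Count values into [b0, b1), [b1, b2), ..., [b_{k-1}, b_k].

    `buckets` must be sorted and strictly increasing; values outside
    [buckets[0], buckets[-1]] are ignored.
    """
    counts = [0] * (len(buckets) - 1)
    for v in values:
        if v < buckets[0] or v > buckets[-1]:
            continue
        i = bisect_right(buckets, v) - 1
        counts[min(i, len(counts) - 1)] += 1  # v == buckets[-1] goes into the last bucket
    return counts


def top_words(texts, n=10):
    """Return the n most frequent lower-cased words across all texts."""
    counts = Counter()
    for text in texts:
        counts.update(re.findall(r"[a-z']+", text.lower()))
    return counts.most_common(n)
```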

In [7]:
%%time

reviews.count()

CPU times: user 2.76 ms, sys: 0 ns, total: 2.76 ms
Wall time: 8.1 s


8021122

In [12]:
business = reviews.select("business_id").distinct().cache()

In [8]:
users = reviews.select("user_id").distinct().cache()

In [13]:
%%time

business.count()

CPU times: user 3.6 ms, sys: 840 µs, total: 4.44 ms
Wall time: 10.2 s


209393

In [10]:
%%time

users.count()

CPU times: user 4.4 ms, sys: 1.06 ms, total: 5.46 ms
Wall time: 14.5 s


1968703
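The counts above already answer part of the second task: the average number of reviews per business (or per user) is the total number of reviews divided by the number of distinct businesses (or users). This assumes every review carries exactly one `business_id` and one `user_id`, as in the rows shown earlier. Average star ratings, by contrast, need a grouped aggregation such as `reviews.groupBy("business_id").agg(sf.avg("stars"))`.

```python
# Counts printed by the cells above.
total_reviews = 8021122
distinct_businesses = 209393
distinct_users = 1968703

reviews_per_business = total_reviews / distinct_businesses
reviews_per_user = total_reviews / distinct_users

print(f"{reviews_per_business:.2f} reviews per business")  # 38.31
print(f"{reviews_per_user:.2f} reviews per user")  # 4.07
```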

In [11]:
users.explain()

== Physical Plan ==
InMemoryTableScan [user_id#14]
   +- InMemoryRelation [user_id#14], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *(2) HashAggregate(keys=[user_id#14], functions=[])
            +- Exchange hashpartitioning(user_id#14, 200)
               +- *(1) HashAggregate(keys=[user_id#14], functions=[])
                  +- *(1) FileScan json [user_id#14] Batched: false, Format: JSON, Location: InMemoryFileIndex[hdfs://cluster-0298-m/user/qlr/data/yelp_academic_dataset_review.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<user_id:string>
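`explain()` prints the physical plan, which reads bottom-up: Spark scans only the `user_id` column of the JSON file (`ReadSchema: struct<user_id:string>`), removes duplicates within each input partition (the lower `HashAggregate`), shuffles the rows so that equal `user_id` values land in the same one of 200 partitions (`Exchange hashpartitioning(user_id#14, 200)`; 200 is the default value of `spark.sql.shuffle.partitions`), removes the remaining duplicates (the upper `HashAggregate`), and keeps the result in the cache (`InMemoryRelation`), so later actions can read it from there (`InMemoryTableScan`). The plain-Python sketch below mimics the two aggregation stages and the shuffle; Python's `hash()` stands in for Spark's hash function, and the function name is hypothetical.

```python
def two_phase_distinct(partitions, num_shuffle_partitions=200):
    """Mimic partial HashAggregate -> Exchange hashpartitioning -> final HashAggregate."""
    # Stage 1, partial aggregation: drop duplicates inside each input partition.
    partial = [set(part) for part in partitions]

    # Exchange: route every key to the output partition chosen by its hash,
    # so all copies of the same key end up in the same place.
    shuffled = [[] for _ in range(num_shuffle_partitions)]
    for part in partial:
        for key in part:
            shuffled[hash(key) % num_shuffle_partitions].append(key)

    # Stage 2, final aggregation: drop duplicates that came from different input partitions.
    return [set(bucket) for bucket in shuffled]


input_partitions = [["u1", "u2", "u1"], ["u2", "u3"], ["u3", "u4", "u4"]]
result = two_phase_distinct(input_partitions, num_shuffle_partitions=4)
print(sum(len(p) for p in result))  # 4 distinct users
```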
