# Start a SparkSession and configure it

In [1]:
# pyspark.sql is a module, SparkSession is a class in it
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

## Behavior of SparkConf() and spark.conf

Before a `SparkSession` is started, `SparkConf().getAll()` returns an empty `dict_items` view:

In [2]:
SparkConf().getAll()

dict_items([])

We can add configuration key-value pairs by calling the `set()` or `setAll()` method of a `SparkConf` object. Both methods return the same `SparkConf` object, so calls can be chained. The result is then passed to `.config()` while building the `SparkSession`:

In [3]:
my_conf = SparkConf().setAll([('spark.driver.maxResultSize', '10g'),('spark.driver.memory','2g')])

spark = SparkSession.builder \
        .master('local[4]') \
        .appName('Learn Spark') \
        .config(conf=my_conf) \
        .enableHiveSupport() \
        .getOrCreate()

SparkConf().getAll()

[('spark.driver.maxResultSize', '10g'),
 ('spark.master', 'local[4]'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.driver.memory', '2g'),
 ('spark.app.name', 'Learn Spark'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

To change the configuration after the `SparkSession` has been created, use the `SparkSession.conf` interface:

In [4]:
spark.conf.get('spark.driver.maxResultSize')

'10g'

In [5]:
spark.conf.set('spark.driver.maxResultSize', '5g')
spark.conf.get('spark.driver.maxResultSize')

'5g'

However, calling `spark.conf.set` only affects the configuration of that particular `SparkSession`. It will NOT change `my_conf` or the result of calling `SparkConf()` in the future:

In [6]:
SparkConf().getAll()

[('spark.driver.maxResultSize', '10g'),
 ('spark.master', 'local[4]'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.driver.memory', '2g'),
 ('spark.app.name', 'Learn Spark'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

If a new session is built with a different configuration, the system settings do not change. In other words, `SparkConf()` still reports the values that were set through the first `my_conf`:

In [7]:
new_conf = SparkConf().set('spark.driver.maxResultSize','2g')
print(new_conf.getAll(), '\n')

spark2 = SparkSession.builder \
        .master('local[4]') \
        .appName('Learn Spark') \
        .config(conf=new_conf) \
        .enableHiveSupport() \
        .getOrCreate()

SparkConf().getAll()

[('spark.master', 'local[4]'), ('spark.sql.catalogImplementation', 'hive'), ('spark.driver.memory', '2g'), ('spark.driver.maxResultSize', '2g'), ('spark.app.name', 'Learn Spark'), ('spark.submit.deployMode', 'client'), ('spark.ui.showConsoleProgress', 'true')] 



[('spark.driver.maxResultSize', '10g'),
 ('spark.master', 'local[4]'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.driver.memory', '2g'),
 ('spark.app.name', 'Learn Spark'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

## Useful Configurations and Checking

### Get Spark Version

In [8]:
spark.version

'2.3.1'

### Use sparkContext interface

In [9]:
sc = spark.sparkContext

### Log only ERROR messages

In [10]:
sc.setLogLevel('ERROR') # overrides the log level configured in log4j.properties

### Increase the maximum size of results that executors send to the driver

**To solve the error**: Total size of serialized results of tasks is bigger than spark.driver.maxResultSize

**Solution**: Set `spark.driver.maxResultSize` to a larger value when starting the `SparkSession`

In [11]:
spark.conf.set('spark.driver.maxResultSize', '3g')
spark.conf.get('spark.driver.maxResultSize')

'3g'

# RDD

## Create an RDD

In [9]:
# Create an RDD from a text file

lines = sc.textFile('data/word_count.text')

print(lines.collect())

["The history of New York begins around 10,000 BC, when the first Native Americans arrived. By 1100 AD, New York's main native cultures, the Iroquoian and Algonquian, had developed. European discovery of New York was led by the French in 1524 and the first land claim came in 1609 by the Dutch. As part of New Netherland, the colony was important in the fur trade and eventually became an agricultural resource thanks to the patroon system. In 1626 the Dutch bought the island of Manhattan from Native Americans.[1] In 1664, England renamed the colony New York, after the Duke of York (later James II & VII.) New York City gained prominence in the 18th century as a major trading port in the Thirteen Colonies.", '', "New York played a pivotal role during the American Revolution and subsequent war. The Stamp Act Congress in 1765 brought together representatives from across the Thirteen Colonies to form a unified response to British policies. The Sons of Liberty were active in New York City to ch

In [6]:
# Create an RDD from a Python list

series = sc.parallelize([1,2,3,4])
print(series.collect())

[1, 2, 3, 4]


## Operations on RDD

There are two types of operations on an RDD:

1. **Transformation**:

    Apply a **function** to the data in an RDD to **create a new RDD**.
    
2. **Action**:

    Compute a **result** based on an RDD.

In [None]:
# Example of transformation: filter
