# Spark Session
Точка входа в PySpark - это `SparkSession`. Это некоторый способ инициализации базовой функциональности PySpark (Dataset, Dataframe, ...).  
Внутри `SparkSession` создаются `SparkConfig` и `SparkContext` с некоторой конфигурацией, передаваемой в качестве параметра.
Для создания `SparkSession` используется метод `builder()`, а также ряд дополнительных методов:
 - `getOrCreate()` - создать или вернуть существующую Spark-сессию
 - `appName()` - имя приложения
 - `master("local[x]")` - передача имени кластера (yarn, mesos, local[x], где x - целое число, большее 0, указывающее на количество разделов, создаваемых при использовании Dataframe, Dataset, RDD; x можно установить равному количеству ядер ЦП).

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[4]')\
                    .appName('stepik').getOrCreate()
spark

# Чтение/просмотр данных
* Можно читать данные самых разных форматов с помощью функции `read`.
* Для отображения набора используется `show()`

In [7]:
iris_data = spark.read.json('data/iris.json')
iris_data

DataFrame[_corrupt_record: string, petalLength: double, petalWidth: double, sepalLength: double, sepalWidth: double, species: string]

In [11]:
iris_data.show()

+---------------+-----------+----------+-----------+----------+-------+
|_corrupt_record|petalLength|petalWidth|sepalLength|sepalWidth|species|
+---------------+-----------+----------+-----------+----------+-------+
|              [|       null|      null|       null|      null|   null|
|           null|        1.4|       0.2|        5.1|       3.5| setosa|
|           null|        1.4|       0.2|        4.9|       3.0| setosa|
|           null|        1.3|       0.2|        4.7|       3.2| setosa|
|           null|        1.5|       0.2|        4.6|       3.1| setosa|
|           null|        1.4|       0.2|        5.0|       3.6| setosa|
|           null|        1.7|       0.4|        5.4|       3.9| setosa|
|           null|        1.4|       0.3|        4.6|       3.4| setosa|
|           null|        1.5|       0.2|        5.0|       3.4| setosa|
|           null|        1.4|       0.2|        4.4|       2.9| setosa|
|           null|        1.5|       0.1|        4.9|       3.1| 

* С помощью функции `printSchema()` можно отобразить структуру набора данных

In [8]:
stock_data = spark.read.csv('data/stocks_price_final.csv',
                            sep=',',
                            header=True)
stock_data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- date: string (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- adjusted: string (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



In [15]:
stock_data.show(n=3)

+---+------+----------+---------+------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|_c0|symbol|      date|     open|  high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+---+------+----------+---------+------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|  1|   TXG|2019-09-12|       54|    58|       51|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  2|   TXG|2019-09-13|    52.75|54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  3|   TXG|2019-09-16|52.450001|    56|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
+---+------+----------+---------+------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
only showing top 3 rows



* Можно задать структуру набора данных данных самому (имя, тип, допускается ли наличие Null и метаданные)

In [18]:
from pyspark.sql.types import *
# StructField, StructType, ItegerType()

#StructField?
custom_fields = [
    StructField(name='_c0', dataType=IntegerType(), nullable=True),
    StructField(name='symbol', dataType=StringType(), nullable=True),
    StructField(name='data', dataType=DateType(), nullable=True),
    StructField(name='open', dataType=DoubleType(), nullable=True),
    StructField(name='high', dataType=DoubleType(), nullable=True),
    StructField(name='low', dataType=DoubleType(), nullable=True),
    StructField(name='close', dataType=DoubleType(), nullable=True),
    StructField(name='volume', dataType=IntegerType(), nullable=True),
    StructField(name='adjusted', dataType=DoubleType(), nullable=True),
    StructField(name='market.cap', dataType=StringType(), nullable=True),
    StructField(name='sector', dataType=StringType(), nullable=True),
    StructField(name='industry', dataType=StringType(), nullable=True),
    StructField(name='exchange', dataType=StringType(), nullable=True)
]

custom_schema = StructType(fields=custom_fields)
stock_data = spark.read.csv('data/stocks_price_final.csv',
                            sep=',',
                            header=True,
                            schema=custom_schema)
stock_data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- symbol: string (nullable = true)
 |-- data: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- adjusted: double (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



# Методы инспекции данных

##### схема данных в виде StructType

In [27]:
stock_data.schema

StructType([StructField('_c0', IntegerType(), True), StructField('symbol', StringType(), True), StructField('data', DateType(), True), StructField('open', DoubleType(), True), StructField('high', DoubleType(), True), StructField('low', DoubleType(), True), StructField('close', DoubleType(), True), StructField('volume', IntegerType(), True), StructField('adjusted', DoubleType(), True), StructField('market.cap', StringType(), True), StructField('sector', StringType(), True), StructField('industry', StringType(), True), StructField('exchange', StringType(), True)])

##### список кортежей с именами столбцов и типами данных

In [28]:
stock_data.dtypes

[('_c0', 'int'),
 ('symbol', 'string'),
 ('data', 'date'),
 ('open', 'double'),
 ('high', 'double'),
 ('low', 'double'),
 ('close', 'double'),
 ('volume', 'int'),
 ('adjusted', 'double'),
 ('market.cap', 'string'),
 ('sector', 'string'),
 ('industry', 'string'),
 ('exchange', 'string')]

##### возвращение строк в виде спискА

In [29]:
stock_data.head(2) # 2 rows

[Row(_c0=1, symbol='TXG', data=datetime.date(2019, 9, 12), open=54.0, high=58.0, low=51.0, close=52.75, volume=7326300, adjusted=52.75, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=2, symbol='TXG', data=datetime.date(2019, 9, 13), open=52.75, high=54.355, low=49.150002, close=52.27, volume=1025200, adjusted=52.27, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ')]

##### возвращение первой строки данных

In [30]:
stock_data.first()

Row(_c0=1, symbol='TXG', data=datetime.date(2019, 9, 12), open=54.0, high=58.0, low=51.0, close=52.75, volume=7326300, adjusted=52.75, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ')

##### выбор определенных столбцов + вычисление некоторых статистических характеристик (для столбцов с числовым типом)

In [41]:
stock_data.select(['low', 'close']).describe().show()

+-------+------------------+------------------+
|summary|               low|             close|
+-------+------------------+------------------+
|  count|           1726301|           1726301|
|   mean|14557.808227578991|15032.714854330708|
| stddev| 1072968.155843425|1109755.9294000661|
|    min|             0.052|             0.071|
|    max|      1.55151728E8|      1.58376592E8|
+-------+------------------+------------------+



##### список названий столбцов

In [32]:
stock_data.columns

['_c0',
 'symbol',
 'data',
 'open',
 'high',
 'low',
 'close',
 'volume',
 'adjusted',
 'market.cap',
 'sector',
 'industry',
 'exchange']

##### число строк в наборе данных

In [33]:
stock_data.count()

1729034

##### количество различных строк в наборе данных

In [37]:
stock_data.distinct().count()

1729034

# Добавление, удаление и обновление столбцов

Для добавления используется `withColumn(имя, данные)`

In [44]:
stock_data = stock_data.withColumn('date', stock_data.data)
stock_data.select(['_c0', 'data', 'exchange', 'date']).show(1)

+---+----------+--------+----------+
|_c0|      data|exchange|      date|
+---+----------+--------+----------+
|  1|2019-09-12|  NASDAQ|2019-09-12|
+---+----------+--------+----------+
only showing top 1 row



Для переименования столбца используется `withColumnRenamed(столбец, его новое имя)`

In [52]:
stock_data = stock_data.withColumnRenamed('_c0', 'index')
stock_data.show(1)

+-----+------+----------+----+----+----+-----+-------+--------+----------+-------------+--------------------+--------+----------+
|index|symbol|      data|open|high| low|close| volume|adjusted|market.cap|       sector|            industry|exchange|      date|
+-----+------+----------+----+----+----+-----+-------+--------+----------+-------------+--------------------+--------+----------+
|    1|   TXG|2019-09-12|54.0|58.0|51.0|52.75|7326300|   52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|2019-09-12|
+-----+------+----------+----+----+----+-----+-------+--------+----------+-------------+--------------------+--------+----------+
only showing top 1 row



Удаление столбца с помощью `drop(имя столбца)`

In [53]:
stock_data = stock_data.drop('date')
stock_data.show(1)

+-----+------+----------+----+----+----+-----+-------+--------+----------+-------------+--------------------+--------+
|index|symbol|      data|open|high| low|close| volume|adjusted|market.cap|       sector|            industry|exchange|
+-----+------+----------+----+----+----+-----+-------+--------+----------+-------------+--------------------+--------+
|    1|   TXG|2019-09-12|54.0|58.0|51.0|52.75|7326300|   52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
+-----+------+----------+----+----+----+-----+-------+--------+----------+-------------+--------------------+--------+
only showing top 1 row



##### задание

In [59]:
# drop
stock_data_task = stock_data.drop('symbol', 'adjusted', 
                                   'market.cap', 'exchange', 'close')
# rename
stock_data_task = stock_data_task.withColumnRenamed('index', '_c0')

# create new column
stock_data_task = stock_data_task.withColumn('new_volume', stock_data_task.volume)
stock_data_task = stock_data_task.drop('volume')

# change data in both columns `high` and `low`
stock_data_task = stock_data_task.withColumnRenamed('high', 'low_temp')
stock_data_task = stock_data_task.withColumnRenamed('low', 'high')
stock_data_task = stock_data_task.withColumnRenamed('low_temp', 'low')

# move `data` to end
stock_data_task = stock_data_task.withColumn('new_data', stock_data_task.data)
stock_data_task = stock_data_task.drop('data')
stock_data_task = stock_data_task.withColumnRenamed('new_data', 'data')

# showing top 3 rows
stock_data_task.show(3)

+---+---------+------+---------+-------------+--------------------+----------+----------+
|_c0|     open|   low|     high|       sector|            industry|new_volume|      data|
+---+---------+------+---------+-------------+--------------------+----------+----------+
|  1|     54.0|  58.0|     51.0|Capital Goods|Biotechnology: La...|   7326300|2019-09-12|
|  2|    52.75|54.355|49.150002|Capital Goods|Biotechnology: La...|   1025200|2019-09-13|
|  3|52.450001|  56.0|52.009998|Capital Goods|Biotechnology: La...|    269900|2019-09-16|
+---+---------+------+---------+-------------+--------------------+----------+----------+
only showing top 3 rows



# Работа с пропущенными значениями

# Запись и сохранение