In [1]:
!pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName("PySpark_Stepic_Tutorials") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

import numpy as np
import pandas as pd
#Функция для передачи результата sql-запроса в pandas DataFrame.
#Напиши select = """ SELECT * FROM table""" и вызови функцию sql(select)
def sql(select):
    SQLdf = spark.sql(select)
    return SQLdf.toPandas()

import pyspark.sql.functions as F

from google.colab import data_table
data_table.disable_dataframe_formatter()

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")



# <font color='yellow'>Чтение данных

Используя spark.read мы может считывать данные из файлов различных форматов, таких как CSV, JSON, Parquet и других. Вот несколько примеров получения данных из файлов:

In [2]:
# spark.read.csv
# spark.read.json
# spark.read.parquet

In [3]:
path = '/content/sample_data/california_housing_train.csv'
df = spark.read.csv(path)
df.show()

+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|        _c0|      _c1|               _c2|        _c3|           _c4|        _c5|        _c6|          _c7|               _c8|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000| 472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000| 463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000| 117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000| 226.000000|     3.191700|    

# <font color='yellow'>Запись/сохранение данных в файл

Метод write.save() используется для сохранения данных в различных форматах, таких как CSV, JSON, Parquet и других. Давайте рассмотрим, как записать данные в файлы разных форматов. Мы можем сохранить как все строки, так и только выбранные с помощью метода select()

In [4]:
# # CSV
# data.write.csv('dataset.csv')

# # JSON
# data.write.save('dataset.json', format='json')

# # Parquet
# data.write.save('dataset.parquet', format='parquet')

In [5]:
df.columns

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8']

In [6]:
#df.select(['_c0', '_c1', '_c2']).write.csv('dataset_part.csv')

# <font color = 'yellow'>Структурирование данных с помощью схемы Spark

In [7]:
path = '/content/stocks_price_final.csv'
data = spark.read.csv(path, sep = ',', header=True)

In [8]:
data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- date: string (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- adjusted: string (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



In [9]:
data.show(2)

+---+------+----------+-----+------+---------+-----+-------+--------+----------+-------------+--------------------+--------+
|_c0|symbol|      date| open|  high|      low|close| volume|adjusted|market.cap|       sector|            industry|exchange|
+---+------+----------+-----+------+---------+-----+-------+--------+----------+-------------+--------------------+--------+
|  1|   TXG|2019-09-12|   54|    58|       51|52.75|7326300|   52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  2|   TXG|2019-09-13|52.75|54.355|49.150002|52.27|1025200|   52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
+---+------+----------+-----+------+---------+-----+-------+--------+----------+-------------+--------------------+--------+
only showing top 2 rows



Примечание: видим, что некоторые поля содержат числовые данные, но в схеме они определились как строки. Чтобы изменить это на начальном этапе - используем свою схему

In [10]:
from pyspark.sql.types import *

In [11]:

data_schema =[

               StructField('_c0', IntegerType(), True),
               StructField('symbol', StringType(), True),
               StructField('date', DateType(), True),
               StructField('open', DoubleType(), True),
               StructField('high', DoubleType(), True),
               StructField('low', DoubleType(), True),
               StructField('close', DoubleType(), True),
               StructField('volume', IntegerType(), True),
               StructField('adjusted', DoubleType(), True),
               StructField('market.cap', StringType(), True),
               StructField('sector', StringType(), True),
               StructField('industry', StringType(), True),
               StructField('exchange', StringType(), True)]

In [12]:
final_structure = StructType(fields=data_schema)

In [13]:
data = spark.read.csv(path=path, schema=final_structure, sep=',', header=True)
data.show(10)

+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|_c0|symbol|      date|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  5|   TXG|2019-09-18|56.849998|    62.2

In [14]:
data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- symbol: string (nullable = true)
 |-- date: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- adjusted: double (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



# <font color = 'yellow'> Различные методы инспекции данных

In [15]:
# schema(): этот метод возвращает схему данных (фрейма данных). Ниже показан пример с ценами на акции

In [16]:
data.schema

StructType([StructField('_c0', IntegerType(), True), StructField('symbol', StringType(), True), StructField('date', DateType(), True), StructField('open', DoubleType(), True), StructField('high', DoubleType(), True), StructField('low', DoubleType(), True), StructField('close', DoubleType(), True), StructField('volume', IntegerType(), True), StructField('adjusted', DoubleType(), True), StructField('market.cap', StringType(), True), StructField('sector', StringType(), True), StructField('industry', StringType(), True), StructField('exchange', StringType(), True)])

In [17]:
# dtypes возвращает список кортежей с именами столбцов и типами данных

In [18]:
data.dtypes

[('_c0', 'int'),
 ('symbol', 'string'),
 ('date', 'date'),
 ('open', 'double'),
 ('high', 'double'),
 ('low', 'double'),
 ('close', 'double'),
 ('volume', 'int'),
 ('adjusted', 'double'),
 ('market.cap', 'string'),
 ('sector', 'string'),
 ('industry', 'string'),
 ('exchange', 'string')]

In [19]:
# show() по умолчанию отображает первые 20 строк,
# а также принимает число в качестве параметра для выбора их количества

In [20]:
data.show()

+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|_c0|symbol|      date|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  5|   TXG|2019-09-18|56.849998|    62.2

In [21]:
# head(n) возвращает n строк в виде списка

In [22]:
data.head(10)

[Row(_c0=1, symbol='TXG', date=datetime.date(2019, 9, 12), open=54.0, high=58.0, low=51.0, close=52.75, volume=7326300, adjusted=52.75, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=2, symbol='TXG', date=datetime.date(2019, 9, 13), open=52.75, high=54.355, low=49.150002, close=52.27, volume=1025200, adjusted=52.27, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=3, symbol='TXG', date=datetime.date(2019, 9, 16), open=52.450001, high=56.0, low=52.009998, close=55.200001, volume=269900, adjusted=55.200001, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=4, symbol='TXG', date=datetime.date(2019, 9, 17), open=56.209999, high=60.900002, low=55.423, close=56.779999, volume=602800, adjusted=56.779999, market.cap='$9.31B', sector='Capit

In [23]:
# first() возвращает первую строку данных

In [24]:
data.first()

Row(_c0=1, symbol='TXG', date=datetime.date(2019, 9, 12), open=54.0, high=58.0, low=51.0, close=52.75, volume=7326300, adjusted=52.75, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ')

In [25]:
# take(n) возвращает первые n строк

In [26]:
data.take(3)

[Row(_c0=1, symbol='TXG', date=datetime.date(2019, 9, 12), open=54.0, high=58.0, low=51.0, close=52.75, volume=7326300, adjusted=52.75, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=2, symbol='TXG', date=datetime.date(2019, 9, 13), open=52.75, high=54.355, low=49.150002, close=52.27, volume=1025200, adjusted=52.27, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=3, symbol='TXG', date=datetime.date(2019, 9, 16), open=52.450001, high=56.0, low=52.009998, close=55.200001, volume=269900, adjusted=55.200001, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ')]

In [27]:
# describe() вычисляет некоторые статистические значения для столбцов с числовым типом данных

In [28]:
data.columns

['_c0',
 'symbol',
 'date',
 'open',
 'high',
 'low',
 'close',
 'volume',
 'adjusted',
 'market.cap',
 'sector',
 'industry',
 'exchange']

In [29]:
data = data.withColumnRenamed('market.cap', 'market_cap')

In [30]:
data.columns

['_c0',
 'symbol',
 'date',
 'open',
 'high',
 'low',
 'close',
 'volume',
 'adjusted',
 'market_cap',
 'sector',
 'industry',
 'exchange']

In [31]:
data.select('volume').describe()

DataFrame[summary: string, volume: string]

In [32]:
data.count()

1729034

In [33]:
data.distinct().count()

1729034

# <font color = "yellow"> Манипуляции со столбцами

In [34]:
data.columns

['_c0',
 'symbol',
 'date',
 'open',
 'high',
 'low',
 'close',
 'volume',
 'adjusted',
 'market_cap',
 'sector',
 'industry',
 'exchange']

In [35]:
# Добавление столбца: используйте withColumn, чтобы добавить новый столбец к существующим.
# Метод принимает два параметра: имя столбца и данные

In [36]:
data = data.withColumn('new_column_date', data.date)

In [37]:
data.show()

+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+---------------+
|_c0|symbol|      date|     open|     high|      low|    close| volume| adjusted|market_cap|       sector|            industry|exchange|new_column_date|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+---------------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|     2019-09-12|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|     2019-09-13|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|     2019-09-16|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999| 

In [38]:
# Обновление столбца: используйте withColumnRenamed, чтобы переименовать существующий столбец.
# Метод принимает два параметра: название существующего столбца и его новое имя.

In [39]:
data = data.withColumnRenamed('new_column_date', 'added_date')

In [40]:
data.show()

+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+----------+
|_c0|symbol|      date|     open|     high|      low|    close| volume| adjusted|market_cap|       sector|            industry|exchange|added_date|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+----------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|2019-09-12|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|2019-09-13|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|2019-09-16|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotec

In [41]:
# Удаление столбца: используйте метод drop, который принимает имя столбца и возвращает данные.

In [42]:
data = data.drop('added_date')

In [43]:
data.show()

+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|_c0|symbol|      date|     open|     high|      low|    close| volume| adjusted|market_cap|       sector|            industry|exchange|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  5|   TXG|2019-09-18|56.849998|    62.2

In [57]:
# Задача

# Прочитать уже знакомый нам датасет stocks_price_final;
# Создать в конце новый столбец new_volume с данными из столбца volume;
# Избавиться от следующих столбцов: symbol, close, volume , adjusted, market.cap, exchange;
# Поменять местами названия столбцов high и low;
# Столбец date переместить в конец;
# Вывести первую строку, скопировав её полностью и вставить в ответ;
# Примечание

# Код для инициализации DataFrame:

# data_schema = [
#                StructField('_c0', IntegerType(), nullable=True),
#                StructField('symbol', StringType(), True),
#                StructField('date', DateType(), True),
#                StructField('open', DoubleType(), True),
#                StructField('high', DoubleType(), True),
#                StructField('low', DoubleType(), True),
#                StructField('close', DoubleType(), True),
#                StructField('volume', IntegerType(), True),
#                StructField('adjusted', DoubleType(), True),
#                StructField('market.cap', StringType(), True),
#                StructField('sector', StringType(), True),
#                StructField('industry', StringType(), True),
#                StructField('exchange', StringType(), True),
#             ]

# final_struc = StructType(fields = data_schema)

# data = spark.read.csv(df_path, sep=',', header=True, schema=final_struc)

In [58]:
data_copy = data.withColumn('new_volume', data.volume)


In [59]:
data_copy.show()

+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+----------+
|_c0|symbol|      date|     open|     high|      low|    close| volume| adjusted|market_cap|       sector|            industry|exchange|new_volume|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+----------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|   7326300|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|   1025200|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|    269900|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotec

In [60]:
data_copy = data_copy.drop('symbol', 'close', 'volume', 'adjusted', 'market_cap','exchange')

In [61]:
data_copy.show()

+---+----------+---------+---------+---------+-------------+--------------------+----------+
|_c0|      date|     open|     high|      low|       sector|            industry|new_volume|
+---+----------+---------+---------+---------+-------------+--------------------+----------+
|  1|2019-09-12|     54.0|     58.0|     51.0|Capital Goods|Biotechnology: La...|   7326300|
|  2|2019-09-13|    52.75|   54.355|49.150002|Capital Goods|Biotechnology: La...|   1025200|
|  3|2019-09-16|52.450001|     56.0|52.009998|Capital Goods|Biotechnology: La...|    269900|
|  4|2019-09-17|56.209999|60.900002|   55.423|Capital Goods|Biotechnology: La...|    602800|
|  5|2019-09-18|56.849998|    62.27|55.650002|Capital Goods|Biotechnology: La...|   1589600|
|  6|2019-09-19|62.810001|   63.375|61.029999|Capital Goods|Biotechnology: La...|    425200|
|  7|2019-09-20|61.709999|62.419998|    59.82|Capital Goods|Biotechnology: La...|    392000|
|  8|2019-09-23|60.220001|61.485001|59.939999|Capital Goods|Biotechnol

In [64]:
data_copy = data_copy.select('_c0', 'open', 'low', 'high', 'sector', 'industry', 'new_volume', 'date')

In [68]:
data_copy.take(1)

[Row(_c0=1, open=54.0, low=51.0, high=58.0, sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', new_volume=7326300, date=datetime.date(2019, 9, 12))]