In [1]:
import findspark
findspark.init()
import pyspark

In [2]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions as F
from pyspark import SparkContext, SparkConf

In [3]:
sc = pyspark.SparkContext()

In [4]:
sqlContext = SQLContext(sc)

In [5]:
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")

DataFrame[key: string, value: string]

In [1]:
# Disable the default behavior, namely the automatic broadcast join
# spark.sql.autoBroadcastJoinThreshold = -1

In [4]:
sc

In [5]:
spark = SparkSession(sc)

In [None]:
spark.sql('''SELECT * FROM table''')

## How the Spark DataFrame API works

**DataFrame:**
+ a structured, columnar data structure
+ can be created from:
  - a local collection
  - a file (or files)
  - a database
+ in Python, typically runs significantly faster than RDDs, because built-in operations are executed by the JVM engine rather than by Python worker processes
+ uses RDDs under the hood
+ lets you run arbitrary SQL operations on the data
+ like RDDs, DataFrames are lazy and immutable

## What a DataFrame consists of
+ a schema: [pyspark.sql.types.StructType](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.types.StructType)
+ columns: [pyspark.sql.Column](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column)
+ data: [pyspark.sql.Row](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Row)

## Spark SQL and DataFrames

source: https://spark.apache.org/docs/latest/sql-programming-guide.html

**Additional Reading:** [A Tale of Three Apache Spark APIs: RDDs, DataFrames, and Datasets](https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html)

## Reading data from a source
The general method for reading from any source:

```df = spark.read.format(datasource_type).option(datasource_options).load(object_name)```

+ ```datasource_type``` - the source type ("parquet", "json", "cassandra", etc.)
+ ```datasource_options``` - options for working with the source (logins, passwords, connection addresses, etc.)
+ ```object_name``` - the name of the table/file/topic/index

[DataFrameReader](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader):
+ infers the data schema by default (for CSV, column types are inferred only with `inferSchema=True`; otherwise every column is read as a string)
+ is a transformation (lazy)
+ returns a [DataFrame](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame)

### Supported data sources (partial list)
+ Files:
  - json
  - text
  - csv
  - orc
  - parquet
+ Databases
  - elasticsearch
  - cassandra
  - jdbc
  - hive
+ Message brokers
  - kafka


# Read tabular data

In [6]:
mtcars = spark.read.csv('./learning-apache-spark-master/data/mtcars.csv', header=True, inferSchema=True)

In [12]:
mtcars = spark.read.format('csv').load('./learning-apache-spark-master/data/mtcars.csv', header = True, inferSchema=True)

In [13]:
mtcars.show(3)

+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|          _c0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|   Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 3 rows



## Rename individual column


In [14]:
mtcars = mtcars.withColumnRenamed('_c0', 'rown_ames')
mtcars.show(3)

+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    rown_ames| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|    Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|   Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
+-------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 3 rows



## Rename multiple columns

In [15]:
new_col_names = [ 'x_' + x for x in mtcars.columns]
new_col_names

['x_rown_ames',
 'x_mpg',
 'x_cyl',
 'x_disp',
 'x_hp',
 'x_drat',
 'x_wt',
 'x_qsec',
 'x_vs',
 'x_am',
 'x_gear',
 'x_carb']

In [19]:
mtcars = mtcars.toDF(*new_col_names)

## Read non-tabular data

In [20]:
twitter = sc.textFile('./learning-apache-spark-master/data/twitter.txt')
twitter.take(5)

['Fresh install of XP on new computer. Sweet relief! fuck vista\t1018769417\t1.0',
 'Well. Now I know where to go when I want my knives. #ChiChevySXSW http://post.ly/RvDl\t10284216536\t1.0',
 '"Literally six weeks before I can take off ""SSC Chair"" off my email. Its like the torturous 4th mile before everything stops hurting."\t10298589026\t1.0',
 'Mitsubishi i MiEV - Wikipedia, the free encyclopedia - http://goo.gl/xipe Cutest car ever!\t109017669432377344\t1.0',
 "'Cheap Eats in SLP' - http://t.co/4w8gRp7\t109642968603963392\t1.0"]

## Writing data
The general method for writing to any system:

```df.write.format(datasource_type).options(datasource_options).mode(savemode).save(object_name)```

+ ```datasource_type``` - the source type ("parquet", "json", "cassandra", etc.)
+ ```datasource_options``` - options for working with the source (logins, passwords, connection addresses, etc.)
+ ```savemode``` - the write mode (append, overwrite, etc.)
+ ```object_name``` - the name of the table/file/topic/index

[DataFrameWriter](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter):
+ the ```save``` method is an action
+ supports partitioned data (parquet, orc)
+ does not always validate the schema and data format


### Supported data sources (partial list)
+ Files:
  - json
  - text
  - csv
  - orc
  - parquet
+ Databases
  - elasticsearch
  - cassandra
  - jdbc
  - hive
+ Message brokers
  - kafka

In [21]:
ratings_df = spark.read.csv("./movielens/rating.csv", header=True)
tag_df = spark.read.csv("./movielens/tag.csv", header=True)
movie_df = spark.read.csv("./movielens/movie.csv", header=True)
link_df = spark.read.csv("./movielens/link.csv", header=True)
genome_tags_df = spark.read.csv("./movielens/genome_tags.csv", header=True)
genome_scores_df = spark.read.csv("./movielens/genome_scores.csv", header=True)

In [42]:
ratings_df.coalesce(1).write.parquet('./test_ratings_one_partition.parquet')

In [27]:
!ls ./test_ratings.parquet

part-00000-5d0bd2aa-acbe-47dd-9d60-c908c208f6a5-c000.snappy.parquet
part-00001-5d0bd2aa-acbe-47dd-9d60-c908c208f6a5-c000.snappy.parquet
part-00002-5d0bd2aa-acbe-47dd-9d60-c908c208f6a5-c000.snappy.parquet
part-00003-5d0bd2aa-acbe-47dd-9d60-c908c208f6a5-c000.snappy.parquet
part-00004-5d0bd2aa-acbe-47dd-9d60-c908c208f6a5-c000.snappy.parquet
part-00005-5d0bd2aa-acbe-47dd-9d60-c908c208f6a5-c000.snappy.parquet
_SUCCESS


In [38]:
pd_movie = movie_df.toPandas()

In [36]:
# !pip3 install -U pyarrow

Collecting pyarrow
  Downloading pyarrow-3.0.0-cp38-cp38-manylinux2014_x86_64.whl (20.7 MB)
     |████████████████████████████████| 20.7 MB 2.2 MB/s eta 0:00:01
Installing collected packages: pyarrow
Successfully installed pyarrow-3.0.0


In [44]:
!du -h  ./test_ratings_one_partition.parquet/*

174M	./test_ratings_one_partition.parquet/part-00000-df38dc6d-2cde-40eb-85ca-39e9cf14bba4-c000.snappy.parquet
0	./test_ratings_one_partition.parquet/_SUCCESS


In [29]:
!du -h  ./test_ratings.parquet/*

35M	./test_ratings.parquet/part-00000-5d0bd2aa-acbe-47dd-9d60-c908c208f6a5-c000.snappy.parquet
34M	./test_ratings.parquet/part-00001-5d0bd2aa-acbe-47dd-9d60-c908c208f6a5-c000.snappy.parquet
35M	./test_ratings.parquet/part-00002-5d0bd2aa-acbe-47dd-9d60-c908c208f6a5-c000.snappy.parquet
34M	./test_ratings.parquet/part-00003-5d0bd2aa-acbe-47dd-9d60-c908c208f6a5-c000.snappy.parquet
34M	./test_ratings.parquet/part-00004-5d0bd2aa-acbe-47dd-9d60-c908c208f6a5-c000.snappy.parquet
4.8M	./test_ratings.parquet/part-00005-5d0bd2aa-acbe-47dd-9d60-c908c208f6a5-c000.snappy.parquet
0	./test_ratings.parquet/_SUCCESS


In [48]:
import pyspark.sql.functions as F

ratings_df.groupby().agg(F.min('rating'), F.max('rating')).show()

+-----------+-----------+
|min(rating)|max(rating)|
+-----------+-----------+
|        0.5|          5|
+-----------+-----------+



In [50]:
ratings_df.repartition('rating').write.partitionBy('rating').parquet('./tets_rating.parquet')

In [55]:
ratings_df.repartition('rating').write.mode('overwrite').parquet('./tets_rating.parquet')

In [57]:
ratings_df.write.mode('overwrite').partitionBy('rating').parquet('./tets_rating.parquet')

In [59]:
!du -h  ./tets_rating.parquet/*

3.0M	./tets_rating.parquet/rating=0.5
8.1M	./tets_rating.parquet/rating=1
3.6M	./tets_rating.parquet/rating=1.5
16M	./tets_rating.parquet/rating=2
9.9M	./tets_rating.parquet/rating=2.5
42M	./tets_rating.parquet/rating=3
23M	./tets_rating.parquet/rating=3.5
53M	./tets_rating.parquet/rating=4
16M	./tets_rating.parquet/rating=4.5
29M	./tets_rating.parquet/rating=5
0	./tets_rating.parquet/_SUCCESS


In [62]:
!du -h  ./tets_rating.parquet/*

35M	./tets_rating.parquet/part-00000-246a6945-bae7-48e0-a5a4-7f7a9927c824-c000.snappy.parquet
68M	./tets_rating.parquet/part-00001-246a6945-bae7-48e0-a5a4-7f7a9927c824-c000.snappy.parquet
34M	./tets_rating.parquet/part-00002-246a6945-bae7-48e0-a5a4-7f7a9927c824-c000.snappy.parquet
38M	./tets_rating.parquet/part-00003-246a6945-bae7-48e0-a5a4-7f7a9927c824-c000.snappy.parquet
0	./tets_rating.parquet/_SUCCESS


In [61]:
ratings_df.coalesce(4).write.mode('overwrite').parquet('./tets_rating.parquet')

In [69]:
tag_df.limit(1).show()
# Register the DataFrames as SQL temporary views
ratings_df.createOrReplaceTempView("ratings_df")
movie_df.createOrReplaceTempView("movie_df")

+------+-------+-----------+-------------------+
|userId|movieId|        tag|          timestamp|
+------+-------+-----------+-------------------+
|    18|   4141|Mark Waters|2009-04-24 18:19:40|
+------+-------+-----------+-------------------+



In [65]:
spark.sql('''SELECT * from ratings_df limit 10''').show()

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
|     1|    112|   3.5|2004-09-10 03:09:00|
|     1|    151|     4|2004-09-10 03:08:54|
|     1|    223|     4|2005-04-02 23:46:13|
|     1|    253|     4|2005-04-02 23:35:40|
|     1|    260|     4|2005-04-02 23:33:46|
+------+-------+------+-------------------+



In [70]:
spark.sql('''SELECT * from movie_df limit 10''').show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
+-------+--------------------+--------------------+



In [71]:
movie_df.count()

27278

In [74]:
import pyspark.sql.functions as F

In [84]:
tmp = ratings_df.join(movie_df, on=['movieId'], how='left')

In [85]:
tmp.count()

20000263

In [56]:
!du -h  ./tets_rating.parquet/*

4.0K	./tets_rating.parquet/part-00000-539f8400-497b-4de6-88f2-eac30c6767fa-c000.snappy.parquet
40M	./tets_rating.parquet/part-00018-539f8400-497b-4de6-88f2-eac30c6767fa-c000.snappy.parquet
16M	./tets_rating.parquet/part-00031-539f8400-497b-4de6-88f2-eac30c6767fa-c000.snappy.parquet
9.3M	./tets_rating.parquet/part-00043-539f8400-497b-4de6-88f2-eac30c6767fa-c000.snappy.parquet
27M	./tets_rating.parquet/part-00049-539f8400-497b-4de6-88f2-eac30c6767fa-c000.snappy.parquet
22M	./tets_rating.parquet/part-00065-539f8400-497b-4de6-88f2-eac30c6767fa-c000.snappy.parquet
2.7M	./tets_rating.parquet/part-00086-539f8400-497b-4de6-88f2-eac30c6767fa-c000.snappy.parquet
3.3M	./tets_rating.parquet/part-00133-539f8400-497b-4de6-88f2-eac30c6767fa-c000.snappy.parquet
7.5M	./tets_rating.parquet/part-00144-539f8400-497b-4de6-88f2-eac30c6767fa-c000.snappy.parquet
51M	./tets_rating.parquet/part-00166-539f8400-497b-4de6-88f2-eac30c6767fa-c000.snappy.parquet
15M	./tets_rating.parquet/part-00189-539f8400

In [51]:
!du -h  ./tets_rating.parquet/*

2.8M	./tets_rating.parquet/rating=0.5
7.6M	./tets_rating.parquet/rating=1
3.3M	./tets_rating.parquet/rating=1.5
16M	./tets_rating.parquet/rating=2
9.4M	./tets_rating.parquet/rating=2.5
42M	./tets_rating.parquet/rating=3
23M	./tets_rating.parquet/rating=3.5
54M	./tets_rating.parquet/rating=4
16M	./tets_rating.parquet/rating=4.5
29M	./tets_rating.parquet/rating=5
0	./tets_rating.parquet/_SUCCESS


In [53]:
spark.read.parquet('./tets_rating.parquet/rating=4').show(10)

+------+-------+-------------------+
|userId|movieId|          timestamp|
+------+-------+-------------------+
|     1|    151|2004-09-10 03:08:54|
| 69632|   4226|2009-04-02 23:27:08|
|     1|    223|2005-04-02 23:46:13|
| 69632|   5991|2008-09-21 18:46:36|
|     1|    253|2005-04-02 23:35:40|
| 69632|   6870|2009-04-02 23:36:46|
|     1|    260|2005-04-02 23:33:46|
| 69632|   7153|2008-09-21 18:33:57|
|     1|    293|2005-04-02 23:31:43|
| 69632|   8340|2009-04-02 23:36:01|
+------+-------+-------------------+
only showing top 10 rows



In [22]:
ratings_df.count()

20000263

In [23]:
ratings_df.show(1)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
+------+-------+------+-------------------+
only showing top 1 row



In [25]:
movie_df.count()

27278

In [24]:
movie_df.show(1)

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
+-------+----------------+--------------------+
only showing top 1 row



# Export data

In [13]:
from pyspark.sql import DataFrameWriter

In [None]:
mtcars = mtcars.coalesce(numPartitions=1)

In [None]:
mtcars.write.csv('./learning-apache-spark-master/saved-mtcars', header=True)

In [None]:
twitter = twitter.coalesce(numPartitions=1)

In [None]:
twitter.saveAsTextFile('./learning-apache-spark-master/saved-twitter')

In [30]:
# repartition() needs a partition count or column; use the partitioning column
ratings_df.repartition('movieId').write.partitionBy('movieId').parquet('./ratings.parquet')

In [27]:
ratings_df

DataFrame[userId: string, movieId: string, rating: string, timestamp: string]

## pySpark SQL

In [None]:
# Run an SQL query against the registered table
sql_ratings_df = spark.sql("SELECT movieId, count(*) FROM ratings_df GROUP BY movieId")

In [None]:
# Remove duplicate rows
ratings_df.distinct().show()

In [11]:
ratings_df.limit(1).show()

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
+------+-------+------+-------------------+



In [14]:
ratings_df.count()

20000263

In [22]:
tag_df.count()

465564

In [None]:
ratings_df.join(tag_df, on=['movieId'])

In [12]:
tag_df.limit(1).show()

+------+-------+-----------+-------------------+
|userId|movieId|        tag|          timestamp|
+------+-------+-----------+-------------------+
|    18|   4141|Mark Waters|2009-04-24 18:19:40|
+------+-------+-----------+-------------------+



In [13]:
movie_df.limit(1).show()

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
+-------+----------------+--------------------+



**Accumulators**

When we pass functions to Spark, such as a `map()` function or a `filter()` condition, they can use variables defined outside of them in the driver program. However, each task running on the cluster gets a new copy of each variable, and updates to those copies are not propagated back to the driver. Spark's shared variables, **accumulators** and **broadcast** variables, relax this restriction for two common communication patterns: aggregating results and broadcasting values.

In [28]:
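file = sc.textFile('./learning-apache-spark-master/data/twitter.txt')
# Create an integer accumulator initialized to 0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines  # refer to the accumulator defined on the driver
    # NOTE: as written, this condition is true for NON-empty lines,
    # so the accumulator counts non-empty lines despite its name.
    if not (line == ""):
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
# saveAsTextFile is an action; the accumulator is updated only once it runs
callSigns.saveAsTextFile('./test_accumulator2')
print("Blank lines: %d" % blankLines.value)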

Blank lines: 10
