In [None]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=6a1850de38687ff54bfe7988e240c691e6b6ddb4cafeedcf8fb2c04d15643f77
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
from pyspark import SparkConf, SparkContext
import pandas as pd

# Build the Spark configuration: application name, a local master that uses
# all available cores, and the executor memory / core settings.
conf = (
    SparkConf()
    .setAppName("SparkSession")
    .setMaster("local[*]")
    .set("spark.executor.memory", "10g")  # memory available to each executor
    .set("spark.executor.cores", "10")  # number of CPU cores per executor
)

# Create the SparkContext from the configuration above
sc = SparkContext(conf=conf)


# Chapter 3 - Learning PySpark
## Resilient Distributed Datasets

### Создание RDDs

Существует 2 способа создания RDD в PySpark. <p>1) Вы можете применить метод parallelize к списку:</p>

In [None]:
data = sc.parallelize(
    [('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12),
     ('Amber', 9)])

In [None]:
own_data = [('Key_1', 'Value_1'), ('Key_2', 'Value_2')] # a plain Python list of tuples
rdd_own_data = sc.parallelize(own_data) # turn the list into an RDD

Проверяем типы данных

In [None]:
print(type(data))
print(type(rdd_own_data))

<class 'pyspark.rdd.RDD'>
<class 'pyspark.rdd.RDD'>


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


2) можно получить rdd-коллекцию из файла

In [None]:
# Read the gzipped text file into an RDD (one element per line);
# the second argument asks for a minimum of 4 partitions.
data_from_file = sc.textFile(
    '/content/drive/MyDrive/Spark_training/VS14MORT.txt.gz',
    4,
)

In [None]:
!pip install googletrans==4.0.0-rc1

Collecting googletrans==4.0.0-rc1
  Downloading googletrans-4.0.0rc1.tar.gz (20 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting httpx==0.13.3 (from googletrans==4.0.0-rc1)
  Downloading httpx-0.13.3-py3-none-any.whl (55 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m55.1/55.1 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
Collecting hstspreload (from httpx==0.13.3->googletrans==4.0.0-rc1)
  Downloading hstspreload-2023.1.1-py3-none-any.whl (1.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m24.7 MB/s[0m eta [36m0:00:00[0m
Collecting chardet==3.* (from httpx==0.13.3->googletrans==4.0.0-rc1)
  Downloading chardet-3.0.4-py2.py3-none-any.whl (133 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m133.4/133.4 kB[0m [31m13.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting idna==2.* (from httpx==0.13.3->googletrans==4.0.0-rc1)
  Downloading idna-2.10-py2.py3-none-any.whl (58 kB)
[2K     [90m━

In [None]:
def translate(text, language='ru'):
    """Translate ``text`` with googletrans and print the result.

    Parameters
    ----------
    text : str
        Text to translate.
    language : str, optional
        Destination language code passed to the translator as ``dest``
        (default ``'ru'``).

    Returns
    -------
    None
        The translated text is printed, not returned.
    """
    from googletrans import Translator

    # A fresh Translator is created for every call
    result = Translator().translate(text, dest=language)

    # Show the translated text
    print(result.text)

In [None]:
translate('Fetch Immediately', language='ru')

Сразу же


Note that to execute the code above you will have to change the path to wherever the data is stored on your machine. The dataset can be downloaded from http://tomdrabas.com/data/VS14MORT.txt.gz

#### Schema

RDD — это неструктурированный способ хранения данных: у коллекции нет схемы, и её элементы могут быть разных типов.

In [None]:
data_heterogenous = sc.parallelize([('Ferrari', 'fast'), {'Porsche': 100000},
                                    ['Spain','visited', 4504]]).collect()
data_heterogenous

[('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain', 'visited', 4504]]

Вы можете получить доступ к данным в объекте, как это обычно делаете в Python

In [None]:
data_heterogenous[1]['Porsche'] #из списка извлекли элемент №1 (это {'Porsche': 100000}).
# И обратились к нему по ключу 'Porsche'

100000

In [None]:
data_heterogenous[0][1]

'fast'

In [None]:
data_heterogenous[2][1]

'visited'

#### Чтение из файлов

Когда вы читаете из текстового файла, каждая строка из файла образует элемент RDD.

In [None]:
data_from_file.take(1)

['                   1                                          2101  M1087 432311  4M4                2014U7CN                                    I64 238 070   24 0111I64                                                                                                                                                                           01 I64                                                                                                  01  11                                 100 601']

#### Пользовательские функции

Вы можете создать более длинный метод для преобразования ваших данных вместо использования выражения Lambda.

In [None]:
def extractInformation(row):
    '''
    Split one fixed-width mortality record into its selected fields.

    The row is cut into 89 positional fields by a regular expression (see the
    schema below) and the fields listed in ``selected_indices`` are returned;
    the skipped numbers are the "reserved" fields of the layout.

    Parameters
    ----------
    row : str
        One raw line of the fixed-width text file.

    Returns
    -------
    numpy.ndarray
        Array of strings, one per entry of ``selected_indices`` (padding
        spaces are preserved). If the row cannot be split according to the
        layout, every element is the sentinel string '-99'.

    Input record schema
        schema: n-m (o) -- xxx
            n - position from
            m - position to
            o - number of characters
            xxx - description
        1. 1-19 (19) -- reserved positions
        2. 20 (1) -- resident status
        3. 21-60 (40) -- reserved positions
        4. 61-62 (2) -- education code (1989 revision)
        5. 63 (1) -- education code (2003 revision)
        6. 64 (1) -- education reporting flag
        7. 65-66 (2) -- month of death
        8. 67-68 (2) -- reserved positions
        9. 69 (1) -- sex
        10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated
        11. 71-73 (3) -- number of units (years, months etc)
        12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)
        13. 75-76 (2) -- age recoded into 52 categories
        14. 77-78 (2) -- age recoded into 27 categories
        15. 79-80 (2) -- age recoded into 12 categories
        16. 81-82 (2) -- infant age recoded into 22 categories
        17. 83 (1) -- place of death
        18. 84 (1) -- marital status
        19. 85 (1) -- day of the week of death
        20. 86-101 (16) -- reserved positions
        21. 102-105 (4) -- current year
        22. 106 (1) -- injury at work
        23. 107 (1) -- manner of death
        24. 108 (1) -- manner of disposition
        25. 109 (1) -- autopsy
        26. 110-143 (34) -- reserved positions
        27. 144 (1) -- activity code
        28. 145 (1) -- place of injury
        29. 146-149 (4) -- ICD code
        30. 150-152 (3) -- 358 cause recode
        31. 153 (1) -- reserved position
        32. 154-156 (3) -- 113 cause recode
        33. 157-159 (3) -- 130 infant cause recode
        34. 160-161 (2) -- 39 cause recode
        35. 162 (1) -- reserved position
        36. 163-164 (2) -- number of entity-axis conditions
        37-56. 165-304 (140) -- list of up to 20 conditions
        57. 305-340 (36) -- reserved positions
        58. 341-342 (2) -- number of record axis conditions
        59. 343 (1) -- reserved position
        60-79. 344-443 (100) -- record axis conditions
        80. 444 (1) -- reserved position
        81. 445-446 (2) -- race
        82. 447 (1) -- bridged race flag
        83. 448 (1) -- race imputation flag
        84. 449 (1) -- race recode (3 categories)
        85. 450 (1) -- race recode (5 categories)
        86. 451-483 (33) -- reserved positions
        87. 484-486 (3) -- Hispanic origin
        88. 487 (1) -- reserved
        89. 488 (1) -- Hispanic origin/race recode
    '''
    # Imports are kept inside the function, as in the original
    # (presumably so the function is self-contained when Spark ships it to
    # worker processes -- confirm).
    import re
    import numpy as np

    # Positions in the list returned by ``record_split.split(row)``.
    # Element 0 of that list is the text before the match, so position i
    # is capture group i, i.e. field i of the schema in the docstring.
    selected_indices = [
         2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,
         19,21,22,23,24,25,27,28,29,30,32,33,34,
         36,37,38,39,40,41,42,43,44,45,46,47,48,
         49,50,51,52,53,54,55,56,58,60,61,62,63,
         64,65,66,67,68,69,70,71,72,73,74,75,76,
         77,78,79,81,82,83,84,85,87,89
    ]

    # One capture group per schema field, in order.
    record_split = re.compile(
        r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})'
        r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})'
        r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})'
        r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})'
        r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})'
        r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})'
        r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})'
        r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})'
        r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})'
        r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})'
        r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})'
        r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})'
        r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})'
        r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})'
        r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})'
    )

    try:
        rs = np.array(record_split.split(row))[selected_indices]
    except Exception:
        # A row that does not match the layout is returned unsplit by
        # ``split``, so the fancy indexing above raises IndexError.
        # Such rows are replaced by a record made of '-99' sentinels.
        # ``Exception`` (not a bare ``except``) keeps this best-effort
        # while letting KeyboardInterrupt / SystemExit propagate.
        rs = np.array(['-99'] * len(selected_indices))
    return rs

<br>Теперь, вместо использования Lambda, мы будем <br>использовать метод extractInformation(...) для <br>разделения и преобразования нашего набора данных.


In [None]:
# Parse every raw line with extractInformation
data_from_file_conv = data_from_file.map(extractInformation)
# Show the first parsed record (the former identity .map(lambda row: row) was a no-op)
data_from_file_conv.take(1)

[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

### Преобразования

#### .map(...)

Метод применяется к каждому элементу RDD; для набора данных data_from_file_conv это означает преобразование каждой строки.

In [None]:
data_from_file_conv.take(20)[15]

array(['1', '  ', '4', '1', '01', 'F', '1', '070', ' ', '40', '20', '09',
       '  ', '1', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I251',
       '215', '063', '   ', '21', '08', '11I251 ', '12I469 ', '21J449 ',
       '22J459 ', '31I500 ', '61I10  ', '62D649 ', '63E669 ', '       ',
       '       ', '       ', '       ', '       ', '       ', '       ',
       '       ', '       ', '       ', '       ', '       ', '07',
       'I251 ', 'D649 ', 'E669 ', 'I10  ', 'I469 ', 'I500 ', 'J448 ',
       '     ', '     ', '     ', '     ', '     ', '     ', '     ',
       '     ', '     ', '     ', '     ', '     ', '     ', '03', ' ',
       ' ', '2', '3', '100', '8'], dtype='<U40')

In [None]:
# Take element 15 of every parsed record, cast it to int and show the first
# 20 values. Element 15 corresponds to field 19 of the record schema in
# extractInformation (day of the week of death); records that failed to parse
# show up as the sentinel -99.
# NOTE(review): despite the variable name this is not the year column -- the
# year is element 16 (used as such in later cells); confirm which was intended.
data_2014 = data_from_file_conv.map(lambda row: int(row[15]))
data_2014.take(20)

[4, 3, 2, 1, 2, 4, 6, 6, 6, -99, 2, 3, -99, 4, 6, 4, 2, 5, 4, 1]

In [None]:
translate('Unexpected bad things will happen if you don’t read this!')

Неожиданные плохие вещи произойдут, если вы не прочитаете это!


In [None]:
import math

In [None]:
data_from_file_conv.map(lambda x: math.sqrt(int(x[9]))-2.0).take(5)

[4.557438524302,
 4.082762530298219,
 4.4031242374328485,
 4.324555320336759,
 4.164414002968976]

Вы можете комбинировать колонки

In [None]:
data_2014_2 = data_from_file_conv.map(lambda row: (int(row[10])**2, 2023-int(row[16])))
data_2014_2.take(10)

[(529, 9),
 (289, 9),
 (441, 9),
 (400, 9),
 (324, 9),
 (576, 9),
 (484, 9),
 (289, 9),
 (529, 9),
 (9801, 2122)]

#### .filter(...)

Метод .filter(...) позволяет выбрать
элементы вашего набора данных, которые соответствуют указанным критериям.

In [None]:
data_from_file_conv.take(5)

[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40'),
 array(['1', '  ', '2', '1', '01', 'M', '1', '058', ' ', '37', '17', '08',
        '  ', '4', 'D', '3', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I250',
        '214', '062', '   ', '21', '03', '11I250 ', '61I272 ', '62E669 ',
        '       ', '       ', '       ', '       ', '       ', '     

In [None]:
data_from_file_conv.filter(lambda row: row[3] == '1' and row[-1] == '6').take(1)

[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

In [None]:
data_filtered = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0')
data_filtered.count()

6

#### .flatMap(...)

In [None]:
translate('The .flatMap(...) method works similarly to .map(...) but returns a flattened results instead of a list.')

Метод .flatmap (...) работает аналогично .map (...), но возвращает сплющенные результаты вместо списка.


Метод .flatMap(...) работает аналогично .map(...), но возвращает «уплощённый» (развёрнутый в один уровень) результат вместо списка.

In [None]:
data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))
data_2014_flat.take(10)

['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]

#### .distinct()

This method returns a list of distinct values in a specified column.

In [None]:
distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()
distinct_gender

#### .sample(...)

The `.sample()` method returns a randomized sample from the dataset.

In [None]:
fraction = 0.1
data_sample = data_from_file_conv.sample(False, fraction, 666)

data_sample.take(1)

Let's confirm that we really got 10% of all the records.

In [None]:
print('Original dataset: {0}, sample: {1}'.format(data_from_file_conv.count(), data_sample.count()))

#### .leftOuterJoin(...)

A left outer join, just like in the SQL world, joins two RDDs based on the values found in both datasets, and returns records from the left RDD with records from the right one appended where the two RDDs match.

In [None]:
rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])
rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])

rdd3 = rdd1.leftOuterJoin(rdd2)
rdd3.take(5)

If we used `.join(...)` method instead we would have gotten only the values for `'a'` and `'b'` as these two values intersect between these two RDDs.

In [None]:
rdd4 = rdd1.join(rdd2)
rdd4.collect()

Another useful method is the `.intersection(...)` that returns the records that are *equal* in both RDDs.

In [None]:
rdd5 = rdd1.intersection(rdd2)
rdd5.collect()

#### .repartition(...)

Repartitioning the dataset changes the number of partitions the dataset is divided into.

In [None]:
rdd1 = rdd1.repartition(4)

len(rdd1.glom().collect())

In [None]:
type(rdd1)

### Actions

#### .take(...)

The method returns `n` top rows from a single data partition.

In [None]:
data_first = data_from_file_conv.take(1)
data_first

If you want somewhat randomized records you can use `.takeSample(...)` instead.

In [None]:
data_take_sampled = data_from_file_conv.takeSample(False, 1, 667)
data_take_sampled

#### .reduce(...)

Another action that processes your data, the `.reduce(...)` method *reduces* the elements of an RDD using a specified method.

In [None]:
rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)

If the reducing function is not associative and commutative, you will sometimes get wrong results, depending on how your data is partitioned.

In [None]:
data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1)

If we were to reduce the data by *dividing* the current result by the subsequent value, we would expect a result of 10.

In [None]:
works = data_reduce.reduce(lambda x, y: x / y)
works

However, if you were to partition the data into 3 partitions, the result will be wrong.

In [None]:
data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 3)
data_reduce.reduce(lambda x, y: x / y)

The `.reduceByKey(...)` method works in a similar way to the `.reduce(...)` method but performs a reduction on a key-by-key basis.

In [None]:
data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)
data_key.reduceByKey(lambda x, y: x + y).collect()

#### .count()

The `.count()` method counts the number of elements in the RDD.

In [None]:
data_reduce.count()

It has the same effect as the method below but does not require shifting the data to the driver.

In [None]:
len(data_reduce.collect()) # WRONG -- DON'T DO THIS!

If your dataset is in a form of a *key-value* you can use the `.countByKey()` method to get the counts of distinct keys.

In [None]:
data_key.countByKey().items()

#### .saveAsTextFile(...)

As the name suggests, the `.saveAsTextFile(...)` method takes the RDD and saves it to text files: each partition to a separate file.

In [None]:
data_key.saveAsTextFile('/Users/drabast/Documents/PySpark_Data/data_key.txt')

To read it back, you need to parse it, because, as before, all the rows are read in as strings.

In [None]:
def parseInput(row):
    """Parse one saved text line such as ``('a', 4)`` back into a tuple.

    Parameters
    ----------
    row : str
        A line containing the string form of a ``(key, value)`` pair, where
        the key is a quoted lowercase word and the value is an integer.

    Returns
    -------
    tuple
        ``(key, value)`` with ``key`` as ``str`` and ``value`` as ``int``.

    Raises
    ------
    IndexError
        If the line does not contain a pair in the expected format.
    """
    import re

    # Key: one or more lowercase letters; value: an optionally signed integer
    # with any number of digits (the former pattern accepted one digit only,
    # so a pair like ('a', 12) could not be parsed).
    pattern = re.compile(r'\(\'([a-z]+)\', (-?[0-9]+)\)')
    # split() returns [prefix, key, value, suffix] when the pattern matches
    row_split = pattern.split(row)

    return (row_split[1], int(row_split[2]))

# Read the saved text files back and turn every line into a (key, value) tuple
data_key_reread = (
    sc.textFile('/Users/drabast/Documents/PySpark_Data/data_key.txt')
    .map(parseInput)
)
data_key_reread.collect()

**.foreach(...)**

A method that applies the same function to each element of the RDD in an iterative way.

In [None]:
def f(x):
    """Print one RDD element; passed to ``foreach`` in this cell."""
    print(x)

data_key.foreach(f)