<center><h1>Introduction to Spark</h1></center>

### Корисні посилання
- [Spark Documentation](http://spark.apache.org/docs/latest/)
- [Spark Programming Guide](http://spark.apache.org/docs/latest/programming-guide.html)
- [DataBricks Login](https://community.cloud.databricks.com)
- [Pyspark](https://github.com/jupyter/docker-stacks)
- [Conda]

```conda install -c anaconda-cluster/label/dev_spark
   conda install -c conda-forge pyspark
```


### План
- Історія
- Структури даних
- Використання Apache Spark + Python


## History

- Вперше Apache Spark був випущений 2014.

- Він був розроблений [Matei Zaharia](http://people.csail.mit.edu/matei) як докторська дисертація в Каліфорнійському університеті в Берклі.
- написаний на Scala.

- На відміну від Hadoop, Apache Spark:

    - легкий для встановлення та налаштування.
    - забезпечує більш природний ітеративний робочий процес


## Життєвий цикл програм Spark

1. Створення деякі вхідні RDD із зовнішніх даних або паралелізуйте колекцію у програмі драйвера.
2. "Лінива" трансформація їх, щоб визначити нові RDD, використовуючи такі перетворення, як filter () або map ()
3. Кешування  будь -яких проміжних RDD, які потрібно буде використовувати повторно.
4. Запуск таких дій, як count () та collect (), щоб розпочати паралельне обчислення, яке потім оптимізується та виконується за допомого. Spark.

## Стійкі розподілені набори даних (RDD - Resilient Distributed Datasets)

- Фундаментальна абстракція Apache Spark - це доступна тільки для читання, паралельна, розподілена, відмовостійка колекція, яка називається стійкими розподіленими наборами даних (RDD).

- При роботі з Apache Spark ми ітеративно застосовуємо функції до кожного елементу цих колекцій паралельно для створення нових RDD.

- Здебільшого ви можете думати про та використовувати RDD як розподілені фрейми даних.

## Властивості стійких розподілених наборів даних:

 - Дані розподіляються по вузлах в кластері комп'ютерів.
 - При виході з ладу одного вузла дані не втрачаються.
 - Дані зазвичай зберігаються в таблицях HBase або файлах HDFS.
 - Функції `map` та `reduce` можуть працювати паралельно з різними ключами або різними елементами колекції.

- Фреймворк, що лежить в основі (наприклад, Hadoop або Apache Spark) розподіляє дані і обробку між різними вузлами без будь-якого втручання з боку програміста.


## Операції над розподіленими даними

Операції з розподіленими даними
-  Два типи операцій: **transformations** та **actions**
- transformations "ледачі" (не обчислюються відразу)
- actions виконуються під час виконання дії

## [Transformations](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) (lazy)

```
map() flatMap()
filter()
mapPartitions() mapPartitionsWithIndex()
sample()
union() intersection() distinct()
groupBy() groupByKey()
reduceBy() reduceByKey()
sortBy() sortByKey()
join()
cogroup()
cartesian()
pipe()
coalesce()
repartition()
partitionBy()
...
```

## [Actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)

```
reduce()
collect()
count()
first()
take()
takeSample()
saveToCassandra()
takeOrdered()
saveAsTextFile()
saveAsSequenceFile()
saveAsObjectFile()
countByKey()
foreach()
```

## Додаткові перетворення RDD

Apache Spark пропонує ще багато методів для роботи над колекціями кортежів, що розширюють стандартну структуру Map-Reduce:

    - Сортування: `sortByKey`,` sortBy`, `takeOrdered`
    - Mapping: `flatMap`
    - Фільтрація: `фільтр`
    - Підрахунок: `count`
    - Операції з масивами: `перетин`,` об'єднання`
    - Багато інших: [див. (https://spark.apache.org/docs/latest/programming-guide.html#transformations)
    

## Python API

![PySpark Internals](https://github.com/pnavaro/big-data/blob/master/notebooks/images/YlI8AqEl.png?raw=1)


## Map-Reduce на кластері комп'ютерів

- Код, що наведений вище, не дозволить нам використовувати паралелізм на основі декількох комп'ютерів в кластері.

- Розробка такого фреймворку була б дуже великим проектом з розробки програмного забезпечення.

- Існуючі фреймворки, які можна використовувати:
    - [Apache Hadoop](https://hadoop.apache.org/)
    - [Apache Spark](https://spark.apache.org/)
    
- Приклад нижче для Apache Spark.

## Apache Spark and Map-Reduce

- Ми обробляємо дані, використовуючи функції вищого порядку для перетворення RDD на нові RDD за рахунок map.

- Кожен екземпляр RDD має як мінімум два методи, що відповідають робочому процесу Map-Reduce:
    - `map`
    - `reduceByKey`
    
- Ці методи працюють так само, як відповідні функції, які ми визначили раніше для роботи зі стандартними колекціями Python.  

- В API Apache Spark також є додаткові методи RDD, в тому числі для SQL.  

## Приклад

PySpark не знаходиться на sys.path за замовчуванням, але це не означає, що його не можна використовувати як звичайну бібліотеку. Ви можете зробити це додавши pyspark до sys.path під час виконання. Можна скористатись [findspark](https://github.com/minrk/findspark) робить останнє.

В даному прикладі ми маємо spark контекст sc для використання з маленьким локальним кластером spark з 4 вузлами.

In [1]:
import os, sys
sys.executable

'c:\\Users\\Kolyanys\\miniconda3\\python.exe'

In [2]:
#os.environ["SPARK_HOME"] = "/opt/spark-3.0.1-bin-hadoop2.7"
os.environ["PYSPARK_PYTHON"] = sys.executable

In [4]:
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ---------------------------------------- 0.0/317.0 MB ? eta -:--:--
     ---------------------------------------- 0.0/317.0 MB ? eta -:--:--
     -------------------------------------- 0.0/317.0 MB 325.1 kB/s eta 0:16:15
     -------------------------------------- 0.1/317.0 MB 651.6 kB/s eta 0:08:07
     ---------------------------------------- 0.3/317.0 MB 1.6 MB/s eta 0:03:15
     ---------------------------------------- 0.7/317.0 MB 3.4 MB/s eta 0:01:35
     ---------------------------------------- 1.2/317.0 MB 4.7 MB/s eta 0:01:08
     ---------------------------------------- 1.7/317.0 MB 5.8 MB/s eta 0:00:55
     ---------------------------------------- 2.3/317.0 MB 6.6 MB/s eta 0:00:48
     ---------------------------------------- 2.8/317.0 MB 7.1 MB/s eta 0:00:44
     ---------------------------------------- 3.3/317.0 MB 7.7 MB/s eta

In [None]:
import pyspark

sc = pyspark.SparkContext(master="local[*]", appName="FirstExample")
sc.setLogLevel("ERROR")

In [4]:
print(sc) # it is like a Pool Processor executor

<SparkContext master=local[*] appName=FirstExample>


## Cтворення першого RDD

In [5]:
data = list(range(8))
rdd = sc.parallelize(data) # create collection
rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289

### Завдання 1
Створіть файл `sample.txt` з пакетом lorem. Прочитайте та завантажте його у RDD із функцією spark `textFile`

In [5]:
!pip install lorem
!pip install Faker

Collecting lorem
  Downloading lorem-0.1.1-py3-none-any.whl (5.0 kB)
Installing collected packages: lorem
Successfully installed lorem-0.1.1
Collecting Faker
  Downloading Faker-25.3.0-py3-none-any.whl (1.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m20.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: Faker
Successfully installed Faker-25.3.0


In [18]:
### Завдання 1
from faker import Faker
fake = Faker()
Faker.seed(0)

with open("sample.txt","w") as f:
    f.write(fake.text(max_nb_chars=1000))

rdd = sc.textFile("sample.txt")

### Збирання (Collect)

Action / To Driver: Повернути всі елементи в RDD у драйвер в єдиному списку

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/DUO6ygB.png?raw=1)

### Завдання 2

Зберіть текст, який ви раніше прочитали, з файлу `sample.txt`.

In [16]:
dataCollect = rdd.collect()
print(type(dataCollect))
print(dataCollect)

<class 'list'>
['American whole magazine truth stop whose. On traditional measure example sense peace. Would mouth relate own chair.', 'Together range line beyond. First policy daughter need kind miss.', 'Trouble behavior style report size personal partner. During foot that course nothing draw.', 'Language ball floor meet usually board necessary. Natural sport music white.', 'Onto knowledge other his offer face country. Almost wonder employee attorney. Theory type successful together. Raise study modern miss dog Democrat quickly.', 'Every manage political record word group food break. Picture suddenly drug rule bring determine some forward. Beyond chair recently and.', 'Own available buy country store build before. Already against which continue. Look road article quickly.', 'Per structure attorney author feeling job. Mean always beyond write. Employee toward like total now.', 'Small citizen class morning. Others kind company likely.']


### Map

Transformation / Narrow: повернення нового RDD шляхом застосування функції до кожного елемента цього RDD

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/PxNJf0U.png?raw=1)


In [11]:
rdd = sc.parallelize(list(range(20)))
rdd.map(lambda x: x ** 2).collect() # Square each element

[0,
 1,
 4,
 9,
 16,
 25,
 36,
 49,
 64,
 81,
 100,
 121,
 144,
 169,
 196,
 225,
 256,
 289,
 324,
 361]

### Завдання 3

Замініть функцію лямбда на функцію, яка містить паузу (сон (1)) і перевірте, чи `map` операція паралелізована.

In [12]:
import time
rdd.map(lambda x: time.sleep(1)).collect()

[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None]

### Фільтр

Transformation / Narrow:  поверніть новий RDD, що містить лише елементи, що задовольняють предикату

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/GFyji4U.png?raw=1)


In [14]:
rdd = sc.parallelize(list(range(20)))
rdd.filter(lambda x: x % 2 == 0).collect()

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

### FlatMap

Transformation / Narrow: Поверніть новий RDD, спочатку застосувавши функцію до всіх елементів цього RDD, а потім вирівнявши результати

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/TsSUex8.png?raw=1)

In [15]:
rdd = sc.parallelize([1,2,3])
rdd.flatMap(lambda x: (x, x*100, 42)).collect()

[1, 100, 42, 2, 200, 42, 3, 300, 42]

In [17]:
rdd = sc.parallelize([[1,2,3],[4,5,6],[7,8,9]])
rdd.flatMap(lambda x: x).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9]

### Завдання 4

Використайте FlatMap, щоб очистити текст з файлу `sample.txt` file. Хменшіть регістр, приберіть точки і розділіть на слова.

In [30]:
import re
def clean_text(text):
  text = text.lower()
  text = re.sub(r'[^\w\s]', '', text)
  return text.split()

rdd = sc.textFile("sample.txt")
cleaned_text = rdd.flatMap(clean_text)
words = cleaned_text.collect()

print(words)

['american', 'whole', 'magazine', 'truth', 'stop', 'whose', 'on', 'traditional', 'measure', 'example', 'sense', 'peace', 'would', 'mouth', 'relate', 'own', 'chair', 'together', 'range', 'line', 'beyond', 'first', 'policy', 'daughter', 'need', 'kind', 'miss', 'trouble', 'behavior', 'style', 'report', 'size', 'personal', 'partner', 'during', 'foot', 'that', 'course', 'nothing', 'draw', 'language', 'ball', 'floor', 'meet', 'usually', 'board', 'necessary', 'natural', 'sport', 'music', 'white', 'onto', 'knowledge', 'other', 'his', 'offer', 'face', 'country', 'almost', 'wonder', 'employee', 'attorney', 'theory', 'type', 'successful', 'together', 'raise', 'study', 'modern', 'miss', 'dog', 'democrat', 'quickly', 'every', 'manage', 'political', 'record', 'word', 'group', 'food', 'break', 'picture', 'suddenly', 'drug', 'rule', 'bring', 'determine', 'some', 'forward', 'beyond', 'chair', 'recently', 'and', 'own', 'available', 'buy', 'country', 'store', 'build', 'before', 'already', 'against', 'whi

### GroupBy

Transformation / Wide: погрупуйте дані у початковому RDD. Створіть пари, де ключ - це результат користувацької функції, а значення - це всі елементи, для яких функція дає цей ключ.

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/gdj0Ey8.png?raw=1)

In [35]:
rdd = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
rdd = rdd.groupBy(lambda w: w[0])
[(k, list(v)) for (k, v) in rdd.collect()]

[('J', ['John', 'James']), ('F', ['Fred']), ('A', ['Anna'])]

### GroupByKey

Transformation / Wide: згрупуйте значення для кожного ключа в початковому RDD. Створіть нову пару, в якій почататковий ключ відповідає цій зібраній групі значень.

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/TlWRGr2.png?raw=1)

In [36]:
rdd = sc.parallelize([('B',5),('B',4),('A',3),('A',2),('A',1)])
rdd = rdd.groupByKey()
[(j[0], list(j[1])) for j in rdd.collect()]

[('B', [5, 4]), ('A', [3, 2, 1])]

### Join

Transformation / Wide:  поверніть новий RDD, що містить всі пари елементів, що мають однаковий ключ у початкових RDD.

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/YXL42Nl.png?raw=1)

In [37]:
x = sc.parallelize([("a", 1), ("b", 2)])
y = sc.parallelize([("a", 3), ("a", 4), ("b", 5)])
x.join(y).collect()

[('b', (2, 5)), ('a', (1, 3)), ('a', (1, 4))]

In [39]:
x = sc.parallelize([("a", 1), ("b", 2)])
y = sc.parallelize([("a", 1), ("a", 3), ("b", 2)])
x.join(y).collect()

[('b', (2, 2)), ('a', (1, 1)), ('a', (1, 3))]

### Розділювач

Transformation / Wide: поверніть новий RDD, що містить елементи, відмінні від початкового RDD (виключаючи все дублікати)

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/Vqgy2a4.png?raw=1)

In [38]:
rdd = sc.parallelize([1,2,3,3,4])
rdd.distinct().collect()

[2, 4, 1, 3]

### KeyBy

Transformation / Narrow: створіть парний RDD, утворюючи по одній парі для кожного елемента в початковому RDD. Ключ пари обчислюється із значення за допомогою функції, що задається користувачем.

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/nqYhDW5.png?raw=1)

In [40]:
rdd = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
rdd.keyBy(lambda w: w[0]).collect()

[('J', 'John'), ('F', 'Fred'), ('A', 'Anna'), ('J', 'James')]

## Actions

### Map-Reduce operation

Action / To Driver:: агрегуйте всі елементи RDD, попарно застосовуючи  користувацьку функцію до елементів і часткових результатів, і поверніть результат драйверу.

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/R72uzwX.png?raw=1)

In [41]:
from operator import add
rdd = sc.parallelize(list(range(8)))
rdd.map(lambda x: x ** 2).reduce(add) # reduce is an action!

140

### Max, Min, Sum, середнє (Mean), дисперсія (Variance), стандарне квадратичне відхилення (Stdev)
Action / To Driver: обчисліть відповідну функцію (з наведених у назві) з числового RDD.

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/HUCtib1.png?raw=1)

In [48]:
rdd = sc.parallelize([1,4,2])
rdd.stdev()

1.247219128924647

### CountByKey

Action / To Driver: поверніть  карту ключів (map of keys) і кількість їх появ в RDD.

![](https://github.com/pnavaro/big-data/blob/master/notebooks/images/jvQTGv6.png?raw=1)

In [49]:
rdd = sc.parallelize([('J', 'James'), ('F','Fred'),
                    ('A','Anna'), ('J','John')])

rdd.countByKey()

defaultdict(int, {'J': 2, 'F': 1, 'A': 1})

In [53]:
rdd = sc.parallelize(['James','Fred','Anna','John'])
rdd.keyBy(lambda w: w[0]).countByKey()

defaultdict(int, {'J': 2, 'F': 1, 'A': 1})

In [54]:
# Stop the local spark cluster
sc.stop()

## Приклад Word Count

- У цьому випадку input має бути - це набір URL-адрес, кожен запис - це документ <br> <br> <br>

- **Завдання: Підрахувати, скільки разів кожне слово зустрічається в наборі даних.**

## Word Count: Map


Вхідними даними для $\operatorname{map}$ є процес mapping:
- Key: URL
- Value: вміст документа <br>
$\left< document1, to \; be \; or \; not \; to \; be \right>$  
    

- У цьому прикладі функція $\operatorname{map}$ обробляє даний URL і створює mapping:
- Таким чином, наш початковий набір даних буде перетворений в:
  
  $\left< to, 1 \right>$
  $\left< be, 1 \right>$
  $\left< or, 1 \right>$
  $\left< not, 1 \right>$
  $\left< to, 1 \right>$
  $\left< be, 1 \right>$

## Word Count: Reduce


- Операція reduce групує значення відповідно до їх ключів, а потім виконує reduce для кожного ключа.

- Таким чином, колекції розділені по різним одиницям зберігання.

- Map-Reduce згортає дані таким чином, щоб мінімізувати копіювання даних в кластері.

- Дані в різних блоках скорочуються (reduce) окремо паралельно.

- Кінцевий результат - скорочення скорочених даних в кожному блоці.

- Тому дуже важливо, щоб наш оператор був комутативним і асоціативним.

- В наведеному прикладі функцією є оператор +

  $\left< be, 2 \right>$  
  $\left< not, 1 \right>$  
  $\left< or, 1 \right>$  
  $\left< to, 2 \right>$  



## Word-count in Apache Spark



In [55]:
words = "to be or not to be".split()
words

['to', 'be', 'or', 'not', 'to', 'be']

In [7]:
#Don't Execute this on Databricks
#To be used if executing via docker

import pyspark
sc = pyspark.SparkContext('local[*]')

In [59]:
words_rdd = sc.parallelize(words)
words_rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289

### Проведення mapping для RDD

- Тепер, коли ми викликаємо методи `map` або `reduceByKey`  на `my_rdd` w ми можемо налаштувати паралельне обчислення обробки в кластері.


In [60]:
word_tuples_rdd = words_rdd.map(lambda x: (x, 1))
word_tuples_rdd

PythonRDD[1] at RDD at PythonRDD.scala:53

### Збирання RDD
- Зауважте, що у нас поки немає результату.

- Обчислення не виконуються, поки ми не попросимо зібрати кінцевий результат.

- Це можна зробити викликавши метод `collect()` .

- Будьте обережні з методом `collect`, оскільки всі зібрані вами дані повинні міститися в пам'яті.

- Метод `take` подібний до` collect`, але повертає лише перші елементи $ n $.

In [61]:
word_tuples_rdd.collect()

[('to', 1), ('be', 1), ('or', 1), ('not', 1), ('to', 1), ('be', 1)]

In [62]:
word_tuples_rdd.take(4)

[('to', 1), ('be', 1), ('or', 1), ('not', 1)]

### Редукція для RDD

- However, we require additional processing to reduce the data using the word key.

- Однак ми потребуємо додаткової обробки, щоб зменшити дані за допомогою ключа слова.

In [63]:
word_counts_rdd = word_tuples_rdd.reduceByKey(lambda x, y: x + y)
word_counts_rdd

PythonRDD[8] at RDD at PythonRDD.scala:53

- Тепер можна отримати остаточний результат:

In [64]:
word_counts = word_counts_rdd.collect()
word_counts

[('to', 2), ('be', 2), ('or', 1), ('not', 1)]

### "Лінива" оцінка

- Реалізується лише коли ми викликаємо `collect () ', обробка виконується в кластері.

- Виклик `collect ()` спричинить виконання операцій `map` та` reduceByKey`.

- Якщо отримана колекція дуже велика, це може бути дорогою операцією.


In [None]:
word_counts_rdd.take(2)

[('to', 2), ('be', 2)]

### Об'єднання MapReduce в одну команду


In [65]:
text = "to be or not to be".split()
rdd = sc.parallelize(text)
counts = rdd.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
counts.collect()

[('to', 2), ('be', 2), ('or', 1), ('not', 1)]

## Фактично, роботу з Word-count в Apache Spark можна звести до (згідно попередніх прикладів):
- Створіть rdd за допомогою методу `SparkContext.textFile method`
- понизьте регістр, видаліть розділові знаки та розділити за допомогою `rdd.flatMap`
- використовуйте `rdd.map` для створення списку пари ключ/значення (слово, 1)
- використовуйте`rdd.reduceByKey`, щоб отримати всі частоти слів
-використовуйте`rdd.takeOrdered`, щоб отримати відсортовані частоти слів

Вся документація доступна [тут](https://spark.apache.org/docs/2.1.0/api/python/pyspark.html?highlight=textfile#pyspark.SparkContext) для textFile і [тут](https://spark.apache.org/docs/2.1.0/api/python/pyspark.html?highlight=textfile#pyspark.RDD) для RDD.

Для загального огляду [див.](https://spark.apache.org/docs/latest/rdd-programming-guide.html)

In [68]:
from lorem import text
with open("sample.txt","w") as f:
  for i in range(2):
    f.write(text())

In [73]:
rdd = sc.textFile("./sample.txt")
rdd.flatMap(clean_text).map(lambda x: (x,1)).reduceByKey(add).collect()

[('magnam', 23),
 ('ipsum', 18),
 ('labore', 12),
 ('modi', 14),
 ('quiquia', 10),
 ('est', 16),
 ('tempora', 11),
 ('consectetur', 16),
 ('porro', 17),
 ('sed', 15),
 ('amet', 19),
 ('eius', 20),
 ('quaerat', 9),
 ('situt', 1),
 ('etincidunt', 18),
 ('non', 15),
 ('voluptatem', 19),
 ('aliquam', 19),
 ('velit', 16),
 ('ut', 23),
 ('quisquam', 8),
 ('sit', 15),
 ('adipisci', 17),
 ('dolore', 17),
 ('dolor', 13),
 ('numquam', 16),
 ('neque', 20),
 ('dolorem', 12)]

## Підрахунок $\pi$ з використанням Spark

- Ми можемо оцінити приблизне значення для $ \ pi $, використовуючи метод Монте-Карло:


1. Впишіть коло у квадрат
2. Згенеруйте випадкові точки у квадраті
3. Визначте кількість точок квадрата, які також знаходяться в колі
4. Нехай $ r $ - це кількість точок у колі, поділена на кількість точок у квадраті, тоді $ \ pi \ приблизно    4 r $.
    
- Зверніть увагу, що чим більше згенерованих точок, тим кращим буде наближення

Див. (https://computing.llnl.gov/tutorials/parallel_comp/#ExamplesPI).

In [74]:
import numpy as np

def sample(p):
    #here x,y are the x,y coordinate
    x, y = np.random.random(), np.random.random()
    #Because the circle is of
    return 1 if x*x + y*y < 1 else 0

NUM_SAMPLES = 1000000

count = sc.parallelize(range(0, NUM_SAMPLES)).map(sample) \
             .reduce(lambda a, b: a + b)
#Area  = 4*PI*r
r = float(count) / float(NUM_SAMPLES)
r
print ("Pi is approximately %f" % (4.0 * r))

Pi is approximately 3.143368


### Завдання 5

Обчислити інтеграл, використовуючи той самий метод, що й у прикладі обчислення PI
$$
I = \int_0^1 \exp(-x^2) dx
$$
Ви можете перевірити свій результат за допомогою numpy



In [None]:
# numpy evaluates solution using numeric computation.
# It uses discrete values of the function
import numpy as np
x = np.linspace(0,1,1000)
np.trapz(np.exp(-x*x),x)

numpy і scipy оцінює рішення за допомогою числових обчислень. Він використовує дискретні значення функції

In [None]:
import numpy as np
from scipy.integrate import quad
quad(lambda x: np.exp(-x*x), 0, 1)
# note: the solution returned is complex

### Аналіз кореляції між цінами закриття акцій різних компаній та виборі пари компаній з найвищою кореляцією для подальшого аналізу або торгівлі

доопрцювання коду:
- Визначення змінних start та end: У коді не визначені змінні start та end, які використовуються для встановлення періоду завантаження історичних даних про ціни акцій. Ці змінні повинні бути задані перед виконанням завдання, щоб визначити, який період історичних даних слід завантажити.

- Імпорт бібліотеки yf (yfinance): В коді не вказано імпорт бібліотеки yf (ймовірно, yfinance), яка використовується для завантаження історичних даних про ціни акцій. Перед використанням методу yf.download(), потрібно імпортувати цю бібліотеку, щоб вона була доступна для використання.

Завдання:

1. **Створення RDD з даних про ціни акцій**:
   - `spark.read.csv` або `spark.read.parquet` - для завантаження даних з CSV або Parquet файлів.
   - `spark.sparkContext.textFile` - для завантаження даних з текстових файлів.
   - `rdd.map` - для перетворення рядків даних у кортежі (тикер, часовий ряд цін).

2. **Фільтрація даних**:
   - `rdd.filter` - для відфільтровування RDD за певною умовою (діапазон дат, підмножина тикерів).

3. **Обчислення кореляції між цінами акцій**:
   - `rdd.cartesian` - для створення декартового добутку всіх пар часових рядів цін акцій.
   - `rdd.filter` - для фільтрації пар часових рядів, де ряди не однакові.
   - `func.corr` з `pyspark.sql.functions` - для обчислення коефіцієнта кореляції Пірсона між двома часовими рядами.
   - `rdd.map` - для перетворення кожної пари часових рядів у кортеж (тикер1, тикер2, коефіцієнт кореляції).

4. **Знаходження пари компаній з найвищою кореляцією**:
   - `rdd.reduceByKey` - для знаходження максимального коефіцієнта кореляції серед усіх пар.
   - `rdd.top` - для отримання кортежу з найвищим коефіцієнтом кореляції.

5. **Збереження результатів**:
   - `rdd.coalesce(1).saveAsTextFile` або `rdd.coalesce(1).write.csv` - для збереження RDD у текстовому або CSV форматі.
   - `rdd.coalesce(1).write.parquet` - для збереження RDD у форматі Parquet.

6. **Візуалізація результатів (опціонально)**:
   - `toLocalIterator` - для перетворення RDD у Python-ітератор для подальшої обробки.
   - Бібліотеки візуалізації даних, такі як Matplotlib або Seaborn, для побудови графіків часових рядів цін закриття акцій.


набори даних, що можуть бути використані (csv):
1. https://www.kaggle.com/ehallmar/daily-historical-stock-prices-1970-2018 -

2. https://pkgstore.datahub.io/core/nasdaq-listings/nasdaq-listed_csv/data/7665719fb51081ba0bd834fde71ce822/nasdaq-listed_csv.csv

In [None]:
#для json

import os  # library to get directory and file paths
import tarfile # this module makes possible to read and write tar archives

def extract_data(name, where):
    datadir = os.path.join(where,name)
    if not os.path.exists(datadir):
       print("Extracting data...")
       tar_path = os.path.join(where, name+'.tgz')
       with tarfile.open(tar_path, mode='r:gz') as data:
          data.extractall(where)

extract_data('daily-stock','data') # this function call will extract json files

In [77]:
import json
import pandas as pd
import os, glob

here = os.getcwd()
datadir = os.path.join(here,'data','daily-stock')
filenames = sorted(glob.glob(os.path.join(datadir, '*.json')))
filenames

[]

In [None]:
%rm data/daily-stock/*.h5

In [None]:
from glob import glob
import os, json
import pandas as pd

for fn in filenames:
    with open(fn) as f:
        data = [json.loads(line) for line in f]

    df = pd.DataFrame(data)

    out_filename = fn[:-5] + '.h5'
    df.to_hdf(out_filename, '/data')
    print("Finished : %s" % out_filename.split(os.path.sep)[-1])

filenames = sorted(glob(os.path.join('data', 'daily-stock', '*.h5')))  # data/json/*.json

In [5]:
import requests  # імпортуємо модуль requests для виконання HTTP-запитів
import pandas as pd  # імпортуємо бібліотеку pandas для роботи з даними у вигляді DataFrame
import io  # імпортуємо модуль io для роботи з рядками

# URL, за яким знаходиться CSV-файл з інформацією про компанії
url = "https://pkgstore.datahub.io/core/nasdaq-listings/nasdaq-listed_csv/data/7665719fb51081ba0bd834fde71ce822/nasdaq-listed_csv.csv"

# Виконуємо GET-запит за вказаним URL та отримуємо вміст відповіді
s = requests.get(url).content

# Завантажуємо вміст відповіді у DataFrame за допомогою бібліотеки pandas
companies = pd.read_csv(io.StringIO(s.decode('utf-8')))
companies

Unnamed: 0,Symbol,Company Name,Security Name,Market Category,Test Issue,Financial Status,Round Lot Size
0,AAIT,iShares MSCI All Country Asia Information Tech...,iShares MSCI All Country Asia Information Tech...,G,N,N,100.0
1,AAL,"American Airlines Group, Inc.","American Airlines Group, Inc. - Common Stock",Q,N,N,100.0
2,AAME,Atlantic American Corporation,Atlantic American Corporation - Common Stock,G,N,N,100.0
3,AAOI,"Applied Optoelectronics, Inc.","Applied Optoelectronics, Inc. - Common Stock",G,N,N,100.0
4,AAON,"AAON, Inc.","AAON, Inc. - Common Stock",Q,N,N,100.0
...,...,...,...,...,...,...,...
2962,ZN,Zion Oil & Gas Inc,Zion Oil & Gas Inc - Common Stock,G,N,N,100.0
2963,ZNGA,Zynga Inc.,Zynga Inc. - Class A Common Stock,Q,N,N,100.0
2964,ZSPH,"ZS Pharma, Inc.","ZS Pharma, Inc. - Common Stock",G,N,N,100.0
2965,ZU,"zulily, inc.","zulily, inc. - Class A Common Stock",Q,N,N,100.0


In [None]:
!pip install yfinance

In [None]:
import yfinance as yf
# Отримуємо список символів акцій з DataFrame
Symbols = companies['Symbol'].tolist()

# Створюємо порожній DataFrame, в якому будуть зберігатися дані про ціни акцій
filenames = pd.DataFrame()

# Ітеруємося по кожному символу акції
for i in Symbols:

    # Друк символу, який завантажується
    print(str(Symbols.index(i)) + str(' : ') + i, sep=',', end=',', flush=True)

    try:
        # Завантажуємо історичні дані про ціни акцій за вказаний період
        stock = yf.download(i, start='2020-01-01', end='2023-12-31', progress=False)

        # Додаємо інформацію про ціни акцій до DataFrame
        if len(stock) == 0:
            None
        else:
            stock['Name'] = i
            filenames = filenames.append(stock, sort=False)
            break
    except Exception:
        None

In [None]:
filenames.head(10)

### Робота з вже завантаженими даними

In [None]:
filenames

In [None]:
with pd.HDFStore('data/daily-stock/aet.h5') as hdf:
    # This prints a list of all group names:
    print(hdf.keys())

In [None]:
df_test = pd.read_hdf('data/daily-stock/aet.h5')

In [None]:
%%time

series = []
for fn in filenames:   # Simple map over filenames
    # Завантажуємо дані про ціни закриття з файлу та додаємо до списку
    series.append(pd.read_hdf(fn)["close"])

    # Ініціалізуємо порожній список для зберігання результатів обчислення кореляцій
    results = []

    # Поелементно порівнюємо кожну пару даних про ціни закриття та обчислюємо кореляцію
    for a in series:
        for b in series:
            if not (a == b).all():  # Фільтруємо порівняння однакових серій
                results.append(a.corr(b))  # Додаємо кореляцію до списку результатів

    # Знаходимо максимальну кореляцію
    result = max(results)

#### my

In [3]:
import pandas as pd
fn = "historical_stock_prices.csv"
series = [pd.read_csv(fn)["close"]]

# Ініціалізуємо порожній список для зберігання результатів обчислення кореляцій
results = []

# Поелементно порівнюємо кожну пару даних про ціни закриття та обчислюємо кореляцію
for a in series:
    for b in series:
        if not (a == b).all():  # Фільтруємо порівняння однакових серій
            results.append(a.corr(b))  # Додаємо кореляцію до списку результатів

# Знаходимо максимальну кореляцію
print(max(results))

ValueError: max() iterable argument is empty

### Завдання 6

Паралелізуйте наведений вище код за допомогою Apache Spark.

In [None]:
fn = "historical_stock_prices.csv"
rdd = sc.textFile(fn)
print(rdd.collect())

Час обчислень буде повільніший, тому що є багато налаштувань, створення workers, багато комунікацій та ін.

### Завдання 7 Робота з генетичними даними

Використовуйте RDD для розрахунку вмісту GC у файлі:

$$\frac{G+C}{A+T+G+C}\times100 \% $$

Створіть rdd з вашого файлу у каталозі даних і порахуйте «G» та «C», а потім поділіть на загальну кількість баз.

https://www.kaggle.com/datasets?search=nucleotide

### Завдання 8
Обчисліть найчастішу послідовність з 5 основ