<a href="https://colab.research.google.com/github/MatveiV/ML_Fin_Notebooks/blob/main/%D0%A2%D0%B5%D0%BC%D0%B0_14_BigData_%D0%B4%D0%BB%D1%8F_%D1%84%D0%B8%D0%BD%D0%B0%D0%BD%D1%81%D0%BE%D0%B2%D0%BE%D0%B3%D0%BE_%D0%B0%D0%BD%D0%B0%D0%BB%D0%B8%D0%B7%D0%B0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Big Data vs Small Data в финансах


## Примеры применения больших данных в финансовой индустрии

Большие данные открывают новые возможности для финансовой индустрии, позволяя компаниям улучшать свои продукты и услуги, а также принимать более обоснованные решения. Рассмотрим несколько примеров:

### Управление рисками

Финансовые учреждения используют большие данные для анализа рыночных тенденций и прогнозирования рисков. Это позволяет им разрабатывать стратегии управления рисками и минимизировать потенциальные убытки.

### Персонализация услуг

С помощью больших данных компании могут анализировать поведение клиентов и предлагать персонализированные продукты и услуги. Это повышает удовлетворенность клиентов и увеличивает их лояльность.

### Обнаружение мошенничества

Анализ больших данных позволяет выявлять аномалии и подозрительные активности, что помогает в обнаружении и предотвращении мошенничества. Это особенно важно для защиты финансовых транзакций и данных клиентов.

### Инвестиционные стратегии

Большие данные используются для разработки и оптимизации инвестиционных стратегий. Анализ исторических данных и текущих рыночных условий позволяет инвесторам принимать более обоснованные решения.

### Прогнозирование рыночных трендов

С помощью больших данных компании могут прогнозировать будущие рыночные тренды и адаптировать свои стратегии в соответствии с изменяющимися условиями. Это дает им конкурентное преимущество и позволяет оставаться на шаг впереди конкурентов.

## Понимание различий между большими и малыми данными

В современном мире данные играют ключевую роль в принятии решений, особенно в финансовой индустрии. Однако не все данные одинаковы, и понимание различий между большими и малыми данными является важным аспектом для эффективного использования информации.

### Малые данные

Малые данные представляют собой ограниченные наборы данных, которые могут быть легко обработаны и проанализированы с использованием традиционных инструментов и методов. Они часто структурированы и могут быть представлены в виде таблиц или баз данных. Примеры малых данных включают:
- Финансовые отчеты компаний
- Ежедневные транзакции
- Исторические данные по акциям

### Большие данные

Большие данные, с другой стороны, характеризуются объемом, разнообразием и скоростью. Они требуют использования специализированных технологий и фреймворков для обработки и анализа. Большие данные могут быть как структурированными, так и неструктурированными, и включают:
- Потоки данных с финансовых рынков в реальном времени
- Социальные медиа и новостные ленты
- Данные о поведении клиентов и транзакциях

### Основные различия

1. **Объем**: Малые данные обычно имеют ограниченный объем, в то время как большие данные могут достигать терабайтов и петабайтов.
2. **Скорость**: Большие данные часто поступают в реальном времени, требуя мгновенной обработки, в отличие от малых данных, которые могут быть обработаны в пакетном режиме.
3. **Разнообразие**: Большие данные могут включать текст, изображения, видео и другие форматы, тогда как малые данные обычно структурированы.


In [None]:
%matplotlib notebook
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib, sys, os
matplotlib.get_backend()
#%matplotlib --list
%matplotlib inline
import warnings
warnings.filterwarnings("ignore")
if 'google.colab' in sys.modules:
    print('Colab environment detected.')
if (os.environ.get('DISPLAY', '') == '') and (os.name != 'nt'):
    print('No display found. Using non-interactive Agg backend')
    mpl.use('Agg')
import pandas as pd
import numpy as np

Colab environment detected.
No display found. Using non-interactive Agg backend


In [None]:
small_data = pd.DataFrame({
    'Date': pd.date_range(start='2023-01-01', periods=5, freq='D'),  # даты
    'Revenue': [100, 150, 200, 250, 300],  # доходы
    'Expenses': [80, 120, 160, 200, 240]  # расходы
})

print("Малые данные:\n",small_data,small_data.info())

large_data = pd.DataFrame({
    'Timestamp': pd.date_range(start='2023-01-01', periods=1e8, freq='T'),
    'Value': [np.random.rand(1000000) * 1000.0]*int(1e18)
})

print("\nИнформация о наборе данных:", large_data.info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 3 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   Date      5 non-null      datetime64[ns]
 1   Revenue   5 non-null      int64         
 2   Expenses  5 non-null      int64         
dtypes: datetime64[ns](1), int64(2)
memory usage: 248.0 bytes
Малые данные:
         Date  Revenue  Expenses
0 2023-01-01      100        80
1 2023-01-02      150       120
2 2023-01-03      200       160
3 2023-01-04      250       200
4 2023-01-05      300       240 None
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 2 columns):
 #   Column     Dtype         
---  ------     -----         
 0   Timestamp  datetime64[ns]
 1   Value      object        
dtypes: datetime64[ns](1), object(1)
memory usage: 1.5+ GB

Информация о наборе данных: None


# Управление данными и кэширование в финансовых приложениях

В этом разделе мы рассмотрим, как использовать Redis и etcd для управления состоянием и кэшированием данных в финансовых приложениях. Эти [инструменты](https://db-engines.com/en/ranking) позволяют эффективно обрабатывать большие объемы данных, обеспечивая высокую производительность и надежность систем.

In [None]:
%%capture
!apt-get update
!apt-get install -y redis-server
!redis-server --daemonize yes # Запуск Redis в фоновом режиме
!redis-server --version

## Пример использования Redis для кэширования данных

В этом примере мы создадим простое приложение на Python, которое будет использовать Redis для кэширования результатов вычислений. Это позволит ускорить доступ к часто запрашиваемым данным.

### Преимущества использования Redis в финансовых приложениях:
- **Высокая скорость**: Данные хранятся в оперативной памяти, что обеспечивает быстрый доступ.
- **Поддержка различных структур данных**: Позволяет хранить и обрабатывать данные в удобной форме.
- **Масштабируемость**: Легко масштабируется для обработки больших объемов данных.
- **Надежность**: Поддерживает механизмы репликации и персистентности данных.

In [None]:
%%capture
!pip install redis -q
!pip install numpy==1.26.4 pandas==2.0.3 --force-reinstall
import pandas as pd
import numpy as np
import timeit
import redis
from math import sin

r = redis.Redis(host='localhost', port=6379, db=0) # Создаем объект подключения к Redis

In [None]:
# Функция для вычисления и кэширования данных
def expensive_computation(x):
    # Проверяем, есть ли результат в кэше
    if r.exists(f'computation:{x[0]}'):
        #print('есть результат в кэше !')
        return r.get(f'computation:{x[0]}')
    else:
        #print('нет в кеше, делаем вычисления')
        result = float([sin(i**2) ** 2 for i in x][0])  # Пример сложных вычислений
        r.set(f'computation:{x[0]}', result) # Сохраняем результат в кэше
        return result

print(f"-кеширование, 1-ый запуск")
%timeit -n1 -r1 expensive_computation([5])
print(f"+кеширование, 2-ый запуск")
%timeit expensive_computation([5])

-кеширование, 1-ый запуск
1.48 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
+кеширование, 2-ый запуск
224 µs ± 56.4 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


## Пример использования etcd для кэширования данных

etcd — это распределенное хранилище ключ-значение, которое обеспечивает надежное хранение данных и управление состоянием в распределенных системах. Оно часто используется для хранения конфигураций и координации сервисов.

### Преимущества использования etcd в финансовых приложениях:
- **Надежность**: Обеспечивает консистентность данных в распределенной среде.
- **Высокая доступность**: Поддерживает кластеризацию и автоматическое восстановление.
- **Простота использования**: Легко интегрируется с другими системами и сервисами.
- **Безопасность**: Поддерживает аутентификацию и шифрование данных.

In [None]:
%%capture
!apt-get install -y etcd
!pip install etcd3 protobuf==3.20.0

In [None]:
from subprocess import Popen, PIPE
proc = Popen("etcd --advertise-client-urls http://127.0.0.1:2379 --listen-client-urls http://127.0.0.1:2379",
    shell=True,
    stdout=PIPE, stderr=PIPE
)

In [None]:
!etcd --version
!export PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python

etcd Version: 3.3.25
Git SHA: Not provided (use ./build instead of go build)
Go Version: go1.18.1
Go OS/Arch: linux/amd64


In [None]:
!etcdctl cluster-health #--help

member 8e9e05c52164694d is healthy: got healthy result from http://127.0.0.1:2379
cluster is healthy


In [None]:
import etcd3
etcd = etcd3.client("127.0.0.1")
etcd.put('это_ключ', 'это_значение') #записали
value, metadata = etcd.get('это_ключ') #считали
print("Ключ: " + metadata.key.decode('utf-8'), "\nЗначение: " + value.decode('utf-8'))

Ключ: это_ключ 
Значение: это_значение


In [None]:
# Функция для вычисления и кэширования данных
def expensive_computation(x):
    # Проверяем, есть ли результат в кэше
    try:
        #print('есть результат в кэше !')
        return etcd.get(f'computation:{x[0]}')
    except:
        #print('нет в кеше, делаем вычисления')
        result = float([sin(i**2) ** 2 for i in x][0])  # Пример сложных вычислений
        etcd.set(f'computation:{x[0]}', result) # Сохраняем результат в кэше
        return result

print(f"-кеширование, 1-ый запуск")
%timeit -n1 -r1 expensive_computation([5])
print(f"+кеширование, 2-ый запуск")
%timeit expensive_computation([5])

-кеширование, 1-ый запуск
702 µs ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
+кеширование, 2-ый запуск
719 µs ± 209 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


Рассмотрим процесс записи или набора ключа-значения в etcd:

Шаг 1: Ведущий добавляет запись ключа-значения в журнал предварительной записи (WAL). Для этого действия требуется запись на диск с помощью функции fsync(), что влияет на задержку на диске.

Шаг 2a: Ведущий уведомляет подписчиков об изменении. Это действие связано с сетевым взаимодействием, что влияет на задержку в сети.

Шаг 2b: Подписчики добавляют запись в свой локальный WAL. Для этого действия требуется запись на диск с помощью функции fsync(), что приводит к задержке на диске.

Шаг 2c: Подписчики уведомляют лидера о том, что они записали ключ-значение в свой WAL. Для этого действия требуется подключение к сети, что влияет на задержку в сети.

Шаг 3: Лидер ожидает подтверждения от большинства (кворума) и фиксирует ввод ключа-значения. Для этого действия требуется еще одна запись на диск с помощью функции fsync(), что влияет на задержку на диске.

Шаг 4a: Ведущий уведомляет подписчиков о том, что запись зафиксирована. Для этого действия требуется подключение к сети, что влияет на задержку в сети.

Шаг 4b: После подтверждения от лидеров подписчики фиксируют ввод значения ключа. Для этого действия требуется запись на диск с помощью функции fsync(), что приводит к задержке на диске.

<img src="https://www.redhat.com/rhdc/managed-files/ohc/WC%201.png" width="50%">

## Вывод

Использование Redis и etcd в финансовых приложениях позволяет эффективно управлять данными и состоянием, обеспечивая высокую производительность и надежность систем. Эти инструменты помогают справляться с большими объемами данных и сложными задачами, характерными для финансовой индустрии.

# SQL базы данных с использованием графических процессоров (GPU)

В этом разделе мы рассмотрим, как SQL базы данных могут использовать графические процессоры (GPU) для ускорения обработки данных. Мы обсудим преимущества использования GPU в финансовых приложениях и приведем примеры таких баз данных.

## Обзор SQL баз данных, использующих GPU

Графические процессоры (GPU) изначально были разработаны для обработки графики и выполнения параллельных вычислений. Однако, благодаря их архитектуре, они также могут быть использованы для ускорения обработки данных в SQL базах данных. GPU могут выполнять тысячи потоков одновременно, что делает их идеальными для задач, требующих высокой степени параллелизма.

### Примеры SQL баз данных с поддержкой GPU

1. **BlazingSQL**: Это распределенная SQL база данных, которая использует GPU для ускорения обработки данных. BlazingSQL интегрируется с экосистемой RAPIDS, что позволяет выполнять анализ данных на GPU.

2. **OmniSci (ранее MapD)**: Это аналитическая платформа, которая использует GPU для выполнения SQL запросов с высокой скоростью. OmniSci позволяет обрабатывать миллиарды строк данных в реальном времени.

3. **Kinetica**: Это база данных, которая использует GPU для ускорения аналитических запросов. Kinetica поддерживает как SQL, так и NoSQL запросы, что делает ее универсальным решением для различных задач.

### Преимущества использования GPU в SQL базах данных

1. **Высокая производительность**: GPU могут обрабатывать данные значительно быстрее, чем традиционные CPU, благодаря своей способности выполнять множество операций параллельно.

2. **Эффективность при работе с большими объемами данных**: GPU могут обрабатывать большие объемы данных за короткое время, что делает их идеальными для финансовых приложений, где скорость обработки данных критически важна.

3. **Снижение затрат**: Использование GPU может снизить затраты на оборудование, так как одна GPU может заменить несколько CPU серверов.

4. **Реализация сложных аналитических задач**: GPU позволяют выполнять сложные аналитические задачи, такие как машинное обучение и глубокое обучение, непосредственно в базе данных.

## Примеры использования GPU в финансовых приложениях

Финансовая индустрия требует обработки огромных объемов данных в реальном времени. GPU могут значительно ускорить этот процесс, предоставляя аналитикам и трейдерам возможность принимать более обоснованные решения.

### Пример 1: Анализ рыночных данных

Финансовые компании могут использовать GPU для анализа рыночных данных в реальном времени. Это позволяет им быстро реагировать на изменения на рынке и принимать решения на основе актуальной информации.

### Пример 2: Управление рисками

GPU могут использоваться для моделирования и анализа рисков. Это позволяет финансовым учреждениям более точно оценивать риски и разрабатывать стратегии их минимизации.

### Пример 3: Обнаружение мошенничества

GPU могут ускорить процессы обнаружения мошенничества, анализируя транзакции в реальном времени и выявляя аномалии, которые могут указывать на мошенническую деятельность.

In [None]:
%%capture
!pip install cudf-cu11 cuml-cu11 cupy-cuda11x
import pandas as pd

In [None]:
print(pd.read_csv("https://github.com/plotly/datasets/raw/master/tips.csv").info())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 244 entries, 0 to 243
Data columns (total 7 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   total_bill  244 non-null    float64
 1   tip         244 non-null    float64
 2   sex         244 non-null    object 
 3   smoker      244 non-null    object 
 4   day         244 non-null    object 
 5   time        244 non-null    object 
 6   size        244 non-null    int64  
dtypes: float64(2), int64(1), object(4)
memory usage: 13.5+ KB
None


In [None]:
pdtips_df = pd.read_csv("https://github.com/plotly/datasets/raw/master/tips.csv")
pdtips_df["tip_percentage"] = pdtips_df["tip"] / pdtips_df["total_bill"] * 100

print(f"запуск pandas")
%timeit pdtips_df.groupby("size").tip_percentage.mean()

import cudf

cutips_df = cudf.read_csv("https://github.com/plotly/datasets/raw/master/tips.csv")
cutips_df["tip_percentage"] = cutips_df["tip"] / cutips_df["total_bill"] * 100

print(f"запуск cudf")
%timeit cutips_df.groupby("size").tip_percentage.mean()


запуск pandas
476 µs ± 109 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
запуск cudf
1.91 ms ± 259 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


## Заключение

Использование GPU в SQL базах данных предоставляет значительные преимущества для финансовой индустрии. Это позволяет обрабатывать большие объемы данных с высокой скоростью, снижать затраты и улучшать качество аналитики. Внедрение GPU в финансовые приложения может значительно повысить их эффективность и конкурентоспособность.

# Интеграция с Big Data фреймворками: Trino, Spark, Airflow

В этом разделе мы рассмотрим три ключевых фреймворка для работы с большими данными: Trino, Apache Spark и Apache Airflow. Мы обсудим их особенности, преимущества и способы интеграции для обработки больших объемов данных в финансовой индустрии.

## Trino

Trino (ранее известный как PrestoSQL) — это распределенный SQL-движок с открытым исходным кодом, который позволяет выполнять аналитические запросы к данным, хранящимся в различных источниках. Trino поддерживает множество коннекторов для работы с различными базами данных и хранилищами данных.

### Основные преимущества Trino:
- **Высокая производительность**: Trino оптимизирован для выполнения сложных аналитических запросов с минимальной задержкой.
- **Масштабируемость**: Trino может обрабатывать данные, распределенные по множеству узлов, что позволяет масштабировать систему в зависимости от объема данных.
- **Поддержка различных источников данных**: Trino поддерживает множество коннекторов для работы с различными базами данных и хранилищами данных, включая Hive, MySQL, PostgreSQL и другие.

## Apache Spark

Apache Spark — это распределенная вычислительная платформа с открытым исходным кодом, которая позволяет обрабатывать большие объемы данных с высокой скоростью. Spark поддерживает различные языки программирования, включая Python, Java и Scala, и предоставляет API для работы с данными в режиме реального времени и пакетной обработки.

### Основные компоненты Apache Spark:
- **Spark Core**: Основной компонент, отвечающий за распределенные вычисления и управление ресурсами.
- **Spark SQL**: Модуль для работы с данными в формате SQL, который позволяет выполнять запросы к данным, хранящимся в различных источниках.
- **Spark Streaming**: Модуль для обработки потоковых данных в реальном времени.
- **MLlib**: Библиотека машинного обучения, предоставляющая алгоритмы для анализа данных.
- **GraphX**: API для работы с графами и выполнения графовых вычислений.

## Apache Airflow

Apache Airflow — это платформа для создания, планирования и мониторинга рабочих процессов. Airflow позволяет автоматизировать процессы обработки данных, создавая сложные пайплайны, которые могут выполняться по расписанию или в зависимости от событий.

### Основные компоненты Apache Airflow:
- **DAG (Directed Acyclic Graph)**: Основная структура, описывающая последовательность задач и их зависимости.
- **Operators**: Компоненты, которые выполняют конкретные задачи, такие как запуск скриптов, перемещение данных и т.д.
- **Scheduler**: Компонент, отвечающий за планирование и выполнение задач в соответствии с DAG.
- **Web UI**: Веб-интерфейс для мониторинга и управления рабочими процессами.

# Практическая часть: Установка и настройка окружения

В этом разделе мы установим и настроим окружение для работы с большими данными, используя популярные фреймворки и библиотеки. Мы будем работать в Google Colab, что позволяет нам использовать мощные вычислительные ресурсы без необходимости установки программного обеспечения на локальный компьютер.

## Установка Trino

Trino — это распределенная база, который позволяет выполнять аналитические запросы на больших объемах данных.

In [None]:
!pip install trino -q

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/53.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m53.8/53.8 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
!curl -s http://185.177.219.168:8085/v1/info # Проверка доступности сервера Trino

{"nodeVersion":{"version":"467"},"environment":"docker","coordinator":true,"starting":false,"uptime":"21.04m"}


In [None]:
from trino.dbapi import connect
import time

host="185.177.219.168"
port=8085
print(f"Посмотреть GUI http://{host}:{port}/ui")
conn = connect(
    host=host,
    port=port,
    user="user",
    catalog="1",
    schema="public",
    timezone='Europe/Moscow',
    legacy_primitive_types=True,
)
cur = conn.cursor()
cur.execute("SELECT * FROM system.runtime.nodes")
rows = cur.fetchall()
print(rows)

Посмотреть GUI http://185.177.219.168:8085/ui
[['inside_trino', 'http://172.18.0.2:8080', '467', True, 'active']]


In [None]:
cur = conn.cursor()
cur.execute("SELECT DATE '-2001-08-22'")
rows = cur.fetchall()
print(rows)
assert rows[0][0] == "-2001-08-22"
assert cur.description[0][1] == "date"

[['-2001-08-22']]


In [None]:
import asyncio
import os
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%d.%m.%Y %H:%M:%S')

async def hard_job():
   while True:
        cur.execute("""
            SELECT COUNT(*) FROM system.runtime.nodes
            UNION ALL
            """+" SELECT COUNT(*) FROM system.runtime.nodes")
        rows = cur.fetchone()

        cur.execute("""SELECT NOW()""")
        rows = cur.fetchone()

if __name__ == "__main__":
  start = time.time()
  try:
      loop = asyncio.get_running_loop()
  except RuntimeError:
      loop = None
  if loop and loop.is_running():
      print('Async event loop already running. Adding coroutine to the event loop.')
      tsk = loop.create_task(hard_job())
      tsk.add_done_callback(lambda t: logging.info(f'Task done with result={t.result()}  << return val of main()'))
  else:
      logging.info('Starting new event loop')
      result = asyncio.run(hard_job())
  print(time.time()-start)

Async event loop already running. Adding coroutine to the event loop.
0.0028867721557617188



## Установка Apache Spark

In [None]:
%%capture
!pip install pyspark -q # Устанавливаем PySpark, интерфейс для работы со Spark на Python
!wget https://drive.google.com/uc?id=11V38AUYhTokBYaRf9Glr0isuqrSeuzqx -O /content/EURUSD_Candlestick_1_Hour_BID_01.07.2020-15.07.2023.csv

In [None]:
from pyspark.sql import SparkSession # Импортируем SparkSession из PySpark

# Создаем SparkSession
spark = SparkSession.builder \
    .appName("FinanceDataProcessing") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Указываем путь к нашему датасету
s3_path = "/content/EURUSD_Candlestick_1_Hour_BID_01.07.2020-15.07.2023.csv"
# Загружаем датасет в DataFrame
sdf = spark.read.csv(s3_path, header=True, inferSchema=True)

# Выводим схему загруженного DataFrame
sdf.printSchema()

root
 |-- Gmt time: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)



In [None]:
sdf.select("Open", "High", "Low", "Close", "Volume").describe().show()

+-------+-------------------+-------------------+-------------------+-------------------+--------------------+
|summary|               Open|               High|                Low|              Close|              Volume|
+-------+-------------------+-------------------+-------------------+-------------------+--------------------+
|  count|              17768|              17768|              17768|              17768|               17768|
|   mean| 1.1246986346240468| 1.1254486104232424| 1.1239660214993197| 1.1247025866726719|  11686.864068479525|
| stddev|0.06896329376638866|0.06882445176700097|0.06908021172626208|0.06895785696887373|   16224.18962253226|
|    min|             0.9539|            0.95592|            0.95357|             0.9539|2.293899999999999...|
|    max|            1.23398|            1.23494|            1.23334|            1.23401|         688879.8125|
+-------+-------------------+-------------------+-------------------+-------------------+--------------------+



In [None]:
# создадим гибрид pandas-on-Spark используя Spark DataFrame.
psdf = sdf.pandas_api()
psdf.describe()

Unnamed: 0,Open,High,Low,Close,Volume
count,17768.0,17768.0,17768.0,17768.0,17768.0
mean,1.124699,1.125449,1.123966,1.124703,11686.864068
std,0.068963,0.068824,0.06908,0.068958,16224.189623
min,0.9539,0.95592,0.95357,0.9539,0.000229
25%,1.07148,1.07219,1.07076,1.07146,2112.9299
50%,1.13237,1.133,1.13175,1.13242,6096.8398
75%,1.18362,1.18431,1.18303,1.18362,14595.0801
max,1.23398,1.23494,1.23334,1.23401,688879.8125


In [None]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
import os
import warnings
warnings.filterwarnings("ignore")
os.environ["PYARROW_IGNORE_TIMEZONE"]="false"
pdf = pd.read_csv(s3_path)
pdf.drop(["Gmt time","Volume"],axis=1)

prev = spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")
ps.set_option("compute.default_index_type", "distributed")
print("формат Arrow оптимизирует хранение данных в памяти, поддерживает быструю передачу данных по сети.")
print("выключить Arrow")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)
%timeit ps.range(300000).to_pandas()
print("включить Arrow")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
%timeit ps.range(300000).to_pandas()
ps.reset_option("compute.default_index_type")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", prev)

формат Arrow оптимизирует хранение данных в памяти, поддерживает быструю передачу данных по сети.
выключить Arrow
2.16 s ± 559 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
включить Arrow
271 ms ± 30.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


ERROR:asyncio:Exception in callback <lambda>(<Task finishe... by peer'))")>) at <ipython-input-19-9d2666b1c2bf>:27
handle: <Handle <lambda>(<Task finishe... by peer'))")>) at <ipython-input-19-9d2666b1c2bf>:27>
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/urllib3/connectionpool.py", line 789, in urlopen
    response = self._make_request(
  File "/usr/local/lib/python3.10/dist-packages/urllib3/connectionpool.py", line 536, in _make_request
    response = conn.getresponse()
  File "/usr/local/lib/python3.10/dist-packages/urllib3/connection.py", line 507, in getresponse
    httplib_response = super().getresponse()
  File "/usr/lib/python3.10/http/client.py", line 1375, in getresponse
    response.begin()
  File "/usr/lib/python3.10/http/client.py", line 318, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python3.10/http/client.py", line 279, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  Fil

In [None]:
spark = SparkSession.builder.master("local[*]").appName("BigDataFinance").getOrCreate() # Создаем локальную сессию
spark # Выводим информацию о созданной сессии

In [None]:
print(psdf.head())
def add(data):return data[2] / data[3]  #вычисляем индикатор High/Low
# psdf["индикатор"] = psdf.apply(add,axis=1) не сработает
new_column = psdf.apply(add,axis=1)
print(new_column.head())

                  Gmt time     Open     High      Low    Close     Volume
0  01.07.2020 00:00:00.000  1.12336  1.12336  1.12275  1.12306  4148.0298
1  01.07.2020 01:00:00.000  1.12306  1.12395  1.12288  1.12385  5375.5801
2  01.07.2020 02:00:00.000  1.12386  1.12406  1.12363  1.12382  4131.6099
3  01.07.2020 03:00:00.000  1.12382  1.12388  1.12221  1.12265  4440.6001
4  01.07.2020 04:00:00.000  1.12265  1.12272  1.12151  1.12179  4833.1001
0    1.000543
1    1.000953
2    1.000383
3    1.001488
4    1.001079
dtype: float64


Подитожим преимущества использования датафрейма PySpark по сравнению с Pandas:

<table>
<thead>
<tr>
<th><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Spark DataFrame</font></font></th>
<th><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Pandas DataFrame</font></font></th>
</tr>
</thead>
<tbody>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Spark DataFrame поддерживает распараллеливание.&nbsp;</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Pandas DataFrame не поддерживает распараллеливание.&nbsp;</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Spark DataFrame имеет несколько узлов.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Pandas DataFrame имеет один узел.</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;" class="">Он следует принципу ленивого выполнения - это означает,<br> что задача не выполняется до тех пор, пока не будет выполнено действие.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Он следует принципу Eager Execution, что означает, что задача выполняется немедленно.</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Spark DataFrame является неизменяемым.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Pandas DataFrame является изменяемым.</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Сложные операции сложнее выполнять по сравнению с Pandas DataFrame.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Сложные операции выполнять проще по сравнению со Spark DataFrame.</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Spark DataFrame является распределенным, <br>поэтому обработка в Spark DataFrame больших объемов данных происходит быстрее.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Pandas DataFrame не является распределенным, <br>поэтому обработка в Pandas DataFrame больших объемов данных будет выполняться медленнее.</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">sparkDataFrame.count() возвращает количество строк.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">pandasDataFrame.count() возвращает количество наблюдений, отличных от NA/null, для каждого столбца.</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Spark DataFrames отлично подходят для создания масштабируемых приложений.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Pandas DataFrames нельзя использовать для создания масштабируемого приложения.</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Spark DataFrame обеспечивает отказоустойчивость.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Pandas DataFrame не гарантирует отказоустойчивость. <br>Для ее обеспечения нам необходимо реализовать собственную структуру.</font></font></td>
</tr>
</tbody>
</table>

В этом разделе мы рассмотрели, как загружать и предварительно обрабатывать большие финансовые датасеты с использованием PySpark, а также как использовать Trino для выполнения распределенных SQL-запросов. Эти инструменты позволяют эффективно работать с большими объемами данных и извлекать из них полезную информацию.

## Установка Apache Airflow

Apache Airflow — это платформа для создания, планирования и мониторинга рабочих процессов. Мы установим Airflow с помощью pip и настроим его для работы в нашем окружении.

In [None]:
# Загрузка и запуск контейнера с сервером Apache Airflow
#mkdir -p ./dags ./logs ./plugins
#curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.0/docker-compose.yaml'
#echo -e "AIRFLOW_UID=$(id -u)" > .env
#docker compose up
# Airflow будет доступен по адресу http://localhost:8080

In [None]:
%%capture
# Установка Apache Airflow
!pip install 'apache-airflow==2.10.1' \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.1/constraints-3.8.txt"

In [None]:
!curl -s http://185.177.219.168:8080/health  # Используем curl для проверки доступности Airflow и выводим информацию о его состоянии

### Создание простого пайплайна в Airflow

Теперь, когда Airflow запущен, мы можем создать простой пайплайн. Пайплайн будет состоять из трех задач: загрузка данных, обработка данных и сохранение результатов.


### Мониторинг и управление процессами в Airflow

Airflow предоставляет удобный веб-интерфейс для мониторинга и управления процессами. Вы можете просматривать состояние задач, логи выполнения и управлять расписанием выполнения DAG.

### Доступ к веб-интерфейсу Airflow

Откройте браузер и перейдите по адресу [http://localhost:8080](http://localhost:8080). Вы увидите веб-интерфейс Airflow, где сможете управлять созданным пайплайном.

### Управление задачами

В веб-интерфейсе вы можете:
- Запускать и останавливать DAG
- Просматривать состояние задач
- Просматривать логи выполнения задач
- Изменять расписание выполнения DAG


### Заключение

В этом разделе мы рассмотрели, как использовать Apache Airflow для автоматизации процессов обработки данных. Мы создали простой пайплайн, состоящий из трех задач, и научились управлять процессами через веб-интерфейс Airflow. Это лишь базовое введение в возможности Airflow, и вы можете расширять и усложнять пайплайны в зависимости от ваших потребностей.

### Установка Apache Ignite

**Одиночный сервер**

```
sudo docker pull apacheignite/ignite
sudo docker run -d apacheignite/ignite
#sudo docker run -p 10800:10800 -p 8080:8080 apacheignite/ignite

```

**Кластер**

docker-compose.yml

```
services:
  ignite_0:
    image: apacheignite/ignite:latest
    ports:
      - 10800:10800
    restart: always

  ignite_1:
    image: apacheignite/ignite:latest
    ports:
      - 10801:10800
    restart: always

  ignite_2:
    image: apacheignite/ignite:latest
    ports:
      - 10802:10800
    restart: always
```

In [None]:
!pip install pyignite -q

In [None]:
from pyignite import Client

client = Client( use_ssl=False)
with client.connect('185.177.219.168', 10800):
    #my_cache = client.create_cache(cache_config) # так сломается, потому что уже один раз создали
    my_cache = client.get_or_create_cache('new_cache')
    my_cache.put('my key', 42)

    result = my_cache.get('my key')
    print(result)  # 42

    result = my_cache.get('non-existent key')
    print(result)  # None

    result = my_cache.get_all([
        'my key',
        'non-existent key',
        'other-key',
    ])
    print(result)  # {'my key': 42}

    my_cache.clear_key('my key')

    my_cache.destroy()

42
None
{'my key': 42}


In [None]:
from pprint import pprint

from pyignite import Client

client = Client()
with client.connect('185.177.219.168', 10800):
    my_cache = client.create_cache('my cache')
    my_cache.put_all({'key_{}'.format(v): v for v in range(20)})


    with my_cache.scan() as cursor:
        for k, v in cursor:
            print(k, v)

    with my_cache.scan() as cursor:
        pprint(dict(cursor))

    my_cache.destroy()

key_3 3
key_2 2
key_1 1
key_0 0
key_7 7
key_6 6
key_5 5
key_4 4
key_9 9
key_8 8
key_19 19
key_17 17
key_18 18
key_15 15
key_16 16
key_13 13
key_14 14
key_11 11
key_12 12
key_10 10
{'key_0': 0,
 'key_1': 1,
 'key_10': 10,
 'key_11': 11,
 'key_12': 12,
 'key_13': 13,
 'key_14': 14,
 'key_15': 15,
 'key_16': 16,
 'key_17': 17,
 'key_18': 18,
 'key_19': 19,
 'key_2': 2,
 'key_3': 3,
 'key_4': 4,
 'key_5': 5,
 'key_6': 6,
 'key_7': 7,
 'key_8': 8,
 'key_9': 9}


<img src="https://ignite.apache.org/img/usecases/high-peformance/perf-pic.svg">

In [None]:
from decimal import Decimal
from enum import Enum


class TableNames(Enum):
    COUNTRY_TABLE_NAME = 'Country'
    CITY_TABLE_NAME = 'City'
    LANGUAGE_TABLE_NAME = 'CountryLanguage'


class Query:
    COUNTRY_CREATE_TABLE = '''CREATE TABLE Country (
        Code CHAR(3) PRIMARY KEY,
        Name CHAR(52),
        Continent CHAR(50),
        Region CHAR(26),
        SurfaceArea DECIMAL(10,2),
        IndepYear SMALLINT(6),
        Population INT(11),
        LifeExpectancy DECIMAL(3,1),
        GNP DECIMAL(10,2),
        GNPOld DECIMAL(10,2),
        LocalName CHAR(45),
        GovernmentForm CHAR(45),
        HeadOfState CHAR(60),
        Capital INT(11),
        Code2 CHAR(2)
    )'''

    COUNTRY_INSERT = '''INSERT INTO Country(
        Code, Name, Continent, Region,
        SurfaceArea, IndepYear, Population,
        LifeExpectancy, GNP, GNPOld,
        LocalName, GovernmentForm, HeadOfState,
        Capital, Code2
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'''

    CITY_CREATE_TABLE = '''CREATE TABLE City (
        ID INT(11),
        Name CHAR(35),
        CountryCode CHAR(3),
        District CHAR(20),
        Population INT(11),
        PRIMARY KEY (ID, CountryCode)
    ) WITH "affinityKey=CountryCode"'''

    CITY_CREATE_INDEX = 'CREATE INDEX idx_country_code ON city (CountryCode)'

    CITY_INSERT = '''INSERT INTO City(
        ID, Name, CountryCode, District, Population
    ) VALUES (?, ?, ?, ?, ?)'''

    LANGUAGE_CREATE_TABLE = '''CREATE TABLE CountryLanguage (
        CountryCode CHAR(3),
        Language CHAR(30),
        IsOfficial BOOLEAN,
        Percentage DECIMAL(4,1),
        PRIMARY KEY (CountryCode, Language)
    ) WITH "affinityKey=CountryCode"'''

    LANGUAGE_CREATE_INDEX = 'CREATE INDEX idx_lang_country_code ON CountryLanguage (CountryCode)'

    LANGUAGE_INSERT = '''INSERT INTO CountryLanguage(
        CountryCode, Language, IsOfficial, Percentage
    ) VALUES (?, ?, ?, ?)'''

    DROP_TABLE = 'DROP TABLE {} IF EXISTS'


class TestData:
    COUNTRY = [
        [
            'USA', 'United States', 'North America', 'North America',
            Decimal('9363520.00'), 1776, 278357000,
            Decimal('77.1'), Decimal('8510700.00'), Decimal('8110900.00'),
            'United States', 'Federal Republic', 'George W. Bush',
            3813, 'US',
        ],
        [
            'IND', 'India', 'Asia', 'Southern and Central Asia',
            Decimal('3287263.00'), 1947, 1013662000,
            Decimal('62.5'), Decimal('447114.00'), Decimal('430572.00'),
            'Bharat/India', 'Federal Republic', 'Kocheril Raman Narayanan',
            1109, 'IN',
        ],
        [
            'CHN', 'China', 'Asia', 'Eastern Asia',
            Decimal('9572900.00'), -1523, 1277558000,
            Decimal('71.4'), Decimal('982268.00'), Decimal('917719.00'),
            'Zhongquo', 'PeoplesRepublic', 'Jiang Zemin',
            1891, 'CN',
        ],
    ]

    CITY = [
        [3793, 'New York', 'USA', 'New York', 8008278],
        [3794, 'Los Angeles', 'USA', 'California', 3694820],
        [3795, 'Chicago', 'USA', 'Illinois', 2896016],
        [3796, 'Houston', 'USA', 'Texas', 1953631],
        [3797, 'Philadelphia', 'USA', 'Pennsylvania', 1517550],
        [3798, 'Phoenix', 'USA', 'Arizona', 1321045],
        [3799, 'San Diego', 'USA', 'California', 1223400],
        [3800, 'Dallas', 'USA', 'Texas', 1188580],
        [3801, 'San Antonio', 'USA', 'Texas', 1144646],
        [3802, 'Detroit', 'USA', 'Michigan', 951270],
        [3803, 'San Jose', 'USA', 'California', 894943],
        [3804, 'Indianapolis', 'USA', 'Indiana', 791926],
        [3805, 'San Francisco', 'USA', 'California', 776733],
        [1024, 'Mumbai (Bombay)', 'IND', 'Maharashtra', 10500000],
        [1025, 'Delhi', 'IND', 'Delhi', 7206704],
        [1026, 'Calcutta [Kolkata]', 'IND', 'West Bengali', 4399819],
        [1027, 'Chennai (Madras)', 'IND', 'Tamil Nadu', 3841396],
        [1028, 'Hyderabad', 'IND', 'Andhra Pradesh', 2964638],
        [1029, 'Ahmedabad', 'IND', 'Gujarat', 2876710],
        [1030, 'Bangalore', 'IND', 'Karnataka', 2660088],
        [1031, 'Kanpur', 'IND', 'Uttar Pradesh', 1874409],
        [1032, 'Nagpur', 'IND', 'Maharashtra', 1624752],
        [1033, 'Lucknow', 'IND', 'Uttar Pradesh', 1619115],
        [1034, 'Pune', 'IND', 'Maharashtra', 1566651],
        [1035, 'Surat', 'IND', 'Gujarat', 1498817],
        [1036, 'Jaipur', 'IND', 'Rajasthan', 1458483],
        [1890, 'Shanghai', 'CHN', 'Shanghai', 9696300],
        [1891, 'Peking', 'CHN', 'Peking', 7472000],
        [1892, 'Chongqing', 'CHN', 'Chongqing', 6351600],
        [1893, 'Tianjin', 'CHN', 'Tianjin', 5286800],
        [1894, 'Wuhan', 'CHN', 'Hubei', 4344600],
        [1895, 'Harbin', 'CHN', 'Heilongjiang', 4289800],
        [1896, 'Shenyang', 'CHN', 'Liaoning', 4265200],
        [1897, 'Kanton [Guangzhou]', 'CHN', 'Guangdong', 4256300],
        [1898, 'Chengdu', 'CHN', 'Sichuan', 3361500],
        [1899, 'Nanking [Nanjing]', 'CHN', 'Jiangsu', 2870300],
        [1900, 'Changchun', 'CHN', 'Jilin', 2812000],
        [1901, 'Xi´an', 'CHN', 'Shaanxi', 2761400],
        [1902, 'Dalian', 'CHN', 'Liaoning', 2697000],
        [1903, 'Qingdao', 'CHN', 'Shandong', 2596000],
        [1904, 'Jinan', 'CHN', 'Shandong', 2278100],
        [1905, 'Hangzhou', 'CHN', 'Zhejiang', 2190500],
        [1906, 'Zhengzhou', 'CHN', 'Henan', 2107200],
    ]

    LANGUAGE = [
        ['USA', 'Chinese', False, Decimal('0.6')],
        ['USA', 'English', True, Decimal('86.2')],
        ['USA', 'French', False, Decimal('0.7')],
        ['USA', 'German', False, Decimal('0.7')],
        ['USA', 'Italian', False, Decimal('0.6')],
        ['USA', 'Japanese', False, Decimal('0.2')],
        ['USA', 'Korean', False, Decimal('0.3')],
        ['USA', 'Polish', False, Decimal('0.3')],
        ['USA', 'Portuguese', False, Decimal('0.2')],
        ['USA', 'Spanish', False, Decimal('7.5')],
        ['USA', 'Tagalog', False, Decimal('0.4')],
        ['USA', 'Vietnamese', False, Decimal('0.2')],
        ['IND', 'Asami', False, Decimal('1.5')],
        ['IND', 'Bengali', False, Decimal('8.2')],
        ['IND', 'Gujarati', False, Decimal('4.8')],
        ['IND', 'Hindi', True, Decimal('39.9')],
        ['IND', 'Kannada', False, Decimal('3.9')],
        ['IND', 'Malajalam', False, Decimal('3.6')],
        ['IND', 'Marathi', False, Decimal('7.4')],
        ['IND', 'Orija', False, Decimal('3.3')],
        ['IND', 'Punjabi', False, Decimal('2.8')],
        ['IND', 'Tamil', False, Decimal('6.3')],
        ['IND', 'Telugu', False, Decimal('7.8')],
        ['IND', 'Urdu', False, Decimal('5.1')],
        ['CHN', 'Chinese', True, Decimal('92.0')],
        ['CHN', 'Dong', False, Decimal('0.2')],
        ['CHN', 'Hui', False, Decimal('0.8')],
        ['CHN', 'Mantšu', False, Decimal('0.9')],
        ['CHN', 'Miao', False, Decimal('0.7')],
        ['CHN', 'Mongolian', False, Decimal('0.4')],
        ['CHN', 'Puyi', False, Decimal('0.2')],
        ['CHN', 'Tibetan', False, Decimal('0.4')],
        ['CHN', 'Tujia', False, Decimal('0.5')],
        ['CHN', 'Uighur', False, Decimal('0.6')],
        ['CHN', 'Yi', False, Decimal('0.6')],
        ['CHN', 'Zhuang', False, Decimal('1.4')],
    ]

from pyignite import Client

# establish connection
client = Client()
with client.connect('185.177.219.168', 10800):
    # create tables
    for query in [
        Query.COUNTRY_CREATE_TABLE,
        Query.CITY_CREATE_TABLE,
        Query.LANGUAGE_CREATE_TABLE,
    ]:
        client.sql(query)

    # create indices
    for query in [Query.CITY_CREATE_INDEX, Query.LANGUAGE_CREATE_INDEX]:
        client.sql(query)

    # load data
    for row in TestData.COUNTRY:
        client.sql(Query.COUNTRY_INSERT, query_args=row)

    for row in TestData.CITY:
        client.sql(Query.CITY_INSERT, query_args=row)

    for row in TestData.LANGUAGE:
        client.sql(Query.LANGUAGE_INSERT, query_args=row)

    # 10 most populated cities (with pagination)
    with client.sql('SELECT name, population FROM City ORDER BY population DESC LIMIT 10') as cursor:
        print('Most 10 populated cities:')
        for row in cursor:
            print(row)
    # Most 10 populated cities:
    # ['Mumbai (Bombay)', 10500000]
    # ['Shanghai', 9696300]
    # ['New York', 8008278]
    # ['Peking', 7472000]
    # ['Delhi', 7206704]
    # ['Chongqing', 6351600]
    # ['Tianjin', 5286800]
    # ['Calcutta [Kolkata]', 4399819]
    # ['Wuhan', 4344600]
    # ['Harbin', 4289800]
    print('-' * 20)
    # 10 most populated cities in 3 countries (with pagination and header row)
    MOST_POPULATED_IN_3_COUNTRIES = '''
    SELECT country.name as country_name, city.name as city_name, MAX(city.population) AS max_pop FROM country
        JOIN city ON city.countrycode = country.code
        WHERE country.code IN ('USA','IND','CHN')
        GROUP BY country.name, city.name ORDER BY max_pop DESC LIMIT 10
    '''

    with client.sql(MOST_POPULATED_IN_3_COUNTRIES, include_field_names=True) as cursor:
        print('Most 10 populated cities in USA, India and China:')
        table_str_pattern = '{:15}\t| {:20}\t| {}'
        print(table_str_pattern.format(*next(cursor)))
        print('*' * 50)
        for row in cursor:
            print(table_str_pattern.format(*row))
    # Most 10 populated cities in USA, India and China:
    # COUNTRY_NAME   	| CITY_NAME           	| MAX_POP
    # **************************************************
    # India          	| Mumbai (Bombay)     	| 10500000
    # China          	| Shanghai            	| 9696300
    # United States  	| New York            	| 8008278
    # China          	| Peking              	| 7472000
    # India          	| Delhi               	| 7206704
    # China          	| Chongqing           	| 6351600
    # China          	| Tianjin             	| 5286800
    # India          	| Calcutta [Kolkata]  	| 4399819
    # China          	| Wuhan               	| 4344600
    # China          	| Harbin              	| 4289800
    print('-' * 20)

    # Show city info
    with client.sql('SELECT * FROM City WHERE id = ?', query_args=[3802], include_field_names=True) as cursor:
        field_names = next(cursor)
        field = list(*cursor)
        print('City info:')
        for field_name, field_value in zip(field_names * len(field), field):
            print(f'{field_name}: {field_value}')
    # City info:
    # ID: 3802
    # NAME: Detroit
    # COUNTRYCODE: USA
    # DISTRICT: Michigan
    # POPULATION: 951270

    # Clean up
    for table_name in TableNames:
        result = client.sql(Query.DROP_TABLE.format(table_name.value))

Most 10 populated cities:
['Mumbai (Bombay)', 10500000]
['Shanghai', 9696300]
['New York', 8008278]
['Peking', 7472000]
['Delhi', 7206704]
['Chongqing', 6351600]
['Tianjin', 5286800]
['Calcutta [Kolkata]', 4399819]
['Wuhan', 4344600]
['Harbin', 4289800]
--------------------
Most 10 populated cities in USA, India and China:
COUNTRY_NAME   	| CITY_NAME           	| MAX_POP
**************************************************
India          	| Mumbai (Bombay)     	| 10500000
China          	| Shanghai            	| 9696300
United States  	| New York            	| 8008278
China          	| Peking              	| 7472000
India          	| Delhi               	| 7206704
China          	| Chongqing           	| 6351600
China          	| Tianjin             	| 5286800
India          	| Calcutta [Kolkata]  	| 4399819
China          	| Wuhan               	| 4344600
China          	| Harbin              	| 4289800
--------------------
City info:
ID: 3802
NAME: Detroit
COUNTRYCODE: USA
DISTRICT: Mi

In [None]:
import pyignite
from pyignite.exceptions import SQLError

from time import time
from uuid import uuid4
from datetime import datetime
from random import randint, uniform

session = pyignite.Client(handshake_timeout=20.0)
session.connect('185.177.219.168', 10800)

create_table="""CREATE TABLE IF NOT EXISTS public.test(
	trans_id varchar(255) not NULL PRIMARY key,
	amount int not null,
	decimal_amount decimal(19, 3) not null,
	client varchar(255) not null,
	operation_type varchar(10) not null,
	ex_text_field varchar not null,
	transaction_ts timestamp not null,
	transaction_dt date not null
);"""

session.sql(create_table)
LIMIT = 1_000#_000
def gen_test_data():
    _test_data = []
    for index in range(1, LIMIT+1):
        data = {
            "trans_id": f"C{uuid4().hex.upper()}",
            "amount": randint(100, 200),
            "decimal_amount": uniform(0.00001, 25.123),
            "client": f"FFM{randint(115, 720)}",
            "operation_type": "C2C",
            "ex_text_field": "pyignite.exceptions",
            "transaction_ts": datetime.now(),
            "transaction_dt": datetime.now().strftime("%Y-%m-%d"),
        }
        _test_data.append(list(data.values()))
    return _test_data



def insert_(data: tuple, table_name: str):
    columns = ",".join([
        "trans_id", "amount", "decimal_amount", "client", "operation_type", "ex_text_field", "transaction_ts", "transaction_dt"
        ])
    cmd = f"insert into {table_name}({columns}) values (?, ?, ?, ?, ?, ?, ?, ?)"
    try:
        session.sql(cmd, query_args=data)
    except SQLError:
        pass

def speedtest(data: list):
    session.sql("delete from public.test;")

    start = time()
    for row in data:
        #print(row)
        insert_(row, "public.test")

    finish = (time() - start)+1e-10
    print(f"Прошло времени, ss: {round(finish, 3)} // Кол-во {len(data)} // RPS: {round(len(data) / round(finish))}")


speedtest(gen_test_data())

Прошло времени, ss: 222.1 // Кол-во 1000 // RPS: 5


# Заключение

Инструменты больших данных позволяют эффективно управлять большими объемами данных и обеспечивать высокую производительность обработки.