**Работа с большими объемами данных**

### Малые данные

Малые данные представляют собой ограниченные наборы данных, которые могут быть легко обработаны и проанализированы с использованием традиционных инструментов и методов. Они часто структурированы и могут быть представлены в виде таблиц или баз данных. Примеры малых данных включают:
- Финансовые отчеты компаний
- Ежедневные транзакции
- Исторические данные по акциям

### Большие данные

Большие данные, с другой стороны, характеризуются объемом, разнообразием и скоростью. Они требуют использования специализированных технологий и фреймворков для обработки и анализа. Большие данные могут быть как структурированными, так и неструктурированными, и включают:
- Потоки данных с финансовых рынков в реальном времени
- Социальные медиа и новостные ленты
- Данные о поведении клиентов и транзакциях

### Основные различия

1. **Объем**: Малые данные обычно имеют ограниченный объем, в то время как большие данные могут достигать терабайтов и петабайтов.
2. **Скорость**: Большие данные часто поступают в реальном времени, требуя мгновенной обработки, в отличие от малых данных, которые могут быть обработаны в пакетном режиме.
3. **Разнообразие**: Большие данные могут включать текст, изображения, видео и другие форматы, тогда как малые данные обычно структурированы.


# Управление данными и кэширование в финансовых приложениях

В этом разделе мы рассмотрим, как использовать **Redis** для управления состоянием и кэшированием данных в финансовых приложениях. Эти [инструменты](https://db-engines.com/en/ranking) позволяют эффективно обрабатывать большие объемы данных, обеспечивая высокую производительность и надежность систем.

In [None]:
%%capture
!apt-get update
!apt-get install -y redis-server
!redis-server --daemonize yes # Запуск Redis в фоновом режиме
!redis-server --version

## Пример использования Redis для кэширования данных

В этом примере мы создадим простое приложение на Python, которое будет использовать Redis для кэширования результатов вычислений. Это позволит ускорить доступ к часто запрашиваемым данным.

### Преимущества использования Redis в финансовых приложениях:
- **Высокая скорость**: Данные хранятся в оперативной памяти, что обеспечивает быстрый доступ.
- **Поддержка различных структур данных**: Позволяет хранить и обрабатывать данные в удобной форме.
- **Масштабируемость**: Легко масштабируется для обработки больших объемов данных.
- **Надежность**: Поддерживает механизмы репликации и персистентности данных.

In [None]:
%%capture
!pip install redis
import redis

r = redis.Redis(host='localhost', port=6379, db=0) # Создаем объект подключения к Redis

In [None]:
redis.__version__

'6.2.0'

In [None]:
import pandas as pd
import numpy as np
import timeit

from math import sin

In [None]:
# Функция для вычисления и кэширования данных
def expensive_computation(x):
    # Проверяем, есть ли результат в кэше
    if r.exists(f'computation:{x[0]}'):
        #print('есть результат в кэше !')
        return r.get(f'computation:{x[0]}')
    else:
        #print('нет в кеше, делаем вычисления')
        result = float([sin(i**2) ** 2 for i in x][0])  # Пример сложных вычислений
        r.set(f'computation:{x[0]}', result) # Сохраняем результат в кэше
        return result

print(f"-кеширование, 1-ый запуск")
%timeit -n1 -r1 expensive_computation([5])
print(f"+кеширование, 2-ый запуск")
%timeit expensive_computation([5])

-кеширование, 1-ый запуск
1.92 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
+кеширование, 2-ый запуск
230 µs ± 10.9 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


## Apache Spark

Apache Spark — это распределенная вычислительная платформа с открытым исходным кодом, которая позволяет обрабатывать большие объемы данных с высокой скоростью. Spark поддерживает различные языки программирования, включая Python, Java и Scala, и предоставляет API для работы с данными в режиме реального времени и пакетной обработки.

### Основные компоненты Apache Spark:
- **Spark Core**: Основной компонент, отвечающий за распределенные вычисления и управление ресурсами.
- **Spark SQL**: Модуль для работы с данными в формате SQL, который позволяет выполнять запросы к данным, хранящимся в различных источниках.
- **Spark Streaming**: Модуль для обработки потоковых данных в реальном времени.
- **MLlib**: Библиотека машинного обучения, предоставляющая алгоритмы для анализа данных.
- **GraphX**: API для работы с графами и выполнения графовых вычислений.

In [None]:
! RESTART SESSION

In [None]:
!pip freeze > freeze.txt

In [None]:
%%capture
!pip install pyspark # Устанавливаем PySpark, интерфейс для работы со Spark на Python
!wget https://drive.google.com/uc?id=11V38AUYhTokBYaRf9Glr0isuqrSeuzqx -O /content/EURUSD_Candlestick_1_Hour_BID_01.07.2020-15.07.2023.csv

In [None]:
from pyspark.sql import SparkSession # Импортируем SparkSession из PySpark

# Создаем SparkSession
spark = SparkSession.builder \
    .appName("FinanceDataProcessing") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Указываем путь к нашему датасету
s3_path = "/content/EURUSD_Candlestick_1_Hour_BID_01.07.2020-15.07.2023.csv"
# Загружаем датасет в DataFrame
sdf = spark.read.csv(s3_path, header=True, inferSchema=True)

# Выводим схему загруженного DataFrame
sdf.printSchema()

root
 |-- Gmt time: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)



In [None]:
sdf.select("Open", "High", "Low", "Close", "Volume").describe().show()

+-------+-------------------+-------------------+-------------------+-------------------+--------------------+
|summary|               Open|               High|                Low|              Close|              Volume|
+-------+-------------------+-------------------+-------------------+-------------------+--------------------+
|  count|              17768|              17768|              17768|              17768|               17768|
|   mean| 1.1246986346240468| 1.1254486104232424| 1.1239660214993197| 1.1247025866726719|  11686.864068479525|
| stddev|0.06896329376638866|0.06882445176700097|0.06908021172626208|0.06895785696887373|   16224.18962253226|
|    min|             0.9539|            0.95592|            0.95357|             0.9539|2.293899999999999...|
|    max|            1.23398|            1.23494|            1.23334|            1.23401|         688879.8125|
+-------+-------------------+-------------------+-------------------+-------------------+--------------------+



In [None]:
# создадим гибрид pandas-on-Spark используя Spark DataFrame.
psdf = sdf.pandas_api()
psdf.describe()

In [None]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
import os
import warnings
warnings.filterwarnings("ignore")
os.environ["PYARROW_IGNORE_TIMEZONE"]="false"
pdf = pd.read_csv(s3_path)
pdf.drop(["Gmt time","Volume"],axis=1)

prev = spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")
ps.set_option("compute.default_index_type", "distributed")
print("формат Arrow оптимизирует хранение данных в памяти, поддерживает быструю передачу данных по сети.")
print("выключить Arrow")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)
%timeit ps.range(300000).to_pandas()
print("включить Arrow")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
%timeit ps.range(300000).to_pandas()
ps.reset_option("compute.default_index_type")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", prev)



формат Arrow оптимизирует хранение данных в памяти, поддерживает быструю передачу данных по сети.
выключить Arrow
2.39 s ± 584 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
включить Arrow
324 ms ± 36.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
spark = SparkSession.builder.master("local[*]").appName("BigDataFinance").getOrCreate() # Создаем локальную сессию
spark # Выводим информацию о созданной сессии

In [None]:
print(psdf.head())
def add(data):return data[2] / data[3]  #вычисляем индикатор High/Low
# psdf["индикатор"] = psdf.apply(add,axis=1) не сработает
new_column = psdf.apply(add,axis=1)
print(new_column.head())

                  Gmt time     Open     High      Low    Close     Volume
0  01.07.2020 00:00:00.000  1.12336  1.12336  1.12275  1.12306  4148.0298
1  01.07.2020 01:00:00.000  1.12306  1.12395  1.12288  1.12385  5375.5801
2  01.07.2020 02:00:00.000  1.12386  1.12406  1.12363  1.12382  4131.6099
3  01.07.2020 03:00:00.000  1.12382  1.12388  1.12221  1.12265  4440.6001
4  01.07.2020 04:00:00.000  1.12265  1.12272  1.12151  1.12179  4833.1001
0    1.000543
1    1.000953
2    1.000383
3    1.001488
4    1.001079
dtype: float64


Подитожим преимущества использования датафрейма PySpark по сравнению с Pandas:

<table>
<thead>
<tr>
<th><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Spark DataFrame</font></font></th>
<th><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Pandas DataFrame</font></font></th>
</tr>
</thead>
<tbody>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Spark DataFrame поддерживает распараллеливание.&nbsp;</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Pandas DataFrame не поддерживает распараллеливание.&nbsp;</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Spark DataFrame имеет несколько узлов.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Pandas DataFrame имеет один узел.</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;" class="">Он следует принципу ленивого выполнения - это означает,<br> что задача не выполняется до тех пор, пока не будет выполнено действие.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Он следует принципу Eager Execution, что означает, что задача выполняется немедленно.</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Spark DataFrame является неизменяемым.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Pandas DataFrame является изменяемым.</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Сложные операции сложнее выполнять по сравнению с Pandas DataFrame.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Сложные операции выполнять проще по сравнению со Spark DataFrame.</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Spark DataFrame является распределенным, <br>поэтому обработка в Spark DataFrame больших объемов данных происходит быстрее.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Pandas DataFrame не является распределенным, <br>поэтому обработка в Pandas DataFrame больших объемов данных будет выполняться медленнее.</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">sparkDataFrame.count() возвращает количество строк.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">pandasDataFrame.count() возвращает количество наблюдений, отличных от NA/null, для каждого столбца.</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Spark DataFrames отлично подходят для создания масштабируемых приложений.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Pandas DataFrames нельзя использовать для создания масштабируемого приложения.</font></font></td>
</tr>
<tr>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Spark DataFrame обеспечивает отказоустойчивость.</font></font></td>
<td><font style="vertical-align: inherit;"><font style="vertical-align: inherit;">Pandas DataFrame не гарантирует отказоустойчивость. <br>Для ее обеспечения нам необходимо реализовать собственную структуру.</font></font></td>
</tr>
</tbody>
</table>

В этом разделе мы рассмотрели, как загружать и предварительно обрабатывать большие финансовые датасеты с использованием PySpark, а также как использовать Trino для выполнения распределенных SQL-запросов. Эти инструменты позволяют эффективно работать с большими объемами данных и извлекать из них полезную информацию.

In [None]:
! RESTART SESSION

https://www.dask.org/

In [None]:
!pip install dask
!pip install dask-ml



In [None]:
import dask.dataframe as dd
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LogisticRegression
from dask.distributed import Client
from sklearn.datasets import load_iris
import pandas as pd
import numpy as np

In [None]:
# Создаем клиент Dask для распределенных вычислений
client = Client()

INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:45795
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:40447'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:35121'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:46613 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:46613
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:58456
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:33677 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:33677
INFO:distributed.core:Starting established connection to tcp://127

In [None]:
# Загружаем данные Ирисов Фишера
iris = load_iris()
X = iris.data
y = iris.target

In [None]:
# Преобразуем данные в Pandas DataFrame
df = pd.DataFrame(X, columns=iris.feature_names)
df['target'] = y

In [None]:
# Преобразуем Pandas DataFrame в Dask DataFrame
ddf = dd.from_pandas(df, npartitions=2)

In [None]:
ddf

Unnamed: 0_level_0,sepal length (cm),sepal width (cm),petal length (cm),petal width (cm),target
npartitions=2,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
0,float64,float64,float64,float64,int64
75,...,...,...,...,...
149,...,...,...,...,...


In [None]:
# Разделяем данные на обучающую и тестовую выборки
X_train, X_test, y_train, y_test = train_test_split(ddf[iris.feature_names], ddf['target'], test_size=0.3, random_state=42, shuffle=True)

In [None]:
X_train = X_train.to_dask_array(lengths=True)
y_train = y_train.to_dask_array(lengths=True)
X_test = X_test.to_dask_array(lengths=True)
y_test = y_test.to_dask_array(lengths=True)

In [None]:
# Создаем модель логистической регрессии
model = LogisticRegression()

In [None]:
# Обучаем модель
model.fit(X_train, y_train)

In [None]:
model.decision_function(X_train)

Unnamed: 0,Array,Chunk
Bytes,848 B,456 B
Shape,"(106,)","(57,)"
Dask graph,2 chunks in 6 graph layers,2 chunks in 6 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 848 B 456 B Shape (106,) (57,) Dask graph 2 chunks in 6 graph layers Data type float64 numpy.ndarray",106  1,

Unnamed: 0,Array,Chunk
Bytes,848 B,456 B
Shape,"(106,)","(57,)"
Dask graph,2 chunks in 6 graph layers,2 chunks in 6 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [None]:
model.predict(X_test)

Unnamed: 0,Array,Chunk
Bytes,44 B,26 B
Shape,"(44,)","(26,)"
Dask graph,2 chunks in 17 graph layers,2 chunks in 17 graph layers
Data type,bool numpy.ndarray,bool numpy.ndarray
"Array Chunk Bytes 44 B 26 B Shape (44,) (26,) Dask graph 2 chunks in 17 graph layers Data type bool numpy.ndarray",44  1,

Unnamed: 0,Array,Chunk
Bytes,44 B,26 B
Shape,"(44,)","(26,)"
Dask graph,2 chunks in 17 graph layers,2 chunks in 17 graph layers
Data type,bool numpy.ndarray,bool numpy.ndarray


In [None]:
model.predict_proba(X_test)

Unnamed: 0,Array,Chunk
Bytes,704 B,208 B
Shape,"(44, 2)","(26, 1)"
Dask graph,4 chunks in 15 graph layers,4 chunks in 15 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 704 B 208 B Shape (44, 2) (26, 1) Dask graph 4 chunks in 15 graph layers Data type float64 numpy.ndarray",2  44,

Unnamed: 0,Array,Chunk
Bytes,704 B,208 B
Shape,"(44, 2)","(26, 1)"
Dask graph,4 chunks in 15 graph layers,4 chunks in 15 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [None]:
np.array(model.predict_proba(X_test))

array([[1.00000000e+00, 2.87838990e-50],
       [1.00000000e+00, 2.29781215e-41],
       [1.00000000e+00, 4.02563625e-42],
       [1.00000000e+00, 5.49542915e-47],
       [0.00000000e+00, 1.00000000e+00],
       [1.00000000e+00, 2.55778761e-44],
       [0.00000000e+00, 1.00000000e+00],
       [0.00000000e+00, 1.00000000e+00],
       [4.44089210e-16, 1.00000000e+00],
       [0.00000000e+00, 1.00000000e+00],
       [1.00000000e+00, 2.83081442e-42],
       [0.00000000e+00, 1.00000000e+00],
       [0.00000000e+00, 1.00000000e+00],
       [0.00000000e+00, 1.00000000e+00],
       [1.00000000e+00, 4.43537147e-48],
       [0.00000000e+00, 1.00000000e+00],
       [1.00000000e+00, 2.85876044e-44],
       [1.00000000e+00, 2.09759718e-40],
       [1.00000000e+00, 1.04621010e-36],
       [1.00000000e+00, 2.32829149e-36],
       [1.00000000e+00, 3.75555703e-41],
       [0.00000000e+00, 1.00000000e+00],
       [1.00000000e+00, 6.84089286e-34],
       [1.00000000e+00, 5.53392516e-42],
       [7.771561

In [None]:
model.score(X_test, y_test)

Unnamed: 0,Array,Chunk
Bytes,8 B,8 B
Shape,(),()
Dask graph,1 chunks in 21 graph layers,1 chunks in 21 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
Array Chunk Bytes 8 B 8 B Shape () () Dask graph 1 chunks in 21 graph layers Data type float64 numpy.ndarray,,

Unnamed: 0,Array,Chunk
Bytes,8 B,8 B
Shape,(),()
Dask graph,1 chunks in 21 graph layers,1 chunks in 21 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [None]:
np.array(model.score(X_test, y_test))

array(0.65909091)

In [None]:
# Закрываем клиент Dask
client.close()

INFO:distributed.scheduler:Remove client Client-7aef6109-4add-11f0-97bc-0242ac1c000c
INFO:distributed.core:Received 'close-stream' from tcp://127.0.0.1:58466; closing.
INFO:distributed.scheduler:Remove client Client-7aef6109-4add-11f0-97bc-0242ac1c000c
INFO:distributed.scheduler:Close client connection: Client-7aef6109-4add-11f0-97bc-0242ac1c000c
INFO:distributed.scheduler:Retire worker addresses (stimulus_id='retire-workers-1750097710.9056675') (0, 1)
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:40447'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.nanny:Closing Nanny at 'tcp://127.0.0.1:35121'. Reason: nanny-close
INFO:distributed.nanny:Nanny asking worker to close. Reason: nanny-close
INFO:distributed.core:Received 'close-stream' from tcp://127.0.0.1:58456; closing.
INFO:distributed.scheduler:Remove worker addr: tcp://127.0.0.1:46613 name: 0 (stimulus_id='handle-worker-cleanup-1750097710.9176917')
INFO:distr

In [None]:
from dask.distributed import Client
client = Client(n_workers=2, threads_per_worker=1)
client

INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:43411
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:33993'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:45377'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:42045 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:42045
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:39534
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:45307 name: 1
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:45307
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:39550
INFO:distributed.scheduler:Receive client connection: Client-e306514d-4add-11f0-97bc-0242ac1c000c
INFO:distributed.core:Starting establish

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 2
Total threads: 2,Total memory: 12.67 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:43411,Workers: 0
Dashboard: http://127.0.0.1:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:42045,Total threads: 1
Dashboard: http://127.0.0.1:36411/status,Memory: 6.34 GiB
Nanny: tcp://127.0.0.1:33993,
Local directory: /tmp/dask-scratch-space/worker-raz8b9n5,Local directory: /tmp/dask-scratch-space/worker-raz8b9n5

0,1
Comm: tcp://127.0.0.1:45307,Total threads: 1
Dashboard: http://127.0.0.1:36015/status,Memory: 6.34 GiB
Nanny: tcp://127.0.0.1:45377,
Local directory: /tmp/dask-scratch-space/worker-8pb877e9,Local directory: /tmp/dask-scratch-space/worker-8pb877e9


In [None]:
!pip3 install --upgrade yfinance > None

In [None]:
import pandas as pd
import numpy as np
import yfinance as yf

# Download Market Data from Yahoo! Finance’s API

In [None]:
# Получение данных:

# Указываем тикер ПАО "Газпром" на Московской бирже
ticker = "GAZP.ME"

# Создаем объект тикера
gazprom = yf.Ticker(ticker)

# Получение исторических данных по акциям ПАО "Газпром"
# period="max" означает получение всех доступных данных
historical_data = gazprom.history(period="max")

In [None]:
historical_data.index.max()

Timestamp('2022-10-10 00:00:00+0300', tz='Europe/Moscow')

In [None]:
historical_data

Unnamed: 0_level_0,Open,High,Low,Close,Volume,Dividends,Stock Splits
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2010-03-03 00:00:00+03:00,91.197370,92.418093,90.268327,91.877953,15012689,0.00,0.0
2010-03-04 00:00:00+03:00,91.289177,94.222146,90.943488,93.050041,22078020,0.00,0.0
2010-03-05 00:00:00+03:00,94.011493,95.907384,93.854847,95.496880,27804988,0.00,0.0
2010-03-09 00:00:00+03:00,95.394241,96.193646,94.168119,94.778481,20023702,0.00,0.0
2010-03-10 00:00:00+03:00,94.416604,95.988416,93.255297,93.860260,19300420,0.00,0.0
...,...,...,...,...,...,...,...
2022-05-19 00:00:00+03:00,268.000000,269.779999,262.649994,266.679993,24980910,0.00,0.0
2022-05-20 00:00:00+03:00,266.679993,266.679993,266.679993,266.679993,0,0.00,0.0
2022-05-23 00:00:00+03:00,266.679993,266.679993,266.679993,266.679993,0,0.00,0.0
2022-05-24 00:00:00+03:00,266.679993,266.679993,266.679993,266.679993,0,0.00,0.0


In [None]:
historical_data = historical_data.reset_index()
historical_data.to_csv('gazprom_historical_data.csv')

In [None]:
historical_data.to_parquet('df.parquet.gzip', compression='gzip')
pd.read_parquet('df.parquet.gzip')

Unnamed: 0,Date,Open,High,Low,Close,Volume,Dividends,Stock Splits
0,2010-03-03 00:00:00+03:00,91.197370,92.418093,90.268327,91.877953,15012689,0.00,0.0
1,2010-03-04 00:00:00+03:00,91.289177,94.222146,90.943488,93.050041,22078020,0.00,0.0
2,2010-03-05 00:00:00+03:00,94.011493,95.907384,93.854847,95.496880,27804988,0.00,0.0
3,2010-03-09 00:00:00+03:00,95.394241,96.193646,94.168119,94.778481,20023702,0.00,0.0
4,2010-03-10 00:00:00+03:00,94.416604,95.988416,93.255297,93.860260,19300420,0.00,0.0
...,...,...,...,...,...,...,...,...
3039,2022-05-19 00:00:00+03:00,268.000000,269.779999,262.649994,266.679993,24980910,0.00,0.0
3040,2022-05-20 00:00:00+03:00,266.679993,266.679993,266.679993,266.679993,0,0.00,0.0
3041,2022-05-23 00:00:00+03:00,266.679993,266.679993,266.679993,266.679993,0,0.00,0.0
3042,2022-05-24 00:00:00+03:00,266.679993,266.679993,266.679993,266.679993,0,0.00,0.0


In [None]:
historical_data.to_parquet('df.parquet')
pd.read_parquet('df.parquet')

Unnamed: 0,Date,Open,High,Low,Close,Volume,Dividends,Stock Splits
0,2010-03-03 00:00:00+03:00,91.197370,92.418093,90.268327,91.877953,15012689,0.00,0.0
1,2010-03-04 00:00:00+03:00,91.289177,94.222146,90.943488,93.050041,22078020,0.00,0.0
2,2010-03-05 00:00:00+03:00,94.011493,95.907384,93.854847,95.496880,27804988,0.00,0.0
3,2010-03-09 00:00:00+03:00,95.394241,96.193646,94.168119,94.778481,20023702,0.00,0.0
4,2010-03-10 00:00:00+03:00,94.416604,95.988416,93.255297,93.860260,19300420,0.00,0.0
...,...,...,...,...,...,...,...,...
3039,2022-05-19 00:00:00+03:00,268.000000,269.779999,262.649994,266.679993,24980910,0.00,0.0
3040,2022-05-20 00:00:00+03:00,266.679993,266.679993,266.679993,266.679993,0,0.00,0.0
3041,2022-05-23 00:00:00+03:00,266.679993,266.679993,266.679993,266.679993,0,0.00,0.0
3042,2022-05-24 00:00:00+03:00,266.679993,266.679993,266.679993,266.679993,0,0.00,0.0


# sqlite3

In [None]:
import sqlite3


# Создание базы данных:

# Создание базы данных и подключение
conn = sqlite3.connect('gazprom_data.db')

# Проверка подключения
print("База данных gazprom_data.db создана и подключена.")

# Закрытие соединения
conn.close()


База данных gazprom_data.db создана и подключена.


In [None]:
# Создание таблиц в базе данных:

# Подключение к базе данных
conn = sqlite3.connect('gazprom_data.db')
cursor = conn.cursor()

# Получение списка всех таблиц
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()

# Удаление каждой таблицы, кроме системной таблицы sqlite_sequence
for table_name in tables:
    if table_name[0] != 'sqlite_sequence':  # Пропускаем системную таблицу
        print(f"Удаляем таблицу {table_name[0]}")
        cursor.execute(f"DROP TABLE IF EXISTS {table_name[0]}")

# Создание таблицы historical_data
cursor.execute('''
CREATE TABLE IF NOT EXISTS historical_data (
    date TEXT PRIMARY KEY,
    open REAL,
    high REAL,
    low REAL,
    close REAL,
    volume REAL,
    dividends REAL,
    stock_splits REAL
)
''')

# Закрытие соединения
conn.commit()
conn.close()

print("Таблицы созданы успешно.")



Таблицы созданы успешно.


In [None]:
conn = sqlite3.connect('gazprom_data.db')
cursor = conn.cursor()

# Получение исторических данных из базы данных
historical_data = pd.read_sql_query("SELECT * FROM historical_data", conn)
conn.close()
historical_data

Unnamed: 0,date,open,high,low,close,volume,dividends,stock_splits


In [None]:
# Подключение к базе данных
conn = sqlite3.connect('gazprom_data.db')
cursor = conn.cursor()

# Функция для загрузки данных и проверки на дублирование
def insert_or_ignore(cursor, query, data):
    cursor.execute(query, data)

# Загрузка данных в таблицу historical_data
historical_data = pd.read_csv('gazprom_historical_data.csv')

# Явно указываем utc=True для корректного преобразования даты
historical_data['Date'] = pd.to_datetime(historical_data['Date'], utc=True).dt.date

for i, row in historical_data.iterrows():
    insert_or_ignore(cursor, '''
    INSERT OR IGNORE INTO historical_data (date, open, high, low, close, volume, dividends, stock_splits)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    ''', (row['Date'], row['Open'], row['High'], row['Low'], row['Close'], row['Volume'], row['Dividends'], row['Stock Splits']))

# Закрытие соединения
conn.commit()
conn.close()

print("Данные успешно загружены в таблицы.")


Данные успешно загружены в таблицы.


In [None]:
conn = sqlite3.connect('gazprom_data.db')
cursor = conn.cursor()

# Получение исторических данных из базы данных
historical_data = pd.read_sql_query("SELECT * FROM historical_data", conn)
conn.close()
historical_data

Unnamed: 0,date,open,high,low,close,volume,dividends,stock_splits
0,2010-03-02,91.197370,92.418093,90.268327,91.877953,15012689.0,0.00,0.0
1,2010-03-03,91.289177,94.222146,90.943488,93.050041,22078020.0,0.00,0.0
2,2010-03-04,94.011493,95.907384,93.854847,95.496880,27804988.0,0.00,0.0
3,2010-03-08,95.394241,96.193646,94.168119,94.778481,20023702.0,0.00,0.0
4,2010-03-09,94.416604,95.988416,93.255297,93.860260,19300420.0,0.00,0.0
...,...,...,...,...,...,...,...,...
3039,2022-05-18,268.000000,269.779999,262.649994,266.679993,24980910.0,0.00,0.0
3040,2022-05-19,266.679993,266.679993,266.679993,266.679993,0.0,0.00,0.0
3041,2022-05-22,266.679993,266.679993,266.679993,266.679993,0.0,0.00,0.0
3042,2022-05-23,266.679993,266.679993,266.679993,266.679993,0.0,0.00,0.0


In [None]:
# Пример кода для автоматического обновления данных в базе данных:

# Подключение к базе данных
conn = sqlite3.connect('gazprom_data.db')
cursor = conn.cursor()

# Функция для обновления данных в таблице historical_data
def update_historical_data():
    try:
        # Загрузка новых данных через yfinance (например, за последние 5 дней)
        #new_data = yf.download('GAZP.ME', period='1d')

        ticker = "GAZP.ME"
        gazprom = yf.Ticker(ticker)
        new_data = gazprom.history(period="max")

        if new_data.empty:
            print("Данные для GAZP.ME не найдены. Возможно, тикер был исключен с биржи.")
            return

        # Преобразование индекса в колонку Date
        new_data.reset_index(inplace=True)
        new_data['Date'] = pd.to_datetime(new_data['Date'], utc=True).dt.date

        # Обработка каждого ряда данных
        for i, row in new_data.iterrows():
            cursor.execute('SELECT 1 FROM historical_data WHERE date = ?', (row['Date'],))
            if cursor.fetchone() is None:
                cursor.execute('''
                INSERT INTO historical_data (date, open, high, low, close, volume, dividends, stock_splits)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ''', (row['Date'], row['Open'], row['High'], row['Low'], row['Close'], row['Volume'], row.get('Dividends', 0), row.get('Stock Splits', 0)))
                print(f"Данные за {row['Date']} добавлены.")
            else:
                print(f"Данные за {row['Date']} уже существуют.")
    except Exception as e:
        print(f"Ошибка при обновлении исторических данных: {e}")

# Запуск обновления данных
update_historical_data()

# Закрытие соединения с базой данных после обновления
conn.commit()
conn.close()

Данные за 2010-03-02 уже существуют.
Данные за 2010-03-03 уже существуют.
Данные за 2010-03-04 уже существуют.
Данные за 2010-03-08 уже существуют.
Данные за 2010-03-09 уже существуют.
Данные за 2010-03-10 уже существуют.
Данные за 2010-03-11 уже существуют.
Данные за 2010-03-14 уже существуют.
Данные за 2010-03-15 уже существуют.
Данные за 2010-03-16 уже существуют.
Данные за 2010-03-17 уже существуют.
Данные за 2010-03-18 уже существуют.
Данные за 2010-03-21 уже существуют.
Данные за 2010-03-22 уже существуют.
Данные за 2010-03-23 уже существуют.
Данные за 2010-03-24 уже существуют.
Данные за 2010-03-25 уже существуют.
Данные за 2010-03-28 уже существуют.
Данные за 2010-03-29 уже существуют.
Данные за 2010-03-30 уже существуют.
Данные за 2010-03-31 уже существуют.
Данные за 2010-04-01 уже существуют.
Данные за 2010-04-04 уже существуют.
Данные за 2010-04-05 уже существуют.
Данные за 2010-04-06 уже существуют.
Данные за 2010-04-07 уже существуют.
Данные за 2010-04-08 уже существуют.
Д

In [None]:
https://colab.research.google.com/drive/1dgE8BfEQl7yB7G8xS2TkRaVAmc3joTvE?usp=sharing

In [None]:
https://docs-python.ru/packages/modul-pandas-analiz-dannykh-python/explode-series-dataframe/