In [None]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import pandas as pd

# Years of 1-minute EURUSD exports to load (one CSV file per year).
years = range(2014, 2022)


def _load_year(year):
    """Read one yearly export and keep only its four price columns.

    The files are read without a header row. The first two columns and the last
    column are dropped (presumably date, time and volume — TODO confirm against the
    raw files), and the remaining four are named Open/High/Low/Close.
    """
    raw = pd.read_csv(f"/content/DAT_MT_EURUSD_M1_{year}.csv", header=None)
    prices = raw.iloc[:, 2:-1]
    prices.columns = ['Open', 'High', 'Low', 'Close']
    return prices


# Load every year, then stack them into one frame with a fresh 0..N-1 index.
dfs = [_load_year(year) for year in years]
data = pd.concat(dfs, axis=0, ignore_index=True)

In [None]:
# Persist the combined price frame for the Spark cell. index=False: writing the pandas
# RangeIndex would add an unnamed first column, which Spark's CSV reader then exposes
# as an extra `_c0` column in the schema.
data.to_csv('/content/data.csv', index=False)

In [None]:
# Preview the combined frame. .head() keeps the output bounded even if a later cell
# (which sets display.max_rows=None) has already run in this kernel.
data.head()

Unnamed: 0,Open,High,Low,Close
0,1.37615,1.37615,1.37586,1.37586
1,1.37592,1.37592,1.37569,1.37569
2,1.37559,1.37571,1.37536,1.37536
3,1.37543,1.37554,1.37543,1.37554
4,1.37550,1.37550,1.37550,1.37550
...,...,...,...,...
2969970,1.13797,1.13797,1.13774,1.13774
2969971,1.13775,1.13781,1.13748,1.13748
2969972,1.13749,1.13765,1.13704,1.13707
2969973,1.13709,1.13740,1.13679,1.13740


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import DenseVector
from pyspark.sql.functions import monotonically_increasing_id
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Create (or reuse) the SparkSession.
spark = SparkSession.builder \
    .appName("Forex Clustering with PySpark") \
    .getOrCreate()

# Candle-shape features: pairwise differences of each bar's OHLC prices. One list is
# shared by the assembler, the per-cluster means and the plot tick labels.
feature_cols = ['Close-Open', 'High-Low', 'High-Close', 'High-Open', 'Low-Close', 'Low-Open']

data = spark.read.csv('/content/data.csv', header=True, inferSchema=True)
data = data.withColumn('Close-Open', data['Close'] - data['Open']) \
    .withColumn('High-Low', data['High'] - data['Low']) \
    .withColumn('High-Close', data['High'] - data['Close']) \
    .withColumn('High-Open', data['High'] - data['Open']) \
    .withColumn('Low-Close', data['Low'] - data['Close']) \
    .withColumn('Low-Open', data['Low'] - data['Open']) \
    .withColumn('index', monotonically_increasing_id())

# Feature pipeline: pack the features into one vector, then scale it
# (StandardScaler with its default settings).
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
pipeline = Pipeline(stages=[assembler, scaler])

model = pipeline.fit(data)
scaled_data = model.transform(data)

# KMeans clustering of the scaled candle shapes.
n_clusters = 4000
kmeans = KMeans(featuresCol='scaled_features', k=n_clusters)
kmeans_model = kmeans.fit(scaled_data)
predictions = kmeans_model.transform(scaled_data)

# `predictions` already carries every column of `data` next to `prediction`, so no
# join is needed. Joining `data` back to it duplicated the `index` column, and a Spark
# join makes no row-order promise, so the collected labels did not follow the candle
# sequence. monotonically_increasing_id encodes (partition id, row number within the
# partition), so sorting by `index` restores the order rows were read in. Cached
# because several Spark actions below reuse this frame.
data_with_clusters = predictions \
    .select(*data.columns, 'prediction') \
    .withColumnRenamed('prediction', 'Cluster') \
    .orderBy('index') \
    .cache()

# Per-row cluster labels, collected to the driver in `index` order.
cluster_labels = data_with_clusters.select('Cluster').toPandas()

# Save the labels on a single line separated by single spaces.
# NOTE(review): hardcoded Windows path while the inputs live under /content (Linux);
# on Linux this creates a file literally named 'C:\Users\...' in the CWD — confirm target.
with open(r'C:\Users\ext17\Downloads\clusterkeras.txt', "w") as f:
    f.write(" ".join(map(str, cluster_labels['Cluster'])))

# Report how many labels were written (once, not once per plot).
print(f"Количество сохраненных меток: {len(cluster_labels)}")

# Cluster sizes. option_context keeps display.max_rows=None local to this print instead
# of leaking into later cells, where it makes a bare DataFrame display render every row.
value_counts = cluster_labels['Cluster'].value_counts()
with pd.option_context('display.max_rows', None):
    print(value_counts)
value_counts.to_csv(r'C:\Users\ext17\Downloads\cluster_countkeras.txt', index=True, header=['Count'])

# Mean of each candle feature per cluster. Collected once and reused for both the CSV
# and the plots (each toPandas() is a separate Spark job).
mean_values = data_with_clusters.groupBy('Cluster').mean(*feature_cols).orderBy('Cluster')
mean_values_pd = mean_values.toPandas()
mean_values_pd.to_csv(r'C:\Users\ext17\Downloads\meangigkeras.csv', index=False)

# Bar charts of the mean feature values for the first few cluster ids.
n_plots = 20
bar_width = 0.15
bar_positions = np.arange(len(feature_cols))

for cluster_label in range(min(n_plots, n_clusters)):
    mean_data = mean_values_pd.loc[mean_values_pd['Cluster'] == cluster_label]
    if mean_data.empty:
        # No row carries this id; .iloc[0] below would raise IndexError.
        continue

    fig, ax = plt.subplots()
    # Column 0 is 'Cluster'; the remaining columns are the feature means.
    ax.bar(bar_positions, mean_data.iloc[0, 1:], bar_width, label=f'Cluster {cluster_label}')

    ax.set_title(f'Средние значения атрибутов свечей для кластера {cluster_label}')
    ax.set_xticks(bar_positions)
    ax.set_xticklabels(feature_cols)
    ax.legend()

    plt.show()
    plt.close(fig)



In [None]:
# Write the cluster labels again, to a second file, as one space-separated line.
# NOTE(review): hardcoded Windows path — on a Linux kernel this is a plain file name
# containing backslashes, created in the current directory.
# The loop variables `idx` and `label` remain defined after this cell.
with open(r'C:\Users\ext17\Downloads\clusterkeras1.txt', "w") as out_file:
    for idx, label in enumerate(cluster_labels['Cluster']):
        separator = " " if idx else ""
        out_file.write(separator + str(label))

In [None]:
# Recompute the per-cluster mean of every candle feature.
mean_values = (
    data_with_clusters
    .groupBy('Cluster')
    .mean('Close-Open', 'High-Low', 'High-Close', 'High-Open', 'Low-Close', 'Low-Open')
)

# Collect to pandas and save to a second CSV (no index column).
(
    mean_values
    .toPandas()
    .to_csv(r'C:\Users\ext17\Downloads\meangigkeras1.csv', index=False)
)

In [None]:
# Cluster sizes: print every cluster's count and save them to a second file.
# Computed once; option_context keeps display.max_rows=None scoped to this print
# instead of changing it for every later cell in the kernel.
value_counts = cluster_labels['Cluster'].value_counts()
with pd.option_context('display.max_rows', None):
    print(value_counts)
value_counts.to_csv(r'C:\Users\ext17\Downloads\cluster_countkeras1.txt', index=True, header=['Count'])


In [None]:
# Sum the 'Count' column of rrr.txt. NOTE(review): presumably a copy of one of the
# cluster_count*.txt files written above (they use the same 'Count' header) — confirm.
df = pd.read_csv('/content/rrr.txt')
total = df.loc[:, 'Count'].sum()

print("The total count is {}.".format(total))

The total count is 2969975.


In [None]:
# Position of the last written label, computed explicitly rather than reading the
# `idx` loop variable leaked from another cell's label-writing loop (hidden state).
len(cluster_labels) - 1

2969974

In [None]:
# Value of the last written label, read from cluster_labels directly instead of the
# `label` loop variable leaked from another cell's label-writing loop.
int(cluster_labels['Cluster'].iloc[-1])

635

In [None]:
# Type of the label column pulled out of cluster_labels.
type(cluster_labels.loc[:, 'Cluster'])

pandas.core.series.Series

In [None]:
# Row count of cluster_labels. DataFrame.size is the number of cells (rows x columns),
# so it only equals the row count by accident of there being one column; len() is the
# row count. cluster_labels is a DataFrame (from toPandas()), not a Series.
num_rows = len(cluster_labels)
print(f"The cluster_labels DataFrame contains {num_rows} rows.")

The Series contains 2969975 rows.


In [None]:
# Inspect the type of data_with_clusters (a Spark DataFrame, not a pandas one).
type(data_with_clusters )

pyspark.sql.dataframe.DataFrame

In [None]:
num_rows = data.count()  # Spark action: one full pass over `data`
print("The DataFrame contains {} rows.".format(num_rows))

The DataFrame contains 2969975 rows.


In [None]:
# Re-print the most recently computed row count.
print("The DataFrame contains %d rows." % num_rows)

In [None]:
num_rows = data_with_clusters.count()  # Spark action over the labelled frame
print("The DataFrame contains {} rows.".format(num_rows))

The DataFrame contains 2969975 rows.


In [None]:
# Render the Spark DataFrame; by default its repr just lists the schema (no Spark job).
display(data)

DataFrame[_c0: int, Open: double, High: double, Low: double, Close: double, Close-Open: double, High-Low: double, High-Close: double, High-Open: double, Low-Close: double, Low-Open: double, index: bigint]

In [None]:
# Preview only: with display.max_rows=None set by an earlier cell, a bare
# `cluster_labels` tries to render every row (this display was interrupted).
cluster_labels.head()

KeyboardInterrupt: ignored

In [None]:
pip install  mplfinance 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import mplfinance as mpf

In [None]:

# Candlestick samples from the most populated clusters, each candle tagged with its label.

# Green up-candles, red down-candles.
mc = mpf.make_marketcolors(up='g', down='r')
s = mpf.make_mpf_style(marketcolors=mc)

# Number of charts and candles per chart.
n_plots = 30
quotes_per_plot = 20

# Cluster ids ordered from the most to the least populated.
sorted_clusters = cluster_labels['Cluster'].value_counts().index.tolist()
top_clusters = sorted_clusters[:n_plots]

# Collect only the OHLC columns of rows in the plotted clusters rather than the whole
# multi-million-row frame (the full toPandas() failed with Py4JJavaError — presumably
# driver memory; TODO confirm). Columns are selected by name so that a duplicated
# `index` column in data_with_clusters cannot make the selection ambiguous.
top_clusters_pd = (
    data_with_clusters
    .filter(data_with_clusters['Cluster'].isin(top_clusters))
    .select('Open', 'High', 'Low', 'Close', 'Cluster')
    .toPandas()
)

for cluster_label in top_clusters:
    plot_data = top_clusters_pd[top_clusters_pd['Cluster'] == cluster_label].iloc[:quotes_per_plot]

    # Only plot clusters that can fill a whole chart.
    if len(plot_data) < quotes_per_plot:
        continue

    # mplfinance requires a DatetimeIndex, but the loader kept no timestamp columns.
    # Use a placeholder 1-minute index: the x-axis times are NOT the real quote times.
    plot_data = plot_data.set_index(
        pd.date_range('2000-01-01', periods=len(plot_data), freq='min')
    )

    fig, axes = mpf.plot(plot_data, type='candle', title=f'Quotes with classification labels {cluster_label}', ylabel='Price', returnfig=True, style=s)

    # Label each candle just above its high. With mplfinance's default
    # show_nontrading=False, candle i is drawn at x = i.
    for position, row in enumerate(plot_data.itertuples(index=False)):
        offset = 0.01 * (row.High - row.Low)  # 1% of the candle's range
        label_position = row.High + offset
        axes[0].annotate(str(cluster_label), xy=(position, label_position), fontsize=8, color='black', backgroundcolor='yellow')

    plt.show()
    plt.close(fig)


Py4JJavaError: ignored

In [None]:
# Row count of scaled_data (the frame fed to KMeans), matching the variable name and
# the printed label; this cell previously counted data_with_clusters instead.
scaled_data_row_count = scaled_data.count()
print(f"Количество строк в scaled_data: {scaled_data_row_count}")

Количество строк в scaled_data: 111469


In [None]:
# Spark action: count the rows of `data` (one full pass); printed in the next cell.
row_count = data.count()

In [None]:
# Show the row count computed in the previous cell.
print("Количество строк в DataFrame: {}".format(row_count))

Количество строк в DataFrame: 111469


In [None]:
# `rows_after_filter` is not assigned anywhere in the visible notebook (presumably it
# leaked from a deleted cell), so a bare reference raises NameError after
# Restart & Run All. Show it only if the kernel still holds it.
# TODO: restore the cell that computed it, or remove this cell.
globals().get('rows_after_filter', 'rows_after_filter is not defined in this notebook')

111469

In [None]:
# Render the predictions Spark DataFrame (its default repr lists the schema).
display(predictions)

DataFrame[index: bigint, Open: double, High: double, Low: double, Close: double, Volume: bigint, Close_diff_1: double, Close_diff_2: double, Close_diff_3: double, Close_diff_4: double, Close_diff_5: double, Close_diff_6: double, Close_diff_7: double, Close_diff_8: double, Close_diff_9: double, features: vector, scaled_features: vector, prediction: int]

In [None]:
# Render `data` again (a Spark DataFrame at this point; default repr lists the schema).
display(data)

Unnamed: 0,Open,High,Low,Close,Volume,Close_diff_1,Close_diff_2,Close_diff_3,Close_diff_4,Close_diff_5,Close_diff_6,Close_diff_7,Close_diff_8,Close_diff_9
0,1.12106,1.12135,1.12106,1.12135,0,,,,,,,,,
10,1.12127,1.12127,1.12127,1.12127,0,0.00000,0.00000,0.00000,0.00000,0.00000,0.00000,-0.00002,-0.00005,0.00012
20,1.12139,1.12149,1.12137,1.12147,0,-0.00007,-0.00001,0.00004,0.00005,-0.00020,-0.00020,-0.00020,-0.00020,-0.00020
30,1.12146,1.12148,1.12144,1.12147,0,-0.00001,-0.00001,-0.00008,-0.00006,-0.00006,-0.00001,-0.00001,-0.00002,0.00000
40,1.12154,1.12155,1.12154,1.12155,0,-0.00001,-0.00003,-0.00002,-0.00004,-0.00005,-0.00005,-0.00008,-0.00010,-0.00007
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1114650,1.07038,1.07038,1.07032,1.07032,0,0.00003,0.00006,0.00001,0.00011,0.00007,0.00011,0.00009,0.00028,0.00020
1114660,1.07031,1.07032,1.07026,1.07026,0,0.00005,0.00004,0.00007,0.00009,0.00007,0.00006,0.00006,0.00009,0.00004
1114670,1.07044,1.07046,1.07034,1.07041,0,0.00000,-0.00005,-0.00005,-0.00009,-0.00006,-0.00023,-0.00036,-0.00021,-0.00023
1114680,1.07016,1.07016,1.07010,1.07011,0,0.00007,0.00007,0.00009,0.00009,0.00016,0.00019,0.00017,0.00020,0.00016
