In [1]:
# Блок 0: Установка зависимостей
# Устанавливаем все необходимые библиотеки
!pip install requests pandas pyspark pycryptodome

# Блок 1: Подключение Google Диска и настройка окружения
# Подключаем Google Диск и создаем папку для хранения данных
from google.colab import drive
import os

drive.mount('/content/drive')
output_dir = "/content/drive/MyDrive/уник/cripto"
os.makedirs(output_dir, exist_ok=True)

# Блок 2: Инициализация SQLite базы данных
# Создаем базу данных и таблицы для хранения криптовалют, дат и цен
import sqlite3

conn = sqlite3.connect(os.path.join(output_dir, "crypto.db"))
cursor = conn.cursor()

cursor.executescript('''
CREATE TABLE IF NOT EXISTS Cryptocurrency (
    crypto_id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    ticker TEXT NOT NULL UNIQUE,
    description TEXT
);
CREATE TABLE IF NOT EXISTS Date (
    date_id INTEGER PRIMARY KEY,
    year INTEGER NOT NULL,
    month INTEGER NOT NULL,
    day INTEGER NOT NULL,
    UNIQUE(year, month, day)
);
CREATE TABLE IF NOT EXISTS PriceData (
    price_id INTEGER PRIMARY KEY,
    crypto_id INTEGER,
    date_id INTEGER,
    avg_price REAL,
    volume REAL,
    market_cap REAL,
    FOREIGN KEY (crypto_id) REFERENCES Cryptocurrency(crypto_id),
    FOREIGN KEY (date_id) REFERENCES Date(date_id)
);
''')
conn.commit()

# Блок 3: Сбор данных с CryptoCompare API
# Запрашиваем исторические данные о ценах и объемах для BTC и ETH
import pandas as pd
import requests
from datetime import datetime, timedelta
import time

def get_crypto_data(symbol, start_date, end_date, api_key=None):
    base_url = "https://min-api.cryptocompare.com/data/v2/histoday"
    days = (end_date - start_date).days
    params = {
        'fsym': symbol,
        'tsym': 'USD',
        'limit': min(days, 2000),
        'toTs': int(end_date.timestamp())
    }
    if api_key:
        params['api_key'] = api_key

    response = requests.get(base_url, params=params)
    data = response.json()
    if data['Response'] != 'Success':
        raise ValueError(f"Ошибка API: {data.get('Message', 'Неизвестная ошибка')}")

    df = pd.DataFrame(data['Data']['Data'])
    df['date'] = pd.to_datetime(df['time'], unit='s')
    df['avg_price'] = (df['high'] + df['low']) / 2
    df['volume'] = df['volumeto']
    df = df[['date', 'avg_price', 'volume']]
    return df[df['date'].notnull() & (df['avg_price'] > 0) & (df['date'] >= start_date)]

coins = {'BTC': 'Bitcoin', 'ETH': 'Ethereum'}
start_date = datetime(2023, 1, 1)
end_date = datetime(2025, 1, 1)
all_data = []

for symbol, name in coins.items():
    df = get_crypto_data(symbol, start_date, end_date)
    df['ticker'] = symbol
    all_data.append(df)
    time.sleep(1)

full_df = pd.concat(all_data)
full_df['market_cap'] = 0.0  # CryptoCompare не предоставляет market_cap
full_df.to_csv(os.path.join(output_dir, "crypto_raw_data.csv"), index=False)

# Блок 4: Агрегация данных с Apache Spark
# Выполняем месячную агрегацию цен и объемов
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, avg, col

spark = SparkSession.builder.appName("CryptoAnalysis").getOrCreate()
df_spark = spark.read.csv(os.path.join(output_dir, "crypto_raw_data.csv"), header=True, inferSchema=True)

monthly_data = df_spark.groupBy(
    'ticker', year('date').alias('year'), month('date').alias('month')
).agg(
    avg('avg_price').alias('monthly_avg_price'),
    avg('volume').alias('monthly_avg_volume'),
    avg('market_cap').alias('monthly_avg_market_cap')
).filter(
    (col('year').between(2023, 2025)) & (col('month').between(1, 12))
).orderBy('ticker', 'year', 'month')

monthly_data.write.csv(os.path.join(output_dir, "crypto_monthly_data"), mode="overwrite")

# Блок 5: Сохранение агрегированных данных в SQLite
# Записываем данные в таблицы Cryptocurrency, Date и PriceData
monthly_df = monthly_data.toPandas().dropna(subset=['year', 'month', 'ticker'])
monthly_df['year'] = monthly_df['year'].astype(int)
monthly_df['month'] = monthly_df['month'].astype(int)

# Массовое добавление криптовалют
crypto_data = [(ticker, ticker) for ticker in monthly_df['ticker'].unique()]
cursor.executemany(
    "INSERT OR IGNORE INTO Cryptocurrency (ticker, name) VALUES (?, ?)",
    crypto_data
)

# Массовое добавление дат
dates = monthly_df[['year', 'month']].drop_duplicates()
date_data = [(row['year'], row['month'], 1) for _, row in dates.iterrows()
             if 1 <= row['month'] <= 12 and 2023 <= row['year'] <= 2025]
cursor.executemany(
    "INSERT OR IGNORE INTO Date (year, month, day) VALUES (?, ?, ?)",
    date_data
)
conn.commit()

# Получение словарей для crypto_id и date_id
cursor.execute("SELECT ticker, crypto_id FROM Cryptocurrency")
crypto_ids = {row[0]: row[1] for row in cursor.fetchall()}
cursor.execute("SELECT year, month, date_id FROM Date")
date_ids = {(row[0], row[1]): row[2] for row in cursor.fetchall()}

# Массовое добавление цен
price_data = []
for _, row in monthly_df.iterrows():
    year, month = int(row['year']), int(row['month'])
    if (year, month) in date_ids and row['ticker'] in crypto_ids:
        price_data.append((
            crypto_ids[row['ticker']],
            date_ids[(year, month)],
            row['monthly_avg_price'],
            row['monthly_avg_volume'],
            row['monthly_avg_market_cap']
        ))

cursor.executemany(
    "INSERT INTO PriceData (crypto_id, date_id, avg_price, volume, market_cap) VALUES (?, ?, ?, ?, ?)",
    price_data
)
conn.commit()

# Блок 6: Шифрование файлов
# Шифруем CSV и базу данных с использованием AES
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes

def encrypt_file(input_file, output_file, key):
    cipher = AES.new(key, AES.MODE_EAX)
    with open(input_file, 'rb') as f:
        data = f.read()
    ciphertext, tag = cipher.encrypt_and_digest(data)
    with open(output_file, 'wb') as f:
        [f.write(x) for x in (cipher.nonce, tag, ciphertext)]

key = get_random_bytes(16)
with open(os.path.join(output_dir, "key.txt"), "wb") as f:
    f.write(key)

encrypt_file(
    os.path.join(output_dir, "crypto_raw_data.csv"),
    os.path.join(output_dir, "crypto_raw_data_encrypted.bin"),
    key
)
encrypt_file(
    os.path.join(output_dir, "crypto.db"),
    os.path.join(output_dir, "crypto_encrypted.db"),
    key
)

# Блок 7: Вывод результатов
# Выполняем SQL-запрос для отображения агрегированных цен
cursor.execute('''
SELECT c.ticker, d.year, d.month, p.avg_price
FROM PriceData p
JOIN Cryptocurrency c ON p.crypto_id = c.crypto_id
JOIN Date d ON p.date_id = d.date_id
ORDER BY c.ticker, d.year, d.month
''')
results = cursor.fetchall()

for row in results:
    print(f"{row[0]} {row[1]}-{row[2]}: ${row[3]:.2f}")

conn.close()

# Блок 8: Очистка
# Останавливаем Spark и синхронизируем Google Диск
spark.stop()
drive.flush_and_unmount()

Collecting pycryptodome
  Downloading pycryptodome-3.22.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Downloading pycryptodome-3.22.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.3/2.3 MB[0m [31m17.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pycryptodome
Successfully installed pycryptodome-3.22.0
Mounted at /content/drive


In [2]:
# Подключение к Google Диску и базе данных
from google.colab import drive
import sqlite3
import os
import pandas as pd
import glob

# Монтирование Google Диска
drive.mount('/content/drive', force_remount=True)

# Путь к базе данных и создание папки
output_dir = "/content/drive/MyDrive/уник/cripto"
os.makedirs(output_dir, exist_ok=True)

# Проверка существования базы данных
db_path = os.path.join(output_dir, "crypto.db")
if not os.path.exists(db_path):
    print(f"Ошибка: Файл базы данных {db_path} не существует.")
    print("Выполните основной скрипт (crypto_data_processing.py) заново.")
else:
    print(f"База данных найдена: {db_path}")

# Диагностика файлов, созданных основным скриптом
print("\n=== Диагностика файлов ===")
print("Файлы в директории:", os.listdir(output_dir))

# Проверка crypto_raw_data.csv
raw_data_path = os.path.join(output_dir, "crypto_raw_data.csv")
if os.path.exists(raw_data_path):
    raw_df = pd.read_csv(raw_data_path)
    print("\nСодержимое crypto_raw_data.csv:")
    print(f"Количество записей: {len(raw_df)}")
    print(f"Диапазон дат: {raw_df['date'].min()} - {raw_df['date'].max()}")
    print(f"Тикеры: {raw_df['ticker'].unique()}")
else:
    print(f"Файл {raw_data_path} не найден.")

# Проверка crypto_monthly_data
monthly_data_path = os.path.join(output_dir, "crypto_monthly_data")
if os.path.exists(monthly_data_path):
    monthly_files = glob.glob(os.path.join(monthly_data_path, "part-*.csv"))
    if monthly_files:
        monthly_df = pd.concat([pd.read_csv(f) for f in monthly_files])
        print("\nСодержимое crypto_monthly_data:")
        print(f"Количество записей: {len(monthly_df)}")
    else:
        print("Нет CSV-файлов в crypto_monthly_data.")
else:
    print(f"Папка {monthly_data_path} не найдена.")

# Подключение к базе данных
try:
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Диагностика: Проверка структуры и содержимого базы
    print("\n=== Диагностика базы данных ===")

    # Список таблиц
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tables = cursor.fetchall()
    print("Таблицы в базе:", tables)

    # Количество записей в каждой таблице
    if tables:
        cursor.execute("SELECT count(*) FROM Cryptocurrency")
        crypto_count = cursor.fetchone()[0]
        print("Количество записей в Cryptocurrency:", crypto_count)

        cursor.execute("SELECT count(*) FROM Date")
        date_count = cursor.fetchone()[0]
        print("Количество записей в Date:", date_count)

        cursor.execute("SELECT count(*) FROM PriceData")
        price_count = cursor.fetchone()[0]
        print("Количество записей в PriceData:", price_count)

        # Пример содержимого PriceData (первые 5 записей)
        if price_count > 0:
            cursor.execute("""
            SELECT c.ticker, d.year, d.month, p.avg_price, p.volume
            FROM PriceData p
            JOIN Cryptocurrency c ON p.crypto_id = c.crypto_id
            JOIN Date d ON p.date_id = d.date_id
            LIMIT 5
            """)
            sample_data = cursor.fetchall()
            print("\nПример данных из PriceData (первые 5 записей):")
            for row in sample_data:
                print(f"Ticker: {row[0]}, {row[1]}-{row[2]}: Price=${row[3]:.2f}, Volume=${row[4]:.2f}")
        else:
            print("\nТаблица PriceData пуста.")
    else:
        print("В базе данных нет таблиц.")

    # SQL-запрос 1: Получение списка всех криптовалют
    print("\n=== SQL-запрос 1: Список всех криптовалют ===")
    try:
        cursor.execute("SELECT ticker, name, description FROM Cryptocurrency")
        results = cursor.fetchall()
        if results:
            for row in results:
                print(f"Ticker: {row[0]}, Name: {row[1]}, Description: {row[2]}")
        else:
            print("Нет данных в таблице Cryptocurrency")
    except sqlite3.OperationalError as e:
        print(f"Ошибка при выполнении запроса: {e}")

    # SQL-запрос 2: Средние месячные цены для Bitcoin в 2023 году
    print("\n=== SQL-запрос 2: Средние цены Bitcoin за 2023 год ===")
    try:
        cursor.execute("""
        SELECT d.year, d.month, p.avg_price
        FROM PriceData p
        JOIN Cryptocurrency c ON p.crypto_id = c.crypto_id
        JOIN Date d ON p.date_id = d.date_id
        WHERE c.ticker = 'BTC' AND d.year = 2023
        ORDER BY d.year, d.month
        """)
        results = cursor.fetchall()
        if results:
            for row in results:
                print(f"{row[0]}-{row[1]:02d}: ${row[2]:.2f}")
        else:
            print("Нет данных для Bitcoin за 2023 год")
    except sqlite3.OperationalError as e:
        print(f"Ошибка при выполнении запроса: {e}")

    # SQL-запрос 3: Максимальная и минимальная цена для каждой криптовалюты
    print("\n=== SQL-запрос 3: Максимальная и минимальная цена для каждой криптовалюты ===")
    try:
        cursor.execute("""
        SELECT c.ticker, MAX(p.avg_price) as max_price, MIN(p.avg_price) as min_price
        FROM PriceData p
        JOIN Cryptocurrency c ON p.crypto_id = c.crypto_id
        GROUP BY c.ticker
        """)
        results = cursor.fetchall()
        if results:
            for row in results:
                print(f"{row[0]}: Max = ${row[1]:.2f}, Min = ${row[2]:.2f}")
        else:
            print("Нет данных о ценах в таблице PriceData")
    except sqlite3.OperationalError as e:
        print(f"Ошибка при выполнении запроса: {e}")

    # SQL-запрос 4: Общий объем торгов Ethereum за весь период
    print("\n=== SQL-запрос 4: Общий объем торгов Ethereum за весь период ===")
    try:
        cursor.execute("""
        SELECT SUM(p.volume) as total_volume
        FROM PriceData p
        JOIN Cryptocurrency c ON p.crypto_id = c.crypto_id
        JOIN Date d ON p.date_id = d.date_id
        WHERE c.ticker = 'ETH'
        """)
        result = cursor.fetchone()
        if result and result[0] is not None:
            print(f"Total Volume: ${result[0]:.2f}")
        else:
            print("Нет данных об объеме торгов Ethereum")
    except sqlite3.OperationalError as e:
        print(f"Ошибка при выполнении запроса: {e}")

    # SQL-запрос 5: Месяцы с наибольшим объемом торгов для BTC
    print("\n=== SQL-запрос 5: Месяцы с наибольшим объемом торгов для BTC ===")
    try:
        cursor.execute("""
        SELECT c.ticker, d.year, d.month, p.volume
        FROM PriceData p
        JOIN Cryptocurrency c ON p.crypto_id = c.crypto_id
        JOIN Date d ON p.date_id = d.date_id
        WHERE c.ticker = 'BTC'
        ORDER BY p.volume DESC
        LIMIT 5
        """)
        results = cursor.fetchall()
        if results:
            for row in results:
                print(f"{row[0]} {row[1]}-{row[2]:02d}: Volume=${row[3]:.2f}")
        else:
            print("Нет данных об объеме торгов Bitcoin")
    except sqlite3.OperationalError as e:
        print(f"Ошибка при выполнении запроса: {e}")

    # Закрытие соединения
    conn.close()

except sqlite3.OperationalError as e:
    print(f"Ошибка подключения к базе данных: {e}")
    print("Убедитесь, что crypto.db существует и доступен.")

# Синхронизация Google Диска
drive.flush_and_unmount()

Mounted at /content/drive
База данных найдена: /content/drive/MyDrive/уник/cripto/crypto.db

=== Диагностика файлов ===
Файлы в директории: ['crypto.db', 'crypto_raw_data.csv', 'crypto_monthly_data', 'key.txt', 'crypto_raw_data_encrypted.bin', 'crypto_encrypted.db']

Содержимое crypto_raw_data.csv:
Количество записей: 1464
Диапазон дат: 2023-01-01 - 2025-01-01
Тикеры: ['BTC' 'ETH']

Содержимое crypto_monthly_data:
Количество записей: 49

=== Диагностика базы данных ===
Таблицы в базе: [('Cryptocurrency',), ('Date',), ('PriceData',)]
Количество записей в Cryptocurrency: 2
Количество записей в Date: 25
Количество записей в PriceData: 0

Таблица PriceData пуста.

=== SQL-запрос 1: Список всех криптовалют ===
Ticker: BTC, Name: BTC, Description: None
Ticker: ETH, Name: ETH, Description: None

=== SQL-запрос 2: Средние цены Bitcoin за 2023 год ===
Нет данных для Bitcoin за 2023 год

=== SQL-запрос 3: Максимальная и минимальная цена для каждой криптовалюты ===
Нет данных о ценах в таблице Pr

In [3]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import matplotlib.pyplot as plt
import pandas as pd
import os
from google.colab import drive

# Монтирование Google Drive (для Google Colab)
drive.mount('/content/drive')

# Инициализация Spark-сессии
spark = SparkSession.builder.appName("CryptoPricePrediction").getOrCreate()

# Указание пути к файлу
file_path = "/content/drive/MyDrive/уник/cripto/crypto_raw_data.csv"

# Проверка существования файла
if not os.path.exists(file_path):
    raise FileNotFoundError(f"Файл не найден по пути: {file_path}. Пожалуйста, проверьте путь и убедитесь, что файл существует.")

# Загрузка данных
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Очистка данных
# Удаляем строки с пропусками и колонку market_cap
df = df.dropna().drop("market_cap")

# Фильтрация данных для BTC (можно повторить для ETH)
df_btc = df.filter(df.ticker == "BTC")

# Инженерия признаков
# Сортировка по дате
df_btc = df_btc.orderBy("date")

# Создание признаков: процентное изменение цены, скользящее среднее, волатильность
df_btc = df_btc.withColumn("price_change",
    (F.col("avg_price") - F.lag("avg_price").over(Window.partitionBy("ticker").orderBy("date"))) /
    F.lag("avg_price").over(Window.partitionBy("ticker").orderBy("date")))

# Скользящее среднее за 7 дней
df_btc = df_btc.withColumn("ma7",
    F.avg("avg_price").over(Window.partitionBy("ticker").orderBy("date").rowsBetween(-6, 0)))

# Волатильность (стандартное отклонение за 7 дней)
df_btc = df_btc.withColumn("volatility",
    F.stddev("avg_price").over(Window.partitionBy("ticker").orderBy("date").rowsBetween(-6, 0)))

# Логарифм объема торгов
df_btc = df_btc.withColumn("log_volume", F.log1p(F.col("volume")))

# Целевая переменная: 1 если цена вырастет на следующий день, 0 иначе
df_btc = df_btc.withColumn("label",
    F.when(F.lead("avg_price").over(Window.partitionBy("ticker").orderBy("date")) > F.col("avg_price"), 1).otherwise(0))

# Удаление строк с пропусками после создания признаков
df_btc = df_btc.dropna()

# Разделение на тренировочную и тестовую выборки
train_df, test_df = df_btc.randomSplit([0.8, 0.2], seed=42)

# Подготовка данных для модели
feature_columns = ["price_change", "ma7", "volatility", "log_volume"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features_raw")
scaler = StandardScaler(inputCol="features_raw", outputCol="features", withStd=True, withMean=True)

# Определение моделей
lr = LogisticRegression(labelCol="label", featuresCol="features")
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100)
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=50)

# Создание пайплайнов
pipeline_lr = Pipeline(stages=[assembler, scaler, lr])
pipeline_rf = Pipeline(stages=[assembler, scaler, rf])
pipeline_gbt = Pipeline(stages=[assembler, scaler, gbt])

# Обучение моделей
model_lr = pipeline_lr.fit(train_df)
model_rf = pipeline_rf.fit(train_df)
model_gbt = pipeline_gbt.fit(train_df)

# Оценка моделей
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")

# Предсказания и оценка на тестовой выборке
pred_lr = model_lr.transform(test_df)
pred_rf = model_rf.transform(test_df)
pred_gbt = model_gbt.transform(test_df)

roc_lr = evaluator.evaluate(pred_lr)
roc_rf = evaluator.evaluate(pred_rf)
roc_gbt = evaluator.evaluate(pred_gbt)

print(f"ROC-AUC Logistic Regression: {roc_lr}")
print(f"ROC-AUC Random Forest: {roc_rf}")
print(f"ROC-AUC Gradient Boosting: {roc_gbt}")

# Визуализация цен и предсказаний
# Конвертация в Pandas для визуализации
pdf = df_btc.select("date", "avg_price", "label").toPandas()
pred_pdf = pred_rf.select("date", "prediction").toPandas()

# Объединение данных
pdf = pdf.merge(pred_pdf, on="date", how="inner")

# Построение графика
plt.figure(figsize=(12, 6))
plt.plot(pdf["date"], pdf["avg_price"], label="BTC Price", color="blue")
plt.scatter(pdf[pdf["prediction"] == 1]["date"], pdf[pdf["prediction"] == 1]["avg_price"],
            color="green", label="Predicted Up", marker="^")
plt.scatter(pdf[pdf["prediction"] == 0]["date"], pdf[pdf["prediction"] == 0]["avg_price"],
            color="red", label="Predicted Down", marker="v")
plt.xlabel("Date")
plt.ylabel("Average Price (USD)")
plt.title("BTC Price with Predicted Up/Down Movements")
plt.legend()
plt.grid(True)
plt.savefig("btc_price_prediction.png")
plt.close()

# Остановка Spark-сессии
spark.stop()

Mounted at /content/drive
ROC-AUC Logistic Regression: 0.6381045115222331
ROC-AUC Random Forest: 0.5689711132749109
ROC-AUC Gradient Boosting: 0.4589419019798766



ROC-AUC Logistic Regression: 0.6381045115222331
ROC-AUC Random Forest: 0.5657254138266797
ROC-AUC Gradient Boosting: 0.4589419019798766

In [None]:
df_btc.groupBy("label").count().show()

RuntimeError: SparkContext or SparkSession should be created first.

In [None]:
df_btc.select("price_change", "ma7", "volatility", "log_volume").summary().show()

RuntimeError: SparkContext or SparkSession should be created first.

In [None]:
from pyspark.sql import SparkSession

# Создаем SparkSession
spark = SparkSession.builder \
    .appName("CryptoAnalysis") \
    .getOrCreate()

# Теперь можно читать данные и работать с DataFrame
df_btc = spark.read.csv("/content/drive/MyDrive/уник/cripto/crypto_raw_data.csv", header=True, inferSchema=True)

# Пример: показать статистику
df_btc.select("price_change", "ma7", "volatility", "log_volume").summary().show()


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `price_change` cannot be resolved. Did you mean one of the following? [`market_cap`, `ticker`, `avg_price`, `date`, `volume`].;
'Project ['price_change, 'ma7, 'volatility, 'log_volume]
+- Relation [date#1366,avg_price#1367,volume#1368,ticker#1369,market_cap#1370] csv
