In [1]:
# Блок 0: Установка зависимостей
# Устанавливаем необходимые библиотеки
!pip install pandas pyspark requests pycryptodome

# Блок 1: Настройка окружения и Google Диска
# Подключаем Google Диск и создаем директорию
from google.colab import drive
import os

# Mount Google Drive so outputs survive the Colab session, then make sure the
# project folder exists.
DRIVE_ROOT = '/content/drive'
drive.mount(DRIVE_ROOT)
output_dir = os.path.join(DRIVE_ROOT, "MyDrive", "crypto_analysis")
os.makedirs(output_dir, exist_ok=True)

# Блок 2: Инициализация SQLite базы данных
# Создаем базу данных и таблицы
import sqlite3

# Open (or create) the SQLite database inside the Drive project folder.
conn = sqlite3.connect(os.path.join(output_dir, "crypto.db"))
# SQLite ignores FOREIGN KEY clauses unless enforcement is switched on for the
# connection; without this the references declared below are never checked.
conn.execute("PRAGMA foreign_keys = ON")
cursor = conn.cursor()

# Three tables: a ticker dimension, a calendar-day dimension, and a fact table
# of prices/volumes that references both.
cursor.executescript('''
CREATE TABLE IF NOT EXISTS Cryptocurrency (
    crypto_id INTEGER PRIMARY KEY,
    ticker TEXT NOT NULL UNIQUE,
    name TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS Date (
    date_id INTEGER PRIMARY KEY,
    year INTEGER NOT NULL,
    month INTEGER NOT NULL,
    day INTEGER NOT NULL,
    UNIQUE(year, month, day)
);
CREATE TABLE IF NOT EXISTS PriceData (
    price_id INTEGER PRIMARY KEY,
    crypto_id INTEGER,
    date_id INTEGER,
    avg_price REAL,
    volume REAL,
    FOREIGN KEY (crypto_id) REFERENCES Cryptocurrency(crypto_id),
    FOREIGN KEY (date_id) REFERENCES Date(date_id)
);
''')
conn.commit()

# Блок 3: Сбор данных с CryptoCompare API
# Получаем исторические данные для BTC и ETH
import pandas as pd
import requests
from datetime import datetime, timedelta
import time

def get_crypto_data(symbol, start_date, end_date, timeout=30):
    """Fetch daily price history for *symbol* in USD from CryptoCompare's histoday endpoint.

    Args:
        symbol: Ticker to fetch, e.g. ``'BTC'``.
        start_date: First day to keep (inclusive). Naive datetimes are treated as UTC.
        end_date: Last day requested; sent to the API as ``toTs``. Naive datetimes
            are treated as UTC.
        timeout: Seconds to wait for the HTTP response (optional).

    Returns:
        A new DataFrame with columns ``date`` (naive UTC timestamp), ``avg_price``
        (midpoint of the day's ``high`` and ``low``) and ``volume`` (the API's
        ``volumeto`` field, i.e. volume in the ``tsym`` currency, USD here).
        Only rows on/after *start_date* with a positive price are kept. The frame
        is empty if the API returns no rows.

    Raises:
        ValueError: If *end_date* precedes *start_date* or the API reports an error.
        requests.HTTPError: On a non-2xx HTTP status.

    Note:
        A single request is capped at 2000 days; for longer ranges the oldest
        days are silently not fetched.
    """
    days = (end_date - start_date).days
    if days < 0:
        raise ValueError(f"end_date ({end_date}) is before start_date ({start_date})")

    base_url = "https://min-api.cryptocompare.com/data/v2/histoday"
    params = {
        'fsym': symbol,
        'tsym': 'USD',
        'limit': min(days, 2000),
        # pd.Timestamp treats naive datetimes as UTC. That matches the UTC epoch
        # seconds in the response's 'time' field, whereas datetime.timestamp()
        # would use the machine's local timezone.
        'toTs': int(pd.Timestamp(end_date).timestamp()),
    }

    response = requests.get(base_url, params=params, timeout=timeout)
    response.raise_for_status()
    data = response.json()
    if data.get('Response') != 'Success':
        raise ValueError(f"Ошибка API: {data.get('Message', 'Неизвестная ошибка')}")

    rows = (data.get('Data') or {}).get('Data') or []
    if not rows:
        return pd.DataFrame(columns=['date', 'avg_price', 'volume'])

    df = pd.DataFrame(rows)
    df['date'] = pd.to_datetime(df['time'], unit='s')
    df['avg_price'] = (df['high'] + df['low']) / 2
    df['volume'] = df['volumeto']
    df = df[['date', 'avg_price', 'volume']]
    keep = df['date'].notnull() & (df['avg_price'] > 0) & (df['date'] >= start_date)
    # Return an independent copy so callers can add columns without pandas'
    # SettingWithCopyWarning on a filtered slice.
    return df.loc[keep].copy()

# Tickers to download, mapped to their display names.
coins = {'BTC': 'Bitcoin', 'ETH': 'Ethereum'}
# Requested range: 2023-01-01 up to 2025-01-01.
start_date = datetime(2023, 1, 1)
end_date = datetime(2025, 1, 1)
all_data = []

for symbol in coins:
    # Tag every row with its ticker; assign() builds a new frame rather than
    # writing into the filtered frame returned by get_crypto_data.
    df = get_crypto_data(symbol, start_date, end_date).assign(ticker=symbol)
    all_data.append(df)
    # Pause between requests (presumably to stay within API rate limits).
    time.sleep(1)

# Stack both tickers and persist the raw data to Drive.
full_df = pd.concat(all_data)
raw_csv_path = os.path.join(output_dir, "crypto_raw_data.csv")
full_df.to_csv(raw_csv_path, index=False)

# Блок 4: Структурированный стриминг с Apache Spark
# Симулируем стриминг и выполняем агрегацию по 5-минутным окнам
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, avg, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Start (or reuse) the Spark session used for the streaming simulation.
spark = SparkSession.builder.appName("CryptoStreaming").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Explicit schema, shared by the DataFrame conversion and the streaming reader
# (file-stream sources need the schema up front).
schema = StructType([
    StructField("ticker", StringType(), True),
    StructField("date", TimestampType(), True),
    StructField("avg_price", DoubleType(), True),
    StructField("volume", DoubleType(), True)
])

# Load the raw CSV with pandas and convert it into a Spark DataFrame.
df = pd.read_csv(os.path.join(output_dir, "crypto_raw_data.csv"))
df['date'] = pd.to_datetime(df['date'])
spark_df = spark.createDataFrame(df[['ticker', 'date', 'avg_price', 'volume']], schema)

# Dump the data as CSV part files for the file-stream source to replay.
# The header row MUST be written: the reader below uses header=True, which
# treats the first line of every part file as a header. Writing without one
# silently drops one data row per part file.
temp_dir = os.path.join(output_dir, "temp_streaming_input")
os.makedirs(temp_dir, exist_ok=True)
spark_df.write.mode("overwrite").option("header", True).csv(temp_dir)

# Stream the part files back in. With a 10-minute watermark and append output
# mode, a window is emitted only once the watermark (max event time seen minus
# 10 minutes) passes the window's end. The window holding the latest timestamp
# is therefore never written out.
streaming_df = spark.readStream.schema(schema).option("header", True).csv(temp_dir)
streaming_df = streaming_df.withWatermark("date", "10 minutes")

# Average price and volume per ticker over 5-minute event-time windows.
# NOTE(review): the source is daily (histoday) data, so each window holds at
# most one row per ticker and this is effectively a pass-through.
aggregated_df = streaming_df.groupBy(
    col("ticker"),
    window(col("date"), "5 minutes").alias("window")
).agg(
    avg("avg_price").alias("avg_price"),
    avg("volume").alias("avg_volume")
).select(
    col("ticker"),
    col("window.start").alias("window_start"),
    col("window.end").alias("window_end"),
    col("avg_price"),
    col("avg_volume")
)

# Write finalized windows to CSV on Drive.
# NOTE(review): the checkpoint directory is reused across notebook runs. The
# overwrite above presumably produces new part-file names, so a re-run would
# process the input again and append duplicate windows to output_path.
output_path = os.path.join(output_dir, "crypto_streaming_output")
query = aggregated_df.writeStream \
    .format("csv") \
    .option("path", output_path) \
    .option("checkpointLocation", os.path.join(output_dir, "checkpoints")) \
    .option("header", True) \
    .outputMode("append") \
    .start()

# Block until every file currently in temp_dir has been processed and
# committed, instead of sleeping a fixed 60 s that may be too short or
# needlessly long. Then stop the query so later cells read a sink that is no
# longer being written to. Stopping again during cleanup is harmless.
query.processAllAvailable()
query.stop()

# Блок 5: Сохранение агрегированных данных в SQLite
# Записываем данные в базу данных
# Read back the finalized windows produced by the streaming sink.
monthly_data = spark.read.csv(os.path.join(output_dir, "crypto_streaming_output"), header=True, inferSchema=True)
monthly_df = monthly_data.toPandas().dropna().copy()
# Parse explicitly: if inferSchema left window_start as a string, the
# .year/.month/.day accesses below would fail.
monthly_df['window_start'] = pd.to_datetime(monthly_df['window_start'])
# A re-run can append the same windows to the sink again; keep one row per key.
monthly_df = monthly_df.drop_duplicates(subset=['ticker', 'window_start'], keep='last')

# Register tickers, using the display names from `coins` where known.
crypto_data = [(ticker, coins.get(ticker, ticker)) for ticker in monthly_df['ticker'].unique()]
cursor.executemany("INSERT OR IGNORE INTO Cryptocurrency (ticker, name) VALUES (?, ?)", crypto_data)
# Also refresh names on rows created earlier (which stored the ticker as name).
cursor.executemany(
    "UPDATE Cryptocurrency SET name = ? WHERE ticker = ?",
    [(coin_name, ticker) for ticker, coin_name in crypto_data],
)

# Register every calendar day that has at least one window.
day_keys = sorted({(ts.year, ts.month, ts.day) for ts in monthly_df['window_start']})
cursor.executemany("INSERT OR IGNORE INTO Date (year, month, day) VALUES (?, ?, ?)", day_keys)
conn.commit()

# Look up surrogate keys for tickers and days.
cursor.execute("SELECT ticker, crypto_id FROM Cryptocurrency")
crypto_ids = dict(cursor.fetchall())
cursor.execute("SELECT year, month, day, date_id FROM Date")
date_ids = {(y, m, d): date_id for y, m, d, date_id in cursor.fetchall()}

# Build the fact rows.
price_data = []
columns = ['ticker', 'window_start', 'avg_price', 'avg_volume']
for ticker, ts, price, volume in monthly_df[columns].itertuples(index=False, name=None):
    day_key = (ts.year, ts.month, ts.day)
    if day_key in date_ids and ticker in crypto_ids:
        price_data.append((crypto_ids[ticker], date_ids[day_key], float(price), float(volume)))

# Replace rather than append: remove rows already stored for these
# (crypto, day) pairs so re-running the notebook does not duplicate prices.
cursor.executemany(
    "DELETE FROM PriceData WHERE crypto_id = ? AND date_id = ?",
    sorted({(crypto_id, date_id) for crypto_id, date_id, _, _ in price_data}),
)
cursor.executemany(
    "INSERT INTO PriceData (crypto_id, date_id, avg_price, volume) VALUES (?, ?, ?, ?)",
    price_data
)
conn.commit()

# Блок 6: Шифрование файлов
# Шифруем CSV и базу данных
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes

def encrypt_file(input_file, output_file, key):
    """Encrypt *input_file* with AES in EAX mode and write the result to *output_file*.

    The output is the concatenation ``nonce || tag || ciphertext``. To decrypt,
    read the nonce and tag back (16 bytes each with pycryptodome's EAX defaults)
    and call ``AES.new(key, AES.MODE_EAX, nonce=nonce).decrypt_and_verify(ciphertext, tag)``.

    Args:
        input_file: Path of the plaintext file; read fully into memory.
        output_file: Destination path; overwritten if it exists.
        key: AES key bytes (16, 24 or 32 bytes).
    """
    cipher = AES.new(key, AES.MODE_EAX)
    with open(input_file, 'rb') as f:
        data = f.read()
    ciphertext, tag = cipher.encrypt_and_digest(data)
    with open(output_file, 'wb') as f:
        # Plain sequential writes instead of a list comprehension used only
        # for its side effects.
        f.write(cipher.nonce)
        f.write(tag)
        f.write(ciphertext)

# Generate a random 128-bit AES key and persist it so the files can be
# decrypted later.
# NOTE(security): the key is saved unencrypted in the same Drive folder as the
# ciphertext, and the plaintext originals are left in place. Anyone who can
# read output_dir can therefore read the data. Store the key elsewhere if this
# encryption is meant to protect anything.
key = get_random_bytes(16)
with open(os.path.join(output_dir, "key.txt"), "wb") as key_file:
    key_file.write(key)

# Encrypt the raw CSV and the SQLite database with the same key.
for plain_name, encrypted_name in (
    ("crypto_raw_data.csv", "crypto_raw_data_encrypted.bin"),
    ("crypto.db", "crypto_encrypted.db"),
):
    encrypt_file(
        os.path.join(output_dir, plain_name),
        os.path.join(output_dir, encrypted_name),
        key
    )

# Блок 7: Вывод результатов
# Выполняем SQL-запрос для отображения данных
# Print one average price per ticker and calendar month. PriceData holds one
# row per aggregation window, so the rows are averaged per (ticker, year,
# month). Without the GROUP BY, every stored day would print under the same
# "year-month" label.
cursor.execute('''
SELECT c.ticker, d.year, d.month, AVG(p.avg_price)
FROM PriceData p
JOIN Cryptocurrency c ON p.crypto_id = c.crypto_id
JOIN Date d ON p.date_id = d.date_id
GROUP BY c.ticker, d.year, d.month
ORDER BY c.ticker, d.year, d.month
''')
results = cursor.fetchall()

for ticker, year, month, month_avg_price in results:
    print(f"{ticker} {year}-{month}: ${month_avg_price:.2f}")

# Блок 8: Очистка
# Завершаем работу
# Release resources in dependency order: the streaming query before its Spark
# session, then the SQLite connection. Finally flush pending writes and
# unmount Drive.
for release in (query.stop, spark.stop, conn.close):
    release()
drive.flush_and_unmount()

Collecting pycryptodome
  Downloading pycryptodome-3.22.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Downloading pycryptodome-3.22.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.3 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.3/2.3 MB 16.1 MB/s eta 0:00:00
Installing collected packages: pycryptodome
Successfully installed pycryptodome-3.22.0
Mounted at /content/drive
