In [31]:
!pip install dash dash-bootstrap-components

Writing the following data to SQLite:
            id      name symbol current_price              last_updated
0      bitcoin   Bitcoin    btc        103557  2025-05-31T09:59:38.456Z
1     ethereum  Ethereum    eth       2522.14  2025-05-31T09:59:37.338Z
2       tether    Tether   usdt           1.0  2025-05-31T09:59:37.679Z
3       ripple       XRP    xrp          2.14  2025-05-31T09:59:38.117Z
4  binancecoin       BNB    bnb        654.59  2025-05-31T09:59:38.801Z
Data written to /content/coingecko_data.db


In [1]:
!pip install jupyter-dash dash-bootstrap-components pyngrok

Collecting jupyter-dash
  Downloading jupyter_dash-0.4.2-py3-none-any.whl.metadata (3.6 kB)
Collecting dash-bootstrap-components
  Downloading dash_bootstrap_components-2.0.3-py3-none-any.whl.metadata (18 kB)
Collecting pyngrok
  Downloading pyngrok-7.2.9-py3-none-any.whl.metadata (9.3 kB)
Collecting dash (from jupyter-dash)
  Downloading dash-3.0.4-py3-none-any.whl.metadata (10 kB)
Collecting retrying (from jupyter-dash)
  Downloading retrying-1.3.4-py3-none-any.whl.metadata (6.9 kB)
Collecting ansi2html (from jupyter-dash)
  Downloading ansi2html-1.9.2-py3-none-any.whl.metadata (3.7 kB)
Collecting flask (from jupyter-dash)
  Downloading flask-3.0.3-py3-none-any.whl.metadata (3.2 kB)
Collecting Werkzeug<3.1 (from dash->jupyter-dash)
  Downloading werkzeug-3.0.6-py3-none-any.whl.metadata (3.7 kB)
Collecting jedi>=0.16 (from ipython->jupyter-dash)
  Downloading jedi-0.19.2-py2.py3-none-any.whl.metadata (22 kB)
Downloading jupyter_dash-0.4.2-py3-none-any.whl (23 kB)
Downloading dash_boot

In [2]:
import pandas as pd
import threading
import time
import requests
import sqlite3
import os
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, unix_timestamp, col, avg, stddev, mean, lag, when
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, LongType
from pyspark.sql.window import Window
import dash
import dash_bootstrap_components as dbc
from dash import html, dcc, Input, Output
from jupyter_dash import JupyterDash
import plotly.express as px
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

In [3]:
spark = SparkSession.builder.appName("CoinGeckoStreamingApp").getOrCreate()
# Create a StreamingContext with a batch interval of 1 second
ssc = StreamingContext(spark.sparkContext, 60)



In [4]:
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("current_price", StringType(), True),
    StructField("last_updated", StringType(), True)
])

In [5]:
api_url = "https://api.coingecko.com/api/v3/coins/markets"

In [6]:
def fetch_coingecko_data():
    response = requests.get(api_url, params={"vs_currency": "usd"})
    if response.status_code == 200:
        return response.json()
    else:
        return []

In [7]:
def write_to_sqlite(df):
    # Convert Spark DataFrame to Pandas DataFrame
    pandas_df = df.toPandas()

    # Optional: Display the DataFrame
    print("Writing the following data to SQLite:")
    print(pandas_df.head())

    # Define SQLite DB path (stored locally in Colab)
    db_path = "/content/coingecko_data.db"

    # Create a connection to the SQLite database
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Define table name
    table_name = "coingecko_market"

    # Create table if it doesn't exist
    cursor.execute(f'''
        CREATE TABLE IF NOT EXISTS {table_name} (
            id TEXT,
            name TEXT,
            symbol TEXT,
            current_price FLOAT,
            last_updated TEXT
        )
    ''')

    # Insert data into the table
    for _, row in pandas_df.iterrows():
        cursor.execute(f'''
            INSERT INTO {table_name} (id, name, symbol, current_price, last_updated)
            VALUES (?, ?, ?, ?, ?)
        ''', tuple(row))

    # Commit and close the connection
    conn.commit()
    conn.close()
    print(f"Data written to {db_path}")


In [8]:
def fetch_and_insert():
  while True:
      data = fetch_coingecko_data()
      if data:
          df = spark.createDataFrame(data, schema)
          write_to_sqlite(df)
      time.sleep(60)

In [9]:
data_thread = threading.Thread(target=fetch_and_insert)
data_thread.daemon = True  # will stop when notebook stops
data_thread.start()

In [11]:
# Path to your database file
db_path = "/content/coingecko_data.db"

# Connect to the database
conn = sqlite3.connect(db_path)

# Query the table into a Pandas DataFrame
df = pd.read_sql_query("SELECT * FROM coingecko_market", conn)

# Show the first few rows
df

Unnamed: 0,id,name,symbol,current_price,last_updated
0,bitcoin,Bitcoin,btc,103554.000000,2025-05-31T10:02:45.430Z
1,ethereum,Ethereum,eth,2521.730000,2025-05-31T10:02:35.836Z
2,tether,Tether,usdt,1.000000,2025-05-31T10:02:38.404Z
3,ripple,XRP,xrp,2.140000,2025-05-31T10:02:36.682Z
4,binancecoin,BNB,bnb,654.600000,2025-05-31T10:02:45.241Z
...,...,...,...,...,...
95,xdce-crowd-sale,XDC Network,xdc,0.058831,2025-05-31T10:02:45.155Z
96,mantle-staked-ether,Mantle Staked Ether,meth,2689.130000,2025-05-31T10:02:37.450Z
97,paypal-usd,PayPal USD,pyusd,0.999582,2025-05-31T10:02:42.933Z
98,maker,Maker,mkr,1561.800000,2025-05-31T10:02:44.598Z


In [12]:
def add_indicators(df):
    df = df.sort_values('last_updated')
    df['moving_avg'] = df.groupby('symbol')['current_price'].transform(lambda x: x.rolling(5, min_periods=1).mean())
    df['price_mean'] = df.groupby('symbol')['current_price'].transform('mean')
    df['price_std'] = df.groupby('symbol')['current_price'].transform('std').fillna(1)
    df['z_score'] = (df['current_price'] - df['price_mean']) / df['price_std']
    return df

In [13]:
df = add_indicators(df)

In [14]:
df

Unnamed: 0,id,name,symbol,current_price,last_updated,moving_avg,price_mean,price_std,z_score
44,blackrock-usd-institutional-digital-liquidity-...,BlackRock USD Institutional Digital Liquidity ...,buidl,1.000000,2025-05-31T09:55:05.434Z,1.000000,1.000000,1.000000,0.000000
7,dogecoin,Dogecoin,doge,0.187664,2025-05-31T10:02:35.165Z,0.187664,0.187664,1.000000,0.000000
66,filecoin,Filecoin,fil,2.500000,2025-05-31T10:02:35.480Z,2.500000,2.500000,1.000000,0.000000
6,usd-coin,USDC,usdc,0.999813,2025-05-31T10:02:35.552Z,0.999813,0.999592,0.000313,0.707107
62,fetch-ai,Artificial Superintelligence Alliance,fet,0.726427,2025-05-31T10:02:35.553Z,0.726427,0.726427,1.000000,0.000000
...,...,...,...,...,...,...,...,...,...
23,hedera-hashgraph,Hedera,hbar,0.162698,2025-05-31T10:02:45.241Z,0.162698,0.162698,1.000000,0.000000
4,binancecoin,BNB,bnb,654.600000,2025-05-31T10:02:45.241Z,654.600000,654.600000,1.000000,0.000000
61,fasttoken,Fasttoken,ftn,4.430000,2025-05-31T10:02:45.266Z,4.430000,4.430000,1.000000,0.000000
82,rocket-pool-eth,Rocket Pool ETH,reth,2868.650000,2025-05-31T10:02:45.315Z,2868.650000,2868.650000,1.000000,0.000000


In [26]:
app = JupyterDash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])


JupyterDash is deprecated, use Dash instead.
See https://dash.plotly.com/dash-in-jupyter for more details.



In [27]:
symbols = df['symbol'].unique()

In [28]:
app.layout = dbc.Container([
    html.H1("Real-Time Crypto Analytics Dashboard"),
    dcc.Tabs(id="tabs", value='overview', children=[
        dcc.Tab(label='Overview', value='overview'),
        dcc.Tab(label='Trends', value='trends'),
        dcc.Tab(label='Alerts', value='alerts'),
        dcc.Tab(label='Trend Prediction', value='trend'),
    ]),
    html.Br(),
    dcc.Dropdown(id='symbol-select', options=[{'label': s, 'value': s} for s in symbols], value='btc', clearable=False),
    html.Div(id='tab-content'),
    dcc.Interval(id='refresh', interval=60000, n_intervals=0)
])


In [29]:
@app.callback(
    Output('tab-content', 'children'),
    Input('tabs', 'value'),
    Input('symbol-select', 'value'),
    Input('refresh', 'n_intervals')
)
def update_price_chart(tab, symbol, n):
    conn = sqlite3.connect('/content/coingecko_data.db')
    df = pd.read_sql_query("SELECT * FROM coingecko_market WHERE symbol=?", conn, params=(symbol,))
    conn.close()
    # ... Fetching and processing data ...
     # Convert current_price to float for plotting
    df['current_price'] = pd.to_numeric(df['current_price'], errors='coerce')
    df['last_updated'] = pd.to_datetime(df['last_updated'])
    df = add_indicators(df)

    if tab == 'overview':
        fig = px.line(df, x='last_updated', y='current_price', title=f"{symbol.upper()} Price Over Time")
        return dcc.Graph(figure=fig)

    elif tab == 'trends':
        fig = px.line(df, x='last_updated', y='moving_avg', title=f"{symbol.upper()} 5-Point Moving Average")
        return dcc.Graph(figure=fig)

    elif tab == 'alerts':
        alert_df = df[df['z_score'].abs() > 2]
        if alert_df.empty:
            return html.Div("No price anomalies detected.")
        fig = px.scatter(alert_df, x='last_updated', y='current_price', color='z_score',
                         title=f"Anomalies in {symbol.upper()} Price (Z-Score > 2)")
        return dcc.Graph(figure=fig)

    elif tab == 'trend':
        sdf = spark.createDataFrame(df[['last_updated', 'current_price']].dropna())
        sdf = sdf.withColumn("price_lag", lag("current_price", 1).over(Window.orderBy("last_updated")))
        sdf = sdf.withColumn("price_change", col("current_price") - col("price_lag"))
        sdf = sdf.withColumn("label", when(col("price_change") > 0.01, "Rise")
                                          .when(col("price_change") < -0.01, "Drop")
                                          .otherwise("Stable"))

        label_indexer = StringIndexer(inputCol="label", outputCol="label_index", handleInvalid='keep')
        assembler = VectorAssembler(inputCols=["price_change"], outputCol="features", handleInvalid='skip')
        classifier = RandomForestClassifier(labelCol="label_index", featuresCol="features", numTrees=10)
        pipeline = Pipeline(stages=[label_indexer, assembler, classifier])

        train, test = sdf.randomSplit([0.8, 0.2], seed=42)
        model = pipeline.fit(train)
        predictions = model.transform(test).toPandas()

        fig = px.scatter(predictions, x='last_updated', y='price_change', color='label',
                         title=f"{symbol.upper()} Trend Classification (Rise/Drop/Stable)")
        return dcc.Graph(figure=fig)


In [30]:
app.run(mode='inline', debug=True, port=8050)

<IPython.core.display.Javascript object>

## Price Trend Classification and Next Price Prediction using Spark MLlib

In [None]:
df_ML = spark.createDataFrame(df)
df_ML.show()

df_ML = df_ML.withColumn("current_price", df_ML["current_price"].cast("float"))
df_ML = df_ML.withColumn("timestamp", to_timestamp("last_updated"))

# Create price change column and label
window = Window.orderBy("timestamp")
df_ML = df_ML.withColumn("prev_price", lag("current_price", 1).over(window))
df_ML = df_ML.withColumn("price_change", (df_ML["current_price"] - df_ML["prev_price"]) / df_ML["prev_price"])
df_ML = df_ML.fillna(0)

df_ML = df_ML.withColumn("label",
    when(df_ML["price_change"] > 0.01, "rise")
    .when(df_ML["price_change"] < -0.01, "drop")
    .otherwise("stable")
)


+--------------------+--------------------+------+-------------+--------------------+----------+----------+------------------+------------------+
|                  id|                name|symbol|current_price|        last_updated|moving_avg|price_mean|         price_std|           z_score|
+--------------------+--------------------+------+-------------+--------------------+----------+----------+------------------+------------------+
|blackrock-usd-ins...|BlackRock USD Ins...| buidl|          1.0|2025-05-30T07:35:...|       1.0|       1.0|               1.0|               0.0|
|             bitcoin|             Bitcoin|   btc|     105522.0|2025-05-30T07:39:...|  105522.0|  105522.0|               1.0|               0.0|
|         sei-network|                 Sei|   sei|     0.205684|2025-05-30T07:39:...|  0.205684|  0.205684|               1.0|               0.0|
|         binancecoin|                 BNB|   bnb|       669.98|2025-05-30T07:39:...|    669.98|    669.98|               1.

In [None]:
df_ML.show()

+--------------------+--------------------+--------+-------------+--------------------+----------+------------------+------------------+------------------+--------------------+----------+--------------------+------+
|                  id|                name|  symbol|current_price|        last_updated|moving_avg|        price_mean|         price_std|           z_score|           timestamp|prev_price|        price_change| label|
+--------------------+--------------------+--------+-------------+--------------------+----------+------------------+------------------+------------------+--------------------+----------+--------------------+------+
|blackrock-usd-ins...|BlackRock USD Ins...|   buidl|          1.0|2025-05-29T11:30:...|       1.0|               1.0|               1.0|               0.0|2025-05-29 11:30:...|       0.0|                 0.0|stable|
|                usds|                USDS|    usds|     0.999769|2025-05-29T11:34:...|  0.999769|          0.999769|               1.0|

In [None]:
# Encode label and assemble features
label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
assembler = VectorAssembler(inputCols=["price_change"], outputCol="features")
classifier = RandomForestClassifier(labelCol="label_index", featuresCol="features", numTrees=10)
pipeline = Pipeline(stages=[label_indexer, assembler, classifier])

# Split and train
train, test = df_ML.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train)

# Predict
predictions = model.transform(test)
predictions.select("timestamp", "price_change", "label", "prediction").show(10)


+--------------------+-------------------+------+----------+
|           timestamp|       price_change| label|prediction|
+--------------------+-------------------+------+----------+
|2025-05-29 11:34:...| 11.043436253658681|  rise|       0.0|
|2025-05-29 11:34:...|-0.9749576619759736|  drop|       1.0|
|2025-05-29 11:34:...| 238.65037206220993|  rise|       0.0|
|2025-05-29 11:30:...|                0.0|stable|       0.0|
|2025-05-29 11:34:...|  108875.9929659463|  rise|       0.0|
|2025-05-29 11:34:...|-0.7742681867905769|  drop|       1.0|
|2025-05-29 11:34:...|-0.9996330538299727|  drop|       1.0|
|2025-05-29 11:34:...|  43.33970821428176|  rise|       0.0|
|2025-05-29 11:34:...| -0.890697063598311|  drop|       1.0|
|2025-05-29 11:34:...| 3.9309717126060724|  rise|       0.0|
+--------------------+-------------------+------+----------+
only showing top 10 rows



In [None]:
# Next price prediction (regression)
# Create lag features: price_t, price_t-1
df_ML = df_ML.withColumn("price_t-1", lag("current_price", 1).over(window))
df_ML = df_ML.withColumn("price_t-2", lag("current_price", 2).over(window))
df_ML = df_ML.withColumn("price_t-3", lag("current_price", 3).over(window))

reg_df = df_ML.select("last_updated", "current_price", "price_t-1", "price_t-2", "price_t-3").dropna()

vec_assembler = VectorAssembler(inputCols=["price_t-1", "price_t-2", "price_t-3"], outputCol="features")
regression = LinearRegression(featuresCol="features", labelCol="current_price")
reg_pipeline = Pipeline(stages=[vec_assembler, regression])

# Train/test split and fit
train, test = reg_df.randomSplit([0.8, 0.2], seed=42)
reg_model = reg_pipeline.fit(train)

# Predict next price
reg_predictions = reg_model.transform(test)
reg_predictions.show()
# reg_predictions.select("current_price", "prediction").show(10)


Writing the following data to SQLite:
            id      name symbol current_price              last_updated
0      bitcoin   Bitcoin    btc        105251  2025-05-30T07:48:22.094Z
1     ethereum  Ethereum    eth       2627.04  2025-05-30T07:48:20.348Z
2       tether    Tether   usdt           1.0  2025-05-30T07:48:23.029Z
3       ripple       XRP    xrp          2.19  2025-05-30T07:48:20.481Z
4  binancecoin       BNB    bnb        669.85  2025-05-30T07:48:29.564Z
Data written to /content/coingecko_data.db
+--------------------+-------------+---------+---------+---------+--------------------+-----------------+
|        last_updated|current_price|price_t-1|price_t-2|price_t-3|            features|       prediction|
+--------------------+-------------+---------+---------+---------+--------------------+-----------------+
|2025-05-30T07:39:...|     0.998379|     11.3|   669.98| 0.205684|[11.3000001907348...|4871.208634332604|
|2025-05-30T07:39:...|         9.15|     4.97|    32.18|     1.

In [None]:
df_ML.show()

+--------------------+--------------------+--------+-------------+--------------------+----------+------------------+------------------+------------------+--------------------+----------+--------------------+------+---------+---------+---------+
|                  id|                name|  symbol|current_price|        last_updated|moving_avg|        price_mean|         price_std|           z_score|           timestamp|prev_price|        price_change| label|price_t-1|price_t-2|price_t-3|
+--------------------+--------------------+--------+-------------+--------------------+----------+------------------+------------------+------------------+--------------------+----------+--------------------+------+---------+---------+---------+
|blackrock-usd-ins...|BlackRock USD Ins...|   buidl|          1.0|2025-05-29T11:30:...|       1.0|               1.0|               1.0|               0.0|2025-05-29 11:30:...|       0.0|                 0.0|stable|     NULL|     NULL|     NULL|
|               