<a href="https://colab.research.google.com/github/antonum/Timescale-Workshops/blob/main/Tutorials/analyze_financial_data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Analyze financial tick data


 This notebook is an adaptation of https://docs.timescale.com/tutorials/latest/financial-tick-data/

 The financial industry is extremely data-heavy and relies on real-time and historical data for decision-making, risk assessment, fraud detection, and market analysis. Timescale simplifies management of these large volumes of data, while also providing you with meaningful analytical insights and optimizing storage costs.
  
  To analyze financial data, you can chart the open, high, low, close, and volume (OHLCV) information for a financial asset. Using this data, you can create candlestick charts that make it easier to analyze the price changes of financial assets over time. You can use candlestick charts to examine trends in stock, cryptocurrency, or NFT prices.
  
 In this tutorial, you use real raw financial data provided by Twelve Data, create an aggregated candlestick view, query the aggregated data.
 
 ## Prerequisites
 To follow the steps on this page:

 Create a target Timescale Cloud service with time-series and analytics enabled.
 https://console.cloud.timescale.com/signup
 
 You need your connection details like `"postgres://tsdbadmin:xxxxxxx.yyyyy.tsdb.cloud.timescale.com:39966/tsdb?sslmode=require"` 
 
 Alternatevely you can use self-hosted TimescaleDB. In the Google Colab environment we will install TimescaleDB and use it as a local database.

## About OHLCV data and candlestick charts
 The financial sector regularly uses candlestick charts to visualize the price change of an asset. 
 Each candlestick represents a time period, such as one minute or one hour, and shows how the asset's price changed during that time.

 Candlestick charts are generated from the open, high, low, close, and volume data for each financial asset during the time period. 
 This is often abbreviated as OHLCV:
 - Open: opening price
 - High: highest price
 - Low: lowest price
 - Close: closing price
 - Volume: volume of transactions

 ![candlestick data](https://assets.timescale.com/docs/images/tutorials/intraday-stock-analysis/timescale_cloud_candlestick.png)

In [None]:
import os
### Default connection for in-notebook Timescale ###
TS_CONNECTION="postgres://postgres:password@localhost/postgres"

### Use environment variable ###
#TS_CONNECTION = os.getenv("TS_CONNECTION", "postgres://postgres:password@localhost/postgres")

### Use your own Timescale Cloud instance ###
#TS_CONNECTION="postgres://tsdbadmin:xxxxxxx.yyyyy.tsdb.cloud.timescale.com:39966/tsdb?sslmode=require"

### Use colab secret ###
#from google.colab import userdata
#TS_CONNECTION=userdata.get('TS_CONNECTION')

### Set environment variable to be used in psql CLI ###
os.environ["TS_CONNECTION"]=TS_CONNECTION

In [None]:
#@title Install Timescale (takes 2-3 minutes)
%%bash
set -e # Exit immediately if a command exits with a non-zero status.

# --- Configuration ---
PG_VERSION="17"
PGVECTORSCALE_VERSION="0.7.0"
PG_PASSWORD="password" # Consider using a more secure password

echo "--- 1. Installing Prerequisites & Adding Repositories ---"
# Install essential packages quietly
apt-get -qq -y install gnupg postgresql-common apt-transport-https lsb-release wget > /dev/null 2>&1

# Add the official PostgreSQL repository
# The 'yes |' answers confirmation prompts automatically. Output redirected.
yes | /usr/share/postgresql-common/pgdg/apt.postgresql.org.sh > /dev/null 2>&1

# Add the TimescaleDB repository
echo "deb https://packagecloud.io/timescale/timescaledb/ubuntu/ $(lsb_release -c -s) main" | sudo tee /etc/apt/sources.list.d/timescaledb.list > /dev/null
# Add the TimescaleDB GPG key using the recommended method (avoids apt-key add)
wget --quiet -O - https://packagecloud.io/timescale/timescaledb/gpgkey | sudo gpg --dearmor -o /etc/apt/trusted.gpg.d/timescaledb.gpg

echo "--- 2. Updating Package List & Installing PostgreSQL + Extensions ---"
# Update package list quietly (should suppress apt-key warnings too)
apt-get -qq update > /dev/null 2>&1

# Install PostgreSQL, TimescaleDB, pgvector, toolkit, and client
apt-get -qq -y install \
  "timescaledb-2-postgresql-${PG_VERSION}" \
  "postgresql-client-${PG_VERSION}" \
  "postgresql-${PG_VERSION}-pgvector" \
  "timescaledb-toolkit-postgresql-${PG_VERSION}" > /dev/null 2>&1

echo "--- 3. Installing pgvectorscale ---"
# Download and install pgvectorscale
wget --quiet "https://github.com/timescale/pgvectorscale/releases/download/${PGVECTORSCALE_VERSION}/pgvectorscale-${PGVECTORSCALE_VERSION}-pg${PG_VERSION}-amd64.zip" -O pgvectorscale.zip
unzip -q pgvectorscale.zip # Use -q for quiet unzip
# Install the .deb package quietly
apt-get -qq -y install "./pgvectorscale-postgresql-${PG_VERSION}_${PGVECTORSCALE_VERSION}-Linux_amd64.deb" > /dev/null 2>&1

# Clean up downloaded files
rm pgvectorscale.zip "./pgvectorscale-postgresql-${PG_VERSION}_${PGVECTORSCALE_VERSION}-Linux_amd64.deb"

echo "--- 4. Configuring PostgreSQL & TimescaleDB ---"
# Tune PostgreSQL for TimescaleDB
timescaledb-tune --quiet --yes  > /dev/null 2>&1

# Restart PostgreSQL service to apply changes
service postgresql restart
sleep 2 # Give the service a moment to restart fully

echo "--- 5. Setting Up Database User and Extensions ---"
# Set the password for the default postgres user
sudo -u postgres psql -c "ALTER USER postgres PASSWORD '${PG_PASSWORD}'" > /dev/null

# Connect as the postgres user and create extensions quietly
psql -d "postgres://postgres:${PG_PASSWORD}@localhost/postgres" > /dev/null <<EOF
CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE;
CREATE EXTENSION IF NOT EXISTS timescaledb_toolkit CASCADE;
CREATE EXTENSION IF NOT EXISTS vector CASCADE;
CREATE EXTENSION IF NOT EXISTS vectorscale CASCADE;
EOF

echo "--- Installation and Setup Complete ---"



In [None]:
#@title Init psycopg2 connection to Timescale
import pandas as pd
import psycopg2
import time

# establish connection to Timescale
conn = psycopg2.connect(TS_CONNECTION)
cursor = conn.cursor()
conn.autocommit = True


# helper function to convert SQL Results to the dataframe
def execute_sql(query, cursor=cursor):
    try:
        time_start = time.time()
        cursor.execute(query)
        conn.commit()
        time_end = time.time()
        print(f"Query executed in {round(time_end - time_start,3)} seconds")
        # Check if query returns data (SELECT)
        if cursor.description:  # If description is not None, query returned data
            columns = [desc[0] for desc in cursor.description]
            data = cursor.fetchall()
            df = pd.DataFrame(data, columns=columns)
            return df
        else:
            # Query was likely INSERT, CREATE TABLE, UPDATE, DELETE, etc.
            return f"Rows affected: {cursor.rowcount}"  # Return the number of rows affected

    except psycopg2.Error as e:
        print(f"Error executing SQL query: {e}")
        conn.rollback()  # Rollback changes in case of error
        return None  # Or raise the exception if you prefer

## Setup
 Drop tables and associated objects

In [None]:
query = """
DROP TABLE IF EXISTS crypto_ticks CASCADE;
DROP TABLE IF EXISTS crypto_assets CASCADE;
"""
execute_sql(query)

Create a standard PostgreSQL table to store the real-time cryptocurrency data:

In [None]:
query = """
CREATE TABLE crypto_ticks (
    "time" TIMESTAMPTZ,
    symbol TEXT,
    price DOUBLE PRECISION,
    day_volume NUMERIC
);
"""
execute_sql(query)

Convert the standard table into a hypertable partitioned on the time column using the create_hypertable() function provided by Timescale. 
 You must provide the name of the table and the column in that table that holds the timestamp data to use for partitioning:

In [None]:
query = """
SELECT create_hypertable('crypto_ticks', by_range('time'));
"""
execute_sql(query)

Create a standard PostgreSQL table for relational data

In [None]:
query = """
CREATE TABLE crypto_assets (
    symbol TEXT UNIQUE,
    "name" TEXT
);
"""
execute_sql(query)

## Load financial data
 This tutorial uses real-time cryptocurrency data, also known as tick data, from Twelve Data. 
 To ingest data into the tables that you created, you need to download the dataset, 
 then upload the data to your Timescale Cloud service.

In [None]:
!psql -d $TS_CONNECTION -c '\! wget https://assets.timescale.com/docs/downloads/candlestick/crypto_sample.zip'

In [None]:
!psql -d $TS_CONNECTION -c '\! unzip -o crypto_sample.zip'

Load the CSV files into the tables (takes 30-40 seconds)
 At the psql prompt, use the COPY command to transfer data into your Timescale instance. 
 If the .csv files aren't in your current directory, specify the file paths in these commands:

In [None]:
!psql -d $TS_CONNECTION -c '\COPY crypto_ticks FROM '\''tutorial_sample_tick.csv'\'' CSV HEADER;'

In [None]:
!psql -d $TS_CONNECTION -c '\COPY crypto_assets FROM '\''tutorial_sample_assets.csv'\'' CSV HEADER;'

## Preview data

In [None]:
query = """
SELECT * FROM crypto_ticks LIMIT 10;
"""
execute_sql(query)

Preview the reference data

In [None]:
query = """
SELECT * FROM crypto_assets LIMIT 10;
"""
execute_sql(query)

## Create indexes
 Indexes are used to speed up the retrieval of data from a database table.
 In this case, you create an index on the symbol column of the crypto_assets and crypto_ticks tables.
 Hypertables automatically create indexes on the time column, so you don't need to create an index on that column.

In [None]:
query = """
CREATE INDEX ON crypto_assets (symbol);
"""
execute_sql(query)

In [None]:
query = """
CREATE INDEX ON crypto_ticks (symbol);
"""
execute_sql(query)

## Examine hypertable details (psql command)
 Note that hypertable includes multiple partitions (chunks) of data.

 You can also see two indices created on the time column and the symbol column.

In [None]:
!psql -d $TS_CONNECTION -c '\d+ crypto_ticks'

## Examine Hypertable partitions
 Timescale provides SQL API (functions, views, procedures) to manage hypertables and chunks.
 The timescaledb_information.chunks view provides information about the chunks of a hypertable.

In [None]:
query = """
SELECT 
    chunk_name, 
    range_start, 
    range_end, 
    is_compressed 
FROM 
    timescaledb_information.chunks
WHERE 
    hypertable_name = 'crypto_ticks';
"""
execute_sql(query)

## JOIN Hypertable and regular table
 While organized differently internally, hypertables are fully-featured PostgreSQL tables.
 You can use standard SQL to query the data in a hypertable, including joining it with other tables.
 In this example, you join the crypto_ticks hypertable with the crypto_assets table to get the name of the asset.

 Optionally add EXPLAIN ANALYZE to see the query plan.
 You would see that the query goes through internal chunks of the hypertable like `_hyper_60_285_chunk`

In [None]:
query = """
-- EXPLAIN ANALYZE 
SELECT t.time, t.symbol, t.price, t.day_volume, a.name
FROM crypto_ticks t
JOIN crypto_assets a
ON t.symbol = a.symbol
ORDER BY t.time DESC
LIMIT 10;
"""
execute_sql(query)

Enable timing tracking for the PSQL session

In [None]:
!psql -d $TS_CONNECTION -c '\timing on'

## Calculate one-day candlestick data on non-compresssed Hypertable

In [None]:
query = """
SELECT
    time_bucket('1 day', time) AS bucket,
    symbol,
    FIRST(price, time) AS "open",
    MAX(price) AS high,
    MIN(price) AS low,
    LAST(price, time) AS "close",
    LAST(day_volume, time) AS day_volume
FROM crypto_ticks
WHERE symbol = 'BTC/USD' AND time >= NOW() - INTERVAL '14 days'
GROUP BY bucket, symbol
ORDER BY bucket;
"""
execute_sql(query)

Remember the time it took to run the query. Later we will compare the performance of the same query 
 on compressed data and preaggregated data in Continuos aggregate

In [None]:
!pip -q install mplfinance
import mplfinance as mpf

query = """
SELECT
    time_bucket('1 day', time) AS bucket,
    symbol,
    FIRST(price, time) AS "open",
    MAX(price) AS high,
    MIN(price) AS low,
    LAST(price, time) AS "close",
    LAST(day_volume, time) AS day_volume
FROM crypto_ticks
WHERE symbol = 'BTC/USD' AND time >= NOW() - INTERVAL '14 days'
GROUP BY bucket, symbol
ORDER BY bucket;
"""
df=execute_sql(query)
df_plot = df.sort_values('bucket')
df_plot = df_plot.set_index('bucket')
df_plot = df_plot.rename(columns={
    'open': 'Open',
    'high': 'High',
    'low': 'Low',
    'close': 'Close',
    'day_volume': 'Volume'
})
df_plot["Volume"] = pd.to_numeric(df_plot["Volume"], errors='coerce')

# Create the candlestick chart
mpf.plot(df_plot, type='candle', style='yahoo', volume=False, title='BTC/USD 1-Day Candlestick Chart (Last 14 Days)')
     

## Enable Columnarstore (Compression)
 To enable columnarstore, you need to set the timescaledb.enable_columnstore parameter to true.
 This parameter is set at the table level, so you need to run the ALTER TABLE command on the crypto_ticks hypertable.
 The timescaledb.compress_orderby parameter specifies the order in which the data is compressed.
 The timescaledb.segmentby parameter specifies the column by which the data is segmented.
 The segmentby column is used to group the data into segments, which are then compressed separately.

In [None]:
query = """
ALTER TABLE crypto_ticks 
SET (
    timescaledb.enable_columnstore = true, 
    timescaledb.segmentby = 'symbol',
    timescaledb.compress_orderby='time DESC'
);
"""
execute_sql(query)

Enambling a columnarstore for the table by itself does not compress the data.
 You can either manually compress hypertable chunks or create a policy to automatically compress chunks.
 The compress_chunk() function compresses the chunk of data in the hypertable.
 
 ### Manually compress all the chunks of the hypertable
 TODO: switch to convert_to_columnarstore()?

In [None]:
query = """
SELECT compress_chunk(c, true) FROM show_chunks('crypto_ticks') c;
 -- SELECT decompress_chunk(c, true) FROM show_chunks('crypto_ticks') c;
"""
execute_sql(query)

### Automatically compress Hypertable with a policy
 Create a job that automatically converts chunks in a hypertable to the columnstore older than 1 day
 This is a preferred way to compress data in production

In [None]:
query = """
CALL add_columnstore_policy('crypto_ticks', after => INTERVAL '1d');
"""
execute_sql(query)

## Storage saved by the compression
 The hypertable_compression_stats() function returns the size of the compressed and uncompressed data in the hypertable.

In [None]:
query = """
SELECT 
  pg_size_pretty(before_compression_total_bytes) as before,
  pg_size_pretty(after_compression_total_bytes) as after
FROM hypertable_compression_stats('crypto_ticks');
"""
execute_sql(query)

In our case the compression ratio is ~10x
 
 In practice that means that to store 1TB of data in a hypertable, you need just 100GB of storage.

## Calculate one-day candlestick data on compresssed Hypertable
 This is the same query as above, but now it runs on compressed data.

In [None]:
query = """
SELECT
    time_bucket('1 day', time) AS bucket,
    symbol,
    FIRST(price, time) AS "open",
    MAX(price) AS high,
    MIN(price) AS low,
    LAST(price, time) AS "close",
    LAST(day_volume, time) AS day_volume
FROM crypto_ticks
WHERE symbol = 'BTC/USD' AND time >= NOW() - INTERVAL '14 days'
GROUP BY bucket, symbol
ORDER BY bucket;
"""
execute_sql(query)

The query runs on columnar/compressed data and it is faster then the same query on uncompressed data

## Create a continuous aggregate 
 Continuous aggregates are a TimescaleDB feature that allows you to pre-aggregate data in a hypertable and store the results in a materialized view.
 This allows you to query the pre-aggregated data instead of the raw data, which can significantly improve query performance. 
 Continuous aggregates are automatically updated as new data is ingested into the hypertable.

In [None]:
query = """
CREATE MATERIALIZED VIEW one_day_candle
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
    SELECT
        time_bucket('1 day', time) AS bucket,
        symbol,
        FIRST(price, time) AS "open",
        MAX(price) AS high,
        MIN(price) AS low,
        LAST(price, time) AS "close",
        LAST(day_volume, time) AS day_volume
    FROM crypto_ticks
    GROUP BY bucket, symbol;
"""
execute_sql(query)

### Create continuous aggregate policy
 The add_continuous_aggregate_policy() function creates a policy that automatically refreshes the continuous aggregate view.

 The start_offset and end_offset parameters specify the time range for the job, updating the aggregate view.

 The schedule_interval parameter specifies how often the continuous aggregate view is refreshed.

In [None]:
query = """
SELECT add_continuous_aggregate_policy('one_day_candle',
    start_offset => INTERVAL '3 days',
    end_offset => INTERVAL '1 day',
    schedule_interval => INTERVAL '1 day');
"""
execute_sql(query)

## Query continuous aggregate
 This query delivers the same results as the previous query, 
 but it runs on the continuous aggregate view instead of the raw data.
 it is significantly faster than the same query on the raw data.

In [None]:
query = """
SELECT * FROM one_day_candle
WHERE symbol = 'BTC/USD' AND bucket >= NOW() - INTERVAL '14 days'
ORDER BY bucket;
"""
execute_sql(query)

## Real Time Continuous Aggregates
 The continuous aggregate view is automatically updated as new data is ingested into the hypertable.
 Let's insert a new row into the crypto_ticks table and see how the continuous aggregate view is updated.

In [None]:
query = """
INSERT INTO crypto_ticks (time, symbol, price, day_volume)
VALUES (NOW()+INTERVAL '1day', 'BTC/USD', 110000, 30750246);
"""
execute_sql(query)

In [None]:
query = """
SELECT * FROM one_day_candle
WHERE symbol = 'BTC/USD' AND bucket >= NOW() - INTERVAL '14 days'
ORDER BY bucket;
"""
execute_sql(query)

As you can see, the continuous aggregate view is automatically updated with the new data.
 This is the stark contrast to standard Postgres Materialized view that needs to be refreshed manually
 and does not support real-time updates.