Step 1: import Bluesky Firehose Summer 2025 post data from Snowflake via a cursor

In [1]:
from datasets import Dataset
from datetime import datetime, timedelta
import os
import re
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from time import sleep
import torch
from transformers import pipeline

### GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS 
### GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS 
### GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS | GLOBAL VARS 

# this basically means "smoke em if you got em" where the "em" is NVIDIA GPU
DEVICE = 0 if torch.cuda.is_available() else -1

SF_USR = os.getenv('SF_USR')
SF_ID  = os.getenv('SF_ID')
SF_WH  = os.getenv('SF_WH')
SF_DB  = os.getenv('SF_DB')
SF_SC  = os.getenv('SF_SC')
SF_RL  = os.getenv('SF_RL')

# connect to database and init a cursor for querying
xct_params = {
    "user":                 SF_USR
   ,"account":              SF_ID
   ,"warehouse":            SF_WH
   ,"database":             SF_DB
   ,"schema":               SF_SC
   ,"role":                 SF_RL
   ,"private_key_file":     os.getenv('PRIVATE_KEY_PATH')
   ,"private_key_file_pwd": os.getenv('PRIVATE_KEY_PASSPHRASE')
   ,"authenticator":        os.getenv('SF_AUTH')
}

SF_XCT = snowflake.connector.connect(**xct_params) #connection object
CSR = SF_XCT.cursor()

In [2]:
#import needed libraries
import pandas as pd
import numpy as np
from statsmodels.tsa.stattools import adfuller
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import ccf

Step 2: Query the database. 

Our sampling methodology is an interrupted census (every post for a 5 minute interval, every 3 hours throughout the day), not a random sample, and not a continuous sample.

Therefore for timeseries analysis, we GROUP BY the day field to alow for a continuous timeseries.

In [3]:
query = f"""select
  keyword,
  topic,
  category,
  DAY_POST_CREATED_AT,
  max(MONTH_POST_CREATED_AT) as month_post_created_at,
  max(YEAR_POST_CREATED_AT) as year_post_created_at,
  COUNT(author_thread_mentions) daily_author_thread_mentions,
  SUM(total_posts_with_keyword) as daily_total_posts_keyword,
  SUM(positive_mentions) AS daily_positive_mentions,
  SUM(negative_mentions) AS daily_negative_mentions,
  SUM(neutral_mentions) AS daily_neutral_mentions,
  AVG(pct_positive) as daily_pct_positive,
 AVG(pct_negative) as daily_pct_negative,
 AVG(pct_neutral) as daily_pct_neutral,
from {SF_DB}.{SF_SC}.timeseries_nlp
group by 
keyword
,topic
,category
,day_post_created_at
order by month_post_created_at,day_post_created_at asc;""" 
CSR.execute(query)
df = CSR.fetch_pandas_all()

In [4]:
df.count()

KEYWORD                         124
TOPIC                           124
CATEGORY                        124
DAY_POST_CREATED_AT             124
MONTH_POST_CREATED_AT           124
YEAR_POST_CREATED_AT            124
DAILY_AUTHOR_THREAD_MENTIONS    124
DAILY_TOTAL_POSTS_KEYWORD       124
DAILY_POSITIVE_MENTIONS         124
DAILY_NEGATIVE_MENTIONS         124
DAILY_NEUTRAL_MENTIONS          124
DAILY_PCT_POSITIVE              124
DAILY_PCT_NEGATIVE              124
DAILY_PCT_NEUTRAL               124
dtype: int64

Step 3: Format Timestamp Column

This step creates a pandas-formatted timestamp that we can use for all the remaining tests in this notebook.

Timeseries data columns (hour, day, month, year) are stored as separate numeric columns in the dataset to allow for other groupins and analyses.

In [5]:
df['TIMESTAMP'] = pd.to_datetime(dict(year=df['YEAR_POST_CREATED_AT'], month=df['MONTH_POST_CREATED_AT'], day=df['DAY_POST_CREATED_AT']))
print(df.head())

  KEYWORD             TOPIC        CATEGORY  DAY_POST_CREATED_AT  \
0   china  global conflicts  issue specific                   17   
1   trump       US domestic  issue specific                   17   
2   biden       US domestic  issue specific                   17   
3   putin  global conflicts  issue specific                   17   
4   putin  global conflicts  issue specific                   18   

   MONTH_POST_CREATED_AT  YEAR_POST_CREATED_AT  DAILY_AUTHOR_THREAD_MENTIONS  \
0                      7                  2025                            17   
1                      7                  2025                            22   
2                      7                  2025                            16   
3                      7                  2025                            17   
4                      7                  2025                            17   

  DAILY_TOTAL_POSTS_KEYWORD  DAILY_POSITIVE_MENTIONS  DAILY_NEGATIVE_MENTIONS  \
0                       167  

Step 4: Determine which keyword sentiments show a trend vs a stationary. 

The Augmented Dickey-Fuller Test identifies whether a quantity has a trend within a timeseries.
This test cannot determine the direction of the trend on its own. Those keywords showing a trend will be analyzed with decomposition analysis and autocorrelation.

In [6]:
def get_adf_stats(series: pd.Series):
    """
    Performs the Augmented Dickey-Fuller test and returns statistics
    as dictionary with p-value and stationarity status.
    """
    try:
        result = adfuller(series, autolag='AIC')
        p_value = result[1]
        is_stationary = p_value <= 0.05
        return {'adf_p_value': p_value, 'is_stationary': is_stationary}
    except Exception as e:
        print(f"Error during ADF test: {e}")
        return {'adf_p_value': np.nan, 'is_stationary': np.nan}

Step 5: Perform timeseries tests: decomposition runs conditionally on keywords with an identified trend. 

If there is a trend, decomposition  separates out seasonality and residual. This is only performed on keywords where a trend is identified.

We compare the residuals from both additive and multiplicative models. The one with a more random distribution of residuals is the better match to the data.

In [7]:
def get_decomposition_stats(series: pd.Series, model: str, series_name: str):
    """
    Performs seasonal decomposition on a time series using a specified model
    and returns statistics as a dictionary.
    """
    try:
        # Default is 7 day period for weekly seasonality component.
        decomposition = seasonal_decompose(series, model=model, period=7)

        # Populate dictionary output with decomp statistics: trend, seasonal, residual
        trend_stats = {
            'trend_mean': decomposition.trend.mean(),
            'trend_std': decomposition.trend.std()
        }
        
        seasonal_stats = {
            'seasonal_range': decomposition.seasonal.max() - decomposition.seasonal.min()
        }

        residual_stats = {
            'residual_std': decomposition.resid.std()
        }

        # Combine all stats into a single dictionary
        all_decomp_stats = {**trend_stats, **seasonal_stats, **residual_stats}
        
        # Add a prefix to each key to indicate the decomposition model used
        prefixed_decomp_stats = {f"{model}_{series_name}_{key}": value for key, value in all_decomp_stats.items()}
        return prefixed_decomp_stats

    except Exception as e:
        print(f"Error during {model} decomposition for {series_name} series: {e}")
        return {}

Pearson correlation test for one trend vs another, assuming no trend in the data (i.e. from the result of ADF test above)

In [8]:
def get_pearson_correlation_stats(series_a: pd.Series, series_b: pd.Series):
    """
    Calculates the Pearson Correlation Coefficient (zero-lag correlation) between two series.
    Assumes that the input series are already made stationary (e.g., by differencing).
    """
    try:
        # Align series by index and drop any NaN values
        combined = pd.concat([series_a.rename('a'), series_b.rename('b')], axis=1).dropna()
        
        if len(combined) < 2: 
            return {'pearson_r': np.nan, 'abs_pearson_r': np.nan}

        # Calculate the zero-lag Pearson correlation coefficient
        pearson_r = combined['a'].corr(combined['b'], method='pearson')

        return {
            'pearson_r': pearson_r,
            'abs_pearson_r': abs(pearson_r)
        }

    except Exception as e:
        print(f"Error during Pearson Correlation test: {e}")
        return {'pearson_r': np.nan, 'abs_pearson_r': np.nan}

Step 6: Run the pipeline script that stitches the tests together.

In [9]:
#Runs the full timeseries analysis pipeline
def timeseries_analysis_pipeline(df: pd.DataFrame, keyword_column: str = 'KEYWORD', timestamp_column: str = 'TIMESTAMP'):
    """
    Runs a statistical analysis pipeline on the timeseries DataFrame ('df' by default here).
    It expects a DataFrame with a 'TIMESTAMP' column for daily time series index
    and the 'KEYWORD' column for grouping the data. Keywords are determined by table object in Snowflake (steets schema) prior to analysis.
    It will perform ADF tests
    and conditional, seasonal decomposition (if trend identified) and pearson's correlation test on the specified data columns.
    Pearson's runs on 
    """
    # Define the metric columns for analysis-- both percentages and thread counts.
    time_series_columns = [
        'DAILY_PCT_POSITIVE',
        'DAILY_PCT_NEGATIVE',
        'DAILY_POSITIVE_MENTIONS',
        'DAILY_NEGATIVE_MENTIONS'
    ]
#########set up variables and keyword mentions filter    
    # Define the two model types for decomp
    decomposition_models = ['additive', 'multiplicative']
    #Store all the results
    pipeline_results = []   
    
    #Specifying minimum number of mentions to run the correlation analysis (avoids spurrious correlations with low data). 
    #This number is just arbitrary.
    minimum_mentions = 100 
    corr_series_a = 'DAILY_POSITIVE_MENTIONS' 
    corr_series_b = 'DAILY_NEGATIVE_MENTIONS'
    
    # Group the DataFrame by keyword for a row-by-row analysis
    grouped = df.groupby('KEYWORD')
    
    for KEYWORD, group_df in grouped:
        # Sort by TIMESTAMP to ensure the time series is in order
        group_df = group_df.set_index('TIMESTAMP').sort_index()

        # Create a base dictionary for this keyword's results
        row_stats = {'KEYWORD': KEYWORD}

        # Dictionaries to store series and stationarity status for correlation
        series_for_corr = {}
        stationary_status = {}

        #filter to minimum keyword mentions
        run_correlation = group_df[corr_series_a].sum() >= minimum_mentions
#########  APPLY STATISTICAL TESTS
        # Loop through each specified time series column
        for col in time_series_columns:
            series = group_df[col]
            
            #Run the ADF test
            adf_stats = get_adf_stats(series)
            stationary_status[col] = adf_stats['is_stationary']

            #if the series is stationary we use the data as-is. if not, we run series.diff() to difference it
            if not adf_stats ['is_stationary']:
                series_for_corr[col] = series.diff().dropna()
            else:
                series_for_corr[col] = series.dropna()


            # Add ADF results to the dictionary with a descriptive column header
            for key, value in adf_stats.items():
                row_stats[f"{col}_{key}"] = value

############# Conditionally run the decomposition tests
            if not adf_stats['is_stationary']:
                print(f"Running decomposition for '{KEYWORD}' on column '{col}'...")
                for model in decomposition_models:
                    stats = get_decomposition_stats(series, model, col)
                    row_stats.update(stats)
            else:
                print(f"Skipping decomposition for '{KEYWORD}' on column '{col}' (stationary).")
                # Create placeholder columns with NA values
                for model in decomposition_models:
                    for stat_key in ['trend_mean', 'trend_std', 'seasonal_range', 'residual_std']:
                        row_stats[f"{model}_{col}_{stat_key}"] = np.nan
            

############# Next: run Pearson test for correlations on non-trending keywords
        if run_correlation and corr_series_a in series_for_corr and corr_series_b in series_for_corr:
            print(f"Running Pearson correlation for '{KEYWORD}' on {'differenced' if not stationary_status[corr_series_a] else 'raw'} data...")
            
            pos_series_adj = series_for_corr[corr_series_a]
            neg_series_adj = series_for_corr[corr_series_b]

            correlation_stats = get_pearson_correlation_stats(pos_series_adj, neg_series_adj)
            
            # Update dictionary keys
            for key, value in correlation_stats.items():
                row_stats[f"Pearson_{corr_series_a}_vs_{corr_series_b}_{key}"] = value
        else:
            print(f"Skipping Pearson correlation for '{KEYWORD}' (mentions < {minimum_mentions}).")
            # Create placeholder columns with NA values
            row_stats[f"Pearson_{corr_series_a}_vs_{corr_series_b}_pearson_r"] = np.nan
            row_stats[f"Pearson_{corr_series_a}_vs_{corr_series_b}_abs_pearson_r"] = np.nan

        pipeline_results.append(row_stats)
    
    # Convert the list of dictionaries to the final summary DataFrame
    final_df = pd.DataFrame(pipeline_results)
    
    print("--- Final Summary Table Results ---")
    return final_df

In [10]:
final_results_df = timeseries_analysis_pipeline(df, keyword_column='KEYWORD', timestamp_column='TIMESTAMP')

#testing results
final_results_df.to_csv('pipeline_results.csv')

Skipping decomposition for 'biden' on column 'DAILY_PCT_POSITIVE' (stationary).
Skipping decomposition for 'biden' on column 'DAILY_PCT_NEGATIVE' (stationary).
Skipping decomposition for 'biden' on column 'DAILY_POSITIVE_MENTIONS' (stationary).
Skipping decomposition for 'biden' on column 'DAILY_NEGATIVE_MENTIONS' (stationary).
Running Pearson correlation for 'biden' on raw data...
Running decomposition for 'china' on column 'DAILY_PCT_POSITIVE'...
Skipping decomposition for 'china' on column 'DAILY_PCT_NEGATIVE' (stationary).
Skipping decomposition for 'china' on column 'DAILY_POSITIVE_MENTIONS' (stationary).
Skipping decomposition for 'china' on column 'DAILY_NEGATIVE_MENTIONS' (stationary).
Running Pearson correlation for 'china' on raw data...
Skipping decomposition for 'putin' on column 'DAILY_PCT_POSITIVE' (stationary).
Skipping decomposition for 'putin' on column 'DAILY_PCT_NEGATIVE' (stationary).
Running decomposition for 'putin' on column 'DAILY_POSITIVE_MENTIONS'...
Running d