In [1]:
import sys
import hopsworks
import os
sys.path.append("../..")

import utils.reddit_scraper as utils
import pandas as pd
from datetime import datetime

Loading CryptoBERT...


# Configuration

In [None]:
from datetime import datetime, timedelta
import time

SUBREDDITS = {
    'solana': 'SOL',
}

START_DATE = '2020-01-01'
END_DATE = '2025-12-31'

def get_weekly_ranges(start, end):
    """Generate weekly date ranges"""
    ranges = []
    current = datetime.strptime(start, '%Y-%m-%d')
    end_dt = datetime.strptime(end, '%Y-%m-%d')
    
    while current < end_dt:
        next_week = current + timedelta(days=1)

        if next_week > end_dt:
            next_week = end_dt
        
        ranges.append((
            current.strftime('%Y-%m-%d'),
            next_week.strftime('%Y-%m-%d')
        ))
        
        current = next_week
    
    return ranges

# Generate date ranges
date_ranges = get_weekly_ranges(START_DATE, END_DATE)
print(f"üìÖ Will fetch {len(date_ranges)} weeks of data")
print(f"   From {date_ranges[0][0]} to {date_ranges[-1][1]}")

üìÖ Will fetch 2191 weeks of data
   From 2020-01-01 to 2025-12-31


In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
import time

print_lock = Lock()
all_posts = []
posts_lock = Lock()

def fetch_period(args):
    """Fetch posts for a specific period"""
    subreddit, start, end, period_idx, total_periods = args
    
    try:
        posts = utils.fetch_pushshift_posts(
            subreddit=subreddit,
            start_date=start,
            end_date=end,
            limit=2000
        )
        
        with print_lock:
            if posts:
                print(f"   ‚úÖ [{period_idx}/{total_periods}] {start} ‚Üí {end}: {len(posts)} posts")
            else:
                print(f"   ‚ö†Ô∏è [{period_idx}/{total_periods}] {start} ‚Üí {end}: No posts")
        
        return posts if posts else []
    
    except Exception as e:
        with print_lock:
            print(f"   ‚ùå [{period_idx}/{total_periods}] {start} ‚Üí {end}: Error - {e}")
        return []

tasks = []
for subreddit in SUBREDDITS.keys():
    for i, (start, end) in enumerate(date_ranges, 1):
        tasks.append((subreddit, start, end, i, len(date_ranges)))

print(f"\nüöÄ Starting parallel scraping with {min(10, len(tasks))} workers...")


all_posts = []
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(fetch_period, task) for task in tasks]
    
    for future in as_completed(futures):
        posts = future.result()
        if posts:
            with posts_lock:
                all_posts.extend(posts)

print(f"\nüéâ Backfill complete!")
print(f"üìä Total posts fetched: {len(all_posts)}")


üöÄ Starting parallel scraping with 10 workers...
   Fetched 17 posts (total: 17)
   Fetched 33 posts (total: 33)
   Fetched 28 posts (total: 28)
   Fetched 27 posts (total: 27)
   Fetched 39 posts (total: 39)
   Fetched 32 posts (total: 65)
   Fetched 16 posts (total: 33)
   Fetched 26 posts (total: 53)
   Fetched 27 posts (total: 55)
   Fetched 38 posts (total: 77)
   Fetched 15 posts (total: 48)
   Fetched 26 posts (total: 81)
   Fetched 31 posts (total: 96)
   Fetched 25 posts (total: 78)
   Fetched 37 posts (total: 114)
   Fetched 25 posts (total: 106)
   Fetched 14 posts (total: 62)
   Fetched 24 posts (total: 102)
   Fetched 36 posts (total: 150)
   Fetched 30 posts (total: 126)
   Fetched 24 posts (total: 130)
   Fetched 13 posts (total: 75)
   Fetched 23 posts (total: 125)
   Fetched 29 posts (total: 155)
   Fetched 35 posts (total: 185)
   Fetched 23 posts (total: 153)
   Fetched 12 posts (total: 87)
   Fetched 22 posts (total: 147)
   Fetched 28 posts (total: 183)
   Fetch

In [None]:
df = pd.DataFrame(all_posts)

df = df[['subreddit', 'title', 'selftext', 'score', 'num_comments', 'created_utc']]
df['crypto'] = df['subreddit'].map(SUBREDDITS)
df = df[['crypto', 'title', 'selftext', 'score', 'num_comments', 'created_utc']]
df['created_utc'] = pd.to_datetime(df['created_utc'], unit='s')

# Sauvegarder
output_file = 'solana_posts_backfill.csv'
df.to_csv(output_file, index=False)



‚úÖ SUCCESS!
üìä Total posts: 33857
üíæ Saved to: cardano_posts_backfill.csv

Posts par crypto:
crypto
ADA    33857
Name: count, dtype: int64


In [None]:
import pandas as pd

df = pd.read_csv('cardano_posts_backfill.csv', parse_dates=['created_utc'])
df['created_utc'] = pd.to_datetime(df['created_utc'], unit='s')
df["selftext"] = df["selftext"].replace(['[deleted]', '[removed]'], '')
df["selftext"] = df["selftext"].fillna('')
df['crypto'] = df['crypto'].astype(str)
df['title'] = df['title'].astype(str)
df['selftext'] = df['selftext'].astype(str)

df.dtypes

crypto                  object
title                   object
selftext                object
score                    int64
num_comments             int64
created_utc     datetime64[ns]
dtype: object

# Create the feature of aggregated sentiments

In [None]:
df_sentiment = utils.create_sentiment_table(df)

df_sentiment.head()
df_sentiment.to_csv('solana_sentiment_backfill.csv', index=False)

Using device: cuda


ValueError: All arrays must be of the same length

In [None]:
df_sentiment.head()

Unnamed: 0,timestamp,sentiment
0,2022-02-28 17:44:51,0
1,2022-08-08 14:48:47,0
2,2023-05-12 12:08:07,0
3,2023-11-05 07:00:00,0
4,2022-08-23 18:11:23,0


In [None]:
df_sentiment = pd.read_csv('solana_sentiment_backfill.csv', parse_dates=['timestamp'])
df_agg_sentiment = utils.agregate_sentiment_table(df_sentiment)

df_agg_sentiment.head()

Unnamed: 0,date,mean_sentiment,count
0,2020-01-03,0.0,1
1,2020-01-11,0.0,1
2,2020-01-19,0.0,1
3,2020-02-05,0.333333,3
4,2020-02-06,0.0,1


In [None]:
df_agg_sentiment.to_csv('solana_aggregated_sentiment_backfill.csv', index=False)

In [None]:
import pandas as pd
import hopsworks
df = pd.read_csv('solana_aggregated_sentiment_backfill.csv')
df['timestamp'] = (pd.to_datetime(df['date']) - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s')
df.drop(columns=['date'], inplace=True)

project = hopsworks.login()
fs = project.get_feature_store()

fg = fs.create_feature_group(
    "solana_aggregated_sentiment_backfill",
    version=1,
    primary_key=["timestamp"],
    event_time="timestamp",
)

fg.save(df)

2026-01-09 18:00:03,600 INFO: Closing external client and cleaning up certificates.
Connection closed.
2026-01-09 18:00:03,615 INFO: Initializing external client
2026-01-09 18:00:03,618 INFO: Base URL: https://c.app.hopsworks.ai:443
To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'







2026-01-09 18:00:05,251 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1279131
Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1279131/fs/1265740/fg/1893896


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 1818/1818 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: reddit_aggregated_sentiment_backfill_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279131/jobs/named/reddit_aggregated_sentiment_backfill_1_offline_fg_materialization/executions


(Job('reddit_aggregated_sentiment_backfill_1_offline_fg_materialization', 'SPARK'),
 None)