In [None]:
import redis

# Smoke test for a local Redis server: connect with replies decoded to str,
# write one key, then read it back.
r = redis.Redis(decode_responses=True, host='localhost', port=6379)
r.set('test', 'hello world')
print(r.get('test'))  # expected output: hello world

In [2]:
# import redis

# # Create Redis connection
# r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# # Store a test value
# r.set('welcome', 'Redis is working!')

In [None]:
import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Print every key with its value. GET only works on string keys; other types
# (e.g. the 'benzinga:news:timeline' sorted set written by the fetcher) would
# raise WRONGTYPE and abort the loop, so for those only the type is shown.
print("All keys in Redis:")
for key in r.scan_iter():  # SCAN walks the keyspace incrementally instead of blocking like KEYS
    key_type = r.type(key)
    if key_type == 'string':
        print(f"{key}: {r.get(key)}")
    else:
        print(f"{key}: <{key_type}>")

In [None]:
from eventtrader.keys import BENZINGANEWS_API_KEY

from utils.redisClasses import EventTraderRedis
# NOTE(review): this rebinds the name `redis` to an EventTraderRedis instance,
# shadowing the `redis` module imported in earlier cells. Any later
# `redis.Redis(...)` fails until `import redis` runs again. Left unchanged
# because code may refer to this instance as `redis` (see the commented-out
# `redis.bz_livenews.clear()`); consider a distinct name such as `et_redis`.
redis = EventTraderRedis()

In [3]:
# redis.bz_livenews.clear()


In [None]:
# Redis specific imports
import redis
import json

# Raw Redis client used by the fetcher below; replies are decoded to str.
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Set to False to fetch and print news without persisting it to Redis.
REDIS_INSERTION = True

# Fail-soft connectivity check: report the outcome either way and keep going.
try:
    redis_client.ping()
except Exception as conn_err:
    print(f"Failed to connect to Redis: {conn_err}")
else:
    print("Successfully connected to Redis")


import requests
import schedule
import time
from datetime import datetime, timedelta, timezone as dt_timezone

from email.utils import parsedate_to_datetime
from typing import Dict, List, Any
from pytz import timezone as pytz_timezone



# --- Benzinga REST API configuration ---
bz_api_key = BENZINGANEWS_API_KEY
API_URL = "https://api.benzinga.com/api/v2/news"
API_HEADERS = dict(accept="application/json")
API_PARAMS = dict(
    token=bz_api_key,
    page=0,
    pageSize=10,
    displayOutput="full",
)

# Polling watermark (Unix seconds, UTC) of the newest 'updated' time seen so
# far. It starts one hour in the past so the first poll asks for news updated
# in roughly the last hour.
latest_updated = int((datetime.now(dt_timezone.utc) - timedelta(hours=1)).timestamp())

# Project Redis wrapper.
# NOTE(review): `redis_instance` is not referenced in this cell (writes go
# through `redis_client`); confirm whether it is still needed.
from utils.redisClasses import EventTraderRedis
redis_instance = EventTraderRedis()


# Alternative approach (not used): parse an ISO-8601 UTC timestamp with pytz and convert it to New York time:
# from datetime import datetime
# import pytz

# utc_time = datetime.strptime("2025-01-23T13:31:16.268Z", "%Y-%m-%dT%H:%M:%S.%fZ")
# utc_time = utc_time.replace(tzinfo=pytz.UTC)
# ny_time = utc_time.astimezone(pytz.timezone('America/New_York'))


def to_local_time(utc_time, tz_name="US/Eastern"):
    """Express ``utc_time`` in the ``tz_name`` zone (US/Eastern by default).

    ``utc_time`` should be timezone-aware; ``astimezone`` interprets a naive
    datetime as the host's local time.
    """
    zone = pytz_timezone(tz_name)
    converted = utc_time.astimezone(zone)
    return converted



def rfc2822_to_unix(date_str: str) -> int:
    """Convert an RFC 2822 date string to a Unix timestamp (seconds, UTC).

    The UTC offset embedded in ``date_str`` (e.g. ``-0500``) is honoured. A
    ``-0000`` offset ("no zone information") parses to a naive datetime; it is
    treated as UTC rather than as the host machine's local time.

    Returns:
        The integer timestamp, or 0 if ``date_str`` is empty/None or cannot be
        parsed (the parse error is printed).
    """
    try:
        dt = parsedate_to_datetime(date_str)
    except Exception as e:
        print(f"Error parsing date: {e}")
        return 0

    if dt.tzinfo is None:
        # astimezone()/timestamp() on a naive datetime would assume the local
        # zone, making the result depend on where the notebook runs.
        dt = dt.replace(tzinfo=dt_timezone.utc)
    # timestamp() of an aware datetime is already offset-correct.
    return int(dt.timestamp())


def fetch_news_with_delta(latest_updated: int, timeout: float = 10.0) -> List[Dict[str, Any]]:
    """Fetch news items changed since ``latest_updated`` from the Benzinga API.

    Args:
        latest_updated: Unix timestamp in seconds. When truthy it is sent as
            ``updatedSince`` (minus a 5-second overlap) to limit the response;
            when 0/None no filter is sent.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The decoded list of news items. Returns an empty list on a request
        error, an undecodable body, or a payload that is not a JSON array; the
        problem is printed.
    """
    params = API_PARAMS.copy()
    if latest_updated:
        params["updatedSince"] = latest_updated - 5  # Subtract 5 seconds as per API guidelines

    try:
        # Without a timeout a stalled connection would block the polling loop forever.
        response = requests.get(API_URL, headers=API_HEADERS, params=params, timeout=timeout)
        response.raise_for_status()
        payload = response.json()
    except requests.exceptions.RequestException as e:
        print(f"API Request Error: {e}")
        return []
    except ValueError as e:
        # Older requests versions raise a plain ValueError for non-JSON bodies.
        print(f"API Response Decode Error: {e}")
        return []

    # Callers iterate the result as a list of news dicts. Anything else (e.g. an
    # error object) would be iterated key-by-key and fail downstream.
    if not isinstance(payload, list):
        print(f"Unexpected API payload ({type(payload).__name__}): {str(payload)[:200]}")
        return []

    return payload

def _join_names(entries: Any) -> str:
    """Join the ``name`` of each dict in ``entries`` with ", "; tolerate None and entries without a name."""
    if not entries:
        return ''
    return ', '.join(
        str(entry['name'])
        for entry in entries
        if isinstance(entry, dict) and entry.get('name') is not None
    )


def process_news_item(item: Dict[str, Any]) -> Dict[str, str]:
    """Flatten one raw Benzinga news item into a dict of display strings.

    Missing scalar fields get placeholder text ('No ID', 'No Title', ...).
    ``stocks``/``channels``/``tags`` are reduced to comma-separated names. An
    absent or explicit ``null`` list gives '', and entries lacking ``name`` are
    skipped.
    """
    return {
        'id': str(item.get('id', 'No ID')),
        'title': item.get('title', 'No Title'),
        'created': item.get('created', 'Unknown Date'),
        'updated': item.get('updated', 'Unknown Date'),
        'url': item.get('url', 'No URL'),
        'author': item.get('author', 'Unknown'),
        # .get(key) rather than .get(key, []): a JSON null would otherwise be iterated.
        'stocks': _join_names(item.get('stocks')),
        'channels': _join_names(item.get('channels')),
        'tags': _join_names(item.get('tags')),
    }

def print_news_item(news_item: Dict[str, str]) -> None:
    """Print one processed news item as "<Label>: <value>" lines plus a separator.

    The ``updated`` field is parsed as an RFC 2822 date and shown in UTC and,
    via ``to_local_time``, in local time. If parsing or conversion fails, both
    are shown as "Invalid" and the error is printed first.
    """
    try:
        utc_stamp = parsedate_to_datetime(news_item['updated']).astimezone(dt_timezone.utc)
        local_stamp = to_local_time(utc_stamp)
    except Exception as parse_error:
        utc_stamp = local_stamp = "Invalid"
        print(f"Error parsing 'updated': {parse_error}")

    # Fields shown before the timestamps, read straight from the item.
    for label, field in (("ID", 'id'), ("Title", 'title'), ("Created", 'created')):
        print(f"{label}: {news_item[field]}")

    print(f"Updated (UTC): {utc_stamp}")
    print(f"Updated (Local): {local_stamp}")

    # Remaining fields, same layout.
    for label, field in (("URL", 'url'), ("Author", 'author'), ("Stocks", 'stocks'),
                         ("Channels", 'channels'), ("Tags", 'tags')):
        print(f"{label}: {news_item[field]}")

    print("-" * 80)


def _store_news_item(processed_item: Dict[str, str], updated_unix: int) -> None:
    """Persist one processed item to Redis and print the outcome.

    Writes the item as JSON under ``benzinga:news:<id>`` and adds ``<id>`` to the
    ``benzinga:news:timeline`` sorted set with score ``updated_unix``. Any
    error is printed rather than raised, so one bad item does not stop the batch.
    """
    try:
        news_json = json.dumps(processed_item)
        key = f"benzinga:news:{processed_item['id']}"

        print(f"Storing news item with key: {key}")
        set_result = redis_client.set(key, news_json)
        zadd_result = redis_client.zadd('benzinga:news:timeline',
                                        {processed_item['id']: updated_unix})

        # Read the key back to confirm the write landed.
        stored_value = redis_client.get(key)
        print(f"Redis SET result: {set_result}")
        print(f"Redis ZADD result: {zadd_result}")
        print(f"Verification - Data stored: {'Yes' if stored_value else 'No'}")
    except Exception as e:
        print(f"Redis error: {e}")


def fetch_and_process_news() -> None:
    """Fetch news changed since the last poll, print it, optionally store it, and advance the watermark.

    Reads and updates the module-level ``latest_updated`` timestamp, which is
    passed to ``fetch_news_with_delta`` on the next call. Storage to Redis
    happens only when ``REDIS_INSERTION`` is true.

    NOTE(review): API_PARAMS asks for a single page of ``pageSize`` items. If
    more items changed since the last poll, the ones not returned may be
    skipped once the watermark advances. Confirm the API's sort order and
    consider paginating.
    """
    global latest_updated
    news_items = fetch_news_with_delta(latest_updated)

    # A non-list payload (e.g. an API error object) would be iterated key-by-key.
    if not news_items or not isinstance(news_items, list):
        print("No new or updated news items retrieved")
        return

    print(f"\nFetched {len(news_items)} news items at {datetime.now()}")
    for item in news_items:
        processed_item = process_news_item(item)
        print_news_item(processed_item)

        # Parse once and reuse for the timeline score and the watermark. Use
        # the processed value: raw item['updated'] may be missing (KeyError),
        # whereas a placeholder just parses to 0.
        updated_unix = rfc2822_to_unix(processed_item['updated'])

        if REDIS_INSERTION:
            _store_news_item(processed_item, updated_unix)

        # Update the latest_updated timestamp
        if not latest_updated or updated_unix > latest_updated:
            latest_updated = updated_unix

def main(interval_seconds: int = 5):
    """Run the Benzinga news poller until interrupted (Ctrl-C / kernel interrupt).

    Fetches once immediately, then every ``interval_seconds`` seconds using the
    ``schedule`` library. An exception from a single poll is printed and the
    poller keeps running. The scheduled job is removed again on exit.

    Args:
        interval_seconds: Seconds between polls (default 5).
    """
    print("Starting Benzinga News Fetcher...")

    # schedule keeps jobs in a module-global scheduler, so re-running this cell
    # would stack duplicate jobs. The tag lets us clear exactly our own job.
    job_tag = "benzinga-news-fetcher"

    def run_fetch_safely():
        # Without this guard an exception would propagate out of
        # run_pending() and end the polling loop.
        try:
            fetch_and_process_news()
        except Exception as e:
            print(f"News fetch failed: {e}")

    schedule.clear(job_tag)
    try:
        # Initial run
        run_fetch_safely()

        # Schedule subsequent runs
        schedule.every(interval_seconds).seconds.do(run_fetch_safely).tag(job_tag)

        # Main loop: check every second so runs stay close to the interval.
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nStopping news fetcher...")
    finally:
        schedule.clear(job_tag)


if __name__ == "__main__":
    main()


### Redis Insertion (Trial)


In [None]:
# Newest 10 news IDs. The timeline sorted set is scored by each item's
# 'updated' time, so reverse order lists the most recently updated first.
latest_ids = redis_client.zrevrange('benzinga:news:timeline', 0, 9)

# Load the most recently updated stored item by its real ID. A hard-coded
# placeholder key such as "benzinga:news:some_id" never matches anything.
news_json = redis_client.get(f"benzinga:news:{latest_ids[0]}") if latest_ids else None
news_item = json.loads(news_json) if news_json else None

latest_ids

In [None]:
import os

import redis  # re-import: an earlier cell rebinds `redis` to an EventTraderRedis instance
from neo4j import GraphDatabase


class RedisToNeo4j:
    """Prototype Redis -> Neo4j bridge that inspects the Benzinga keys stored in Redis.

    NOTE(review): a different ``RedisToNeo4j`` class is defined in a later cell
    and replaces this one once that cell runs.
    """

    def __init__(self, uri="bolt://localhost:7687", user="neo4j", password=None):
        """Open the local Redis client and a Neo4j driver.

        Args:
            uri: Neo4j Bolt URI.
            user: Neo4j user name.
            password: Neo4j password. When None, it is read from the
                NEO4J_PASSWORD environment variable, falling back to the legacy
                hard-coded value.
        """
        self.redis = redis.Redis(host='localhost', port=6379, decode_responses=True)
        self.uri = uri
        if password is None:
            # TODO: remove the hard-coded fallback once NEO4J_PASSWORD is configured.
            password = os.environ.get("NEO4J_PASSWORD", "Next2020#")
        self.neo4j_driver = GraphDatabase.driver(self.uri, auth=(user, password))

    def check_data_types(self):
        """Print the Redis type of every ``benzinga:news:*`` key with a short preview of its content."""
        for key in self.redis.scan_iter("benzinga:news:*"):
            key_type = self.redis.type(key)
            print(f"Key: {key}, Type: {key_type}")

            # If it's a string, try to get the value
            if key_type == 'string':
                try:
                    value = self.redis.get(key)
                    print(f"Value: {value[:100]}...")  # Print first 100 chars
                except Exception as e:
                    print(f"Error reading value: {e}")
            # Redis TYPE reports sorted sets as 'zset'; 'sorted_set' never matches.
            elif key_type in ('set', 'zset'):
                if key_type == 'set':
                    members = self.redis.smembers(key)
                else:
                    members = self.redis.zrange(key, 0, 4)  # only the 5 shown are needed
                print(f"Members: {list(members)[:5]}...")  # Print first 5 members

    def close(self):
        """Close the Neo4j driver."""
        self.neo4j_driver.close()


# Test it
processor = RedisToNeo4j()
try:
    processor.check_data_types()
finally:
    processor.close()

In [None]:
from neo4j import GraphDatabase
import redis
import json

class RedisToNeo4j:
    """Copy Benzinga news items stored in Redis into Neo4j as ``News`` nodes.

    Every string key matching ``benzinga:news:*`` is parsed as JSON and MERGEd
    into a ``(:News {id})`` node with ``title`` and ``created`` properties.

    NOTE(review): this redefines the ``RedisToNeo4j`` class from an earlier cell.
    """

    # Sorted set of news IDs maintained by the fetcher; not a news item itself.
    TIMELINE_KEY = "benzinga:news:timeline"

    def __init__(self, uri="bolt://localhost:7687", user="neo4j", password=None):
        """Open the local Redis client and a Neo4j driver.

        Args:
            uri: Neo4j Bolt URI.
            user: Neo4j user name.
            password: Neo4j password. When None, it is read from the
                NEO4J_PASSWORD environment variable, falling back to the legacy
                hard-coded value.
        """
        import os  # local import so this cell does not depend on other cells' imports

        self.redis = redis.Redis(host='localhost', port=6379, decode_responses=True)
        self.uri = uri
        if password is None:
            # TODO: remove the hard-coded fallback once NEO4J_PASSWORD is configured.
            password = os.environ.get("NEO4J_PASSWORD", "Next2020#")
        self.neo4j_driver = GraphDatabase.driver(self.uri, auth=(user, password))

    def process_news(self, key):
        """Load the JSON news item stored at ``key`` and upsert it into Neo4j.

        Non-string keys are skipped. Any error is printed rather than raised, so
        one bad item does not stop a batch.
        """
        try:
            # Only string keys hold news JSON.
            if self.redis.type(key) != 'string':
                return
            news = json.loads(self.redis.get(key))

            with self.neo4j_driver.session() as session:
                session.run("""
                    MERGE (n:News {id: $id})
                    SET n.title = $title,
                        n.created = $created
                """, {
                    'id': news['id'],
                    'title': news.get('title', ''),
                    'created': news.get('created', '')
                })
            # Reported only after the session has closed without raising.
            print(f"Stored news {news['id']} in Neo4j")

        except Exception as e:
            print(f"Error processing {key}: {e}")

    def process_all_news(self):
        """Upsert every ``benzinga:news:*`` item except the timeline index."""
        # SCAN iterates incrementally instead of blocking Redis like KEYS.
        for key in self.redis.scan_iter("benzinga:news:*"):
            if key != self.TIMELINE_KEY:
                self.process_news(key)

    def close(self):
        """Close the Neo4j driver."""
        self.neo4j_driver.close()

# Test it
processor = RedisToNeo4j()
try:
    processor.process_all_news()
finally:
    processor.close()