In [3]:
import os
import sys

from dotenv import load_dotenv

# Make the project-local helpers in ./utils importable. The path is relative to the
# kernel's working directory. Guarded so re-running the cell doesn't keep growing sys.path.
if "utils" not in sys.path:
    sys.path.append("utils")

from twitter_api import search_recent_tweets

# Load the .env file from the project root (one level above the notebook directory).
load_dotenv(dotenv_path="../.env")

# Read the API bearer token from the environment; never hardcode it in the notebook.
bearer_token = os.getenv("TWITTER_BEARER_TOKEN")
if not bearer_token:
    raise ValueError("Bearer token not found in .env")

# Search query using Twitter API v2 operators: mentions of databricks or spark,
# English only (`lang:en`), retweets excluded (`-is:retweet`).
query = "databricks OR spark lang:en -is:retweet"

tweets = search_recent_tweets(query, bearer_token)

# Inspect one tweet as a sanity check. Indexing an empty result would raise
# IndexError, so report the empty case explicitly instead.
if tweets:
    print(tweets[0])
else:
    print("No tweets returned for query:", query)


{'edit_history_tweet_ids': ['1936859530505146680'], 'lang': 'en', 'id': '1936859530505146680', 'author_id': '932881495462641665', 'created_at': '2025-06-22T18:51:11.000Z', 'text': '@0xTired_ @sparkdotfi @cookiedotfun Good content Spark is ......'}


In [None]:
import json
import os
import sys
import tempfile
from datetime import datetime, timezone

from dotenv import load_dotenv

# Set up path and import Twitter logic (guarded so re-runs don't grow sys.path).
if "utils" not in sys.path:
    sys.path.append("utils")
from twitter_api import search_recent_tweets

# Load token from the project-root .env; never hardcode credentials.
load_dotenv(dotenv_path="../.env")
bearer_token = os.getenv("TWITTER_BEARER_TOKEN")
if not bearer_token:
    raise ValueError("Bearer token not found")

# 1. Get tweets
query = "databricks OR spark lang:en -is:retweet"
tweets = search_recent_tweets(query, bearer_token)

if not tweets:
    # Nothing to persist. An empty batch would also give Spark no fields to infer a
    # schema from, so stop here rather than failing partway through the pipeline.
    # NOTE: timestamp/local_path/dbfs_path/df are not (re)defined on this path.
    print(f"⚠️ No tweets returned for query {query!r}; nothing written.")
else:
    # 2. Stage tweets in a driver-local JSON Lines file (one tweet object per line).
    #    spark.read.json's default (non-multiLine) mode is line-oriented.
    #    Timestamp is UTC so batch file names don't depend on the driver's timezone.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    local_path = os.path.join(tempfile.gettempdir(), f"tweets_{timestamp}.json")

    with open(local_path, "w", encoding="utf-8") as f:
        for tweet in tweets:
            f.write(json.dumps(tweet) + "\n")

    print(f"✅ Saved tweets locally at: {local_path}")

    # 3. Upload to DBFS. The local file is only a staging copy, so remove it
    #    even if the copy fails; otherwise it would accumulate across runs.
    dbfs_path = f"dbfs:/mnt/twitter/raw/tweets_{timestamp}.json"
    try:
        dbutils.fs.cp(f"file:{local_path}", dbfs_path)
    finally:
        os.remove(local_path)
    print(f"✅ Uploaded to DBFS at: {dbfs_path}")

    # 4. Load into Spark DataFrame (schema is inferred from this batch only).
    df = spark.read.json(dbfs_path)
    df.show(truncate=False)

    # 5. Append to the silver Delta table.
    #    Optional tweet fields can appear or disappear between batches. Because the
    #    schema is re-inferred each run, mergeSchema lets new columns be added
    #    instead of failing the append. It does not reconcile type conflicts on
    #    an existing column.
    #    NOTE(review): repeated runs of the same recent-search query likely return
    #    overlapping tweets, so this can append duplicate `id`s. Consider a MERGE
    #    on `id` if the table must be unique.
    delta_path = "dbfs:/mnt/twitter/silver/tweets_delta"
    (
        df.write.format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .save(delta_path)
    )
    print(f"✅ Written to Delta table at: {delta_path}")
