In [0]:

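# # CLI example: store the X API bearer token in a Databricks secret scope.
# # (Legacy Databricks CLI syntax; the newer CLI uses
# #  `databricks secrets create-scope streaming-scope` and
# #  `databricks secrets put-secret streaming-scope x_bearer_token`.)
# # The notebook can then read it with dbutils.secrets.get(scope="streaming-scope", key="x_bearer_token").
# databricks secrets create-scope --scope streaming-scope
# databricks secrets put --scope streaming-scope --key x_bearer_token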


In [0]:

import json
from datetime import datetime

import requests
from pyspark.sql.functions import current_timestamp, lit


In [0]:
# X API credentials and endpoint.
# The bearer token is read from the X_BEARER_TOKEN environment variable. When that is
# unset and the notebook runs on Databricks, it falls back to the secret created by the
# CLI commands in the first cell (scope "streaming-scope", key "x_bearer_token").
import os
import warnings

X_SECRET_SCOPE = "streaming-scope"
X_SECRET_KEY = "x_bearer_token"

bearer = os.environ.get("X_BEARER_TOKEN")
if not bearer:
    try:
        # `dbutils` is only injected into Databricks notebook globals; elsewhere it is undefined.
        # Other errors (e.g. a missing scope/key on Databricks) are deliberately not caught.
        bearer = dbutils.secrets.get(scope=X_SECRET_SCOPE, key=X_SECRET_KEY)  # noqa: F821
    except NameError:
        bearer = None
if not bearer:
    # Without a token, API requests will fail authentication; surface that here rather
    # than much later inside the streaming query.
    warnings.warn(
        "No X API bearer token found: set X_BEARER_TOKEN or create the "
        f"'{X_SECRET_KEY}' secret in scope '{X_SECRET_SCOPE}'.",
        stacklevel=1,
    )
headers = {"Authorization": f"Bearer {bearer}"}
url = "https://api.x.com/2/tweets/sample/stream"  # or filtered stream endpoint

def fetch_tweets(batch_size=100, timeout=(10, 90)):
    """Read up to ``batch_size`` JSON payloads from the X streaming endpoint.

    Opens a streaming GET to the module-level ``url`` using ``headers``. Each
    non-empty line is parsed as JSON. The connection is closed once
    ``batch_size`` payloads have been collected or the server ends the stream.

    Args:
        batch_size: Maximum number of payloads to return. Values <= 0 return an
            empty list without opening a connection.
        timeout: ``requests`` timeout in seconds, either a single number or a
            ``(connect, read)`` tuple. The read timeout bounds the wait between
            received chunks, so a stalled stream cannot block the caller forever.

    Returns:
        A list of decoded JSON objects. It may hold fewer than ``batch_size``
        items if the stream ends early.

    Raises:
        requests.HTTPError: if the endpoint answers with a 4xx/5xx status
            (e.g. invalid token or rate limiting). Raising here keeps the JSON
            error body from being returned as if it were a tweet.
        requests.RequestException: on connection failures or timeouts.
    """
    if batch_size <= 0:
        return []

    tweets = []
    with requests.get(url, headers=headers, stream=True, timeout=timeout) as resp:
        # Reject error responses before reading the body as stream data.
        resp.raise_for_status()
        for line in resp.iter_lines():
            if not line:
                continue  # empty lines carry no payload
            try:
                tweets.append(json.loads(line))
            except json.JSONDecodeError:
                continue  # skip malformed lines
            if len(tweets) >= batch_size:
                break
    return tweets

In [0]:
TWEETS_DELTA_PATH = "abfss://metastore@databricksunityjd.dfs.core.windows.net/Tweeter/tweets"


def process_batch(df, epoch_id):
    """foreachBatch sink: fetch a batch of tweets and append it to the Delta path.

    The rate-source micro-batch is used only as a trigger. Neither ``df`` nor
    ``epoch_id`` is read, so this runs once per streaming trigger interval.

    Args:
        df: Micro-batch DataFrame supplied by Structured Streaming (unused).
        epoch_id: Micro-batch id supplied by Structured Streaming (unused).
    """
    tweets = fetch_tweets()
    if not tweets:
        return
    # Use Spark's current_timestamp() rather than a naive Python datetime: PySpark
    # converts naive datetimes using the driver's local time zone, and
    # datetime.utcnow() is deprecated since Python 3.12.
    # NOTE(review): the schema is inferred from the raw dicts on every batch; payloads
    # with differing/nested keys may fail inference or conflict with the Delta schema.
    (
        spark.createDataFrame(tweets)
        .withColumn("ingest_ts", current_timestamp())
        .write.format("delta")
        .mode("append")
        .save(TWEETS_DELTA_PATH)
    )

# The rate source emits one row per second. process_batch ignores those rows and
# uses each micro-batch purely as a clock tick to pull tweets.
rate_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
)

# One micro-batch every 30 seconds; each invokes process_batch on the driver.
stream_writer = (
    rate_df.writeStream
    .foreachBatch(process_batch)
    .outputMode("append")
    .trigger(processingTime="30 seconds")
)
query = stream_writer.start()

In [0]:
%sql

-- Register the Delta files written by the streaming query as an external table.
-- NOTE(review): on a fresh run the first micro-batch may not have written a Delta log yet;
-- if so, this statement cannot infer a schema. Re-run once data has landed.
create table if not exists bronze.tweeter_bronze.tweeter
  using delta
  location 'abfss://metastore@databricksunityjd.dfs.core.windows.net/Tweeter/tweets';


In [0]:
%sql
-- Preview the most recently ingested rows; the table grows with every micro-batch,
-- so avoid scanning and displaying all of it.
SELECT *
FROM bronze.tweeter_bronze.tweeter
ORDER BY ingest_ts DESC
LIMIT 100;