In [0]:
import pandas as pd
import requests
import time
import json
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, MapType

# Azure Storage / service-principal settings
storage_account_name = "macavstorage"
container_name = "datalake"
servicePrincipalID = "0b27e0ea-e184-49cd-b921-b3519cb03f7f"
# The client secret is read from the Databricks secret scope, never hardcoded.
blobsecret = dbutils.secrets.get(scope="Scope1", key="blobsecret1")
tenantID = "60feac79-e042-4ce8-8759-dca313146110"

# Secret scope "Scope1" (Azure Key Vault-backed) is created at
# https://adb-4383697834848777.17.azuredatabricks.net/#secrets/createScope with:
#   Scope Name  = Scope1
#   DNS Name    = "https://twitchkv.vault.azure.net/" (Vault URI)
#   Resource ID = "/subscriptions/972ad05f-b62e-48ab-a9fa-a17fd4dc6640/resourceGroups/twitchData/providers/Microsoft.KeyVault/vaults/twitchkv" (Key Vault resource ID)

# Initialize the Spark session with Delta Lake support.
# getOrCreate() returns an already-running session if one exists (as in a
# Databricks notebook); static settings such as spark.sql.extensions may then
# not take effect, but they must still name real classes for off-cluster use.
spark = (
    SparkSession.builder
    .appName("DeltaLakeAzureStorage")
    # Delta's extension class is io.delta.sql.DeltaSparkSessionExtension
    # ("delta.sql.DeltaSparkSessionExtensions" does not exist).
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Authenticate the service principal to ADLS Gen2 (OAuth client-credentials flow).
# Each setting is scoped to this storage account by suffixing the account host.
_abfs_account_host = f"{storage_account_name}.dfs.core.windows.net"
_abfs_oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": servicePrincipalID,
    "fs.azure.account.oauth2.client.secret": blobsecret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenantID}/oauth2/token",
}
for _setting, _value in _abfs_oauth_settings.items():
    spark.conf.set(f"{_setting}.{_abfs_account_host}", _value)

In [0]:

# Load the campaigns Delta table. Use abfss:// (TLS), matching the scheme the
# notebook uses when writing to the same storage account.
campaigns_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/datalake/campaigns"
campaigns_fetched = spark.read.format("delta").load(campaigns_path)
display(campaigns_fetched)

In [0]:
# Print every hashtag of every campaign, one per line.
for campaign_hashtag in (
    tag
    for campaign_row in campaigns_fetched.select("hashtags").collect()
    for tag in campaign_row["hashtags"]
):
    print(campaign_hashtag)

In [0]:

# Function to fetch user details by author_id
def fetch_user_by_author_id(author_id, token, timeout=10):
    """Look up one X/Twitter user via the v2 ``GET /2/users/:id`` endpoint.

    Args:
        author_id: The user id, e.g. the ``author_id`` field of a tweet.
        token: Bearer token sent in the ``Authorization`` header.
        timeout: Seconds to wait for the HTTP response. Without a timeout,
            ``requests`` can block indefinitely on a stalled connection.

    Returns:
        A dict with ``id``, ``username`` and ``name`` (``name`` defaults to
        ``"N/A"``), or ``None`` if the response has no ``data`` section or the
        request fails. Failures are printed rather than raised, so a caller
        looping over many tweets can simply skip unresolved authors.
    """
    url = f"https://api.twitter.com/2/users/{author_id}"
    headers = {"Authorization": f"Bearer {token}"}

    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()  # HTTP 4xx/5xx (e.g. 429 rate limit) -> RequestException
        user_data = response.json()
    except requests.exceptions.RequestException as e:
        print(f"Failed to fetch user for author_id {author_id}: {e}")
        return None

    if "data" not in user_data:
        print(f"User with author_id {author_id} not found.")
        return None

    user = user_data["data"]
    return {
        "id": user["id"],
        "username": user["username"],
        "name": user.get("name", "N/A"),
    }


User lookup (ChatGPT-assisted, 03/04/2025) — this version works.

In [0]:
import datetime
import time

from pyspark.sql.types import LongType, StringType, StructField, StructType

# Enrich the collected tweets with each author's username and flatten them
# into a Spark DataFrame (`twitter_data`).
# NOTE: `all_tweets_data` is built by the tweet-search cell further down this
# notebook; run that cell first, otherwise the loop below raises NameError.
# NOTE(review): the Delta write cell further down saves `spark_df` (built by
# the OLD cell), not `twitter_data` — confirm which frame should be persisted.

# Toggle between tokens
token_switch = True

# Bearer tokens come from the Databricks secret scope (never hardcode them)
BEARER_TOKEN1 = dbutils.secrets.get(scope="Scope1", key="XBearerToken")
BEARER_TOKEN2 = dbutils.secrets.get(scope="Scope1", key="XBearerToken2")

# Pause after each user-lookup request; adjust as needed to avoid rate limits
LOOKUP_DELAY_SECONDS = 1

# Explicit schema: keeps column types stable, and an empty result still yields
# an (empty) DataFrame instead of failing Spark's schema inference.
TWEET_SCHEMA = StructType([
    StructField("id", StringType()),
    StructField("text", StringType()),
    StructField("hashtags", StringType()),
    StructField("author_id", StringType()),
    StructField("username", StringType()),
    StructField("created_at", StringType()),
    StructField("retweet_count", LongType()),
    StructField("reply_count", LongType()),
    StructField("like_count", LongType()),
    StructField("quote_count", LongType()),
    StructField("bookmark_count", LongType()),
    StructField("impression_count", LongType()),
    StructField("geo_place_id", StringType()),
    StructField("api_completed_timestamp", StringType()),
])

all_tweet_data = []
# Successful lookups keyed by author_id: an author with several tweets costs a
# single rate-limited API call instead of one per tweet.
users_by_author_id = {}

for tweet in all_tweets_data:
    author_id = tweet['author_id']

    user = users_by_author_id.get(author_id)
    if user is None:
        # Alternate between tokens, flipping only when a request is really made
        token = BEARER_TOKEN1 if token_switch else BEARER_TOKEN2
        token_switch = not token_switch
        user = fetch_user_by_author_id(author_id, token)
        if user:
            users_by_author_id[author_id] = user
        time.sleep(LOOKUP_DELAY_SECONDS)

    if not user:
        # Lookup failed (fetch_user_by_author_id printed the reason): skip the tweet
        continue

    metrics = tweet['public_metrics']
    all_tweet_data.append({
        "id": tweet['id'],
        "text": tweet['text'],
        "hashtags": ",".join(tag['tag'] for tag in tweet.get('entities', {}).get('hashtags', [])),
        "author_id": author_id,
        "username": user['username'],
        "created_at": tweet['created_at'],
        "retweet_count": metrics['retweet_count'],
        "reply_count": metrics['reply_count'],
        "like_count": metrics['like_count'],
        "quote_count": metrics['quote_count'],
        "bookmark_count": metrics['bookmark_count'],
        "impression_count": metrics['impression_count'],
        "geo_place_id": tweet.get("geo", {}).get("place_id", "N/A"),
        # Time the record was assembled (UTC, naive ISO-8601 string)
        "api_completed_timestamp": datetime.datetime.utcnow().isoformat(),
    })

# Convert the collected tweet data to a Spark DataFrame and show it
twitter_data = spark.createDataFrame(all_tweet_data, schema=TWEET_SCHEMA)
display(twitter_data)



In [0]:
# Show the enriched tweets built by the cell above (`spark_df` only exists once
# the OLD cell further down has been run, so it is stale or undefined here).
display(twitter_data)

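Own code (second bearer token added 03/04/2025): with the second API key this search does not return the requested `user.fields`, so use the user-lookup cell above instead. Note that the cell above still iterates over `all_tweets_data`, which this cell builds.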

In [0]:
#DONT USE!!!
# Tweet search: query the X/Twitter v2 recent-search endpoint once per campaign
# hashtag and accumulate the returned tweets in `all_tweets_data`.
# NOTE(review): despite the warning above, the user-lookup cell earlier in the
# notebook iterates over the `all_tweets_data` list built here.
import time

import requests

# Bearer tokens come from the Databricks secret scope (never hardcode them)
BEARER_TOKEN1 = dbutils.secrets.get(scope="Scope1", key="XBearerToken")
BEARER_TOKEN2 = dbutils.secrets.get(scope="Scope1", key="XBearerToken2")

SEARCH_URL = "https://api.twitter.com/2/tweets/search/recent"
MAX_RESULTS_PER_HASHTAG = 10    # Adjust as needed
SEARCH_DELAY_SECONDS = 10       # Pause between searches to prevent rate limiting
REQUEST_TIMEOUT_SECONDS = 10    # Without a timeout, requests.get can hang forever

all_tweets_data = []
# Users returned in each response's "includes" section (from the author_id
# expansion), keyed by user id; previously requested but discarded.
tweet_authors_by_id = {}

# Toggle between tokens
token_switch = True

# Loop through hashtags in campaigns_fetched (a campaign may have no hashtags)
for row in campaigns_fetched.select("hashtags").collect():
    for hashtag in row["hashtags"] or []:
        # Alternate between tokens
        token = BEARER_TOKEN1 if token_switch else BEARER_TOKEN2
        token_switch = not token_switch  # Flip token for next request

        params = {
            "query": f"#{hashtag} -is:reply",  # Exclude replies
            "tweet.fields": "id,text,created_at,public_metrics,attachments,lang,source,author_id,entities",
            "expansions": "attachments.media_keys,author_id",
            "media.fields": "media_key,type,url",
            "user.fields": "id,username",
            "max_results": MAX_RESULTS_PER_HASHTAG,
        }
        headers = {"Authorization": f"Bearer {token}"}

        try:
            response = requests.get(
                SEARCH_URL, params=params, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS
            )
            response.raise_for_status()  # Raise error if request fails
            tweets_data = response.json()
        except requests.exceptions.RequestException as e:
            print(f"Failed to fetch tweets for #{hashtag}: {e}")
        else:
            # Append only if data exists (a hashtag with no recent tweets has none)
            all_tweets_data.extend(tweets_data.get("data", []))
            for author in tweets_data.get("includes", {}).get("users", []):
                tweet_authors_by_id[author["id"]] = author

        time.sleep(SEARCH_DELAY_SECONDS)

# Summarize instead of dumping every tweet into the output
print(f"Collected {len(all_tweets_data)} tweets; {len(tweet_authors_by_id)} authors came back in 'includes'.")

Old prototype — do not use.

In [0]:
#OLD DO NOT USE
# Legacy prototype: builds a one-row Spark DataFrame (`spark_df`) from the first
# collected tweet. NOTE(review): despite the warning, the Delta write cell below
# appends `spark_df` to the x_data table.

from pyspark.sql import Row

if not all_tweets_data:
    raise ValueError("all_tweets_data is empty - run the tweet-search cell first.")

# Extract data
tweet = all_tweets_data[0]
author_id = tweet['author_id']

# all_tweets_data is a flat list of tweet dicts (the search cell keeps only each
# response's "data" array), so the former all_tweets_data['includes']['users'][0]
# raised TypeError; and even on a raw response, users[0] is not guaranteed to be
# this tweet's author. Look the author up by id instead.
user = fetch_user_by_author_id(
    author_id, dbutils.secrets.get(scope="Scope1", key="XBearerToken")
)
if user is None:
    raise ValueError(f"Could not look up author {author_id} of tweet {tweet['id']}.")

tweet_id = tweet['id']
text = tweet['text']
hashtags = ",".join([tag['tag'] for tag in tweet.get('entities', {}).get('hashtags', [])])
geo = tweet.get("geo", {}).get("place_id", "N/A")
username = user['username']
created_at = tweet['created_at']

# Public metrics
metrics = tweet['public_metrics']
retweet_count = metrics['retweet_count']
reply_count = metrics['reply_count']
like_count = metrics['like_count']
quote_count = metrics['quote_count']
bookmark_count = metrics['bookmark_count']
impression_count = metrics['impression_count']

# Sample data
tweet_data = {
    "id": tweet_id,
    "text": text,
    "hashtags": hashtags,
    "author_id": author_id,
    "username": username,
    "created_at": created_at,
    "retweet_count": retweet_count,
    "reply_count": reply_count,
    "like_count": like_count,
    "quote_count": quote_count,
    "bookmark_count": bookmark_count,
    "impression_count": impression_count,
    "geo_place_id": geo,
    # NOTE(review): stored as 0 rather than a real timestamp; the x_data table is
    # presumably typed from this frame, so changing it likely needs a schema change.
    "api_completed_timestamp": 0
}

# Convert to Spark DataFrame
spark_df = spark.createDataFrame([Row(**tweet_data)])

In [0]:
# Append `spark_df` to the x_data Delta table in ADLS Gen2.
spark_df.write.save(
    f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/datalake/datalake/x_data",
    format="delta",
    mode="append",
)

In [0]:
# List the container root, built from the config variables rather than hardcoded names.
dbutils.fs.ls(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/")

In [0]:
# Read back the x_data Delta table over abfss:// (TLS), the same URI the write cell uses.
xdata_fetched = spark.read.format("delta").load(
    f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/datalake/datalake/x_data"
)

display(xdata_fetched)