In [0]:
%pip install praw

Collecting praw
  Downloading praw-7.8.1-py3-none-any.whl (189 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 189.3/189.3 kB 871.3 kB/s eta 0:00:00
Collecting update_checker>=0.18
  Downloading update_checker-0.18.0-py3-none-any.whl (7.0 kB)
Collecting prawcore<3,>=2.4
  Downloading prawcore-2.4.0-py3-none-any.whl (17 kB)
Installing collected packages: update_checker, prawcore, praw
Successfully installed praw-7.8.1 prawcore-2.4.0 update_checker-0.18.0
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


### Import Libraries

In [0]:
# Databricks notebook source
import os
from datetime import datetime, timezone

import praw
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType


### Reddit API Setup

In [0]:
# Reddit API credentials and setup.
# Credentials are read from the environment rather than hardcoded: notebook
# source and rendered outputs are routinely shared and committed, so a literal
# client secret must be treated as leaked. On Databricks the variables can be
# populated from a secret scope, e.g. via cluster environment-variable secret
# references of the form {{secrets/<scope>/<key>}}.
# NOTE(review): the client secret previously committed in this cell should be
# revoked and regenerated in the Reddit app settings.
REDDIT_CLIENT_ID = os.environ["REDDIT_CLIENT_ID"]
REDDIT_CLIENT_SECRET = os.environ["REDDIT_CLIENT_SECRET"]
# The user agent is not secret; keep the previous value as the default.
REDDIT_USER_AGENT = os.environ.get("REDDIT_USER_AGENT", "Delta Live pipeline")

# Initialize Reddit API client
reddit = praw.Reddit(
    client_id=REDDIT_CLIENT_ID,
    client_secret=REDDIT_CLIENT_SECRET,
    user_agent=REDDIT_USER_AGENT
)

### Fetch Subreddit Data

In [0]:
# Function to fetch subreddit data
def fetch_subreddit_data(subreddit_name="pakistan", limit=100, reddit_client=None):
    """Fetch the current "hot" posts of a subreddit as a list of row dicts.

    Args:
        subreddit_name: Subreddit to read (without the ``r/`` prefix).
        limit: Maximum number of posts requested from the "hot" listing.
        reddit_client: Optional client exposing ``.subreddit(name).hot(limit=...)``.
            Defaults to the notebook-level ``reddit`` client when ``None``.

    Returns:
        One dict per post with keys ``post_id``, ``title``, ``description``,
        ``subreddit``, ``author``, ``score``, ``created_utc`` and ``url`` --
        the same columns as the bronze schema defined in this notebook.
    """
    client = reddit if reddit_client is None else reddit_client
    subreddit = client.subreddit(subreddit_name)
    return [
        {
            "post_id": post.id,
            "title": post.title,
            # An empty selftext is stored as NULL rather than "".
            "description": post.selftext or None,
            "subreddit": subreddit_name,
            # PRAW may report author=None (e.g. deleted accounts); str(None)
            # would store the literal text "None", so keep a real NULL instead.
            "author": str(post.author) if post.author is not None else None,
            "score": post.score,
            # Timezone-aware UTC: PySpark converts *naive* datetimes using the
            # driver's local time zone, which would shift these timestamps on a
            # non-UTC driver. Also avoids the deprecated utcfromtimestamp().
            "created_utc": datetime.fromtimestamp(post.created_utc, tz=timezone.utc),
            "url": post.url,
        }
        for post in subreddit.hot(limit=limit)
    ]

# Pull the current "hot" listing (up to 100 posts) for the target subreddit;
# these row dicts are the input to the DataFrame built further down.
subreddit_name = "pakistan"
raw_data = fetch_subreddit_data(subreddit_name, limit=100)

It is strongly recommended to use Async PRAW: https://asyncpraw.readthedocs.io.
See https://praw.readthedocs.io/en/latest/getting_started/multiple_instances.html#discord-bots-and-asynchronous-environments for more info.



### Create Spark Session & Define Schema

In [0]:
# Obtain the Spark session via getOrCreate and request the Delta Lake SQL
# extension and catalog on it.
# NOTE(review): on Databricks a session already exists, so appName and static
# configs like spark.sql.extensions likely have no effect -- confirm whether
# these settings are still needed.
spark = (
    SparkSession.builder
    .appName("Reddit Bronze Layer")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Explicit schema for the post rows: one field per key of the dicts produced by
# fetch_subreddit_data, in the same order, every field nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("post_id", StringType()),
        ("title", StringType()),
        ("description", StringType()),
        ("subreddit", StringType()),
        ("author", StringType()),
        ("score", IntegerType()),
        ("created_utc", TimestampType()),
        ("url", StringType()),
    )
])

### Build the DataFrame and Publish It as a Delta Live Table

In [0]:
# Materialise the fetched rows as a Spark DataFrame using the explicit schema
# (no type inference over raw_data), then expose it to SQL as a temporary view.
df = spark.createDataFrame(data=raw_data, schema=schema)
df.createOrReplaceTempView(name="bronze_reddit_posts_temp")

In [0]:
# NOTE(review): imported here rather than in the top import cell, presumably
# because `dlt` is only importable inside a Delta Live Tables pipeline run.
import dlt


# Register the bronze table with Delta Live Tables: the decorator supplies the
# table comment and tags the table with quality=bronze.
@dlt.table(
    comment="This table contains raw Reddit posts fetched from the Pakistan subreddit",
    table_properties={"quality": "bronze"},
)
def bronze_reddit_posts():
    """Return the DataFrame of fetched posts built earlier in the notebook.

    NOTE(review): no ``name=`` is passed to ``@dlt.table``, so the table name is
    presumably derived from this function's name -- keep it stable.
    """
    posts_df = df
    return posts_df