# Reddit Data Loading and Analysis with Direct Parquet Uploads

This notebook demonstrates how to:
1. Download the Reddit Parquet files (posts, authors, and media metadata) from MinIO
2. Stage the Parquet files directly in Snowflake
3. Load the data into separate post, author, and media tables
4. Run the data analysis queries

## 1. Setup and Environment

In [1]:
import os
import pandas as pd
import pyarrow.parquet as pq
import snowflake.connector
import logging
from minio import Minio
from config.config import settings


logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)



## 2. MinIO Configuration and Connection

In [2]:
minio_config = {
    "endpoint": f"localhost:{settings.MINIO_PORT}",
    "access_key": settings.MINIO_ACCESS_KEY,
    "secret_key": settings.MINIO_SECRET_KEY,
    "secure": False,
}

minio_client = Minio(**minio_config)

data_bucket = "extracts-data"
media_bucket = "extracts-media"

print(f"extracts-data bucket exists: {minio_client.bucket_exists(data_bucket)}")
print(f"extracts-media bucket exists: {minio_client.bucket_exists(media_bucket)}")

extracts-data bucket exists: True
extracts-media bucket exists: True


## 3. Download Parquet Files from MinIO

In [3]:
local_dir = "/tmp/minio_downloads"
os.makedirs(local_dir, exist_ok=True)


def download_parquet_files(bucket_name, prefix, output_dir):
    objects = minio_client.list_objects(bucket_name, prefix=prefix, recursive=True)

    download_paths = []
    for obj in objects:
        logger.info(f"Processing {obj.object_name}")
        output_folder = os.path.join(output_dir, os.path.dirname(obj.object_name))
        os.makedirs(output_folder, exist_ok=True)

        local_path = os.path.join(output_dir, obj.object_name)

        try:
            minio_client.fget_object(bucket_name, obj.object_name, local_path)
            download_paths.append(local_path)
            logger.info(f"Downloaded {obj.object_name} to {local_path}")
        except Exception as e:
            logger.error(f"Error downloading {obj.object_name}: {e}")

    logger.info(f"Downloaded {len(download_paths)} files to {output_dir}")
    return download_paths
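`download_parquet_files` downloads every object under the prefix and writes it to `os.path.join(output_dir, obj.object_name)`, so a non-Parquet object, or a key containing `..`, would also be written to disk (possibly outside `output_dir`). A minimal sketch of a pre-filtering step, assuming only keys ending in `.parquet` should be downloaded; `plan_downloads` is a hypothetical helper that accepts any objects exposing an `object_name` attribute, such as those returned by `minio_client.list_objects`.

```python
import os
from types import SimpleNamespace


def plan_downloads(objects, output_dir):
    """Return (object_name, local_path) pairs for the Parquet objects to download.

    Hypothetical helper: skips non-Parquet keys and any key whose local path
    would resolve outside output_dir (e.g. keys containing "..").
    """
    root = os.path.realpath(output_dir)
    plan = []
    for obj in objects:
        name = obj.object_name
        if not name.endswith(".parquet"):
            continue  # not a Parquet file, skip it
        local_path = os.path.realpath(os.path.join(root, name))
        if os.path.commonpath([root, local_path]) != root:
            continue  # key would escape output_dir, skip it
        plan.append((name, local_path))
    return plan


# Stand-in objects; real ones come from minio_client.list_objects(...)
objs = [
    SimpleNamespace(object_name="posts/t3_a.parquet"),
    SimpleNamespace(object_name="posts/_SUCCESS"),
    SimpleNamespace(object_name="posts/../../etc/passwd.parquet"),
]
for name, path in plan_downloads(objs, "/tmp/minio_downloads"):
    print(name, "->", path)
```

Each planned pair can then be passed to `minio_client.fget_object(bucket_name, name, path)` after creating `os.path.dirname(path)`.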

In [4]:
# Download posts and authors data as parquet
posts_paths = download_parquet_files(data_bucket, "posts/", local_dir)
authors_paths = download_parquet_files(data_bucket, "authors/", local_dir)
media_paths = download_parquet_files(data_bucket, "media/metadata/", local_dir)

2025-05-05 12:34:27,188 - INFO - Processing posts/t3_1jzebsn.parquet
2025-05-05 12:34:27,194 - INFO - Downloaded posts/t3_1jzebsn.parquet to /tmp/minio_downloads/posts/t3_1jzebsn.parquet
2025-05-05 12:34:27,195 - INFO - Processing posts/t3_1k047gw.parquet
2025-05-05 12:34:27,204 - INFO - Downloaded posts/t3_1k047gw.parquet to /tmp/minio_downloads/posts/t3_1k047gw.parquet
2025-05-05 12:34:27,205 - INFO - Processing posts/t3_1k19wzn.parquet
2025-05-05 12:34:27,212 - INFO - Downloaded posts/t3_1k19wzn.parquet to /tmp/minio_downloads/posts/t3_1k19wzn.parquet
2025-05-05 12:34:27,213 - INFO - Processing posts/t3_1k1h2ny.parquet
2025-05-05 12:34:27,217 - INFO - Downloaded posts/t3_1k1h2ny.parquet to /tmp/minio_downloads/posts/t3_1k1h2ny.parquet
2025-05-05 12:34:27,218 - INFO - Processing posts/t3_1k24wgp.parquet
2025-05-05 12:34:27,223 - INFO - Downloaded posts/t3_1k24wgp.parquet to /tmp/minio_downloads/posts/t3_1k24wgp.parquet
2025-05-05 12:34:27,224 - INFO - Processing posts/t3_1k2ypim.parquet
2025-05-05 12:34:27,231 - INFO - Downloaded posts/t3_1k2ypim.parquet to /tmp/minio_downloads/posts/t3_1k2ypim.parquet
2025-05-05 12:34:27,231 - INFO - Processing posts/t3_1k3b3zc.parquet
2025-05-05 12:34:27,235 - INFO - Downloaded posts/t3_1k3b3zc.parq

## 4. Inspecting Parquet Structure (Optional)

This step only verifies the data structure before loading it into Snowflake.

In [5]:
if posts_paths:
    sample_post = pq.read_table(posts_paths[0]).to_pandas()
    print("Sample post structure:")
    print(sample_post.head())
    print(f"\nColumns: {sample_post.columns.tolist()}")
else:
    print("No post files found")

if authors_paths:
    sample_author = pq.read_table(authors_paths[0]).to_pandas()
    print("\nSample author structure:")
    print(sample_author.head())
    print(f"\nColumns: {sample_author.columns.tolist()}")
else:
    print("No author files found")

if media_paths:
    sample_media = pq.read_table(media_paths[0]).to_pandas()
    print("\nSample media structure:")
    print(sample_media.head())
    print(f"\nColumns: {sample_media.columns.tolist()}")
else:
    print("No media files found")

Sample post structure:
           id text             title                   timestamp num_likes  \
0  t3_1jzebsn       [i ate] bibimbap  2025-04-15 00:20:06.528000        49   

  num_comments                                                url    author_id  
0            3  https://reddit.com/r/food/comments/1jzebsn/i_a...  t2_f5ynk9df  

Columns: ['id', 'text', 'title', 'timestamp', 'num_likes', 'num_comments', 'url', 'author_id']

Sample author structure:
              id         name headline  \
0  t2_1g08bd984l  CozyBvnnies     None   

                                       url          joined_date  \
0  https://www.reddit.com/user/CozyBvnnies  2025-04-14 00:00:00   

  publication_score comment_score  
0              4184           643  

Columns: ['id', 'name', 'headline', 'url', 'joined_date', 'publication_score', 'comment_score']

Sample media structure:
             id     post_id  \
0  t3_1jzebsn_0  t3_1jzebsn   

                                        original_url  \
0  
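The inspection cell reads only the first file of each group. Before loading, it can help to combine every downloaded file and confirm that `id` is unique: it is declared `PRIMARY KEY` in section 6, but Snowflake does not enforce that constraint on standard tables. A minimal sketch, assuming all files in a group share the same columns; `combine_frames` is a hypothetical helper, and in this notebook its input would be `[pq.read_table(p).to_pandas() for p in posts_paths]` (or `authors_paths`, `media_paths`).

```python
import pandas as pd


def combine_frames(frames, key="id"):
    """Concatenate per-file DataFrames and report duplicate keys.

    Hypothetical helper: returns the combined frame and the sorted list of
    key values that appear more than once.
    """
    combined = pd.concat(frames, ignore_index=True)
    dupes = combined.loc[combined[key].duplicated(keep=False), key]
    return combined, sorted(dupes.unique())


# In-memory stand-ins for two downloaded post files
f1 = pd.DataFrame({"id": ["t3_a", "t3_b"], "num_likes": [10, 5]})
f2 = pd.DataFrame({"id": ["t3_b", "t3_c"], "num_likes": [5, 7]})
posts_df, duplicate_ids = combine_frames([f1, f2])
print(len(posts_df), duplicate_ids)  # 4 ['t3_b']
```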

## 5. Connect to Snowflake

In [2]:
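# Connection parameters for snowflake.connector.connect().
# The stage name (settings.SNOWFLAKE_STAGE) is not a connection parameter;
# it is referenced directly in the PUT and COPY INTO statements below.
snowflake_config = {
    "account": settings.SNOWFLAKE_ACCOUNT,
    "user": settings.SNOWFLAKE_USER,
    "password": settings.SNOWFLAKE_PASSWORD,
    "schema": settings.SNOWFLAKE_SCHEMA,
    "database": settings.SNOWFLAKE_DATABASE,
}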

try:
    conn = snowflake.connector.connect(**snowflake_config)
    logger.info("Connected to Snowflake")
except Exception as e:
    logger.error(f"Failed to connect to Snowflake: {e}")
    conn = None

2025-05-06 00:29:33,770 - INFO - Snowflake Connector for Python Version: 3.15.0, Python Version: 3.10.12, Platform: Linux-6.8.0-58-generic-x86_64-with-glibc2.35
2025-05-06 00:29:33,773 - INFO - Connecting to GLOBAL Snowflake domain
2025-05-06 00:29:34,214 - INFO - Connected to Snowflake


## 6. Create Snowflake Tables

In [7]:
create_authors_table_sql = f"""
CREATE TABLE IF NOT EXISTS {settings.SNOWFLAKE_SCHEMA}.author (
    id STRING PRIMARY KEY,
    name STRING,
    headline STRING,
    url STRING,
    joined_date TIMESTAMP,
    publication_score INTEGER,
    comment_score INTEGER
);
"""

create_posts_table_sql = f"""
CREATE TABLE IF NOT EXISTS {settings.SNOWFLAKE_SCHEMA}.post (
    id STRING PRIMARY KEY,
    text STRING,
    title STRING,
    timestamp TIMESTAMP,
    num_likes INTEGER,
    num_comments INTEGER, 
    url STRING,
    author_id STRING
);
"""

create_media_table_sql = f"""
CREATE TABLE IF NOT EXISTS {settings.SNOWFLAKE_SCHEMA}.media (
    id STRING PRIMARY KEY,
    post_id STRING,
    original_url STRING,
    hosted_url STRING
);
"""

if conn:
    try:
        cursor = conn.cursor()
        cursor.execute(create_authors_table_sql)
        logger.info("Created or verified author table")
        cursor.execute(create_posts_table_sql)
        logger.info("Created or verified post table")
        cursor.execute(create_media_table_sql)
        logger.info("Created or verified media table")
    except Exception as e:
        logger.error(f"Error creating tables: {e}")

2025-05-05 12:35:05,017 - INFO - Created or verified reddit_posts table
2025-05-05 12:35:05,530 - INFO - Created or verified reddit_authors table
2025-05-05 12:35:06,042 - INFO - Created or verified media table


In [3]:
add_post_fk_sql = f"""
ALTER TABLE {settings.SNOWFLAKE_SCHEMA}.post
ADD CONSTRAINT fk_post_author
FOREIGN KEY (author_id) REFERENCES {settings.SNOWFLAKE_SCHEMA}.author(id);
"""

add_media_fk_sql = f"""
ALTER TABLE {settings.SNOWFLAKE_SCHEMA}.media
ADD CONSTRAINT fk_media_post
FOREIGN KEY (post_id) REFERENCES {settings.SNOWFLAKE_SCHEMA}.post(id);
"""

if conn:
    try:
        cursor = conn.cursor()

        cursor.execute(add_post_fk_sql)
        logger.info("Added foreign key: post.author_id → author.id")

        cursor.execute(add_media_fk_sql)
        logger.info("Added foreign key: media.post_id → post.id")

    except Exception as e:
        logger.error(f"Error adding foreign keys: {e}")


2025-05-06 00:31:12,958 - INFO - Added foreign key: post.author_id → author.id
2025-05-06 00:31:13,366 - INFO - Added foreign key: media.post_id → post.id
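Snowflake records `PRIMARY KEY` and `FOREIGN KEY` constraints on standard tables but does not enforce them (only `NOT NULL` is enforced), so the `ALTER TABLE` statements above document the relationships without rejecting orphaned rows. A minimal sketch of a client-side check on the downloaded data, assuming pandas DataFrames with the column names shown in section 4; `find_orphans` is a hypothetical helper.

```python
import pandas as pd


def find_orphans(child, fk, parent, pk="id"):
    """Return rows of `child` whose `fk` value has no match in `parent[pk]`.

    Hypothetical helper for relationships Snowflake will not enforce,
    e.g. post.author_id -> author.id and media.post_id -> post.id.
    Null `fk` values are reported as orphans too (parent keys assumed non-null).
    """
    return child[~child[fk].isin(parent[pk])]


demo_authors = pd.DataFrame({"id": ["t2_x", "t2_y"]})
demo_posts = pd.DataFrame({"id": ["t3_a", "t3_b", "t3_c"],
                           "author_id": ["t2_x", "t2_y", "t2_z"]})
demo_media = pd.DataFrame({"id": ["t3_a_0", "t3_d_0"], "post_id": ["t3_a", "t3_d"]})

orphan_posts = find_orphans(demo_posts, "author_id", demo_authors)
orphan_media = find_orphans(demo_media, "post_id", demo_posts)
print(orphan_posts["id"].tolist(), orphan_media["id"].tolist())  # ['t3_c'] ['t3_d_0']
```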


## 7. Upload Parquet Files to Snowflake

In [8]:
if conn:
    try:
        cursor = conn.cursor()

        cursor.execute(f"USE SCHEMA {settings.SNOWFLAKE_SCHEMA}")
        cursor.execute(f"SHOW STAGES LIKE '{settings.SNOWFLAKE_STAGE}'")

        if cursor.fetchone() is None:
            logger.warning(
                f"Stage {settings.SNOWFLAKE_STAGE} does not exist, please create it or check the name"
            )
        else:
            logger.info(f"Stage {settings.SNOWFLAKE_STAGE} exists")

            if posts_paths:
                for path in posts_paths:
                    put_command = f"PUT file://{os.path.abspath(path)} @{settings.SNOWFLAKE_STAGE}/posts/ AUTO_COMPRESS=FALSE;"
                    cursor.execute(put_command)

            if authors_paths:
                for path in authors_paths:
                    put_command = f"PUT file://{os.path.abspath(path)} @{settings.SNOWFLAKE_STAGE}/authors/ AUTO_COMPRESS=FALSE;"
                    cursor.execute(put_command)

            if media_paths:
                for path in media_paths:
                    put_command = f"PUT file://{os.path.abspath(path)} @{settings.SNOWFLAKE_STAGE}/media/metadata/ AUTO_COMPRESS=FALSE;"
                    cursor.execute(put_command)

    except Exception as e:
        logger.error(f"Error working with Snowflake stage: {e}")

2025-05-05 12:35:19,888 - INFO - Stage BACKEND_DEV_DE_2025_STAGE_AMINSAFFAR exists
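The upload loop builds each `PUT` command from `os.path.abspath(path)` without quoting. The Snowflake `PUT` documentation asks for the file URI to be enclosed in single quotes when it contains spaces or special characters, and `OVERWRITE = TRUE` makes a re-run replace staged files that have the same name. A minimal sketch of a command builder under those rules; `build_put_command` is a hypothetical helper and `MY_STAGE` is a placeholder for `settings.SNOWFLAKE_STAGE`.

```python
import os


def build_put_command(local_path, stage, stage_dir, overwrite=False):
    """Build a PUT command for one local file (hypothetical helper).

    The file URI is wrapped in single quotes so paths with spaces work;
    paths that themselves contain a single quote are rejected.
    """
    abs_path = os.path.abspath(local_path)
    if "'" in abs_path:
        raise ValueError(f"unsupported quote character in path: {abs_path}")
    # Keep files as-is; Parquet is usually compressed internally already
    options = "AUTO_COMPRESS = FALSE"
    if overwrite:
        options += " OVERWRITE = TRUE"
    return f"PUT 'file://{abs_path}' @{stage}/{stage_dir} {options};"


cmd = build_put_command("/tmp/minio_downloads/posts/t3_1jzebsn.parquet",
                        "MY_STAGE", "posts/", overwrite=True)
print(cmd)
# PUT 'file:///tmp/minio_downloads/posts/t3_1jzebsn.parquet' @MY_STAGE/posts/ AUTO_COMPRESS = FALSE OVERWRITE = TRUE;
```

In the notebook, each command would be run with `cursor.execute(build_put_command(path, settings.SNOWFLAKE_STAGE, "posts/"))`.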


### Copy data from the stage into the tables (the role's privileges are insufficient)

In [9]:
parquet_format = f"""
    CREATE FILE FORMAT IF NOT EXISTS {settings.SNOWFLAKE_SCHEMA}.parquet_format
    TYPE = 'PARQUET';
"""

if conn:
    try:
        cursor = conn.cursor()
        cursor.execute("USE ROLE BACKEND_DEV_DE_2025_AMINSAFFAR")
        cursor.execute(parquet_format)
    except Exception as e:
        logger.error(f"Error creating file format: {e}")

2025-05-05 12:41:44,096 - ERROR - Error copying data: 003001 (42501): 01bc24a1-0306-6501-0002-5c1e00c1d26e: SQL access control error:
Insufficient privileges to operate on schema 'BACKEND_DEV_DE_2025_AMINSAFFAR'


In [10]:
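# Target tables are the ones created in section 6 (author, post, media).
# MATCH_BY_COLUMN_NAME maps Parquet columns to table columns by name; without it
# (or a transforming SELECT), Snowflake loads Parquet only into a single VARIANT column.
copy_posts_sql = f"""
COPY INTO {settings.SNOWFLAKE_SCHEMA}.post
FROM @{settings.SNOWFLAKE_STAGE}/posts/
FILE_FORMAT = (FORMAT_NAME = '{settings.SNOWFLAKE_SCHEMA}.parquet_format')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
"""

copy_authors_sql = f"""
COPY INTO {settings.SNOWFLAKE_SCHEMA}.author
FROM @{settings.SNOWFLAKE_STAGE}/authors/
FILE_FORMAT = (FORMAT_NAME = '{settings.SNOWFLAKE_SCHEMA}.parquet_format')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
"""

copy_media_sql = f"""
COPY INTO {settings.SNOWFLAKE_SCHEMA}.media
FROM @{settings.SNOWFLAKE_STAGE}/media/metadata/
FILE_FORMAT = (FORMAT_NAME = '{settings.SNOWFLAKE_SCHEMA}.parquet_format')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
"""

if conn:
    try:
        cursor = conn.cursor()
        cursor.execute("USE ROLE BACKEND_DEV_DE_2025_AMINSAFFAR")
        # Load parent tables before child tables (author, then post, then media)
        cursor.execute(copy_authors_sql)
        cursor.execute(copy_posts_sql)
        cursor.execute(copy_media_sql)
        logger.info("Data copied successfully")
    except Exception as e:
        logger.error(f"Error copying data: {e}")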

2025-05-05 12:41:46,331 - ERROR - Error copying data: 002003 (02000): 01bc24a1-0306-6c39-0002-5c1e00c26132: SQL compilation error:
File format 'PARQUET_FORMAT' does not exist or not authorized.
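Both errors trace back to the named file format: the role could not create `parquet_format`, so `COPY INTO` cannot reference it. One possible workaround, assuming the role can still read the stage and insert into the tables, is to give the format inline with `FILE_FORMAT = (TYPE = PARQUET)`, which creates no object and so needs no `CREATE FILE FORMAT` privilege. A minimal sketch of a statement builder; `build_copy_sql` is a hypothetical helper and `MY_SCHEMA` / `MY_STAGE` are placeholders for `settings.SNOWFLAKE_SCHEMA` and `settings.SNOWFLAKE_STAGE`.

```python
def build_copy_sql(schema, table, stage, stage_dir):
    """Build a COPY INTO statement for staged Parquet files (hypothetical helper).

    The file format is given inline, so no named FILE FORMAT object is needed,
    and Parquet columns are matched to table columns by name.
    """
    return (
        f"COPY INTO {schema}.{table}\n"
        f"FROM @{stage}/{stage_dir}\n"
        "FILE_FORMAT = (TYPE = PARQUET)\n"
        "MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;"
    )


# Parents before children: author, then post, then media
load_plan = [("author", "authors/"), ("post", "posts/"), ("media", "media/metadata/")]
statements = [build_copy_sql("MY_SCHEMA", t, "MY_STAGE", d) for t, d in load_plan]
print(statements[0])
```

Each statement would then be run with `cursor.execute(stmt)`; whether the active role holds the required privileges on the stage and tables still has to be checked separately.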


## 8. Snowflake Analysis Queries

Perform the required analysis queries:

### Query 1: Get the top post per author by interactions / likes

In [4]:
top_post_by_author_query = """
WITH RankedPosts AS (
    SELECT 
        p.id as post_id,
        p.title,
        p.author_id,
        a.name as author_name,
        p.num_likes,
        ROW_NUMBER() OVER (PARTITION BY p.author_id ORDER BY p.num_likes DESC) as rank
    FROM post p
    JOIN author a ON p.author_id = a.id
)
SELECT 
    post_id,
    title,
    author_id,
    author_name,
    num_likes
FROM RankedPosts
WHERE rank = 1
ORDER BY num_likes DESC;
"""

print("Query 1: Top post per author by interactions/likes")
print(top_post_by_author_query)

Query 1: Top post per author by interactions/likes



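To cross-check Query 1 without Snowflake, the same logic can be run in pandas on the downloaded files. A minimal sketch, assuming DataFrames with the post and author columns shown in section 4; `top_post_per_author` is a hypothetical helper. Like `ROW_NUMBER()`, it keeps exactly one post per author; SQL breaks ties on `num_likes` arbitrarily, while the stable sort here keeps the first tied row in input order.

```python
import pandas as pd


def top_post_per_author(posts, authors):
    """pandas version of Query 1: each author's most-liked post (hypothetical helper)."""
    merged = posts.merge(
        authors[["id", "name"]].rename(columns={"id": "author_id", "name": "author_name"}),
        on="author_id",  # inner join, like JOIN author a ON p.author_id = a.id
    )
    top = (
        merged.sort_values("num_likes", ascending=False, kind="mergesort")
        .groupby("author_id", sort=False)
        .head(1)  # one row per author, like WHERE rank = 1
    )
    return top.rename(columns={"id": "post_id"})[
        ["post_id", "title", "author_id", "author_name", "num_likes"]
    ].reset_index(drop=True)


demo_authors = pd.DataFrame({"id": ["t2_x", "t2_y"], "name": ["alice", "bob"]})
demo_posts = pd.DataFrame({
    "id": ["t3_a", "t3_b", "t3_c"],
    "title": ["A", "B", "C"],
    "author_id": ["t2_x", "t2_x", "t2_y"],
    "num_likes": [5, 9, 7],
})
print(top_post_per_author(demo_posts, demo_authors))
```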

### Query 2: Get the top post per author and per week by interactions / likes

In [5]:
top_post_by_author_weekly_query = """
WITH WeeklyPosts AS (
    SELECT 
        p.id as post_id,
        p.title,
        p.author_id,
        a.name as author_name,
        p.num_likes,
        DATE_TRUNC('WEEK', p.timestamp) as week,
        ROW_NUMBER() OVER (PARTITION BY p.author_id, DATE_TRUNC('WEEK', p.timestamp) ORDER BY p.num_likes DESC) as rank
    FROM post p
    JOIN author a ON p.author_id = a.id
)
SELECT 
    post_id,
    title,
    author_id,
    author_name,
    week,
    num_likes
FROM WeeklyPosts
WHERE rank = 1
ORDER BY week DESC, num_likes DESC;
"""

print("Query 2: Top post per author per week by interactions/likes")
print(top_post_by_author_weekly_query)

Query 2: Top post per author per week by interactions/likes



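`DATE_TRUNC('WEEK', ...)` returns the start of the week, which under Snowflake's default `WEEK_START` setting is a Monday; both weeks in the results (2025-04-21 and 2025-04-28) are Mondays. A minimal pandas sketch of Query 2 for cross-checking, assuming Monday-start weeks and a `timestamp` column parseable as datetimes; the author-name join is omitted, and `week_start` and `top_post_per_author_week` are hypothetical helpers.

```python
import pandas as pd


def week_start(ts):
    """Truncate timestamps to the Monday of their week (like DATE_TRUNC('WEEK', ts))."""
    ts = pd.to_datetime(ts)
    return ts.dt.normalize() - pd.to_timedelta(ts.dt.weekday, unit="D")


def top_post_per_author_week(posts):
    """pandas version of Query 2 without the author join (hypothetical helper)."""
    df = posts.assign(week=week_start(posts["timestamp"]))
    top = (
        df.sort_values("num_likes", ascending=False, kind="mergesort")
        .groupby(["author_id", "week"], sort=False)
        .head(1)  # one row per (author, week), like WHERE rank = 1
    )
    # ORDER BY week DESC, num_likes DESC
    return top.sort_values(["week", "num_likes"], ascending=[False, False]).reset_index(drop=True)


demo = pd.DataFrame({
    "id": ["a", "b", "c", "d"],
    "author_id": ["x", "x", "x", "y"],
    "num_likes": [3, 8, 6, 1],
    "timestamp": ["2025-04-29 10:00", "2025-04-28 00:00", "2025-04-27 23:00", "2025-04-30 12:00"],
})
print(top_post_per_author_week(demo)[["id", "author_id", "week", "num_likes"]])
```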

### Query 3: Get the top author per number of posts (in the available data set)

In [6]:
top_author_by_post_count_query = """
SELECT 
    a.id as author_id,
    a.name as author_name,
    COUNT(p.id) as post_count
FROM author a
JOIN post p ON a.id = p.author_id
GROUP BY a.id, a.name
ORDER BY post_count DESC
LIMIT 10;
"""

print("Query 3: Top author by number of posts")
print(top_author_by_post_count_query)

Query 3: Top author by number of posts



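Query 3 returns up to ten authors ordered by post count, so authors tied at the cut-off can be dropped arbitrarily, and a single "top author" is only well defined when no one ties for first place (the results include a 7-post tie). A minimal pandas sketch that returns every author sharing the highest post count, assuming a posts DataFrame with `id` and `author_id` columns; `top_authors_by_post_count` is a hypothetical helper.

```python
import pandas as pd


def top_authors_by_post_count(posts):
    """Return every author_id tied for the highest number of posts (hypothetical helper)."""
    counts = posts.groupby("author_id")["id"].count()  # like COUNT(p.id), ignores null ids
    return sorted(counts[counts == counts.max()].index)


demo_posts = pd.DataFrame({
    "id": ["p1", "p2", "p3", "p4", "p5"],
    "author_id": ["x", "x", "y", "y", "z"],
})
print(top_authors_by_post_count(demo_posts))  # ['x', 'y']
```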

## 9. Execute Queries (if connected)

In [7]:
def run_query(query, title):
    if conn:
        try:
            cursor = conn.cursor()
            cursor.execute(query)
            results = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
            df = pd.DataFrame(results, columns=columns)
            print(f"\n{title} Results:")
            return df
        except Exception as e:
            logger.error(f"Error executing query: {e}")
            return pd.DataFrame()
    else:
        print("Not connected to Snowflake, skipping query execution")
        return pd.DataFrame()
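`run_query` opens a new cursor for every query and never closes it. A minimal sketch of a variant that closes the cursor via `contextlib.closing`; it uses only DB-API 2.0 calls (`cursor()`, `execute()`, `fetchall()`, `description`), which the Snowflake connector implements, so an in-memory SQLite database serves as a stand-in here. `query_to_dataframe` is a hypothetical name.

```python
import sqlite3
from contextlib import closing

import pandas as pd


def query_to_dataframe(connection, query):
    """Run a query on any DB-API 2.0 connection and return a DataFrame."""
    with closing(connection.cursor()) as cursor:
        cursor.execute(query)
        rows = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
    return pd.DataFrame(rows, columns=columns)


# Stand-in database; in the notebook, pass the Snowflake `conn` instead
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE post (id TEXT, author_id TEXT, num_likes INTEGER)")
demo_conn.executemany(
    "INSERT INTO post VALUES (?, ?, ?)",
    [("t3_a", "t2_x", 5), ("t3_b", "t2_x", 9), ("t3_c", "t2_y", 7)],
)
demo_df = query_to_dataframe(
    demo_conn,
    "SELECT author_id, COUNT(id) AS post_count FROM post "
    "GROUP BY author_id ORDER BY post_count DESC",
)
print(demo_df)
```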

In [8]:
from IPython.display import display  # predefined in Jupyter; imported here to be explicit

# Run the queries
query1_results = run_query(top_post_by_author_query, "Top Post by Author")
if not query1_results.empty:
    display(query1_results)

query2_results = run_query(top_post_by_author_weekly_query, "Top Post by Author Weekly")
if not query2_results.empty:
    display(query2_results)

query3_results = run_query(top_author_by_post_count_query, "Top Authors by Post Count")
if not query3_results.empty:
    display(query3_results)


Top Post by Author Results:


| | POST_ID | TITLE | AUTHOR_ID | AUTHOR_NAME | NUM_LIKES |
|---|---|---|---|---|---|
| 0 | t3_1k835m6 | Not sure if guys can post here, but here’s my ... | t2_l5w4p7jmz | MeaningOtherwise5022 | 6998 |
| 1 | t3_1kal9y6 | my birthday outfit ♉︎ | t2_f5ynk9df | cIitaurus | 2055 |
| 2 | t3_1kcfsne | Dinner date outfit with my sister ❤️ F35 | t2_1g08bd984l | CozyBvnnies | 1106 |
| 3 | t3_1k047gw | Photo dump | t2_9nux0f2c | Grace-Music | 643 |
| 4 | t3_1k9gs5z | Made Wild Violet Jelly | t2_u02f1kswd | OurCozyColonial1900 | 535 |
| 5 | t3_1k5q2vw | She being funny most of the time | t2_pcmevaw4h | Objective_Common_536 | 57 |



Top Post by Author Weekly Results:


| | POST_ID | TITLE | AUTHOR_ID | AUTHOR_NAME | WEEK | NUM_LIKES |
|---|---|---|---|---|---|---|
| 0 | t3_1kal9y6 | my birthday outfit ♉︎ | t2_f5ynk9df | cIitaurus | 2025-04-28 | 2055 |
| 1 | t3_1kcfsne | Dinner date outfit with my sister ❤️ F35 | t2_1g08bd984l | CozyBvnnies | 2025-04-28 | 1106 |
| 2 | t3_1kb33r1 | Photo dump | t2_9nux0f2c | Grace-Music | 2025-04-28 | 511 |
| 3 | t3_1kdaoou | I’ve never loved a dress more, except for my w... | t2_u02f1kswd | OurCozyColonial1900 | 2025-04-28 | 174 |
| 4 | t3_1ke8iwp | 'Nala come', and she... | t2_pcmevaw4h | Objective_Common_536 | 2025-04-28 | 56 |
| 5 | t3_1kc00vt | I got another date! Going to take her out on a... | t2_l5w4p7jmz | MeaningOtherwise5022 | 2025-04-28 | 17 |
| 6 | t3_1k835m6 | Not sure if guys can post here, but here’s my ... | t2_l5w4p7jmz | MeaningOtherwise5022 | 2025-04-21 | 6998 |
| 7 | t3_1k4c0g3 | extending the life of my curls ➿ | t2_f5ynk9df | cIitaurus | 2025-04-21 | 804 |
| 8 | t3_1k9gs5z | Made Wild Violet Jelly | t2_u02f1kswd | OurCozyColonial1900 | 2025-04-21 | 535 |
| 9 | t3_1k6h2p8 | Photo dump. Playing Alabama tomorrow who’s com... | t2_9nux0f2c | Grace-Music | 2025-04-21 | 403 |



Top Authors by Post Count Results:


| | AUTHOR_ID | AUTHOR_NAME | POST_COUNT |
|---|---|---|---|
| 0 | t2_1g08bd984l | CozyBvnnies | 16 |
| 1 | t2_pcmevaw4h | Objective_Common_536 | 12 |
| 2 | t2_9nux0f2c | Grace-Music | 9 |
| 3 | t2_f5ynk9df | cIitaurus | 7 |
| 4 | t2_u02f1kswd | OurCozyColonial1900 | 7 |
| 5 | t2_l5w4p7jmz | MeaningOtherwise5022 | 4 |


## 10. Clean Up

In [60]:
if conn:
    conn.close()
    logger.info("Snowflake connection closed")

2025-05-03 15:49:04,344 - INFO - Snowflake connection closed


## Summary

In this notebook, we've accomplished the following tasks:

1. Connected to MinIO and downloaded the stored Reddit Parquet files directly
2. Created separate post, author, and media tables in Snowflake
3. Demonstrated how to stage the Parquet files in Snowflake without converting them to CSV
4. Wrote and ran SQL queries that compute the required metrics:
   - Top post per author by interactions/likes
   - Top post per author and per week by interactions/likes
   - Top authors by number of posts

Using Parquet files directly is more efficient than converting them to CSV because Parquet:
1. Preserves the original data types and schema without conversion
2. Compresses better than CSV, reducing upload time and stage storage
3. Is a columnar format, so readers of the files (pyarrow here, or Snowflake when querying staged files) can scan only the columns they need
4. Eliminates the need for intermediate format transformations