In [6]:
import requests
import os
from tqdm.notebook import tqdm_notebook
from concurrent.futures import ThreadPoolExecutor, as_completed
import shutil
from bs4 import BeautifulSoup
import time
import numpy as np
from requests.exceptions import HTTPError
import zstandard as zstd
import shutil
from tqdm.notebook import tqdm
from pathlib import Path
import pandas as pd
import json
import re
import glob
import io
from collections import deque
import gc
import concurrent.futures
import glob

def get_files_in_directory(directory_path):
    """
    List the .zst files directly inside a directory, in sorted order.

    Parameters
    ----------
    directory_path: str or os.PathLike
        Directory to search (non-recursive).

    Returns
    -------
    list of str
        Paths of the matching files, sorted lexicographically. Empty when the
        directory has no .zst files or does not exist.
    """
    # Escape the directory part so characters such as '[' or '*' in its name
    # are matched literally instead of being interpreted as glob syntax.
    literal_dir = glob.escape(os.fspath(directory_path))
    full_path = os.path.join(literal_dir, '*.zst')
    # glob.glob returns entries in arbitrary order; sorting keeps the
    # processing order stable between runs.
    return sorted(glob.glob(full_path))


def extract_single_zst(archive: Path):
    """
    Stream the text lines of a .zst file without extracting it to disk.

    The file is decompressed incrementally through a stream reader and decoded
    as UTF-8, so lines are produced lazily as the generator is consumed.

    Parameters
    ----------
    archive: pathlib.Path or str
        Path to the .zst file to read.

    Returns
    -------
    generator
        Yields decompressed lines as strings (line endings included).
    """
    # Accept plain string paths as well as Path objects.
    archive = Path(archive)
    # max_window_size is raised to 2**31 (2147483648).
    # NOTE(review): presumably needed because the archives use a large
    # compression window -- confirm against the data source.
    dctx = zstd.ZstdDecompressor(max_window_size=2147483648)
    with archive.open('rb') as compressed:
        with dctx.stream_reader(compressed) as reader:
            text_stream = io.TextIOWrapper(reader, encoding='utf-8')
            yield from text_stream
                

def get_df(archive_path: str, chunk_size=10000):
    """
    Load a .zst archive of line-delimited JSON into a pandas DataFrame.

    Lines are parsed one at a time and collected into chunks of `chunk_size`
    records; each full chunk becomes a DataFrame and all chunk frames are
    concatenated at the end.

    Parameters
    ----------
    archive_path: str
        Path to the .zst file.
    chunk_size: int
        Number of parsed records per intermediate DataFrame.

    Returns
    -------
    pd.DataFrame
        DataFrame constructed from the decompressed JSON data, with a fresh
        0..n-1 index. Empty if no line could be parsed.
    """
    chunk = []   # parsed records waiting to be turned into a DataFrame
    frames = []  # one DataFrame per completed chunk

    for line in tqdm_notebook(extract_single_zst(Path(archive_path))):
        # Blank lines carry no record; skip them instead of reporting an error.
        if not line.strip():
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError as e:
            print(f"error in {archive_path}\n")
            print(f"Error decoding JSON: {e} in line: {line}")
            # Drop only the malformed line. Records already parsed into the
            # current chunk are valid and must be kept.
            continue

        chunk.append(record)
        if len(chunk) >= chunk_size:
            frames.append(pd.DataFrame(chunk))
            chunk = []

    if chunk:  # Handle any remaining data
        frames.append(pd.DataFrame(chunk))

    if not frames:
        return pd.DataFrame()
    # Concatenate all chunk frames into one DataFrame
    return pd.concat(frames, ignore_index=True)



In [7]:
# Collect the paths of the .zst archives under data/Reddit.
files = get_files_in_directory("data/Reddit")

In [8]:
# Parse each archive once and concatenate in a single call. Growing `df` with
# pd.concat inside the loop re-copies every accumulated row for each new file.
frames = [get_df(path) for path in tqdm_notebook(files)]
# pd.concat raises ValueError on an empty list, so fall back to an empty frame
# when no archives were found.
df = pd.concat(frames) if frames else pd.DataFrame()
del frames  # the per-file frames are no longer needed once combined

df

  0%|          | 0/4 [00:00<?, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

0it [00:00, ?it/s]

  final_df = pd.concat(data_frames, ignore_index=True)


0it [00:00, ?it/s]

Unnamed: 0,selftext,media,ups,retrieved_on,link_flair_text,permalink,downs,thumbnail,distinguished,title,...,call_to_action,gallery_data,is_gallery,event_end,event_is_live,event_start,poll_data,tournament_data,previous_visits,collections
0,,,1.0,1.440847e+09,,/r/askSingapore/comments/2zgp2y/were_doing_a_c...,0.0,default,,We're doing a comparative analysis of PH and S...,...,,,,,,,,,,
1,As the title says. \nI'm in Singapore for 3day...,,2.0,1.440843e+09,,/r/askSingapore/comments/2zpibb/best_way_to_ge...,0.0,self,,Best way to get around in town as tourist?,...,,,,,,,,,,
2,"Hi! I am an acting student in Los Angeles, and...",,2.0,1.440838e+09,,/r/askSingapore/comments/2zyt7a/help_intereste...,0.0,self,,Help! interested in your accent,...,,,,,,,,,,
3,,,0.0,1.440836e+09,,/r/askSingapore/comments/3049gi/is_there_anywh...,0.0,self,,Is there anywhere to get a good jump rope in S...,...,,,,,,,,,,
4,,,0.0,1.440834e+09,,/r/askSingapore/comments/308hr7/any_singapores...,0.0,self,,Any Singapore's website selling gift card?,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
267971,[removed],,,1.673168e+09,Discussion,/r/singapore/comments/10026jw/sign_on/,,default,,Sign on,...,,,,,,,,,,
267972,,{'oembed': {'author_name': 'Archangel Guidance...,,1.673168e+09,"Photos, Videos",/r/singapore/comments/1004c9a/taurus_3am_memor...,,default,,Taurus ~ 3am Memories 🌹 A Mirror of Two Lives ...,...,,,,,,,,,,
267973,"Talk about your day. Anything goes, but subred...",,,1.673168e+09,,/r/singapore/comments/1004s1o/rsingapore_rando...,,self,,/r/singapore random discussion and small quest...,...,,,,,,,,,,
267974,,,,1.673168e+09,Tabloid/Low-quality source,/r/singapore/comments/1005b6r/jurong_has_just_...,,https://b.thumbs.redditmedia.com/bbmCU4_asyBM2...,,Jurong Has Just Joined the Million-Dollar Club...,...,,,,,,,,,,


In [9]:
# Replace the existing index with a fresh 0..n-1 range, discarding the old one.
df = df.reset_index(drop=True)

In [10]:
# Write the combined frame as a gzip-compressed CSV, without the index column.
df.to_csv(
    'data/data_SG_reddit.csv.gz',
    index=False,
    compression='gzip',
)