In [None]:
import os, gzip, json, typing

from tqdm import tqdm
from typing import Generator
import urllib.request
import ssl
import multiprocessing as mp
import concurrent.futures

import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import numpy as np

# SECURITY: swaps the ssl module's default HTTPS context factory for the unverified one,
# so HTTPS requests that rely on that default no longer verify server certificates.
# This is a process-wide setting; only acceptable for trusted hosts.
ssl._create_default_https_context = ssl._create_unverified_context

In [None]:
def parse(line: str) -> dict:
    """Decode one JSON-encoded line into the corresponding Python object.

    Args:
        line: A single JSON document as text.

    Returns:
        Whatever ``json.loads`` produces for ``line``.

    Raises:
        json.JSONDecodeError: If ``line`` is not valid JSON.
    """
    decoded = json.loads(line)
    return decoded

def read_gz_file(url: str, chunk_size: int) -> typing.Iterator[typing.List[str]]:
    """Stream a gzip-compressed text file from ``url`` in batches of lines.

    The file is decompressed lazily, so only one batch of lines is buffered
    by this function at a time.

    Args:
        url: Location of the gzip file; anything ``urllib.request.urlopen`` accepts.
        chunk_size: Maximum number of lines per yielded chunk; must be >= 1.

    Yields:
        Lists of at most ``chunk_size`` lines (line endings preserved). Every
        chunk is full except possibly the last one. Nothing is yielded for an
        empty file.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1. Because this is a
            generator, the error surfaces when iteration starts.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    # Both the HTTP response and the gzip reader are closed when iteration ends.
    # JSON text is UTF-8, so decode explicitly instead of using the locale default.
    with urllib.request.urlopen(url) as response, \
            gzip.open(response, "rt", encoding="utf-8") as gz_file:
        batch = []
        for line in gz_file:
            batch.append(line)
            if len(batch) == chunk_size:
                yield batch
                # Start a fresh list so the chunk already handed out is never mutated.
                batch = []

        # Leftover lines that do not fill a whole chunk.
        if batch:
            yield batch
            
def process_chunk(chunk: Generator[str, None, None], file_name: str) -> None:
    """Parse every line of ``chunk`` and store the results in one Parquet file.

    Args:
        chunk: Iterable of JSON-encoded lines; it is consumed exactly once.
        file_name: Destination path handed to ``pq.write_table``.
    """
    parsed_rows = [parse(raw_line) for raw_line in chunk]
    # All parsed records are stored under a single column named "col".
    columns = {"col": parsed_rows}
    pq.write_table(pa.Table.from_pydict(columns), file_name)
    
def process_data(url: str, chunk_size: int, num_workers: int, output_dir: str) -> None:
    """Convert a remote gzip file into Parquet files, one per chunk of lines.

    Chunks are read with ``read_gz_file`` and each one is handed to
    ``process_chunk`` on a thread pool. Output files are named
    ``chunk_<i>.parquet`` inside ``output_dir``.

    Args:
        url: Location of the gzip file to read.
        chunk_size: Number of lines per chunk / output file.
        num_workers: Maximum number of worker threads.
        output_dir: Directory that receives the output files; created if missing.

    Raises:
        Exception: Whatever a worker raised. Failures are re-raised here
            instead of being silently dropped with the discarded futures.
    """
    if output_dir:
        # An empty string means "current directory", which os.makedirs rejects.
        os.makedirs(output_dir, exist_ok=True)

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [
            executor.submit(
                process_chunk, chunk, os.path.join(output_dir, f"chunk_{i}.parquet")
            )
            for i, chunk in enumerate(read_gz_file(url, chunk_size))
        ]
        # result() re-raises any exception raised inside the worker thread.
        for future in concurrent.futures.as_completed(futures):
            future.result()

In [None]:
def parse(url: str) -> typing.Iterator[typing.Any]:
    """Yield the decoded JSON document from each line of a gzip file at ``url``.

    Args:
        url: Location of the gzip file; anything ``urllib.request.urlopen`` accepts.

    Yields:
        The value ``json.loads`` returns for each non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # The context managers close both the response and the gzip reader once the
    # generator is exhausted or closed.
    with urllib.request.urlopen(url) as response, gzip.open(response, "rb") as gz_file:
        for line in gz_file:
            if not line.strip():
                # Blank lines (for example a trailing newline) carry no record.
                continue
            yield json.loads(line)

def toDF(url: str) -> pd.DataFrame:
    """Load every record produced by ``parse(url)`` into a DataFrame.

    Args:
        url: Passed straight through to ``parse``.

    Returns:
        A DataFrame with one row per record, indexed 0, 1, 2, ... in the
        order the records were produced.
    """
    # Key each record by its position so from_dict can use the keys as the row index.
    rows_by_position = dict(enumerate(parse(url)))
    return pd.DataFrame.from_dict(rows_by_position, orient="index")

In [None]:
# Probably faster, but will take more memory.
# Also pretty inconsistent, since each thread is trying to process the chunks at once.
# Breaks when I try to optimize.
def read_gz_file(url: str) -> list:
    """Download a gzip-compressed text file and return all of its lines.

    The whole decompressed file is held in memory.

    Args:
        url: Location of the gzip file; anything ``urllib.request.urlopen`` accepts.

    Returns:
        Every line of the decompressed text, line endings preserved.
    """
    # gzip does not close a file object it did not open itself, so the response
    # needs its own context manager to be released.
    with urllib.request.urlopen(url) as response:
        with gzip.open(response, "rt") as gz_file:
            return gz_file.readlines()

def write_lines(lines: list, filename: str) -> None:
    """Write ``lines`` to ``filename``, replacing any existing content.

    No separators are added, so each entry must carry its own line ending.

    Args:
        lines: Strings to write, in order.
        filename: Path of the file to create or overwrite.
    """
    with open(filename, "w") as out_file:
        out_file.writelines(lines)
    
def process_chunk(chunk: list, filename: str) -> None:
    """Write one chunk of lines to ``filename`` and print a completion message.

    Args:
        chunk: Lines to write; passed unchanged to ``write_lines``.
        filename: Destination path for this chunk.
    """
    write_lines(chunk, filename)
    done_message = f"FINISHED CHUNK: {filename}"
    print(done_message)

def process_data(url: str, chunk_size: int, num_workers: int, output_dir: str) -> None:
    """Split a remote gzip file into files of ``chunk_size`` lines each.

    All lines are loaded with ``read_gz_file`` first, then each slice is
    written by ``process_chunk`` on a thread pool. Output files are named
    ``chunk_<i>.json`` inside ``output_dir``.

    Args:
        url: Location of the gzip file to read.
        chunk_size: Number of lines per output file.
        num_workers: Maximum number of worker threads.
        output_dir: Directory that receives the output files; created if missing.

    Raises:
        Exception: Whatever a worker raised. ``concurrent.futures.wait`` alone
            never re-raises, so each future's result is checked explicitly.
    """
    lines = read_gz_file(url)
    chunks = [lines[start:start + chunk_size] for start in range(0, len(lines), chunk_size)]

    if output_dir:
        # An empty string means "current directory", which os.makedirs rejects.
        os.makedirs(output_dir, exist_ok=True)

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = [
            executor.submit(process_chunk, chunk, os.path.join(output_dir, f"chunk_{i}.json"))
            for i, chunk in enumerate(chunks)
        ]
        # result() re-raises any exception raised inside the worker thread.
        for future in concurrent.futures.as_completed(futures):
            future.result()