# Data Preprocessing Notebook

This notebook preprocesses two datasets:

- **NASDAQ news data**: Converts date strings to UTC and saves the result as a Parquet file.
- **Full stock price history**: Standardizes date columns, converts dates to UTC, and saves each file as a Parquet file.

Make sure the required CSV files are available in the specified folders before running the cells.

In [1]:
import os
from datetime import timedelta, datetime
import polars as pl
from multiprocessing import Pool, cpu_count

In [10]:
import os
import polars as pl

input_file = 'nasdaq_external_data.csv'
output_file = 'nasdaq_small.csv'
n = 100

# Read the CSV file using Polars
df = pl.read_csv(input_file)

df_head = df.head(n)
df_head.write_csv(output_file)
    


In [11]:
def convert_to_utc(time_str):
    if isinstance(time_str, float) or not isinstance(time_str, str):
        return "Invalid date format"
        
    if " EDT" in time_str:
        time_str_cleaned = time_str.replace(" EDT", "")
        offset = timedelta(hours=-4)
    elif " EST" in time_str:
        time_str_cleaned = time_str.replace(" EST", "")
        offset = timedelta(hours=-5)
    else:
        offset = timedelta(hours=0)
        time_str_cleaned = time_str

    formats = [
        '%B %d, %Y — %I:%M %p',  # e.g., "September 12, 2023 — 06:15 pm"
        '%b %d, %Y %I:%M%p',      # e.g., "Nov 14, 2023 7:35AM"
        '%d-%b-%y',              # e.g., "6-Jan-22"
        '%Y-%m-%d',              # e.g., "2021-4-5"
        '%Y/%m/%d',              # e.g., "2021/4/5"
        '%b %d, %Y',             # e.g., "DEC 7, 2023"
        '%Y-%m-%d %H:%M:%S'      # e.g., "2023-12-07 16:30:00"
    ]

    for fmt in formats:
        try:
            dt = datetime.strptime(time_str_cleaned, fmt)
            # For the '%d-%b-%y' format, ignore offset adjustment
            if fmt == '%d-%b-%y':
                offset = timedelta(hours=0)
            dt_utc = dt + offset
            # Return ISO formatted string without the " UTC" suffix
            return dt_utc.strftime('%Y-%m-%d %H:%M:%S')
        except ValueError:
            continue

    return "Invalid date format"

In [12]:
def process_nasdaq_data_parquet(file_path, saving_path):
    print('Processing NASDAQ news data...')
    try:
        # Use lazy reading for memory efficiency
        df = pl.scan_csv(file_path, infer_schema_length=50000, schema_overrides={"Article": pl.Utf8})
        
        print("scan done")
        # Drop the Unnamed: 0 column if it exists (using lazy schema collection)
        if "Unnamed: 0" in df.collect_schema().names():
            df = df.drop("Unnamed: 0")

        print("drop column")
        
        # Convert the Date column to UTC (using the custom function)
        # Collect only the Date column to minimize overhead
        date_df = df.select("Date").collect()
        converted_dates = date_df.with_columns([
            pl.col("Date").map_elements(convert_to_utc, return_dtype=pl.Utf8).alias("Date")
        ])

        print("strp time")
        
        # Replace the original Date column with the converted dates (still as strings)
        df = df.with_columns([
            pl.lit(converted_dates.get_column("Date")).alias("Date")
        ])
        
        print("date")
        
        # Since our dates are now in ISO format ("YYYY-MM-DD HH:MM:SS"),
        # we can sort them as strings and get correct chronological order.
        df = df.sort("Date", descending=True)
        
        # Ensure saving path exists
        os.makedirs(saving_path, exist_ok=True)
        output_file = os.path.join(saving_path, os.path.basename(file_path).replace('.csv', '.parquet'))
        
        print("executing lazy query")
        # Execute the lazy query and write directly to parquet
        df.collect().write_parquet(output_file)
        print('Done processing NASDAQ news data. Saved to:', output_file)
        
    except Exception as e:
        print(f"Error processing NASDAQ news data: {str(e)}")
        raise 

In [2]:
def process_stock_history_parquet(folder_path, saving_path):
    os.makedirs(saving_path, exist_ok=True)
    
    csv_files = [f for f in os.listdir(folder_path) if f.endswith('.csv')]
    for csv_file in csv_files:
        print(f'Processing {csv_file}...')
        file_path = os.path.join(folder_path, csv_file)
        
        try:
            df = pl.read_csv(file_path)

            # Lowercase all column names
            for col in df.columns:
                df = df.rename({col: col.lower()})

            # Identify and standardize the date column (rename to "Date")
            date_columns = ["datetime", "date", "time", "timestamp"]
            for col in date_columns:
                if col.lower() in df.columns:
                    df = df.rename({col: "Date"})
                    break
            
            if "Date" not in df.columns:
                print(f"No date column found in {csv_file}, skipping...")
                continue
            
            df = df.with_columns([
                pl.col("Date").map_elements(convert_to_utc, return_dtype=pl.Utf8).alias("Date")
            ])
            
            # Sort by date descending
            df = df.sort("Date", descending=True)
            
            output_file = os.path.join(saving_path, csv_file.replace('.csv', '.parquet'))
            df.write_parquet(output_file)
            print(f'Done processing {csv_file}. Saved to: {output_file}')
            
        except Exception as e:
            print(f"Error processing {csv_file}: {str(e)}")

## Run the Preprocessing Functions

Adjust the file paths as needed before running the cells.

In [15]:
# Define paths for the input files and output folders
# For NASDAQ news data:
nasdaq_file_path = 'nasdaq_external_data.csv'
nasdaq_saving_path = 'nasdaq_data_preprocessed'

# Process the NASDAQ news data and save as Parquet
process_nasdaq_data_parquet(nasdaq_file_path, nasdaq_saving_path)

Processing NASDAQ news data...
scan done
drop column
strp time
date
executing lazy query
Done processing NASDAQ news data. Saved to: nasdaq_data_preprocessed\nasdaq_external_data.parquet


In [None]:
# For full stock price history:
full_history_folder_path = 'full_history'
full_history_saving_path = 'full_history_preprocessed'

# Process the full stock history and save each file as Parquet
process_stock_history_parquet(full_history_folder_path, full_history_saving_path)

Processing full stock price history...
Using 11 processes for parallel processing...


: 