# S3 to Dataframe

Convert S3 news-collection files into a pandas DataFrame.

In [63]:
import boto3
import pandas as pd, json, csv
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

from itertools import chain
from functools import partial

import requests
from tenacity import retry

from projects_secretes import *

In [50]:
# get the time of a collection file 
def get_time_from_fileName(fileName):
    """Extract the collection date and site name from a collection file key.

    The last path component is expected to look like
    ``<date>_<siteName><5-character extension>``.

    Args:
        fileName: File key or path; only the part after the final "/" is used.

    Returns:
        A ``(date, siteName)`` tuple of strings.

    Raises:
        ValueError: If the base name contains no underscore.
    """
    # Keep the last path component and drop its final 5 characters
    # (presumably a ".json" extension - confirm against the bucket layout).
    dateName = fileName.split("/")[-1][:-5]
    # Split on the first underscore only, so a site name that itself contains
    # underscores stays intact instead of failing the two-name unpacking.
    date, siteName = dateName.split("_", 1)

    return (date, siteName)

# Check whether a date falls within a date range.
# date_range is [YYYYMMDDHH, YYYYMMDDHH]: inclusive of the first item,
# exclusive of the second.
def is_date_witin_range(date, date_range):
    """Return whether ``date`` lies in the half-open interval ``date_range``.

    Args:
        date: Timestamp string in ``YYYYMMDDHH`` format.
        date_range: Two-item sequence ``[start, end]`` of ``YYYYMMDDHH``
            strings. ``start`` is inclusive, ``end`` is exclusive.

    Returns:
        True if ``start <= date < end``, otherwise False.

    Raises:
        ValueError: If any of the strings does not match ``YYYYMMDDHH``.
    """
    # Imported locally so the function does not rely on `datetime` having
    # been brought into scope somewhere else in the notebook.
    from datetime import datetime

    fmt = "%Y%m%d%H"

    # Bounds of the range
    start_date = datetime.strptime(date_range[0], fmt)
    end_date = datetime.strptime(date_range[1], fmt)

    # The date to check
    date_to_check = datetime.strptime(date, fmt)
    return start_date <= date_to_check < end_date
  
# Sanity check: 2024-02-01 00h lies after the end of the January range,
# so this evaluates to False.
is_date_witin_range("2024020100", ["2024010100", "2024013123"])

False

In [65]:
## get data from s3
def read_news_file(filekey, bucket):
    """Download one collection file from S3 and flatten its articles into rows.

    Args:
        filekey: Key of the S3 object to read. The date and site name are
            parsed from it with ``get_time_from_fileName``.
        bucket: Name of the S3 bucket holding the object.

    Returns:
        ``(True, rows)`` when the decoded JSON contains an ``"articles"``
        entry. ``rows`` is a list with one dict per article, holding the keys
        ``url``, ``headline``, ``datePublished_site``, ``probability``,
        ``date_collected`` and ``siteName``. Fields missing from an article
        are stored as ``None``.
        ``(False, filekey)`` when there is no ``"articles"`` entry, which is
        treated as a failed collection.
    """
    # A resource is created on every call; this function is run from worker
    # threads by read_files_in_parallel.
    s3 = boto3.resource('s3')
    content = s3.Object(bucket, filekey).get()['Body'].read()

    # get date and site name
    date, siteName = get_time_from_fileName(filekey)

    content = json.loads(content)
    if "articles" not in content:
        # failed collection
        return (False, filekey)

    # read headline, url, probability and time into a list
    result = []
    for x in content["articles"]:
        # Read the probability as tolerantly as the other fields: an article
        # with no (or a null) "metadata" entry yields None instead of raising
        # KeyError and aborting the whole file.
        metadata = x.get("metadata") or {}
        result.append({
            "url": x.get("url"),
            "headline": x.get("headline"),
            "datePublished_site": x.get("datePublished"),
            "probability": metadata.get("probability"),
            "date_collected": date,
            "siteName": siteName,
        })

    return (True, result)

# Use ThreadPoolExecutor to read files in parallel
# (wrapper around read_news_file)
def read_files_in_parallel(bucketName, fnames, max_workers=4):
    """Read many collection files concurrently with a thread pool.

    Args:
        bucketName: Name of the S3 bucket passed to ``read_news_file``.
        fnames: Sized collection of file keys to read (``len`` is used for
            the progress bar total).
        max_workers: Number of worker threads. Defaults to 4, the value that
            was previously hard-coded.

    Returns:
        A list with one ``read_news_file`` result per entry of ``fnames``,
        in the same order as ``fnames``.
    """
    # Bind the bucket once so the executor only has to supply the file key.
    read_news_file_with_partial = partial(read_news_file, bucket=bucketName)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in input order; tqdm only wraps the
        # iterator to display progress.
        news_headlines = list(
            tqdm(executor.map(read_news_file_with_partial, fnames), total=len(fnames))
        )

    return news_headlines

# post_processsing: split the read results into article rows and a list of failed collections
def post_processsing(result):
    """Separate successful reads from failed ones.

    Args:
        result: Iterable of ``(ok, payload)`` pairs. For a truthy ``ok`` the
            payload is a list of article rows; otherwise it identifies the
            failed collection.

    Returns:
        ``(rows, failures)``: all article rows flattened into one list, and
        the list of payloads from the failed entries.
    """
    article_batches = []
    failed_keys = []

    # Single pass: route each payload to the matching bucket.
    for outcome in result:
        target = article_batches if outcome[0] else failed_keys
        target.append(outcome[1])

    # Flatten the per-file batches into one list of rows.
    flattened = [row for batch in article_batches for row in batch]
    return (flattened, failed_keys)

In [66]:
# List the file keys in the domestic bucket under the "current_data" prefix
# for February 2024 (get_news_fileNames is defined outside this section).
# NOTE(review): if time_range is end-exclusive like is_date_witin_range,
# files collected at hour 23 on 2024-02-29 are left out - confirm.
domestic_fnames_feb = get_news_fileNames(domestic_bucket_name, prefix = "current_data", time_range = ["2024020100", "2024022923"])
len(domestic_fnames_feb)

15104

In [67]:
# Read every listed file in parallel, then split the results into the
# flattened article rows and the keys of the failed collections.
news_headlines_domestic_feb, failed_collection_domestic_feb = post_processsing(read_files_in_parallel(domestic_bucket_name, domestic_fnames_feb))

100%|███████████████████████████████████████████| 15104/15104 [11:37<00:00, 21.65it/s]


In [68]:
# Build a DataFrame with one row per article and save it as Parquet,
# without writing the DataFrame index.
df_domestic_news = pd.DataFrame(news_headlines_domestic_feb)
df_domestic_news.to_parquet('data/data_domestic_news_feb.parquet', index=False)

In [70]:
# Writing the failed collection list to a CSV

def save_to_csv(list, path):
    """Write each item of ``list`` as its own single-column CSV row.

    Args:
        list: Iterable of items to write. The name shadows the builtin but
            is kept because it is part of the function's signature.
        path: Destination file path; an existing file is overwritten.
    """
    with open(path, 'w', newline='') as out:
        # One single-element row per item.
        csv.writer(out).writerows([entry] for entry in list)

# Persist the failed-collection entries for February, one per CSV row.
save_to_csv(failed_collection_domestic_feb, "data/collection_rate/failed_collection_domestic_feb.csv")