In [None]:
import os
import posixpath
import shutil
import tempfile
from time import time

import boto3
import kaggle
import pandas as pd

In [None]:
# Authenticate the Kaggle API client before any dataset download is attempted.
kaggle.api.authenticate()

# AWS clients shared by the cells below: S3 for object transfers and STS for
# looking up the identity of the caller.
s3_client, sts_client = boto3.client("s3"), boto3.client("sts")

In [None]:
# AWS account id of the credentials in use; the bucket names used in this
# notebook are derived from it.
caller_identity = sts_client.get_caller_identity()
ACCOUNT_ID = caller_identity["Account"]

In [None]:
def kaggle_to_s3(dataset, path, bucket_name):
    """Download a Kaggle dataset and upload every extracted file to S3.

    The dataset is downloaded and unzipped into a temporary directory. The
    ``with`` block removes that directory on exit, whether the upload
    succeeds or raises, so no explicit cleanup is needed.

    Args:
        dataset: Kaggle dataset identifier passed to
            ``kaggle.api.dataset_download_files``.
        path: Prefix joined in front of each file name to build the S3
            object key.
        bucket_name: Name of the destination S3 bucket.

    Note:
        Only the file name is appended to ``path``; files found in nested
        directories of the download are therefore uploaded under the same
        prefix, without their sub-directory.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        print(temp_dir)
        kaggle.api.dataset_download_files(dataset, path=temp_dir, unzip=True)
        for root, _dirs, files in os.walk(temp_dir):
            files_number = len(files)
            for index, file in enumerate(files, start=1):
                start_time = time()
                print(f"Uploading {file} ({index}/{files_number})")
                # S3 keys always use "/" as separator, independent of the
                # separator of the local operating system.
                object_key = posixpath.join(path, file)
                s3_client.upload_file(
                    os.path.join(root, file), bucket_name, object_key
                )
                elapsed = time() - start_time
                print(
                    f"Uploaded {file} ({index}/{files_number}) in "
                    f"{elapsed} seconds"
                )


# Kaggle dataset to mirror and the key prefix to store it under.
# NOTE(review): the leading "/" becomes part of every object key built from
# this prefix -- confirm that this is intended.
dataset, path = "olistbr/brazilian-ecommerce", "/olistbr-kaggle"

# Destination bucket, named after the current AWS account.
bucket_name = f"sor-{ACCOUNT_ID}"

kaggle_to_s3(dataset=dataset, path=path, bucket_name=bucket_name)

In [None]:
# List the objects in the bucket and preview the first entry.
uploaded_files = s3_client.list_objects_v2(Bucket=bucket_name)
# "Contents" is absent from the response when the bucket holds no objects,
# so fall back to an empty list instead of raising KeyError.
uploaded_files.get("Contents", [])[:1]

In [None]:
def sor_csv_to_sot_parquet(key, sor_bucket_name=None, sot_bucket_name=None):
    """Convert one CSV object to Parquet and upload it to the target bucket.

    The CSV is downloaded into an anonymous temporary file, parsed with
    pandas, written as Parquet into a temporary directory and uploaded. Both
    temporary resources are removed when the ``with`` block exits, including
    when the download, the conversion or the upload raises.

    Args:
        key: Key of the CSV object in the source bucket. It may contain "/"
            characters; the local scratch file name does not depend on it.
        sor_bucket_name: Source bucket. Defaults to ``sor-<ACCOUNT_ID>``.
        sot_bucket_name: Destination bucket. Defaults to ``sot-<ACCOUNT_ID>``.

    The destination key is ``key`` with a trailing ``.csv`` replaced by
    ``.parquet``; a key without that suffix gets ``.parquet`` appended.
    """
    if sor_bucket_name is None:
        sor_bucket_name = f"sor-{ACCOUNT_ID}"
    if sot_bucket_name is None:
        sot_bucket_name = f"sot-{ACCOUNT_ID}"

    # Swap only the file extension; str.replace would also rewrite ".csv"
    # occurring elsewhere in the key.
    csv_suffix = ".csv"
    base_key = key[: -len(csv_suffix)] if key.endswith(csv_suffix) else key
    parquet_key = base_key + ".parquet"

    start_time = time()
    with tempfile.TemporaryFile() as csv_file, \
            tempfile.TemporaryDirectory() as work_dir:
        s3_client.download_fileobj(sor_bucket_name, key, csv_file)
        # Rewind so pandas reads the downloaded bytes from the beginning.
        csv_file.seek(0)
        df = pd.read_csv(csv_file, encoding="utf-8", sep=",")

        # Fixed file name inside a private directory: a key such as
        # "prefix/file.csv" must not be turned into a local path whose
        # parent directory does not exist.
        parquet_path = os.path.join(work_dir, "data.parquet")
        df.to_parquet(parquet_path)
        s3_client.upload_file(parquet_path, sot_bucket_name, parquet_key)
    end_time = time()
    print(f"Uploaded {key} in {end_time - start_time} seconds")


# Convert every CSV object of the bucket to Parquet.
# A paginator is used because a single list_objects_v2 call returns at most
# 1,000 keys, and "Contents" is missing from a page that holds no objects.
paginator = s3_client.get_paginator("list_objects_v2")
csv_keys = [
    obj["Key"]
    for page in paginator.paginate(Bucket=bucket_name)
    for obj in page.get("Contents", [])
    if obj["Key"].endswith(".csv")
]

# Count only the CSV objects so the (i/n) progress output matches the work
# that is actually done.
files_number = len(csv_keys)
for index, csv_key in enumerate(csv_keys, start=1):
    print(f"Uploading {csv_key} ({index}/{files_number})")
    sor_csv_to_sot_parquet(csv_key)