In [5]:
import os

import pyarrow
import s3fs
import dotenv
import duckdb
# import pandas as pd
import polars as pl
from pyarrow.dataset import dataset
import pyarrow.parquet as pq
from deltalake import write_deltalake, DeltaTable

In [2]:
def load_s3_env(
		vars: list[str]
		) -> None:
	"""Load the .env file and verify that every required variable is set.

	The parameter keeps the name ``vars`` (even though it shadows the builtin)
	because callers in this notebook pass it by keyword.

	Args:
		vars (list[str]): Names of the environment variables that must be
			present and non-empty once the .env file has been loaded.

	Raises:
		ValueError: If one or more variables are unset or empty. The message
			lists every missing name, not only the first one found.

	Returns:
		None
	"""
	dotenv.load_dotenv()

	# Collect all missing names so a single run reports every problem at once;
	# an empty string counts as missing, same as an unset variable.
	missing = [name for name in vars if not os.getenv(name)]
	if missing:
		raise ValueError(
				f"Required environment variables are not set correctly: {', '.join(missing)}"
		)

	return None

In [7]:
def check_file_exists(
		bucket_name: str,
		file_name: str,
		key: str,
		secret: str
		) -> bool:
	"""Check whether a file exists under the ``data/`` prefix of an S3 bucket.

	Args:
		bucket_name (str): The name of the S3 bucket.
		file_name (str): The name of the file to look for; it is resolved as
			``s3://{bucket_name}/data/{file_name}``.
		key (str): Access key id used to authenticate against S3.
		secret (str): Secret access key used to authenticate against S3.

	Returns:
		bool: True if the file exists, False otherwise.
	"""

	# anon must be False: with anon=True s3fs opens an anonymous connection
	# and the supplied key/secret are not used, so private buckets fail.
	fs = s3fs.S3FileSystem(anon=False,
	                       key=key,
	                       secret=secret)
	return fs.exists(f's3://{bucket_name}/data/{file_name}')

In [8]:
def polars_ingestion():
	"""Copy the S3 parquet file into the local "march_order" Delta table using polars.

	The AWS settings, bucket name and file name are read from environment
	variables after ``load_s3_env`` has confirmed they are set. The parquet
	file at ``s3://{S3_BUCKET}/data/{LOCAL_FILE_NAME}`` is scanned, collected
	into memory and written with mode "overwrite" and schema overwrite enabled.

	Returns:
		None
	"""
	load_s3_env(
		vars=[
			"AWS_DEFAULT_REGION",
			"AWS_ACCESS_KEY_ID",
			"AWS_SECRET_ACCESS_KEY",
			"LOCAL_FILE_NAME",
			"S3_BUCKET",
		]
	)

	bucket = os.getenv("S3_BUCKET")
	file_name = os.getenv("LOCAL_FILE_NAME")

	# Credentials handed to polars for the S3 read.
	credentials = {
		'aws_default_region': os.getenv('AWS_DEFAULT_REGION'),
		'aws_access_key_id': os.getenv('AWS_ACCESS_KEY_ID'),
		'aws_secret_access_key': os.getenv('AWS_SECRET_ACCESS_KEY'),
	}

	lazy_frame = pl.scan_parquet(
		source=f"s3://{bucket}/data/{file_name}",
		storage_options=credentials,
	)
	# Materialise the whole file before writing it out.
	frame = lazy_frame.collect()

	frame.write_delta(
		target="march_order",
		mode="overwrite",
		overwrite_schema=True,
	)

In [3]:
def duckdb_ingestion():
	"""Copy the S3 parquet file into the local "march_order" Delta table using DuckDB.

	The AWS settings, bucket name and file name are read from environment
	variables after ``load_s3_env`` has confirmed they are set. DuckDB's httpfs
	extension reads ``s3://{S3_BUCKET}/data/{LOCAL_FILE_NAME}``; the result is
	fetched as Arrow data and written with ``write_deltalake`` using mode
	"overwrite" and schema overwrite enabled.

	Returns:
		None
	"""
	required_key = [
		"AWS_DEFAULT_REGION",
		"AWS_ACCESS_KEY_ID",
		"AWS_SECRET_ACCESS_KEY",
		"LOCAL_FILE_NAME",
		"S3_BUCKET",
	]

	load_s3_env(vars=required_key)

	def _quote(value: str) -> str:
		# Double embedded single quotes so the value stays one SQL string literal.
		return value.replace("'", "''")

	s3_region = _quote(os.getenv('AWS_DEFAULT_REGION'))
	s3_access_key_id = _quote(os.getenv('AWS_ACCESS_KEY_ID'))
	s3_secret_access_key = _quote(os.getenv('AWS_SECRET_ACCESS_KEY'))
	s3_bucket_name = _quote(os.getenv("S3_BUCKET"))
	s3_local_file_name = _quote(os.getenv("LOCAL_FILE_NAME"))

	sql = f"""
		INSTALL httpfs;
        LOAD httpfs;
        PRAGMA enable_optimizer;
        SET s3_region='{s3_region}';
		SET s3_access_key_id='{s3_access_key_id}';
		SET s3_secret_access_key='{s3_secret_access_key}';
		SELECT
			*
		FROM read_parquet('s3://{s3_bucket_name}/data/{s3_local_file_name}');
		"""

	# The context manager closes the connection even if the query or the write fails.
	with duckdb.connect() as conn:
		df = conn.sql(query=sql).arrow()

		# Written inside the block so the Arrow result is fully consumed
		# before the connection that produced it is closed.
		write_deltalake(
			data=df,
			table_or_uri="march_order",
			mode="overwrite",
			overwrite_schema=True
		)
	return None

In [19]:
def pyarrow_ingestion():
	"""Copy the S3 parquet file into the local "march_order" Delta table via pyarrow.dataset.

	The credentials, bucket name and file name are read from environment
	variables after ``load_s3_env`` has confirmed they are set. The parquet
	file at ``s3://{S3_BUCKET}/data/{LOCAL_FILE_NAME}`` is opened through an
	s3fs filesystem as a pyarrow dataset, loaded into an Arrow table, converted
	to a polars DataFrame and written with mode "overwrite" and schema
	overwrite enabled.

	Returns:
		None
	"""
	required_key = [
		"AWS_DEFAULT_REGION",
		"AWS_ACCESS_KEY_ID",
		"AWS_SECRET_ACCESS_KEY",
		"LOCAL_FILE_NAME",
		"S3_BUCKET",
	]

	# AWS_DEFAULT_REGION is still validated here, but this function does not
	# pass it to s3fs explicitly, so it is not read into a local.
	load_s3_env(vars=required_key)
	s3_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
	s3_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
	s3_bucket_name = os.getenv("S3_BUCKET")
	s3_local_file_name = os.getenv("LOCAL_FILE_NAME")

	cloudfs = s3fs.S3FileSystem(
			key=s3_access_key_id,
			secret=s3_secret_access_key,
	)

	# to_table() loads the whole dataset into memory as one Arrow table.
	arrow_df = dataset(
			source=f"s3://{s3_bucket_name}/data/{s3_local_file_name}",
			filesystem=cloudfs,
			format='parquet'
	).to_table()

	df = pl.from_arrow(arrow_df)

	delta_table_path = "march_order"

	df.write_delta(
			target=delta_table_path,
			mode="overwrite",
			overwrite_schema=True
	)

	return None
	


In [13]:
def pyarrow_parquet_ingestion_():
	"""Copy the S3 parquet file into the local "march_order" Delta table via pyarrow.parquet.

	The credentials, bucket name and file name are read from environment
	variables after ``load_s3_env`` has confirmed they are set. The parquet
	file at ``s3://{S3_BUCKET}/data/{LOCAL_FILE_NAME}`` is opened through s3fs,
	read into an Arrow table and written with ``write_deltalake`` using mode
	"overwrite" and schema overwrite enabled.

	Returns:
		None
	"""
	required_key = [
		"AWS_DEFAULT_REGION",
		"AWS_ACCESS_KEY_ID",
		"AWS_SECRET_ACCESS_KEY",
		"LOCAL_FILE_NAME",
		"S3_BUCKET",
	]

	# AWS_DEFAULT_REGION is still validated here, but this function does not
	# pass it to s3fs explicitly, so it is not read into a local.
	load_s3_env(vars=required_key)
	s3_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
	s3_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
	s3_bucket_name = os.getenv("S3_BUCKET")
	s3_local_file_name = os.getenv("LOCAL_FILE_NAME")

	cloudfs = s3fs.S3FileSystem(
			key=s3_access_key_id,
			secret=s3_secret_access_key,
	)

	# The file handle is closed as soon as the table has been read.
	with cloudfs.open(f"s3://{s3_bucket_name}/data/{s3_local_file_name}", "rb") as f:
		table = pq.read_table(f)

	write_deltalake(
		data=table,
		table_or_uri="march_order",
		mode="overwrite",
		overwrite_schema=True
	)

	return None



In [14]:
%%time 
# Timed run of the s3fs + pyarrow.parquet route; it overwrites the local "march_order" Delta table.
pyarrow_parquet_ingestion_()

CPU times: user 9.23 s, sys: 4.29 s, total: 13.5 s
Wall time: 48.2 s


In [20]:
%%time 
# Timed run of the s3fs + pyarrow.dataset route; it overwrites the local "march_order" Delta table.
pyarrow_ingestion()

CPU times: user 10.4 s, sys: 5.45 s, total: 15.8 s
Wall time: 1min 11s


In [11]:
%%time
# Timed run of the polars scan_parquet route; it overwrites the local "march_order" Delta table.
polars_ingestion()

CPU times: user 21.3 s, sys: 4.09 s, total: 25.4 s
Wall time: 54.4 s


In [6]:
%%time
# Timed run of the DuckDB httpfs route; it overwrites the local "march_order" Delta table.
duckdb_ingestion()

CPU times: user 1min 43s, sys: 13.5 s, total: 1min 56s
Wall time: 2min 37s
