#### Analysis Table:

| **Method**         | **Purpose** | **Best Use Cases** | **Pros** | **Cons** |
| ------------------ | ----------- | ------------------ | -------- | -------- |
| `upload_file`      | Upload file to an S3 bucket. | large-file parallel uploading | efficient handling of large files | less advanced configurations / no ACLs |
| `upload_fileobj`   | Uploads a file-like object to S3 | file-like object uploading directly to S3 | managed transfer / can use ACLs | Can only be opened in binary mode not text mode |
| `put_object`       | add an object to an S3 bucket | Uploading files with specific configurations | Can use ACLs / Can add metadata | No support for multipart uploads / file must be < 5gb  |
| `download_file`    | downloads an object and saves to file | When you want to save an object locally | Support extra arguments and callback parameters  | does not support downloading full buckets         |
| `download_fileobj` | downloads a file like object to a file | When you want to save a file-like object locally | can handle file like objects | Can only be opened in binary mode |
| `get_object`       | retrieve an object from S3 | To view an object in a bucket using a key | faster than manual file handling | complex to implement and manage |

#### Reflection Questions:

1. **Upload Methods**:
   - What are the key differences between `upload_file`, `upload_fileobj`, and `put_object`?

    upload file is efficient at handling large files due to multipart uploads, upload_fileobj can only handle file-type ojects in binary and does not support large files < 5gb, put_object can upload files with specific configs but also struggles with larger files. 
   - When would you choose to use `put_object` over `upload_file` or `upload_fileobj`?
   
    I would choose put_object for when I want to add an object to S3 with specific configurations, I would use upload_file for files larger than 5gb, and I would use upload_filobj for files that needed to be read in binary format. 
2. **Download Methods**:
   - How does `download_file` differ from `download_fileobj` and `get_object`?
   - In what scenarios would `get_object` be more beneficial than `download_file`?
3. **Efficiency and Performance**:
   - How do multipart uploads and downloads enhance the performance of file transfer operations?
   - What are the limitations of using `put_object` and `get_object` for large files?
4. **Practical Applications**:
   - Consider a scenario where you need to upload a large video file to S3. Which method would you use and why?
   - If you need to process data in memory before saving it locally, which download method would be most suitable?

---

# Lab 2: Exploring AWS Boto3

### Load your credentials

In [1]:
from dotenv import load_dotenv
load_dotenv() 

True

In [2]:
import boto3
import os

s3_client = boto3.client('s3')


In [8]:
buckets = s3_client.list_buckets()
for bucket in buckets['Buckets']:
    if 'techcatalyst' in bucket['Name']:
        print(bucket['Name'])

capstone-techcatalyst-conformed
capstone-techcatalyst-raw
capstone-techcatalyst-transformed
techcatalyst-public
techcatalyst-raw
techcatalyst-transformed


In [10]:
# list objects in a specific bucket "techcatalyst-raw" 
bucket_name = 'techcatalyst-raw'
objects = s3_client.list_objects_v2(Bucket=bucket_name)
for obj in objects.get('Contents', []):
    print(obj['Key'])

BLAKE/test_export.parquet
BLAKE/upload_file_method_GOOG.csv
BLAKE/upload_fileobj_method.txt
Ben/Million_Songs/
Ben/bingchilling.txt
Ben/gooooog.csv
Ben/parquetGoogleStock/2c8da4b4fca54ff9b2e97ab78360ace4_000000_000000.snappy.parquet
Ben/parquetGoogleStock/2c8da4b4fca54ff9b2e97ab78360ace4_000001_000000.snappy.parquet
Ben/parquetGoogleStock/2c8da4b4fca54ff9b2e97ab78360ace4_000002_000000.snappy.parquet
Ben/parquetGoogleStock/2c8da4b4fca54ff9b2e97ab78360ace4_000003_000000.snappy.parquet
Ben/parquetGoogleStock/2c8da4b4fca54ff9b2e97ab78360ace4_000004_000000.snappy.parquet
Ben/parquetGoogleStock/2c8da4b4fca54ff9b2e97ab78360ace4_000005_000000.snappy.parquet
Ben/parquetGoogleStock/2c8da4b4fca54ff9b2e97ab78360ace4_000006_000000.snappy.parquet
Ben/parquetGoogleStock/2c8da4b4fca54ff9b2e97ab78360ace4_000007_000000.snappy.parquet
Ben/parquetGoogleStock/2c8da4b4fca54ff9b2e97ab78360ace4_000008_000000.snappy.parquet
Ben/parquetGoogleStock/2c8da4b4fca54ff9b2e97ab78360ace4_000009_000000.snappy.parquet
Be

In [13]:
# list objects that are CSV in a specific bucket "techcatalyst-raw" 
for obj in objects.get('Contents', []):
    if (obj['Key']).endswith('.csv'):
        print(obj['Key'])

BLAKE/upload_file_method_GOOG.csv
Ben/gooooog.csv
accidents/accidents_2017_to_2023_english.csv
fabiola/fabiola_GOOG.csv
stage/yellow_tripdata.csv
stocks/GOOG.csv
tatwan/GOOG.csv
tatwan/GOOG_NEW.csv


### Downloading an objects using `download_file`

In [15]:
s3_client.download_file(Bucket='techcatalyst-raw',  # from which bucket
                        Key='stocks/GOOG.csv',
                        Filename='emma_goog.csv') # Filename is what you want to call it once it is downloaded

### Downloading an objects using `download_fileobj`

In [17]:
import io
io_temp = io.BytesIO()
temp = s3_client.download_fileobj(Bucket='techcatalyst-raw',  # from which bucket
                                Key='stocks/GOOG.csv',
                                Fileobj=io_temp) # pass th io.BytesIO object

In [18]:
print(io_temp.getvalue()[:100])

b'Date,Open,High,Low,Close,Volume\r\n1/2/2025 16:00:00,191.49,193.2,188.71,190.63,17545162\r\n1/3/2025 16:'


In [21]:
next(io_temp)

b'Date,Open,High,Low,Close,Volume\r\n'

In [20]:
io_temp.seek(0) 

0

In [22]:
with open('google_stock_downloaded.csv', 'wb') as f:
    f.write(io_temp.getvalue())

### Uploading a local file using `upload_file`

In [23]:
s3_client.upload_file(Filename='emma_goog.csv', # local file name
                      Bucket='techcatalyst-raw', # the bucket target
                      Key='EMMA/emna_goog.csv') # destination name, make sure it include YOURNAME/ANY_FILE_NAME.csv

### Uploading using `upload_fileobj`

In [24]:
in_memory_file = io.BytesIO(b"Uploading in memeory file!")
s3_client.upload_fileobj(Fileobj=io_temp,
                          Bucket='techcatalyst-raw', 
                          Key='EMMA/emna_goog.txt') # destination name, make sure it include YOURNAME/ANY_FILE_NAME.txt

### List all the objects in your bucket/username

In [28]:
objects = s3_client.list_objects_v2(Bucket='techcatalyst-raw', Prefix='EMMA/')
objects.keys()
objects['Contents']

[{'Key': 'EMMA/emna_goog.csv',
  'LastModified': datetime.datetime(2025, 8, 4, 19, 43, 46, tzinfo=tzlocal()),
  'ETag': '"8cbbdc687ee45f1fe58a522e16d423c2"',
  'ChecksumAlgorithm': ['CRC32'],
  'ChecksumType': 'FULL_OBJECT',
  'Size': 7889,
  'StorageClass': 'STANDARD'},
 {'Key': 'EMMA/emna_goog.txt',
  'LastModified': datetime.datetime(2025, 8, 4, 19, 46, 32, tzinfo=tzlocal()),
  'ETag': '"5dedeaccf30c891106b85b31cfa718ea"',
  'ChecksumAlgorithm': ['CRC32'],
  'ChecksumType': 'FULL_OBJECT',
  'Size': 7856,
  'StorageClass': 'STANDARD'},
 {'Key': 'EMMA/test_export.parquet',
  'LastModified': datetime.datetime(2025, 7, 25, 15, 23, 16, tzinfo=tzlocal()),
  'ETag': '"3c8cff84500ad4c4c85361e38dce05ed-1"',
  'ChecksumAlgorithm': ['CRC64NVME'],
  'ChecksumType': 'FULL_OBJECT',
  'Size': 8393,
  'StorageClass': 'STANDARD'}]

# Lab 3: Introduction to AWS Wrangler

In [30]:
!pip install awswrangler



In [7]:
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
import awswrangler as wr

In [3]:
df = wr.s3.read_csv('s3://techcatalyst-raw/stocks/GOOG.csv')
df.head()

2025-08-05 14:19:56,690	INFO worker.py:1927 -- Started a local Ray instance.
2025-08-05 14:20:02,447	INFO logging.py:295 -- Registered dataset logger for dataset dataset_1_0
2025-08-05 14:20:02,496	INFO streaming_executor.py:117 -- Starting execution of Dataset dataset_1_0. Full logs are in /tmp/ray/session_2025-08-05_14-19-46_902452_2542/logs/ray-data
2025-08-05 14:20:02,498	INFO streaming_executor.py:118 -- Execution plan of Dataset dataset_1_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadArrowCSV] -> AggregateNumRows[AggregateNumRows]


[dataset]: Run `pip install tqdm` to enable progress reporting.


2025-08-05 14:20:04,644	INFO streaming_executor.py:231 -- ✔️  Dataset dataset_1_0 execution finished in 2.15 seconds
2025-08-05 14:20:04,689	INFO logging.py:295 -- Registered dataset logger for dataset dataset_0_0
2025-08-05 14:20:04,707	INFO streaming_executor.py:117 -- Starting execution of Dataset dataset_0_0. Full logs are in /tmp/ray/session_2025-08-05_14-19-46_902452_2542/logs/ray-data
2025-08-05 14:20:04,710	INFO streaming_executor.py:118 -- Execution plan of Dataset dataset_0_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadArrowCSV]
2025-08-05 14:20:05,330	INFO streaming_executor.py:231 -- ✔️  Dataset dataset_0_0 execution finished in 0.62 seconds


Unnamed: 0,Date,Open,High,Low,Close,Volume
0,1/2/2025 16:00:00,191.49,193.2,188.71,190.63,17545162
1,1/3/2025 16:00:00,192.73,194.5,191.35,193.13,12874957
2,1/6/2025 16:00:00,195.15,199.56,195.06,197.96,19483323
3,1/7/2025 16:00:00,198.27,202.14,195.94,196.71,16966760
4,1/8/2025 16:00:00,193.95,197.64,193.75,195.39,14335341


In [4]:
df.info()

<class 'modin.pandas.dataframe.DataFrame'>
RangeIndex: 140 entries, 0 to 139
Data columns (total 6 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   Date    140 non-null    object 
 1   Open    140 non-null    float64
 2   High    140 non-null    float64
 3   Low     140 non-null    float64
 4   Close   140 non-null    float64
 5   Volume  140 non-null    int64  
dtypes: float64(4), int64(1), object(1)
memory usage: 6.7+ KB


In [35]:
databases = wr.catalog.databases()
print(databases)

               Database       Description
0           alexia_logs                  
1           alexia_song                  
2      awswrangler_test                  
3                ben_db                  
4              ben_song                  
5              ben_taxi                  
6            blake_taxi                  
7           blake_wr_db                  
8               default  default database
9               emma_db                  
10           fabiola_db                  
11           jaden_taxi                  
12        jadenastle_db                  
13         melissa_logs                  
14        melissa_songs                  
15           michael_db                  
16                my_db                  
17           shaswat_db                  
18         shaswat_logs                  
19         shaswat_song                  
20          suchitha_db                  
21            tatwan_db                  
22  tatwan_inclass_demo           

In [36]:
name = 'emma'
database_name = f"{name}_db"
wr.catalog.tables(database=database_name)

Unnamed: 0,Database,Table,Description,TableType,Columns,Partitions
0,emma_db,emma_stock,,EXTERNAL_TABLE,"date, open, high, low, close, volume",


In [37]:
wr.s3.to_parquet(
    df=df, # the DataFrame you just created 
    path=f"s3://techcatalyst-raw/{name}/", # write to the techcatalyst-raw bucket under your folder name (or it would create a new folder if it does not exist)
    dataset=True, 
    database='emma_db', # the name of the database you just created in AWS Glue 
    table= 'emma_stock', # pick a table name for example YOURNAME_STOCK
    mode='overwrite'
    )

2025-08-04 20:03:50,256	INFO logging.py:295 -- Registered dataset logger for dataset dataset_2_0
2025-08-04 20:03:50,879	INFO logging.py:295 -- Registered dataset logger for dataset dataset_3_0
2025-08-04 20:03:51,836	INFO logging.py:295 -- Registered dataset logger for dataset dataset_6_0
2025-08-04 20:03:51,840	INFO streaming_executor.py:117 -- Starting execution of Dataset dataset_6_0. Full logs are in /tmp/ray/session_2025-08-04_20-00-04_029478_48442/logs/ray-data
2025-08-04 20:03:51,842	INFO streaming_executor.py:118 -- Execution plan of Dataset dataset_6_0: InputDataBuffer[Input] -> TaskPoolMapOperator[Write]
2025-08-04 20:03:53,038	INFO streaming_executor.py:231 -- ✔️  Dataset dataset_6_0 execution finished in 1.20 seconds
2025-08-04 20:03:53,062	INFO dataset.py:4619 -- Data sink ArrowParquet finished. 140 rows and 15.1KB data written.


{'paths': ['s3://techcatalyst-raw/emma/cdfabb0fcbff483cb6906ce39afc203a_000000_000000.snappy.parquet',
  's3://techcatalyst-raw/emma/cdfabb0fcbff483cb6906ce39afc203a_000001_000000.snappy.parquet',
  's3://techcatalyst-raw/emma/cdfabb0fcbff483cb6906ce39afc203a_000002_000000.snappy.parquet',
  's3://techcatalyst-raw/emma/cdfabb0fcbff483cb6906ce39afc203a_000003_000000.snappy.parquet'],
 'partitions_values': {}}

In [38]:
wr.catalog.tables(name_contains="stock")

Unnamed: 0,Database,Table,Description,TableType,Columns,Partitions
0,ben_db,ben_stock,This is my stock table.,EXTERNAL_TABLE,"date, open, high, low, close, volume",
1,blake_wr_db,blake_stock,This is my stock table.,EXTERNAL_TABLE,"date, open, high, low, close, volume",
2,emma_db,emma_stock,,EXTERNAL_TABLE,"date, open, high, low, close, volume",
3,fabiola_db,fabiola_stock,This is my stock table.,EXTERNAL_TABLE,"date, open, high, low, close, volume",
4,jadenastle_db,jaden_stock,This is my stock table.,EXTERNAL_TABLE,"date, open, high, low, close, volume",
5,michael_db,michael_stock,,EXTERNAL_TABLE,"date, open, high, low, close, volume",
6,shaswat_db,shaswat_stock,This is my stock table.,EXTERNAL_TABLE,"date, open, high, low, close, volume",
7,suchitha_db,suchitha_stock,This is my stock table.,EXTERNAL_TABLE,"date, open, high, low, close, volume",
8,tatwan_db,tatwan_stock,This is my stock table.,EXTERNAL_TABLE,"date, open, high, low, close, volume",
9,tatwan_inclass_demo,tatwan_goog_stock,,EXTERNAL_TABLE,"date, open, high, low, close, volume",


In [None]:
import pyarrow
df = wr.s3.read_parquet_table(database='emma_db',
                               table='emma_stock')

In [41]:
wr.catalog.get_table_types(database='emma_db',
                               table='emma_stock')

{'date': 'string',
 'open': 'double',
 'high': 'double',
 'low': 'double',
 'close': 'double',
 'volume': 'bigint'}

In [42]:
table_details = wr.catalog.get_tables(database='emma_db')

In [43]:
desc = "This is my stock table."
param = {"source": "Google", "class": "stock"}
comments = {
    "Date": "Trading Date",
    "Open": "Opening Price",
    "Close": "Closing Price"
}


In [47]:
wr.s3.to_parquet(
    df=df,
    path='s3://techcatalyst-raw/emma/',
    dataset=True,
    database='emma_db',
    table='emma_stock',
    mode='overwrite',
    glue_table_settings=wr.typing.GlueTableSettings(description=desc,  # here we are passing some metadata
                                                    parameters=param, 
                                                    columns_comments=comments),
    )

2025-08-04 20:13:10,330	INFO logging.py:295 -- Registered dataset logger for dataset dataset_8_0
2025-08-04 20:13:10,461	INFO logging.py:295 -- Registered dataset logger for dataset dataset_9_0
2025-08-04 20:13:11,414	INFO logging.py:295 -- Registered dataset logger for dataset dataset_12_0
2025-08-04 20:13:11,418	INFO streaming_executor.py:117 -- Starting execution of Dataset dataset_12_0. Full logs are in /tmp/ray/session_2025-08-04_20-00-04_029478_48442/logs/ray-data
2025-08-04 20:13:11,422	INFO streaming_executor.py:118 -- Execution plan of Dataset dataset_12_0: InputDataBuffer[Input] -> TaskPoolMapOperator[Write]
2025-08-04 20:13:12,617	INFO streaming_executor.py:231 -- ✔️  Dataset dataset_12_0 execution finished in 1.20 seconds
2025-08-04 20:13:12,629	INFO dataset.py:4619 -- Data sink ArrowParquet finished. 140 rows and 15.1KB data written.


{'paths': ['s3://techcatalyst-raw/emma/3a1b7394faa84991a5e7700c963af73f_000001_000000.snappy.parquet',
  's3://techcatalyst-raw/emma/3a1b7394faa84991a5e7700c963af73f_000000_000000.snappy.parquet',
  's3://techcatalyst-raw/emma/3a1b7394faa84991a5e7700c963af73f_000002_000000.snappy.parquet',
  's3://techcatalyst-raw/emma/3a1b7394faa84991a5e7700c963af73f_000003_000000.snappy.parquet'],
 'partitions_values': {}}

In [48]:
wr.catalog.table(database='emma_db', table='emma_stock')

Unnamed: 0,Column Name,Type,Partition,Comment
0,date,string,False,Trading Date
1,open,double,False,Opening Price
2,high,double,False,
3,low,double,False,
4,close,double,False,Closing Price
5,volume,bigint,False,


In [49]:
wr.s3.list_objects('s3://techcatalyst-raw/emma/')

['s3://techcatalyst-raw/emma/3a1b7394faa84991a5e7700c963af73f_000000_000000.snappy.parquet',
 's3://techcatalyst-raw/emma/3a1b7394faa84991a5e7700c963af73f_000001_000000.snappy.parquet',
 's3://techcatalyst-raw/emma/3a1b7394faa84991a5e7700c963af73f_000002_000000.snappy.parquet',
 's3://techcatalyst-raw/emma/3a1b7394faa84991a5e7700c963af73f_000003_000000.snappy.parquet']

In [50]:
wr.s3.download(path='s3://techcatalyst-raw/stocks/GOOG.csv', 
               local_file='./new_file.csv')

In [51]:
your_name = 'emma'
file_name = 'wr_emma_stock'
wr.s3.upload(local_file='new_file.csv',path= f's3://techcatalyst-raw/{your_name}/uploads/{file_name}')

In [52]:
wr.s3.list_objects(f's3://techcatalyst-raw/{your_name}/uploads/')

['s3://techcatalyst-raw/emma/uploads/wr_emma_stock']

### Exercise (using Glue Catalog and Athena)

In [59]:
db_name = 'emma_taxi'
table_name = 'emma_tripdata'
s3_path_directory = 's3://techcatalyst-raw/'
s3_path_file = 's3://techcatalyst-raw/taxi_data/'

In [61]:
wr.catalog.delete_table_if_exists(database=db_name, table=table_name) 

True

In [55]:

wr.catalog.create_database(db_name)

In [57]:
wr.s3.to_parquet(
    df=df, # the DataFrame you just created 
    path=f"{s3_path_directory}{s3_path_file}", # write to the techcatalyst-raw bucket under your folder name (or it would create a new folder if it does not exist)
    dataset=True, 
    database=db_name, # the name of the database you just created in AWS Glue 
    table=table_name, # pick a table name for example YOURNAME_STOCK
    mode='overwrite'
    )

2025-08-04 20:23:45,457	INFO logging.py:295 -- Registered dataset logger for dataset dataset_14_0
2025-08-04 20:23:45,544	INFO logging.py:295 -- Registered dataset logger for dataset dataset_15_0
2025-08-04 20:23:46,135	INFO logging.py:295 -- Registered dataset logger for dataset dataset_18_0
2025-08-04 20:23:46,137	INFO streaming_executor.py:117 -- Starting execution of Dataset dataset_18_0. Full logs are in /tmp/ray/session_2025-08-04_20-00-04_029478_48442/logs/ray-data
2025-08-04 20:23:46,138	INFO streaming_executor.py:118 -- Execution plan of Dataset dataset_18_0: InputDataBuffer[Input] -> TaskPoolMapOperator[Write]
2025-08-04 20:23:46,887	INFO streaming_executor.py:231 -- ✔️  Dataset dataset_18_0 execution finished in 0.75 seconds
2025-08-04 20:23:46,901	INFO dataset.py:4619 -- Data sink ArrowParquet finished. 140 rows and 15.1KB data written.


{'paths': ['s3://techcatalyst-raw/taxi_data/8d12668ed190477fa03a29a0a5d50e84_000001_000000.snappy.parquet',
  's3://techcatalyst-raw/taxi_data/8d12668ed190477fa03a29a0a5d50e84_000000_000000.snappy.parquet',
  's3://techcatalyst-raw/taxi_data/8d12668ed190477fa03a29a0a5d50e84_000002_000000.snappy.parquet',
  's3://techcatalyst-raw/taxi_data/8d12668ed190477fa03a29a0a5d50e84_000003_000000.snappy.parquet'],
 'partitions_values': {}}

In [60]:
columns_types, partitions_types = wr.s3.read_parquet_metadata(path=s3_path_file)

In [64]:

wr.catalog.create_parquet_table(
    database=db_name, # pass the database name
    table=table_name, # pass the table name
    path='s3://techcatalyst-raw/emma', # use the directoy here 
    columns_types=columns_types,  # Pass the schema here
    partitions_types=partitions_types
)
print(f"Table '{table_name}' created successfully in database '{db_name}'.")

Table 'emma_tripdata' created successfully in database 'emma_taxi'.


In [None]:
query = f"SELECT * FROM {table_name} LIMIT 5"

df = wr.athena.read_sql_query(query, database=db_name)

print("\nQuery Results:")
print(df)

### Lambda Function Testing

In [15]:
s3_client.upload_file(Filename='test.csv', # local file name
                      Bucket='techcatalyst-public', # the bucket target
                      Key='emma/test.csv') # destination name, make sure it include YOURNAME/ANY_FILE_NAME.csv

In [22]:
wr.s3.upload(local_file='test.csv',path= f's3://techcatalyst-public/emma/test.csv')

### Automating Snowpipe for Amazon S3

#### Made a Snowpipe that ingests a parquet file into the stage in snowflake. Used am ARN to connect.



## Final

#### Bringing it all together

* End to end solution diagram 

!architecturediagram.png

* Use cases for this solution?
    * This pipeline could be used to ingest raw csv data into AWS for storage (Datalake), then the transformed parquet file can be ingested into snowflake for analysis (ETL Pipeline)
* 