## Lab 3: Introduction to AWS Wrangler

#### Load Credentials

In [1]:
# Load credentials from a local .env file into the process environment
# (python-dotenv), so no secrets are hardcoded in the notebook.
# NOTE(review): presumably these are the AWS keys awswrangler picks up — confirm.
from dotenv import load_dotenv
load_dotenv()

True

In [3]:
# import awswrangler
import awswrangler as wr

#### Making sure it works

In [4]:
# Smoke test: read a CSV straight from the private bucket and preview it.
df = wr.s3.read_csv(path='s3://techcatalyst-raw/stocks/GOOG.csv')
df.head()

Unnamed: 0,Date,Open,High,Low,Close,Volume
0,1/2/2025 16:00:00,191.49,193.2,188.71,190.63,17545162
1,1/3/2025 16:00:00,192.73,194.5,191.35,193.13,12874957
2,1/6/2025 16:00:00,195.15,199.56,195.06,197.96,19483323
3,1/7/2025 16:00:00,198.27,202.14,195.94,196.71,16966760
4,1/8/2025 16:00:00,193.95,197.64,193.75,195.39,14335341


In [5]:
# The object returned by wr.s3.read_csv is a regular pandas DataFrame, so the
# usual pandas inspection methods (here .info()) work on it.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 140 entries, 0 to 139
Data columns (total 6 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   Date    140 non-null    object 
 1   Open    140 non-null    float64
 2   High    140 non-null    float64
 3   Low     140 non-null    float64
 4   Close   140 non-null    float64
 5   Volume  140 non-null    int64  
dtypes: float64(4), int64(1), object(1)
memory usage: 6.7+ KB


#### Inspect Glue databases

In [6]:
# List the databases currently registered in the Glue catalog
# (one row per database, with its description).
databases = wr.catalog.databases()
print(databases)

               Database       Description
0           alexia_logs                  
1           alexia_song                  
2      awswrangler_test                  
3                ben_db                  
4              ben_song                  
5              ben_taxi                  
6           blake_wr_db                  
7               default  default database
8               emma_db                  
9            jaden_taxi                  
10        jadenastle_db                  
11         melissa_logs                  
12        melissa_songs                  
13           michael_db                  
14                my_db                  
15         shaswat_logs                  
16         shaswat_song                  
17          suchitha_db                  
18            tatwan_db                  
19  tatwan_inclass_demo                  
20          tatwan_taxi                  


#### Creating a database

In [8]:
# Create a personal Glue database named "<name>_db".
# exist_ok=True keeps this cell idempotent: without it, re-running the cell
# (e.g. after Restart & Run All) fails because the database already exists.
name = 'fabiola'
database_name = f"{name}_db"
wr.catalog.create_database(database_name, exist_ok=True)

In [None]:
# Confirm the new database now shows up in the Glue catalog listing.
databases = wr.catalog.databases()
print(databases)

               Database       Description
0           alexia_logs                  
1           alexia_song                  
2      awswrangler_test                  
3                ben_db                  
4              ben_song                  
5              ben_taxi                  
6           blake_wr_db                  
7               default  default database
8               emma_db                  
9            fabiola_db                  
10           jaden_taxi                  
11        jadenastle_db                  
12         melissa_logs                  
13        melissa_songs                  
14           michael_db                  
15                my_db                  
16         shaswat_logs                  
17         shaswat_song                  
18          suchitha_db                  
19            tatwan_db                  
20  tatwan_inclass_demo                  
21          tatwan_taxi                  


In [None]:
# List the tables in the new database; nothing has been written to it yet,
# so the result is expected to be empty.
wr.catalog.tables(database=database_name)

Unnamed: 0,Database,Table,Description,TableType,Columns,Partitions


#### Writing to database

In [None]:
# Write the DataFrame to S3 as a Parquet dataset and register it as a table
# in the Glue database in one call.
#   path     -> your folder under the techcatalyst-raw bucket (created if missing)
#   database -> the Glue database created earlier in this notebook
#   table    -> table name to register, e.g. YOURNAME_STOCK
wr.s3.to_parquet(
    df=df,
    path=f"s3://techcatalyst-raw/{name}/",
    dataset=True,
    mode='overwrite',
    database=database_name,
    table='fabiola_stock',
)

{'paths': ['s3://techcatalyst-raw/fabiola/2903486ce2004d54b6ec239b0b4b3ff9.snappy.parquet'],
 'partitions_values': {}}

In [None]:
# Verify the table was registered: list every catalog table whose name
# contains "stock". No database filter is given, so tables from other
# databases are listed as well.
wr.catalog.tables(name_contains="stock")

Unnamed: 0,Database,Table,Description,TableType,Columns,Partitions
0,ben_db,ben_stock,This is my stock table.,EXTERNAL_TABLE,"date, open, high, low, close, volume",
1,blake_wr_db,blake_stock,This is my stock table.,EXTERNAL_TABLE,"date, open, high, low, close, volume",
2,emma_db,emma_stock,,EXTERNAL_TABLE,"date, open, high, low, close, volume",
3,fabiola_db,fabiola_stock,,EXTERNAL_TABLE,"date, open, high, low, close, volume",
4,jadenastle_db,jaden_stock,This is my stock table.,EXTERNAL_TABLE,"date, open, high, low, close, volume",
5,michael_db,michael_stock,,EXTERNAL_TABLE,"date, open, high, low, close, volume",
6,shaswat_db,shaswat_stock,,EXTERNAL_TABLE,"date, open, high, low, close, volume",
7,suchitha_db,suchitha_stock,,EXTERNAL_TABLE,"date, open, high, low, close, volume",
8,tatwan_db,tatwan_stock,This is my stock table.,EXTERNAL_TABLE,"date, open, high, low, close, volume",
9,tatwan_inclass_demo,tatwan_goog_stock,,EXTERNAL_TABLE,"date, open, high, low, close, volume",


#### Read table as df from database

In [16]:
# Load the Glue-registered table back into a DataFrame.
df = wr.s3.read_parquet_table(table='fabiola_stock', database=database_name)

# Preview the first rows
df.head()

Unnamed: 0,date,open,high,low,close,volume
0,1/2/2025 16:00:00,191.49,193.2,188.71,190.63,17545162
1,1/3/2025 16:00:00,192.73,194.5,191.35,193.13,12874957
2,1/6/2025 16:00:00,195.15,199.56,195.06,197.96,19483323
3,1/7/2025 16:00:00,198.27,202.14,195.94,196.71,16966760
4,1/8/2025 16:00:00,193.95,197.64,193.75,195.39,14335341


In [17]:
# Column name -> Glue type mapping for the table
wr.catalog.get_table_types(table='fabiola_stock', database=database_name)

{'date': 'string',
 'open': 'double',
 'high': 'double',
 'low': 'double',
 'close': 'double',
 'volume': 'bigint'}

In [18]:
# Glue metadata: get_tables is consumed with next() below, i.e. it hands back
# the table definitions one at a time rather than as a list.
table_details = wr.catalog.get_tables(database=database_name)

# Pull the first table definition from the database.
next(table_details)

{'Name': 'fabiola_stock',
 'DatabaseName': 'fabiola_db',
 'CreateTime': datetime.datetime(2025, 8, 4, 19, 50, 49, tzinfo=tzlocal()),
 'UpdateTime': datetime.datetime(2025, 8, 4, 19, 50, 49, tzinfo=tzlocal()),
 'Retention': 0,
 'StorageDescriptor': {'Columns': [{'Name': 'date', 'Type': 'string'},
   {'Name': 'open', 'Type': 'double'},
   {'Name': 'high', 'Type': 'double'},
   {'Name': 'low', 'Type': 'double'},
   {'Name': 'close', 'Type': 'double'},
   {'Name': 'volume', 'Type': 'bigint'}],
  'Location': 's3://techcatalyst-raw/fabiola/',
  'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
  'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
  'Compressed': True,
  'NumberOfBuckets': -1,
  'SerdeInfo': {'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe',
   'Parameters': {'serialization.format': '1'}},
  'BucketColumns': [],
  'SortColumns': [],
  'Parameters': {'CrawlerSchemaDeserializerVersi

In [24]:
# Example: re-write the table and attach extra Glue metadata to it:
#   - a table description
#   - free-form table parameters
#   - per-column comments
desc = "This is my stock table."
param = {"source": "Google", "class": "stock"}
comments = {"Date": "Trading Date", "Open": "Opening Price", "Close": "Closing Price"}

wr.s3.to_parquet(
    df=df,
    path=f"s3://techcatalyst-raw/{name}/",
    dataset=True,
    mode='overwrite',
    database=database_name,
    table='fabiola_stock',
    # the metadata defined above is passed through the Glue table settings
    glue_table_settings=wr.typing.GlueTableSettings(
        description=desc,
        parameters=param,
        columns_comments=comments,
    ),
)

{'paths': ['s3://techcatalyst-raw/fabiola/a3a03886216d40279ccfca68cc237524.snappy.parquet'],
 'partitions_values': {}}

In [25]:
# Show the table's column-level details (name, type, partition flag, comment)
# to confirm the column comments were stored.
wr.catalog.table(database=database_name, table='fabiola_stock')

Unnamed: 0,Column Name,Type,Partition,Comment
0,date,string,False,Trading Date
1,open,double,False,Opening Price
2,high,double,False,
3,low,double,False,
4,close,double,False,Closing Price
5,volume,bigint,False,


#### List objects in bucket

In [26]:
# List every object stored under the stage/ prefix of the bucket.
wr.s3.list_objects(path='s3://techcatalyst-raw/stage/')

['s3://techcatalyst-raw/stage/yellow_tripdata.csv',
 's3://techcatalyst-raw/stage/yellow_tripdata.json',
 's3://techcatalyst-raw/stage/yellow_tripdata.parquet']

#### Download file from S3

In [30]:
# Copy the CSV from S3 to a local file in the current working directory.
wr.s3.download(local_file='wr_GOOG.csv', path='s3://techcatalyst-raw/stocks/GOOG.csv')

#### Upload file to S3

In [32]:
# Upload the local file to <bucket>/<your_name>/uploads/<file_name>.
your_name = 'fabiola'
file_name = 'wr_GOOG.csv'

# file_name drives both the local source and the S3 key, so the two cannot
# drift apart if the file is renamed.
wr.s3.upload(
    local_file=file_name,
    path=f's3://techcatalyst-raw/{your_name}/uploads/{file_name}',
)

In [33]:
# List the uploads folder to confirm the file arrived.
wr.s3.list_objects(f's3://techcatalyst-raw/{your_name}/uploads/')

['s3://techcatalyst-raw/fabiola/uploads/wr_GOOG.csv']

### Exercise (using Glue Catalog and Athena)

#### Define Configuration

In [46]:
# Name of the Glue database for this exercise (convention: YOURNAME_TAXI).
db_name = 'fabiola_taxi'

# Name of the Glue table (convention: YOURNAME_TRIPDATA).
table_name = 'fabiola_tripdata'

# S3 directory (prefix) that holds the taxi data; the Glue table is created
# on top of this location.
s3_path_directory = 's3://techcatalyst-raw/taxi_data/'

# Full path to one actual Parquet file inside that directory; the schema is
# read from it. Built from s3_path_directory so the two paths stay in sync.
s3_path_file = f"{s3_path_directory}8d12668ed190477fa03a29a0a5d50e84_000000_000000.snappy.parquet"


#### Get the Schema from the Parquet File Metadata

In [47]:
# Make sure the Glue database exists before a table is registered in it.
# exist_ok=True makes this a no-op when the database is already there, so the
# cell works both on a first run and on re-runs.
wr.catalog.create_database(db_name, exist_ok=True)

# Drop any previous version of the table so the cell can be re-run cleanly.
wr.catalog.delete_table_if_exists(database=db_name, table=table_name)

# read_parquet_metadata inspects the Parquet file and returns a tuple:
# (columns_types, partitions_types). Both are used when the table is created.
columns_types, partitions_types = wr.s3.read_parquet_metadata(path=s3_path_file)
print("Successfully read schema from Parquet file.")

Successfully read schema from Parquet file.


#### Create the Glue Table with the Explicit Schema

In [48]:

# Register the table in Glue using the schema read from the Parquet file.
# `path` is the S3 directory (prefix) that holds the data, not a single file.
wr.catalog.create_parquet_table(
    path=s3_path_directory,
    database=db_name,
    table=table_name,
    partitions_types=partitions_types,
    columns_types=columns_types,
)
print(f"Table '{table_name}' created successfully in database '{db_name}'.")

Table 'fabiola_tripdata' created successfully in database 'fabiola_taxi'.


In [None]:
# Query the new table through Athena and pull back only the first 5 rows.
query = f"SELECT * FROM {table_name} LIMIT 5"

df = wr.athena.read_sql_query(query, database=db_name)

print("\nQuery Results:")
# Leave the DataFrame as the cell's last expression so Jupyter renders it as
# a table instead of print()'s plain-text dump.
df