# Load a test dataset
... and put it into our data lake.


In [53]:
import wget
import zipfile
import pandas as pd
import os
import shutil
import boto3
from dotenv import load_dotenv
import io

# Pull the key/value pairs from the local .env file into the process environment.
load_dotenv()

# Tell the AWS SDK which config and credentials files to read.
os.environ.update({
    "AWS_CONFIG_FILE": "~/.aws/config_terraform",
    "AWS_SHARED_CREDENTIALS_FILE": "~/.aws/cred_terraform",
})

# Settings taken from the environment; each one is None when its variable is unset.
aws_region = os.environ.get('AWS_REGION')
aws_profile = os.environ.get('AWS_TERRAFORM_USER')
PROJECT_NAME = os.environ.get('PROJECT_NAME')
ITERATION = os.environ.get('ITERATION')

In [54]:
# Create the boto3 session from the named profile and derive an S3 client from it.
# The region read from AWS_REGION (`aws_region`) is passed explicitly because boto3
# itself only consults AWS_DEFAULT_REGION and the profile configuration. When
# `aws_region` is None, boto3 falls back to the profile's region exactly as before.
aws_session = boto3.Session(profile_name=aws_profile, region_name=aws_region)
s3_client = aws_session.client('s3')

In [55]:
# set variables

# Download URL of the "online retail" zip archive hosted on archive.ics.uci.edu.
url = 'https://archive.ics.uci.edu/static/public/352/online+retail.zip'
# Local working folder; it is recreated empty and then receives the extracted archive contents.
folder_path = 'data'


In [56]:
# Start from a fresh, empty working folder: drop any existing one, then create it anew.
if not os.path.exists(folder_path):
    print(f"Ordner '{folder_path}' existiert nicht.")
else:
    shutil.rmtree(folder_path)
    print(f"Ordner '{folder_path}' wurde gelöscht.")

os.makedirs(folder_path)
print(f"Ordner '{folder_path}' wurde erstellt.")

Ordner 'data' wurde gelöscht.
Ordner 'data' wurde erstellt.


In [57]:
# Download the archive, unpack it into the working folder, then delete the archive.
# wget.download() returns the path it actually wrote to. When 'data.zip' already
# exists (e.g. left over from an interrupted run) wget stores the new download under
# a different, suffixed name, so the returned path is used instead of the requested one.
zip_path = wget.download(url, 'data.zip')
try:
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(path=folder_path)
finally:
    # Remove the archive even when extraction fails so a rerun starts clean.
    os.remove(zip_path)

In [58]:
# Load the xlsx file.
# Pick the workbook explicitly instead of taking whatever os.listdir() returns first:
# the listing order is arbitrary, and hidden or Excel lock files ('.', '~$' prefixes)
# must not be mistaken for the dataset.
xlsx_files = sorted(
    name for name in os.listdir(folder_path)
    if name.lower().endswith('.xlsx') and not name.startswith(('.', '~$'))
)
if not xlsx_files:
    raise FileNotFoundError(f"No .xlsx file found in '{folder_path}'")

# `file` keeps the bare file name; later cells derive the Parquet file name from it.
file = xlsx_files[0]
df = pd.read_excel(os.path.join(folder_path, file))



In [59]:
# Dimensions (rows, columns) of the raw frame loaded from the Excel file.
df.shape

(541909, 8)

In [60]:
# Column dtypes as inferred by pd.read_excel, before any cleaning.
df.dtypes

InvoiceNo              object
StockCode              object
Description            object
Quantity                int64
InvoiceDate    datetime64[ns]
UnitPrice             float64
CustomerID            float64
Country                object
dtype: object

In [None]:
# Clean data.
# Keep only rows whose InvoiceNo consists purely of digits, and take an explicit copy
# so the assignments below modify an independent frame rather than a slice of `df`
# (avoids pandas' SettingWithCopyWarning and its ambiguous view/copy semantics).
df_1 = df[df['InvoiceNo'].astype(str).str.isdigit()].copy()
# Replace missing values with "" and cast to str so each text column holds a single
# value type.
df_1.loc[:, 'StockCode'] = df_1['StockCode'].fillna("").astype(str)
df_1.loc[:, 'Description'] = df_1['Description'].fillna("").astype(str)

In [14]:
# Dimensions (rows, columns) of the cleaned frame, for comparison with the raw frame's shape.
df_1.shape

(532618, 8)

In [None]:
# Store the cleaned frame as a Parquet file inside the working folder.
# The file name is the part of the workbook name before its first dot, spaces removed.
new_file_name = f"{file.partition('.')[0].replace(' ', '')}.parquet"
df_1.to_parquet(
    os.path.join(folder_path, new_file_name),
    index=False,
    engine="pyarrow",
)

In [21]:

# Read the Parquet file back from the working folder into `output`.
output = pd.read_parquet(os.path.join(folder_path, new_file_name))


In [None]:
# Upload the parquet file to S3.
# Fail fast when the naming variables are missing: the f-string below would otherwise
# silently render them as the text "None" and address the wrong bucket.
if not PROJECT_NAME or not ITERATION:
    raise ValueError("PROJECT_NAME and ITERATION must be set to derive the bucket name")

bucket = f"{PROJECT_NAME}-i{ITERATION}-data-lake"
key = f"staging/{new_file_name}"


# Write the DataFrame as Parquet into an in-memory bytes buffer.
buffer = io.BytesIO()
df_1.to_parquet(buffer, engine='pyarrow', index=False)


# Upload the buffer's content as a single object.
s3_client.put_object(Bucket=bucket, Key=key, Body=buffer.getvalue())

print(f"File uploaded to s3://{bucket}/{key}")

File uploaded to s3://lenico-dwh-i1-data-lake/staging/OnlineRetail.parquet


In [16]:

# Column dtypes of the cleaned frame.
df_1.dtypes
    

InvoiceNo              object
StockCode              object
Description            object
Quantity                int64
InvoiceDate    datetime64[ns]
UnitPrice             float64
CustomerID            float64
Country                object
dtype: object

In [None]:
# Sanity check: print the rows of the raw frame whose InvoiceDate is later than the current time.
print(df[df['InvoiceDate'] > pd.Timestamp.now()])

Empty DataFrame
Columns: [InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country]
Index: []


In [18]:
# Print the largest InvoiceDate value in the raw frame.
print(df['InvoiceDate'].max())


2011-12-09 12:50:00


In [28]:
# Convert InvoiceDate to pandas' 'string' dtype, overwriting the column on `df` in place.
# NOTE(review): once this has run, comparisons of df['InvoiceDate'] against pd.Timestamp
# values no longer operate on datetimes — run such checks first or convert back with
# pd.to_datetime.
df['InvoiceDate'] = df['InvoiceDate'].astype('string')


In [20]:
# Check whether there are values with more than 13 digits (nanoseconds), i.e. rows
# whose InvoiceDate lies after the year 3000.
# InvoiceDate is passed through pd.to_datetime first so the comparison also works when
# the column has already been converted to strings; a datetime column passes through
# unchanged.
invalid_timestamps = df[pd.to_datetime(df['InvoiceDate']) > pd.Timestamp('3000-01-01')]
print(invalid_timestamps)

Empty DataFrame
Columns: [InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country]
Index: []
