# TODO
- Write items in batches
- Parallelize the uploads

In [5]:
import boto3
import pandas as pd
from decimal import Decimal

# === CONFIG ===
# AWS CLI profile used to build the boto3 session, and the S3 bucket the CSV
# exports are read from.
profile = "artur-admin"
bucket = "osiolek-data-tmp-bucket"

# S3 object key (CSV file) -> DynamoDB table it is loaded into.
# Only uncommented entries are processed, in the order listed here.
file_table_map = {
    # "AKT_STAN_MAG.csv": "akt_stan_mag",
    # "CENY_TOWAROW.csv": "ceny_towarow",
    # "KLASY_TOWAROW.csv": "klasy_towarow",
    # "TOWARY.csv": "towary",
    "MAGAZYNY.csv": "magazyny",
}

# DynamoDB table -> column treated as its hash key; rows are deduplicated on it.
# Every table named in file_table_map must have an entry here (it is looked up
# by key, so a missing entry raises KeyError).
hash_key_map = dict(
    # akt_stan_mag="ID_TOWARU",
    # ceny_towarow="ID_TOWARU",
    # klasy_towarow="ID_KLASY",
    # towary="ID_TOWARU",
    magazyny="ID_MAGAZYNU",
)

# === AWS SESSION ===
# One session bound to the configured CLI profile; both service handles below
# are created from it and therefore use the same credentials.
session = boto3.Session(profile_name=profile)
dynamodb = session.resource("dynamodb")  # resource API: Table objects / batch writers
s3 = session.client("s3")  # low-level client: get_object for the CSV files

# === MAIN LOOP ===
# For every configured CSV: read it from S3, clean and deduplicate it, then
# upload the rows to the matching DynamoDB table. A failure in one file is
# reported and the loop continues with the next file.


def _parse_hex_ts(value):
    """Return ``int(value, 16)`` for ``"0x"``-prefixed strings, else ``value`` unchanged.

    Applied to the ``TS`` column so hex-encoded values (presumably row
    versions/timestamps -- confirm) sort numerically.
    """
    if isinstance(value, str) and value.startswith("0x"):
        return int(value, 16)
    return value


def _clean_frame(df, file_name, hash_key):
    """Apply the per-file cleaning rules to ``df`` and deduplicate it on ``hash_key``.

    Per-file rules sort newest-first before deduplicating so that
    ``keep="first"`` retains the most recent version of each record. The
    generic hash-key deduplication runs LAST: running it first would keep the
    first row in file order and make the newest-first sort ineffective.

    Args:
        df: Frame read from the CSV ``file_name``; not modified.
        file_name: S3 key of the CSV; selects which cleaning rules apply.
        hash_key: Column that must be unique in the result.

    Returns:
        A new cleaned DataFrame with at most one row per ``hash_key`` value.

    Raises:
        ValueError: If ``hash_key`` is not a column of ``df``.
    """
    if hash_key not in df.columns:
        raise ValueError(f"❌ Column '{hash_key}' not found in {file_name}")

    if file_name == "TOWARY.csv":
        # Keep the most recently modified version of each product.
        df = df.assign(DATA_MODYFIKACJI=pd.to_datetime(df["DATA_MODYFIKACJI"]))
        df = df.sort_values("DATA_MODYFIKACJI", ascending=False)
        df = df.drop_duplicates(subset=["ID_TOWARU"], keep="first")
        # Convert datetime back to ISO format strings (important for sorting in DynamoDB)
        df = df.assign(
            DATA_MODYFIKACJI=df["DATA_MODYFIKACJI"].dt.strftime("%Y-%m-%dT%H:%M:%S")
        )
    elif file_name == "CENY_TOWAROW.csv":
        # Keep the row with the highest TS per product.
        df = df.assign(TS=df["TS"].apply(_parse_hex_ts))
        df = df.sort_values("TS", ascending=False)
        df = df.drop_duplicates(subset=["ID_TOWARU"], keep="first")
        df = df.dropna(subset=["ID_TOWARU"])
        # Sentinels for missing values: "brak" (Polish for "none") for the
        # flag, -1 for the numeric price/margin/discount columns.
        df = df.fillna(
            {"FLAGA": "brak", "CENA": -1, "CENA_ZAKUPU": -1, "MARZA": -1, "UPUST": -1}
        )
    elif file_name == "KLASY_TOWAROW.csv":
        # Keep the row with the highest TS per class.
        df = df.assign(TS=df["TS"].apply(_parse_hex_ts))
        df = df.sort_values("TS", ascending=False)
        df = df.drop_duplicates(subset=["ID_KLASY"], keep="first")
        # Drop rows where NAZWA_KLASY is null or empty.
        df = df[df["NAZWA_KLASY"].notna() & (df["NAZWA_KLASY"] != "")]
        # These columns are used as GSI keys (per the original note): fill
        # missing values with -1 instead of dropping the rows.
        df = df.fillna({"ID_TOWARU": -1, "ID_KLASY_NADRZ": -1})
        # Cast to string as DynamoDB expects "S" (string).
        df = df.assign(CZY_AKTYWNY=df["CZY_AKTYWNY"].astype(str))
    elif file_name == "MAGAZYNY.csv":
        df = df.drop_duplicates(subset=["ID_MAGAZYNU"], keep="first")

    # Generic dedup on the table's hash key, after any newest-first sort above.
    return df.drop_duplicates(subset=[hash_key], keep="first")


def _to_dynamo_item(row):
    """Convert one DataFrame record into a DynamoDB item dict.

    Missing values (anything ``pd.isna`` flags, e.g. NaN/None/NaT) become
    ``None``. Floats become ``Decimal`` (via ``str`` to avoid binary-float
    digits) because the boto3 DynamoDB serializer does not accept ``float``.
    All other values pass through unchanged.
    """
    item = {}
    for key, value in row.items():
        if pd.isna(value):
            item[key] = None
        elif isinstance(value, float):
            item[key] = Decimal(str(value))
        else:
            item[key] = value
    return item


for file_name, table_name in file_table_map.items():
    hash_key = hash_key_map[table_name]
    print(f"\n🔄 Processing s3://{bucket}/{file_name} → DynamoDB: {table_name} (hash key: {hash_key})")

    # Reset per file: otherwise the error report below raises NameError when
    # a failure happens before any row is uploaded, or shows a stale row from
    # the previous file.
    row = None
    try:
        # Read file from S3
        obj = s3.get_object(Bucket=bucket, Key=file_name)
        df = pd.read_csv(obj["Body"])

        df = _clean_frame(df, file_name, hash_key)

        # Upload to DynamoDB
        table = dynamodb.Table(table_name)
        with table.batch_writer() as batch:
            for row in df.to_dict(orient="records"):
                batch.put_item(Item=_to_dynamo_item(row))

        print(f"✅ Uploaded {len(df)} deduplicated rows to table '{table_name}'")

    except Exception as e:
        # put_item only buffers; write errors can surface on a later flush, so
        # `row` is the last row buffered, not necessarily the offending one.
        print(f"❌ Error processing {file_name}: {e}, row: {row}")



🔄 Processing s3://osiolek-data-tmp-bucket/MAGAZYNY.csv → DynamoDB: magazyny (hash key: ID_MAGAZYNU)
✅ Uploaded 2 deduplicated rows to table 'magazyny'


In [2]:
# Show the DataFrame left in the namespace by the last file the upload loop read.
# Raises NameError if run before the upload cell (as the recorded output shows).
df

NameError: name 'df' is not defined

In [6]:
# Column dtypes of `df`.
# NOTE(review): the recorded output lists price columns (CENA, FLAGA, TS, ...),
# not MAGAZYNY columns, so it likely comes from an earlier run with a different
# file_table_map -- re-run to refresh.
df.dtypes

ID_GRUPY_CEN         int64
ID_TOWARU            int64
CENA               float64
FLAGA               object
CENA_MIN           float64
CENA_MAX           float64
CENA_ZAKUPU        float64
MARZA              float64
ZAOKRAGLENIA         int64
UPUST              float64
CZY_PROCENTOWE        bool
CZY_FORMULY_CEN       bool
TS                   int64
dtype: object