In [1]:
import os
import psycopg2
import tabulate
from dotenv import load_dotenv
import pandas as pd
import chardet
from psycopg2.extras import execute_values

load_dotenv()

dis_db_password = os.getenv("REMOTE_POSTGRES_DIS_PASSWORD")

conn_params = {
    'host': 'vsisdb.informatik.uni-hamburg.de',
    'dbname': 'dis-2025',
    'user': 'vsisp42',
    'password': dis_db_password
}


In [None]:
def show_existing_tables():
    conn = psycopg2.connect(**conn_params)

    with conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = %s
                ORDER BY table_name;
                """,
                (conn_params["user"],),
            )
            tables = cur.fetchall()

            for table in tables:
                print("-", table[0])
show_existing_tables()

In [2]:
with open('resources/sales.csv', 'rb') as f:
    result = chardet.detect(f.read(10000))

print(result)

{'encoding': 'ISO-8859-1', 'confidence': 0.73, 'language': ''}


In [3]:
bad_lines = []
with open('resources/sales.csv', encoding='ISO-8859-1') as f:
    for i, line in enumerate(f, start=1):
        if line.count(';') != 4:  # 4 Semikolons = 5 Spalten
            bad_lines.append((i, line.strip()))

for i, l in bad_lines:
    print(f"Fehler in Zeile {i}: {l}")

Fehler in Zeile 35906: 06.04.2019;12.03.2019;Superstore Dresden;AEG Öko Lavatherm 59850 Sensidry;3;2997,00


In [4]:
def load_csv_data():
    return pd.read_csv(
        'resources/sales.csv',
        encoding='ISO-8859-1',
        sep=';',
        decimal=',',
        on_bad_lines='skip'
    )

df = load_csv_data()
print(df.head())


         Date               Shop                           Article  Sold  \
0  01.01.2019  Superstore Berlin  AEG Öko Lavatherm 59850 Sensidry    25   
1  01.01.2019  Superstore Berlin     AEG Öko-Lavamat Öko Plus 1400    25   
2  01.01.2019  Superstore Berlin              Bauknecht TK Care 6B    13   
3  01.01.2019  Superstore Berlin      Bauknecht WA Sensitive 36 DI     2   
4  01.01.2019  Superstore Berlin                       BenQ DE350P    31   

    Revenue  
0  24975.00  
1  14975.00  
2   3639.74  
3    699.80  
4   8369.69  


In [7]:
def create_star_schema_tables(conn):
    with conn.cursor() as cur:
        cur.execute("""
            CREATE TABLE IF NOT EXISTS dim_product (
                product_id SERIAL PRIMARY KEY,
                article_id INT UNIQUE,
                article_name VARCHAR(255) UNIQUE,
                product_group_id INT,
                product_group_name VARCHAR(255),
                product_family_id INT,
                product_family_name VARCHAR(255),
                product_category_id INT,
                product_category_name VARCHAR(255)
            );
        """)

        cur.execute("""
            CREATE TABLE IF NOT EXISTS dim_geo (
                geo_id SERIAL PRIMARY KEY,
                shop_id INT UNIQUE,
                shop_name VARCHAR(255) UNIQUE,
                city_id INT,
                city_name VARCHAR(255),
                region_id INT,
                region_name VARCHAR(255),
                country_id INT,
                country_name VARCHAR(255)
            );
        """)

        cur.execute("""
            CREATE TABLE IF NOT EXISTS dim_date (
                date DATE PRIMARY KEY,
                day INT,
                month INT,
                quarter INT,
                year INT
            );
        """)

        cur.execute("""
            CREATE TABLE IF NOT EXISTS fact_sales (
                sales_id SERIAL PRIMARY KEY,
                date Date,
                geo_id INT,
                product_id INT,
                quantity INT,
                revenue NUMERIC,

                CONSTRAINT fk_date FOREIGN KEY (date) REFERENCES dim_date(date),
                CONSTRAINT fk_geo FOREIGN KEY (geo_id) REFERENCES dim_geo(geo_id),
                CONSTRAINT fk_product FOREIGN KEY (product_id) REFERENCES dim_product(product_id)
            );
        """)

    conn.commit()
    print("Star-Schema-Tabellen erfolgreich erstellt.")

with psycopg2.connect(**conn_params) as conn:
    create_star_schema_tables(conn)

Star-Schema-Tabellen erfolgreich erstellt.


In [10]:
def populate_dim_product(conn):
    with conn.cursor() as cur:
         cur.execute("""
            INSERT INTO dim_product (
                article_id,
                article_name,
                product_group_id,
                product_group_name,
                product_family_id,
                product_family_name,
                product_category_id,
                product_category_name
            )
            SELECT
                a.ArticleID,
                a.Name,
                pg.ProductGroupID,
                pg.Name,
                pf.ProductFamilyID,
                pf.Name,
                pc.ProductCategoryID,
                pc.Name
            FROM Article a
                INNER JOIN ProductGroup pg ON pg.productgroupid = a.productgroupid
                INNER JOIN vsisp42.productfamily pf on pf.productfamilyid = pg.productfamilyid
                INNER JOIN vsisp42.productcategory pc on pc.productcategoryid = pf.productcategoryid

        """)
    conn.commit()
    print("dim_product erfolgreich befüllt.")

with psycopg2.connect(**conn_params) as conn:
    populate_dim_product(conn)

dim_product erfolgreich befüllt.


In [11]:
def populate_dim_geo(conn):
    with conn.cursor() as cur:
        cur.execute("""
            INSERT INTO dim_geo (
                shop_id,
                shop_name,
                city_id,
                city_name,
                region_id,
                region_name,
                country_id,
                country_name
            )
            SELECT
                s.ShopID,
                s.Name,
                c.CityID,
                c.Name,
                r.RegionID,
                r.Name,
                co.CountryID,
                co.Name
            FROM Shop s
                INNER JOIN City c ON s.CityID = c.CityID
                INNER JOIN Region r ON c.RegionID = r.RegionID
                INNER JOIN Country co ON r.CountryID = co.CountryID
        """)
    conn.commit()
    print("dim_geo erfolgreich befüllt.")

with psycopg2.connect(**conn_params) as conn:
    populate_dim_geo(conn)

dim_geo erfolgreich befüllt.


In [12]:
def transform_data(df):
    df['Date'] = pd.to_datetime(df['Date'], format='%d.%m.%Y', errors='coerce')
    num_invalid_dates = df['Date'].isna().sum()
    print(f'Entries: {len(df)}\t number of invaled dates: {num_invalid_dates}')
    print(df.head())
    # df_sales = df_sales.dropna(subset=['Date'])

df = load_csv_data()
transform_data(df)

Entries: 77311	 number of invaled dates: 0
        Date               Shop                           Article  Sold  \
0 2019-01-01  Superstore Berlin  AEG Öko Lavatherm 59850 Sensidry    25   
1 2019-01-01  Superstore Berlin     AEG Öko-Lavamat Öko Plus 1400    25   
2 2019-01-01  Superstore Berlin              Bauknecht TK Care 6B    13   
3 2019-01-01  Superstore Berlin      Bauknecht WA Sensitive 36 DI     2   
4 2019-01-01  Superstore Berlin                       BenQ DE350P    31   

    Revenue  
0  24975.00  
1  14975.00  
2   3639.74  
3    699.80  
4   8369.69  


In [13]:
def populate_dim_date(conn, df_sales):

    min_date = df_sales['Date'].min()
    max_date = df_sales['Date'].max()

    print(f"Erzeuge Zeitdimension von {min_date.date()} bis {max_date.date()}")

    date_range = pd.date_range(start=min_date, end=max_date)

    df_time = pd.DataFrame({'date': date_range})
    df_time['day'] = df_time['date'].dt.day
    df_time['month'] = df_time['date'].dt.month
    df_time['quarter'] = df_time['date'].dt.quarter
    df_time['year'] = df_time['date'].dt.year

    with conn.cursor() as cur:
        for _, row in df_time.iterrows():
            cur.execute("""
                INSERT INTO dim_date (date, day, month, quarter, year)
                VALUES (%s, %s, %s, %s, %s)
            """, (row['date'], row['day'], row['month'], row['quarter'], row['year']))

    conn.commit()
    print(f"DimTime befüllt mit {len(df_time)} Tagen.")

with psycopg2.connect(**conn_params) as conn:
    populate_dim_date(conn, df)

Erzeuge Zeitdimension von 2019-01-01 bis 2019-05-31
DimTime befüllt mit 151 Tagen.


In [17]:
def insert_fact_sales(conn, df_sales, batch_size=10000):
    with conn.cursor() as cur:
        # Lookup-Tabellen laden
        cur.execute("SELECT date, date FROM dim_date")
        date_lookup = dict(cur.fetchall())

        cur.execute("SELECT shop_name, geo_id FROM dim_geo")
        geo_lookup = dict(cur.fetchall())

        cur.execute("SELECT article_name, product_id FROM dim_product")
        product_lookup = dict(cur.fetchall())

        # Daten vorbereiten
        df_sales['Date'] = pd.to_datetime(df_sales['Date'], dayfirst=True).dt.date
        df_sales['geo_id'] = df_sales['Shop'].map(geo_lookup)
        df_sales['product_id'] = df_sales['Article'].map(product_lookup)
        df_sales['date_key'] = df_sales['Date'].map(date_lookup)

        # Ungültige Zeilen filtern
        valid_mask = df_sales[['date_key', 'geo_id', 'product_id']].notnull().all(axis=1)
        invalid_rows = df_sales[~valid_mask]
        for row in invalid_rows.itertuples():
            print(f'skipped row: {row}')

        df_valid = df_sales[valid_mask]

        # Datensätze in Batches einfügen
        rows = list(
            zip(
                df_valid['date_key'],
                df_valid['geo_id'],
                df_valid['product_id'],
                df_valid['Sold'],
                df_valid['Revenue']
            )
        )

        for i in range(0, len(rows), batch_size):
            batch = rows[i:i + batch_size]
            execute_values(
                cur,
                """
                INSERT INTO fact_sales (date, geo_id, product_id, quantity, revenue)
                VALUES %s
                """,
                batch
            )
            print(f'Inserted rows {i} - {i + batch_size}.')

    conn.commit()
    print(f"{len(rows)} Datensätze erfolgreich in fact_sales eingefügt.")

with psycopg2.connect(**conn_params) as conn:
    insert_fact_sales(conn, df)


skipped row: Pandas(Index=35899, Date=datetime.date(2019, 3, 12), Shop='furt', Article='Kodak Zx3 Playsport', Sold=31, Revenue=4029.69, geo_id=nan, product_id=7, date_key=datetime.date(2019, 3, 12))
Inserted rows 0 - 10000.
Inserted rows 10000 - 20000.
Inserted rows 20000 - 30000.
Inserted rows 30000 - 40000.
Inserted rows 40000 - 50000.
Inserted rows 50000 - 60000.
Inserted rows 60000 - 70000.
Inserted rows 70000 - 80000.
77310 Datensätze erfolgreich in fact_sales eingefügt.


# Analysis

In [19]:
def analyse_data(conn):
    with conn.cursor() as cur:
        query = """
        SELECT
            g.region_name AS region,
            d.quarter,
            d.year,
            p.article_name AS product,
            SUM(f.quantity) AS sales
        FROM fact_sales f
            INNER JOIN dim_geo g ON g.geo_id = f.geo_id
            INNER JOIN dim_product p ON p.product_id = f.product_id
            INNER JOIN dim_date d ON d.date = f.date
        GROUP BY g.region_name, d.quarter, d.year, p.article_name
        ORDER BY g.region_name, d.year, d.quarter, p.article_name;
        """

        cur.execute(query)
        rows = cur.fetchall()
        columns = [desc[0] for desc in cur.description]
        df = pd.DataFrame(rows, columns=columns)

    df['sales_time'] = 'quarter ' + df['quarter'].astype(str) + ', ' + df['year'].astype(str)

    pivot = df.pivot_table(
        index=['region', 'sales_time'],
        columns='product',
        values='sales',
        aggfunc='sum',
        fill_value=0
    ).reset_index()

    # Zwischensumme je Region
    region_totals = pivot.groupby('region').sum(numeric_only=True)
    region_totals['sales_time'] = 'total'
    region_totals = region_totals.reset_index()

    # Gesamttotal
    grand_total = pivot.drop(columns=['region', 'sales_time']).sum(numeric_only=True).to_frame().T
    grand_total['region'] = 'total'
    grand_total['sales_time'] = 'total'

    # Zusammenführen
    result = pd.concat([pivot, region_totals, grand_total], ignore_index=True)

    # Sortierung vorbereiten
    def extract_quarter_year(s):
        if s == 'total':
            return 999, 9999
        quarter = int(s.split('quarter ')[1].split(',')[0].strip())
        year = int(s.split(', ')[1])
        return quarter, year

    result[['time_quarter', 'time_year']] = result['sales_time'].apply(
        lambda x: pd.Series(extract_quarter_year(x))
    )

    result['region_order'] = result['region'].apply(lambda x: 'zzz' if x == 'total' else x)

    result = result.sort_values(by=['region_order', 'region', 'time_year', 'time_quarter']).drop(
        columns=['region_order', 'time_quarter', 'time_year']
    )


    # 'total'-Spalte: Summe aller Produkte pro Zeile
    produktspalten = result.columns.difference(['region', 'sales_time'])
    result['total'] = result[produktspalten].sum(axis=1)

    print(tabulate.tabulate(result, headers='keys', tablefmt='grid'))


with psycopg2.connect(**conn_params) as conn:
    analyse_data(conn)


+----+------------------------+-----------------+------------------------------------+---------------------------------+------------------------+--------------------------------+---------------+-----------------------------------+-------------------+-------------------+------------------+-------------------+-----------------------+----------------------------------------+-------------------------+-------------------+-----------------+-----------------------+---------------+-----------------------------+-------------------------------+-----------------+-----------------+-----------------------+---------------------+---------------------+------------------+------------------+----------------------+------------------------+-----------------------+----------------------+-----------------------------+------------------+---------+
|    | region                 | sales_time      |   AEG Öko Lavatherm 59850 Sensidry |   AEG Öko-Lavamat Öko Plus 1400 |   Bauknecht TK Care 6B |   Bauknecht WA Se

In [20]:
def analyse_data_cube(conn, geo, time, product):
    # Mapping der Granularitätsstufen zu Spaltennamen
    geo_levels = {
        'country': 'country_name',
        'region': 'region_name',
        'city': 'city_name',
        'shop': 'shop_name'
    }

    time_levels = {
        'year': 'year',
        'quarter': 'quarter',
        'month': 'month',
        'day': 'day',
        'date': 'date'
    }

    product_levels = {
        'productCategory': 'product_category_name',
        'productFamily': 'product_family_name',
        'productGroup': 'product_group_name',
        'article': 'article_name'
    }

    geo_col = geo_levels[geo]
    time_col = time_levels[time]
    prod_col = product_levels[product]

    # SELECT + ROLLUP über die gewählten Dimensionen
    query = f"""
        SELECT
            g.{geo_col} AS geo,
            d.{time_col} AS time,
            p.{prod_col} AS product,
            SUM(f.quantity) AS quantity,
            SUM(f.revenue) AS revenue,
            GROUPING(g.{geo_col}) AS grp_geo,
            GROUPING(d.{time_col}) AS grp_time,
            GROUPING(p.{prod_col}) AS grp_product
        FROM fact_sales f
        JOIN dim_geo g ON f.geo_id = g.geo_id
        JOIN dim_date d ON f.date = d.date
        JOIN dim_product p ON f.product_id = p.product_id
        GROUP BY ROLLUP(g.{geo_col}, d.{time_col}, p.{prod_col})
        ORDER BY
            GROUPING(g.{geo_col}),
            g.{geo_col},
            GROUPING(d.{time_col}),
            d.{time_col},
            GROUPING(p.{prod_col}),
            p.{prod_col};
    """

    with conn.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
        columns = [desc[0] for desc in cur.description]
        df = pd.DataFrame(rows, columns=columns)

    # NULLs durch "ALL"-Label ersetzen
    df['geo'] = df.apply(lambda row: 'ALL' if row['grp_geo'] == 1 else row['geo'], axis=1)
    df['time'] = df.apply(lambda row: 'ALL' if row['grp_time'] == 1 else row['time'], axis=1)
    df['product'] = df.apply(lambda row: 'ALL' if row['grp_product'] == 1 else row['product'], axis=1)

    df = df.drop(columns=['grp_geo', 'grp_time', 'grp_product'])

    print(tabulate.tabulate(df, headers='keys', tablefmt='grid'))

with psycopg2.connect(**conn_params) as conn:
    analyse_data_cube(conn, geo='region', time='quarter', product='productFamily')


+-----+------------------------+--------+-----------------+------------+-------------+
|     | geo                    | time   | product         |   quantity |     revenue |
|   0 | Baden-Württemberg      | 1.0    | Audio           |      17816 | 4.11206e+06 |
+-----+------------------------+--------+-----------------+------------+-------------+
|   1 | Baden-Württemberg      | 1.0    | Gartengeräte    |      16834 | 3.35121e+06 |
+-----+------------------------+--------+-----------------+------------+-------------+
|   2 | Baden-Württemberg      | 1.0    | Haushaltsgeräte |      18236 | 1.07508e+07 |
+-----+------------------------+--------+-----------------+------------+-------------+
|   3 | Baden-Württemberg      | 1.0    | Video           |      17365 | 5.71076e+06 |
+-----+------------------------+--------+-----------------+------------+-------------+
|   4 | Baden-Württemberg      | 1.0    | ALL             |      70251 | 2.39248e+07 |
+-----+------------------------+--------+--