# Spark DataFrame

In [1]:
from pyspark.sql.functions import col, count, when, lower, trim, from_json
from pyspark.sql.types import MapType, StringType

StatementMeta(, c643832c-249a-4368-a080-b12c8f8c5d2c, 5, Finished, Available, Finished)

In [None]:
df = spark.sql("SELECT * FROM Lakehouse.dbo.dataframe")

In [None]:
df.describe()

In [None]:
display(df)

##  Check for Missing or Null Values

In [None]:
# Count null values in each column
display(df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]))

## Drop Rows with Invalid Latitude/Longitude

In [None]:
# Drop rows with invalid latitude and longitude
df_filtered = df.filter((df["age"].cast("float").between(18, 70)))
display(df_filtered)


## Check for Duplicates

In [None]:
# Show duplicate rows based on selected key columns
display(df.groupBy("Firstname", "Lastname", "Country").count().filter("count > 1"))

# Optionally drop duplicates
# df_deduped = df.dropDuplicates(["category", "address", "latitude", "longitude"])


## Check for Invalid Status or Categories

In [None]:
# Define valid values for status and categories
eu_country = ["France", "Germany", "Italy", "Bulgaria"]

# Filter rows with invalid status or category
df_non_eu_country = df.filter(~df["Country"].isin(eu_country))

# Show invalid rows
display(df_non_eu_country)


## Normalize Addresses

In [None]:
# Normalize address by trimming and converting to lowercase
df_normalized = df.withColumn("gender", lower(trim(col("gender"))))
df_normalized.show()


1. # Comparing DataFrames

In [None]:
df = spark.sql("SELECT * FROM Lakehouse.dbo.dataframe")

for i in range(5):
    df = df.union(df)

count_string = str(df.count())
df.coalesce(1).write.mode('overwrite').parquet(f"Files/data/dataframe_{count_string}.parquet")

In [None]:
print("Let's start!")

In [None]:
%run 99 - Code

df_name = "dataframe_19200000.parquet"

In [None]:
df_name = "dataframe_19200000.parquet"

## Spark DataFrames

In [None]:
from pyspark.sql.functions import count, max, min
from pyspark.sql import functions as F

@timeit
def spark_dataframe():
    df_spark = spark.read.parquet(f"Files/data/{df_name}")

    df_spark = df_spark.withColumn("Firstname", F.lower(F.trim(F.col("Firstname")))) \
       .withColumn("Lastname", F.lower(F.trim(F.col("Lastname")))) \
       .withColumn("Email", F.lower(F.trim(F.col("Email")))) \
       .withColumn("Country", F.upper(F.trim(F.col("Country")))) \
       .withColumn("Age", F.col("Age").cast("int")) \
       .withColumn("Age_Category", F.when(F.col("Age") > 50, "Senior").otherwise("Adult")) \

    df_grouped = df_spark.groupBy("Country", "Age_Category").agg(
        F.count("*").alias("Count"),
        F.avg("Age").alias("Avg_Age")
        )

    return df_grouped

ret_spark = spark_dataframe()

## Pandas

In [None]:
import pandas as pd
import numpy as np
from functools import wraps
import os
import pyarrow.parquet

@timeit
def pandas_dataframe():
    df_pandas = pd.read_parquet("/lakehouse/default/Files/data/dataframe_19200000.parquet")
    
    df_pandas["Firstname"] = df_pandas["Firstname"].str.strip().str.lower()
    df_pandas["Lastname"] = df_pandas["Lastname"].str.strip().str.lower()
    df_pandas["Email"] = df_pandas["Email"].str.strip().str.lower()
    df_pandas["Country"] = df_pandas["Country"].str.strip().str.upper()

    df_pandas["Age"] = pd.to_numeric(df_pandas["Age"], errors='coerce')

    df_pandas["Age_Category"] = np.where(df_pandas["Age"] > 50, "Senior", "Adult")

    df_grouped = df_pandas.groupby(["Country", "Age_Category"]).agg(
         Count=("Age", "size"),
         Avg_Age=("Age", "mean")
    ).reset_index()

    return df_grouped

pandas_dataframe()

## Pandas-on-Spark (Koala)

In [None]:
import pyspark.pandas as ps
import numpy as np


@timeit
def koala_dataframe():
    # Lade die Parquet-Datei mit pyspark.pandas
    df_koala = ps.read_parquet(f"Files/data/{df_name}")
    
    # Bereinigung der Strings (Entfernen von Leerzeichen und Umwandlung in Kleinbuchstaben)
    df_koala["Firstname"] = df_koala["Firstname"].str.strip().str.lower()
    df_koala["Lastname"] = df_koala["Lastname"].str.strip().str.lower()
    df_koala["Email"] = df_koala["Email"].str.strip().str.lower()
    df_koala["Country"] = df_koala["Country"].str.strip().str.upper()

    # Konvertiere die Spalte "Age" in numerisch (mit Fehlerbehandlung)
    df_koala["Age"] = ps.to_numeric(df_koala["Age"], errors='coerce')

    # Alterskategorie basierend auf dem Alter hinzufügen (mit apply anstelle von np.where)
    df_koala["Age_Category"] = df_koala["Age"].apply(lambda x: "Senior" if x > 50 else "Adult")

    # Gruppierung und Aggregation (zähle die Einträge und berechne das Durchschnittsalter)
    df_grouped = df_koala.groupby(["Country", "Age_Category"]).agg(
        Count=("Age", "count"),
        Avg_Age=("Age", "mean")
    ).reset_index()


koala_dataframe()


## Install additional libraries

When installing additional libraries, there may always be a mismatch in the required versions. For example, installing the following libraries will install a pyarrow version that will prevent some Pandas functions from running.


In [None]:
pip install polars

In [None]:
%run 99 - Code

In [None]:
df_name = "dataframe_19200000.parquet"

## Polars

In [None]:
import polars as pl

@timeit
def polars_dataframe():
    # Lade die Parquet-Datei mit Polars
    df_polars = pl.read_parquet(f"/lakehouse/default/Files/data/{df_name}")
    
    # Bereinigung der Strings (Entfernen von Leerzeichen und Umwandlung in Klein-/Großbuchstaben)
    df_polars = df_polars.with_columns([
        pl.col("Firstname").str.strip_chars().str.to_lowercase().alias("Firstname"),
        pl.col("Lastname").str.strip_chars().str.to_lowercase().alias("Lastname"),
        pl.col("Email").str.strip_chars().str.to_lowercase().alias("Email"),
        pl.col("Country").str.strip_chars().str.to_uppercase().alias("Country")
    ])
    
    # Konvertiere die Spalte "Age" in numerisch (mit Fehlerbehandlung)
    df_polars = df_polars.with_columns(
        pl.col("Age").cast(pl.Float64)
    )
    
    # Alterskategorie basierend auf dem Alter hinzufügen (pl.lit() für konstante Werte)
    df_polars = df_polars.with_columns(
        pl.when(pl.col("Age") > 50)
          .then(pl.lit("Senior"))
          .otherwise(pl.lit("Adult"))
          .alias("Age_Category")
    )

    # Gruppierung und Aggregation (korrekt in Polars)
    df_grouped = df_polars.group_by(["Country", "Age_Category"]).agg([
        pl.count("Age").alias("Count"),
        pl.mean("Age").alias("Avg_Age")
    ])
    
    return df_grouped

polars_dataframe()

## Modin

Modin overwrites Pandas and does currently not work in the same session with Pandas.

In [None]:
pip install modin

In [None]:
%run 99 - Code

In [None]:
df_name = "dataframe_19200000.parquet"

In [None]:
import modin.pandas as pd
import numpy as np

@timeit
def modin_dataframe():
    # Lade die Parquet-Datei mit Modin (wie in Pandas)
    df_modin = pd.read_parquet(f"Files/data/{df_name}")
    
    # Bereinigung der Strings (Entfernen von Leerzeichen und Umwandlung in Klein-/Großbuchstaben)
    df_modin["Firstname"] = df_modin["Firstname"].str.strip().str.lower()
    df_modin["Lastname"] = df_modin["Lastname"].str.strip().str.lower()
    df_modin["Email"] = df_modin["Email"].str.strip().str.lower()
    df_modin["Country"] = df_modin["Country"].str.strip().str.upper()

    # Konvertiere die Spalte "Age" in numerisch (mit Fehlerbehandlung)
    df_modin["Age"] = pd.to_numeric(df_modin["Age"], errors='coerce')

    # Alterskategorie basierend auf dem Alter hinzufügen
    df_modin["Age_Category"] = np.where(df_modin["Age"] > 50, "Senior", "Adult")

    # Gruppierung und Aggregation (wie in Pandas)
    df_grouped = df_modin.groupby(["Country", "Age_Category"]).agg(
        Count=("Age", "size"),
        Avg_Age=("Age", "mean")
    ).reset_index()

    return df_grouped

modin_dataframe()

## Dask

In [1]:
%run 99 - Code

StatementMeta(, 2bbd35c5-9aa6-4962-a021-4e4a9ae2859f, 5, Finished, Available, Finished)

In [2]:
pip install dask

StatementMeta(, 2bbd35c5-9aa6-4962-a021-4e4a9ae2859f, 6, Finished, Available, Finished)

Collecting dask
  Downloading dask-2024.9.0-py3-none-any.whl.metadata (3.7 kB)
Collecting cloudpickle>=3.0.0 (from dask)
  Downloading cloudpickle-3.0.0-py3-none-any.whl.metadata (7.0 kB)
Collecting partd>=1.4.0 (from dask)
  Downloading partd-1.4.2-py3-none-any.whl.metadata (4.6 kB)
Collecting locket (from partd>=1.4.0->dask)
  Downloading locket-1.0.0-py2.py3-none-any.whl.metadata (2.8 kB)
Downloading dask-2024.9.0-py3-none-any.whl (1.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m37.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading cloudpickle-3.0.0-py3-none-any.whl (20 kB)
Downloading partd-1.4.2-py3-none-any.whl (18 kB)
Downloading locket-1.0.0-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: locket, cloudpickle, partd, dask
  Attempting uninstall: cloudpickle
    Found existing installation: cloudpickle 2.2.1
    Uninstalling cloudpickle-2.2.1:
      Successfully uninstalled cloudpickle-2.2.1
[31mERROR: pip's dependency resolv

In [3]:
df_name = "dataframe_19200000.parquet"

StatementMeta(, 2bbd35c5-9aa6-4962-a021-4e4a9ae2859f, 7, Finished, Available, Finished)

In [5]:
import dask.dataframe as dd
import pandas as pd

@timeit
def dask_dataframe():
    # Lade die Parquet-Datei mit Dask
    df_dask = dd.read_parquet(f"/lakehouse/default/Files/data/{df_name}")
    
    # Bereinigung der Strings (Entfernen von Leerzeichen und Umwandlung in Klein-/Großbuchstaben)
    df_dask["Firstname"] = df_dask["Firstname"].str.strip().str.lower()
    df_dask["Lastname"] = df_dask["Lastname"].str.strip().str.lower()
    df_dask["Email"] = df_dask["Email"].str.strip().str.lower()
    df_dask["Country"] = df_dask["Country"].str.strip().str.upper()

    # Konvertiere die Spalte "Age" in numerisch (mit Fehlerbehandlung)
    df_dask["Age"] = dd.to_numeric(df_dask["Age"], errors='coerce')

    # Alterskategorie basierend auf dem Alter hinzufügen
    df_dask["Age_Category"] = df_dask.where(df_dask["Age"] > 50, "Senior", "Adult")

    # Gruppierung und Aggregation (wie in Pandas, aber mit Dask)
    df_grouped = df_dask.groupby(["Country", "Age_Category"]).agg({
        "Age": ["size", "mean"]
    }).reset_index()

    # Da Dask "Lazy Evaluation" verwendet, müssen wir `compute()` aufrufen, um die Berechnungen auszuführen
    df_grouped = df_grouped.compute()

    return df_grouped

dask_dataframe()

StatementMeta(, d6a94965-f170-43c4-8d0d-aa055df6d179, 9, Finished, Available, Finished)

TypeError: _Frame.where() takes from 2 to 3 positional arguments but 4 were given

## DuckDB

In [1]:
pip install duckdb

StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 5, Finished, Available, Finished)

Collecting duckdb
  Downloading duckdb-1.2.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (966 bytes)
Downloading duckdb-1.2.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (20.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m20.2/20.2 MB[0m [31m119.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: duckdb
Successfully installed duckdb-1.2.0
Note: you may need to restart the kernel to use updated packages.


In [2]:
%run 99 - Code

StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 6, Finished, Available, Finished)

In [3]:
df_name = "dataframe_19200000.parquet"

StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 7, Finished, Available, Finished)

In [4]:
import duckdb
import pandas as pd
import numpy as np

@timeit
def duckdb_dataframe():
    # Verbinde mit DuckDB
    con = duckdb.connect()

    # SQL-Abfrage für alle Transformationen und Aggregationen
    query = f"""
    SELECT
        UPPER(TRIM(Country)) AS Country,
        ANY_VALUE(LOWER(TRIM(Firstname))) AS Firstname,
        ANY_VALUE(LOWER(TRIM(Lastname))) AS Lastname,
        ANY_VALUE(LOWER(TRIM(Email))) AS Email,
        CASE 
            WHEN TRY_CAST(Age AS INTEGER) > 50 THEN 'Senior'
            ELSE 'Adult'
        END AS Age_Category,
        COUNT(*) AS Count,
        AVG(TRY_CAST(Age AS INTEGER)) AS Avg_Age
    FROM parquet_scan('/lakehouse/default/Files/data/{df_name}')
    GROUP BY Country, Age_Category
    """

    # Führe die Abfrage aus und hole das Ergebnis als Pandas DataFrame
    df_grouped = con.execute(query).fetchdf()

    return df_grouped

duckdb_dataframe()

StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 8, Finished, Available, Finished)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Execution time: 8.562845706939697 seconds


Unnamed: 0,Country,Firstname,Lastname,Email,Age_Category,Count,Avg_Age
0,UKRAINE,jeffrey,smith,melissaanderson@example.net,Senior,43712,70.627379
1,GRENADA,kimberly,smith,ericsmith@example.net,Senior,42784,70.802543
2,GUATEMALA,joshua,smith,nmartin@example.com,Senior,43328,70.436484
3,BANGLADESH,christian,miller,nmartin@example.com,Senior,40896,70.552426
4,CHAD,dennis,park,kevin51@example.org,Senior,41440,70.883398
...,...,...,...,...,...,...,...
481,PALESTINIAN TERRITORY,jasmine,foster,roselevi@example.org,Adult,35872,34.198037
482,VANUATU,jessica,foster,carolinesingh@example.com,Adult,36288,33.583774
483,TAJIKISTAN,william,foster,mikaylathomas@example.com,Adult,33216,33.858382
484,PITCAIRN ISLANDS,anthony,foster,darlene93@example.org,Adult,35520,33.511712


StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 14, Finished, Available, Finished)

In [5]:
df = spark.sql("SELECT * FROM Lakehouse.dbo.dataframe LIMIT 1000")
display(df)

StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d9febc94-0b34-4f53-9ee4-53b5dc9d17ac)

In [6]:
con = duckdb.connect()

    # SQL-Abfrage für alle Transformationen und Aggregationen
query = f"""
SELECT
    UPPER(TRIM(Country)) AS Country,
    ANY_VALUE(LOWER(TRIM(Firstname))) AS Firstname,
    ANY_VALUE(LOWER(TRIM(Lastname))) AS Lastname,
    ANY_VALUE(LOWER(TRIM(Email))) AS Email,
    CASE 
        WHEN TRY_CAST(Age AS INTEGER) > 50 THEN 'Senior'
        ELSE 'Adult'
    END AS Age_Category,
    COUNT(*) AS Count,
    AVG(TRY_CAST(Age AS INTEGER)) AS Avg_Age
FROM delta_scan('/lakehouse/default/Tables/dbo/dataframe')
GROUP BY Country, Age_Category
"""

df_grouped = con.execute(query).fetchdf()
df_grouped


StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 10, Finished, Available, Finished)

Unnamed: 0,Country,Firstname,Lastname,Email,Age_Category,Count,Avg_Age
0,TRINIDAD AND TOBAGO,eric,smith,marcus62@example.com,Senior,1341,70.243102
1,KOREA,collin,smith,joshua84@example.com,Senior,2764,70.579957
2,BRUNEI DARUSSALAM,cynthia,smith,aprilthomas@example.org,Senior,1364,70.202346
3,PERU,eric,smith,vrice@example.org,Senior,1292,70.345201
4,SRI LANKA,cheryl,smith,kimberly79@example.org,Senior,1383,70.366594
...,...,...,...,...,...,...,...
481,BURKINA FASO,nancy,smith,stephen90@example.com,Adult,1119,33.846291
482,BRUNEI DARUSSALAM,christina,smith,jill01@example.net,Adult,1041,33.896254
483,SAINT VINCENT AND THE GRENADINES,mario,smith,yedwards@example.net,Senior,1263,70.764054
484,SOMALIA,ashley,smith,sduncan@example.net,Adult,1109,34.312894


In [7]:
%%sql

SELECT * 

FROM dataframe

StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 11, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 6 fields>

# DUCKDB Magic Command

In [20]:
from IPython.core.magic import register_cell_magic
from IPython.display import display
import duckdb as ddb

@register_cell_magic
def duckdb(line, cell):
    
    db_path = line.strip()  # Pfad oder leerer String

    try:
        # 1) Entscheiden, ob wir eine in-memory oder eine persistente DB wollen
        if db_path:
            conn = ddb.connect(db_path)
        else:
            conn = ddb.connect()  # Temporäre In-Memory-Verbindung

        # 2) SQL-Code ausführen und als DataFrame zurückgeben
        df_result = conn.execute(cell).fetchdf()
        display(df_result)

        # 3) Verbindung schließen
        conn.close()

    except Exception as e:
        print(f"Fehler bei der Ausführung von DuckDB: {e}")


StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 28, Finished, Available, Finished)

In [21]:
%%duckdb

SELECT *
FROM delta_scan('/lakehouse/default/Tables/dbo/dataframe')

StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 29, Finished, Available, Finished)

Unnamed: 0,Firstname,Lastname,Age,Gender,Country,Email
0,Stephanie,Smith,79,Male,Turkmenistan,brandi87@example.net
1,Tara,Smith,79,Male,Ethiopia,wademichael@example.org
2,Gary,Smith,79,Male,Holy See (Vatican City State),danieldavis@example.com
3,Brian,Smith,79,Male,Israel,jessicalloyd@example.org
4,Francisco,Smith,79,Male,Guinea,kswanson@example.net
...,...,...,...,...,...,...
599995,Michael,Fox,40,Female,Eritrea,josephromero@example.org
599996,Michael,Bolton,40,Female,Spain,qwebb@example.org
599997,Michael,Patton,40,Female,Greece,barrettmichelle@example.net
599998,Michael,Barnett,40,Female,El Salvador,zsolomon@example.com


In [24]:
%%duckdb my_database.duckdb

SELECT * FROM dataframe

StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 31, Finished, Available, Finished)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,Firstname,Lastname,Age,Gender,Country,Email
0,Jesse,Morrow,35,Other,Sweden,wademichael@example.org
1,Jesse,Morrow,35,Other,Sweden,wademichael@example.org
2,Jesse,Morrow,35,Other,Sweden,wademichael@example.org
3,Jesse,Morrow,35,Other,Sweden,wademichael@example.org
4,Jesse,Morrow,35,Other,Sweden,wademichael@example.org
...,...,...,...,...,...,...
19199995,Stephanie,Werner,75,Male,Uganda,ewalls@example.net
19199996,Stephanie,Watson,75,Male,Tokelau,john73@example.com
19199997,Stephanie,Blackwell,75,Male,South Georgia and the South Sandwich Islands,lindsey44@example.org
19199998,Stephanie,Salazar,75,Male,Denmark,jeffrey51@example.net


In [None]:
import duckdb

# Verbindung zu einer lokalen DuckDB-Datei herstellen
# - Falls "my_database.duckdb" nicht existiert, wird sie neu angelegt.
conn = duckdb.connect("my_database.duckdb")


In [16]:
# Lies CSV-Datei ein (on-the-fly)
conn.execute("""
    CREATE TABLE IF NOT EXISTS dataframe AS
    SELECT * 
    FROM parquet_scan('/lakehouse/default/Files/data/dataframe_19200000.parquet')
""")


StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 21, Finished, Available, Finished)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 22, Finished, Available, Finished)

<duckdb.duckdb.DuckDBPyConnection at 0x7ad268474e30>

In [17]:
conn.close()

StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 23, Finished, Available, Finished)

In [19]:
conn = duckdb.connect("my_database.duckdb")
# Jetzt existiert bereits "dataframe":
df = conn.execute("SELECT * FROM dataframe").fetchdf()
display(df)

conn.close()


StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 26, Finished, Available, Finished)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

SynapseWidget(Synapse.DataFrame, 6f95c2e1-8c9c-4f8f-a246-c35a2fdccd1a)

StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 27, Finished, Available, Finished)

StatementMeta(, 367b09ba-9b42-47fc-a1aa-419a7124faf4, 32, Finished, Available, Finished)