In [None]:
import os
from pathlib import Path
from faker import Faker
import pyspark.sql.functions as F # Spark-SQL col, sum, round, window usw. 
from pyspark.sql import SparkSession # Einstiegsunkt für DataFrames usw. 
from pyspark.sql.window import Window


In [None]:
# Runtime configuration. Every value can be overridden through environment
# variables; the defaults reproduce the original tutorial setup.
SPARK_APP_NAME = os.getenv('SPARK_APP_NAME', 'spark-comprehensive-tutorial')
SPARK_MASTER = os.getenv('SPARK_MASTER', 'local[*]')
# The absolute default is kept for backward compatibility; set
# TUTORIAL_BASE_DIR when running outside the /workspace container layout.
BASE_DIR = Path(os.getenv('TUTORIAL_BASE_DIR', '/workspace/notebooks'))
OUTPUT_DIR = BASE_DIR / 'tutorial_output' / 'spark_only'
RAW_DIR = OUTPUT_DIR / 'raw'
CURATED_DIR = OUTPUT_DIR / 'curated'

spark = (
    SparkSession.builder
    .appName(SPARK_APP_NAME)
    .master(SPARK_MASTER)
    # The tutorial datasets are tiny; Spark's default of 200 shuffle
    # partitions would mostly produce empty tasks and output files.
    .config('spark.sql.shuffle.partitions', '8')
    # Pin the session time zone so timestamp parsing and to_date() do not
    # depend on the local machine's zone.
    .config('spark.sql.session.timeZone', 'UTC')
    .getOrCreate()
)

spark.sparkContext.setLogLevel('WARN')
print('Spark Version:', spark.version)
print('Output Dir:', OUTPUT_DIR)


## 1) Rohdaten erzeugen (Customers, Products, Orders, Order Items)

In [None]:
# Deterministic synthetic raw data: all IDs and attributes are derived
# arithmetically from the loop indices, so every run yields the same rows.
# Table sizes are named once so that the foreign keys below
# (orders -> customers, items -> orders/products) always stay inside the
# generated ID ranges when a size is changed.
N_CUSTOMERS = 200
N_PRODUCTS = 100
N_ORDERS = 1200
MAX_LINES_PER_ORDER = 4

# Faker is seeded for reproducibility. NOTE(review): none of the cells shown
# here draw values from `fake`; the data below is purely arithmetic.
fake = Faker('en_US')
Faker.seed(42)

countries = ['DE', 'US', 'IN', 'JP', 'SE']
segments = ['SMB', 'Enterprise', 'Consumer']
categories = ['Hardware', 'Software', 'Accessories']

customers = [
    {'customer_id': i, 'country': countries[i % len(countries)], 'segment': segments[i % len(segments)]}
    for i in range(1, N_CUSTOMERS + 1)
]

products = [
    {
        'product_id': i,
        'product_name': f'product_{i:03d}',
        'category': categories[i % len(categories)],
        # 30 distinct price points: 3.5, 7.0, ..., 105.0 (already a float)
        'unit_price': (i % 30 + 1) * 3.5,
    }
    for i in range(1, N_PRODUCTS + 1)
]

orders = [
    {
        'order_id': i,
        'customer_id': (i % N_CUSTOMERS) + 1,
        # every 5th order is cancelled, all others are paid
        'status': 'CANCELLED' if i % 5 == 0 else 'PAID',
        # day stays within 1..28, so the date is valid in any month
        'order_ts': f'2026-01-{(i % 28) + 1:02d} {i % 24:02d}:{i % 60:02d}:00',
    }
    for i in range(1, N_ORDERS + 1)
]

items = [
    {
        'order_id': order_id,
        'line_id': line,
        'product_id': ((order_id * line) % N_PRODUCTS) + 1,
        'quantity': (line % 5) + 1,
    }
    for order_id in range(1, N_ORDERS + 1)
    # each order gets between 1 and MAX_LINES_PER_ORDER lines
    for line in range(1, (order_id % MAX_LINES_PER_ORDER) + 2)
]

customers_df = spark.createDataFrame(customers)
products_df = spark.createDataFrame(products)
# Explicit pattern matching the generated strings; parsed in the session time zone.
orders_df = spark.createDataFrame(orders).withColumn(
    'order_ts', F.to_timestamp('order_ts', 'yyyy-MM-dd HH:mm:ss')
)
items_df = spark.createDataFrame(items)

customers_df.show(5, truncate=False)
orders_df.show(5, truncate=False)
items_df.show(5, truncate=False)
products_df.show(5, truncate=False)


In [None]:
# One row per order line, enriched with the product attributes.
# line_value is the plain line total: quantity x unit_price.
order_lines_df = items_df.join(
    products_df, "product_id", "inner"
).withColumn(
    "line_value",
    F.col("quantity") * F.col("unit_price"),
)

order_lines_df.show(n=10, truncate=False)

In [None]:
# Order-level summary: one row per order with its customer attributes and
# aggregated line metrics (number of lines, units, rounded order value).
order_summary_df = (
    orders_df
    .join(customers_df, "customer_id", "inner")
    .join(order_lines_df, "order_id", "inner")
    # calendar date of the order (session time zone), used for partitioning later
    .withColumn("order_date", F.to_date(F.col("order_ts")))
    .groupBy(
        "order_id", "order_ts", "order_date", "status",
        "customer_id", "country", "segment",
    )
    .agg(
        F.count("*").alias("line_count"),
        F.sum("quantity").alias("item_count"),
        F.round(F.sum("line_value"), 2).alias("order_value"),
    )
)
order_summary_df.show(n=10, truncate=False)
order_summary_df.printSchema()

In [None]:
# Create the local output layout up front (no error if it already exists).
RAW_DIR.mkdir(parents=True, exist_ok=True)
CURATED_DIR.mkdir(parents=True, exist_ok=True)

# Raw layer: one directory per source table, overwritten on every run.
orders_df.write.format('parquet').mode('overwrite').save(str(RAW_DIR / 'orders_parquet'))
items_df.write.format('parquet').mode('overwrite').save(str(RAW_DIR / 'items_parquet'))
# JSON only to demonstrate a second output format.
products_df.write.format('json').mode('overwrite').save(str(RAW_DIR / 'products_json'))
customers_df.write.format('parquet').mode('overwrite').save(str(RAW_DIR / 'customers_parquet'))

# Curated layer: paid orders only, laid out as order_date=.../country=... directories.
(
    order_summary_df
    .where(F.col("status") == "PAID")
    .write
    .format("parquet")
    .partitionBy("order_date", "country")
    .mode("overwrite")
    .save(str(CURATED_DIR / "paid_orders_partitioned"))
)

In [None]:
# coalesce merges existing partitions (no full shuffle); here down to a single one.
orders_df.coalesce(numPartitions=1).rdd.getNumPartitions()

In [None]:
# Tiny (id, number-word) dataset used for the partitioning demos below.
data = list(enumerate(
    ['one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine'],
    start=1,
))

df = spark.createDataFrame(data, ['id', 'number'])

df.show()

In [None]:
# Shuffle the 9 rows into 8 partitions, then look at every partition's rows
# (glom() turns each partition into one Python list).
mix = df.repartition(numPartitions=8)
mix.rdd.glom().collect()

In [None]:
# repartition always performs a full shuffle, even when reducing the partition count.
mix.repartition(numPartitions=3).rdd.glom().collect()

In [None]:
# coalesce merges existing partitions instead of shuffling: as few shuffles as possible.
mix.coalesce(numPartitions=3).rdd.glom().collect()

In [17]:
# Load the raw customs CSV (header row, ';' separator). No schema and no
# inferSchema are given, so every column is read as a string.
# "so bitte nicht" ("please not like this"): presumably refers to calling
# d.collect() here, which would pull the whole file onto the driver. TODO confirm.
d = (
    spark.read
    .format("csv")
    .option("header", True)
    .option("sep", ";")
    .load("data/customs_data.csv")
)

In [18]:
# First 20 rows (show's default), untruncated; `d` is not cached, so each action re-reads the CSV.
d.show(n=20, truncate=False)

+-------+-------+----------+------+------+--------+------+--------+-------------+-----------+-----------------------------+
|month  |country|code      |value |netto |quantity|region|district|direction_eng|measure_eng|load_date                    |
+-------+-------+----------+------+------+--------+------+--------+-------------+-----------+-----------------------------+
|01/2016|IT     |6204695000|131   |1     |7       |46000 |01      |IM           |ShT        |2024-07-01T00:00:00.000+03:00|
|01/2016|CN     |9001900009|112750|18    |0       |46000 |01      |IM           |1          |2024-01-01T00:00:00.000+03:00|
|01/2016|BY     |8414302004|392   |57    |8       |50000 |06      |IM           |ShT        |2024-06-01T00:00:00.000+03:00|
|01/2016|US     |9018509000|54349 |179   |0       |40000 |02      |IM           |1          |2024-04-01T00:00:00.000+03:00|
|01/2016|EE     |9021101000|17304 |372   |0       |46000 |01      |IM           |1          |2024-02-01T00:00:00.000+03:00|
|01/2016

In [19]:
# Two records, untruncated, printed vertically (one "column | value" line per field).
d.show(n=2, truncate=False, vertical=True)

-RECORD 0--------------------------------------
 month         | 01/2016                       
 country       | IT                            
 code          | 6204695000                    
 value         | 131                           
 netto         | 1                             
 quantity      | 7                             
 region        | 46000                         
 district      | 01                            
 direction_eng | IM                            
 measure_eng   | ShT                           
 load_date     | 2024-07-01T00:00:00.000+03:00 
-RECORD 1--------------------------------------
 month         | 01/2016                       
 country       | CN                            
 code          | 9001900009                    
 value         | 112750                        
 netto         | 18                            
 quantity      | 0                             
 region        | 46000                         
 district      | 01                     

In [20]:
# Distinct country codes in the raw data.
# NOTE(review): the original remark "2*1024 / 128 = 16" presumably estimates the
# number of input partitions: a ~2 GiB CSV split into 128 MiB chunks
# (spark.sql.files.maxPartitionBytes default) gives ~16.
(
    d
    .select("country")
    .distinct()
    .show(truncate=False)
)



+-------+
|country|
+-------+
|EE     |
|FR     |
|LT     |
|MY     |
|IL     |
|AT     |
|EU     |
|BA     |
|PK     |
|MM     |
|JO     |
|AU     |
|NZ     |
|SA     |
|HT     |
|DZ     |
|BB     |
|AD     |
|LS     |
|SL     |
+-------+
only showing top 20 rows


                                                                                

In [None]:
# The same filter written twice: first with the DataFrame (Column) API ...
df_de = (
    d
    .where(F.col('country') == 'DE')
    .where(F.col('value').isNotNull())
)

# ... and then with SQL expression strings. The string literal uses single
# quotes: with spark.sql.ansi.doubleQuotedIdentifiers enabled, "DE" would be
# parsed as a column identifier instead of a string.
df_de2 = (
    d
    .where("country = 'DE'")
    .where("value IS NOT NULL")
)

# Both variants must select the same number of rows (each count() is a full scan).
print(df_de.count() == df_de2.count())
df_de.show(truncate=False)


In [27]:
# Keep the raw (string) columns unchanged and convert only load_date to DATE.
# Casting a string such as '2024-07-01T00:00:00.000+03:00' to date should keep
# its calendar-date prefix. The cast column keeps the name 'load_date',
# which the partitionBy('load_date') writes further down rely on.
final = df_de.select(
    *[
        'month', 'country', 'code', 'value', 'netto', 'quantity',
        'region', 'district', 'direction_eng', 'measure_eng',
    ],
    F.col('load_date').cast('date'),
)

In [28]:
# Write without any layout control: Spark writes roughly one CSV part-file
# per (non-empty) partition of `final`.
(
    final
    .write
    # Re-runnable: the default save mode fails if the target path already exists.
    .mode('overwrite')
    .options(header='True', sep=';')
    .csv('data/final_no_control')
)

partition_num = final.rdd.getNumPartitions()
print(f'Anzahl Partitionen {partition_num}')



Anzahl Partitionen 16


                                                                                

In [29]:
# Three ways to control the output layout. Every write uses mode('overwrite'):
# the default save mode fails if the path already exists, which would break
# re-running the notebook.

# 1) coalesce(1): merge all partitions without a shuffle -> a single part-file,
#    but the whole write runs in a single task.
(
    final
    .coalesce(1)
    .write
    .mode('overwrite')
    .options(header='True', sep=';')
    .csv('data/final_one_file')
)

partition_num = final.coalesce(1).rdd.getNumPartitions()
print(f'Anzahl Partitionen {partition_num}')


# 2) partitionBy('load_date'): one load_date=<value>/ sub-directory per distinct
#    date; every task writes its own file into each directory it has rows for,
#    so many small files can appear.
(
    final
    .write
    .mode('overwrite')
    .partitionBy('load_date')
    .options(header='True', sep=';')
    .csv('data/final_partitioned')
)

print_df = final.select('load_date').distinct()
print(f'Load_date distinct: {print_df.count()}')


# 3) repartition(1, 'load_date') + partitionBy: exactly one file per load_date
#    directory. NOTE(review): with numPartitions=1 the column argument has no
#    practical effect and the write is single-task; repartition('load_date')
#    would also give one file per directory while keeping write parallelism.
(
    final
    .repartition(1, 'load_date')
    .write
    .mode('overwrite')
    .partitionBy('load_date')
    .options(header='True', sep=';')
    .csv('data/final_partitioned_repart')
)

partition_num = final.repartition(1, 'load_date').rdd.getNumPartitions()
print(f'Anzahl Partitionen {partition_num}')

                                                                                

Anzahl Partitionen 1


                                                                                

Load_date distinct: 10




Anzahl Partitionen 1


In [30]:
def _read_csv_for_load_date(path: str, load_date: str = '2024-01-01'):
    """Read one of the semicolon-separated CSV outputs (with header) and keep a single load_date.

    For the partitionBy('load_date') outputs Spark can restrict the scan to the
    matching load_date=<value> directory; for the un-partitioned outputs every
    file has to be read and filtered row by row.

    Args:
        path: directory written by one of the CSV writes above.
        load_date: date string (yyyy-MM-dd) to keep.

    Returns:
        A lazy Spark DataFrame; nothing is read until an action runs.
    """
    return (
        spark
        .read
        .csv(path, header=True, sep=';')
        # Column API instead of a SQL string: no string-literal quoting pitfalls.
        .where(F.col('load_date') == load_date)
    )


reader_no_control = _read_csv_for_load_date('data/final_no_control/')
reader_final_one_file = _read_csv_for_load_date('data/final_one_file/')
reader_partitioned = _read_csv_for_load_date('data/final_partitioned')
reader_partitioned_repart = _read_csv_for_load_date('data/final_partitioned_repart')

reader_no_control.count()

                                                                                

350998

In [33]:
# Bring only a 5-row preview to the driver. `final.toPandas().head()` would
# first collect the *entire* dataset into driver memory and only then slice it.
# NOTE: this rebinds `df` (until now the small Spark demo DataFrame) to a
# pandas DataFrame.
df = final.limit(5).toPandas()

                                                                                

In [35]:
# At this point `df` is a pandas DataFrame (result of toPandas()). Only Spark
# DataFrames have `.rdd`, so this raises. The error is caught and shown, which
# keeps "Restart & Run All" working.
try:
    df.rdd
except AttributeError as err:
    print(f'AttributeError: {err}')

AttributeError: 'DataFrame' object has no attribute 'rdd'