In [20]:
%load_ext autoreload
%autoreload 2

from pyspark.sql import SparkSession
from src.config import Dirs


spark = SparkSession.builder.appName("Spark DataFrames").getOrCreate()
application_id = spark.sparkContext.applicationId
print(f"Application ID: {application_id}")

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
Application ID: local-1756583398789


In [23]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType


def _nullable_schema(*columns):
    """Return a StructType with one nullable StructField per (name, data_type) pair, in order."""
    return StructType([StructField(name, data_type, True) for name, data_type in columns])


# Explicit schema for customers.csv (read later without schema inference).
customers_schema = _nullable_schema(
    ('customer_id', IntegerType()),
    ('first_name', StringType()),
    ('last_name', StringType()),
    ('country', StringType()),
)

# Explicit schema for orders.csv; customer_id is the column shared with customers_schema.
orders_schema = _nullable_schema(
    ('order_id', IntegerType()),
    ('customer_id', IntegerType()),
    ('order_date', DateType()),
    ('amount', IntegerType()),
)

In [None]:
# Locate the input CSVs under the project's configured data directory.
data_dir = Dirs.data_path
customers_path = str(data_dir / "customers.csv")
orders_path = str(data_dir / "orders.csv")

# Load both files with the explicit schemas (no inference); the first line of each file is a header.
# NOTE: Spark's default CSV mode is PERMISSIVE, so a field that does not parse against its
# declared type is read as null rather than failing the load.
df_customers = spark.read.schema(customers_schema).option("header", True).csv(customers_path)
df_orders = spark.read.schema(orders_schema).option("header", True).csv(orders_path)

In [26]:
# Print both input tables. show() prints and returns None; keeping the calls as separate
# statements stops Jupyter from echoing a `(None, None)` tuple as the cell's result.
df_customers.show()
df_orders.show()

+-----------+----------+---------+-------+
|customer_id|first_name|last_name|country|
+-----------+----------+---------+-------+
|        101|     Alice|    Smith|     DE|
|        102|       Bob|   Müller|     CH|
|        103|     Clara|    Rossi|     IT|
|        104|    Daniel|  Johnson|     US|
|        105|     Elena|    Novak|     CZ|
|        106|     Felix|     Wong|     CN|
|        107|  Gabriela|    Silva|     BR|
|        108|      Hans|    Meier|     AT|
|        109|      Ines|    López|     ES|
|        110|     Jakub| Kowalski|     PL|
|        111|     Karen|   O'Neil|     IE|
|        112|     Lukas|    Bauer|     DE|
|        113|      NULL|    Bauer|     DE|
|        114|      John|     NULL|     DE|
|        115|        23| Kowalski|     PL|
|       NULL|    Daniel|  Johnson|     US|
+-----------+----------+---------+-------+

+--------+-----------+----------+------+
|order_id|customer_id|order_date|amount|
+--------+-----------+----------+------+
|       1|      

(None, None)

In [None]:
from pyspark.sql.functions import col, count

# Keep customers whose order count is strictly greater than this threshold.
ORDER_COUNT_THRESHOLD = 2

# count("customer_id") counts non-null customer_id values, so within each group it equals the
# number of order rows for that customer. The group whose key is null (if present) counts 0,
# so the threshold filter always drops it.
large_orders = (
    df_orders
    .groupBy("customer_id")
    .agg(count("customer_id").alias("order_count"))
    .filter(col("order_count") > ORDER_COUNT_THRESHOLD)
)

In [None]:
# Attach customer names to the per-customer order counts. join() defaults to an inner join,
# so only customer_ids present in both frames survive. Joining with on="customer_id" leaves a
# single customer_id column, so the remaining names can be selected unambiguously as strings.
final_df = (
    large_orders
    .join(df_customers, on="customer_id")
    .select("first_name", "last_name", "order_count")
)
final_df.show()

+----------+---------+-----------+
|first_name|last_name|order_count|
+----------+---------+-----------+
|     Alice|    Smith|          3|
+----------+---------+-----------+

