In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.ml import Pipeline
import pyspark
from pyspark.sql.functions import *

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Local Spark on all cores with a 12 GB driver heap.
# NOTE: spark.driver.memory is only honoured when the driver JVM is launched,
# i.e. for the first session created in this Python process.
conf = SparkConf().setAppName('appName').setMaster('local[*]').set('spark.driver.memory', '12g')

# getOrCreate() reuses an already-running session instead of raising
# "Cannot run multiple SparkContexts at once" (what SparkContext(conf=conf) does
# on a second run), so this cell can be re-executed safely.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Kept for later cells that reference the SparkContext directly.
sc = spark.sparkContext

In [3]:
# Bare last-line expression: the notebook renders the active SparkSession as this cell's output.
spark

In [6]:
# Location of the raw parquet tables, relative to the notebook's working directory.
RAW_DATA_DIR = "../data/raw"


def read_raw(table_name: str) -> DataFrame:
    """Load ``<RAW_DATA_DIR>/<table_name>.parquet`` as a Spark DataFrame.

    Args:
        table_name: base file name of the table, without the ``.parquet`` suffix.

    Returns:
        A lazily evaluated DataFrame produced by ``spark.read.parquet``.
    """
    return spark.read.parquet(f"{RAW_DATA_DIR}/{table_name}.parquet")


customer = read_raw("customer")
catalog_sales = read_raw("catalog_sales")
web_sales = read_raw("web_sales")
store_sales = read_raw("store_sales")
catalog_returns = read_raw("catalog_returns")
web_returns = read_raw("web_returns")
store_returns = read_raw("store_returns")
household_demographics = read_raw("household_demographics")
customer_demographics = read_raw("customer_demographics")
customer_address = read_raw("customer_address")
date_dim = read_raw("date_dim")

# Build the CTE equivalent with the DataFrame API: start from customer (alias "c")
# and apply each join below in order. Each spec is
# (right-hand DataFrame, alias, join condition, join type).
#
# NOTE(review): catalog/web/store sales and returns are all joined on the same
# customer key. If those tables are one-to-many per customer (presumably), each
# customer row is repeated once per combination of matching rows across them.
# Confirm this matches the intended CTE before aggregating over result_df.
# NOTE(review): the catalog/web joins only match rows where the same customer
# appears on both sides (ship == bill, returning == refunded) -- confirm intended.
customer_key = col("c.c_customer_sk")

join_specs = [
    (catalog_sales, "cs",
     (customer_key == col("cs.cs_ship_customer_sk"))
     & (customer_key == col("cs.cs_bill_customer_sk")),
     "left"),
    (web_sales, "ws",
     (customer_key == col("ws.ws_ship_customer_sk"))
     & (customer_key == col("ws.ws_bill_customer_sk")),
     "left"),
    (store_sales, "ss",
     customer_key == col("ss.ss_customer_sk"),
     "inner"),
    (catalog_returns, "cr",
     (customer_key == col("cr.cr_returning_customer_sk"))
     & (customer_key == col("cr.cr_refunded_customer_sk")),
     "left"),
    (web_returns, "wr",
     (customer_key == col("wr.wr_returning_customer_sk"))
     & (customer_key == col("wr.wr_refunded_customer_sk")),
     "left"),
    (store_returns, "sr",
     customer_key == col("sr.sr_customer_sk"),
     "left"),
    # Dimension lookups keyed on the customer's current demographics/address
    # and first-sale date.
    (household_demographics, "hd",
     col("c.c_current_hdemo_sk") == col("hd.hd_demo_sk"),
     "inner"),
    (customer_demographics, "cd",
     col("c.c_current_cdemo_sk") == col("cd.cd_demo_sk"),
     "inner"),
    (customer_address, "ca",
     col("c.c_current_addr_sk") == col("ca.ca_address_sk"),
     "inner"),
    (date_dim, "dd",
     col("c.c_first_sales_date_sk") == col("dd.d_date_sk"),
     "inner"),
]

result_df = customer.alias("c")
for right_df, right_alias, join_cond, join_how in join_specs:
    result_df = result_df.join(right_df.alias(right_alias), join_cond, join_how)

# Print the first 20 rows of the joined result.
result_df.show(n=20)

+-------------+----------------+------------------+------------------+-----------------+----------------------+---------------------+------------+------------+-----------+---------------------+-----------+-------------+------------+---------------+-------+--------------------+------------------+---------------+---------------+---------------+-------------------+----------------+----------------+---------------+-------------------+----------------+----------------+---------------+-----------------+------------------+---------------+---------------+----------+-----------+---------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------------+--------------------+---------------+---------------+---------------+----------+-----------------

In [7]:
# Total row count of the joined result. result_df is not cached in any cell shown
# here, so this re-executes the full join plan built in the previous cell.
result_df.count()

239280690

In [8]:
# Drop any cached data before shutting Spark down. clearCache() removes every
# cached table/DataFrame in the session, so per-DataFrame unpersist() calls are
# redundant (and no cell shown here calls .cache()/.persist() in the first place).
spark.catalog.clearCache()

In [9]:
# Shut down Spark. SparkSession.stop() also stops its underlying SparkContext,
# so a separate sc.stop() is unnecessary.
spark.stop()