# create the PySpark session and initialize to environ

In [205]:
from PySpark.environ_setup import set_up_env
from pyspark.sql import SparkSession

# import the function which sets up the env
set_up_env()

spark = (
    SparkSession.builder
    .appName("PySpark DataFrame ETL Pipeline")
    .master("local[*]")
    .getOrCreate()
)

spark

# load the datasets

In [206]:
df_customers = spark.read.csv('./datasets/customers.csv', header=True, inferSchema=True)
df_transactions = spark.read.csv('./datasets/transactions.csv', header=True, inferSchema=True)

In [207]:
df_customers.show(5)

+-----------+----------+---------+-----------+-------+--------+
|customer_id|first_name|last_name|signup_date|country| segment|
+-----------+----------+---------+-----------+-------+--------+
|        101|     Nikos|   Petrou| 2023-10-01| Greece| Premium|
|        102|     Maria|  Ioannou| 2023-11-15| Greece|Standard|
|        103|      John|    Smith| 2022-05-20|     UK|Standard|
|        104|     Emily|    Brown| 2021-09-30|    USA| Premium|
|        105|    Carlos|   Mendez| 2022-12-10|    USA|Standard|
+-----------+----------+---------+-----------+-------+--------+
only showing top 5 rows


In [208]:
df_transactions.show(5)

+--------------+-----------+---------------------+------+-------------+--------------+----------------+
|transaction_id|customer_id|transaction_timestamp|amount|       region|payment_method|product_category|
+--------------+-----------+---------------------+------+-------------+--------------+----------------+
|             1|        101|  2024-01-05 10:15:00| 120.5|       EUROPE|          CARD|     Electronics|
|             2|        102|  2024-01-06 12:30:00|  75.0|       EUROPE|          CASH|           Books|
|             3|        101|  2024-01-15 16:45:00| 230.0|       EUROPE|          CARD|       Furniture|
|             4|        103|  2024-02-02 09:10:00| 49.99|       EUROPE|          CARD|        Clothing|
|             5|        104|  2024-02-11 18:25:00| 310.0|NORTH_AMERICA|          CARD|     Electronics|
+--------------+-----------+---------------------+------+-------------+--------------+----------------+
only showing top 5 rows


# check the schema

In [209]:
df_transactions.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- transaction_timestamp: timestamp (nullable = true)
 |-- amount: double (nullable = true)
 |-- region: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- product_category: string (nullable = true)



In [210]:
df_customers.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- signup_date: date (nullable = true)
 |-- country: string (nullable = true)
 |-- segment: string (nullable = true)



# correct the schema

In [211]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, TimestampType, DoubleType

# for customers
schema = StructType([
    StructField('customer_id', IntegerType(), False),
    StructField('first_name', StringType(), False),
    StructField('last_name', StringType(), False),
    StructField('signup_date', DateType(), True),
    StructField('country', StringType(), True),
    StructField('segment', StringType(), True)
]
)

# create again the dataset with the correct schema
df_customers = spark.read.csv('./datasets/customers.csv', header=True, schema=schema)

df_customers.show(5)

+-----------+----------+---------+-----------+-------+--------+
|customer_id|first_name|last_name|signup_date|country| segment|
+-----------+----------+---------+-----------+-------+--------+
|        101|     Nikos|   Petrou| 2023-10-01| Greece| Premium|
|        102|     Maria|  Ioannou| 2023-11-15| Greece|Standard|
|        103|      John|    Smith| 2022-05-20|     UK|Standard|
|        104|     Emily|    Brown| 2021-09-30|    USA| Premium|
|        105|    Carlos|   Mendez| 2022-12-10|    USA|Standard|
+-----------+----------+---------+-----------+-------+--------+
only showing top 5 rows


In [212]:
df_customers.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- signup_date: date (nullable = true)
 |-- country: string (nullable = true)
 |-- segment: string (nullable = true)



In [213]:
df_transactions.printSchema()

root
 |-- transaction_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- transaction_timestamp: timestamp (nullable = true)
 |-- amount: double (nullable = true)
 |-- region: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- product_category: string (nullable = true)



In [214]:
df_transactions.show(5)

+--------------+-----------+---------------------+------+-------------+--------------+----------------+
|transaction_id|customer_id|transaction_timestamp|amount|       region|payment_method|product_category|
+--------------+-----------+---------------------+------+-------------+--------------+----------------+
|             1|        101|  2024-01-05 10:15:00| 120.5|       EUROPE|          CARD|     Electronics|
|             2|        102|  2024-01-06 12:30:00|  75.0|       EUROPE|          CASH|           Books|
|             3|        101|  2024-01-15 16:45:00| 230.0|       EUROPE|          CARD|       Furniture|
|             4|        103|  2024-02-02 09:10:00| 49.99|       EUROPE|          CARD|        Clothing|
|             5|        104|  2024-02-11 18:25:00| 310.0|NORTH_AMERICA|          CARD|     Electronics|
+--------------+-----------+---------------------+------+-------------+--------------+----------------+
only showing top 5 rows


In [215]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, DoubleType

# for transactions
schema = StructType([
    StructField('transaction_id', IntegerType(), False),
    StructField('customer_id', IntegerType(), True),
    StructField('transaction_timestamp', TimestampType(), True),
    StructField('amount', DoubleType(), True),
    StructField('region', StringType(), True),
    StructField('payment_method', StringType(), True),
    StructField('product_category', StringType(), True),
]
)

# create again the dataset with the correct schema
df_transactions = spark.read.csv('./datasets/transactions.csv', header=True, schema=schema)

df_transactions.show(5)

+--------------+-----------+---------------------+------+-------------+--------------+----------------+
|transaction_id|customer_id|transaction_timestamp|amount|       region|payment_method|product_category|
+--------------+-----------+---------------------+------+-------------+--------------+----------------+
|             1|        101|  2024-01-05 10:15:00| 120.5|       EUROPE|          CARD|     Electronics|
|             2|        102|  2024-01-06 12:30:00|  75.0|       EUROPE|          CASH|           Books|
|             3|        101|  2024-01-15 16:45:00| 230.0|       EUROPE|          CARD|       Furniture|
|             4|        103|  2024-02-02 09:10:00| 49.99|       EUROPE|          CARD|        Clothing|
|             5|        104|  2024-02-11 18:25:00| 310.0|NORTH_AMERICA|          CARD|     Electronics|
+--------------+-----------+---------------------+------+-------------+--------------+----------------+
only showing top 5 rows


# Data Cleaning & Type Casting

* for transactions dataframe

In [216]:
# convert 'transaction_timestamp' to timestamp type

df_transactions.show(5)

+--------------+-----------+---------------------+------+-------------+--------------+----------------+
|transaction_id|customer_id|transaction_timestamp|amount|       region|payment_method|product_category|
+--------------+-----------+---------------------+------+-------------+--------------+----------------+
|             1|        101|  2024-01-05 10:15:00| 120.5|       EUROPE|          CARD|     Electronics|
|             2|        102|  2024-01-06 12:30:00|  75.0|       EUROPE|          CASH|           Books|
|             3|        101|  2024-01-15 16:45:00| 230.0|       EUROPE|          CARD|       Furniture|
|             4|        103|  2024-02-02 09:10:00| 49.99|       EUROPE|          CARD|        Clothing|
|             5|        104|  2024-02-11 18:25:00| 310.0|NORTH_AMERICA|          CARD|     Electronics|
+--------------+-----------+---------------------+------+-------------+--------------+----------------+
only showing top 5 rows


In [217]:
df_transactions.select('transaction_timestamp').printSchema()

root
 |-- transaction_timestamp: timestamp (nullable = true)



In [218]:
from pyspark.sql.functions import col, to_date

# create a new column called date
df_transactions = df_transactions.withColumn(
    'date', to_date(col('transaction_timestamp'))
)

df_transactions.select('date').show()

+----------+
|      date|
+----------+
|2024-01-05|
|2024-01-06|
|2024-01-15|
|2024-02-02|
|2024-02-11|
|2024-02-19|
|2024-03-01|
|2024-03-07|
|2024-03-15|
|2024-03-21|
|2024-03-25|
|2024-03-29|
+----------+



In [219]:
# drop transactions_timestamp column
df_transactions = df_transactions.drop('transaction_timestamp')

# check
print(True) if 'transaction_timestamp' in df_transactions.columns else print(False)

False


In [220]:
df_transactions.show(5)

+--------------+-----------+------+-------------+--------------+----------------+----------+
|transaction_id|customer_id|amount|       region|payment_method|product_category|      date|
+--------------+-----------+------+-------------+--------------+----------------+----------+
|             1|        101| 120.5|       EUROPE|          CARD|     Electronics|2024-01-05|
|             2|        102|  75.0|       EUROPE|          CASH|           Books|2024-01-06|
|             3|        101| 230.0|       EUROPE|          CARD|       Furniture|2024-01-15|
|             4|        103| 49.99|       EUROPE|          CARD|        Clothing|2024-02-02|
|             5|        104| 310.0|NORTH_AMERICA|          CARD|     Electronics|2024-02-11|
+--------------+-----------+------+-------------+--------------+----------------+----------+
only showing top 5 rows


In [221]:
# convert upper values from region and payment_method cols to title format (first only capital)
from pyspark.sql.functions import initcap, col

df_transactions = (
    df_transactions.withColumn('region', initcap(col('region')))
    .withColumn('payment_method', initcap(col('payment_method')))
)

df_transactions.show(5)

+--------------+-----------+------+-------------+--------------+----------------+----------+
|transaction_id|customer_id|amount|       region|payment_method|product_category|      date|
+--------------+-----------+------+-------------+--------------+----------------+----------+
|             1|        101| 120.5|       Europe|          Card|     Electronics|2024-01-05|
|             2|        102|  75.0|       Europe|          Cash|           Books|2024-01-06|
|             3|        101| 230.0|       Europe|          Card|       Furniture|2024-01-15|
|             4|        103| 49.99|       Europe|          Card|        Clothing|2024-02-02|
|             5|        104| 310.0|North_america|          Card|     Electronics|2024-02-11|
+--------------+-----------+------+-------------+--------------+----------------+----------+
only showing top 5 rows


In [222]:
# get all year-months from date column
from pyspark.sql.functions import year, month

df_transactions = (df_transactions.withColumn('year', year(col('date'))).withColumn('month', month(col('date'))))

years = df_transactions.select('year').collect()
months = df_transactions.select('month').collect()

print(years)
print()
print(months)

[Row(year=2024), Row(year=2024), Row(year=2024), Row(year=2024), Row(year=2024), Row(year=2024), Row(year=2024), Row(year=2024), Row(year=2024), Row(year=2024), Row(year=2024), Row(year=2024)]

[Row(month=1), Row(month=1), Row(month=1), Row(month=2), Row(month=2), Row(month=2), Row(month=3), Row(month=3), Row(month=3), Row(month=3), Row(month=3), Row(month=3)]


* for customers dataframe

In [223]:
df_customers.show(5)

+-----------+----------+---------+-----------+-------+--------+
|customer_id|first_name|last_name|signup_date|country| segment|
+-----------+----------+---------+-----------+-------+--------+
|        101|     Nikos|   Petrou| 2023-10-01| Greece| Premium|
|        102|     Maria|  Ioannou| 2023-11-15| Greece|Standard|
|        103|      John|    Smith| 2022-05-20|     UK|Standard|
|        104|     Emily|    Brown| 2021-09-30|    USA| Premium|
|        105|    Carlos|   Mendez| 2022-12-10|    USA|Standard|
+-----------+----------+---------+-----------+-------+--------+
only showing top 5 rows


In [224]:
# check for data type
df_customers.select('signup_date').printSchema()

root
 |-- signup_date: date (nullable = true)



# Data Enrichment

In [225]:
# create a new dataframe combined by the 2 dataframes, (JOIN)
df_customers.show(5)

+-----------+----------+---------+-----------+-------+--------+
|customer_id|first_name|last_name|signup_date|country| segment|
+-----------+----------+---------+-----------+-------+--------+
|        101|     Nikos|   Petrou| 2023-10-01| Greece| Premium|
|        102|     Maria|  Ioannou| 2023-11-15| Greece|Standard|
|        103|      John|    Smith| 2022-05-20|     UK|Standard|
|        104|     Emily|    Brown| 2021-09-30|    USA| Premium|
|        105|    Carlos|   Mendez| 2022-12-10|    USA|Standard|
+-----------+----------+---------+-----------+-------+--------+
only showing top 5 rows


In [226]:
df_transactions.show()

+--------------+-----------+------+-------------+--------------+----------------+----------+----+-----+
|transaction_id|customer_id|amount|       region|payment_method|product_category|      date|year|month|
+--------------+-----------+------+-------------+--------------+----------------+----------+----+-----+
|             1|        101| 120.5|       Europe|          Card|     Electronics|2024-01-05|2024|    1|
|             2|        102|  75.0|       Europe|          Cash|           Books|2024-01-06|2024|    1|
|             3|        101| 230.0|       Europe|          Card|       Furniture|2024-01-15|2024|    1|
|             4|        103| 49.99|       Europe|          Card|        Clothing|2024-02-02|2024|    2|
|             5|        104| 310.0|North_america|          Card|     Electronics|2024-02-11|2024|    2|
|             6|        105|  89.9|North_america|          Cash|          Beauty|2024-02-19|2024|    2|
|             7|        101| 150.0|       Europe|          Card|

In [227]:
df_joined = df_customers.join(df_transactions, on='customer_id', how='left')

df_joined.show()

+-----------+----------+---------+-----------+-------+--------+--------------+------+-------------+--------------+----------------+----------+----+-----+
|customer_id|first_name|last_name|signup_date|country| segment|transaction_id|amount|       region|payment_method|product_category|      date|year|month|
+-----------+----------+---------+-----------+-------+--------+--------------+------+-------------+--------------+----------------+----------+----+-----+
|        101|     Nikos|   Petrou| 2023-10-01| Greece| Premium|             7| 150.0|       Europe|          Card|     Electronics|2024-03-01|2024|    3|
|        101|     Nikos|   Petrou| 2023-10-01| Greece| Premium|             3| 230.0|       Europe|          Card|       Furniture|2024-01-15|2024|    1|
|        101|     Nikos|   Petrou| 2023-10-01| Greece| Premium|             1| 120.5|       Europe|          Card|     Electronics|2024-01-05|2024|    1|
|        102|     Maria|  Ioannou| 2023-11-15| Greece|Standard|            1

# Transformations

In [228]:
# add new column is_high_value. if the amount > 0, then 1, else 0
from pyspark.sql.functions import when, col

df_joined = df_joined.withColumn(
    'is_high_value', when(col('amount') > 200, 1).otherwise(0)
)

df_joined.select('is_high_value').show()

+-------------+
|is_high_value|
+-------------+
|            0|
|            1|
|            0|
|            0|
|            0|
|            0|
|            1|
|            1|
|            0|
|            0|
|            0|
|            0|
+-------------+



In [229]:
# grab specific and most important columns

df_joined.show()

+-----------+----------+---------+-----------+-------+--------+--------------+------+-------------+--------------+----------------+----------+----+-----+-------------+
|customer_id|first_name|last_name|signup_date|country| segment|transaction_id|amount|       region|payment_method|product_category|      date|year|month|is_high_value|
+-----------+----------+---------+-----------+-------+--------+--------------+------+-------------+--------------+----------------+----------+----+-----+-------------+
|        101|     Nikos|   Petrou| 2023-10-01| Greece| Premium|             7| 150.0|       Europe|          Card|     Electronics|2024-03-01|2024|    3|            0|
|        101|     Nikos|   Petrou| 2023-10-01| Greece| Premium|             3| 230.0|       Europe|          Card|       Furniture|2024-01-15|2024|    1|            1|
|        101|     Nikos|   Petrou| 2023-10-01| Greece| Premium|             1| 120.5|       Europe|          Card|     Electronics|2024-01-05|2024|    1|       

In [230]:
df_joined = df_joined.select([
    'customer_id', 'first_name', 'last_name', 'country', 'segment', 'transaction_id', 'date', 'year', 'month', 'amount',
    'region', 'payment_method', 'product_category', 'is_high_value'
])

df_joined.show(5)

+-----------+----------+---------+-------+--------+--------------+----------+----+-----+------+------+--------------+----------------+-------------+
|customer_id|first_name|last_name|country| segment|transaction_id|      date|year|month|amount|region|payment_method|product_category|is_high_value|
+-----------+----------+---------+-------+--------+--------------+----------+----+-----+------+------+--------------+----------------+-------------+
|        101|     Nikos|   Petrou| Greece| Premium|             7|2024-03-01|2024|    3| 150.0|Europe|          Card|     Electronics|            0|
|        101|     Nikos|   Petrou| Greece| Premium|             3|2024-01-15|2024|    1| 230.0|Europe|          Card|       Furniture|            1|
|        101|     Nikos|   Petrou| Greece| Premium|             1|2024-01-05|2024|    1| 120.5|Europe|          Card|     Electronics|            0|
|        102|     Maria|  Ioannou| Greece|Standard|            11|2024-03-25|2024|    3|135.75|Europe|    

# Aggregations (Analytics Step)

In [231]:
from pyspark.sql.functions import sum, avg, count, col

df_aggregate = df_joined.groupBy('region', 'year', 'month').agg(
    sum(col('amount')).alias('total_amount'),
    avg(col('amount')).alias('avg_amount'),
    count(col('*')).alias('transaction_count'),
    sum(when(col("is_high_value") == 1, 1).otherwise(0)).alias('high_value_count')
)

df_aggregate.show()

+-------------+----+-----+------------+------------------+-----------------+----------------+
|       region|year|month|total_amount|        avg_amount|transaction_count|high_value_count|
+-------------+----+-----+------------+------------------+-----------------+----------------+
|       Europe|2024|    1|       425.5|141.83333333333334|                3|               1|
|       Europe|2024|    3|      285.75|           142.875|                2|               0|
|       Europe|2024|    2|       49.99|             49.99|                1|               0|
|         Asia|2024|    3|       245.5|            122.75|                2|               0|
|North_america|2024|    2|       399.9|            199.95|                2|               1|
|North_america|2024|    3|       580.0|             290.0|                2|               1|
+-------------+----+-----+------------+------------------+-----------------+----------------+



In [232]:
# clear the avg_amount to 2 decimals
from pyspark.sql.functions import round, col

df_aggregate = df_aggregate.withColumn(
    'avg_amount', round(col('avg_amount'), 2)
)

df_aggregate.show()

+-------------+----+-----+------------+----------+-----------------+----------------+
|       region|year|month|total_amount|avg_amount|transaction_count|high_value_count|
+-------------+----+-----+------------+----------+-----------------+----------------+
|       Europe|2024|    1|       425.5|    141.83|                3|               1|
|       Europe|2024|    3|      285.75|    142.88|                2|               0|
|       Europe|2024|    2|       49.99|     49.99|                1|               0|
|         Asia|2024|    3|       245.5|    122.75|                2|               0|
|North_america|2024|    2|       399.9|    199.95|                2|               1|
|North_america|2024|    3|       580.0|     290.0|                2|               1|
+-------------+----+-----+------------+----------+-----------------+----------------+



# extract the final dataset

In [233]:
df_aggregate.toPandas().to_csv('./output/data_agg.csv')