In [1]:
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import (
    StructType, StructField,
    IntegerType, StringType, FloatType
)
from pyspark.sql.functions import col, when
from pyspark.storagelevel import StorageLevel
from typing import Dict


class SparkCSVToParquet:
    def __init__(self, app_name: str):
        load_dotenv()
        self.spark = self._create_spark_session(app_name)
        print(f"[INFO] Spark session started: {app_name}")

    def _create_spark_session(self, app_name: str) -> SparkSession:
        aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")
        aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")

        return SparkSession.builder \
            .appName(app_name) \
            .config("spark.hadoop.fs.s3a.access.key", aws_access_key) \
            .config("spark.hadoop.fs.s3a.secret.key", aws_secret_key) \
            .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
            .getOrCreate()

    def read_csv(self, path: str, schema: StructType, delimiter: str = ";") -> DataFrame:
        print(f"[INFO] Reading CSV: {path}")
        return self.spark.read \
            .option("header", True) \
            .option("delimiter", delimiter) \
            .schema(schema) \
            .csv(path)

    def transform_column_to_date(self, df: DataFrame, column_name: str) -> DataFrame:
        print(f"[INFO] Casting column '{column_name}' to DateType")
        return df.withColumn(column_name, col(column_name).cast("date"))

    def cache_and_persist(self, df: DataFrame) -> DataFrame:
        return df.cache().persist(StorageLevel.MEMORY_AND_DISK)

    def save_as_parquet(self, df: DataFrame, path: str, mode: str = "overwrite"):
        print(f"[INFO] Saving DataFrame to Parquet: {path}")
        df.write.mode(mode).parquet(path)

    def stop(self):
        print("[INFO] Stopping Spark session")
        self.spark.stop()


if __name__ == "__main__":
    processor = SparkCSVToParquet("CSV to Parquet Pipeline")

    schemas: Dict[str, StructType] = {
        "card": StructType([
            StructField("card_id", IntegerType(), True),
            StructField("disp_id", IntegerType(), True),
            StructField("type", StringType(), True),
            StructField("issued", StringType(), True),
        ]),
        "client": StructType([
            StructField("client_id", IntegerType(), True),
            StructField("gender", StringType(), True),
            StructField("birth_date", StringType(), True),
            StructField("district_id", IntegerType(), True),
        ]),
        "disp": StructType([
            StructField("disp_id", IntegerType(), True),
            StructField("client_id", IntegerType(), True),
            StructField("account_id", IntegerType(), True),
            StructField("type", StringType(), True),
        ]),
        "account": StructType([
            StructField("account_id", IntegerType(), True),
            StructField("district_id", IntegerType(), True),
            StructField("frequency", StringType(), True),
            StructField("date", StringType(), True),
        ]),
        "district": StructType([
            StructField("district_id", IntegerType(), True),
            StructField("district_name", StringType(), True),
            StructField("region", StringType(), True),
            StructField("population", IntegerType(), True),
            StructField("A2", IntegerType(), True),
            StructField("A3", IntegerType(), True),
            StructField("A4", IntegerType(), True),
            StructField("A5", IntegerType(), True),
            StructField("A6", IntegerType(), True),
            StructField("A7", IntegerType(), True),
            StructField("A8", IntegerType(), True),
            StructField("A9", IntegerType(), True),
            StructField("A10", FloatType(), True),
            StructField("A11", FloatType(), True),
            StructField("A12", IntegerType(), True),
            StructField("A13", IntegerType(), True),
            StructField("A14", IntegerType(), True),
            StructField("A15", IntegerType(), True),
            StructField("A16", IntegerType(), True),
        ]),
        "loan": StructType([
            StructField("loan_id", IntegerType(), True),
            StructField("account_id", IntegerType(), True),
            StructField("date", StringType(), True),
            StructField("amount", IntegerType(), True),
            StructField("duration", IntegerType(), True),
            StructField("payments", FloatType(), True),
            StructField("status", StringType(), True)
        ]),
        "order": StructType([
            StructField("order_id", IntegerType(), True),
            StructField("account_id", IntegerType(), True),
            StructField("bank_to", StringType(), True),
            StructField("account_to", IntegerType(), True),
            StructField("amount", FloatType(), True),
            StructField("k_symbol", StringType(), True)
        ]),
        # Add schema for trans
        "trans": StructType([
            StructField("trans_id", IntegerType(), True),
            StructField("account_id", IntegerType(), True),
            StructField("date", StringType(), True),
            StructField("type", StringType(), True),
            StructField("operation", StringType(), True),
            StructField("amount", FloatType(), True),
            StructField("balance", FloatType(), True),
            StructField("k_symbol", StringType(), True),
            StructField("branch", StringType(), True),
            StructField("bank", StringType(), True),
            StructField("account", IntegerType(), True)
        ])
    }

    sources = {
        "card": {
            "path": "s3a://nmourmx-scigility/Bronze/card/card.csv",
            "date_col": "issued",
            "output": "s3a://nmourmx-scigility/Silver/card_parquet/",
        },
        "client": {
            "path": "s3a://nmourmx-scigility/Bronze/client/client.csv",
            "date_col": "birth_date",
            "output": "s3a://nmourmx-scigility/Silver/client_parquet/",
        },
        "disp": {
            "path": "s3a://nmourmx-scigility/Bronze/disp/disp.csv",
            "date_col": None,
            "output": "s3a://nmourmx-scigility/Silver/disp_parquet/",
        },
        "account": {
            "path": "s3a://nmourmx-scigility/Bronze/account/account.csv",
            "date_col": "date",
            "output": "s3a://nmourmx-scigility/Silver/account_parquet/",
        },
        "district": {
            "path": "s3a://nmourmx-scigility/Bronze/district/district.csv",
            "date_col": None,
            "output": "s3a://nmourmx-scigility/Silver/district_parquet/",
        },
        "loan": {
            "path": "s3a://nmourmx-scigility/Bronze/loan/loan.csv",
            "date_col": "date",
            "output": "s3a://nmourmx-scigility/Silver/loan_parquet/",
        },
        "order": {
            "path": "s3a://nmourmx-scigility/Bronze/order/order.csv",
            "date_col": None,
            "output": "s3a://nmourmx-scigility/Silver/order_parquet/",
        },
        # Add trans source
        "trans": {
            "path": "s3a://nmourmx-scigility/Bronze/trans/trans.csv",
            "date_col": "date",
            "output": "s3a://nmourmx-scigility/Silver/trans_fixed_parquet/",
        }
    }

    for name, config in sources.items():
        print(f"\n========== Processing {name.upper()} ==========")
        df = processor.read_csv(config["path"], schemas[name])
        if config["date_col"]:
            df = processor.transform_column_to_date(df, config["date_col"])

        # Special fix for trans type typo
        if name == "trans":
            from pyspark.sql.functions import when
            print("[INFO] Fixing typo in 'type' column for trans dataset")
            df = df.withColumn("type", when(col("type") == "PRJIEM", "PRIJEM").otherwise(col("type")))
            # Select distinct values from the 'type' column
            distinct_types = df.select("type").distinct()      
            # Show the distinct values
            distinct_types.show()

        df = processor.cache_and_persist(df)
        df.show(10)
        processor.save_as_parquet(df, config["output"])

    processor.stop()


25/07/29 11:36:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


[INFO] Spark session started: CSV to Parquet Pipeline

[INFO] Reading CSV: s3a://nmourmx-scigility/Bronze/card/card.csv


25/07/29 11:36:58 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


[INFO] Casting column 'issued' to DateType


25/07/29 11:37:02 WARN CacheManager: Asked to cache already cached data.
                                                                                

+-------+-------+-------+----------+
|card_id|disp_id|   type|    issued|
+-------+-------+-------+----------+
|      1|      9|   gold|1998-10-16|
|      2|     19|classic|1998-03-13|
|      3|     41|   gold|1995-09-03|
|      4|     42|classic|1998-11-26|
|      5|     51| junior|1995-04-24|
|      7|     56|classic|1998-06-11|
|      8|     60| junior|1998-05-20|
|      9|     76|classic|1997-10-25|
|     10|     77|classic|1996-12-07|
|     11|     79|   gold|1997-10-25|
+-------+-------+-------+----------+
only showing top 10 rows

[INFO] Saving DataFrame to Parquet: s3a://nmourmx-scigility/Silver/card_parquet/


                                                                                


[INFO] Reading CSV: s3a://nmourmx-scigility/Bronze/client/client.csv
[INFO] Casting column 'birth_date' to DateType


25/07/29 11:37:12 WARN CacheManager: Asked to cache already cached data.


+---------+------+----------+-----------+
|client_id|gender|birth_date|district_id|
+---------+------+----------+-----------+
|        1|     F|1970-12-13|         18|
|        2|     M|1945-02-04|          1|
|        3|     F|1940-10-09|          1|
|        4|     M|1956-12-01|          5|
|        5|     F|1960-07-03|          5|
|        6|     M|1919-09-22|         12|
|        7|     M|1929-01-25|         15|
|        8|     F|1938-02-21|         51|
|        9|     M|1935-10-16|         60|
|       10|     M|1943-05-01|         57|
+---------+------+----------+-----------+
only showing top 10 rows

[INFO] Saving DataFrame to Parquet: s3a://nmourmx-scigility/Silver/client_parquet/


                                                                                


[INFO] Reading CSV: s3a://nmourmx-scigility/Bronze/disp/disp.csv


25/07/29 11:37:15 WARN CacheManager: Asked to cache already cached data.


+-------+---------+----------+---------+
|disp_id|client_id|account_id|     type|
+-------+---------+----------+---------+
|      1|        1|         1|    OWNER|
|      2|        2|         2|    OWNER|
|      3|        3|         2|DISPONENT|
|      4|        4|         3|    OWNER|
|      5|        5|         3|DISPONENT|
|      6|        6|         4|    OWNER|
|      7|        7|         5|    OWNER|
|      8|        8|         6|    OWNER|
|      9|        9|         7|    OWNER|
|     10|       10|         8|    OWNER|
+-------+---------+----------+---------+
only showing top 10 rows

[INFO] Saving DataFrame to Parquet: s3a://nmourmx-scigility/Silver/disp_parquet/


                                                                                


[INFO] Reading CSV: s3a://nmourmx-scigility/Bronze/account/account.csv
[INFO] Casting column 'date' to DateType


25/07/29 11:37:17 WARN CacheManager: Asked to cache already cached data.


+----------+-----------+----------------+----------+
|account_id|district_id|       frequency|      date|
+----------+-----------+----------------+----------+
|         1|         18|POPLATEK MESICNE|1995-03-24|
|         2|          1|POPLATEK MESICNE|1993-02-26|
|         3|          5|POPLATEK MESICNE|1997-07-07|
|         4|         12|POPLATEK MESICNE|1996-02-21|
|         5|         15|POPLATEK MESICNE|1997-05-30|
|         6|         51|POPLATEK MESICNE|1994-09-27|
|         7|         60|POPLATEK MESICNE|1996-11-24|
|         8|         57|POPLATEK MESICNE|1995-09-21|
|         9|         70|POPLATEK MESICNE|1993-01-27|
|        10|         54|POPLATEK MESICNE|1996-08-28|
+----------+-----------+----------------+----------+
only showing top 10 rows

[INFO] Saving DataFrame to Parquet: s3a://nmourmx-scigility/Silver/account_parquet/


                                                                                


[INFO] Reading CSV: s3a://nmourmx-scigility/Bronze/district/district.csv


25/07/29 11:37:20 WARN CacheManager: Asked to cache already cached data.


+-----------+--------------+---------------+----------+---+---+---+---+---+----+-----+----+----+-----+-----+-----+----+----+----+
|district_id| district_name|         region|population| A2| A3| A4| A5| A6|  A7|   A8|  A9| A10|  A11|  A12|  A13| A14| A15| A16|
+-----------+--------------+---------------+----------+---+---+---+---+---+----+-----+----+----+-----+-----+-----+----+----+----+
|          1|   Hl.m. Praha|         Prague|   1204953|  0|  0|  0|  1|  1|NULL|12541|NULL|0.43|167.0|85677|99107|NULL|NULL|NULL|
|          2|       Benesov|central Bohemia|     88884| 80| 26|  6|  2|  5|NULL| 8507|NULL|1.85|132.0| 2159| 2674|NULL|NULL|NULL|
|          3|        Beroun|central Bohemia|     75232| 55| 26|  4|  1|  5|NULL| 8980|NULL|2.21|111.0| 2824| 2813|NULL|NULL|NULL|
|          4|        Kladno|central Bohemia|    149893| 63| 29|  6|  2|  6|NULL| 9753|NULL|5.05|109.0| 5244| 5892|NULL|NULL|NULL|
|          5|         Kolin|central Bohemia|     95616| 65| 30|  4|  1|  6|NULL| 9307|NULL

                                                                                


[INFO] Reading CSV: s3a://nmourmx-scigility/Bronze/loan/loan.csv
[INFO] Casting column 'date' to DateType


25/07/29 11:37:23 WARN CacheManager: Asked to cache already cached data.


+-------+----------+----------+------+--------+--------+------+
|loan_id|account_id|      date|amount|duration|payments|status|
+-------+----------+----------+------+--------+--------+------+
|   4959|         2|1994-01-05| 80952|      24|  3373.0|     A|
|   4961|        19|1996-04-29| 30276|      12|  2523.0|     B|
|   4962|        25|1997-12-08| 30276|      12|  2523.0|     A|
|   4967|        37|1998-10-14|318480|      60|  5308.0|     D|
|   4968|        38|1998-04-19|110736|      48|  2307.0|     C|
|   4973|        67|1996-05-02|165960|      24|  6915.0|     A|
|   4986|        97|1997-08-10|102876|      12|  8573.0|     A|
|   4988|       103|1997-12-06|265320|      36|  7370.0|     D|
|   4989|       105|1998-12-05|352704|      48|  7348.0|     C|
|   4990|       110|1997-09-08|162576|      36|  4516.0|     C|
+-------+----------+----------+------+--------+--------+------+
only showing top 10 rows

[INFO] Saving DataFrame to Parquet: s3a://nmourmx-scigility/Silver/loan_parque

                                                                                


[INFO] Reading CSV: s3a://nmourmx-scigility/Bronze/order/order.csv


25/07/29 11:37:25 WARN CacheManager: Asked to cache already cached data.


+--------+----------+-------+----------+------+--------+
|order_id|account_id|bank_to|account_to|amount|k_symbol|
+--------+----------+-------+----------+------+--------+
|   29401|         1|     YZ|  87144583|2452.0|    SIPO|
|   29402|         2|     ST|  89597016|3372.7|    UVER|
|   29403|         2|     QR|  13943797|7266.0|    SIPO|
|   29404|         3|     WX|  83084338|1135.0|    SIPO|
|   29405|         3|     CD|  24485939| 327.0|    NULL|
|   29406|         3|     AB|  59972357|3539.0|POJISTNE|
|   29407|         4|     UV|  26693541|2078.0|    SIPO|
|   29408|         4|     UV|   5848086|1285.0|    SIPO|
|   29409|         5|     GH|  37390208|2668.0|    SIPO|
|   29410|         6|     AB|  44486999|3954.0|    SIPO|
+--------+----------+-------+----------+------+--------+
only showing top 10 rows

[INFO] Saving DataFrame to Parquet: s3a://nmourmx-scigility/Silver/order_parquet/


                                                                                


[INFO] Reading CSV: s3a://nmourmx-scigility/Bronze/trans/trans.csv
[INFO] Casting column 'date' to DateType
[INFO] Fixing typo in 'type' column for trans dataset


25/07/29 11:37:31 WARN CacheManager: Asked to cache already cached data.        


+------+
|  type|
+------+
| VYDAJ|
|PRIJEM|
| VYBER|
+------+



                                                                                

+--------+----------+----------+------+--------------+-------+-------+--------+------+----+--------+
|trans_id|account_id|      date|  type|     operation| amount|balance|k_symbol|branch|bank| account|
+--------+----------+----------+------+--------------+-------+-------+--------+------+----+--------+
|  967842|      1532|1998-05-31|PRIJEM|          NULL|  253.0|62661.0|    UROK|    AR|NULL|       0|
|  271012|      1499|1998-01-09|PRIJEM|         VKLAD| 5500.0|49790.0|    NULL|    AR|NULL|       0|
|  971490|      1645|1996-11-30|PRIJEM|          NULL|   43.0|19460.0|    UROK|    AR|NULL|       0|
|  605699|      3366|1996-10-28| VYDAJ|         VYBER| 9300.0|38433.0|    NULL|    AR|NULL|       0|
|  443107|      2471|1998-07-30| VYDAJ|         VYBER| 7800.0|56237.0|    NULL|    AR|NULL|       0|
|  627433|      3503|1995-06-07| VYDAJ|         VYBER|28640.0|41949.0|    NULL|    AR|NULL|       0|
|  390740|      2162|1997-12-03|PRIJEM|         VKLAD|11111.0|38640.0|    NULL|    AR|NULL|

                                                                                

[INFO] Stopping Spark session
