In [53]:
from pyspark.sql.functions import when, col, lit, when

from spark_connect import sc

# Session handle exposed by the project's spark_connect module; used below via spark.read.
spark = sc.spark
# Name of the bucket the source CSV is read from (through an s3a:// path).
bucket = 'test-stock-data-lake'

In [54]:
# Maps the column names found in the CSV to the names used from here on.
names_map = {
    'stock_code': 'parent_stock_code',
    'comp_name': 'subsidiary_name',
    'comp_type': 'subsidiary_type',
}

# Rows whose type code equals 'aff' are relabelled 'Affiliate';
# all remaining rows are relabelled 'Subsidiary'.
is_affiliate = col('subsidiary_type') == 'aff'

# Read the subsidiaries CSV, apply the renames, then replace the type code
# with its label, all in one chain.
sub_comp = (
    spark.read
    .option("inferSchema", True)  # let Spark infer the column types
    .option('header', True)       # first row holds the column names
    .option('sep', ',')
    .csv(f's3a://{bucket}/subsidiaries/subs_comp.csv')
    .withColumnsRenamed(names_map)
    .withColumn(
        'subsidiary_type',
        when(is_affiliate, lit('Affiliate')).otherwise(lit('Subsidiary')),
    )
)

# Print the rows without truncating long cell values.
sub_comp.show(truncate=False)

+-----------------+-----------------------------------------------------------------------+---------------+---------------+---------------+
|parent_stock_code|subsidiary_name                                                        |subsidiary_type|charter_capital|ownership_ratio|
+-----------------+-----------------------------------------------------------------------+---------------+---------------+---------------+
|ACB              |Công ty TNHH MTV Cho thuê tài chính Ngân hàng Á Châu (ACBL)            |Subsidiary     |300.0          |100.0          |
|ACB              |Công ty TNHH Quản lý nợ và Khai thác tài sản Ngân hàng Á Châu (ACBA)   |Subsidiary     |340.0          |100.0          |
|ACB              |Công ty TNHH Chứng khoán ACB                                           |Subsidiary     |1500.0         |100.0          |
|ACB              |Công ty TNHH MTV Quản lý Quỹ ACB (ACBC)                                |Subsidiary     |50.0           |100.0          |
|BCM              |C

In [55]:
from spark_write_postgres import SparkWritePostgres
from configs.db_config import get_database_config

# Database settings come from the project's config helper (its contents are not visible here).
db_config = get_database_config()

# Project-local writer constructed from the Spark session and the database settings.
swp = SparkWritePostgres(spark, db_config)

# Write the transformed subsidiaries DataFrame to the 'fact_subsidiary' table.
# NOTE(review): the write mode (append vs. overwrite) is decided inside write_data;
# confirm it before re-running this cell, as a re-run may duplicate or replace rows.
swp.write_data(sub_comp, table_name='fact_subsidiary')

-------- Successfully wrote data to Postgres: fact_subsidiary --------
