In [30]:
# import library
import pyspark

In [31]:
# check pyspark version
pyspark.__version__

'3.5.0'

In [32]:
# import SparkSession
from pyspark.sql import SparkSession

In [33]:
spark = SparkSession \
    .builder \
    .appName("Final Project PySpark") \
    .getOrCreate()

In [34]:
spark

## Load and Handle Failure Data

In [35]:
from dotenv import load_dotenv
import os
import pandas as pd

In [36]:
load_dotenv(".env", override=True)

True

In [37]:
import csv
from datetime import datetime
import json

In [38]:
# INIT LOGS TO SAVE ALL LOGGING

def log_to_csv(log_msg: dict, filename: str):
    # Ensure the 'logs' directory exists
    log_dir = os.path.join(os.getcwd(), 'logs')
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    # Create the full file path inside 'logs'
    file_path = os.path.join(log_dir, filename)

    # Define the column headers
    headers = ["step", "status", "source", "table_name", "etl_date"]

    try:
        # Check if the file exists
        file_exists = os.path.isfile(file_path)

        with open(file_path, mode='a', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=headers)

            # Write the header only if the file doesn't exist
            if not file_exists:
                writer.writeheader()

            # Append the log message
            writer.writerow(log_msg)

        print(f"Log written to {file_path}")
    
    except Exception as e:
        print(f"Error writing log to {file_path}: {e}")

## Extract

### CSV Files

In [14]:
def extract_csv(file_path, table_name):
    try:
        # Read CSV using Spark
        df = spark.read.option("header", "true").csv(file_path)

        # Show extracted data
        df.show()

        # Log success
        log_to_csv({
            "step": "Extract",
            "status": "Success",
            "source": "CSV",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        return df
    
    except Exception as e:
        # Log failure
        log_to_csv({
            "step": "Extract",
            "status": f"Failed: {e}",
            "source": "CSV",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        print(f"Error extracting {file_path}: {e}")
        return None

In [15]:
# Extract from CSV

df_people = extract_csv("data/people.csv", "people_data")
df_people.show()

+---------+---------+----------+----------+--------------------+--------------------+
|people_id|object_id|first_name| last_name|          birthplace|    affiliation_name|
+---------+---------+----------+----------+--------------------+--------------------+
|        1|      p:2|       Ben|   Elowitz|                NULL|           Blue Nile|
|        2|      p:3|     Kevin|  Flaherty|                NULL|            Wetpaint|
|        3|      p:4|      Raju|   Vegesna|                NULL|                Zoho|
|        4|      p:5|       Ian|     Wenig|                NULL|                Zoho|
|        5|      p:6|     Kevin|      Rose|         Redding, CA|        i/o Ventures|
|        6|      p:7|       Jay|   Adelson|         Detroit, MI|                Digg|
|        7|      p:8|      Owen|     Byrne|                NULL|                Digg|
|        8|      p:9|       Ron|Gorodetzky|                NULL|                Digg|
|        9|     p:10|      Mark|Zuckerberg|           

In [16]:
# Extract from CSV

df_relations = extract_csv("data/relationships.csv", "relationships_data")
df_relations.show()

+---------------+----------------+----------------------+--------------------+--------------------+-------+--------+--------------------+--------------------+--------------------+
|relationship_id|person_object_id|relationship_object_id|            start_at|              end_at|is_past|sequence|               title|          created_at|          updated_at|
+---------------+----------------+----------------------+--------------------+--------------------+-------+--------+--------------------+--------------------+--------------------+
|              1|             p:2|                   c:1|                NULL|                NULL|  false|       8|Co-Founder/CEO/Bo...|2007-05-25 07:03:...|2013-06-03 09:58:...|
|              2|             p:3|                   c:1|                NULL|                NULL|  false|  279242|        VP Marketing|2007-05-25 07:04:...|2010-05-21 16:31:...|
|              3|             p:4|                   c:3|                NULL|                NULL| 

### Database

In [10]:
# Set Variable for Database (data source)

DB_URL = os.getenv("DB_URL")
DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASS")

In [11]:
def extract_from_db():
    try:
        # Get list of tables from the database
        table_list = spark.read \
            .format("jdbc") \
            .option("url", DB_URL) \
            .option("dbtable", "information_schema.tables") \
            .option("user", DB_USER) \
            .option("password", DB_PASS) \
            .option("driver", "org.postgresql.Driver") \
            .load() \
            .filter("table_schema = 'public'") \
            .select("table_name") \
            .rdd.flatMap(lambda x: x).collect()

        print(f"Found tables: {table_list}")

        tables = {}
        for table in table_list:
            try:
                # Read each table into a DataFrame
                df = spark.read \
                    .format("jdbc") \
                    .option("url", DB_URL) \
                    .option("dbtable", table) \
                    .option("user", DB_USER) \
                    .option("password", DB_PASS) \
                    .option("driver", "org.postgresql.Driver") \
                    .load()

                tables[table] = df

                # Log success for each table
                log_to_csv({
                    "step": "Extract",
                    "status": "Success",
                    "source": "PostgreSQL",
                    "table_name": table,
                    "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                }, "etl_log.csv")

                print(f"Successfully extracted table: {table}")

            except Exception as e:
                # Log failure for specific table
                log_to_csv({
                    "step": "Extract",
                    "status": f"Failed: {e}",
                    "source": "PostgreSQL",
                    "table_name": table,
                    "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                }, "etl_log.csv")
                print(f"Failed to extract table: {table} - Error: {e}")

        return tables
    
    except Exception as e:
        # Log failure for the whole extraction process
        log_to_csv({
            "step": "Extract",
            "status": f"Failed: {e}",
            "source": "PostgreSQL",
            "table_name": "N/A",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")
        print(f"Failed to extract tables: {e}")
        return {}

In [19]:
# Extract from db

tables = extract_from_db()
print(f"Extracted tables: {list(tables.keys())}")

Found tables: ['company', 'acquisition', 'funding_rounds', 'funds', 'investments', 'ipos']
Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted table: company
Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted table: acquisition
Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted table: funding_rounds
Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted table: funds
Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted table: investments
Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted table: ipos
Extracted tables: ['company', 'acquisition', 'funding_rounds', 'funds', 'investments', 'ipos']


In [20]:
# Read all table

df_acquisition = tables["acquisition"]
df_company = tables["company"]
df_funding_rounds = tables["funding_rounds"]
df_funds = tables["funds"]
df_investments = tables["investments"]
df_ipos = tables["ipos"]

# check
df_company.show()

+---------+---------+-----------------+--------------------+--------------------+----------+--------------+----------+----------+------------+---------+-----------+-------------------+-------------------+
|office_id|object_id|      description|              region|            address1|  address2|          city|  zip_code|state_code|country_code| latitude|  longitude|         created_at|         updated_at|
+---------+---------+-----------------+--------------------+--------------------+----------+--------------+----------+----------+------------+---------+-----------+-------------------+-------------------+
|        8|      c:8|                 |              SF Bay|959 Skyway Road, ...|          |    San Carlos|     94070|        CA|         USA|37.506885|-122.247573|2007-01-01 22:19:54|2007-01-01 22:19:54|
|        9|      c:9|     Headquarters|         Los Angeles|9229 W. Sunset Blvd.|          |West Hollywood|     90069|        CA|         USA|34.090368|-118.393064|2007-01-01 22:19

### From API

In [21]:
import requests

In [22]:
def extract_api(link_api: str, list_parameter: dict, data_name: str):
    try:
        # Establish connection to API
        resp = requests.get(link_api, params=list_parameter)
        resp.raise_for_status()  # Raise an exception for HTTP errors

        # Parse the response JSON
        raw_response = resp.json()

        # Convert JSON data to pandas DataFrame
        df_api = pd.DataFrame(raw_response)

        if df_api.empty:
            raise ValueError("Empty response from API")

        # Convert pandas DataFrame to PySpark DataFrame
        spark_df = spark.createDataFrame(df_api)

        # Log success
        log_msg = {
            "step": "Extract",
            "status": "Success",
            "source": "API",
            "table_name": data_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        log_to_csv(log_msg, "etl_log.csv")

        print(f"Successfully extracted data from API: {data_name}")
        return spark_df

    except requests.exceptions.RequestException as e:
        # Log request failure
        log_msg = {
            "step": "Extract",
            "status": f"Failed: {e}",
            "source": "API",
            "table_name": data_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        log_to_csv(log_msg, "etl_log.csv")
        print(f"Request failed: {e}")

    except ValueError as e:
        # Log parsing failure
        log_msg = {
            "step": "Extract",
            "status": f"Failed: {e}",
            "source": "API",
            "table_name": data_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        log_to_csv(log_msg, "etl_log.csv")
        print(f"Parsing error: {e}")

    except Exception as e:
        # Catch any other errors
        log_msg = {
            "step": "Extract",
            "status": f"Failed: {e}",
            "source": "API",
            "table_name": data_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        log_to_csv(log_msg, "etl_log.csv")
        print(f"An error occurred: {e}")

    return None

In [23]:
# Extract from API (year 2008 - 2010)

link_api = "https://api-milestones.vercel.app/api/data"
list_parameter = {
    "start_date": "2008-01-01",
    "end_date": "2010-12-31"
}

df_milestones = extract_api(link_api, list_parameter, "milestones")
df_milestones.show()

Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted data from API: milestones
+--------------------+--------------------+------------+--------------+------------+---------+--------------------+--------------------+--------------------+
|          created_at|         description|milestone_at|milestone_code|milestone_id|object_id|  source_description|          source_url|          updated_at|
+--------------------+--------------------+------------+--------------+------------+---------+--------------------+--------------------+--------------------+
|2008-06-18 08:14:...|Survives iPhone 3...|  2008-06-09|         other|           1|     c:12|Twitter Fails To ...|http://www.techcr...|2008-06-18 08:14:...|
|2008-06-18 08:50:...|More than 4 Billi...|  2008-06-18|         other|           3|     c:59|11 Billion Videos...|http://www.comsco...|2008-06-18 08:50:...|
|2008-06-19 04:14:...|Reddit goes Open ...|  2008-06-18|         other|           4|    c:314|reddit goes open 

## Load - Staging 

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

In [11]:
!pip install pangres



In [39]:
# Set Variable for Staging

DB_STAGING_URL = os.getenv("DB_STAGING_URL")
DB_STAGING_USER = os.getenv("DB_STAGING_USER")
DB_STAGING_PASS = os.getenv("DB_STAGING_PASS")

In [33]:
from pyspark.sql.utils import AnalysisException
from pangres import upsert
from sqlalchemy import create_engine
import pandas as pd
from datetime import datetime

def load_staging2(df, table_name, mode="overwrite", use_upsert=False, idx_name=None, schema=None, source=None):
    try:
        if use_upsert:
            # Convert Spark DataFrame to Pandas DataFrame
            data = df.toPandas()

            # Create connection to PostgreSQL
            conn = create_engine(f"postgresql://{DB_STAGING_USER}:{DB_STAGING_PASS}@host.docker.internal:5432/pyspark_task_staging")

            # Set index for upsert
            if idx_name is None:
                raise ValueError("Index name is required for upsert mode")

            data = data.set_index(idx_name)

            # Upsert
            upsert(
                con=conn,
                df=data,
                table_name=table_name,
                schema=schema,
                if_row_exists="update"
            )
            print(f"Data upserted to table '{table_name}' successfully!")
        else:
            # Load using Spark
            df.write \
                .format("jdbc") \
                .option("url", "jdbc:postgresql://host.docker.internal:5432/pyspark_task_staging") \
                .option("dbtable", table_name) \
                .option("user", DB_STAGING_USER) \
                .option("password", DB_STAGING_PASS) \
                .option("driver", "org.postgresql.Driver") \
                .mode(mode) \
                .save()

            print(f"Data loaded to table '{table_name}' successfully!")

        # Success log
        log_msg = {
            "step": "Load Staging",
            "status": "Success",
            "source": "Staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

    except Exception as e:
        print(f"Error loading data to table '{table_name}': {e}")

        # Failed DataFrame
        failed_data = df.toPandas() if not use_upsert else data
        failed_data['error_message'] = str(e)
        failed_data['etl_date'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Fail log
        log_msg = {
            "step": "Load Staging",
            "status": "Failed",
            "source": "Staging",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "error_message": str(e)
        }

        # Save failed data to CSV
        failed_log_path = f'logs/failed_{table_name}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
        failed_data.to_csv(failed_log_path, index=False)
        print(f"Failed data saved to: {failed_log_path}")

    finally:
        # Delete error_message before save it to log
        if 'error_message' in log_msg:
            del log_msg['error_message']

        # Simpan log ke CSV
        log_to_csv(log_msg, 'etl_log.csv')

    return df if not use_upsert else data


In [34]:
# from API

load_staging2(df_milestones, "milestones")

Data loaded to table 'milestones' successfully!
Log written to /home/jovyan/work/logs/etl_log.csv


DataFrame[created_at: string, description: string, milestone_at: string, milestone_code: string, milestone_id: bigint, object_id: string, source_description: string, source_url: string, updated_at: string]

In [35]:
# from CSV

load_staging2(df_relations, "relationship")
load_staging2(df_people, "people", mode="overwrite")

Data loaded to table 'relationship' successfully!
Log written to /home/jovyan/work/logs/etl_log.csv
Data loaded to table 'people' successfully!
Log written to /home/jovyan/work/logs/etl_log.csv


DataFrame[people_id: string, object_id: string, first_name: string, last_name: string, birthplace: string, affiliation_name: string]

In [36]:
# from database

load_staging2(df_acquisition, "acquisition") 
load_staging2(df_funding_rounds, "funding_rounds") 
load_staging2(df_funds, "funds")
load_staging2(df_investments, "investments")
load_staging2(df_ipos, "ipo")
load_staging2(df_company, "company") 

Data loaded to table 'acquisition' successfully!
Log written to /home/jovyan/work/logs/etl_log.csv
Data loaded to table 'funding_rounds' successfully!
Log written to /home/jovyan/work/logs/etl_log.csv
Data loaded to table 'funds' successfully!
Log written to /home/jovyan/work/logs/etl_log.csv
Data loaded to table 'investments' successfully!
Log written to /home/jovyan/work/logs/etl_log.csv
Data loaded to table 'ipo' successfully!
Log written to /home/jovyan/work/logs/etl_log.csv
Data loaded to table 'company' successfully!
Log written to /home/jovyan/work/logs/etl_log.csv


DataFrame[office_id: int, object_id: string, description: string, region: string, address1: string, address2: string, city: string, zip_code: string, state_code: string, country_code: string, latitude: decimal(9,6), longitude: decimal(9,6), created_at: timestamp, updated_at: timestamp]

## Extract Data from Staging

In [13]:
spark.catalog.clearCache()

In [40]:
# Set Variable for Staging

DB_STAGING_URL = os.getenv("DB_STAGING_URL")
DB_STAGING_USER = os.getenv("DB_STAGING_USER")
DB_STAGING_PASS = os.getenv("DB_STAGING_PASS")

In [65]:
def extract_from_staging():
    try:
        # Get list of tables from staging
        table_list = spark.read \
            .format("jdbc") \
            .option("url", DB_STAGING_URL) \
            .option("dbtable", "(SELECT table_name FROM information_schema.tables WHERE table_schema = 'public') AS tbl") \
            .option("user", DB_STAGING_USER) \
            .option("password", DB_STAGING_PASS) \
            .option("driver", "org.postgresql.Driver") \
            .load() \
            .select("table_name") \
            .rdd.flatMap(lambda x: x).collect()

        print(f"Found tables in staging: {table_list}")

        tables = {}
        for table in table_list:
            try:
                # Read each table into a DataFrame
                df = spark.read \
                    .format("jdbc") \
                    .option("url", DB_STAGING_URL) \
                    .option("dbtable", table) \
                    .option("user", DB_STAGING_USER) \
                    .option("password", DB_STAGING_PASS) \
                    .option("driver", "org.postgresql.Driver") \
                    .load()

                tables[table] = df

                # Log success for each table
                log_to_csv({
                    "step": "Extract",
                    "status": "Success",
                    "source": "PostgreSQL (Staging)",
                    "table_name": table,
                    "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                }, "etl_log.csv")

                print(f"Successfully extracted table: {table}")

            except Exception as e:
                # Log failure for specific table
                log_to_csv({
                    "step": "Extract",
                    "status": f"Failed: {e}",
                    "source": "PostgreSQL (Staging)",
                    "table_name": table,
                    "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                }, "etl_log.csv")
                print(f"Failed to extract table: {table} - Error: {e}")

        return tables
    
    except Exception as e:
        # Log failure for the whole extraction process
        log_to_csv({
            "step": "Extract",
            "status": f"Failed: {e}",
            "source": "PostgreSQL (Staging)",
            "table_name": "N/A",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")
        print(f"Failed to extract tables: {e}")
        return {}

In [66]:
# Extract All Tables from Staging

data = extract_from_staging()
print(f"Extracted tables: {list(data.keys())}")

Found tables in staging: ['investments', 'relationship', 'people', 'ipo', 'company', 'acquisition', 'funding_rounds', 'milestones', 'funds']
Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted table: investments
Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted table: relationship
Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted table: people
Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted table: ipo
Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted table: company
Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted table: acquisition
Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted table: funding_rounds
Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted table: milestones
Log written to /home/jovyan/work/logs/etl_log.csv
Successfully extracted table: funds
Extracted tables: ['investments', 'relationship', '

In [68]:
# Read All Data from Staging

acquisition = data["acquisition"]
company = data["company"]
funding_rounds = data["funding_rounds"]
funds = data["funds"]
investments = data["investments"]
ipos = data["ipo"]
milestones = data["milestones"]
people = data["people"]
relationship = data["relationship"]

# check
company.show()

+---------+---------+-----------------+--------------------+--------------------+----------+--------------+----------+----------+------------+---------+-----------+-------------------+-------------------+
|office_id|object_id|      description|              region|            address1|  address2|          city|  zip_code|state_code|country_code| latitude|  longitude|         created_at|         updated_at|
+---------+---------+-----------------+--------------------+--------------------+----------+--------------+----------+----------+------------+---------+-----------+-------------------+-------------------+
|        8|      c:8|                 |              SF Bay|959 Skyway Road, ...|          |    San Carlos|     94070|        CA|         USA|37.506885|-122.247573|2007-01-01 22:19:54|2007-01-01 22:19:54|
|        9|      c:9|     Headquarters|         Los Angeles|9229 W. Sunset Blvd.|          |West Hollywood|     90069|        CA|         USA|34.090368|-118.393064|2007-01-01 22:19

## Data Profiling

In [75]:
import os
import json
from datetime import datetime, date
from decimal import Decimal

# Helper function to convert values to JSON format
def convert_to_serializable(obj):
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    return obj

def profile_data(person, df, table_name, format_file):
    try:
        n_rows = df.count()
        n_cols = len(df.columns)
        
        column_info = {}
        for col in df.columns:
            data_type = df.schema[col].dataType.simpleString()
            sample_values = df.select(col).distinct().limit(5).rdd.flatMap(lambda x: x).collect()
            null_count = df.filter(df[col].isNull()).count()
            unique_count = df.select(col).distinct().count()
            
            # Min and max values (if numeric or date type)
            try:
                min_value = df.agg({col: "min"}).collect()[0][0]
                max_value = df.agg({col: "max"}).collect()[0][0]
            except:
                min_value = None
                max_value = None
            
            # Persentase missing value
            percentage_missing = round((null_count / n_rows) * 100, 2) if n_rows > 0 else 0.0
            
            # Ambil 5 nilai unik sebagai sampel
            unique_values = df.select(col).distinct().limit(5).rdd.flatMap(lambda x: x).collect()
            
            # Persentase valid date (khusus untuk tipe date dan datetime)
            percentage_valid_date = None
            if data_type in ['date', 'timestamp']:
                valid_date_count = df.filter(df[col].isNotNull()).count()
                percentage_valid_date = round((valid_date_count / n_rows) * 100, 2) if n_rows > 0 else 0.0

            column_info[col] = {
                "data_type": data_type,
                "sample_values": [convert_to_serializable(v) for v in sample_values] if sample_values else None,
                "unique_count": unique_count,
                "unique_value": [convert_to_serializable(v) for v in unique_values] if unique_values else None,
                "null_count": null_count,
                "percentage_missing_value": percentage_missing,
                "min_value": convert_to_serializable(min_value),
                "max_value": convert_to_serializable(max_value),
                "percentage_valid_date": percentage_valid_date
            }
        
        dict_profiling = {
            "created_at": datetime.now().isoformat(),
            "person_in_charge": person,
            "profiling_result": {
                "table_name": table_name,
                "format_file": format_file,
                "n_rows": n_rows,
                "n_cols": n_cols,
                "report": column_info
            }
        }
        
        # Save profiling result to JSON
        folder_path = "data_profiling"
        os.makedirs(folder_path, exist_ok=True)

        file_path = os.path.join(folder_path, f"{table_name}_profiling.json")
        with open(file_path, "w") as f:
            json.dump(dict_profiling, f, indent=4, default=convert_to_serializable)

        print(f"Profiling saved to: {file_path}")

        # Create success log message
        log_msg = {
            "step": "Profiling",
            "status": "Success",
            "source": format_file,
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S") 
        }

    except Exception as e:
        print(f"Error profiling table {table_name}: {e}")

        # Create fail log message
        log_msg = {
            "step": "Profiling",
            "status": f"Failed: {e}",
            "source": format_file,
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S") 
        }

    finally:
        # Save log to CSV
        log_to_csv(log_msg, "etl_log.csv")

    return dict_profiling if 'dict_profiling' in locals() else None


In [76]:
# test 1
profiling_result = profile_data("Mr. A", people, "people_data", "from Staging")
print(json.dumps(profiling_result, indent=2))

Profiling saved to: data_profiling/people_data_profiling.json
Log written to /home/jovyan/work/logs/etl_log.csv
{
  "created_at": "2025-03-22T09:50:39.336805",
  "person_in_charge": "Mr. A",
  "profiling_result": {
    "table_name": "people_data",
    "format_file": "from Staging",
    "n_rows": 226709,
    "n_cols": 6,
    "report": {
      "people_id": {
        "data_type": "string",
        "sample_values": [
          "177264",
          "296",
          "91421",
          "467",
          "177595"
        ],
        "unique_count": 226709,
        "unique_value": [
          "177264",
          "296",
          "91421",
          "467",
          "177595"
        ],
        "null_count": 0,
        "percentage_missing_value": 0.0,
        "min_value": "1",
        "max_value": "99999",
        "percentage_valid_date": null
      },
      "object_id": {
        "data_type": "string",
        "sample_values": [
          "p:105829",
          "p:73",
          "p:171",
          "p

In [77]:
# Profiling All Data

profile_data("Mr. CCC", relationship, "relationship_data", "from Staging")
profile_data("Mrs. H", acquisition, "acquisition_data", "from Staging")
profile_data("Mrs. OP", company, "company_data", "from Staging")
profile_data("Mr. CCC", funding_rounds, "funding_rounds_data", "from Staging")
profile_data("Mr. A", funds, "funds_data", "from Staging")
profile_data("Mrs. H", investments, "investments_data", "from Staging")
profile_data("Mr. A", ipos, "ipos_data", "from Staging")
profile_data("Mrs. OP", milestones, "milestones_data", "from Staging")

Profiling saved to: data_profiling/relationship_data_profiling.json
Log written to /home/jovyan/work/logs/etl_log.csv
Profiling saved to: data_profiling/acquisition_data_profiling.json
Log written to /home/jovyan/work/logs/etl_log.csv
Profiling saved to: data_profiling/company_data_profiling.json
Log written to /home/jovyan/work/logs/etl_log.csv
Profiling saved to: data_profiling/funding_rounds_data_profiling.json
Log written to /home/jovyan/work/logs/etl_log.csv
Profiling saved to: data_profiling/funds_data_profiling.json
Log written to /home/jovyan/work/logs/etl_log.csv
Profiling saved to: data_profiling/investments_data_profiling.json
Log written to /home/jovyan/work/logs/etl_log.csv
Profiling saved to: data_profiling/ipos_data_profiling.json
Log written to /home/jovyan/work/logs/etl_log.csv
Profiling saved to: data_profiling/milestones_data_profiling.json
Log written to /home/jovyan/work/logs/etl_log.csv


{'created_at': '2025-03-22T09:53:41.820808',
 'person_in_charge': 'Mrs. OP',
 'profiling_result': {'table_name': 'milestones_data',
  'format_file': 'from Staging',
  'n_rows': 8152,
  'n_cols': 9,
  'report': {'created_at': {'data_type': 'string',
    'sample_values': ['2010-09-30 04:46:05.000',
     '2010-10-04 23:53:31.000',
     '2010-05-26 23:08:38.000',
     '2010-07-09 05:01:33.000',
     '2010-07-10 12:53:28.000'],
    'unique_count': 7504,
    'unique_value': ['2010-09-30 04:46:05.000',
     '2010-10-04 23:53:31.000',
     '2010-05-26 23:08:38.000',
     '2010-07-09 05:01:33.000',
     '2010-07-10 12:53:28.000'],
    'null_count': 0,
    'percentage_missing_value': 0.0,
    'min_value': '2008-06-18 08:14:06.000',
    'max_value': '2013-12-10 20:15:30.000',
    'percentage_valid_date': None},
   'description': {'data_type': 'string',
    'sample_values': ["Viewfinity named in 'Hottest Boston Companies' List",
     'Centralway invested in \n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n

## Transformation

In [41]:
!pip install unidecode



In [42]:
from pyspark.sql.functions import col, lit, to_date, when, udf
from pyspark.sql.types import IntegerType, StringType
from sqlalchemy import create_engine
from pangres import upsert
import pandas as pd
import re
from unidecode import unidecode
import unicodedata

In [43]:
# Setup Connection to Data Warehouse

DWH_URL = os.getenv("DWH_URL")
DWH_USER = os.getenv("DWH_USER")
DWH_PASS = os.getenv("DWH_PASS")
engine = create_engine(f"postgresql://{DWH_USER}:{DWH_PASS}@host.docker.internal:5432/pyspark_task_dwh")

In [44]:
# Function to Save and Track Invalid IDs

import os
import pandas as pd
from datetime import datetime

def save_invalid_ids(invalid_ids, table_name, folder='logs', filename='invalid_ids.csv'):
    if not invalid_ids:
        print(f"No invalid IDs to save from table '{table_name}'.")
        return
    
    try:
        os.makedirs(folder, exist_ok=True)
        file_path = os.path.join(folder, filename)
        
        # Konversi list ke DataFrame
        df = pd.DataFrame(invalid_ids, columns=['entity_type', 'object_id'])
        df['table_name'] = table_name
        df['timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        
        # Kalau file sudah ada, cek duplikasi biar gak nambah terus
        if os.path.exists(file_path):
            existing_df = pd.read_csv(file_path)
            # Gabung dataframe lalu drop duplikat
            df = pd.concat([existing_df, df]).drop_duplicates(subset=['entity_type', 'object_id', 'table_name'])
        
        # Tulis ulang ke file (bukan append) biar data tetap konsisten
        df.to_csv(file_path, mode='w', index=False)
        
        print(f"{len(invalid_ids)} invalid IDs from table '{table_name}' saved to {file_path}")
    
    except Exception as e:
        print(f"Error saving invalid IDs from table '{table_name}': {e}")



### Helper Function to Analyze Column Values

In [51]:
# Cleaning Values to Make it More Readable

@udf(returnType=IntegerType())
def clean_integer(value):
    if isinstance(value, str):
        match = re.match(r"^[a-zA-Z]:(\d+)", value) 
        if match:
            return int(match.group(1))  # Catch value after ":"
        else:
            return None 
    return value
    

@udf(returnType=StringType())
def clean_text(value):
    if value:
        try:
            # Handle encoding issue 
            value = value.encode('latin1').decode('utf-8')
        except (UnicodeEncodeError, UnicodeDecodeError):
            pass
        # Normalization
        value = unicodedata.normalize("NFKD", value)
        # Handle strange character
        value = re.sub(r'[^\x00-\x7F]+', '', value)
        value = value.strip()
        value = unidecode(value)
    return value


@udf(returnType=StringType())
def normalize_text(value):
    if not isinstance(value, str) or not value.strip():
        return None  
    
    # Make it lowercase
    value = value.lower()
    
    # HDelete strange char
    value = re.sub(r'[^\w\s,&/]', '', value)  # Alphanumeric, space, coma, apersand, slash
    
    value = re.sub(r'[/,&]', ' ', value) 
    
    # Delete exaggerated space
    value = re.sub(r'\s+', ' ', value).strip()
    
    return value


@udf(returnType=StringType())
def clean_alpha_text(text):
    if text:
        # Delete all strange char, except alphanumeric and space
        return re.sub(r'[^\w\s]', '', text).strip()
    return None


@udf(returnType=StringType())
def fix_encoding(s):
    if s is not None:
        try:
            return unidecode(s)
        except Exception as e:
            return None
    return s

In [46]:
# For Extracting Prefix and Numeric ID

@udf(returnType=StringType())
def extract_prefix(value):
    if value and ":" in value:
        return value.split(":")[0]
    return None

@udf(returnType=IntegerType())
def extract_id(value):
    if value and ":" in value:
        try:
            return int(value.split(":")[1])
        except ValueError:
            return None
    return None

In [47]:
# For Handle Stock-related Column

@udf(returnType=StringType())
def extract_stock_market(value):
    if value and ":" in value:
        return value.split(":")[0]
    return None

@udf(returnType=StringType())
def extract_stock_symbol(value):
    if value and ":" in value:
        return value.split(":")[1]
    return None

### Company Data

In [22]:
from pyspark.sql.functions import col, udf, split
from pyspark.sql.types import StringType, IntegerType
from datetime import datetime

def transform_company(df):
    try:
        log_to_csv({
            "step": "Transform",
            "status": "STARTED",
            "source": "staging",
            "table_name": "company",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 1: Replace "" to null
        df = df.na.replace("", None)

        # Step 2: Format data type
        df = df.withColumn("latitude", col("latitude").cast("decimal(9,6)"))
        df = df.withColumn("longitude", col("longitude").cast("decimal(9,6)"))
        
        # Extract Extract prefix and ID 
        df = df.withColumn("entity_type", extract_prefix(col("object_id")))
        df = df.withColumn("object_id", extract_id(col("object_id")))

        # Step 3: Encoding
        df = df.withColumn("description", clean_text("description"))
        df = df.withColumn("address1", clean_text("address1"))
        df = df.withColumn("zip_code", clean_text("zip_code"))
        df = df.withColumn("region", clean_text("region"))
        
        log_to_csv({
            "step": "Format Data",
            "status": "SUCCESS",
            "source": "staging",
            "table_name": "company",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 4: Mapping to target column
        df_transformed = df.select(
            col("office_id").alias("company_id"),
            col("entity_type").alias("entity_type"),
            col("object_id").alias("object_id"),  # INT
            col("description").alias("description"),
            col("address1").alias("address"),
            col("region").alias("region"),
            col("city").alias("city"),
            col("zip_code").alias("zip_code"),
            col("state_code").alias("state_code"),
            col("country_code").alias("country_code"),
            col("latitude").alias("latitude"),
            col("longitude").alias("longitude"),
            col("created_at").alias("created_at"),
            col("updated_at").alias("updated_at")
        )

        log_to_csv({
            "step": "Map Data",
            "status": f"SUCCESS ({df_transformed.count()} rows)",
            "source": "staging",
            "table_name": "company",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 5: Data cleansing 
        df_transformed = df_transformed.fillna({
            "description": "Unknown",
            "address": "Unknown",
            "region": "Unknown",
            "city": "Unknown",
            "zip_code": "Unknown",
            "state_code": "Unknown",
            "country_code": "Unknown"
        })

        # Step 6: Drop duplicate data  and latitude/longitude with value = 0
        df_transformed = df_transformed.dropDuplicates(["object_id"])
        df_transformed = df_transformed.filter((col("latitude") != 0) & (col("longitude") != 0))

        log_to_csv({
            "step": "Clean Data",
            "status": f"SUCCESS ({df_transformed.count()} rows after cleansing)",
            "source": "staging",
            "table_name": "company",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        return df_transformed

        print("The data is successfully transformed")

    except Exception as e:
        log_to_csv({
            "step": "Transform",
            "status": f"FAILED - {str(e)}",
            "source": "staging",
            "table_name": "company",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")
        raise

    


In [23]:
# Read from staging
company = spark.read \
       .format("jdbc") \
       .option("url", DB_STAGING_URL) \
       .option("dbtable", "company") \
       .option("user", DB_STAGING_USER) \
       .option("password", DB_STAGING_PASS) \
       .option("driver", "org.postgresql.Driver") \
       .load()

In [24]:
transformed_df = transform_company(company)

Log written to /home/jovyan/work/logs/etl_log.csv
Log written to /home/jovyan/work/logs/etl_log.csv
Log written to /home/jovyan/work/logs/etl_log.csv
Log written to /home/jovyan/work/logs/etl_log.csv


### People Data

In [53]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, broadcast
from datetime import datetime
import os
import pandas as pd

def transform_people(df):
    try:
        log_to_csv({
            "step": "Transform",
            "status": "STARTED",
            "source": "Staging",
            "table_name": "people",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 1: Replace empty strings with NULL
        df = df.na.replace("", None)

        # Step 2: Create full_name from first_name + last_name
        df = df.withColumn("full_name", concat_ws(" ", col("first_name"), col("last_name")))

        # Step 3: Extract prefix and ID dari object_id
        df = df.withColumn("entity_type", extract_prefix(col("object_id")))
        df = df.withColumn("object_id", extract_id(col("object_id")))

        log_to_csv({
            "step": "Format Data",
            "status": "SUCCESS",
            "source": "staging",
            "table_name": "people",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 4: Mapping to target kolom
        df_transformed = df.select(
            col("people_id").alias("people_id"),
            col("entity_type").alias("entity_type"), 
            col("object_id").alias("object_id"),
            col("full_name").alias("full_name"),
            col("birthplace").alias("birthplace"),
            col("affiliation_name").alias("affiliation_name"),
        )

        df_transformed = df_transformed.withColumn("object_id", clean_integer(col("object_id")))
        df_transformed = df_transformed.withColumn("full_name", clean_alpha_text(col("full_name")))
        df_transformed = df_transformed.withColumn("birthplace", fix_encoding(col("birthplace")))
        df_transformed = df_transformed.withColumn("affiliation_name", clean_alpha_text(col("affiliation_name")))

        log_to_csv({
            "step": "Map Data",
            "status": f"SUCCESS ({df_transformed.count()} rows)",
            "source": "staging",
            "table_name": "people",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 5: Cleaning data
        df_transformed = df_transformed.fillna({
            "full_name": "Unknown",
            "birthplace": "Unknown",
            "affiliation_name": "Unknown"
        })

        # Step 6: Drop some data
        df_transformed = df_transformed.dropDuplicates(["entity_type", "object_id"])
        df_transformed = df_transformed.filter(col("full_name") != "Unknown")

        log_to_csv({
            "step": "Clean Data",
            "status": f"SUCCESS ({df_transformed.count()} rows after cleansing)",
            "source": "staging",
            "table_name": "people",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 7: Validate object_id in `dim_company`
        dim_company = spark.read \
            .format("jdbc") \
            .option("url", DWH_URL) \
            .option("dbtable", "dim_company") \
            .option("user", DWH_USER) \
            .option("password", DWH_PASS) \
            .option("driver", "org.postgresql.Driver") \
            .load() \
            .select("object_id")

        # Valid data (match with `dim_company`)
        df_valid = df_transformed.join(
            broadcast(dim_company),
            on="object_id",
            how="inner"
        )

        # Invalid data
        df_invalid = df_transformed.join(
            broadcast(dim_company),
            on="object_id",
            how="left_anti"
        )

        if df_invalid.count() > 0:
            # Convert to pandas and save to csv using `save_invalid_ids`
            invalid_ids = df_invalid.select("entity_type", "object_id").toPandas().values.tolist()
            save_invalid_ids(invalid_ids, table_name="people")

            log_to_csv({
                "step": "Validation",
                "status": f"{df_invalid.count()} rows missing object_id",
                "source": "staging",
                "table_name": "people",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }, "etl_log.csv")

        return df_valid
        
        print("The data is successfully transformed")
        
    except Exception as e:
        log_to_csv({
            "step": "Transform",
            "status": f"FAILED - {str(e)}",
            "source": "staging",
            "table_name": "people",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")
        raise


In [54]:
# Read from staging
people = spark.read \
    .format("jdbc") \
    .option("url", DB_STAGING_URL) \
    .option("dbtable", "people") \
    .option("user", DB_STAGING_USER) \
    .option("password", DB_STAGING_PASS) \
    .option("driver", "org.postgresql.Driver") \
    .load()


In [55]:
# Transform People Data
transformed_people = transform_people(people)

Log written to /home/jovyan/work/logs/etl_log.csv
Log written to /home/jovyan/work/logs/etl_log.csv
Log written to /home/jovyan/work/logs/etl_log.csv
Log written to /home/jovyan/work/logs/etl_log.csv


  existing_df = pd.read_csv(file_path)


204464 invalid IDs from table 'people' saved to logs/invalid_ids.csv
Log written to /home/jovyan/work/logs/etl_log.csv


### Milestones Data

In [None]:
def transform_milestones(df):
    try:
        log_to_csv({
            "step": "Transform",
            "status": "STARTED",
            "source": "staging",
            "table_name": "milestones",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 1: Replace "" to null
        df = df.na.replace("", None)
        df = df.na.replace("NaN", None)
        
        # Step 2: Format data type
        df = df.withColumn("milestone_date", to_date(col("milestone_at")))

        # Step 3: Extract prefix and ID dari object_id
        df = df.withColumn("entity_type", extract_prefix(col("object_id")))
        df = df.withColumn("object_id", extract_id(col("object_id")))
        
        log_to_csv({
            "step": "Format Data",
            "status": "SUCCESS",
            "source": "staging",
            "table_name": "milestones",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 4: Mapping to target column
        df_transformed = df.select(
            col("milestone_id").alias("milestone_id"),
            col("entity_type").alias("entity_type"),
            col("object_id").alias("object_id"),
            col("milestone_date").alias("milestone_date"),
            col("description").alias("description"),
            col("source_url").alias("source_url"),
            col("source_description").alias("source_description"),
            col("created_at").alias("created_at"),
            col("updated_at").alias("updated_at")
        )
            
        # Step 5: Handle strange values
        df_transformed = df_transformed.withColumn("object_id", clean_integer(col("object_id")))
        df_transformed = df_transformed.withColumn("description", clean_alpha_text("description"))
        df_transformed = df_transformed.withColumn("source_url", when(col("source_url").rlike(r"^(http|https)://.*"), col("source_url")).otherwise("Unknown"))
        df_transformed = df_transformed.withColumn("source_description", clean_alpha_text("source_description"))

        log_to_csv({
            "step": "Map Data",
            "status": f"SUCCESS ({df_transformed.count()} rows)",
            "source": "staging",
            "table_name": "milestones",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 6: Cleaning data
        df_transformed = df_transformed.fillna({
            "source_url": "Unknown",
            "description": "No Description",
            "source_description": "Unknown"
        })
        
        # Step 7: Drop duplicate data 
        df_transformed = df_transformed.dropDuplicates(["milestone_id"])
        df_transformed = df_transformed.dropDuplicates(["entity_type", "object_id"])

        log_to_csv({
            "step": "Clean Data",
            "status": f"SUCCESS ({df_transformed.count()} rows after cleansing)",
            "source": "staging",
            "table_name": "milestones",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 8: Validation object_id in `dim_company`
        dim_company = spark.read \
            .format("jdbc") \
            .option("url", DWH_URL) \
            .option("dbtable", "dim_company") \
            .option("user", DWH_USER) \
            .option("password", DWH_PASS) \
            .option("driver", "org.postgresql.Driver") \
            .load() \
            .select("object_id")

        # Convert object_id to integer (if needed)
        df_transformed = df_transformed.withColumn("object_id", col("object_id").cast("int"))
        
        # Filter NULL object_id explicitly
        df_transformed = df_transformed.filter(col("object_id").isNotNull())
        
        # Valid data (match with `dim_company`)
        df_valid = df_transformed.join(
            broadcast(dim_company),
            on="object_id",
            how="inner"
        )
        
        # Invalid data
        df_invalid = df_transformed.join(
            broadcast(dim_company),
            on="object_id",
            how="left_anti"
        )
        
        if df_invalid.count() > 0:
            # Convert to pandas and save to csv using `save_invalid_ids`
            invalid_ids = df_invalid.select("entity_type", "object_id").toPandas().values.tolist()
            save_invalid_ids(invalid_ids, table_name="milestone")
        
            log_to_csv({
                "step": "Validation",
                "status": f"{df_invalid.count()} rows missing object_id",
                "source": "staging",
                "table_name": "milestone",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }, "etl_log.csv")

        return df_valid
        
        print("The data is successfully transformed")


    except Exception as e:
        log_to_csv({
            "step": "Transform",
            "status": f"FAILED - {str(e)}",
            "source": "staging",
            "table_name": "milestones",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")
        raise


In [None]:
# Read from staging
milestones = spark.read \
    .format("jdbc") \
    .option("url", DB_STAGING_URL) \
    .option("dbtable", "milestones") \
    .option("user", DB_STAGING_USER) \
    .option("password", DB_STAGING_PASS) \
    .option("driver", "org.postgresql.Driver") \
    .load()


In [None]:
# Transform Milestones Data
transformed_milestones = transform_milestones(milestones)


### Acquisition Data - Fact

In [None]:
from pyspark.sql.functions import col, to_date, broadcast
from datetime import datetime

def transform_acquisition(df):
    try:
        log_to_csv({
            "step": "Transform",
            "status": "STARTED",
            "source": "staging",
            "table_name": "acquisition",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 1: Replace "" to null
        df = df.na.replace("", None)

        # Step 2: Format data type
        df = df.withColumn("price_amount", col("price_amount").cast("decimal(15,2)"))
        df = df.withColumn("acquired_at", to_date(col("acquired_at")))

        # Step 3: Extract prefix and ID
        df = df.withColumn("acquiring_entity_type", extract_prefix(col("acquiring_object_id")))
        df = df.withColumn("acquired_entity_type", extract_prefix(col("acquired_object_id")))
        df = df.withColumn("acquiring_object_id", extract_id(col("acquiring_object_id")))
        df = df.withColumn("acquired_object_id", extract_id(col("acquired_object_id")))

        log_to_csv({
            "step": "Format Data",
            "status": f"SUCCESS ({df.count()} rows)",
            "source": "staging",
            "table_name": "acquisition",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 4: Mapping to target column
        df_transformed = df.select(
            col("acquisition_id"),
            col("acquiring_entity_type"),
            col("acquiring_object_id"),
            col("acquired_entity_type"),
            col("acquired_object_id"),
            col("price_amount"),
            col("price_currency_code"),
            col("acquired_at"),
            col("source_url"),
            col("created_at"),
            col("updated_at")
        )

        # Step 5: Clean integer values
        df_transformed = df_transformed.withColumn("acquiring_object_id", clean_integer(col("acquiring_object_id")))
        df_transformed = df_transformed.withColumn("acquired_object_id", clean_integer(col("acquired_object_id")))
        df_transformed = df_transformed.dropna(subset="acquired_at")
        
        log_to_csv({
            "step": "Map Data",
            "status": f"SUCCESS ({df_transformed.count()} rows)",
            "source": "staging",
            "table_name": "acquisition",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 6: Handle null values
        df_transformed = df_transformed.fillna({
            "price_amount": 0.0,
            "price_currency_code": "Unknown",
            "source_url": "Unknown",
            "term_code": "Unknown"
        })

        # Step 7: Drop duplicates dan unknown data
        df_transformed = df_transformed.dropDuplicates(["acquisition_id"])
        df_transformed = df_transformed.filter(col("term_code") != "Unknown")
        df_transformed = df_transformed.filter(col("price_currency_code") != "Unknown")
        df_transformed = df_transformed.filter(col("price_amount") != 0.0)

        log_to_csv({
            "step": "Clean Data",
            "status": f"SUCCESS ({df_transformed.count()} rows after cleansing)",
            "source": "staging",
            "table_name": "acquisition",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 8: Validation object_id in `dim_company`
        dim_company = spark.read \
            .format("jdbc") \
            .option("url", DWH_URL) \
            .option("dbtable", "dim_company") \
            .option("user", DWH_USER) \
            .option("password", DWH_PASS) \
            .option("driver", "org.postgresql.Driver") \
            .load() \
            .select("object_id", "company_id")

        # Valid data
        df_valid = (
            df_transformed
            .join(broadcast(dim_company.alias("acquiring")), col("acquiring_object_id") == col("acquiring.object_id"), "left")
            .join(broadcast(dim_company.alias("acquired")), col("acquired_object_id") == col("acquired.object_id"), "left")
            .select(df_transformed["*"],  
                    col("acquiring.company_id").alias("acquiring_company_id"),  
                    col("acquired.company_id").alias("acquired_company_id"))
        )
        
        # Invalid data
        df_invalid = df_valid.filter(col("acquiring_company_id").isNull() | col("acquired_company_id").isNull())
        
        df_valid = df_valid.filter(col("acquiring_company_id").isNotNull() & col("acquired_company_id").isNotNull())

        if df_invalid.count() > 0:
            invalid_ids = df_invalid.select("acquiring_entity_type", "acquiring_object_id", "acquired_entity_type", "acquired_object_id").toPandas().values.tolist()
            save_invalid_ids(invalid_ids, table_name="acquisition")
            
            log_to_csv({
                "step": "Validation",
                "status": f"{df_invalid.count()} rows with missing object_id",
                "source": "staging",
                "table_name": "acquisition",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }, "etl_log.csv")

        return df_valid
        
        print("The data is successfully transformed")
    
    except Exception as e:
        log_to_csv({
            "step": "ETL Process",
            "status": f"FAILED - {str(e)}",
            "source": "staging",
            "table_name": "acquisition",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")
        raise


In [None]:
# Read from staging
acquisition = spark.read \
    .format("jdbc") \
    .option("url", DB_STAGING_URL) \
    .option("dbtable", "acquisition") \
    .option("user", DB_STAGING_USER) \
    .option("password", DB_STAGING_PASS) \
    .option("driver", "org.postgresql.Driver") \
    .load()

In [None]:
# transform acquisition data

transformed_acquisition = transform_acquisition(acquisition)

### Investments Data - Fact

In [None]:
from pyspark.sql.functions import col, to_date, broadcast
from datetime import datetime

def transform_investments(df):
    try:
        log_to_csv({
            "step": "Transform",
            "status": "STARTED",
            "source": "Staging",
            "table_name": "investments",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 1: Replace "" to null
        df = df.na.replace("", None)

        # Step 2: Extract prefix and ID
        df = df.withColumn("funded_entity_type", extract_prefix(col("funded_object_id")))
        df = df.withColumn("investor_entity_type", extract_prefix(col("investor_object_id")))
        df = df.withColumn("funded_object_id", extract_id(col("funded_object_id")))
        df = df.withColumn("investor_object_id", extract_id(col("investor_object_id")))

        log_to_csv({
            "step": "Format Data",
            "status": f"SUCCESS ({df.count()} rows)",
            "source": "staging",
            "table_name": "investments",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 3: Mapping to target column
        df_transformed = df.select(
            col("investment_id").alias("investment_id"),
            col("funding_round_id").alias("funding_round_id"),
            col("funded_entity_type").alias("funded_entity_type"),
            col("funded_object_id").alias("funded_object_id"),
            col("investor_entity_type").alias("investor_entity_type"),
            col("investor_object_id").alias("investor_object_id"),
            col("created_at").alias("created_at"),
            col("updated_at").alias("updated_at")
        )

        # Step 4: Clean integer values
        df_transformed = df_transformed.withColumn("funded_object_id", clean_integer(col("funded_object_id")))
        df_transformed = df_transformed.withColumn("investor_object_id", clean_integer(col("investor_object_id")))
        
        log_to_csv({
            "step": "Map Data",
            "status": f"SUCCESS ({df_transformed.count()} rows)",
            "source": "staging",
            "table_name": "investments",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 6: Drop duplicates
        df_transformed = df_transformed.dropDuplicates(["investment_id"])

        log_to_csv({
            "step": "Clean Data",
            "status": f"SUCCESS ({df_transformed.count()} rows after cleansing)",
            "source": "staging",
            "table_name": "investments",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 7: Validation object_id in `dim_company`
        companies = spark.read \
            .format("jdbc") \
            .option("url", DWH_URL) \
            .option("dbtable", "dim_company") \
            .option("user", DWH_USER) \
            .option("password", DWH_PASS) \
            .option("driver", "org.postgresql.Driver") \
            .load() \
            .select(col("object_id"), col("company_id"))
        
        people = spark.read \
            .format("jdbc") \
            .option("url", DWH_URL) \
            .option("dbtable", "dim_people") \
            .option("user", DWH_USER) \
            .option("password", DWH_PASS) \
            .option("driver", "org.postgresql.Driver") \
            .load() \
            .select(col("object_id"), col("people_id"))

        # Join with companies and people tables
        df_transformed = df_transformed \
            .join(companies.alias("comp"), df_transformed["investor_object_id"] == col("comp.object_id"), "left") \
            .join(people.alias("peop"), df_transformed["investor_object_id"] == col("peop.object_id"), "left") \
            .withColumn("investor_object_id", when(col("comp.company_id").isNotNull(), col("comp.company_id"))
                        .otherwise(col("peop.people_id"))) \
            .withColumn("investor_entity_type", when(col("comp.company_id").isNotNull(), lit("company"))
                        .otherwise(lit("people")))

        df_transformed = df_transformed \
            .join(companies.alias("funded"), df_transformed["funded_object_id"] == col("funded.object_id"), "left") \
            .withColumn("mapped_funded_object_id", col("funded.company_id"))

        df_transformed = df_transformed.select(
            df_transformed["investment_id"],
            df_transformed["funding_round_id"],
            df_transformed["funded_entity_type"],
            df_transformed["investor_entity_type"],
            df_transformed["investor_object_id"],
            df_transformed["created_at"],
            df_transformed["updated_at"],
            col("mapped_funded_object_id").alias("funded_object_id")
        )

        # Validation
        df_valid = (
            df_transformed
            .join(broadcast(companies.alias("investor")), col("investor_object_id") == col("investor.object_id"), "left")
            .join(broadcast(companies.alias("funded")), col("funded_object_id") == col("funded.object_id"), "left")
            .select(df_transformed["*"],  
                    col("investor.company_id").alias("investor_company_id"),  
                    col("funded.company_id").alias("funded_company_id"))
        )

        df_invalid = df_valid.filter(col("investor_object_id").isNull() | col("funded_object_id").isNull())
        df_valid = df_valid.filter(col("investor_object_id").isNotNull() & col("funded_object_id").isNotNull())
        
        if df_invalid.count() > 0:
            invalid_ids = df_invalid.select("investor_entity_type", "investor_object_id", 
                                            "funded_entity_type", "funded_object_id").toPandas().values.tolist()
            save_invalid_ids(invalid_ids, table_name="investments")
            
            log_to_csv({
                "step": "Validation",
                "status": f"{df_invalid.count()} rows with missing object_id",
                "source": "staging",
                "table_name": "investments",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }, "etl_log.csv")

        return df_valid
        
        print("The data is successfully transformed")
    
    except Exception as e:
        log_to_csv({
            "step": "Transform",
            "status": f"FAILED - {str(e)}",
            "source": "staging",
            "table_name": "investments",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")
        raise


In [None]:
# Read from staging
investments = spark.read \
    .format("jdbc") \
    .option("url", DB_STAGING_URL) \
    .option("dbtable", "investments") \
    .option("user", DB_STAGING_USER) \
    .option("password", DB_STAGING_PASS) \
    .option("driver", "org.postgresql.Driver") \
    .load()

In [None]:
# Transform investments data

transformed_investments = transform_investments(investments)

### Funding Rounds Data

In [None]:
from pyspark.sql.functions import col, to_date, broadcast, when, lit
from pyspark.sql.types import IntegerType, StringType
from datetime import datetime

def transform_funding_rounds(df):
    try:
        log_to_csv({
            "step": "Transform",
            "status": "STARTED",
            "source": "Staging",
            "table_name": "funding_rounds",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 1: Replace "" to null
        df = df.na.replace("", None)

        # Step 2: Extract prefix and ID
        df = df.withColumn("funding_entity_type", extract_prefix(col("object_id")))
        df = df.withColumn("object_id", extract_id(col("object_id")))
        df = df.withColumn("object_id", col("object_id").cast(IntegerType()))

        # Step 3: Format data type
        df = df.withColumn("funding_date", to_date(col("funded_at")))
        df = df.withColumn("funding_entity_type", col("funding_entity_type").cast(StringType()))
        df = df.withColumn("participants", col("participants").cast(IntegerType()))

        log_to_csv({
            "step": "Format Data",
            "status": f"SUCCESS ({df.count()} rows)",
            "source": "staging",
            "table_name": "funding_rounds",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 4: Mapping to target column
        df_transformed = df.select(
            col("funding_round_id"),
            col("funding_entity_type"),
            col("object_id").alias("funding_object_id"),
            col("funding_round_type").alias("round_type"),
            col("funding_date"),
            col("raised_currency_code").alias("raised_currency"),
            col("raised_amount"),
            col("raised_amount_usd"),
            col("pre_money_currency_code").alias("pre_money_currency"),
            col("pre_money_valuation"),
            col("pre_money_valuation_usd"),
            col("post_money_currency_code").alias("post_money_currency"),
            col("post_money_valuation"),
            col("post_money_valuation_usd"),
            col("participants"),
            col("source_url"),
            col("source_description"),
            col("created_at"),
            col("updated_at")
        )

        # Step 5: Handle null values
        df_transformed = df_transformed.fillna({
            "round_type": "Unknown",
            "raised_currency": "USD",
            "pre_money_currency": "USD",
            "post_money_currency": "USD",
            "raised_amount_usd": 0.0,
            "source_description": "Unknown",
        })

        # Step 6: Drop duplicates dan invalid data
        df_transformed = df_transformed.dropDuplicates(["funding_round_id"])
        df_transformed = df_transformed.filter(col("round_type") != "Unknown")

        df_transformed = df_transformed.withColumn(
            "source_url", when(col("source_url").rlike(r"^(http|https)://.*"), col("source_url")).otherwise("Unknown")
        )
        df_transformed = df_transformed.withColumn(
            "source_description", clean_alpha_text("source_description")
        )

        log_to_csv({
            "step": "Clean Data",
            "status": f"SUCCESS ({df_transformed.count()} rows after cleansing)",
            "source": "staging",
            "table_name": "funding_rounds",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 7: Load dim_company and dim_people for validation
        companies = spark.read.format("jdbc") \
            .option("url", DWH_URL) \
            .option("dbtable", "dim_company") \
            .option("user", DWH_USER) \
            .option("password", DWH_PASS) \
            .option("driver", "org.postgresql.Driver") \
            .load().select(col("company_id"))

        people = spark.read.format("jdbc") \
            .option("url", DWH_URL) \
            .option("dbtable", "dim_people") \
            .option("user", DWH_USER) \
            .option("password", DWH_PASS) \
            .option("driver", "org.postgresql.Driver") \
            .load().select(col("people_id"))

        # Validation on funded_object_id
        df_valid = df_transformed \
            .join(broadcast(companies), df_transformed["funding_object_id"] == companies["company_id"], "left") \
            .join(broadcast(people), df_transformed["funding_object_id"] == people["people_id"], "left") \
            .withColumn("funding_object_id",
                        when(col("company_id").isNotNull(), col("company_id"))
                        .otherwise(col("people_id"))) \
            .drop("company_id", "people_id")

        df_invalid = df_valid.filter(col("company_id").isNull() | col("people_id").isNull() | col("funding_object_id").isNull())
        df_valid = df_valid.filter(col("company_id").isNotNull() & col("people_id").isNotNull() & col("funding_object_id").isNotNull())

        if df_invalid.count() > 0:
            invalid_ids = df_invalid.limit(10).toPandas().values.tolist()
            save_invalid_ids(invalid_ids, table_name="funding_rounds")
            
            log_to_csv({
                "step": "Validation",
                "status": f"{df_invalid.count()} rows with missing object_id",
                "source": "staging",
                "table_name": "funding_rounds",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }, "etl_log.csv")

        return df_valid
        
        print("The data is successfully transformed")
    
    except Exception as e:
        log_to_csv({
            "step": "Transform",
            "status": f"FAILED - {str(e)}",
            "source": "staging",
            "table_name": "funding_rounds",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")
        raise

In [None]:
# Read from staging

funding_rounds = spark.read \
    .format("jdbc") \
    .option("url", DB_STAGING_URL) \
    .option("dbtable", "funding_rounds") \
    .option("user", DB_STAGING_USER) \
    .option("password", DB_STAGING_PASS) \
    .option("driver", "org.postgresql.Driver") \
    .load()


In [None]:
# Transform funding rounds

transformed_funding_rounds = transform_funding_rounds(funding_rounds)

### Relationship Data - Fact

In [None]:
from pyspark.sql.functions import col, to_date, broadcast, to_timestamp
from datetime import datetime

def transform_relationship(df):
    try:
        log_to_csv({
            "step": "Transform",
            "status": "STARTED",
            "source": "Staging",
            "table_name": "relationship",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 1: Replace "" to null
        df = df.na.replace("", None)

        # Step 2: Format data type
        df = df.withColumn("start_at", to_date(col("start_at")))
        df = df.withColumn("end_at", to_date(col("end_at")))
        df = df.withColumn("created_at", to_timestamp(col("created_at")))
        df = df.withColumn("updated_at", to_timestamp(col("updated_at")))

        # Step 3: Extract prefix and ID, normalize
        df = df.withColumn("people_entity_type", extract_prefix(col("person_object_id")))
        df = df.withColumn("relationship_entity_type", extract_prefix(col("relationship_object_id")))
        df = df.withColumn("people_object_id", extract_id(col("person_object_id")))
        df = df.withColumn("relationship_object_id", extract_id(col("relationship_object_id")))
        df = df.withColumn("title", normalize_text(col("title")))

        log_to_csv({
            "step": "Format Data",
            "status": f"SUCCESS ({df.count()} rows)",
            "source": "staging",
            "table_name": "relationship",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 3: Mapping to target column
        df_transformed = df.select(
            col("relationship_id"),
            col("people_entity_type").alias("people_entity_type"),
            col("people_object_id").alias("people_object_id"),
            col("relationship_entity_type").alias("relationship_entity_type"),
            col("relationship_object_id").alias("relationship_object_id"),
            col("start_at").alias("start_at"),
            col("end_at").alias("end_at"),
            col("title").alias("title"),
            col("created_at").alias("created_at"),
            col("updated_at").alias("updated_at")
        )
        
        log_to_csv({
            "step": "Map Data",
            "status": f"SUCCESS ({df_transformed.count()} rows)",
            "source": "staging",
            "table_name": "relationship",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 4: Handle null values
        df_transformed = df_transformed.fillna({
            "title": "Unknown"
        })

        # Step 5: Drop duplicates and null values
        df_transformed = df_transformed.dropDuplicates(["relationship_id"])

        log_to_csv({
            "step": "Clean Data",
            "status": f"SUCCESS ({df_transformed.count()} rows after cleansing)",
            "source": "staging",
            "table_name": "relationship",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 6: Load dim_company dan dim_people for validation
        companies = spark.read.format("jdbc") \
            .option("url", DWH_URL) \
            .option("dbtable", "dim_company") \
            .option("user", DWH_USER) \
            .option("password", DWH_PASS) \
            .option("driver", "org.postgresql.Driver") \
            .load().select(col("company_id"))

        people = spark.read.format("jdbc") \
            .option("url", DWH_URL) \
            .option("dbtable", "dim_people") \
            .option("user", DWH_USER) \
            .option("password", DWH_PASS) \
            .option("driver", "org.postgresql.Driver") \
            .load().select(col("people_id"))

        # Validation on funded_object_id
        df_valid = df_transformed \
            .join(broadcast(companies), df_transformed["relationship_object_id"] == companies["company_id"], "left") \
            .join(broadcast(people), df_transformed["people_object_id"] == people["people_id"], "left") \
            .withColumn("relationship_object_id",
                        when(col("company_id").isNotNull(), col("company_id"))
                        .otherwise(col("people_id"))) \
            .drop("company_id", "people_id")

        df_invalid = df_valid.filter(col("company_id").isNull() | col("people_id").isNull() | col("people_object_id").isNull() | col("relationship_object_id").isNull())
        df_valid = df_valid.filter(col("company_id").isNotNull() & col("people_id").isNotNull() & col("people_object_id").isNotNull() & col("relationship_object_id").isNotNull())

        if df_invalid.count() > 0:
            invalid_ids = df_invalid.limit(10).toPandas().values.tolist()
            save_invalid_ids(invalid_ids, table_name="relationship")

            log_to_csv({
                "step": "Validation",
                "status": f"{df_invalid.count()} rows with missing object_id",
                "source": "staging",
                "table_name": "relationship",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }, "etl_log.csv")

        return df_valid
        
        print("The data is successfully transformed")


    except Exception as e:
        log_to_csv({
            "step": "Transform",
            "status": f"FAILED - {str(e)}",
            "source": "staging",
            "table_name": "relationship",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")
        raise


In [None]:
# Read from staging
relationship = spark.read \
    .format("jdbc") \
    .option("url", DB_STAGING_URL) \
    .option("dbtable", "relationship") \
    .option("user", DB_STAGING_USER) \
    .option("password", DB_STAGING_PASS) \
    .option("driver", "org.postgresql.Driver") \
    .load()


In [None]:
# Transform relationship data

transformed_relationship = transform_relationship(relationship)


### IPO Data - Fact

In [None]:
from pyspark.sql.functions import col, to_date, broadcast, to_timestamp
from datetime import datetime

def transform_ipo(df):
    try:
        log_to_csv({
            "step": "Transform",
            "status": "STARTED",
            "source": "Staging",
            "table_name": "ipo",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 1: Replace "" to null
        df = df.na.replace("", None)

        # Step 2: Format data type
        df = df.withColumn("public_at", to_date(col("public_at")))

        # Step 3: Extract prefix and ID, normalize
        df = df.withColumn("ipo_entity_type", extract_prefix(col("object_id")))
        df = df.withColumn("ipo_object_id", extract_id(col("object_id")))
        df = df.withColumn("stock_market", extract_stock_market(col("stock_symbol")))
        df = df.withColumn("stock_symbol", extract_stock_symbol(col("stock_symbol")))

        log_to_csv({
            "step": "Format Data",
            "status": f"SUCCESS ({df.count()} rows)",
            "source": "staging",
            "table_name": "ipo",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")


        # Step 4: Mapping to target column
        df_transformed = df.select(
            col("ipo_id").alias("ipo_id"),
            col("ipo_entity_type").alias("ipo_entity_type"),
            col("ipo_object_id").alias("ipo_object_id"),
            col("valuation_currency_code").alias("valuation_currency"),
            col("valuation_amount").alias("valuation_amount"),
            col("raised_currency_code").alias("raised_currency"),
            col("raised_amount").alias("raised_amount"),
            col("public_at").alias("public_at"),
            col("stock_market").alias("stock_market"),
            col("stock_symbol").alias("stock_symbol"),
            col("source_url").alias("source_url"),
            col("source_description").alias("source_description"),
            col("created_at").alias("created_at"),
            col("updated_at").alias("updated_at")
        )
        
        log_to_csv({
            "step": "Map Data",
            "status": f"SUCCESS ({df_transformed.count()} rows)",
            "source": "staging",
            "table_name": "ipo",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 5: Handle null values
        df_transformed = df_transformed.fillna({
            "valuation_amount": 0.0,
            "valuation_currency": "USD",
            "raised_amount": 0.0,
            "raised_currency": "USD",
            "stock_market": "N/A",
            "stock_symbol": "N/A",
            "source_url": "Unknown",
            "source_description": "Unknown"
        })

        # Step 6: Drop duplicates 
        df_transformed = df_transformed.dropDuplicates(["ipo_id"])

        log_to_csv({
            "step": "Clean Data",
            "status": f"SUCCESS ({df_transformed.count()} rows after cleansing)",
            "source": "staging",
            "table_name": "ipos",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 7: Load dim_company and dim_people for validation
        companies = spark.read.format("jdbc") \
            .option("url", DWH_URL) \
            .option("dbtable", "dim_company") \
            .option("user", DWH_USER) \
            .option("password", DWH_PASS) \
            .option("driver", "org.postgresql.Driver") \
            .load().select(col("company_id"))

        people = spark.read.format("jdbc") \
            .option("url", DWH_URL) \
            .option("dbtable", "dim_people") \
            .option("user", DWH_USER) \
            .option("password", DWH_PASS) \
            .option("driver", "org.postgresql.Driver") \
            .load().select(col("people_id"))

        # Validation on funded_object_id
        df_valid = df_transformed \
            .join(broadcast(companies), df_transformed["ipo_object_id"] == companies["company_id"], "left") \
            .join(broadcast(people), df_transformed["ipo_object_id"] == people["people_id"], "left") \
            .withColumn("ipo_object_id",
                        when(col("company_id").isNotNull(), col("company_id"))
                        .otherwise(col("people_id"))) \
            .drop("company_id", "people_id")

        df_invalid = df_valid.filter(col("company_id").isNull() | col("people_id").isNull() | col("ipo_object_id").isNull())
        df_valid = df_valid.filter(col("company_id").isNotNull() & col("people_id").isNotNull() & col("ipo_object_id").isNotNull())

        if df_invalid.count() > 0:
            invalid_ids = df_invalid.limit(10).toPandas().values.tolist()
            save_invalid_ids(invalid_ids, table_name="ipo")

            log_to_csv({
                "step": "Validation",
                "status": f"{df_invalid.count()} rows with missing object_id",
                "source": "staging",
                "table_name": "ipo",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }, "etl_log.csv")

        return df_valid
        
        print("The data is successfully transformed")
    
    except Exception as e:
        log_to_csv({
            "step": "Transform",
            "status": f"FAILED - {str(e)}",
            "source": "staging",
            "table_name": "ipo",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")
        raise


In [None]:
# Read from staging

ipo = spark.read \
    .format("jdbc") \
    .option("url", DB_STAGING_URL) \
    .option("dbtable", "ipo") \
    .option("user", DB_STAGING_USER) \
    .option("password", DB_STAGING_PASS) \
    .option("driver", "org.postgresql.Driver") \
    .load()


In [None]:
# Transform IPO Data

transformed_ipo = transform_ipo(ipo)

### Funds Data

In [None]:
from pyspark.sql.functions import col, to_date, broadcast, when, lit
from pyspark.sql.types import IntegerType, StringType
from datetime import datetime

def transform_funds(df):
    try:
        log_to_csv({
            "step": "Transform",
            "status": "STARTED",
            "source": "Staging",
            "table_name": "funds",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 1: Replace "" to null
        df = df.na.replace("", None)

        # Step 2: Extract prefix and ID 
        df = df.withColumn("fund_entity_type", extract_prefix(col("object_id")))
        df = df.withColumn("fund_object_id", extract_id(col("object_id")))
        df = df.withColumn("object_id", col("object_id").cast(IntegerType()))
        
        # Step 3: Format data type
        df = df.withColumn("funding_date", to_date(col("funded_at")))

        log_to_csv({
            "step": "Format Data",
            "status": f"SUCCESS ({df.count()} rows)",
            "source": "staging",
            "table_name": "funds",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 4: Mapping to target column
        df_transformed = df.select(
            col("fund_id"),
            col("fund_entity_type"),
            col("fund_object_id").alias("fund_object_id"),
            col("name").alias("fund_name"),
            col("funding_date").alias("funding_date"),
            col("raised_currency_code").alias("raised_currency"),
            col("raised_amount"),
            col("source_url"),
            col("source_description"),
            col("created_at"),
            col("updated_at")
        )

        # Step 5: Handle null values
        df_transformed = df_transformed.fillna({
            "raised_currency": "USD",
            "raised_amount": 0.0,
            "source_url": "Unknown",
            "source_description": "Unknown"
        })

        # Step 6: Drop duplicates dan invalid data
        df_transformed = df_transformed.dropDuplicates(["fund_id"])
        df_transformed = df_transformed.na.drop(subset=["funding_date"])

        df_transformed = df_transformed.withColumn(
            "source_url", when(col("source_url").rlike(r"^(http|https)://.*"), col("source_url")).otherwise("Unknown")
        )
        df_transformed = df_transformed.withColumn(
            "source_description", clean_alpha_text("source_description")
        )

        log_to_csv({
            "step": "Clean Data",
            "status": f"SUCCESS ({df_transformed.count()} rows after cleansing)",
            "source": "staging",
            "table_name": "funds",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")

        # Step 7: Load dim_company and dim_people for validation
        companies = spark.read.format("jdbc") \
            .option("url", DWH_URL) \
            .option("dbtable", "dim_company") \
            .option("user", DWH_USER) \
            .option("password", DWH_PASS) \
            .option("driver", "org.postgresql.Driver") \
            .load().select(col("company_id"))

        people = spark.read.format("jdbc") \
            .option("url", DWH_URL) \
            .option("dbtable", "dim_people") \
            .option("user", DWH_USER) \
            .option("password", DWH_PASS) \
            .option("driver", "org.postgresql.Driver") \
            .load().select(col("people_id"))

        # Validation on funded_object_id
        df_valid = df_transformed \
            .join(broadcast(companies), df_transformed["fund_object_id"] == companies["company_id"], "left") \
            .join(broadcast(people), df_transformed["fund_object_id"] == people["people_id"], "left") \
            .withColumn("fund_object_id",
                        when(col("company_id").isNotNull(), col("company_id"))
                        .otherwise(col("people_id"))) \
            .drop("company_id", "people_id")

        df_invalid = df_valid.filter(col("company_id").isNull() | col("people_id").isNull() | col("fund_object_id").isNull())
        df_valid = df_valid.filter(col("company_id").isNotNull() & col("people_id").isNotNull() & col("fund_object_id").isNotNull())

        if df_invalid.count() > 0:
            invalid_ids = df_invalid.limit(10).toPandas().values.tolist()
            save_invalid_ids(invalid_ids, table_name="funds")

            log_to_csv({
                "step": "Validation",
                "status": f"{df_invalid.count()} rows with missing object_id",
                "source": "staging",
                "table_name": "funds",
                "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }, "etl_log.csv")

        return df_valid
        
        print("The data is successfully transformed")

    except Exception as e:
        log_to_csv({
            "step": "Transform",
            "status": f"FAILED - {str(e)}",
            "source": "staging",
            "table_name": "funds",
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }, "etl_log.csv")
        raise


In [None]:
# Read from staging

funds = spark.read \
    .format("jdbc") \
    .option("url", DB_STAGING_URL) \
    .option("dbtable", "funds") \
    .option("user", DB_STAGING_USER) \
    .option("password", DB_STAGING_PASS) \
    .option("driver", "org.postgresql.Driver") \
    .load()


In [None]:
# Transform funds Data

transform_funds(funds)

## Load to Data Warehouse


In [51]:
def load_to_dwh(df, table_name, mode="overwrite", use_upsert=False, idx_name=None, schema=None, source=None, log_path="etl_log.csv"):
    from pangres import upsert
    from sqlalchemy import create_engine
    import os

    os.makedirs("logs", exist_ok=True)

    try:
        if not DWH_USER or not DWH_PASS:
            raise EnvironmentError("DWH_USER or DWH_PASS is not set")

        if use_upsert:
            data = df.toPandas()
            if idx_name is None:
                raise ValueError("Index name is required for upsert mode")
            data = data.set_index(idx_name)

            conn = create_engine(f"postgresql://{DWH_USER}:{DWH_PASS}@host.docker.internal:5432/pyspark_task_dwh")
            upsert(
                con=conn,
                df=data,
                table_name=table_name,
                schema=schema,
                if_row_exists="update"
            )
            print(f"Data upserted to table '{table_name}' successfully!")
        else:
            df.write \
              .format("jdbc") \
              .option("url", "jdbc:postgresql://host.docker.internal:5432/pyspark_task_dwh") \
              .option("dbtable", table_name) \
              .option("user", DWH_USER) \
              .option("password", DWH_PASS) \
              .option("driver", "org.postgresql.Driver") \
              .mode(mode) \
              .save()
            print(f"Data loaded to table '{table_name}' successfully!")

        log_msg = {
            "step": "Load to DWH",
            "status": "Success",
            "source": "transformed data",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

    except Exception as e:
        print(f"Error loading data to table '{table_name}': {e}")
        failed_data = data if use_upsert else df.toPandas()
        failed_data['error_message'] = str(e)
        failed_data['etl_date'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        log_msg = {
            "step": "Load to DWH",
            "status": "Failed",
            "source": "transformed data",
            "table_name": table_name,
            "etl_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "error_message": str(e)
        }

        failed_log_path = f'logs/failed_{table_name}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
        failed_data.to_csv(failed_log_path, index=False)
        print(f"Failed data saved to: {failed_log_path}")

    finally:
        log_msg.pop("error_message", None)
        log_to_csv(log_msg, log_path)

    return df if not use_upsert else data


In [55]:
# dim_company

load_to_dwh(transformed_company, table_name="dim_company", use_upsert=True, idx_name="company_id", schema="public")

NameError: name 'transformed_company' is not defined

In [56]:
# dim_people

load_to_dwh(transformed_people, table_name="dim_people", use_upsert=True, idx_name="people_id", schema="public")

Data upserted to table 'dim_people_test' successfully!
Log written to /home/jovyan/work/logs/etl_log.csv


Unnamed: 0_level_0,object_id,entity_type,full_name,birthplace,affiliation_name
people_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
3,4,p,Raju Vegesna,Unknown,Zoho
14,16,p,Alex Welch,Unknown,C7 Group
21,23,p,Scott Penberthy,Unknown,Photobucket
22,24,p,Alice Lankester,"Surrey, England",Apple
68,78,p,Rajiv Dutta,Unknown,eBay
...,...,...,...,...,...
221830,262990,p,Barry Gavin,Unknown,Unaffiliated
221833,262993,p,Rostislav Raykov,Unknown,Unaffiliated
221887,263050,p,Sandeep Gupta,Unknown,Unaffiliated
222040,263212,p,Thomas A Kennedy,Unknown,Unaffiliated


In [None]:
# fact_acquisition

load_to_dwh(transformed_acquisition, table_name="fact_acquisition", use_upsert=True, idx_name="acquisition_id", schema="public")

In [None]:
# fact_investments

load_to_dwh(transformed_investments, table_name="fact_investments", use_upsert=True, idx_name="investment_id", schema="public")

In [None]:
# dim_funding_rounds

load_to_dwh(transformed_funding_rounds, table_name="dim_funding_rounds", use_upsert=True, idx_name="funding_round_id", schema="public")

In [None]:
# fact_relationship

load_to_dwh(transformed_relationship, table_name="fact_relationship", use_upsert=True, idx_name="relationship_id", schema="public")

In [None]:
# fact_ipo

load_to_dwh(transformed_ipo, table_name="fact_ipo", use_upsert=True, idx_name="ipo_id", schema="public")

In [None]:
# dim_funds

load_to_dwh(transformed_funds, table_name="dim_funds", use_upsert=True, idx_name="fund_id", schema="public")