In [None]:
# Databricks notebook source
# MAGIC %md
# MAGIC # ETL Process for Football Data
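# MAGIC This notebook runs an ETL pipeline on football data with PySpark: it reads source tables from Unity Catalog, cleans, transforms, and joins them, and writes the result back to Unity Catalog. It also defines a helper for fetching data from an external PostgreSQL database.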

# COMMAND ----------

import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import psycopg2
from psycopg2 import sql

# Module logger, plus root logging configured at INFO so the pipeline's
# progress messages are emitted.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# COMMAND ----------

# Function to load data from Unity Catalog
def load_data_from_unity_catalog(table_name):
    try:
        logger.info(f"Loading data from Unity Catalog table: {table_name}")
        df = spark.table(table_name)
        return df
    except Exception as e:
        logger.error(f"Error loading data from Unity Catalog table {table_name}: {e}")
        raise

# COMMAND ----------

# Function to connect to an external PostgreSQL database and fetch data
def fetch_data_from_postgresql(query, db_name, user, password, host, port, params=None):
    """Run *query* against a PostgreSQL database and return all result rows.

    A new connection is opened for each call. Any connection that was opened
    is closed before the function returns or raises.

    Args:
        query: SQL to execute — a ``str`` or a ``psycopg2.sql.Composable``.
        db_name, user, password, host, port: connection parameters passed
            straight to ``psycopg2.connect``.
        params: optional sequence or mapping of query parameters. Put values
            here (with ``%s`` / ``%(name)s`` placeholders in *query*) instead
            of formatting them into the SQL string, so psycopg2 quotes them
            safely. ``None`` (the default) sends *query* as-is.

    Returns:
        list of tuples: every row produced by the query (``cursor.fetchall()``).

    Raises:
        Exception: any error from connecting, executing, or fetching is
            logged and re-raised unchanged.
    """
    conn = None
    try:
        logger.info(f"Connecting to PostgreSQL database: {db_name}")
        conn = psycopg2.connect(dbname=db_name, user=user, password=password, host=host, port=port)
        # The cursor's context manager closes it even if execute/fetchall raises.
        with conn.cursor() as cur:
            cur.execute(query, params)
            return cur.fetchall()
    except Exception as e:
        logger.error(f"Error fetching data from PostgreSQL: {e}")
        raise
    finally:
        # Close on every path so a failed query does not leak the connection.
        # (`with conn:` would not be enough: in psycopg2 it only ends the
        # transaction and leaves the connection open.)
        if conn is not None:
            conn.close()

# COMMAND ----------

# Read the raw football records from Unity Catalog.
source_df = load_data_from_unity_catalog(
    "catalog.source_db.football_data"
)

# COMMAND ----------

# Data cleaning: drop every row that contains a null in any column, then
# drop exact duplicate rows.
try:
    logger.info("Cleaning data: handling missing values and removing duplicates")
    cleaned_df = (
        source_df
        .dropna()
        .dropDuplicates()
    )
except Exception as exc:
    logger.error(f"Error during data cleaning: {exc}")
    raise

# COMMAND ----------

# Data transformation: convert the `goals` column to an integer type.
# NOTE(review): values that cannot be cast may become null here, after the
# dropna() in the cleaning cell — confirm `goals` is always numeric.
try:
    logger.info("Applying data transformations")
    transformed_df = cleaned_df.withColumn(
        "goals",
        col("goals").astype("integer"),
    )
except Exception as exc:
    logger.error(f"Error during data transformation: {exc}")
    raise

# COMMAND ----------

# Data Integration: inner-join the transformed data with a second football
# dataset (catalog.source_db.other_football_data) on team_id.
try:
    logger.info("Joining datasets for data integration")
    other_df = load_data_from_unity_catalog("catalog.source_db.other_football_data")
    # Join on the column *name*: with on="team_id" Spark keeps a single
    # team_id column. The equality-expression form keeps both copies, which
    # makes team_id ambiguous and makes the Delta saveAsTable in the output
    # cell fail with a duplicate-column error.
    # NOTE(review): any other column names shared by both tables would still
    # be duplicated — confirm the two schemas overlap only on team_id.
    final_df = transformed_df.join(other_df, on="team_id", how="inner")
except Exception as e:
    logger.error(f"Error during data integration: {e}")
    raise

# COMMAND ----------

# Custom calculation: add a per-row weighted score,
# 0.5 per goal plus 0.3 per assist.
try:
    logger.info("Calculating custom performance score")
    final_df = final_df.withColumn(
        "performance_score",
        (col("goals") * 0.5) + (col("assists") * 0.3),
    )
except Exception as exc:
    logger.error(f"Error during custom calculations: {exc}")
    raise

# COMMAND ----------

# Output: write final_df to Unity Catalog as a Delta table, replacing the
# current contents of catalog.target_db.football_analysis.
try:
    logger.info("Saving transformed data to Unity Catalog")
    final_df.write.saveAsTable(
        "catalog.target_db.football_analysis",
        format="delta",
        mode="overwrite",
    )
except Exception as exc:
    logger.error(f"Error saving data to Unity Catalog: {exc}")
    raise

# COMMAND ----------

# When the notebook runs top to bottom, this line is reached only if no
# earlier cell raised.
logger.info("ETL process completed successfully")
