In [1]:
# Welcome to your new notebook
# Type here in the cell editor to add code!
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (
    col, lit, when, current_timestamp, current_date, row_number, max as spark_max,
    monotonically_increasing_id, concat, lpad, coalesce, hash, abs as spark_abs,
    to_date, year, month, dayofmonth, dayofweek, weekofyear, quarter,
    date_format, datediff, last_day, next_day, add_months, expr
)
from pyspark.sql.types import StringType, IntegerType, DateType, TimestampType, BooleanType
from pyspark.sql.window import Window
from delta.tables import DeltaTable
import hashlib

# Gold layer configuration
GOLD_BASE_PATH = "abfss://LinkedIN@onelake.dfs.fabric.microsoft.com/LinkedIN.Lakehouse/Tables/gold/"
SILVER_BASE_PATH = "abfss://LinkedIN@onelake.dfs.fabric.microsoft.com/LinkedIN.Lakehouse/Tables/silver/"

class GoldLayerProcessor:
    def __init__(self, spark: SparkSession):
        self.spark = spark
        
    def generate_surrogate_key(self, df: DataFrame, key_prefix: str) -> DataFrame:
        """Generate surrogate keys using row number"""
        return df.withColumn(
            f"{key_prefix}_sk",
            row_number().over(Window.orderBy(monotonically_increasing_id()))
        )
    
    def add_scd2_columns(self, df: DataFrame) -> DataFrame:
        """Add SCD2 columns to dimension tables"""
        return df.withColumn("effective_start_date", current_date()) \
                 .withColumn("effective_end_date", lit(None).cast(DateType())) \
                 .withColumn("is_current", lit(True).cast(BooleanType())) 
    
    def create_dim_date(self, start_date: str = "2010-01-01", end_date: str = "2026-12-31") -> DataFrame:
        """Create dimension date table - No SCD2 processing"""
        
        # Generate date range
        date_df = self.spark.sql(f"""
            SELECT explode(sequence(to_date('{start_date}'), to_date('{end_date}'), interval 1 day)) as date_value
        """)
        
        # Add date attributes
        dim_date = date_df.withColumn("date_sk", concat(lit("DATE"), lpad(row_number().over(Window.orderBy("date_value")).cast(StringType()), 10, "0"))) \
                         .withColumn("year", year("date_value")) \
                         .withColumn("month", month("date_value")) \
                         .withColumn("day", dayofmonth("date_value")) \
                         .withColumn("quarter", quarter("date_value")) \
                         .withColumn("week_of_year", weekofyear("date_value")) \
                         .withColumn("day_of_week", dayofweek("date_value")) \
                         .withColumn("month_name", date_format("date_value", "MMMM")) \
                         .withColumn("day_name", date_format("date_value", "EEEE")) \
                         .withColumn("is_weekend", when(col("day_of_week").isin([1, 7]), True).otherwise(False)) \
                         .withColumn("is_month_end", col("date_value") == last_day("date_value")) \
                         .withColumn("is_quarter_end", col("month").isin([3, 6, 9, 12]) & col("is_month_end")) \
                         .withColumn("is_year_end", (col("month") == 12) & col("is_month_end")) \
                         .withColumn("fiscal_year", when(col("month") >= 4, col("year") + 1).otherwise(col("year"))) \
                         .withColumn("fiscal_quarter", when(col("month").isin([4, 5, 6]), 1)
                                                    .when(col("month").isin([7, 8, 9]), 2)
                                                    .when(col("month").isin([10, 11, 12]), 3)
                                                    .otherwise(4)) \
                         .withColumn("created_timestamp", current_timestamp()) \
                         .withColumn("updated_timestamp", current_timestamp())\
                         .withColumn("date_sk",row_number().over(Window.orderBy(monotonically_increasing_id())))
        
        return dim_date
    
    def create_dim_user(self) -> DataFrame:
        """Create dimension user table by combining linkedin_users and user_identity"""
        
        # Load source tables
        linkedin_users = self.spark.read.format("delta").load(f"{SILVER_BASE_PATH}linkedin_users")
        user_identity = self.spark.read.format("delta").load(f"{SILVER_BASE_PATH}user_identity")
        
        # Join the tables
        dim_user = linkedin_users.alias("lu").join(
            user_identity.alias("ui"),
            col("lu.UserID") == col("ui.UserId"),
            "left"
        ).select(
            col("lu.UserID").alias("user_id"),
            col("lu.FullName").alias("full_name"),
            col("lu.Gender").alias("gender"),
            col("lu.Age").alias("age"),
            col("lu.Age_Band").alias("age_band"),
            col("lu.Country").alias("country"),
            col("lu.State_Code").alias("state_code"),
            col("lu.State_Name").alias("state_name"),
            col("lu.City").alias("city"),
            col("lu.Industry").alias("industry"),
            col("lu.JobTitle").alias("job_title"),
            col("lu.Skills").alias("skills"),
            col("lu.Skills_Count").alias("skills_count"),
            col("lu.Education_Level_Standardized").alias("education_level"),
            col("lu.Connections").alias("connections"),
            col("lu.Followers").alias("followers"),
            col("lu.Connection_Tier").alias("connection_tier"),
            col("lu.Follower_Tier").alias("follower_tier"),
            col("lu.Follower_Connection_Ratio").alias("follower_connection_ratio"),
            col("lu.Specialization").alias("specialization"),
            col("lu.Join_Date").alias("join_date"),
            col("ui.email").alias("email"),
            col("ui.phno").alias("phone_number")
        )
        
        # Generate surrogate key and add SCD2 columns
        dim_user = self.generate_surrogate_key(dim_user, "user")
        return self.add_scd2_columns(dim_user)
    
    def create_dim_post(self) -> DataFrame:
        """Create dimension post table by combining posts and post_performance"""
        
        # Load source tables
        posts = self.spark.read.format("delta").load(f"{SILVER_BASE_PATH}post")
        post_performance = self.spark.read.format("delta").load(f"{SILVER_BASE_PATH}post_performance")
        
        # Join the tables
        dim_post = posts.alias("p").join(
            post_performance.alias("pp"),
            col("p.PostID") == col("pp.PostID"),
            "left"
        ).select(
            col("p.PostID").alias("post_id"),
            col("p.UserID").alias("user_id"),
            col("p.Post_Type_Standardized").alias("post_type"),
            col("p.DatePosted").alias("date_posted"),
            col("p.PostLength").alias("post_length"),
            col("p.Post_Length_Category").alias("post_length_category"),
            col("p.Attachment_Type_Standardized").alias("attachment_type"),
            col("p.Is_Viral").alias("is_viral"),
            col("p.High_Engagement").alias("high_engagement"),
            col("p.EngagementScore").alias("engagement_score"),
            col("pp.PeakEngagementHour").alias("peak_engagement_hour"),
            col("pp.Performance_Tier").alias("performance_tier"),
            col("pp.Is_Viral_Content").alias("is_viral_content"),
            col("pp.Impressions").alias("impressions"),
            col("pp.Reactions").alias("reactions"),
            col("pp.Comments").alias("comments"),
            col("pp.Shares").alias("shares"),
            col("pp.Reach").alias("reach"),
            col("pp.Clicks").alias("clicks")
            
        )
        
        # Generate surrogate key and add SCD2 columns
        dim_post = self.generate_surrogate_key(dim_post, "post")
        return self.add_scd2_columns(dim_post)
    
    def create_dim_connection_growth(self) -> DataFrame:
        """Create dimension connection growth table"""
        
        connection_growth = self.spark.read.format("delta").load(f"{SILVER_BASE_PATH}connection_growth")
        
        dim_connection_growth = connection_growth.select(
            col("RecordID").alias("conn_id"),
            col("UserID").alias("user_id"),
            col("Date").alias("record_date"),
            col("ConnectionsCount").alias("connections_count"),
            col("NewConnections").alias("new_connections"),
            col("ConnectionGrowthRate").alias("connectiongrowth_rate"),
            col("InvitesSent").alias("invites_sent"),
            col("InvitesAccepted").alias("invites_accepted"),
            col("ProfileViews").alias("profile_views"),
            col("Growth_Category").alias("growth_category"),
            col("Activity_Level").alias("activity_level"),
            col("Network_Expanding").alias("network_expanding"),
        )
        
        # Generate surrogate key and add SCD2 columns
        dim_connection_growth = self.generate_surrogate_key(dim_connection_growth, "conn")
        return self.add_scd2_columns(dim_connection_growth)
    
    def create_dim_job_application(self) -> DataFrame:
        """Create dimension job application table"""
        
        job_applications = self.spark.read.format("delta").load(f"{SILVER_BASE_PATH}job_applications")
        
        dim_job_application = job_applications.select(
            col("ApplicationID").alias("application_id"),
            col("UserID").alias("user_id"),
            col("Company_Name").alias("company_name"),
            col("JobTitle").alias("job_title"),
            col("Industry").alias("industry"),
            col("ApplicationDate").alias("application_date"),
            col("Status_Standardized").alias("status"),
            col("ExperienceRequired").alias("experience_required"),
            col("Experience_Category").alias("experience_category"),
            col("SkillsRequired").alias("skills_required"),
            col("ReferralUsed").alias("referral_used"),
            col("Is_Successful").alias("is_successful"),
            col("Is_In_Progress").alias("is_in_progress"),
            col("Days_Since_Application").alias("days_since_application"),
            col("Country").alias("country"),
            col("State_Code").alias("state_code"),
            col("State_Name").alias("state_name"),
            col("City").alias("city")
        )
        
        # Generate surrogate key and add SCD2 columns
        dim_job_application = self.generate_surrogate_key(dim_job_application, "job")
        return self.add_scd2_columns(dim_job_application)
    
    def create_dim_user_activity(self) -> DataFrame:
        """Create dimension user activity table"""
        
        user_activity = self.spark.read.format("delta").load(f"{SILVER_BASE_PATH}user_activity")
        
        dim_user_activity = user_activity.select(
            col("RecordID").alias("record_id"),
            col("UserID").alias("user_id"),
            col("Date").alias("activity_date"),
            col("Logins").alias("logins"),
            col("MessagesSent").alias("messages_sent"),
            col("MessagesReceived").alias("messages_received"),
            col("JobsViewed").alias("jobs_viewed"),
            col("ProfilesViewed").alias("profiles_viewed"),
            col("PostsCreated").alias("posts_created"),
            col("Total_Activity_Score").alias("total_activity_score"),
            col("Activity_Level").alias("activity_level"),
            col("Is_Job_Seeker").alias("is_job_seeker"),
            col("Is_Social_Networker").alias("is_social_networker"),
        )
        
        # Generate surrogate key and add SCD2 columns
        dim_user_activity = self.generate_surrogate_key(dim_user_activity, "act")
        return self.add_scd2_columns(dim_user_activity)
    
    def create_dim_company_affiliation(self) -> DataFrame:
        """Create dimension company affiliation table"""
        
        company_affiliation = self.spark.read.format("delta").load(f"{SILVER_BASE_PATH}company_affiliation")
        
        dim_company_affiliation = company_affiliation.select(
            col("AffiliationID").alias("affiliation_id"),
            col("UserID").alias("user_id"),
            col("CompanyName").alias("company_name"),
            col("Industry").alias("industry"),
            col("JobRole").alias("job_role"),
            col("Department").alias("department"),
            col("StartDate").alias("start_date"),
            col("EndDate").alias("end_date"),
            col("Employment_Type_Standardized").alias("employment_type"),
            col("SalaryRange").alias("salary_range"),
            col("Salary_Category").alias("salary_category"),
            col("Is_Current_Employment").alias("is_current_employment"),
            col("Employment_Duration_Days").alias("employment_duration_days"),
            col("Employment_Duration_Years").alias("employment_duration_years"),
            col("Tenure_Category").alias("tenure_category"),
            col("Country").alias("country"),
            col("State_Code").alias("state_code"),
            col("State_Name").alias("state_name"),
            col("City").alias("city")
        )
        
        # Generate surrogate key and add SCD2 columns
        dim_company_affiliation = self.generate_surrogate_key(dim_company_affiliation, "comp")
        return self.add_scd2_columns(dim_company_affiliation)
    
    def fact_linkedin(self) -> DataFrame:
        """Create unified fact table with dimension surrogate keys and date_sk fields"""

        # Load dim_date
        dim_date = self.spark.read.format("delta").load(f"{GOLD_BASE_PATH}dim_date")

        # Load active records from all dimension tables (is_current = true)
        dim_user_activity = self.spark.read.format("delta").load(f"{GOLD_BASE_PATH}dim_user_activity").filter("is_current = true")
        dim_post = self.spark.read.format("delta").load(f"{GOLD_BASE_PATH}dim_post").filter("is_current = true")
        dim_job_application = self.spark.read.format("delta").load(f"{GOLD_BASE_PATH}dim_job_application").filter("is_current = true")
        dim_company_affiliation = self.spark.read.format("delta").load(f"{GOLD_BASE_PATH}dim_company_affiliation").filter("is_current = true")
        dim_connection_growth = self.spark.read.format("delta").load(f"{GOLD_BASE_PATH}dim_connection_growth").filter("is_current = true")
        dim_user = self.spark.read.format("delta").load(f"{GOLD_BASE_PATH}dim_user").filter("is_current = true")

        # Helper function to add date_sk from dim_date
        def attach_date_sk(df: DataFrame, date_col: str, sk_col: str) -> DataFrame:
            return df.alias("df").join(
                dim_date.alias("dd"),
                col(f"df.{date_col}") == col("dd.date_value"),
                "left"
            ).drop(date_col).withColumnRenamed("date_sk", sk_col)

        # Attach date_sk columns
        dim_user_activity = attach_date_sk(dim_user_activity, "activity_date", "activity_date_sk")
        dim_post = attach_date_sk(dim_post, "date_posted", "date_posted_sk")
        dim_job_application = attach_date_sk(dim_job_application, "application_date", "application_date_sk")
        dim_company_affiliation = attach_date_sk(dim_company_affiliation, "start_date", "start_date_sk")
        dim_company_affiliation = attach_date_sk(dim_company_affiliation, "end_date", "end_date_sk")
        dim_connection_growth = attach_date_sk(dim_connection_growth, "record_date", "record_date_sk")
        dim_user = attach_date_sk(dim_user,"join_date","join_date_sk")

        # Build the fact table
        fact_df = dim_user_activity.select(
            col("user_id"),
            col("act_sk"),
            col("activity_date_sk"),
            "record_id", "logins", "messages_sent", "messages_received",
            "jobs_viewed", "profiles_viewed", "posts_created", "total_activity_score"
        ).join(
            dim_post.select(
                "user_id",
                col("post_sk"),
                "post_id", "engagement_score", "date_posted_sk",
                "impressions", "reactions", "comments", "shares", "clicks", "reach"
            ),
            on="user_id", how="left"
        ).join(
            dim_job_application.select(
                "user_id",
                col("job_sk"),
                "application_id", "application_date_sk","experience_required","is_successful"
            ),
            on="user_id", how="left"
        ).join(
            dim_company_affiliation.select(
                "user_id",
                col("comp_sk"),
                "start_date_sk", "end_date_sk",
                "employment_type", "tenure_category", "industry"
            ),
            on="user_id", how="left"
        ).join(
            dim_connection_growth.select(
                "user_id",
                col("conn_sk"),
                "conn_id", "new_connections", "connectiongrowth_rate",
                "invites_sent", "invites_accepted", "record_date_sk",
                "connections_count", "profile_views"
            ),
            on="user_id", how="left"
        ).join(
            dim_user.select(
                "user_id", "connections","followers","join_date_sk"
            ),
            on="user_id", how="left"
        )

        return fact_df
    
    
    def implement_scd2_merge(self, dimension_name: str, new_df: DataFrame, business_keys: list):
        """Implement SCD2 merge for dimension tables"""
        
        target_path = f"{GOLD_BASE_PATH}dim_{dimension_name}"
        
       # Create hash for change detection
        hash_cols = [col for col in new_df.columns if col not in ['effective_start_date', 'effective_end_date', 'is_current'] + business_keys]

        new_df = new_df.withColumn(
            "record_hash",
            spark_abs(
                hash(
                    concat(
                        *[coalesce(col(c).cast(StringType()), lit("")) for c in hash_cols]
                    )
                )
            )
        )

        
        if DeltaTable.isDeltaTable(self.spark, target_path):
            target_table = DeltaTable.forPath(self.spark, target_path)
            
            # Get existing records
            existing_df = target_table.toDF().filter(col("is_current") == True)
            
            # Find changed records
            join_condition = " AND ".join([f"existing.{key} = new.{key}" for key in business_keys])
            
            changed_records = new_df.alias("new").join(
                existing_df.alias("existing"),
                expr(join_condition),
                "inner"
            ).filter(col("existing.record_hash") != col("new.record_hash"))
            
            # Close existing records
            if not changed_records.rdd.isEmpty():
                for row in changed_records.collect():
                    business_key_conditions = " AND ".join([f"{key} = '{row[f'new.{key}']}'" for key in business_keys])
                    
                    target_table.update(
                        condition = f"is_current = true AND {business_key_conditions}",
                        set = {
                            "end_date": "current_date()",
                            "is_current": "false",
                        }
                    )
            
            # Insert new and changed records
            target_table.alias("target").merge(
                new_df.alias("source"),
                join_condition
            ).whenNotMatchedInsertAll().execute()
            
        else:
            # First load - write directly
            new_df.write.format("delta").mode("overwrite").save(target_path)
    
    def process_gold_layer(self):
        """Main function to process all gold layer tables"""
        
        print("🚀 Starting Gold Layer Processing...")
        
        # Create dimension tables
        print("📊 Creating Dimension Tables...")
        
        # # 1. Dim Date - No SCD2 processing
        # print("  📅 Creating dim_date...")
        # dim_date = self.create_dim_date()
        # dim_date.write.format("delta").mode("overwrite").save(f"{GOLD_BASE_PATH}dim_date")
        
        # # 2. Dim User
        # print("  👤 Creating dim_user...")
        # dim_user = self.create_dim_user()
        # self.implement_scd2_merge("user", dim_user, ["user_id"])
        
        # # 3. Dim Post
        # print("  📝 Creating dim_post...")
        # dim_post = self.create_dim_post()
        # self.implement_scd2_merge("post", dim_post, ["post_id"])
        
        # # 4. Dim Connection Growth
        # print("  🔗 Creating dim_connection_growth...")
        # dim_connection_growth = self.create_dim_connection_growth()
        # self.implement_scd2_merge("connection_growth", dim_connection_growth, ["record_id"])
        
        # # 5. Dim Job Application
        # print("  💼 Creating dim_job_application...")
        # dim_job_application = self.create_dim_job_application()
        # self.implement_scd2_merge("job_application", dim_job_application, ["application_id"])
        
        # # 6. Dim User Activity
        # print("  📈 Creating dim_user_activity...")
        # dim_user_activity = self.create_dim_user_activity()
        # self.implement_scd2_merge("user_activity", dim_user_activity, ["record_id"])
        
        # # 7. Dim Company Affiliation
        # print("  🏢 Creating dim_company_affiliation...")
        # dim_company_affiliation = self.create_dim_company_affiliation()
        # self.implement_scd2_merge("company_affiliation", dim_company_affiliation, ["affiliation_id"])
        
        # # # Create fact table
        print("📊 Creating Fact Table...")
        fact_linkedin = self.fact_linkedin()
        
        # Write fact table (overwrite mode for facts)
        fact_linkedin.write.format("delta").mode("overwrite").save(f"{GOLD_BASE_PATH}fact_linkedin_analytics")
        
        print("✅ Gold Layer Processing Completed Successfully!")
        
        # Print summary
        print("\n📋 Summary:")
        print("  📊 Dimension Tables Created: 7")
        print("    - dim_date")
        print("    - dim_user")
        print("    - dim_post")
        print("    - dim_connection_growth")
        print("    - dim_job_application")
        print("    - dim_user_activity")
        print("    - dim_company_affiliation")
        print("  📈 Fact Tables Created: 1")
        print("    - fact_linkedin_analytics")

spark = SparkSession.builder \
        .appName("LinkedIn Gold Layer Processing") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()
    
    # Initialize Gold Layer Processor
gold_processor = GoldLayerProcessor(spark)
    
    # Process Gold Layer
gold_processor.process_gold_layer()
    
spark.stop()

StatementMeta(, f9a77fb2-81f2-43ea-b62c-1e386a34f599, 3, Finished, Available, Finished)

🚀 Starting Gold Layer Processing...
📊 Creating Dimension Tables...
📊 Creating Fact Table...
✅ Gold Layer Processing Completed Successfully!

📋 Summary:
  📊 Dimension Tables Created: 7
    - dim_date
    - dim_user
    - dim_post
    - dim_connection_growth
    - dim_job_application
    - dim_user_activity
    - dim_company_affiliation
  📈 Fact Tables Created: 1
    - fact_linkedin_analytics


## **DAX**

In [2]:
'''CTR - click through Rate
Engagement Rate
Performance tier --> based on engagement rate
InviteAcceptance Rate
message_response_rate
'''

StatementMeta(, f9a77fb2-81f2-43ea-b62c-1e386a34f599, 4, Finished, Available, Finished)

'CTR - click through Rate\nEngagement Rate\nPerformance tier --> based on engagement rate\nInviteAcceptance Rate\nmessage_response_rate\n'

In [None]:
import base64
import pandas as pd
from IPython.display import HTML
from synapse.ml import *
from synapse.ml.fabric import *

# Step 1: Get all tables in the Gold schema dynamically
gold_tables = [f"gold.{t.name}" for t in spark.catalog.listTables("gold")]

# Step 2: Create Excel file from all Gold tables
local_excel_path = "/tmp/gold_data.xlsx"

with pd.ExcelWriter(local_excel_path, engine='openpyxl') as writer:
    for table in gold_tables:
        try:
            df = spark.read.table(table)
            pdf = df.limit(1000).toPandas()
            sheet_name = table.replace("gold.", "")[:31]  # Excel sheet names max 31 chars
            pdf.to_excel(writer, sheet_name=sheet_name, index=False)
        except Exception as e:
            print(f"⚠️ Skipping table {table} due to error: {e}")

# Step 3: Encode the Excel file to Base64 for download
with open(local_excel_path, "rb") as f:
    data = f.read()
    b64 = base64.b64encode(data).decode()

# Step 4: Create download link
file_name = "gold_data.xlsx"
href = f'<a download="{file_name}" href="data:application/vnd.openxmlformats-officedocument.spreadsheetml.sheet;base64,{b64}">📥 Click here to download gold_data.xlsx</a>'

display(HTML(href))


StatementMeta(, c575211c-86bc-436d-bee2-18698c6cc10d, 4, Finished, Available, Finished)