In [26]:
# ---------------------------------------
# PART 1: Setup & Test Reading a CSV File
# ---------------------------------------

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

spark = SparkSession.builder.getOrCreate()

print("Spark session started")


StatementMeta(, 50a483ea-f96a-48fe-8ae3-05ae328d0975, 28, Finished, Available, Finished)

Spark session started


In [27]:
# ======================================================
# PART 2 — Student folders + Safe CSV loader + Record template
# ======================================================

from pyspark.sql.utils import AnalysisException

# ------------------------------------------------------
# 1. List of student folders (based on your Lakehouse)
# ------------------------------------------------------
students = [
    "Ahmed",
    "Aya",
    "Manar",
    "Mennatullah",
    "Mohamed Elbasuoney"
]

print("Students detected:", students)

# ------------------------------------------------------
# 2. Safe CSV Loader Function
# ------------------------------------------------------
def load_csv_safe(path):
    """
    Safely loads a CSV file.
    Returns None if file does not exist or fails to load.
    """
    try:
        df = spark.read.csv(path, header=True, inferSchema=True)
        if df.count() == 0:
            return None
        return df
    except Exception:
        return None

# ------------------------------------------------------
# 3. Initialize student record dictionary
# (Matches exactly the final table schema)
# ------------------------------------------------------
student_records = []

for student in students:

    record = {
        "StudentFolder": student,
        "FullName": None,
        "Email": None,
        "Headline": None,
        "Summary": None,
        "Skills": None,
        "Certifications": None,
        "Education": None,
        "ConnectionsCount": None,
        "InvitationsCount": None,
        "ProfilePhotoUrl": None,
        "BackgroundPhotoUrl": None,
        "HeaderPhotoUrl": None
    }

    student_records.append(record)

student_records


StatementMeta(, 50a483ea-f96a-48fe-8ae3-05ae328d0975, 29, Finished, Available, Finished)

Students detected: ['Ahmed', 'Aya', 'Manar', 'Mennatullah', 'Mohamed Elbasuoney']


[{'StudentFolder': 'Ahmed',
  'FullName': None,
  'Email': None,
  'Headline': None,
  'Summary': None,
  'Skills': None,
  'Certifications': None,
  'Education': None,
  'ConnectionsCount': None,
  'InvitationsCount': None,
  'ProfilePhotoUrl': None,
  'BackgroundPhotoUrl': None,
  'HeaderPhotoUrl': None},
 {'StudentFolder': 'Aya',
  'FullName': None,
  'Email': None,
  'Headline': None,
  'Summary': None,
  'Skills': None,
  'Certifications': None,
  'Education': None,
  'ConnectionsCount': None,
  'InvitationsCount': None,
  'ProfilePhotoUrl': None,
  'BackgroundPhotoUrl': None,
  'HeaderPhotoUrl': None},
 {'StudentFolder': 'Manar',
  'FullName': None,
  'Email': None,
  'Headline': None,
  'Summary': None,
  'Skills': None,
  'Certifications': None,
  'Education': None,
  'ConnectionsCount': None,
  'InvitationsCount': None,
  'ProfilePhotoUrl': None,
  'BackgroundPhotoUrl': None,
  'HeaderPhotoUrl': None},
 {'StudentFolder': 'Mennatullah',
  'FullName': None,
  'Email': None,
  'H

In [28]:
# ======================================================
# PART 3 — Extract Profile.csv fields (FullName, Headline, Summary)
# ======================================================

for record in student_records:

    student = record["StudentFolder"]
    path_profile = f"Files/profiles/{student}/Profile.csv"

    print(f"\nProcessing Profile.csv for: {student}")

    df_profile = load_csv_safe(path_profile)

    if df_profile is None:
        print("  -> Profile.csv NOT FOUND")
        continue

    # Extract column names
    cols = df_profile.columns
    row = df_profile.first()

    # ------------------------------------------------------
    # FULL NAME
    # ------------------------------------------------------
    if "First Name" in cols and "Last Name" in cols:
        record["FullName"] = f"{row['First Name']} {row['Last Name']}"
    elif "Name" in cols:
        record["FullName"] = row["Name"]

    # ------------------------------------------------------
    # HEADLINE
    # ------------------------------------------------------
    if "Headline" in cols:
        record["Headline"] = row["Headline"]

    # ------------------------------------------------------
    # SUMMARY
    # ------------------------------------------------------
    if "Summary" in cols:
        record["Summary"] = row["Summary"]

    print("  -> FullName:", record["FullName"])
    print("  -> Headline:", record["Headline"])
    print("  -> Summary OK")


StatementMeta(, 50a483ea-f96a-48fe-8ae3-05ae328d0975, 30, Finished, Available, Finished)


Processing Profile.csv for: Ahmed
  -> FullName: Ahmed Salah
  -> Headline: Microsoft Certified: Power BI Data Analyst Associate (PL-300) | Power BI Developer | Junior Data Analyst | SQL | Excel | Python
  -> Summary OK

Processing Profile.csv for: Aya
  -> FullName: Aya Fathy 
  -> Headline: Power BI Developer Trainee @ITI l Junior Data Analyst l Edu Mentor @iSchool l ExTeaching Assistant @ Pharos University
  -> Summary OK

Processing Profile.csv for: Manar
  -> FullName: Manar Elmslmany
  -> Headline: Power BI Developer Trainee @ ITI | Junior Data Analyst
  -> Summary OK

Processing Profile.csv for: Mennatullah
  -> FullName: Menna Hamed
  -> Headline: Computer and information technology graduated| Power BI Developer Trainee at ITI| Data Analysis & Freelancing Graduate – NTI | Digital Egypt Pioneers Trainee in power bi developer track
  -> Summary OK

Processing Profile.csv for: Mohamed Elbasuoney
  -> FullName: Mohamed Elbasuoney
  -> Headline: Statistical Analyst @ GOTHI | Youth 

In [29]:
# ======================================================
# PART 4 — Extract Skills, Certifications, Education
# ======================================================

for record in student_records:

    student = record["StudentFolder"]
    base = f"Files/profiles/{student}"

    print(f"\nExtracting data for: {student}")

    # ======================================================
    # 1. SKILLS.csv
    # ======================================================
    df_skills = load_csv_safe(f"{base}/Skills.csv")

    if df_skills is not None:
        skill_cols = df_skills.columns
        
        # Find skill-like column
        skill_col = None
        for c in skill_cols:
            if "skill" in c.lower() or "name" in c.lower():
                skill_col = c
                break

        if skill_col:
            skills_list = (
                df_skills.select(skill_col)
                         .dropna()
                         .rdd.flatMap(lambda x: x)
                         .collect()
            )
            if skills_list:
                record["Skills"] = ", ".join([str(s) for s in skills_list])

        print("  -> Skills extracted")

    # ======================================================
    # 2. CERTIFICATIONS.csv
    # ======================================================
    df_cert = load_csv_safe(f"{base}/Certifications.csv")

    if df_cert is not None:
        cert_cols = df_cert.columns
        
        # Find certification-like column
        cert_col = None
        for c in cert_cols:
            if "cert" in c.lower() or "name" in c.lower():
                cert_col = c
                break

        if cert_col:
            cert_list = (
                df_cert.select(cert_col)
                       .dropna()
                       .rdd.flatMap(lambda x: x)
                       .collect()
            )
            if cert_list:
                record["Certifications"] = " | ".join([str(c) for c in cert_list])

        print("  -> Certifications extracted")

    # ======================================================
    # 3. EDUCATION.csv
    # ======================================================
    df_edu = load_csv_safe(f"{base}/Education.csv")

    if df_edu is not None:
        edu_cols = df_edu.columns
        
        # Columns vary in LinkedIn — detect dynamically
        school_col = next((c for c in edu_cols if "school" in c.lower()), None)
        degree_col = next((c for c in edu_cols if "degree" in c.lower()), None)
        field_col  = next((c for c in edu_cols if "field" in c.lower() or "study" in c.lower()), None)

        edu_entries = []

        for row in df_edu.collect():
            parts = []

            if school_col and row[school_col] not in (None, "nan"):
                parts.append(str(row[school_col]))

            if degree_col and row[degree_col] not in (None, "nan"):
                parts.append(str(row[degree_col]))

            if field_col and row[field_col] not in (None, "nan"):
                parts.append(str(row[field_col]))

            if parts:
                edu_entries.append(" - ".join(parts))

        if edu_entries:
            record["Education"] = " | ".join(edu_entries)

        print("  -> Education extracted")


StatementMeta(, 50a483ea-f96a-48fe-8ae3-05ae328d0975, 31, Finished, Available, Finished)


Extracting data for: Ahmed
  -> Skills extracted
  -> Certifications extracted
  -> Education extracted

Extracting data for: Aya
  -> Skills extracted
  -> Certifications extracted
  -> Education extracted

Extracting data for: Manar
  -> Skills extracted
  -> Education extracted

Extracting data for: Mennatullah
  -> Skills extracted
  -> Education extracted

Extracting data for: Mohamed Elbasuoney
  -> Skills extracted
  -> Certifications extracted
  -> Education extracted


In [30]:
# ======================================================
# PART 5 — Email + Rich Media + Counts + Save Final Table
# ======================================================

for record in student_records:

    student = record["StudentFolder"]
    base = f"Files/profiles/{student}"

    print(f"\nExtracting email and media for: {student}")

    # ======================================================
    # 1. EMAIL (Fixed Logic)
    # ======================================================
    df_email = load_csv_safe(f"{base}/Email Addresses.csv")

    if df_email is not None and "Email Address" in df_email.columns:

        # Normalize column names
        df_email = df_email.withColumnRenamed("Email Address", "EmailAddress") \
                           .withColumnRenamed("Primary", "PrimaryFlag")

        # Prefer Primary = "Yes"
        primary_df = df_email.filter(F.col("PrimaryFlag") == "Yes")

        if primary_df.count() > 0:
            record["Email"] = primary_df.first()["EmailAddress"]
        else:
            # fallback: first email
            record["Email"] = df_email.first()["EmailAddress"]

    # ======================================================
    # 2. Connections Count
    # ======================================================
    df_conn = load_csv_safe(f"{base}/Connections.csv")
    if df_conn is not None:
        record["ConnectionsCount"] = df_conn.count()

    # ======================================================
    # 3. Invitations Count
    # ======================================================
    df_inv = load_csv_safe(f"{base}/Invitations.csv")
    if df_inv is not None:
        record["InvitationsCount"] = df_inv.count()

    # ======================================================
    # 4. Rich Media (Profile Photo + Background + Header)
    # ======================================================
    df_media = load_csv_safe(f"{base}/Rich_Media.csv")

    if df_media is not None and "Media Link" in df_media.columns:

        links = (
            df_media.select("Media Link")
                    .dropna()
                    .rdd.flatMap(lambda x: x)
                    .collect()
        )

        # --- Profile Photo ---
        profile_candidates = [
            url for url in links
            if "profile-displayphoto" in url.lower()
            or "profile-originalphoto" in url.lower()
        ]
        record["ProfilePhotoUrl"] = profile_candidates[0] if profile_candidates else None

        # --- Background Photo ---
        bg_candidates = [
            url for url in links
            if "profile-displaybackgroundimage" in url.lower()
            or "profile-originalbackgroundimage" in url.lower()
        ]
        record["BackgroundPhotoUrl"] = bg_candidates[0] if bg_candidates else None

        # --- Header Photo = first row in CSV ---
        record["HeaderPhotoUrl"] = df_media.first()["Media Link"]

        print("  -> Profile Photo:", record["ProfilePhotoUrl"])
        print("  -> Background Photo:", record["BackgroundPhotoUrl"])
        print("  -> Header Photo:", record["HeaderPhotoUrl"])


# ======================================================
# PART 5 — Final Schema + Save Delta Table
# ======================================================

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("StudentFolder",      StringType(),  True),
    StructField("FullName",           StringType(),  True),
    StructField("Email",              StringType(),  True),
    StructField("Headline",           StringType(),  True),
    StructField("Summary",            StringType(),  True),
    StructField("Skills",             StringType(),  True),
    StructField("Certifications",     StringType(),  True),
    StructField("Education",          StringType(),  True),
    StructField("ConnectionsCount",   IntegerType(), True),
    StructField("InvitationsCount",   IntegerType(), True),
    StructField("ProfilePhotoUrl",    StringType(),  True),
    StructField("BackgroundPhotoUrl", StringType(),  True),
    StructField("HeaderPhotoUrl",     StringType(),  True)
])

df_final = spark.createDataFrame(student_records, schema=schema)

print("\nFinal DataFrame:")
df_final.show(truncate=False)

df_final.write.mode("overwrite").format("delta").saveAsTable("LinkedProfiles")

print("\nLinkedProfiles table saved successfully!")


StatementMeta(, 50a483ea-f96a-48fe-8ae3-05ae328d0975, 32, Finished, Available, Finished)


Extracting email and media for: Ahmed
  -> Profile Photo: https://media.licdn.com/dms/image/v2/D4D04AQFosDDlow4xaA/profile-originalphoto-shrink_900_1200/profile-originalphoto-shrink_900_1200/0/1733952079261?e=1765411200&v=beta&t=S6wjTIWLq63B-Q1hhd3B0Gk5ARUefCrRBzOjtc7g2Wk
  -> Background Photo: https://media.licdn.com/dms/image/v2/D4D16AQFWzt49NgejSA/profile-displaybackgroundimage-shrink_200_800/B4DZVrhKWZHIAY-/0/1741265613584?e=1765411200&v=beta&t=fq4ONxgEYE4kSkF-gBq1RjUXJqCJqdNC1NeRfB1mecY
  -> Header Photo: https://media.licdn.com/dms/document/media/v2/D4D2DAQHpCWyEfsvAlg/profile-treasury-document-chunked-pdf/B4DZhNcMxwHwBg-/0/1753645900810?e=1764363600&v=beta&t=jzraeAA7Z_RGRs1jCR9S4VH-o8UYjTGlZpL03q6zO4M

Extracting email and media for: Aya
  -> Profile Photo: https://media.licdn.com/dms/image/v2/D4D04AQEb6S2x3zLMSQ/profile-originalphoto-shrink_900_1200/B4DZiOcp_nH8AI-/0/1754736536065?e=1765411200&v=beta&t=AgR7-yIBigEwqlgtZXhcEoGf6o1qcBAaKocTnBVUziA
  -> Background Photo: None
  -