In [0]:
%pip install openpyxl bs4

In [0]:
# Print the built-in help text for dbutils.fs.ls (exploratory; the result is not assigned or reused).
dbutils.fs.help('ls')


# 1. Process historical analytics data


In [0]:
import pandas as pd
import re
from delta.tables import DeltaTable

# 1. define our constants
LINKEDIN_PROFILE_NAME = "YingzhaoOuyang" # use your own profile name here

LANDING_CATALOG = "landing"
LANDING_SCHEMA = "linkedin"
LANDING_DAILY_VOLUME = "content_historical"

PENDING_FOLDER = "pending"
PROCESSED_FOLDER = "processed"
ERRORS_FOLDER = "errors"

BRONZE_CATALOG = "bronze"
BRONZE_SCHEMA = "linkedin"
BRONZE_TOTALS_TABLE = "totals"
BRONZE_FOLLOWERS_TABLE = "followers"

# 2. set our input and output variables
source_volume = \
  f"/Volumes/{LANDING_CATALOG}/{LANDING_SCHEMA}/{LANDING_DAILY_VOLUME}/"

landing_pending_folder = f"{source_volume}{PENDING_FOLDER}/"
landing_processed_folder = f"{source_volume}{PROCESSED_FOLDER}/"
landing_errors_folder = f"{source_volume}{ERRORS_FOLDER}/"

bronze_totals_table = \
  f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_TOTALS_TABLE}"

bronze_followers_table = \
  f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_FOLLOWERS_TABLE}"

# Expected export name: Content_<from-date>_<to-date>_<profile>.xlsx.
# The profile name is escaped and the pattern is applied with fullmatch so
# that names which merely contain the pattern are rejected.
historical_filename_pattern = re.compile(
  r'Content_\d{4}-\d{2}-\d{2}_\d{4}-\d{2}-\d{2}_'
  + re.escape(LINKEDIN_PROFILE_NAME) + r'\.xlsx'
)

# (sheet name, leading rows to skip, target bronze table)
historical_sheets = [
  ("ENGAGEMENT", None, bronze_totals_table),
  ("FOLLOWERS", 2, bronze_followers_table),
]

# 3. execute the ingestion
ingestion_timestamp = pd.Timestamp.utcnow()

# extract the list of files (path, modification time in UTC) from the pending folder
historical_files_info = [
    (f.path, pd.to_datetime(f.modificationTime, unit='ms', utc=True))
    for f in dbutils.fs.ls(landing_pending_folder)
]

for file_path, file_timestamp in historical_files_info:

  # extract filename from file path
  filename = file_path.split('/')[-1]

  # define source and target paths for file
  pending_path = landing_pending_folder + filename
  processed_path = landing_processed_folder + filename
  errors_path = landing_errors_folder + filename

  # move files with an unexpected name to the errors folder and carry on
  if historical_filename_pattern.fullmatch(filename) is None:
    try:
      print(f"Invalid filename: Moving {pending_path} to {errors_path}")
      dbutils.fs.mv(pending_path, errors_path)
    except Exception as e:
      print(f"Failed to move file {pending_path}: {e}")
    continue

  # process valid filename
  try:
    for sheet_name, rows_to_skip, target_table in historical_sheets:
      print(f"Processing {sheet_name} sheet in {pending_path}")
      # Rows with any missing value are dropped, then the last remaining row
      # is discarded.
      # NOTE(review): presumably the final row is the still-incomplete latest
      # day of the export — confirm.
      sheet_df = pd.read_excel(
        pending_path,
        sheet_name=sheet_name,
        parse_dates=["Date"],
        skiprows=rows_to_skip
      ).dropna().iloc[:-1]
      sheet_df.columns = sheet_df.columns.str.lower().str.replace(' ', '_')
      sheet_df['ingestion_timestamp'] = ingestion_timestamp
      sheet_df['source_file'] = filename
      sheet_df['source_file_timestamp'] = file_timestamp

      # Write to Delta table with upsert logic keyed on date
      print(f"Writing to {target_table}")
      if sheet_df.empty:
        continue
      sheet_spark_df = spark.createDataFrame(sheet_df)
      if spark.catalog.tableExists(target_table):
        (
          DeltaTable.forName(spark, target_table).alias("t")
          .merge(sheet_spark_df.alias("s"), "t.date = s.date")
          .whenMatchedUpdateAll()
          .whenNotMatchedInsertAll()
          .execute()
        )
      else:
        sheet_spark_df.write.format("delta").saveAsTable(target_table)

    print(f"Processed: Moving {pending_path} to {processed_path}")
    dbutils.fs.mv(pending_path, processed_path)
  except Exception as e:
    print(e)
    print(f"Errors encountered: Moving {pending_path} to {errors_path}")
    # A failed move must not abort the remaining files in the loop.
    try:
      dbutils.fs.mv(pending_path, errors_path)
    except Exception as move_error:
      print(f"Failed to move file {pending_path}: {move_error}")


# 2. Process daily analytics data

In [0]:
import pandas as pd
import re
from delta.tables import DeltaTable

# 1. define our constants
VERSION = 2

LINKEDIN_PROFILE_NAME = "YingzhaoOuyang" # use your own profile name here

LANDING_CATALOG = "landing"
LANDING_SCHEMA = "linkedin"
LANDING_DAILY_VOLUME = "content_daily"

PENDING_FOLDER = "pending"
PROCESSED_FOLDER = "processed"
ERRORS_FOLDER = "errors"

BRONZE_CATALOG = "bronze"
BRONZE_SCHEMA = "linkedin"
if VERSION > 1:
  BRONZE_DISCOVERY_TABLE = "discovery"
BRONZE_TOTALS_TABLE = "totals"
BRONZE_FOLLOWERS_TABLE = "followers"

BRONZE_IMPRESSIONS_TABLE = "impressions"
BRONZE_ENGAGEMENTS_TABLE = "engagements"

# Upper bound on the number of tables in the bronze schema
# (specific to Databricks Free Edition)
MAX_BRONZE_TABLES = 100

# 2. set our input and output variables
source_volume = \
  f"/Volumes/{LANDING_CATALOG}/{LANDING_SCHEMA}/{LANDING_DAILY_VOLUME}/"

landing_pending_folder = f"{source_volume}{PENDING_FOLDER}/"
landing_processed_folder = f"{source_volume}{PROCESSED_FOLDER}/"
landing_errors_folder = f"{source_volume}{ERRORS_FOLDER}/"

bronze_totals_table = \
  f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_TOTALS_TABLE}"
if VERSION > 1:
  bronze_discovery_table = \
    f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_DISCOVERY_TABLE}"

bronze_followers_table = \
  f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_FOLLOWERS_TABLE}"

bronze_impressions_table_prefix = \
    f'{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_IMPRESSIONS_TABLE}'
bronze_engagements_table_prefix = \
    f'{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_ENGAGEMENTS_TABLE}'

# Expected export name: Content_<date>_<same date>_<profile>.xlsx.
# The profile name is escaped and the pattern is applied with fullmatch so
# that names which merely contain the pattern are rejected.
daily_filename_pattern = re.compile(
  r'Content_(\d{4}-\d{2}-\d{2})_\1_'
  + re.escape(LINKEDIN_PROFILE_NAME) + r'\.xlsx'
)

# 3. execute the ingestion
ingestion_timestamp = pd.Timestamp.utcnow()


def add_lineage_columns(pandas_df, source_filename, source_timestamp):
  """Add ingestion_timestamp, source_file and source_file_timestamp columns in place."""
  pandas_df['ingestion_timestamp'] = ingestion_timestamp
  pandas_df['source_file'] = source_filename
  pandas_df['source_file_timestamp'] = source_timestamp
  return pandas_df


def upsert_by_date(pandas_df, table_name):
  """Upsert pandas_df into the Delta table table_name, matching rows on date.

  The table is created on first use. An empty frame is skipped after the
  "Writing to" message has been printed.
  """
  print(f"Writing to {table_name}")
  if pandas_df.empty:
    return
  source_df = spark.createDataFrame(pandas_df)
  if spark.catalog.tableExists(table_name):
    (
      DeltaTable.forName(spark, table_name).alias("t")
      .merge(source_df.alias("s"), "t.date = s.date")
      .whenMatchedUpdateAll()
      .whenNotMatchedInsertAll()
      .execute()
    )
  else:
    source_df.write.format("delta").saveAsTable(table_name)


# extract the list of files (path, modification time in UTC) from the pending folder
daily_files_info = [
    (f.path, pd.to_datetime(f.modificationTime, unit='ms', utc=True))
    for f in dbutils.fs.ls(landing_pending_folder)
]

# get current count of tables in bronze schema
table_count_in_bronze_schema = spark.sql(
    f"SHOW TABLES IN {BRONZE_CATALOG}.{BRONZE_SCHEMA}"
).count()
dates_processed = []

for file_path, file_timestamp in daily_files_info:

  tables_to_create = 0

  # extract filename from file path
  filename = file_path.split('/')[-1]

  # define source and target paths for file
  pending_path = landing_pending_folder + filename
  processed_path = landing_processed_folder + filename
  errors_path = landing_errors_folder + filename

  # check if filename is of expected format (from and to date are the same)
  filename_match = daily_filename_pattern.fullmatch(filename)
  if filename_match is None:
    # move invalid filename to errors folder
    try:
      print(f"Invalid filename: Moving {pending_path} to {errors_path}")
      dbutils.fs.mv(pending_path, errors_path)
    except Exception as e:
      print(f"Failed to move file {pending_path}: {e}")
    continue

  # extract date of analytics and append it to staging table names
  analytics_date_str = filename_match.group(1)
  analytics_date = pd.to_datetime(analytics_date_str).date()
  table_suffix = analytics_date_str.replace('-', '_')
  bronze_engagements_table = f"{bronze_engagements_table_prefix}_{table_suffix}"
  bronze_impressions_table = f"{bronze_impressions_table_prefix}_{table_suffix}"

  # count the staging tables this file would add to the bronze schema
  if not spark.catalog.tableExists(bronze_engagements_table):
    tables_to_create += 1
  if not spark.catalog.tableExists(bronze_impressions_table):
    tables_to_create += 1

  # stop if too many tables in bronze schema (specific to Databricks Free Edition)
  if table_count_in_bronze_schema + tables_to_create > MAX_BRONZE_TABLES:
    print("Too many tables in bronze schema. Please process and clear staging tables before rerunning.")
    break

  # process valid filename
  try:
    if VERSION > 1:
      try:
        # read members reached from xlsx file
        print(f"Processing DISCOVERY sheet in {pending_path}")
        discovery_df = pd.read_excel(
          pending_path,
          sheet_name="DISCOVERY",
        ).set_index("Overall Performance").transpose().dropna().reset_index(drop=True)
        discovery_df.columns = discovery_df.columns.str.lower().str.replace(' ', '_')

        # copy so the column assignments below do not write into a slice
        discovery_df = discovery_df[['members_reached']].copy()
        discovery_df['date'] = analytics_date
        add_lineage_columns(discovery_df, filename, file_timestamp)

        # Write members reached to Delta table with upsert logic
        upsert_by_date(discovery_df, bronze_discovery_table)
      except Exception as e:
        print(f"Error writing to {bronze_discovery_table}: {e}")
        # Re-raise instead of calling exit(): exit() raises SystemExit, which
        # bypasses the handler below and leaves the file stuck in pending.
        raise

    # read totals and followers from xlsx file and upsert them on date
    for sheet_name, rows_to_skip, target_table in (
      ("ENGAGEMENT", None, bronze_totals_table),
      ("FOLLOWERS", 2, bronze_followers_table),
    ):
      print(f"Processing {sheet_name} sheet in {pending_path}")
      sheet_df = pd.read_excel(
        pending_path,
        sheet_name=sheet_name,
        parse_dates=["Date"],
        skiprows=rows_to_skip
      ).dropna()
      sheet_df.columns = sheet_df.columns.str.lower().str.replace(' ', '_')
      add_lineage_columns(sheet_df, filename, file_timestamp)
      upsert_by_date(sheet_df, target_table)

    # read top posts from xlsx file
    print(f"Processing TOP POSTS sheet in {pending_path}")
    topposts_df = pd.read_excel(pending_path, sheet_name="TOP POSTS", skiprows=2)

    # the first three columns of the sheet hold the engagements listing
    engagements_df = topposts_df.iloc[:, :3].dropna()
    if not engagements_df.empty:
      engagements_df.columns = engagements_df.columns.str.replace(' ', '_').str.lower()
      engagements_df['analytics_date'] = analytics_date
      add_lineage_columns(engagements_df, filename, file_timestamp)

    # columns from the fifth onwards hold the impressions listing; the '.1'
    # suffix is stripped from its column names
    impressions_df = topposts_df.iloc[:, 4:].dropna()
    if not impressions_df.empty:
      impressions_df.columns = impressions_df.columns.str.replace('.1', '', regex=False).str.replace(' ', '_').str.lower()
      impressions_df['analytics_date'] = analytics_date
      add_lineage_columns(impressions_df, filename, file_timestamp)

    # Write engagements and impressions to Delta staging tables, overwrite existing
    for staging_df, staging_table in (
      (engagements_df, bronze_engagements_table),
      (impressions_df, bronze_impressions_table),
    ):
      if staging_df.empty:
        print(f"Skipped writing empty dataset to {staging_table}")
        continue
      print(f"Writing to {staging_table}")
      spark.createDataFrame(staging_df).write.mode("overwrite").saveAsTable(
        staging_table
      )

    # update counters for dates processed and tables in bronze schema
    dates_processed.append(analytics_date_str)
    table_count_in_bronze_schema += tables_to_create

    print(f"Processed: Moving {pending_path} to {processed_path}")
    dbutils.fs.mv(pending_path, processed_path)

  except Exception as e:
    print(e)
    print(f"Errors encountered: Moving {pending_path} to {errors_path}")
    # A failed move must not abort the remaining files in the loop.
    try:
      dbutils.fs.mv(pending_path, errors_path)
    except Exception as move_error:
      print(f"Failed to move file {pending_path}: {move_error}")

if len(dates_processed) == 0:
    print("No dates processed.")
else:
    print(f"Dates processed: {dates_processed}")



In [0]:
# uncomment to reprocess files in errors folder

# files = dbutils.fs.ls('/Volumes/landing/linkedin/content_daily/errors/')
# for file in files:
#     dbutils.fs.mv(file.path, '/Volumes/landing/linkedin/content_daily/pending/' + file.name)


In [0]:
# uncomment to reprocess files in processed folder

# files = dbutils.fs.ls('/Volumes/landing/linkedin/content_daily/processed/')
# for file in files:
#     dbutils.fs.mv(file.path, '/Volumes/landing/linkedin/content_daily/pending/' + file.name)


In [0]:
# uncomment to drop staging tables

# from pyspark.sql.functions import col

# staging_table_prefix = "impressions_"
# tables_df = spark.catalog.listTables("bronze.linkedin")
# impressions_tables = [t.name for t in tables_df if t.name.startswith(staging_table_prefix)]
# for table in impressions_tables:
#     spark.sql(f"DROP TABLE bronze.linkedin.{table}")

# 3. Post table ingestion

In [0]:
# Helper functions for fetching and parsing LinkedIn post HTML content
import requests
from bs4 import BeautifulSoup
import time

def get_html_content(url: str, max_retries: int = 5, base_delay: float = 5.0, timeout: float = 30.0):
    """
    Fetches HTML content from a LinkedIn post URL with exponential backoff.

    Args:
        url: Address of the post to download.
        max_retries: Maximum number of attempts; must be at least 1.
        base_delay: Seconds to wait after the first failure; the wait doubles
            after every further failed attempt.
        timeout: Seconds to wait for the server on each attempt, so a stalled
            connection cannot hang the notebook indefinitely.

    Returns:
        The raw response body as bytes.

    Raises:
        ValueError: If max_retries is smaller than 1.
        requests.RequestException: If the final attempt fails (connection
            error, timeout or non-2xx status).
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
    }
    for attempt in range(max_retries):
        try:
            print(f"Fetching HTML content from URL: {url} (Attempt {attempt + 1})")
            response = requests.get(url, headers=headers, timeout=timeout)
            response.raise_for_status()
            print(f"Successfully fetched HTML content from {url}")
            return response.content
        except requests.RequestException as e:
            print(f"Error fetching {url}: {e}")
            if attempt < max_retries - 1:
                delay = base_delay * (2 ** attempt)
                print(f"Retrying in {delay} seconds...")
                time.sleep(delay)
            else:
                print(f"Failed to fetch HTML content from {url} after {max_retries} attempts.")
                # bare raise keeps the original traceback
                raise

def get_content(html_content):
    """
    Return the text of the post body paragraph found in html_content.

    The body is the first <p> element carrying the
    "attributed-text-segment-list__content" class; None is returned when the
    page has no such element.
    """
    parsed_page = BeautifulSoup(html_content, "html.parser")
    body_paragraph = parsed_page.find('p', class_="attributed-text-segment-list__content")
    return body_paragraph.get_text() if body_paragraph else None

In [0]:
# Extract posts data using post_url from all impressions tables and save to bronze posts table
import pandas as pd
from pyspark.sql.functions import col, row_number, desc
from pyspark.sql.window import Window
from delta.tables import DeltaTable

BRONZE_POSTS_TABLE = "posts"

bronze_posts_table = f'{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_POSTS_TABLE}'


def parse_post_html(html_content):
    """Return (link, title, content) parsed from one post's HTML.

    link is the href of the first <link> element, title is the <head> text up
    to the first '|', content comes from get_content. Any element missing from
    the page yields None instead of raising.
    """
    soup = BeautifulSoup(html_content, "html.parser")
    link_tag = soup.find('link')
    head_tag = soup.find("head")
    link = link_tag.get('href') if link_tag is not None else None
    title = (
        head_tag.get_text(strip=True).split('|')[0].strip()
        if head_tag is not None else None
    )
    return link, title, get_content(html_content)


# List all impressions staging tables (impressions_<date>) in the bronze.linkedin schema
daily_impressions_tables = [
    table.name
    for table in spark.catalog.listTables(f'{BRONZE_CATALOG}.{BRONZE_SCHEMA}')
    if table.name.startswith(f"{BRONZE_IMPRESSIONS_TABLE}_")
]

print("Loading daily impressions tables to extract posts data...")
# Read each table and select the relevant columns
columns_to_select = [
  "post_url", "post_publish_date",
  "ingestion_timestamp", "source_file", "source_file_timestamp"
]
dfs = [
    spark.read.table(f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{table}").select(*columns_to_select)
    for table in daily_impressions_tables
]

# Union all dataframes and keep the most recently ingested row per post
if dfs:
    impressions_df = dfs[0]
    for df in dfs[1:]:
        impressions_df = impressions_df.unionByName(df)
    impressions_df = impressions_df.withColumn(
        "row_num",
        row_number().over(
            Window.partitionBy("post_url", "post_publish_date").orderBy(desc("ingestion_timestamp"))
        )
    ).filter(col("row_num") == 1).drop("row_num")

    # Skip posts that are already stored, so a rerun neither re-downloads
    # their HTML nor appends duplicate rows to the posts table.
    if spark.catalog.tableExists(bronze_posts_table):
        known_post_urls_df = (
            spark.read.table(bronze_posts_table).select("post_url").distinct()
        )
        impressions_df = impressions_df.join(
            known_post_urls_df, on="post_url", how="left_anti"
        )

    impressions_pd = impressions_df.toPandas()[columns_to_select]
    impressions_pd['post_publish_date'] = pd.to_datetime(impressions_pd['post_publish_date'])
else:
    impressions_pd = pd.DataFrame(columns=columns_to_select)


if not impressions_pd.empty:
    # Display posts to be ingested
    display(impressions_pd)

    # Fetch HTML content for each post URL
    print(f"Fetching HTML content for {len(impressions_pd)} posts...")
    impressions_pd['html_content'] = impressions_pd['post_url'].apply(get_html_content)

    # Extract link, title, and content from the HTML content
    parsed_posts = [parse_post_html(html) for html in impressions_pd['html_content']]
    links, titles, contents = zip(*parsed_posts)
    impressions_pd['link'] = list(links)
    impressions_pd['title'] = list(titles)
    impressions_pd['content'] = list(contents)

    impressions_spark_df = spark.createDataFrame(impressions_pd)
    if not spark.catalog.tableExists(bronze_posts_table):
        print(f"Creating {bronze_posts_table} table...")
        impressions_spark_df.write.format("delta").saveAsTable(bronze_posts_table)
    else:
        print(f"Appending new posts to {bronze_posts_table} table...")
        impressions_spark_df.write.format("delta").mode("append").saveAsTable(bronze_posts_table)
else:
    print("No new posts found based on impressions tables.")

# Display sample of posts data for verification (only once the table exists)
if spark.catalog.tableExists(bronze_posts_table):
    posts_sample_df = spark.read.table(bronze_posts_table).limit(10)
    display(posts_sample_df)
else:
    print(f"{bronze_posts_table} does not exist yet; nothing to display.")

In [0]:
# Collect every row of the bronze posts table onto the driver and render it.
# NOTE(review): despite its name, posts_df holds the result of .collect()
# (a Python list of Row objects), not a DataFrame.
posts_df = spark.read.table(bronze_posts_table).collect()
display(posts_df)

# 4. Ingest post analytics data

In [0]:
# Exploratory load: read the PERFORMANCE sheet of one processed post export as
# headerless key/value pairs and discard rows that are completely empty.
sample_post_export = '/Volumes/landing/linkedin/posts/processed/PostAnalytics_YingzhaoOuyang_7367980676152819712.xlsx'
performance_sheet_df = pd.read_excel(
    sample_post_export,
    sheet_name="PERFORMANCE",
    header=None,
    names=["key", "value"],
)
test_df = performance_sheet_df.dropna(how='all')

display(test_df)

In [0]:
# Report whether any key in the sheet contains the " Highlights " marker.
has_highlights_row = test_df['key'].str.contains(" Highlights ", na=False).any()
if has_highlights_row:
    print("Found")


In [0]:
# Keys containing " Highlights " carry the analytics date range as
# "<text> Highlights <start> to <end>".
highlight_keys = test_df.loc[
    test_df['key'].str.contains(" Highlights ", na=False), 'key'
]

if highlight_keys.empty:
    print("Cannot detect analytics date")
    # Define the names anyway so the cell output below never raises NameError
    # or silently shows values left over from an earlier run.
    date_start_str = date_end_str = None
    date_start = date_end = None
else:
    date_range_str = highlight_keys.str.split(" Highlights ").iloc[0][1]
    date_start_str, date_end_str = date_range_str.split(" to ")

    date_start = pd.to_datetime(date_start_str).date()
    date_end = pd.to_datetime(date_end_str).date()

date_start, date_end

In [0]:
# Read the "Post Publish Time" value from the non-null key/value rows of test_df.
timedelta_str = test_df.dropna().set_index('key').transpose()["Post Publish Time"].iloc[0]

# Combine it with date_start_str (set by the date-range cell) and parse the result as a UTC timestamp.
pd.to_datetime(date_start_str + ' ' + timedelta_str + ' UTC')

In [0]:
# Read the "Post URL" value from the non-null key/value rows of test_df.
post_url = test_df.dropna().set_index('key').transpose()["Post URL"].iloc[0]

In [0]:
# Download the post page and take the href of the first <link> element in it.
# NOTE(review): .find('link') returns None when the page has no <link>, in
# which case .get raises AttributeError.
html_content = get_html_content(post_url)
link = BeautifulSoup(html_content, "html.parser").find('link').get('href')

link

In [0]:
# Show the non-null key/value rows as a nested dict keyed by the "key" column (exploratory inspection).
test_df.dropna().set_index('key').transpose().to_dict()

In [0]:
import pandas as pd
import re
import datetime
from pyspark.sql import Row

# 1. define our constants
LINKEDIN_PROFILE_NAME = "YingzhaoOuyang" # use your own profile name here

LANDING_CATALOG = "landing"
LANDING_SCHEMA = "linkedin"
LANDING_POSTS_VOLUME = "posts"

PENDING_FOLDER = "pending"
PROCESSED_FOLDER = "processed"
ERRORS_FOLDER = "errors"

BRONZE_CATALOG = "bronze"
BRONZE_SCHEMA = "linkedin"
BRONZE_POST_DETAILS_TABLE = "post_details"

# 2. set our input and output variables
source_volume = \
  f"/Volumes/{LANDING_CATALOG}/{LANDING_SCHEMA}/{LANDING_POSTS_VOLUME}/"

landing_pending_folder = f"{source_volume}{PENDING_FOLDER}/"
landing_processed_folder = f"{source_volume}{PROCESSED_FOLDER}/"
landing_errors_folder = f"{source_volume}{ERRORS_FOLDER}/"

bronze_post_details_table = \
  f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_POST_DETAILS_TABLE}"

# Expected export name: PostAnalytics_<profile>_<post id>.xlsx, optionally with
# a " (n)" suffix before the extension. The profile name is escaped and the
# pattern is applied with fullmatch so that names which merely contain the
# pattern are rejected.
post_filename_pattern = re.compile(
  r'PostAnalytics_' + re.escape(LINKEDIN_PROFILE_NAME) + r'_\d+(?: \(\d+\))?\.xlsx'
)

# Explicit schema for the single-row record: analytics_date may be None, and a
# None value cannot be type-inferred from one row.
# NOTE: "performace_metrics" keeps its existing (misspelled) name so the record
# stays compatible with a table that was already created.
post_details_schema = (
  "post_url string, link string, post_timestamp timestamp, "
  "analytics_date date, performace_metrics string, "
  "demographics_metrics string, ingestion_timestamp timestamp, "
  "source_file string, source_file_timestamp timestamp"
)

# 3. execute the ingestion
ingestion_timestamp = datetime.datetime.utcnow()

# extract the list of files (path, modification time in UTC) from the pending folder
post_files_info = [
    (f.path, pd.to_datetime(f.modificationTime, unit='ms', utc=True).to_pydatetime())
    for f in dbutils.fs.ls(landing_pending_folder)
]

for file_path, file_timestamp in post_files_info:

  # extract filename from file path
  filename = file_path.split('/')[-1]

  # define source and target paths for file
  pending_path = landing_pending_folder + filename
  processed_path = landing_processed_folder + filename
  errors_path = landing_errors_folder + filename

  # move files with an unexpected name to the errors folder and carry on
  if post_filename_pattern.fullmatch(filename) is None:
    try:
      print(f"Invalid filename: Moving {pending_path} to {errors_path}")
      dbutils.fs.mv(pending_path, errors_path)
    except Exception as e:
      print(f"Failed to move file {pending_path}: {e}")
    continue

  # process valid filename
  try:
    # read key/value pairs from the PERFORMANCE sheet
    print(f"Extracting post URL and timestamp from PERFORMANCE sheet in {pending_path}")
    post_details_raw_df = pd.read_excel(
      pending_path,
      sheet_name="PERFORMANCE",
      header=None,
      names=["key", "value"],
    ).dropna(how='all')

    # Extract timestamp, post URL and date of analytics
    transposed_post_details_df = post_details_raw_df.dropna().set_index('key').transpose()
    post_timedelta_str = transposed_post_details_df["Post Publish Time"].iloc[0]

    # Extract date range of analytics from the "<text> Highlights <start> to <end>" key
    highlight_keys = post_details_raw_df.loc[
      post_details_raw_df['key'].str.contains(" Highlights ", na=False), 'key'
    ]
    if highlight_keys.empty:
      print("Cannot detect analytics date")
      date_start_str = transposed_post_details_df["Post Date"].iloc[0]
      date_end_str = None
    else:
      date_range_str = highlight_keys.str.split(" Highlights ").iloc[0][1]
      date_start_str, date_end_str = date_range_str.split(" to ")

    # Use the publish time read from THIS file (post_timedelta_str), not a
    # variable left over from an exploratory cell.
    post_timestamp = pd.to_datetime(
      date_start_str + ' ' + post_timedelta_str + ' UTC'
    ).to_pydatetime()

    post_url = transposed_post_details_df["Post URL"].iloc[0]
    if date_end_str:
      analytics_date = pd.to_datetime(date_end_str).to_pydatetime().date()
    else:
      analytics_date = None

    post_top_demographics_df = pd.read_excel(
      pending_path,
      sheet_name="TOP DEMOGRAPHICS",
    ).dropna()
    post_top_demographics_df.columns = post_top_demographics_df.columns.str.lower().str.replace('%', 'percent')

    # Fetch HTML content for the post URL
    print(f"Fetching HTML content and link for {post_url}")
    html_content = get_html_content(post_url)
    # Extract link from the HTML content
    link = BeautifulSoup(html_content, "html.parser").find('link').get('href')

    print(f"Preparing post details record for {link}")
    post_details_record = Row(
        post_url=post_url,
        link=link,
        post_timestamp=post_timestamp,
        analytics_date=analytics_date,
        performace_metrics=str(transposed_post_details_df.to_dict()),
        demographics_metrics=str(post_top_demographics_df.to_dict()),
        ingestion_timestamp=ingestion_timestamp,
        source_file=file_path,
        source_file_timestamp=file_timestamp,
    )

    # Write post_details_record to Delta table with upsert logic based on latest analytics date
    print(f"Writing to {bronze_post_details_table}: {post_details_record}")
    post_details_spark_df = spark.createDataFrame(
      [post_details_record], schema=post_details_schema
    )
    if spark.catalog.tableExists(bronze_post_details_table):
        delta_table = DeltaTable.forName(spark, bronze_post_details_table)
        # <=> is null-safe equality: records without an analytics date match
        # each other instead of being inserted again on every run.
        delta_table.alias("t").merge(
            post_details_spark_df.alias("s"),
            "t.post_url = s.post_url AND t.analytics_date <=> s.analytics_date"
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
    else:
        post_details_spark_df.write.format("delta").saveAsTable(
          bronze_post_details_table
        )

    print(f"Processed: Moving {pending_path} to {processed_path}")
    dbutils.fs.mv(pending_path, processed_path)

  except Exception as e:
    print(e)
    print(f"Errors encountered: Moving {pending_path} to {errors_path}")
    # A failed move must not abort the remaining files in the loop.
    try:
      dbutils.fs.mv(pending_path, errors_path)
    except Exception as move_error:
      print(f"Failed to move file {pending_path}: {move_error}")



In [0]:
# uncomment to reprocess files in errors folder

# files = dbutils.fs.ls('/Volumes/landing/linkedin/posts/errors/')
# for file in files:
#     dbutils.fs.mv(file.path, '/Volumes/landing/linkedin/posts/pending/' + file.name)


# 5. Silver Layer: Impressions ETL

This section processes daily bronze impressions tables, standardizes date formats, merges data into the silver impressions table, and logs ingestion events. Source tables are dropped after processing to maintain workspace hygiene.

In [0]:
import pandas as pd
import re
import datetime
from pyspark.sql import Row
from pyspark.sql.functions import sum
from delta.tables import DeltaTable

# 1. define our constants
LINKEDIN_PROFILE_NAME = "YingzhaoOuyang" # use your own profile name here

BRONZE_CATALOG = "bronze"
BRONZE_SCHEMA = "linkedin"
BRONZE_IMPRESSIONS_TABLE_PREFIX = "impressions"
BRONZE_TOTALS_TABLE = "totals"

SILVER_CATALOG = "silver"
SILVER_SCHEMA = "linkedin"
SILVER_IMPRESSIONS_TABLE = "impressions"

# 2. set our input and output variables

# extract list of daily bronze staging impressions tables (impressions_YYYY_MM_DD)
bronze_impressions_tables = [
    table.name for table in spark.catalog.listTables(f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}")
    if re.match(rf"{BRONZE_IMPRESSIONS_TABLE_PREFIX}_\d{{4}}_\d{{2}}_\d{{2}}$", table.name)
]

silver_impressions_table = \
  f"{SILVER_CATALOG}.{SILVER_SCHEMA}.{SILVER_IMPRESSIONS_TABLE}"

# 3. execute the ingestion
ingestion_timestamp = datetime.datetime.utcnow()

# Process daily bronze staging impressions tables and merge into silver layer

print("Processing daily impressions tables...")

for bronze_impression_table in bronze_impressions_tables:
    # Extract date suffix from table name and convert to a date (%Y_%m_%d format)
    date_str = bronze_impression_table.replace(f"{BRONZE_IMPRESSIONS_TABLE_PREFIX}_", "")
    analytics_date = pd.to_datetime(date_str, format='%Y_%m_%d').date()

    table_name = f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{bronze_impression_table}"

    print(f"Processing {table_name}...")

    # per-post impressions of the day, with the publish date parsed from M/d/yyyy
    silver_impressions_df = spark.sql(f"""
    SELECT 
        post_url,
        to_date(post_publish_date, 'M/d/yyyy') AS post_publish_date,
        impressions,
        to_date('{date_str}', 'yyyy_MM_dd') AS analytics_date 
    FROM {table_name}
    """)

    # overall impressions of the same day from the bronze totals table
    silver_impressions_totals_df = spark.sql(f"""
    SELECT 
        impressions,
        source_file,
        source_file_timestamp,
        ingestion_timestamp
    FROM {BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_TOTALS_TABLE}
    WHERE to_date('{date_str}', 'yyyy_MM_dd') = date
    """)

    # Without a totals row the "others" remainder cannot be computed. Keep the
    # staging table so it can be processed once totals for that day exist.
    totals_rows = silver_impressions_totals_df.select("impressions").collect()
    if not totals_rows or totals_rows[0].impressions is None:
        print(
            f"No totals found for {analytics_date} in "
            f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_TOTALS_TABLE}; "
            f"skipping {table_name} (staging table kept)."
        )
        continue

    # The aggregate is NULL when there is nothing to sum; treat that as 0.
    impressions_by_post = (
        silver_impressions_df.agg({"impressions": "sum"}).collect()[0][0] or 0
    )

    # impressions not attributed to any listed post are stored under "others"
    impressions_others = totals_rows[0].impressions - impressions_by_post
    if impressions_others > 0:
        silver_impressions_df = silver_impressions_df.unionByName(
            spark.createDataFrame([
                Row(
                    post_url = "others",
                    post_publish_date = analytics_date,
                    impressions = impressions_others,
                    analytics_date = analytics_date,
                    # ingestion_timestamp = silver_impressions_totals_df.select("ingestion_timestamp").collect()[0].ingestion_timestamp,
                    # source_file = silver_impressions_totals_df.select("source_file").collect()[0].source_file,
                    # source_file_timestamp = silver_impressions_totals_df.select("source_file_timestamp").collect()[0].source_file_timestamp
                )
            ])
        )

    if spark.catalog.tableExists(silver_impressions_table):
        print(f"Table {silver_impressions_table} exists, merging data...")
        delta_table = DeltaTable.forName(spark, silver_impressions_table)
        (
            delta_table.alias("t")
            .merge(
                silver_impressions_df.alias("s"),
                "t.post_url = s.post_url AND t.analytics_date = s.analytics_date"
            )
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        print(f"Table {silver_impressions_table} does not exist, creating...")
        silver_impressions_df.write.format("delta").saveAsTable(silver_impressions_table)

    # Drop the source table after processing
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
    print(f"Dropped table {table_name}")

# 6. Silver Layer: Engagements ETL
This section processes daily bronze engagements tables, standardizes date formats, merges data into the silver engagements table, and logs ingestion events. Source tables are dropped after processing to maintain workspace hygiene.

In [0]:
import pandas as pd
import re
import datetime
from pyspark.sql import Row
from pyspark.sql.functions import sum
from delta.tables import DeltaTable

# 1. Constants
LINKEDIN_PROFILE_NAME = "YingzhaoOuyang"  # replace with your own profile name

BRONZE_CATALOG = "bronze"
BRONZE_SCHEMA = "linkedin"
BRONZE_ENGAGEMENTS_TABLE_PREFIX = "engagements"
BRONZE_TOTALS_TABLE = "totals"

SILVER_CATALOG = "silver"
SILVER_SCHEMA = "linkedin"
SILVER_ENGAGEMENTS_TABLE = "engagements"

# 2. Inputs and outputs

# Daily bronze staging tables are named "<prefix>_YYYY_MM_DD"; anything else
# in the bronze schema is ignored.
_daily_engagements_pattern = re.compile(
    rf"{BRONZE_ENGAGEMENTS_TABLE_PREFIX}_\d{{4}}_\d{{2}}_\d{{2}}$"
)
bronze_engagements_tables = [
    name
    for name in (
        entry.name
        for entry in spark.catalog.listTables(f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}")
    )
    if _daily_engagements_pattern.match(name)
]

# Fully qualified name of the silver table the daily data is merged into
silver_engagements_table = (
    f"{SILVER_CATALOG}.{SILVER_SCHEMA}.{SILVER_ENGAGEMENTS_TABLE}"
)

# 3. Run timestamp (naive UTC) captured once for this execution
ingestion_timestamp = datetime.datetime.utcnow()

# Process daily bronze staging engagements tables and merge into silver layer.
#
# For each staging table "<prefix>_YYYY_MM_DD":
#   1. read the per-post engagements and type the date columns,
#   2. look up the account-level total for the same date in the bronze totals table,
#   3. add an "others" row holding the part of the total not attributed to a listed post,
#   4. upsert into the silver table on (post_url, analytics_date),
#   5. drop the staging table.
# A table whose total cannot be found is skipped and left in place so that a
# later run can pick it up once the totals have been ingested.

print("Processing daily engagements tables...")

for bronze_engagements_table in bronze_engagements_tables:
    # Extract date suffix from table name and convert to datetime in %Y_%m_%d format
    date_str = bronze_engagements_table.replace(f"{BRONZE_ENGAGEMENTS_TABLE_PREFIX}_", "")
    analytics_date = pd.to_datetime(date_str, format='%Y_%m_%d').date()

    table_name = f"{BRONZE_CATALOG}.{BRONZE_SCHEMA}.{bronze_engagements_table}"

    print(f"Processing {table_name}...")

    # Per-post engagements for the day, with string dates parsed into DATE columns
    silver_engagements_df = spark.sql(f"""
    SELECT 
        post_url,
        to_date(post_publish_date, 'M/d/yyyy') AS post_publish_date,
        engagements,
        to_date('{date_str}', 'yyyy_MM_dd') AS analytics_date 
    FROM {table_name}
    """)

    # Account-level totals recorded for the same day
    silver_engagements_totals_df = spark.sql(f"""
    SELECT 
        engagements,
        source_file,
        source_file_timestamp,
        ingestion_timestamp
    FROM {BRONZE_CATALOG}.{BRONZE_SCHEMA}.{BRONZE_TOTALS_TABLE}
    WHERE to_date('{date_str}', 'yyyy_MM_dd') = date
    """)

    # Without a usable total the "others" row cannot be computed; indexing the
    # empty result would raise IndexError, so skip and keep the staging table.
    totals_rows = silver_engagements_totals_df.select("engagements").collect()
    if not totals_rows or totals_rows[0].engagements is None:
        print(
            f"No engagements total found for {analytics_date}; "
            f"skipping {table_name} (table left in place)"
        )
        continue

    # `sum` is pyspark.sql.functions.sum (imported at the top of this cell).
    # The aggregate is NULL/None when the staging table has no rows.
    per_post_total = silver_engagements_df.agg(sum("engagements")).collect()[0][0] or 0
    engagements_others = totals_rows[0].engagements - per_post_total

    if engagements_others > 0:
        # Engagements in the daily total that are not attributed to any listed post.
        others_df = spark.createDataFrame([
            Row(
                post_url = "others",
                post_publish_date = analytics_date,
                engagements = engagements_others,
                analytics_date = analytics_date,
            )
        ])
        # Match columns by name so the result does not depend on Row field order.
        silver_engagements_df = silver_engagements_df.unionByName(others_df)

    if spark.catalog.tableExists(silver_engagements_table):
        print(f"Table {silver_engagements_table} exists, merging data...")
        delta_table = DeltaTable.forName(spark, silver_engagements_table)
        (
            delta_table.alias("t")
            .merge(
                silver_engagements_df.alias("s"),
                "t.post_url = s.post_url AND t.analytics_date = s.analytics_date"
            )
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        print(f"Table {silver_engagements_table} does not exist, creating...")
        silver_engagements_df.write.format("delta").saveAsTable(silver_engagements_table)

    # Drop the source table after processing
    spark.sql(f"DROP TABLE IF EXISTS {table_name}")
    print(f"Dropped table {table_name}")

# 7. Silver Layer: Fill in-between dates in the engagements and impressions tables with 0 values


In [0]:
from pyspark.sql.functions import col, lit, sequence, explode, to_date, min as spark_min, max as spark_max

import pandas as pd
import re
import datetime
from pyspark.sql import Row
from pyspark.sql.functions import sum
from delta.tables import DeltaTable

# 1. Constants
LINKEDIN_PROFILE_NAME = "YingzhaoOuyang"  # replace with your own profile name

SILVER_CATALOG = "silver"
SILVER_SCHEMA = "linkedin"
SILVER_IMPRESSIONS_TABLE = "impressions"
SILVER_ENGAGEMENTS_TABLE = "engagements"

# Fully qualified (catalog.schema.table) names of the two silver tables
silver_impressions_table = ".".join(
    (SILVER_CATALOG, SILVER_SCHEMA, SILVER_IMPRESSIONS_TABLE)
)
silver_engagements_table = ".".join(
    (SILVER_CATALOG, SILVER_SCHEMA, SILVER_ENGAGEMENTS_TABLE)
)


def fill_missing_dates(df, date_col, group_cols, value_col):
    """Return ``df`` with one row per group for every date in its overall date range.

    The range runs from the minimum to the maximum of ``date_col`` across the
    whole frame (not per group). Every distinct combination of ``group_cols``
    is paired with every date in that range; combinations that have no row in
    ``df`` get ``value_col`` set to 0. Null values of ``value_col`` on existing
    rows are also replaced by 0. Any other columns are left null on added rows.

    Args:
        df: Spark DataFrame to densify.
        date_col: Name of the date column to fill gaps in.
        group_cols: Names of the columns identifying a series (list or tuple).
        value_col: Name of the numeric column to zero-fill.

    Returns:
        A DataFrame whose columns are ``group_cols``, ``date_col``, then the
        remaining columns of ``df`` in their original order.
    """
    key_cols = list(group_cols) + [date_col]

    # Get min and max dates
    date_range = df.select(
        spark_min(date_col).alias("min_date"),
        spark_max(date_col).alias("max_date"),
    ).collect()[0]
    min_date, max_date = date_range.min_date, date_range.max_date

    if min_date is None:
        # Empty input (or every date is NULL): there is no range to expand, and
        # createDataFrame cannot infer column types from an all-None row.
        # Return the rows as they are, in the same column layout as the join below.
        other_cols = [c for c in df.columns if c not in key_cols]
        return df.select(*key_cols, *other_cols).fillna({value_col: 0})

    # Create full date sequence (one row per day, both ends included)
    date_seq_df = spark.createDataFrame([(min_date, max_date)], ["start", "end"]) \
        .select(explode(sequence(col("start"), col("end"))).alias(date_col))

    # Cross join the distinct group keys with every date in the range
    unique_keys_df = df.select(*group_cols).distinct()
    full_grid_df = unique_keys_df.crossJoin(date_seq_df)

    # Left join the grid back to the original rows; unmatched grid cells get 0.
    # NOTE(review): this is a plain equality join, so a group whose key columns
    # contain NULL will not match its own rows - confirm keys are never NULL.
    filled_df = full_grid_df.join(
        df,
        on=key_cols,
        how="left"
    ).fillna({value_col: 0})

    return filled_df

# Both silver tables are densified over the same series key:
# one series per (post_url, post_publish_date), one row per analytics_date.
post_key_cols = ["post_url", "post_publish_date"]

# Impressions: zero-fill dates that have no row
silver_impressions_filled_df = fill_missing_dates(
    df=spark.table(silver_impressions_table),
    date_col="analytics_date",
    group_cols=post_key_cols,
    value_col="impressions",
)

# Engagements: zero-fill dates that have no row
silver_engagements_filled_df = fill_missing_dates(
    df=spark.table(silver_engagements_table),
    date_col="analytics_date",
    group_cols=post_key_cols,
    value_col="engagements",
)

# Preview the filled frames
display(silver_impressions_filled_df)
display(silver_engagements_filled_df)

# Writing the filled frames back is currently commented out; the lines below
# would overwrite the silver tables in place.
# silver_impressions_filled_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(silver_impressions_table)
# silver_engagements_filled_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(silver_engagements_table)

# 8. Silver Layer: Create totals and followers views

In [0]:
%sql
-- Silver view over the bronze daily totals (left untouched if it already exists)
CREATE VIEW IF NOT EXISTS silver.linkedin.totals AS
SELECT
  date,
  impressions,
  engagements
FROM bronze.linkedin.totals;

-- Silver view over the bronze daily follower counts (left untouched if it already exists)
CREATE VIEW IF NOT EXISTS silver.linkedin.followers AS
SELECT
  date,
  new_followers
FROM bronze.linkedin.followers;

# 9. Silver Layer: Posts ETL
This section merges bronze posts and patch data, enriches post metadata, and writes the results to the silver posts table. Ingestion events are logged for traceability.

In [0]:
%sql
-- 1. Join patch and post_details to posts on post_url
-- Patch values take priority over the values scraped into posts
-- Deduplicate so the view holds exactly one row per post_url
CREATE OR REPLACE TEMP VIEW merged_posts_dedup AS
SELECT
  post_url,
  post_publish_date,
  post_publish_timestamp,
  link,
  title,
  content
FROM (
  SELECT
    posts.post_url AS post_url,
    CAST(posts.post_publish_date AS DATE) AS post_publish_date,
    post_details.post_timestamp AS post_publish_timestamp,
    COALESCE(patch.true_url, posts.link) AS link,
    COALESCE(patch.title, posts.title) AS title,
    COALESCE(patch.content, posts.content, posts.title) AS content,
    ROW_NUMBER() OVER (
      PARTITION BY posts.post_url
      ORDER BY
        -- Rows carrying patch data first (NULLS LAST is the default for DESC;
        -- spelled out here because the priority depends on it)
        patch.true_url DESC NULLS LAST,
        patch.title DESC NULLS LAST,
        patch.content DESC NULLS LAST,
        -- Tie-breakers: every remaining column that feeds the output, so that
        -- the chosen row is repeatable between runs when posts or post_details
        -- contain more than one row for a post_url
        post_details.post_timestamp DESC NULLS LAST,
        posts.post_publish_date DESC NULLS LAST,
        posts.link DESC NULLS LAST,
        posts.title DESC NULLS LAST,
        posts.content DESC NULLS LAST
    ) AS rn
  FROM
    bronze.linkedin.posts AS posts
  LEFT JOIN
    bronze.linkedin.linkedin_patch AS patch
  ON
    posts.post_url = patch.post_url
  LEFT JOIN
    bronze.linkedin.post_details AS post_details
  ON
    posts.post_url = post_details.post_url
)
WHERE rn = 1;

-- 2. Make sure the silver posts table exists (same columns as merged_posts_dedup)
CREATE TABLE IF NOT EXISTS silver.linkedin.posts (
  post_url               STRING,     -- key used by the MERGE in step 3
  post_publish_date      DATE,
  post_publish_timestamp TIMESTAMP,
  link                   STRING,
  title                  STRING,
  content                STRING
)
USING DELTA;

-- 3. Upsert the deduplicated posts into silver, keyed on post_url:
--    existing posts are overwritten with the latest merged values,
--    unseen posts are inserted.
MERGE INTO silver.linkedin.posts AS t
USING merged_posts_dedup AS s
  ON t.post_url = s.post_url
WHEN MATCHED THEN UPDATE SET
  post_url               = s.post_url,
  post_publish_date      = s.post_publish_date,
  post_publish_timestamp = s.post_publish_timestamp,
  link                   = s.link,
  title                  = s.title,
  content                = s.content
WHEN NOT MATCHED THEN
  INSERT (post_url, post_publish_date, post_publish_timestamp, link, title, content)
  VALUES (s.post_url, s.post_publish_date, s.post_publish_timestamp, s.link, s.title, s.content);