In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, year, month, dayofmonth, to_timestamp,from_unixtime,concat_ws,explode,current_timestamp
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql import functions as F
from datetime import datetime
import re

In [3]:
# Root of the silver container; the per-source datasets and the ETL control
# file are all addressed relative to it.
silver_base = "abfss://silver@datalakeyassin.dfs.core.windows.net/"
# Text file that stores the timestamp written at the end of the previous run.
control_file_path = f"{silver_base}_control/processed_files.txt"

# One parquet dataset per social source.
df_github_silver = spark.read.parquet(f"{silver_base}github/")
df_stack_silver = spark.read.parquet(f"{silver_base}stackoverflow/")
df_reddit_silver = spark.read.parquet(f"{silver_base}reddit/")


In [28]:
# Load the control file and parse its first line as the previous run's
# timestamp, formatted "YYYY-MM-DD HH:MM:SS".
# NOTE(review): an empty control file fails here with IndexError — confirm the
# file is always seeded before the first run.
processed_files_df = spark.read.text(control_file_path)
control_rows = processed_files_df.collect()
last_etl_time_str = control_rows[0][0].strip()
last_etl_time = datetime.strptime(last_etl_time_str, "%Y-%m-%d %H:%M:%S")

In [29]:
# Incremental load: keep only the rows whose ingestion_time is strictly later
# than the timestamp recorded by the previous run.
ingested_after_last_run = col("ingestion_time") > last_etl_time

df_github_silver = df_github_silver.filter(ingested_after_last_run)
df_stack_silver = df_stack_silver.filter(ingested_after_last_run)
df_reddit_silver = df_reddit_silver.filter(ingested_after_last_run)

In [10]:
# Quick look at the first rows that survived the incremental filter.
for silver_preview in (df_reddit_silver, df_stack_silver):
    silver_preview.show(5)

In [11]:
# Each source gets an "engagement_raw" score: a weighted sum of that source's
# own popularity counters.
# NOTE(review): a null in any counter makes the whole sum null for that row —
# confirm the silver layer guarantees non-null counters.

# GitHub: stars and watchers count once, forks count double.
github_engagement = F.col("stars") + 2 * F.col("forks") + F.col("watchers")
df_github_gold = df_github_silver.withColumn("engagement_raw", github_engagement)

# Reddit: score, plus comments weighted x2 and awards weighted x3.
reddit_engagement = (
    F.col("score")
    + 2 * F.col("num_comments")
    + 3 * F.col("total_awards")
)
df_reddit_gold = df_reddit_silver.withColumn("engagement_raw", reddit_engagement)

# Stack Overflow: half the score, answers x5, favorites x3, views per thousand.
stack_engagement = (
    F.col("score") * 0.5
    + F.col("answer_count") * 5
    + F.col("favorite_count") * 3
    + F.col("view_count") / 1000
)
df_stack_gold = df_stack_silver.withColumn("engagement_raw", stack_engagement)

In [12]:
# Sanity-check the raw scores for each source.
for scored_preview in (df_github_gold, df_reddit_gold, df_stack_gold):
    scored_preview.select("engagement_raw").show(5)

In [13]:
def normalize(df):
    """Min-max scale ``engagement_raw`` into a new ``engagement`` column.

    The minimum and maximum of ``engagement_raw`` are obtained with a single
    aggregation over ``df`` (this runs a Spark job), and every row is then
    mapped to ``(engagement_raw - min) / (max - min)``.

    Degenerate batches are handled explicitly instead of failing:

    * ``df`` has no rows, or ``engagement_raw`` is null on every row: both
      aggregates come back as ``None``. Subtracting them would raise
      ``TypeError``, so ``engagement`` is added as an all-null double column.
    * every non-null ``engagement_raw`` is the same value (``max == min``):
      the range is zero. Dividing by it yields null or an error depending on
      the Spark ANSI setting, so ``engagement`` is set to ``0.0`` on rows
      with a score and left null on rows without one.

    Args:
        df: DataFrame with a numeric ``engagement_raw`` column.

    Returns:
        A new DataFrame equal to ``df`` plus the ``engagement`` column.
    """
    stats = df.agg(
        F.min("engagement_raw").alias("min_eng"),
        F.max("engagement_raw").alias("max_eng")
    ).collect()[0]

    min_eng = stats["min_eng"]
    max_eng = stats["max_eng"]

    # No usable range: either nothing to aggregate or all scores identical.
    if min_eng is None or max_eng is None or max_eng == min_eng:
        # `when` without `otherwise` leaves null where the raw score is null,
        # and the 0.0 literal keeps the column a double like the division below.
        return df.withColumn(
            "engagement",
            F.when(F.col("engagement_raw").isNotNull(), F.lit(0.0))
        )

    span = max_eng - min_eng
    return df.withColumn(
        "engagement",
        (F.col("engagement_raw") - min_eng) / span
    )
# Rescale every source on its own, so each source's scores are relative to
# that source's own min/max.
df_github_gold, df_reddit_gold, df_stack_gold = (
    normalize(df_github_gold),
    normalize(df_reddit_gold),
    normalize(df_stack_gold),
)



In [15]:
# Shared gold schema: projecting every source onto the same columns lets the
# frames be combined by column name afterwards.
cols = [
    "job",
    "tools",
    "year",
    "month",
    "day",
    "engagement",
    "source",
    "ingestion_time",
]

df_github_gold = df_github_gold.select(*cols)
df_reddit_gold = df_reddit_gold.select(*cols)
df_stack_gold = df_stack_gold.select(*cols)

In [16]:
# Preview the projected frame of each source.
for gold_preview in (df_github_gold, df_reddit_gold, df_stack_gold):
    gold_preview.show(5)

In [22]:
from pyspark.sql import functions as F
# Stack the three per-source frames, matching columns by name.
df = df_github_gold.unionByName(df_reddit_gold)
df = df.unionByName(df_stack_gold)

# Explode the "tools" array into a "tool" column (one output row per element),
# then aggregate per job / tool / calendar day / source.
tool_rows = df.withColumn("tool", F.explode("tools"))
group_keys = ["job", "tool", "year", "month", "day", "source"]

staging_social = tool_rows.groupBy(*group_keys).agg(
    F.sum("engagement").alias("job_engagement"),     # summed normalized engagement
    F.count("*").alias("tool_mentions"),             # number of exploded rows in the group
    F.max("ingestion_time").alias("ingestion_time"), # latest ingestion within the group
)

staging_social.show(20)



In [23]:
# Append this run's aggregates to the gold social-popularity dataset.
gold_social_path = "abfss://gold@datalakeyassin.dfs.core.windows.net/social_popularity/"
(
    staging_social
    .write
    .mode("append")
    .parquet(gold_social_path)
)

In [25]:
# Record the current UTC time as the new "last run" timestamp, stored as a
# single-row, single-column string DataFrame.
# NOTE(review): the timestamp is taken at the end of the run. Rows that land in
# silver after the silver read but before this point would be excluded by the
# next run's `ingestion_time > last_etl_time` filter — confirm whether that
# window matters for this pipeline.
current_etl_time = datetime.utcnow()
etl_time_text = current_etl_time.strftime("%Y-%m-%d %H:%M:%S")
df_etl_time = spark.createDataFrame([(etl_time_text,)])

# Replace the control file in ADLS; coalesce(1) keeps it to one output part.
(
    df_etl_time
    .coalesce(1)
    .write
    .mode("overwrite")
    .text(control_file_path)
)