In [0]:
# =======================================================
# STEP 1: INGESTION — Create UC Volume, Mirror CSV, Write Bronze
# =======================================================
from pyspark.sql import functions as F

# ---------- Config ----------
CATALOG = "influencer"                            # UC catalog name
SCHEMA_RAW = "raw"                                # schema for raw/bronze
VOLUME_NAME = "ingest_vol"                        # volume to hold the CSV
WORKSPACE_CSV = "/Workspace/Users/ragutudeepika68730@gmail.com/Big_data_analytics_pipeline/Influencers_dataset.csv"

def tbl(name: str) -> str:
    return f"{CATALOG}.{name}"

# Correct UC Volume file path (catalog/schema/volume/file)
VOLUME_CSV = f"/Volumes/{CATALOG}/{SCHEMA_RAW}/{VOLUME_NAME}/Influencers_dataset.csv"

# ---------- Ensure UC namespaces/volume exist ----------
spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG}")
spark.sql(f"CREATE SCHEMA  IF NOT EXISTS {CATALOG}.{SCHEMA_RAW}")
spark.sql(f"CREATE VOLUME  IF NOT EXISTS {CATALOG}.{SCHEMA_RAW}.{VOLUME_NAME}")

print("✅ UC objects ensured:")
print(f"  Catalog: {CATALOG}")
print(f"  Schema : {SCHEMA_RAW}")
print(f"  Volume : {VOLUME_NAME}")
print(f"  Target CSV path: {VOLUME_CSV}")

# ---------- FS helpers ----------
def _exists(dbfs_path: str) -> bool:
    try:
        dbutils.fs.ls(dbfs_path.rstrip("/"))
        return True
    except Exception:
        return False

def mirror_to_volume(ws_path: str, vol_path: str) -> str:
    """
    Copy your CSV into a UC Volume so executors in Jobs can read it safely.
    Tries dbfs:/Workspace/... and dbfs:/FileStore/... as sources.
    """
    vol_dbfs = "dbfs:" + vol_path

    # If already there, use it
    if _exists(vol_dbfs):
        print(f"ℹ️ Using existing file in UC Volume: {vol_dbfs}")
        return vol_path

    candidates = []
    if ws_path.startswith("/Workspace/"):
        candidates.append("dbfs:" + ws_path)                 # dbfs:/Workspace/...
    if ws_path.startswith("dbfs:/"):
        candidates.append(ws_path)                           # already dbfs:/
    candidates.append("dbfs:/FileStore/Influencers_dataset.csv")  # UI upload fallback

    last_err = None
    for src in candidates:
        if not _exists(src):
            continue
        try:
            try:
                dbutils.fs.rm(vol_dbfs, recurse=False)
            except:
                pass
            dbutils.fs.cp(src, vol_dbfs)
            print(f"✅ Copied {src} → {vol_dbfs}")
            return vol_path
        except Exception as e:
            last_err = e
            print(f"⚠️ Copy failed from {src} → {vol_dbfs}: {e}")

    raise RuntimeError(
        "❌ Could not stage the CSV into a UC Volume.\n"
        f"Upload it manually to:\n  {vol_path}\n\n"
        "Or if you put it in FileStore via UI, run once:\n"
        f"  dbutils.fs.cp('dbfs:/FileStore/Influencers_dataset.csv', 'dbfs:{vol_path}')\n\n"
        f"Last error: {last_err}"
    )

# ---------- Stage CSV into UC Volume ----------
resolved_csv = mirror_to_volume(WORKSPACE_CSV, VOLUME_CSV)

# ---------- Read CSV robustly ----------
df = (
    spark.read
         .option("header", True)
         .option("inferSchema", True)
         .option("multiLine", True)
         .option("mode", "PERMISSIVE")
         .option("quote", '"')
         .option("escape", '"')
         .csv(resolved_csv)
)

# ---------- Dynamic column mapping ----------
cols = set(df.columns)
def pick(cands):
    for c in cands:
        if c in cols:
            return F.col(c)
    return None
def OR(expr, fallback): 
    return expr if expr is not None else fallback

PLATFORM=["platform","source_platform","source"]
CREATOR =["creator_id","user_id","handle","username","channel_id","author_id"]
POSTID  =["post_id","content_id","id","tweet_id","video_id"]
TIME    =["timestamp","created_at","date","datetime","published_at"]
TEXT    =["text","caption","title","body","description"]
LIKE    =["like_count","likes"]
COMM    =["comment_count","comments","reply_count"]
SHARE   =["share_count","shares","retweets"]
COUNTRY =["audience_country","country"]
AGE18   =["audience_age_18_24","age_18_24"]
AGE25   =["audience_age_25_34","age_25_34"]
FEM     =["audience_gender_f","female_ratio"]

synthetic_pid = F.concat(F.lit("p_"), F.monotonically_increasing_id())

bronze = df.select(
    F.lower(F.trim(OR(pick(PLATFORM), F.lit("unknown")))).alias("platform"),
    F.trim(OR(pick(CREATOR), F.lit("unknown_creator"))).alias("creator_id"),
    OR(pick(POSTID), synthetic_pid).alias("post_id"),
    OR(pick(TIME), F.current_timestamp()).alias("timestamp"),
    F.trim(OR(pick(TEXT), F.lit(""))).alias("text"),
    F.coalesce(OR(pick(LIKE),  F.lit(None)).cast("int"),   F.lit(0)).alias("like_count"),
    F.coalesce(OR(pick(COMM),  F.lit(None)).cast("int"),   F.lit(0)).alias("comment_count"),
    F.coalesce(OR(pick(SHARE), F.lit(None)).cast("int"),   F.lit(0)).alias("share_count"),
    F.upper(OR(pick(COUNTRY), F.lit(None))).alias("audience_country"),
    F.coalesce(OR(pick(AGE18), F.lit(None)).cast("double"),F.lit(0.0)).alias("audience_age_18_24"),
    F.coalesce(OR(pick(AGE25), F.lit(None)).cast("double"),F.lit(0.0)).alias("audience_age_25_34"),
    F.coalesce(OR(pick(FEM),   F.lit(None)).cast("double"),F.lit(0.0)).alias("audience_gender_f")
)

# ---------- Write Bronze Delta ----------
(
    bronze.write.format("delta")
          .mode("overwrite")
          .option("overwriteSchema", "true")
          .saveAsTable(tbl("raw.posts_bronze"))
)

print("✅ Step 1 complete — Bronze table ready:", tbl("raw.posts_bronze"))
print(f"ℹ️ Staged file (UC Volume): {resolved_csv}")
display(spark.table(tbl("raw.posts_bronze")).limit(10))


✅ UC objects ensured:
  Catalog: influencer
  Schema : raw
  Volume : ingest_vol
  Target CSV path: /Volumes/influencer/raw/ingest_vol/Influencers_dataset.csv
ℹ️ Using existing file in UC Volume: dbfs:/Volumes/influencer/raw/ingest_vol/Influencers_dataset.csv
✅ Step 1 complete — Bronze table ready: influencer.raw.posts_bronze
ℹ️ Staged file (UC Volume): /Volumes/influencer/raw/ingest_vol/Influencers_dataset.csv


platform,creator_id,post_id,timestamp,text,like_count,comment_count,share_count,audience_country,audience_age_18_24,audience_age_25_34,audience_gender_f
instagram,unknown_creator,p_0,2025-11-11T15:24:51.628Z,,0,0,0,KENYA,0.0,0.0,0.0
twitter,unknown_creator,p_1,2025-11-11T15:24:51.628Z,,0,0,0,GERMANY,0.0,0.0,0.0
tiktok,unknown_creator,p_2,2025-11-11T15:24:51.628Z,,0,0,0,KENYA,0.0,0.0,0.0
youtube,unknown_creator,p_3,2025-11-11T15:24:51.628Z,,0,0,0,UNITED KINGDOM,0.0,0.0,0.0
instagram,unknown_creator,p_4,2025-11-11T15:24:51.628Z,,0,0,0,BRAZIL,0.0,0.0,0.0
instagram,unknown_creator,p_5,2025-11-11T15:24:51.628Z,,0,0,0,GERMANY,0.0,0.0,0.0
instagram,unknown_creator,p_6,2025-11-11T15:24:51.628Z,,0,0,0,SPAIN,0.0,0.0,0.0
tiktok,unknown_creator,p_7,2025-11-11T15:24:51.628Z,,0,0,0,AUSTRALIA,0.0,0.0,0.0
youtube,unknown_creator,p_8,2025-11-11T15:24:51.628Z,,0,0,0,GERMANY,0.0,0.0,0.0
tiktok,unknown_creator,p_9,2025-11-11T15:24:51.628Z,,0,0,0,INDONESIA,0.0,0.0,0.0
