In [0]:
# # Extract raw CSV
# raw_df = spark.read.option("header", True).csv("abfss://hgcontainer@hgstoragenew.dfs.core.windows.net/landing/customer_churn_data.csv")

# # display(raw_df)

# # Load to Bronze
# raw_df.write.format("delta").mode("overwrite").option("path","abfss://hgcontainer@hgstoragenew.dfs.core.windows.net/bronze/telecom_churn_data").saveAsTable("dev.bronze.telecom_churn_data")

#### Landing to Bronze

In [0]:
from pyspark.sql.functions import input_file_name, current_timestamp
from pyspark.sql import Row
import os
# Landing -> Bronze.
# Copies every CSV in the landing zone that has not yet been ingested
# successfully into its own Parquet folder under the bronze path, and records
# the outcome of each attempt in a Delta log table.
landing_path = "abfss://hgcontainer@hgstoragenew.dfs.core.windows.net/landing/"
bronze_path = "abfss://hgcontainer@hgstoragenew.dfs.core.windows.net/bronze/telecom_churn_data/"
log_table = "dev.bronze.file_ingestion_log"


def _log_ingestion(file_path, status):
    """Append one row describing an ingestion attempt to ``log_table``.

    Args:
        file_path: Full landing-zone path of the CSV that was attempted.
        status: Outcome marker written to the log ("SUCCESS" or "FAILED").
    """
    (
        spark.createDataFrame([
            Row(file_name=os.path.basename(file_path), file_path=file_path, status=status)
        ])
        .withColumn("load_timestamp", current_timestamp())
        # The DataFrame's column order differs from the table's; appending via
        # saveAsTable is documented to match columns by name, not position.
        .write.mode("append")
        .saveAsTable(log_table)
    )


# Step 1: Create ingestion log table if not exists
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {log_table} (
    file_name STRING,
    file_path STRING,
    load_timestamp TIMESTAMP,
    status STRING
)
USING DELTA
LOCATION 'abfss://hgcontainer@hgstoragenew.dfs.core.windows.net/bronze/file_ingestion_log/'
""")

# Step 2: List all .csv files in landing zone
all_files = [f.path for f in dbutils.fs.ls(landing_path) if f.path.endswith(".csv")]
print(all_files)

# Step 3: Get files that were already ingested successfully.
# Only SUCCESS rows count: a file whose previous attempt FAILED must stay
# eligible so that the next run retries it instead of skipping it forever.
processed_files = [
    row["file_path"]
    for row in spark.table(log_table)
    .where("status = 'SUCCESS'")
    .select("file_path")
    .collect()
]
print(processed_files)

# Step 4: Filter new/unprocessed files (set lookup instead of a list scan per file)
already_loaded = set(processed_files)
new_files = [f for f in all_files if f not in already_loaded]
print(new_files)

# Step 5: Ingest new files and save as Parquet, then log
for file_path in new_files:
    try:
        # Read CSV from landing and remember which file each row came from
        df = (
            spark.read.option("header", True).csv(file_path)
            .withColumn("source_file", input_file_name())
        )

        # One bronze folder per source file, named after the file without its
        # extension. bronze_path already ends with "/", so strip it before
        # joining to avoid a "//" in the target path.
        file_name = os.path.splitext(os.path.basename(file_path))[0]
        df.write.mode("overwrite").parquet(bronze_path.rstrip("/") + "/" + file_name)
    except Exception as e:
        print(f"Failed to load {file_path}: {e}")
        status = "FAILED"
    else:
        status = "SUCCESS"

    # Logging happens outside the try block so that a problem writing the log
    # itself surfaces as an error instead of being recorded as a load failure.
    _log_ingestion(file_path, status)


### Bronze to Silver

In [0]:
from pyspark.sql.functions import col, sha2, current_timestamp
from pyspark.sql import Row
import os

# Bronze -> Silver.
# For every bronze folder that has not yet been processed successfully: fill
# missing values, hash the customer identifier, drop rows with negative
# charges, append the result to the silver Delta table, and log the outcome.
bronze_base_path = "abfss://hgcontainer@hgstoragenew.dfs.core.windows.net/bronze/telecom_churn_data/"
silver_data_path = "abfss://hgcontainer@hgstoragenew.dfs.core.windows.net/silver/telecom_churn_data"
silver_table = "dev.silver.customer_data"
silver_log_table = "dev.silver.bronze_to_silver_log"


def _log_silver_load(folder_path, status):
    """Append one row describing a bronze-folder processing attempt to the log.

    Args:
        folder_path: Bronze folder path exactly as returned by ``dbutils.fs.ls``.
        status: Outcome marker written to the log ("SUCCESS" or "FAILED").
    """
    (
        spark.createDataFrame([
            Row(
                # rstrip first: basename of a path ending in "/" would be "".
                file_name=os.path.basename(folder_path.rstrip("/")),
                file_path=folder_path,
                status=status,
            )
        ])
        .withColumn("processed_at", current_timestamp())
        .write.mode("append").format("delta")
        .saveAsTable(silver_log_table)
    )


# Create log table if not exists
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {silver_log_table} (
    file_name STRING,
    file_path STRING,
    processed_at TIMESTAMP,
    status STRING
)
USING DELTA
LOCATION 'abfss://hgcontainer@hgstoragenew.dfs.core.windows.net/silver/bronze_to_silver_log/'
""")

# List all folders in Bronze
bronze_folders = [f.path for f in dbutils.fs.ls(bronze_base_path) if f.isDir()]
print( bronze_folders)

# Get folders that were already processed successfully.
# FAILED attempts are deliberately excluded so that they are retried on the
# next run; a FAILED row is only written when processing raised, i.e. when the
# append below did not complete.
processed_paths = [
    row["file_path"]
    for row in spark.table(silver_log_table)
    .where("status = 'SUCCESS'")
    .select("file_path")
    .collect()
]

# Filter only unprocessed folders (set lookup instead of a list scan per folder)
already_processed = set(processed_paths)
new_folders = [f for f in bronze_folders if f not in already_processed]

# Process each new folder
for file_path in new_folders:
    try:
        # Parquet carries its own schema, so no reader options are needed.
        df = spark.read.parquet(file_path)

        df_filled = df.fillna({
            "Age": 0,
            "Gender": "Unknown",
            "Tenure": 0,
            "MonthlyCharges": 0.0,
            "ContractType": "Unknown",
            "InternetService": "Unknown",
            "TotalCharges": 0.0,
            "TechSupport": "Unknown",
            "Churn": "Unknown"
        })

        # Replace the raw identifier with its SHA-256 digest.
        # NOTE(review): an unsalted hash of an ID is pseudonymisation at best;
        # confirm this meets the anonymisation requirement.
        df_anonymized = (
            df_filled
            .withColumn("CustomerID", sha2(col("CustomerID"), 256))
            .withColumn("processed_at", current_timestamp())
        )

        # The landing step reads CSV without schema inference, so these columns
        # presumably arrive as strings. Cast explicitly so the comparison is a
        # numeric one rather than depending on implicit string/number coercion.
        # Values that are not numeric become NULL and the row is dropped.
        df_cleaned = df_anonymized.filter(
            (col("MonthlyCharges").cast("double") >= 0)
            & (col("TotalCharges").cast("double") >= 0)
        )

        (
            df_cleaned.write.mode("append").format("delta")
            .option("path", silver_data_path)
            .saveAsTable(silver_table)
        )
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        status = "FAILED"
    else:
        status = "SUCCESS"

    # Logging happens outside the try block so that a problem writing the log
    # itself surfaces as an error instead of being recorded as a data failure.
    _log_silver_load(file_path, status)


In [0]:
# %sql 
# drop table dev.silver.customer_data

In [0]:
from pyspark.sql import SparkSession
# Get (or reuse) a session; the master/app name only take effect when a new
# session actually has to be created.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
)

# Sample words, with repeats, used by the word-count cell that follows.
data = [
    "Project", "Gutenberg’s", "Alice’s", "Adventures",
    "in", "Wonderland", "Project", "Gutenberg’s", "Adventures",
    "in", "Wonderland", "Project", "Gutenberg’s",
]

# Distribute the local list as an RDD.
rdd = spark.sparkContext.parallelize(data)


In [0]:
# Word count: pair every word with 1, then sum the ones per distinct word.
rdd_count = (
    rdd
    .map(lambda word: (word, 1))
    .reduceByKey(lambda left, right: left + right)
)
rdd_count.collect()

In [0]:
# Sample rows: (firstname, lastname, gender, salary, Country code).
data = [
    ('James', 'Smith', 'M', 30, 'UK'),
    ('Anna', 'Rose', 'F', 41, 'DBA'),
    ('Robert', 'Williams', 'M', 62, 'DEL'),
]

# Lookup from the code stored in the "Country" column to a display name,
# shipped to the executors as a broadcast variable.
map_country = {'UK': 'USA', 'DBA': 'Canada', 'DEL': 'Germany'}
# Use the session's own context, as the other cells do, instead of the bare
# `sc` global, which only exists in kernels that predefine it.
broadcast_map = spark.sparkContext.broadcast(map_country)

columns = ["firstname", "lastname", "gender", "salary", "Country"]
df = spark.createDataFrame(data=data, schema=columns)
df.show()

In [0]:
from pyspark.sql.functions import col,concat_ws,concat
# concat joins the two columns with no separator.
# Keep the DataFrame in df1 and display it separately: .show() returns None,
# so chaining it onto the assignment would leave df1 unusable.
df1 = df.withColumn("name", concat(df.firstname, df.lastname))
df1.show()

# concat_ws joins them with the given separator (a single space here).
df2 = df.withColumn("name", concat_ws(" ", df.firstname, df.lastname))
df2.show()

In [0]:
# Turn each row into (name, gender, salary, country name), resolving the
# country code through the broadcast lookup; unmapped codes become "Unknown".
df2.rdd.map(
    lambda row: (
        row["name"],
        row["gender"],
        row["salary"],
        broadcast_map.value.get(row["Country"], "Unknown"),
    )
).collect()

In [0]:
from pyspark.sql import SparkSession
# Get (or reuse) the session.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

# Sample multi-word lines.
data = [
    "Project Gutenberg’s",
    "Alice’s Adventures in Wonderland",
    "Project Gutenberg’s",
    "Adventures in Wonderland",
    "Project Gutenberg’s",
]
rdd = spark.sparkContext.parallelize(data)

# Bring every element back to the driver and print it, one per line.
for line in rdd.collect():
    print(line)

In [0]:
from pyspark.sql.types import StringType, ArrayType,StructType,StructField

# Each row: comma-separated "first,middle,last" name, languages learned at
# school, languages used at work, current state, previous state.
data = [
    ("James,,Smith", ["Java", "Scala", "C++"], ["Spark", "Java"], "OH", "CA"),
    ("Michael,Rose,", ["Spark", "Java", "C++"], ["Spark", "Java"], "NY", "NJ"),
    ("Robert,,Williams", ["CSharp", "VB"], ["Spark", "Python"], "UT", "NV"),
]

# Explicit schema, built field by field; every column is nullable.
schema = (
    StructType()
    .add("name", StringType(), True)
    .add("languagesAtSchool", ArrayType(StringType()), True)
    .add("languagesAtWork", ArrayType(StringType()), True)
    .add("currentState", StringType(), True)
    .add("previousState", StringType(), True)
)

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.show()

In [0]:
from pyspark.sql.functions import explode, col
# Replace the array column with one output row per array element.
# NOTE(review): this rebinds `df` in place, so the cell is not re-runnable
# as-is (after the first run the column no longer holds an array).
df = df.withColumn(
    "languagesAtSchool",
    explode("languagesAtSchool"),
)
df.show()

In [0]:
from pyspark.sql.functions import split
# Split the comma-separated "name" into first/middle/last columns and drop the
# original. The split expression is built once and indexed three times.
name_parts = split(col("name"), ",")
df2 = (
    df.withColumn("fname", name_parts[0])
    .withColumn("mname", name_parts[1])
    .withColumn("lname", name_parts[2])
    .drop("name")
)
# Display separately: .show() returns None, so chaining it onto the assignment
# would leave df2 unusable.
df2.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Get (or reuse) the session.
spark = (
    SparkSession.builder
    .appName("SparkByExamples")
    .getOrCreate()
)

# Two parallel lists that are paired up position by position.
names = ["Ricky", "Bunny", "Coco"]
ages = [10, 15, 20]

# Build a two-column DataFrame from the (name, age) pairs.
df = spark.createDataFrame(list(zip(names, ages)), schema=["names", "ages"])

# Show the frame with "ages" cast to a string column; `df` itself is unchanged.
df.withColumn("ages", df["ages"].cast("String")).show()
                         

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

# Get (or reuse) the session.
spark = (
    SparkSession.builder
    .appName('SparkByExamples.com')
    .getOrCreate()
)

# Sample (employee_name, department, salary) rows.
# The ("James", "Sales", 3000) row appears twice.
data = [
    ("James", "Sales", 3000),
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100),
]

# Build the DataFrame, then print its schema and full (untruncated) contents.
columns = ["employee_name", "department", "salary"]
df = spark.createDataFrame(data=data, schema=columns)
df.printSchema()
df.show(truncate=False)


In [0]:
# Show the rows of df with exact duplicates collapsed.
deduplicated = df.distinct()
deduplicated.show()

In [0]:
from pyspark.sql.functions import col, replace
# Round trip: add an "_add" suffix to every column name, then remove it again.
columns = df.columns
df1 = df.select(*[col(name).alias(f"{name}_add") for name in columns])
df1.show()

columns1 = df1.columns
print(columns1)

# str.replace removes every "_add" occurrence in a name, not only a trailing one.
df2 = df1.select(
    [col(name).alias(name.replace("_add","")) for name in columns1]
)
df2.show()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import min, max

# Get (or reuse) the session.
spark = (
    SparkSession.builder
    .appName("event_status_example")
    .getOrCreate()
)

# Sample (event_status, event_date) rows; dates are ISO-formatted strings.
data = [
    ("success", "2024-01-01"),
    ("success", "2024-01-03"),
    ("failed", "2024-01-02"),
    ("in_progress", "2024-01-04"),
    ("success", "2024-01-05"),
    ("failed", "2024-01-06"),
    ("in_progress", "2024-01-07"),
    ("failed", "2024-01-08"),
    ("success", "2024-01-09"),
]

# Build the DataFrame and display the base data.
df = spark.createDataFrame(data, schema=["event_status", "event_date"])
df.show()


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Window over each event_status, ordered by event_date ascending.
# (A descending ordering was tried earlier and is not used.)
window_spec = Window.partitionBy("event_status").orderBy("event_date")

# For every row, fetch the event_date of the next row in its status group;
# the last row of each group gets NULL.
# NOTE(review): the variable is named "lag" and the column "start_date", but
# the value comes from lead(), i.e. the following row — confirm the intent.
df_with_lag = df.withColumn(
    "start_date",
    lead("event_date", 1).over(window_spec),
)
df_with_lag.show()