In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,when,lit,current_timestamp

In [0]:
%run /Workspace/Users/bishalbishal6659@gmail.com/utilited/utilites

In [0]:
# Show the bronze schema name. bronze_schema is not defined in this notebook;
# presumably it comes from the utilities notebook pulled in by %run above — confirm.
print(bronze_schema)

In [0]:
# Declare the notebook's text widgets as (name, default value, label) triples.
widget_specs = (
    ("catalog", "fmcg", "catalog"),
    ("data_source", "customers", "data Source"),
)
for widget_name, default_value, widget_label in widget_specs:
    dbutils.widgets.text(widget_name, default_value, widget_label)

In [0]:
%python
# Resolve the run parameters from the notebook widgets.
catalog, data_source = (
    dbutils.widgets.get(widget_key) for widget_key in ("catalog", "data_source")
)

# Glob covering every CSV file under the data source's S3 prefix.
base_path = f"s3://sportsbar-dp-vishal/{data_source}/*.csv"
print(base_path)

In [0]:
%python
from pyspark.sql.functions import current_timestamp

# Read every CSV matched by base_path, letting Spark infer the column types.
# No header option is set, so columns are addressed positionally (_c0, _c1, ...).
# NOTE(review): if the files carry a header row it is ingested as data — confirm.
csv_reader = spark.read.format("csv").option("inferSchema", True)
raw_csv = csv_reader.load(base_path)

# Stamp the read time, name the second column, and keep the source file's
# name and size next to the data.
stamped_csv = raw_csv.withColumn("Read_timestamp", current_timestamp())
named_csv = stamped_csv.withColumnRenamed("_c1", "name")
df = named_csv.select("*", "_metadata.file_name", "_metadata.file_size")

display(df)

In [0]:
# Append this batch to the bronze Delta table named after the data source.
bronze_table = f"{catalog}.{bronze_schema}.{data_source}"
(
    df.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("append")
    .saveAsTable(bronze_table)
)

In [0]:
# Read the whole bronze table back; the cleaning steps below start from it.
bronze_query = f"SELECT * FROM {catalog}.{bronze_schema}.{data_source};"
df_bronze = spark.sql(bronze_query)
display(df_bronze)



In [0]:
# Duplicate check: find the _c0 keys that occur more than once in bronze.
df_duplicate_number = (
    df_bronze
    .groupBy("_c0")
    .count()
    .filter(col("count") > 1)
)
display(df_duplicate_number)

# Report what is actually being counted: the total bronze rows and the number
# of distinct keys that are duplicated (not a before/after row count).
print(f"total rows in bronze: {df_bronze.count()}")
print(f"keys appearing more than once: {df_duplicate_number.count()}")

In [0]:
# Keep a single row per _c0 key; this is the first silver-stage DataFrame.
dedup_keys = ["_c0"]
df_silver = df_bronze.dropDuplicates(dedup_keys)
display(df_silver)

# Row counts on either side of the de-duplication.
for count_label, counted_df in (
    ("before drop duplicate", df_bronze),
    ("after drop duplicate", df_silver),
):
    print(count_label, counted_df.count())

In [0]:
from pyspark.sql.functions import trim

# Show rows whose name differs from its trimmed form, i.e. names carrying
# leading or trailing whitespace.
name_has_padding = col("name") != trim(col("name"))
display(df_silver.where(name_has_padding))


In [0]:
# Strip leading/trailing whitespace from every name.
trimmed_name = trim(col("name"))
df_silver = df_silver.withColumn("name", trimmed_name)

# Re-run the padding check against the cleaned column.
still_padded = col("name") != trim(col("name"))
display(df_silver.where(still_padded))

In [0]:
# Check the distinct city values held in _c2 (the input for the spelling map built next).
display(df_silver.select("_c2").distinct())

In [0]:
%python
from pyspark.sql.functions import col, when

# Canonical city name -> every spelling that maps to it (the canonical one included).
spellings_by_city = {
    "Bengaluru": ("Bengaluru", "Bengalore", "Bengaluruu"),
    "Hyderabad": ("Hyderabad", "Hyderabadd", "Hyderbad"),
    "New Delhi": ("New Delhi", "NewDelhi", "NewDelhee", "NewDheli"),
}

# Flatten into the {spelling: canonical name} lookup consumed by replace().
city_map = {
    spelling: canonical
    for canonical, spellings in spellings_by_city.items()
    for spelling in spellings
}

# Only the canonical names survive the filter below.
allowed = list(spellings_by_city)

city_col = col("_c2")
df_silver = (
    df_silver
    .replace(city_map, subset=["_c2"])
    # Nulls and unrecognised cities match no branch, so they come out as null.
    .withColumn("_c2", when(city_col.isin(allowed), city_col))
)

In [0]:
# Preview the distinct rows of df_silver after the city values were rewritten.
display(df_silver.distinct())

In [0]:
%python
from pyspark.sql.functions import col, when, initcap, isnull

# Title-case every non-null name; a null name matches no branch and stays null.
name_col = col("name")
title_cased_name = when(name_col.isNotNull(), initcap(name_col))
df_silver = df_silver.withColumn("name", title_cased_name)

display(df_silver.distinct())

In [0]:
import pyspark.sql.functions as F

# List, untruncated, the rows whose city (_c2) is null.
city_is_missing = F.col("_c2").isNull()
df_silver.where(city_is_missing).show(truncate=False)

In [0]:
# Names to look up in df_silver.
null_customer_name = [
    "Primefuel Nutrition",
    "Recovery Lane",
    "Sprintx Nutrition",
    "Zenathlete Foods",
]

name_is_listed = F.col("name").isin(null_customer_name)
df_silver.where(name_is_listed).show()

In [0]:
# Manual city values keyed by customer id.
customer_city_fix = {
    789403: "New Delhi",
    789420: "Bengaluru",
    789521: "Hyderabad",
    789603: "Hyderabad",
}

# One (customer_id, fixed_city) row per entry of the mapping.
fix_rows = list(customer_city_fix.items())
df_fix = spark.createDataFrame(fix_rows, ["customer_id", "fixed_city"])

In [0]:
# Preview the (customer_id, fixed_city) lookup rows.
display(df_fix)

In [0]:
# Preview the current state of df_silver.
display(df_silver)

In [0]:
from pyspark.sql import functions as f

# The city values live in the positional column "_c2" (the column the spelling
# map was applied to). withColumnRenamed silently does nothing when the source
# column does not exist, so the source must be "_c2" — df_silver has no
# "customer_id" column — for later cells to find a column called "city".
df_silver = df_silver.withColumnRenamed("_c2", "city")
display(df_silver)

In [0]:
# Print the schemas of both DataFrames and preview the fix lookup.
df_silver.printSchema()
df_fix.printSchema()
display(df_fix)

In [0]:
from pyspark.sql import functions as F

# Alias both sides so their columns can be referenced unambiguously after the join.
silver = df_silver.alias("s")
fix = df_fix.alias("f")

fixed_df = (
    silver
    .join(
        fix,
        # Compare the ids as strings so the match does not depend on the type
        # each side happened to be inferred as.
        F.col("s._c0").cast("string") == F.col("f.customer_id").cast("string"),
        "left",
    )
    # Keep the existing city; fall back to the manual fix only where it is null.
    .withColumn("city", F.coalesce(F.col("s.city"), F.col("f.fixed_city")))
    # Drop the lookup columns one at a time: passing several Column objects to a
    # single drop() raises TypeError on PySpark releases before 3.4.
    .drop(F.col("f.fixed_city"))
    .drop(F.col("f.customer_id"))
)

df_silver = fixed_df
display(df_silver)

Next, transform the data based on the gold table.


In [0]:
# Display key: "<name>-<city>", with "Unknown" standing in for a missing city.
customer_label = F.concat_ws(
    "-",
    F.col("name"),
    F.coalesce(F.col("city"), F.lit("Unknown")),
)

# Constant-valued columns, added in this order after "customer".
# NOTE(review): "marker" / "india" may be meant as "market" / "India" — confirm
# against the gold table before changing.
constant_columns = {
    "marker": "india",
    "platform": "Sports Bar",
    "channel": "Acquisition",
}

df_add = df_silver.withColumn("customer", customer_label)
for constant_name, constant_value in constant_columns.items():
    df_add = df_add.withColumn(constant_name, F.lit(constant_value))

display(df_add.limit(10))

**Load this into the silver layer**


In [0]:
df_silver = df_add

# Build the target name from the same widgets the bronze write uses, so a run
# pointed at another catalog cannot overwrite the fmcg silver table. With the
# widget defaults this still resolves to "fmcg.silver.customers".
silver_table = f"{catalog}.silver.{data_source}"

# Replace the silver table with the freshly cleaned snapshot.
(
    df_silver.write
    .format("delta")
    .option("delta.enableChangeDataFeed", "true")
    .mode("overwrite")
    .saveAsTable(silver_table)
)
