In [0]:
# Authenticate to the ADLS Gen2 account with its access key. The key is read
# from a Databricks secret scope at run time instead of being embedded in the
# notebook source, where anyone with read access to the code could copy it.
# NOTE(review): the scope and key names below are placeholders - point them at
# the secret that actually holds the key, and rotate the storage account key
# that was previously hard-coded here, since it must be treated as leaked.
spark.conf.set(
    "fs.azure.account.key.ecomadls.dfs.core.windows.net",
    dbutils.secrets.get(scope="ecom-adls", key="ecomadls-account-key"),
)

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Get the active Spark session, or create one named for this pipeline.
spark = (
    SparkSession.builder
    .appName("EcomDataPipeline")
    .getOrCreate()
)

# Azure Data Lake locations used by the pipeline.
# Raw user files that the job reads.
input_path = "abfss://landing-zone-2@ecomadls.dfs.core.windows.net/users-raw/"
# Destination for the transformed data.
output_path = "abfss://processed-user-data@ecomadls.dfs.core.windows.net/"
# Folder the input files are moved to once the run has written its output.
archive_path = "abfss://landing-zone-2@ecomadls.dfs.core.windows.net/archive/users-raw/"

# Load the raw Parquet files from the landing folder.
usersDF = spark.read.parquet(input_path)

# Land them unchanged in the bronze Delta location, replacing its previous contents.
usersDF.write.save('/mnt/delta/tables/bronze/users', format='delta', mode='overwrite')

# Continue from the bronze copy rather than from the raw files.
usersDF = spark.read.load('/mnt/delta/tables/bronze/users', format='delta')


# Derive cleaned and enriched user attributes in a single chained expression.
age_years = col('account_age_years')

usersDF = (
    usersDF
    # Upper-case the country value.
    .withColumn('country', upper(col('country')))
    # Spell out the language code; anything other than EN / ES becomes 'Other'.
    .withColumn(
        'language_full',
        expr(
            "CASE WHEN language = 'EN' THEN 'ENGLISH' "
            "WHEN language = 'ES' THEN 'SPANISH' "
            "ELSE 'Other' END"
        ),
    )
    # Normalise gender by its first letter; everything else is 'Other'.
    .withColumn(
        'gender',
        when(col('gender').startswith('M'), 'Male')
        .when(col('gender').startswith('F'), 'Female')
        .otherwise('Other'),
    )
    # Replace any occurrence of Mme, Ms or Mrs in the title with 'Ms'.
    .withColumn(
        'civilitytitle_clean',
        regexp_replace("civilitytitle", "(Mme|Ms|Mrs)", "Ms"),
    )
    # dayssincelastlogin divided by 365.
    .withColumn("years_since_last_login", col("dayssincelastlogin") / 365)
    # seniority divided by 365, rounded to two decimals
    # (seniority is presumably a day count - TODO confirm).
    .withColumn('account_age_years', round(col('seniority') / 365, 2))
    # Bucket the account age: below 1 -> New, 1 up to (not including) 3 ->
    # Intermediate, everything else -> Experienced.
    .withColumn(
        'account_age_group',
        when(age_years < 1, 'New')
        .when((age_years >= 1) & (age_years < 3), 'Intermediate')
        .otherwise("Experienced"),
    )
)

# Add the run year, a composite descriptor and a long-title flag.
usersDF = (
    usersDF
    # Year component of the current date.
    .withColumn('current_year', year(current_date()))
    # gender, countryCode, the first three characters of the cleaned title and
    # the full language name, joined with underscores.
    # NOTE(review): concat() is expected to yield NULL when any part is NULL -
    # confirm that is acceptable for rows missing countryCode or civilitytitle.
    .withColumn(
        'user_descriptor',
        concat(
            col('gender'), lit('_'),
            col('countryCode'), lit('_'),
            expr('substring(civilitytitle_clean, 1, 3)'), lit('_'),
            col('language_full'),
        ),
    )
    # True when the original title is longer than 10 characters.
    .withColumn("flag_long_title", length(col('civilitytitle')) > 10)
)

# Target type for each column that is cast in place. The dict keeps insertion
# order, so the casts are applied in the order listed here.
column_types = {
    # App / profile flags become booleans.
    "hasanyapp": "boolean",
    "hasandroidapp": "boolean",
    "hasiosapp": "boolean",
    "hasprofilepicture": "boolean",
    # Social counters become integers.
    "socialnbfollowers": IntegerType(),
    "socialnbfollows": IntegerType(),
    # Rates and seniority figures become decimals with two fractional digits.
    "productspassrate": DecimalType(10, 2),
    "seniorityasmonths": DecimalType(10, 2),
    "seniorityasyears": DecimalType(10, 2),
}

for column_name, target_type in column_types.items():
    usersDF = usersDF.withColumn(column_name, col(column_name).cast(target_type))

# Cast dayssincelastlogin to an integer and default missing values to 0.
# coalesce() is applied to the result of the cast, so a value the cast turns
# into NULL (e.g. a non-numeric string when ANSI mode is off) is defaulted as
# well - checking isNotNull() before the cast let those rows stay NULL.
usersDF = usersDF.withColumn(
    "dayssincelastlogin",
    coalesce(col("dayssincelastlogin").cast(IntegerType()), lit(0)),
)

# Append the transformed rows to the `processed_users` Delta table.
usersDF.write.saveAsTable('processed_users', format='delta', mode='append')

# Append the same rows, in Delta format, to the output location in Azure Data Lake.
usersDF.write.save(output_path, format='delta', mode='append')

# Archive the input: move every entry of the input folder to the archive
# folder under the same name.
# recurse=True is passed so that sub-directories returned by ls() (for example
# partition folders) are moved with their contents; directories are not moved
# when recurse is left at its default of False.
# NOTE(review): the folder is listed here, after the data was read earlier in
# the script, so a file that lands in between would be archived without having
# been processed - consider taking the listing before the read and archiving
# only those entries.
files = dbutils.fs.ls(input_path)
for entry in files:
    dbutils.fs.mv(entry.path, archive_path + entry.name, recurse=True)


