# **PySpark Transformations**

### **DATA READING**

In [1]:
# Load the raw customers dataset (parquet) from the bronze lakehouse Files area.
df_customers = spark.read.parquet(
    "abfss://DP700_DEV@onelake.dfs.fabric.microsoft.com/bronze_lakehouse.Lakehouse/Files/azure_raw_csv/olist_customers_dataset.parquet"
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 3, Finished, Available, Finished)

In [2]:
# Load the raw order-items dataset (parquet) from the bronze lakehouse Files area.
df_orderitems = spark.read.parquet(
    "abfss://DP700_DEV@onelake.dfs.fabric.microsoft.com/bronze_lakehouse.Lakehouse/Files/azure_raw_csv/olist_order_items_dataset.parquet"
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 4, Finished, Available, Finished)

**inferSchema**

In [3]:
# Load the raw order-payments dataset (parquet) from the bronze lakehouse and preview it.
# NOTE(review): "inferSchema" is a CSV/JSON reader option; parquet files embed their own
# schema, so this option presumably has no effect here — confirm and consider removing it.
df_payments = spark.read.option("inferSchema",True)\
        .parquet("abfss://DP700_DEV@onelake.dfs.fabric.microsoft.com/bronze_lakehouse.Lakehouse/Files/azure_raw_csv/olist_order_payments_dataset.parquet")
# df_payments is now a Spark DataFrame containing parquet data from "abfss://DP700_DEV@onelake.dfs.fabric.microsoft.com/bronze_lakehouse.Lakehouse/Files/azure_raw_csv/olist_order_payments_dataset.parquet".
display(df_payments)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 5, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0336234e-8a48-4338-9898-69726b044859)

In [4]:
# Load the raw order-reviews dataset (parquet) from the bronze lakehouse and preview it.
df_reviews = (
    spark.read
    .format("parquet")
    .load("abfss://DP700_DEV@onelake.dfs.fabric.microsoft.com/bronze_lakehouse.Lakehouse/Files/azure_raw_csv/olist_order_reviews_dataset.parquet")
)
display(df_reviews)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8f62fd56-b1de-426a-be96-ed7cca81e129)

In [5]:
# Load the raw products dataset (parquet) from the bronze lakehouse and preview it.
df_products = (
    spark.read
    .format("parquet")
    .load("abfss://DP700_DEV@onelake.dfs.fabric.microsoft.com/bronze_lakehouse.Lakehouse/Files/azure_raw_csv/olist_products_dataset.parquet")
)
display(df_products)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, fbe2d86b-d1a1-4874-bac7-3324ead64827)

In [6]:
# Preview the order-items DataFrame as loaded (before any type casting).
display(df_orderitems)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a38e612a-8d96-4deb-85f5-8dfa4f8012d8)

In [7]:
# Load the orders CSV from the bronze lakehouse, treating the first row as the header.
# No schema is supplied or inferred, so every column is read as a string.
df_orders = spark.read.csv(
    "abfss://DP700_DEV@onelake.dfs.fabric.microsoft.com/bronze_lakehouse.Lakehouse/Files/AzureDataIngestion/olist_orders_dataset.csv",
    header="true",
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 9, Finished, Available, Finished)

In [8]:
# Preview only the first five customer rows.
display(df_customers.limit(5))

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 10, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a3ba97fe-06c0-4a89-86ea-168f06ba9bc5)

### **DATA TRANSFORMATION**

In [9]:
from pyspark.sql.functions import * 
from pyspark.sql.types import *

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 11, Finished, Available, Finished)

**SELECT**

In [10]:
# Keep only the three customer columns used downstream; all other columns are dropped.
df_customers = df_customers.select(
    "customer_id",
    "customer_unique_id",
    "customer_zip_code_prefix",
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 12, Finished, Available, Finished)

In [11]:
# Show the customers DataFrame after the column selection.
display(df_customers)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0d498860-9801-41b5-bf5e-b4974ef02b15)

**withColumnRenamed**

In [12]:
# Shorten two column names: customer_unique_id -> unique_id and
# customer_zip_code_prefix -> customer_zip_code.
df_customers = (
    df_customers
    .withColumnRenamed("customer_unique_id", "unique_id")
    .withColumnRenamed("customer_zip_code_prefix", "customer_zip_code")
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 14, Finished, Available, Finished)

In [13]:
# Show the customers DataFrame after the column renames.
display(df_customers)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 15, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5a744efc-2771-4fa7-a625-77c32daf21bf)

**withColumn TYPE CASTING**

In [14]:
# Convert the zip code column to a 32-bit integer.
# NOTE(review): if the source values are strings with leading zeros, an integer cast
# drops those zeros — confirm that is acceptable for this column.
df_customers = df_customers.withColumn(
    "customer_zip_code",
    col("customer_zip_code").cast("int"),
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 16, Finished, Available, Finished)

In [15]:
# Show the customers DataFrame after the zip code cast.
display(df_customers)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 17, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0e0988ee-bb9a-4ffe-ae89-53a2c5d17c02)

**printSchema**

In [16]:
# Print the column names, types and nullability of the transformed customers DataFrame.
df_customers.printSchema()

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 18, Finished, Available, Finished)

root
 |-- customer_id: string (nullable = true)
 |-- unique_id: string (nullable = true)
 |-- customer_zip_code: integer (nullable = true)



In [17]:
# Print the order-items schema; at this point every column is still a string.
df_orderitems.printSchema()

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 19, Finished, Available, Finished)

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: string (nullable = true)
 |-- price: string (nullable = true)
 |-- freight_value: string (nullable = true)



**Timestamp Conversion**

In [18]:
# Parse shipping_limit_date from string into a proper timestamp column.
df_orderitems = df_orderitems.withColumn(
    "shipping_limit_date",
    col("shipping_limit_date").cast("timestamp"),
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 20, Finished, Available, Finished)

In [19]:
# Show order items after the timestamp conversion.
display(df_orderitems)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 21, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, fb3d5aab-8037-424e-9abc-416c0830e443)

In [20]:
# Convert the two monetary columns from string to single-precision float.
df_orderitems = (
    df_orderitems
    .withColumn("price", col("price").cast("float"))
    .withColumn("freight_value", col("freight_value").cast("float"))
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 22, Finished, Available, Finished)

**ReplaceValues**

In [21]:
# Clean up the payments DataFrame:
#   * payment_type: replace underscores with spaces (e.g. "credit_card" -> "credit card").
#   * payment_installments: cast to integer (a count).
#   * payment_value: cast to double. It was previously cast to IntegerType, which
#     truncates the fractional part of the amount and silently loses money precision.
# NOTE: a Delta table already written with payment_value as an integer will reject an
# append with the new double type; recreate or overwrite that table once.
df_payments = (
    df_payments
    .withColumn("payment_type", regexp_replace(col("payment_type"), "_", " "))
    .withColumn("payment_installments", col("payment_installments").cast(IntegerType()))
    .withColumn("payment_value", col("payment_value").cast(DoubleType()))
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 23, Finished, Available, Finished)

In [22]:
# Show payments after the value replacement and type casts.
display(df_payments)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 24, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b22c1870-f1fe-4a86-9d14-40dc7fd349d5)

**String Functions**

In [23]:
# Normalise review text casing: titles to upper case, messages to lower case.
df_reviews = (
    df_reviews
    .withColumn("review_comment_title", upper(col("review_comment_title")))
    .withColumn("review_comment_message", lower(col("review_comment_message")))
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 25, Finished, Available, Finished)

In [24]:
# Show reviews after the casing changes.
display(df_reviews)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 26, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 28e1b295-9447-46c8-8c6c-4526e0913264)

**Handling Nulls**

In [25]:
# Inspect products before null handling.
display(df_products)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 27, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 760cb4a8-8832-4b2d-809f-30e40cb723cf)

In [26]:
# Replace nulls in three product columns with per-column placeholder strings.
# NOTE(review): the placeholders are strings; if product_name_lenght or
# product_photos_qty are numeric columns, string fills cannot apply to them —
# confirm the column types and whether "x"/"y"/"z" are the intended sentinels.
df_products = df_products.na.fill(
    {
        "product_category_name": "x",
        "product_name_lenght": "y",
        "product_photos_qty": "z",
    }
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 28, Finished, Available, Finished)

In [27]:
# Inspect products after null filling.
display(df_products)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 29, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, acff18c0-f557-4683-b491-11ab70b6814d)

In [28]:
# Cast the product weight and the three dimension columns to integers, in place.
# Fix: the first target column was misspelled "poduct_weight_g", which added a new,
# misnamed column and left the real product_weight_g uncast. The name now matches the
# source column so it is overwritten like the other three.
# NOTE: a Delta table already written with the misspelled column / uncast weight has a
# different schema; recreate or overwrite it once before appending again.
df_products = (
    df_products
    .withColumn("product_weight_g", col("product_weight_g").cast(IntegerType()))
    .withColumn("product_length_cm", col("product_length_cm").cast(IntegerType()))
    .withColumn("product_height_cm", col("product_height_cm").cast(IntegerType()))
    .withColumn("product_width_cm", col("product_width_cm").cast(IntegerType()))
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 30, Finished, Available, Finished)

In [29]:
# Inspect products after the integer casts.
display(df_products)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 31, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 91b9ae31-b730-4abb-b00b-bf43db804036)

**FILTER**

In [30]:
# Show only products whose weight exceeds 300 g (df_products itself is not modified).
display(
    df_products.where(col("product_weight_g") > 300)
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 32, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, aeb0a290-4ff0-4c3c-a42c-33a6e593b0ba)

**Flagging - Create a new column**

In [31]:
# Add a "300gFlag" column: "Y" when product_weight_g is above 300, otherwise "N"
# (rows where the comparison is null also fall through to "N").
df_products = df_products.withColumn(
    "300gFlag",
    when(col("product_weight_g") > 300, "Y").otherwise("N"),
)
display(df_products)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 33, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 77f45a40-39e5-4eb8-8daa-546e8e2d6a52)

In [32]:
# Inspect the orders DataFrame as loaded from CSV.
display(df_orders)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 34, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, af7f1965-901f-4f8d-a265-be99e2300e2c)

In [33]:
# Drop orders that are missing either delivery date (carrier or customer).
df_orders = df_orders.na.drop(
    subset=["order_delivered_carrier_date", "order_delivered_customer_date"]
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 35, Finished, Available, Finished)

In [34]:
# Parse order_purchase_timestamp from string into a timestamp column.
df_orders = df_orders.withColumn(
    "order_purchase_timestamp",
    col("order_purchase_timestamp").cast("timestamp"),
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 36, Finished, Available, Finished)

**SPARK SQL**

In [35]:
# Register df_orders as the session-scoped temp view "order_temp_view" so it can be
# queried with Spark SQL; an existing view with the same name is replaced.
df_orders.createOrReplaceTempView("order_temp_view")

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 37, Finished, Available, Finished)

In [36]:
# Read the whole temp view back to confirm it is queryable.
display(spark.table("order_temp_view"))

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 38, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1ff30871-65a4-4f38-aa68-de40a2dc48df)

In [37]:
%%sql 

-- Number every order sequentially by order_id.
-- NOTE(review): the window has no PARTITION BY, so Spark presumably ranks all rows in a
-- single partition — fine for a demo, confirm before using on large data.
SELECT *,
        row_number() over(order by order_id) as row_number
FROM 
    order_temp_view

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 39, Finished, Available, Finished)

<Spark SQL result set with 1000 rows and 9 fields>

In [38]:
# Run the same row-numbering query through spark.sql so the result is available as the
# DataFrame df_sql (all view columns plus a sequential "row_number" ordered by order_id).
# NOTE(review): the window has no PARTITION BY, so Spark presumably ranks all rows in a
# single partition — confirm this is acceptable for the data volume.
df_sql = spark.sql("""SELECT *,
        row_number() over(order by order_id) as row_number
FROM 
    order_temp_view""")

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 40, Finished, Available, Finished)

In [39]:
# Show the row-numbered orders result.
display(df_sql)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 41, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 8072ff2b-a849-4933-9784-40af24afaf5c)

**Visualization**

In [40]:
# Render df_sql again; this cell's output widget is the one used for the visualization.
display(df_sql)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 42, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1f8f1832-de46-482d-99d1-2a9107026f15)

In [41]:
# Render df_products in the display widget for visualization.
display(df_products)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 43, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, ae009998-8c7a-446c-a321-c064fa5abcac)

### **Data Writing Scenarios**

**Writing To Files**

In [42]:
# Write df_sql as Delta files under Files/raw_source (no table is registered).
# Append mode adds rows on every execution, so re-running this cell duplicates the data.
(
    df_sql.write
    .format("delta")
    .mode("append")
    .save("Files/raw_source")
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 44, Finished, Available, Finished)

**Writing as a Managed Table**

In [50]:
# Save df_sql as a managed table (no explicit path) stored in CSV format.
# Append mode adds rows on every execution, so re-running this cell duplicates the data.
# NOTE(review): this is the only non-Delta table in the notebook — confirm CSV is intended.
df_sql.write.saveAsTable(
    "bronze_lakehouse.csv_man_tbl",
    format="csv",
    mode="append",
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 52, Finished, Available, Finished)

**Writing as an External Table**

In [51]:
# Save df_sql as an external Delta table: the data lives at the explicit relative path
# Files/my_data and the table "ext_table_delta" points to it.
# Append mode adds rows on every execution, so re-running this cell duplicates the data.
(
    df_sql.write
    .format("delta")
    .mode("append")
    .option("path", "Files/my_data")
    .saveAsTable("ext_table_delta")
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 53, Finished, Available, Finished)

In [52]:
# Save df_sql as an external Delta table using a fully qualified abfss path.
# Append mode adds rows on every execution, so re-running this cell duplicates the data.
# NOTE(review): this path ends in Files/my_data like the relative path used for
# "ext_table_delta"; if both resolve to the same folder, the two tables share one
# location and each append is visible in both — confirm that is intended.
(
    df_sql.write
    .format("delta")
    .mode("append")
    .option("path", "abfss://DP700_DEV@onelake.dfs.fabric.microsoft.com/bronze_lakehouse.Lakehouse/Files/my_data")
    .saveAsTable("ext_table_delta_ext")
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 54, Finished, Available, Finished)

### **DATA WRITING**

In [55]:
# Persist the transformed customers to the silver lakehouse as a Delta table.
# Append mode adds rows on every execution, so re-running this cell duplicates the data.
(
    df_customers.write
    .format("delta")
    .mode("append")
    .saveAsTable("Silver_LH.enr_customers")
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 57, Finished, Available, Finished)

In [56]:
# Persist the transformed reviews to the silver lakehouse as a Delta table.
# Append mode adds rows on every execution, so re-running this cell duplicates the data.
(
    df_reviews.write
    .format("delta")
    .mode("append")
    .saveAsTable("Silver_LH.enr_reviews")
)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 58, Finished, Available, Finished)

In [57]:
# Persist the remaining transformed DataFrames to the silver lakehouse as Delta tables,
# in the same order as before: payments, order items, products, orders.
# Append mode adds rows on every execution, so re-running this cell duplicates the data.
for frame, table in [
    (df_payments, "Silver_LH.enr_payments"),
    (df_orderitems, "Silver_LH.enr_orderitems"),
    (df_products, "Silver_LH.enr_products"),
    (df_orders, "Silver_LH.enr_orders"),
]:
    frame.write.format("delta").mode("append").saveAsTable(table)

StatementMeta(, 88c7ddb5-5c6e-4730-bea9-d890f9f66e32, 59, Finished, Available, Finished)