In [0]:
# Exploratory first read: let Spark infer the column types from the data so they can
# be inspected before writing the explicit products_schema used below.
df_products = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/workspace/retail_schema/raw/Products.csv")
)

In [0]:
# Preview the first five rows of the inferred-schema read.
df_products.show(n=5)

+------------+---------------------+----------------+-----------------+-----------------+----------------+
|  product_id|product_category_name|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+------------+---------------------+----------------+-----------------+-----------------+----------------+
|90K0C1fIyQUf|                 toys|           491.0|             19.0|             12.0|            16.0|
|qejhpMGGVcsl|        watches_gifts|           440.0|             18.0|             14.0|            17.0|
|qUS5d2pEAyxJ| costruction_tools...|          2200.0|             16.0|             16.0|            16.0|
|639iGvMyv0De|                 toys|          1450.0|             68.0|              3.0|            48.0|
|1lycYGcsic2F|                 toys|           300.0|             17.0|              4.0|            12.0|
+------------+---------------------+----------------+-----------------+-----------------+----------------+
only showing top 5 rows


In [0]:
# Print the column types Spark inferred; they are the basis for the explicit products_schema below.
df_products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_weight_g: double (nullable = true)
 |-- product_length_cm: double (nullable = true)
 |-- product_height_cm: double (nullable = true)
 |-- product_width_cm: double (nullable = true)



In [0]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, DecimalType, DoubleType, FloatType

# Explicit schema for Products.csv, using the same types inferSchema produced above
# (two string columns followed by four double measurement columns).
# NOTE: nullable=False on product_id is not enforced by the CSV read - the printSchema
# output after the schema-based read still reports product_id as nullable = true - so
# rows with a null product_id have to be removed by an explicit filter further down.
products_schema = (
    StructType()
    .add("product_id", StringType(), nullable=False)
    .add("product_category_name", StringType(), nullable=True)
    .add("product_weight_g", DoubleType(), nullable=True)
    .add("product_length_cm", DoubleType(), nullable=True)
    .add("product_height_cm", DoubleType(), nullable=True)
    .add("product_width_cm", DoubleType(), nullable=True)
)


In [0]:
# Re-read the raw file with the explicit products_schema instead of inferring types,
# so the column types are fixed up front rather than guessed from the data.
df_products = spark.read.csv(
    "/Volumes/workspace/retail_schema/raw/Products.csv",
    schema=products_schema,
    header=True,
)


In [0]:
# Preview the first five rows of the explicit-schema read; they should match the earlier preview.
df_products.show(n=5)

+------------+---------------------+----------------+-----------------+-----------------+----------------+
|  product_id|product_category_name|product_weight_g|product_length_cm|product_height_cm|product_width_cm|
+------------+---------------------+----------------+-----------------+-----------------+----------------+
|90K0C1fIyQUf|                 toys|           491.0|             19.0|             12.0|            16.0|
|qejhpMGGVcsl|        watches_gifts|           440.0|             18.0|             14.0|            17.0|
|qUS5d2pEAyxJ| costruction_tools...|          2200.0|             16.0|             16.0|            16.0|
|639iGvMyv0De|                 toys|          1450.0|             68.0|              3.0|            48.0|
|1lycYGcsic2F|                 toys|           300.0|             17.0|              4.0|            12.0|
+------------+---------------------+----------------+-----------------+-----------------+----------------+
only showing top 5 rows


In [0]:
# Confirm the explicit schema was applied. product_id is still reported as nullable here,
# even though products_schema declares it nullable=False: the CSV read does not keep that flag.
df_products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_weight_g: double (nullable = true)
 |-- product_length_cm: double (nullable = true)
 |-- product_height_cm: double (nullable = true)
 |-- product_width_cm: double (nullable = true)



In [0]:
from pyspark.sql.functions import col

# Remove rows that have no product_id. The schema's nullable=False is not enforced by the
# CSV reader, so the key column must be checked explicitly before writing to staging.
df_products = df_products.na.drop(subset=["product_id"])


In [0]:
# Persist the filtered products as Delta files in the staging area, replacing any
# existing data and schema at that location.
(
    df_products.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/Volumes/workspace/retail_schema/staging/Products")
)
