In [0]:
%sql
-- Select the catalog and schema that unqualified names in later SQL resolve against.
USE CATALOG de_project_catalog;
USE SCHEMA olist_bronze;

In [0]:
# Shared parent directory of the raw source files and the bronze outputs.
_RAW_DATA_ROOT = "/Volumes/de_project_catalog/olist_bronze/raw_data"

# Where the source CSV files are read from.
Source_path = f"{_RAW_DATA_ROOT}/raw_source"
# Where the bronze Parquet datasets are written to.
Bronze_base_path = f"{_RAW_DATA_ROOT}/bronze"

### Orders

In [0]:
# Exploratory read of the orders CSV: header row supplies the column names,
# no schema is given, so this frame is only used to inspect the raw file.
orders = (
    spark.read
    .format("csv")
    .options(header="True")
    .load(f"{Source_path}/orders/orders.csv")
)

In [0]:
# Render the schema-less orders read for a visual check of the raw file.
display(orders)

In [0]:
# Print the column names and types of the schema-less orders read.
orders.printSchema()

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

# Explicit schema for orders.csv. Every field is nullable. The trailing
# corrupt_data column is where bad rows will be placed by the reader.
schema_orders = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("order_id", StringType()),
        ("customer_id", StringType()),
        ("order_status", StringType()),
        ("order_purchase_timestamp", TimestampType()),
        ("order_approved_at", TimestampType()),
        ("order_delivered_carrier_date", TimestampType()),
        ("order_delivered_customer_date", TimestampType()),
        ("order_estimated_delivery_date", TimestampType()),
        ("corrupt_data", StringType()),
    )
])

In [0]:
# Typed read of orders.csv: the header row is skipped, the explicit schema is
# enforced, and PERMISSIVE mode keeps malformed rows, storing their raw text
# in the corrupt_data column.
orders_df = (
    spark.read
    .schema(schema_orders)
    .options(
        header=True,
        mode="PERMISSIVE",
        columnNameOfCorruptRecord="corrupt_data",
    )
    .format("csv")
    .load(f"{Source_path}/orders/orders.csv")
)


In [0]:
# Confirm the explicit schema (including corrupt_data) is what orders_df carries.
orders_df.printSchema()

In [0]:
# Render the typed orders rows for a visual check.
orders_df.display()

In [0]:
# Keep only the orders rows whose corrupt_data column is populated,
# i.e. the rows the reader could not parse against the schema, and show them.
bad_records = orders_df.where(orders_df["corrupt_data"].isNotNull())
bad_records.display()

In [0]:
# Persist the typed orders frame as Parquet under the bronze base path,
# replacing any previous output ("overwrite").
# Bronze_base_path is defined once in the configuration cell near the top of
# the notebook; it is deliberately not redefined here so that a change to the
# configuration cell cannot be silently overridden for the later writes.
(
    orders_df.write
    .mode("overwrite")
    .parquet(f"{Bronze_base_path}/orders/")
)

### Customers


In [0]:
# Exploratory read of the customers CSV: header row supplies the column names,
# no schema is given, so this frame is only used to inspect the raw file.
customers = (
    spark.read
    .format("csv")
    .options(header="True")
    .load(f"{Source_path}/customers/customers.csv")
)

In [0]:
# Render the schema-less customers read for a visual check of the raw file.
customers.display()

In [0]:
# Print the column names and types of the schema-less customers read.
customers.printSchema()

In [0]:
# Explicit schema for customers.csv. Every field is a nullable string; the
# trailing corrupt_record column is reserved for rows that fail to parse.
customers_schema = (
    StructType()
    .add("customer_id", StringType(), True)
    .add("customer_unique_id", StringType(), True)
    .add("customer_zip_code_prefix", StringType(), True)
    .add("customer_city", StringType(), True)
    .add("customer_state", StringType(), True)
    .add("corrupt_record", StringType(), True)
)

In [0]:
# Typed read of customers.csv: the header row is skipped, the explicit schema
# is enforced, and PERMISSIVE mode keeps malformed rows, storing their raw
# text in the corrupt_record column.
customers_df = (
    spark.read
    .schema(customers_schema)
    .options(
        header=True,
        mode="PERMISSIVE",
        columnNameOfCorruptRecord="corrupt_record",
    )
    .format("csv")
    .load(f"{Source_path}/customers/customers.csv")
)

In [0]:
# Confirm the explicit schema (including corrupt_record) is what customers_df carries.
customers_df.printSchema()

In [0]:
# Render the typed customers rows for a visual check.
customers_df.display()

In [0]:
# Count the customers rows whose corrupt_record column is populated, i.e. the
# rows the PERMISSIVE CSV read flagged as malformed.
# NOTE(review): Spark's CSV documentation says queries over a raw file read
# that reference only the corrupt-record column are disallowed; if this count
# raises an AnalysisException, cache customers_df first — confirm on this runtime.
customers_df.filter("corrupt_record is Not Null").count()

In [0]:
# Persist the typed customers frame as Parquet under the bronze base path,
# replacing any previous output ("overwrite").
(
    customers_df.write
    .mode("overwrite")
    .parquet(f"{Bronze_base_path}/customers/")
)

### Order Items

In [0]:
# Exploratory read of the order-items CSV: header row supplies the column
# names, no schema is given, so this frame is only used to inspect the raw file.
order_items = (
    spark.read
    .options(header=True)
    .format("csv")
    .load(f"{Source_path}/order_items/order_items.csv")
)

In [0]:
# Print the column names and types of the schema-less order-items read.
order_items.printSchema()

In [0]:

from pyspark.sql.types import DoubleType

# Explicit schema for order_items.csv. Every field is nullable; the trailing
# corrupt_data column is reserved for rows that fail to parse.
order_items_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("order_id", StringType()),
        ("order_item_id", IntegerType()),
        ("product_id", StringType()),
        ("seller_id", StringType()),
        ("shipping_limit_date", TimestampType()),
        ("price", DoubleType()),
        ("freight_value", DoubleType()),
        ("corrupt_data", StringType()),
    )
])

In [0]:
# Typed read of order_items.csv with the explicit schema enforced.
# - header=True: the file starts with a header row (the exploratory read of
#   the same file relies on it). Without this option that line is parsed as
#   data, fails the typed columns, and ends up as a bogus corrupt row.
# - PERMISSIVE mode keeps malformed rows and stores their raw text in the
#   corrupt_data column instead of failing the read.
order_items_df = (
     spark.read
     .format("csv")
     .option("header", True)
     .option("mode", "PERMISSIVE")
     .option("columnNameOfCorruptRecord", "corrupt_data")
     .schema(order_items_schema)
     .load(f"{Source_path}/order_items/order_items.csv")
)

In [0]:
# Confirm the explicit schema (including corrupt_data) is what order_items_df carries.
order_items_df.printSchema()

In [0]:
# Count the order-items rows whose corrupt_data column is populated, i.e. the
# rows the PERMISSIVE CSV read flagged as malformed.
# NOTE(review): Spark's CSV documentation says queries over a raw file read
# that reference only the corrupt-record column are disallowed; if this count
# raises an AnalysisException, cache order_items_df first — confirm on this runtime.
order_items_df.filter("corrupt_data is NOT NULL").count()

In [0]:
# Persist the typed order-items frame as Parquet under the bronze base path,
# replacing any previous output ("overwrite").
(
    order_items_df.write
    .mode("overwrite")
    .parquet(f"{Bronze_base_path}/order_items/")
)

### Order Payments

In [0]:
# Exploratory read of the payments CSV: header row supplies the column names,
# no schema is given, so this frame is only used to inspect the raw file.
order_payments = (
    spark.read
    .options(header=True)
    .format("csv")
    .load(f"{Source_path}/payments/order_payments.csv")
)

In [0]:
# Print the column names and types of the schema-less payments read.
order_payments.printSchema()

In [0]:
# Explicit schema for order_payments.csv. Every field is nullable; the
# trailing corrupt_data column is reserved for rows that fail to parse.
payments_schema = (
    StructType()
    .add("order_id", StringType(), True)
    .add("payment_sequential", IntegerType(), True)
    .add("payment_type", StringType(), True)
    .add("payment_installments", IntegerType(), True)
    .add("payment_value", DoubleType(), True)
    .add("corrupt_data", StringType(), True)
)

In [0]:
# Typed read of order_payments.csv with the explicit schema enforced.
# - header=True: the file starts with a header row (the exploratory read of
#   the same file relies on it). Without this option that line is parsed as
#   data, fails the typed columns, and ends up as a bogus corrupt row.
# - PERMISSIVE mode keeps malformed rows and stores their raw text in the
#   corrupt_data column instead of failing the read.
order_payments_df = (
     spark.read
     .format("csv")
     .option("header", True)
     .option("mode", "PERMISSIVE")
     .option("columnNameOfCorruptRecord", "corrupt_data")
     .schema(payments_schema)
     .load(f"{Source_path}/payments/order_payments.csv")
)

In [0]:
# Confirm the explicit schema (including corrupt_data) is what order_payments_df carries.
order_payments_df.printSchema()

In [0]:
# Count the payments rows whose corrupt_data column is populated, i.e. the
# rows the PERMISSIVE CSV read flagged as malformed.
# NOTE(review): Spark's CSV documentation says queries over a raw file read
# that reference only the corrupt-record column are disallowed; if this count
# raises an AnalysisException, cache order_payments_df first — confirm on this runtime.
order_payments_df.filter("corrupt_data is Not Null").count()

In [0]:
# Persist the typed payments frame as Parquet under the bronze base path,
# replacing any previous output ("overwrite").
# The frame written here must be order_payments_df: writing order_items_df
# to this location would store order-items data under the payments path.
(
    order_payments_df.write
    .mode("overwrite")
    .parquet(f"{Bronze_base_path}/payments/")
)