In [0]:
# Read the raw vendor CSV. header=true is required: without it Spark treats the
# header line as an ordinary data row and names the columns _c0, _c1, ... instead.
df = spark.read.format("csv").option("header", "true").load("/Volumes/de_use_cases_UC/vendor_dirty_schema/vendor_dirty_volume")

In [0]:
# Header-aware read with no schema/inferSchema option, so Spark's default of
# string-typed columns applies.
df = spark.read.csv(
    "/Volumes/de_use_cases_UC/vendor_dirty_schema/vendor_dirty_volume",
    header=True,
)

In [0]:
# Inspect the header-based read from the previous cell as a rich notebook table.
display(df)

In [0]:
# Plain-text preview: first 20 rows, long cell values truncated.
df.show(n=20, truncate=True)

In [0]:
# Let Spark make an extra pass over the data to guess column types instead of
# loading every column as a string.
df = spark.read.csv(
    "/Volumes/de_use_cases_UC/vendor_dirty_schema/vendor_dirty_volume",
    header=True,
    inferSchema=True,
)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType
# Explicit schema for the vendor feed. order_date is deliberately read as a
# string: a DateType column only accepts the reader's single date format, so in
# the default PERMISSIVE mode rows using other layouts (e.g. yyyy/MM/dd or
# dd-MM-yyyy) would be nulled at read time and could not be recovered later.
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("amount", IntegerType(), True),
    StructField("currency", StringType(), True),
    StructField("order_date", StringType(), True),
])
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load("/Volumes/de_use_cases_UC/vendor_dirty_schema/vendor_dirty_volume")
)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType
from pyspark.sql.functions import to_date, date_format, col, coalesce, when

# order_date must be read as a string: declared as DateType, the reader already
# nulls every value not in its default date layout, leaving the coalesce below
# nothing to recover.
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("amount", IntegerType(), True),
    StructField("currency", StringType(), True),
    StructField("order_date", StringType(), True)
])
df = spark.read.format("csv")\
    .option("header","true")\
    .schema(schema)\
    .load("/Volumes/de_use_cases_UC/vendor_dirty_schema/vendor_dirty_volume")

# Parse the mixed date layouts. Each to_date is applied only to strings whose
# shape matches its pattern: an unguarded parse (e.g. '15-01-2024' with
# 'yyyy-MM-dd') may raise instead of returning null, depending on
# spark.sql.legacy.timeParserPolicy. Values matching no pattern become null.
df = df.withColumn(
    "order_date",
    coalesce(
        when(col("order_date").rlike("^[0-9]{4}-[0-9]{2}-[0-9]{2}$"),
             to_date(col("order_date"), "yyyy-MM-dd")),
        when(col("order_date").rlike("^[0-9]{4}/[0-9]{2}/[0-9]{2}$"),
             to_date(col("order_date"), "yyyy/MM/dd")),
        when(col("order_date").rlike("^[0-9]{2}-[0-9]{2}-[0-9]{4}$"),
             to_date(col("order_date"), "dd-MM-yyyy")),
    )
)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import to_date, col, coalesce

# Schema for the raw read. order_date must stay a plain string: the file mixes
# several date layouts, and it is converted to a date only after loading.
schema = (
    StructType()
    .add("order_id", IntegerType(), True)
    .add("amount", IntegerType(), True)
    .add("currency", StringType(), True)
    .add("order_date", StringType(), True)
)

In [0]:
# Load the vendor CSV using the explicit schema from the previous cell
# (order_date kept as a string for later parsing).
df = spark.read.csv(
    "/Volumes/de_use_cases_UC/vendor_dirty_schema/vendor_dirty_volume",
    schema=schema,
    header=True,
)


In [0]:
from pyspark.sql.functions import when, col, to_date, trim

# Convert order_date from its mixed string layouts to a date column.
# Leading/trailing spaces are trimmed first so padded values still match the
# anchored patterns. Each to_date runs only on strings whose shape matches its
# own pattern; values matching none of the patterns (and nulls) become null.
order_date_str = trim(col("order_date"))

df = df.withColumn(
    "order_date",
    when(order_date_str.rlike("^[0-9]{4}-[0-9]{2}-[0-9]{2}$"),
         to_date(order_date_str, "yyyy-MM-dd"))
    .when(order_date_str.rlike("^[0-9]{4}/[0-9]{2}/[0-9]{2}$"),
          to_date(order_date_str, "yyyy/MM/dd"))
    .when(order_date_str.rlike("^[0-9]{2}-[0-9]{2}-[0-9]{4}$"),
          to_date(order_date_str, "dd-MM-yyyy"))
)

display(df)

In [0]:
from pyspark.sql.functions import upper, when, col, trim

# Canonicalise currency codes: strip surrounding spaces and upper-case every
# value, so 'usd', 'Usd' and ' usd ' all become 'USD'. The previous explicit
# mapping only handled the exact lowercase strings 'usd' and 'aed'.
# Null currencies stay null.
df = df.withColumn("currency", upper(trim(col("currency"))))
display(df)

In [0]:
# Treat a missing amount as 0. The fill is limited to `amount`: a bare
# fillna(0) also replaces null order_id values with 0, fabricating an order ID.
df = df.fillna(0, subset=["amount"])
display(df)

In [0]:
# Expose the current DataFrame to %sql cells under the view name "df".
df.createOrReplaceTempView(name="df")

In [0]:
%sql
-- List the distinct currency codes present in the "df" temp view.
SELECT DISTINCT currency
FROM df

In [0]:
# Total amount per currency; the aggregate column is named "sum(amount)".
sum_amount = df.groupBy("currency").sum("amount")
display(sum_amount)

In [0]:
# Persist the cleaned data as a catalog table, replacing any previous contents.
df.write.saveAsTable(
    "de_use_cases_UC.vendor_dirty_schema.tbl_vendor_dirty",
    mode="overwrite",
)