In [0]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("DateTransformation").getOrCreate()

In [0]:
from pyspark.sql.functions import col, to_date, udf, expr, when, initcap, lower
from pyspark.sql.types import DecimalType, StringType, IntegerType, FloatType, StructType, StructField
from dateutil.parser import parse
from common.transforms import infer_and_transform_date, transform_price, remove_special_characters, separate_camel_case, transform_provider_name
import re

# Register the UDFs
infer_and_transform_date_udf = udf(infer_and_transform_date, StringType())
transform_price_udf = udf(transform_price, DecimalType())
remove_special_characters_udf = udf(remove_special_characters, StringType())
separate_camel_case_udf = udf(separate_camel_case, StringType())
transform_provider_name_udf = udf(transform_provider_name, StringType())

# Read the table into a DataFrame
df = spark.read.table("workspace.default.products_from_csv")

# Convert data types using the UDFs
df = df.withColumn("RawPrice", col("Price")) \
       .withColumn("Price", transform_price_udf(col("Price"))) \
       .withColumn("RawLastReviewDt", col("LastReviewDt")) \
       .withColumn("LastReviewDt", infer_and_transform_date_udf(col("LastReviewDt"))) \
       .withColumn("RawDescription", col("Description")) \
       .withColumn("Description", remove_special_characters_udf(col("Description"))) \
       .withColumn("ProviderName", initcap(transform_provider_name_udf(col("ProviderName")))) \
       .withColumn("IsValidPrice", when(col("Price").isNull(), False).otherwise(True))

In [0]:
from pyspark.sql.functions import col, to_date, udf, expr, when, initcap, lower
from pyspark.sql.types import DecimalType, StringType, IntegerType, FloatType, StructType, StructField
from pyspark.sql import SparkSession
from dateutil.parser import parse
from decimal import *
import re

# Initialize Spark session
spark = SparkSession.builder.appName("DateTransformation").getOrCreate()

# Define a function to infer the date format and transform the date string
def infer_and_transform_date(date_str):
    try:
        parsed_date = parse(date_str, fuzzy=True)
        return parsed_date.strftime("%Y-%m-%d")
    except Exception as e:
        return None

# Define a function to transform the Price column
def transform_price(price_str):
    try:
        cleaned_price_str = price_str.replace(".", "").replace(",", "").replace("$", "").replace(" ", "")
        return Decimal(cleaned_price_str)
    except Exception as e:
        return None

# Define a function to remove special characters
def remove_special_characters(text):
    try:
        return re.sub(r'[^A-Za-z0-9/ ]+', '', text)
    except Exception as e:
        return None

# Define a function to separate camel case
def separate_camel_case(text):
    try:
        return re.sub(r'([a-z])([A-Z])', r'\1 \2', text)
    except Exception as e:
        return None

# Define a function to transform the ProviderName column
def transform_provider_name(provider_name):
    try:
        cleaned_name = remove_special_characters(provider_name)
        separated_name = separate_camel_case(cleaned_name)
        return separated_name
    except Exception as e:
        return None

# Define a function to extract the unit of measure and the measure from a given string
def extract_measure_and_unit(measure_str):
    try:
        measure = re.findall(r"(\d+\.?\d*)\s*([a-zA-Z]{1,2})", measure_str)
        packageUnits =  re.findall(r"[x]\s*(\d+)", measure_str)
        return (measure[0][0] if measure else None, measure[0][1] if measure else None, packageUnits[0] if packageUnits else None)
    except Exception as e:
        return (None, None, None)

# Define the schema for the returned STRUCT type
measure_unit_schema = StructType([
    StructField("Measure", StringType(), True),
    StructField("UnitOfMeasure", StringType(), True),
    StructField("PackageUnits", StringType(), True)
])

# Register the UDFs
infer_and_transform_date_udf = udf(infer_and_transform_date, StringType())
transform_price_udf = udf(transform_price, DecimalType())
remove_special_characters_udf = udf(remove_special_characters, StringType())
separate_camel_case_udf = udf(separate_camel_case, StringType())
transform_provider_name_udf = udf(transform_provider_name, StringType())
extract_measure_and_unit_udf = udf(extract_measure_and_unit, measure_unit_schema)

# Read the table into a DataFrame
df = spark.read.table("workspace.default.products_from_csv")

# Convert data types using the UDFs
df = df.withColumn("RawPrice", col("Price")) \
       .withColumn("Price", transform_price_udf(col("Price"))) \
       .withColumn("RawLastReviewDt", col("LastReviewDt")) \
       .withColumn("LastReviewDt", infer_and_transform_date_udf(col("LastReviewDt"))) \
       .withColumn("RawDescription", col("Description")) \
       .withColumn("Description", remove_special_characters_udf(col("Description"))) \
       .withColumn("ProviderName", initcap(transform_provider_name_udf(col("ProviderName")))) \
       .withColumn("IsValidPrice", when(col("Price").isNull(), False).otherwise(True)) \
       .withColumn("MeasureUnit", extract_measure_and_unit_udf(col("Description"))) \
       .withColumn("Measure", col("MeasureUnit.Measure")) \
       .withColumn("UnitOfMeasure", col("MeasureUnit.UnitOfMeasure")) \
       .withColumn("PackageUnits", col("MeasureUnit.PackageUnits")) \
       .withColumn("UnitOfMeasure", lower(col("UnitOfMeasure")))

# Display the DataFrame
df.display()

In [0]:
import pytz

timezones = spark.createDataFrame(pytz.all_timezones, ["Timezone"])

timezones.display()