In [None]:
import os
from pyspark import StorageLevel
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [None]:
# Read the raw intake export from the staging bucket.
# The first row supplies the column names, column types are inferred from the
# data, and FAILFAST parse mode is requested so malformed rows are not skipped.
animal_df = (
    spark.read
    .format("csv")
    .options(
        inferSchema="true",
        header="true",
        mode="FAILFAST",
    )
    .load("s3://animal-center/stage/animal-center-raw.csv")
)

In [None]:
# Mark the freshly read DataFrame for in-memory caching.
# NOTE(review): later cells rebind `animal_df` to new DataFrames derived from
# this one, so the cache presumably covers only the raw read - confirm that is
# the intended caching point.
animal_df.persist(StorageLevel.MEMORY_ONLY)

Out[523]: DataFrame[Animal ID: string, Name: string, DateTime: string, MonthYear: string, Found Location: string, Intake Type: string, Intake Condition: string, Animal Type: string, Sex upon Intake: string, Age upon Intake: string, Breed: string, Color: string]

In [None]:
# Remove rows that are exact duplicates across every column.
animal_df = animal_df.distinct()

In [None]:
# Source header -> snake_case column name. "MonthYear" is intentionally absent
# from this mapping; it is left under its original name.
column_renames = {
    "Animal ID": "animal_id",
    "Name": "name",
    "DateTime": "datetime",
    "Found Location": "found_location",
    "Intake Type": "intake_type",
    "Intake Condition": "intake_condition",
    "Animal Type": "animal_type",
    "Sex upon Intake": "gender_upon_intake",
    "Age upon Intake": "age_upon_intake",
    "Breed": "breed",
    "Color": "color",
}

# Apply the renames one at a time, in the order listed above.
for source_name, target_name in column_renames.items():
    animal_df = animal_df.withColumnRenamed(source_name, target_name)

In [None]:
# Give animals without a recorded name a placeholder value.
animal_df = animal_df.fillna("dummy-name", subset=["name"])

In [None]:
def remove_star(name):
    """Strip a single leading ``*`` marker from an animal name.

    Args:
        name: The name string, or None.

    Returns:
        ``name`` without its first character when that character is ``*``;
        otherwise ``name`` unchanged (this includes None and the empty string).
    """
    # Only drop the first character when it really is the star. Checking for a
    # star anywhere in the string would chop the first letter off names such
    # as "Mr*Big".
    if name and name.startswith('*'):
        return name[1:]
    return name
  
# Wrap the Python helper as a string-returning Spark UDF and use it to rewrite
# the "name" column in place.
remove_star_udf = udf(remove_star, returnType=StringType())
cleaned_name = remove_star_udf(col("name"))
animal_df = animal_df.withColumn("name", cleaned_name)

In [None]:
# Convert the "datetime" text column to a timestamp, overwriting the original.
# The pattern is month/day/year followed by a 12-hour clock time with an
# AM/PM marker.
intake_timestamp = to_timestamp(col("datetime"), "MM/dd/yyyy hh:mm:ss a")
animal_df = animal_df.withColumn("datetime", intake_timestamp)

In [None]:
def convert_to_days(age_string):
    """Convert a textual age such as ``"2 years"`` into an approximate day count.

    The string is split on whitespace and read as consecutive
    ``<number> <unit>`` pairs. Units are matched case-insensitively with any
    trailing ``s`` removed; a year counts as 365 days, a month as 30, a week
    as 7 and a day as 1. Pairs with any other unit contribute nothing, and a
    trailing number without a unit is ignored.

    Args:
        age_string: The age text, or None for a missing value.

    Returns:
        The summed number of days as an int (negative when the input number is
        negative, 0 when no complete pair is present), or None when
        ``age_string`` is None.

    Raises:
        ValueError: If the number part of a pair is not a valid integer.
    """
    # Missing values arrive as None; pass them through as None instead of
    # failing on .split().
    if age_string is None:
        return None

    days_per_unit = {'year': 365, 'month': 30, 'week': 7, 'day': 1}
    tokens = age_string.split()

    total_days = 0
    # Even positions hold the numbers, odd positions the units; zip() drops a
    # dangling number that has no unit after it.
    for count_text, unit_text in zip(tokens[0::2], tokens[1::2]):
        count = int(count_text)
        unit = unit_text.lower().rstrip('s')
        total_days += count * days_per_unit.get(unit, 0)

    return total_days

# Spark UDF wrapper around convert_to_days, returning an integer column.
convert_age_udf = udf(convert_to_days, IntegerType())

# Derive the age columns from the textual intake age:
#   1. "age_days"      - day count produced by the UDF,
#   2. sign fix        - negative day counts are flipped to positive *before*
#                        anything else is derived, so "age(in years)" can
#                        never end up negative,
#   3. "age(in years)" - age_days / 365, rounded to two decimals,
# then drop the source columns that are no longer needed.
animal_df = (
    animal_df
    .withColumn("age_days", convert_age_udf(col("age_upon_intake")))
    .withColumn(
        "age_days",
        when(col("age_days") < 0, col("age_days") * -1)
        .otherwise(col("age_days")),
    )
    .withColumn("age(in years)", round((col("age_days") / 365), 2))
    .drop("age_upon_intake", "MonthYear")
)

In [None]:
# Flip negative day counts to positive; all other values pass through as-is.
age_days_col = col("age_days")
non_negative_age_days = when(age_days_col < 0, age_days_col * -1).otherwise(age_days_col)
animal_df = animal_df.withColumn("age_days", non_negative_age_days)

# Bucket every animal by age in days, then drop the helper column:
#   up to 365 days      -> "Puppy/Kitten"
#   up to 7 * 365 days  -> "Adult"
#   more than that      -> "Senior"
animal_df = animal_df.withColumn(
    "age_category",
    when(col("age_days") <= 365, "Puppy/Kitten")
    .when(col("age_days") <= 365 * 7, "Adult")
    # Explicit condition instead of otherwise(): a null age_days matches no
    # branch, so the category stays null rather than being labelled "Senior".
    .when(col("age_days") > 365 * 7, "Senior"),
).drop("age_days")

In [None]:
# JDBC endpoint of the MySQL database that receives the curated table.
jdbc_url = "jdbc:mysql://austin-animal-center-db.cf20yui40eho.ap-south-1.rds.amazonaws.com:3306/austin_animal_center"

# Connection settings handed to the JDBC writer.
# Credentials are read from the environment so they never live in the
# notebook; a missing variable raises KeyError here, before any write starts.
# Required variables: ANIMAL_DB_USER, ANIMAL_DB_PASSWORD.
connection_properties = {
    "user": os.environ["ANIMAL_DB_USER"],
    "password": os.environ["ANIMAL_DB_PASSWORD"],
    "batchsize": "10000",
    "rewriteBatchedStatements": "true",
    "useServerPrepStmts": "true",
    "cachePrepStmts": "true",
    "useCompression": "true",
    "socketTimeout": "60000",
    "autoReconnect": "true",
    # NOTE(review): TLS and certificate verification are switched off, so the
    # credentials and data travel unencrypted - confirm this is acceptable
    # for this endpoint before enabling it anywhere beyond a sandbox.
    "useSSL": "false",
    "verifyServerCertificate": "false",
}

# Replace the contents of the "animals" table with the curated DataFrame.
# Only "numPartitions" is kept as a writer option: Spark documents
# partitionColumn / lowerBound / upperBound as read-scoped options, so the
# values previously passed here had no effect on the write.
(
    animal_df.write
    .mode("overwrite")
    .option("numPartitions", 10)
    .jdbc(
        url=jdbc_url,
        table="animals",
        properties=connection_properties,
    )
)

In [None]:
# Export the curated data as CSV with a header row, replacing whatever is
# already at the target path. The DataFrame is coalesced to one partition
# before writing.
curated_csv_writer = (
    animal_df
    .coalesce(1)
    .write
    .format("csv")
    .option("header", "true")
    .mode("overwrite")
)
curated_csv_writer.save("s3://animal-center/curated")