In [8]:
from dotenv import load_dotenv
from pyspark.sql import SparkSession, functions as F
from google.cloud import storage

load_dotenv()

False

In [9]:
outcomes_map = {'Rto-Adopt':1, 
                'Adoption':2, 
                'Euthanasia':3, 
                'Transfer':4,
                'Return to Owner':5, 
                'Died':6, 
                'Disposal':7,
                'Missing':8,
                'Relocate':9,
                'N/A':10,
                'Stolen':11}

In [10]:
spark = SparkSession.builder.appName("animal_shelter").getOrCreate()

23/12/17 17:47:57 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [13]:
# This needs to change
df = spark.read.csv(f"/Users/ujas/Downloads/Austin_Animal_Center_Outcomes.csv", header=True, inferSchema=True)
df.show()

+---------+----------+--------------------+---------+-------------+---------------+---------------+-----------+----------------+----------------+--------------------+------------------+
|Animal ID|      Name|            DateTime|MonthYear|Date of Birth|   Outcome Type|Outcome Subtype|Animal Type|Sex upon Outcome|Age upon Outcome|               Breed|             Color|
+---------+----------+--------------------+---------+-------------+---------------+---------------+-----------+----------------+----------------+--------------------+------------------+
|  A794011|     Chunk|05/08/2019 06:20:...| May 2019|   05/02/2017|      Rto-Adopt|           NULL|        Cat|   Neutered Male|         2 years|Domestic Shorthai...| Brown Tabby/White|
|  A776359|     Gizmo|07/18/2018 04:02:...| Jul 2018|   07/12/2017|       Adoption|           NULL|        Dog|   Neutered Male|          1 year|Chihuahua Shortha...|       White/Brown|
|  A821648|      NULL|08/16/2020 11:38:...| Aug 2020|   08/16/2019|   

In [None]:
# GCS stuff

In [None]:
def transform_data(date):
    # some more gcs stuff

    data = spark.read.csv(f"/Users/ujas/Downloads/Austin_Animal_Center_Outcomes.csv", header=True, inferSchema=True)

    data = prep_data(data)

    dim_animal_agg = prep_animal_dim(data)
    dim_dates = prep_date_dim(data)
    dim_outcome_types = prep_outcome_types_dim(data)
    fct_outcomes = prep_outcomes_fct(data)

    # upload to GCS

def prep_data(data):
    # removing stars from animal names
    data = data.withColumn("Name", F.regexp_replace(F.col("Name"), "\*", ""))




In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, date_format, concat_ws, when
from google.cloud import storage
from pyspark.sql.functions import year, month, dayofmonth, regexp_replace
from dotenv import load_dotenv
load_dotenv() #load environment variables

# Initialize Spark Session
spark = SparkSession.builder.appName("DataTransformation").getOrCreate()

# GCS stuff
storage_client = storage.Client('oceanic-hangout-406022')
bucket = storage_client.bucket('outcomes_bucket')

def transform_data(date):
    # Check if the blob exists
    blob = bucket.blob(f'extracted/{date}_outcomes.csv')
    if not blob.exists():
        return

    # Read data directly from GCS into a DataFrame
    df = spark.read.csv(f"/Users/ujas/Downloads/Austin_Animal_Center_Outcomes.csv", header=True, inferSchema=True)

    df = prep_data(df)

    dim_animal_agg = prep_animal_dim(df)
    dim_dates = prep_date_dim(df)
    dim_outcome_types = prep_outcome_types_dim(df)
    fct_outcomes = prep_outcomes_fct(df)

    # Upload transformed data to GCS
    # Note: You may need to adjust this for your environment and ensure the necessary permissions
    # This is a basic example of writing to GCS
    # dim_animal_agg.write.csv(f"gs://outcomes_bucket/transformed/dim_animal_agg.csv", header=True)
    # dim_dates.write.csv(f"gs://outcomes_bucket/transformed/{date}_dim_dates.csv", header=True)
    # dim_outcome_types.write.csv(f"gs://outcomes_bucket/transformed/{date}_dim_outcome_types.csv", header=True)
    # fct_outcomes.write.csv(f"gs://outcomes_bucket/transformed/{date}_fct_outcomes.csv", header=True)

def prep_data(df):
    # Apply transformations similar to the Pandas version
    df = df.withColumn("name", regexp_replace(col("name"), "\*", ""))
    
    # Mapping for 'sex' and 'is_fixed'
    sex_map = {"Neutered Male":"M", "Intact Male":"M", "Intact Female":"F", "Spayed Female":"F", "Unknown":None}
    is_fixed_map = {"Neutered Male":True, "Intact Male":False, "Intact Female":False, "Spayed Female":True, "Unknown":None}

    df = df.withColumn("sex", when(col("sex_upon_outcome").isin(sex_map.keys()), col("sex_upon_outcome")).otherwise(None))
    df = df.withColumn("is_fixed", when(col("sex_upon_outcome").isin(is_fixed_map.keys()), col("sex_upon_outcome")).otherwise(None))

    # Converting dates and times
    df = df.withColumn("ts", to_date(col("datetime")))
    df = df.withColumn("date_id", date_format(col("ts"), 'yyyyMMdd'))
    df = df.withColumn("time", date_format(col("ts"), 'HH:mm:ss'))

    # Replace outcome_type with outcome_type_id
    for outcome, id in outcomes_map.items():
        df = df.withColumn('outcome_type_id', when(col('outcome_type') == outcome, id).otherwise(col('outcome_type_id')))

    df = df.drop('outcome_type').withColumnRenamed('outcome_type_id', 'outcome_type')

    return df

def prep_animal_dim(df):
    animal_dim = df.select("animal_id", "name", "date_of_birth", "sex", "animal_type", "breed", "color")
    animal_dim = animal_dim.withColumnRenamed("date_of_birth", "dob")

    # Load existing data if exists, and append
    # Implement logic similar to the Pandas version using PySpark functions
    # Example (may need adjustments based on your specific requirements):
    # dim_animal_agg = spark.read.csv("existing_path").union(animal_dim).dropDuplicates()

    return animal_dim.dropDuplicates()

def prep_date_dim(df):
    dates_dim = df.select(
        date_format(col("ts"), 'yyyyMMdd').alias("date_id"),
        date_format(col("ts"), 'yyyy-MM-dd').alias("date"),
        year(col("ts")).alias("year"),
        month(col("ts")).alias("month"),
        dayofmonth(col("ts")).alias("day")
    )
    return dates_dim.dropDuplicates()

def prep_outcome_types_dim(df):
    outcome_types_dim = spark.createDataFrame(outcomes_map.items(), ["outcome_type", "outcome_type_id"])
    return outcome_types_dim

def prep_outcomes_fct(df):
    outcomes_fct = df.select("animal_id", "date_id", "time", "outcome_type_id", "outcome_subtype", "is_fixed")
    return outcomes_fct

# Example usage
transform_data('2023-11-21')


                                                                                

SparkRuntimeException: [UNSUPPORTED_FEATURE.LITERAL_TYPE] The feature is not supported: Literal for '[Unknown, Intact Male, Intact Female, Spayed Female, Neutered Male]' of class java.util.HashSet.

In [3]:
# Initialize spark
spark = pyspark.sql.SparkSession.builder.appName("shelter").getOrCreate()

# read in data
data = spark.read.csv("/Users/ujas/Downloads/Austin_Animal_Center_Outcomes.csv", header=True, inferSchema=True)
data.show(5)

+---------+-----+--------------------+---------+-------------+------------+---------------+-----------+----------------+----------------+--------------------+-----------------+
|Animal ID| Name|            DateTime|MonthYear|Date of Birth|Outcome Type|Outcome Subtype|Animal Type|Sex upon Outcome|Age upon Outcome|               Breed|            Color|
+---------+-----+--------------------+---------+-------------+------------+---------------+-----------+----------------+----------------+--------------------+-----------------+
|  A794011|Chunk|05/08/2019 06:20:...| May 2019|   05/02/2017|   Rto-Adopt|           NULL|        Cat|   Neutered Male|         2 years|Domestic Shorthai...|Brown Tabby/White|
|  A776359|Gizmo|07/18/2018 04:02:...| Jul 2018|   07/12/2017|    Adoption|           NULL|        Dog|   Neutered Male|          1 year|Chihuahua Shortha...|      White/Brown|
|  A821648| NULL|08/16/2020 11:38:...| Aug 2020|   08/16/2019|  Euthanasia|           NULL|      Other|         Unk

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, to_timestamp, date_format
from pyspark.sql.types import StringType, BooleanType

spark = SparkSession.builder.appName("transform").getOrCreate()

def read_data_from_gcs(date):
    file_path = f"gs://outcomes_bucket/extracted/{date}_outcomes.csv"
    return spark.read.csv(file_path, header=True, inferSchema=True)


In [None]:
def prep_data(data):
    data = data.withColumn("name", col("name").replace("*", "", "regex"))
    data = data.withColumn("sex", col("sex_upon_outcome").replace({"Neutered Male": "M", "Intact Male": "M", "Intact Female": "F", "Spayed Female": "F", "Unknown": None}))
    data = data.withColumn("is_fixed", col("sex_upon_outcome").replace({"Neutered Male": True, "Intact Male": False, "Intact Female": False, "Spayed Female": True, "Unknown": None}))
    data = data.withColumn("ts", to_timestamp(col("datetime"), "yyyy-MM-dd HH:mm:ss"))
    data = data.withColumn("date_id", date_format(col("ts"), "yyyyMMdd"))
    data = data.withColumn("time", date_format(col("ts"), "HH:mm:ss"))
    data = data.withColumn("outcome_type_id", col("outcome_type").replace(outcomes_map))
    return data
