In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [None]:
%%bash
# Stream the Redfin city market tracker archive from the public bucket straight
# into our S3 raw-data zone, without staging the file on local disk.
#
# pipefail: without it the pipeline's exit status is that of `aws s3 cp` alone,
# so a failed or interrupted wget would go unnoticed and the cell would still
# report success. -e / -u abort on any other failing command or unset variable.
set -euo pipefail
wget -O - https://redfin-public-data.s3.us-west-2.amazonaws.com/redfin_market_tracker/city_market_tracker.tsv000.gz | aws s3 cp - s3://redfine-data-zone-7266/store_raw_data_yml/city_market_tracker.tsv000.gz

In [None]:
# Create the Spark session for this notebook, or reuse the one that already exists.
session_builder = SparkSession.builder.appName("RedfinDataAnalysis")
spark = session_builder.getOrCreate()

In [None]:
# Load the gzipped, tab-separated Redfin export from S3. The first line is the
# header row, and Spark is asked to infer the column types from the data.
redfin_data = spark.read.csv(
    "s3://redfine-data-zone-7266/store_raw_data_yml/city_market_tracker.tsv000.gz",
    sep="\t",
    header=True,
    inferSchema=True,
)

In [None]:
# Render the raw DataFrame that was just read from S3 for a first visual check.
display(redfin_data)

In [None]:
# Check the schema: print each column's name and data type for the raw
# DataFrame, to confirm the columns were read as expected.
redfin_data.printSchema()

In [None]:
# List every column name in the raw DataFrame. The bare expression on the last
# line is what the cell shows as its output.
redfin_data.columns

In [None]:
# Columns carried forward into the analysis; every other column of the raw
# data is left out of df_redfin.
selected_columns = [
    'period_end',
    'period_duration',
    'city',
    'state',
    'property_type',
    'median_sale_price',
    'median_ppsf',
    'homes_sold',
    'inventory',
    'months_of_supply',
    'median_dom',
    'sold_above_list',
    'last_updated',
]
df_redfin = redfin_data.select(*selected_columns)
display(df_redfin)

In [None]:
# Report how many rows the selected-columns DataFrame holds.
total_rows = df_redfin.count()
print(f"Total number of rows: {total_rows}")

In [None]:
from pyspark.sql.functions import count, isnull, when

# Count null values in each column with ONE aggregation instead of one Spark
# job per column: `when(isnull(c), 1)` yields 1 for nulls and NULL otherwise,
# and `count` only counts the non-NULL results. The source is read once rather
# than once per column.
null_count_row = df_redfin.select(
    [
        count(when(isnull(col_name), 1)).alias(col_name)
        for col_name in df_redfin.columns
    ]
).first()

# Keep the same shape as before: a plain list of ints, in df_redfin.columns order.
null_counts = list(null_count_row)
null_counts

In [None]:
# Print one line per column, pairing each column name with its null count
# (null_counts is in the same order as df_redfin.columns).
for col_name, nulls_in_column in zip(df_redfin.columns, null_counts):
    print(f"{col_name}: {nulls_in_column} null values")

In [None]:
# Count the rows that survive dropping every record containing a null.
rows_without_nulls = df_redfin.na.drop()
remaining_count = rows_without_nulls.count()

# The gap between the full row count and the survivors is the number of rows
# that have at least one missing value.
rows_with_nulls = df_redfin.count() - remaining_count
print(f"Number of missing rows: {rows_with_nulls}")

In [None]:
# remaining_count is the row count after na.drop(), computed in the cell above.
print(f"Total number of remaining rows: {remaining_count}")

In [None]:
# Keep only the rows with no null values, then report how many are left.
# (DataFrame.dropna() is the same operation as DataFrame.na.drop().)
df_redfin = df_redfin.dropna()
rows_after_drop = df_redfin.count()
print(f"Total number of rows: {rows_after_drop}")

In [None]:
from pyspark.sql.functions import count, isnull, when

# Re-count null values per column to confirm the na.drop() removed them all.
# A single aggregation is used instead of one Spark job per column:
# `when(isnull(c), 1)` yields 1 for nulls and NULL otherwise, and `count` only
# counts the non-NULL results.
null_count_row = df_redfin.select(
    [
        count(when(isnull(col_name), 1)).alias(col_name)
        for col_name in df_redfin.columns
    ]
).first()

# Plain list of ints, in df_redfin.columns order.
null_counts = list(null_count_row)
null_counts

In [None]:
from pyspark.sql.functions import year, month

# Derive two new columns from period_end in one chained expression:
#   period_end_yr    - the year of period_end
#   period_end_month - the month number of period_end
period_end = col("period_end")
df_redfin = (
    df_redfin
    .withColumn("period_end_yr", year(period_end))
    .withColumn("period_end_month", month(period_end))
)

In [None]:
# Drop period_end and last_updated columns. The year and month of period_end
# were already extracted into period_end_yr / period_end_month in the cell above.
df_redfin = df_redfin.drop("period_end", "last_updated")

In [None]:
# Show the DataFrame after the period_end / last_updated columns were dropped.
# The name must be df_redfin: no df_redfin3 is defined anywhere in the
# notebook, so the old reference raised NameError on a fresh kernel.
display(df_redfin)

In [None]:
from pyspark.sql.functions import when

# Map each month number (1-12) to its English month name.
# The list position gives the month number: index 0 -> month 1 -> "January".
month_names = [
    "January",
    "February",
    "March",
    "April",
    "May",
    "June",
    "July",
    "August",
    "September",
    "October",
    "November",
    "December",
]

# Build one when(...).when(...)... expression, tested in month order 1..12.
month_number = col("period_end_month")
month_name_expr = None
for number, name in enumerate(month_names, start=1):
    is_this_month = month_number == number
    if month_name_expr is None:
        month_name_expr = when(is_this_month, name)
    else:
        month_name_expr = month_name_expr.when(is_this_month, name)

# Any value that is not 1-12 falls through to "Unknown".
df_redfin = df_redfin.withColumn(
    "period_end_month", month_name_expr.otherwise("Unknown")
)

In [None]:
# Show the DataFrame after period_end_month was converted to month names.
display(df_redfin)

In [None]:
# Persist the final DataFrame to the transform zone of our S3 bucket as Parquet.
# Despite its name, s3_bucket holds the full destination path, not just the bucket.
# mode="overwrite" replaces whatever already exists at that path.
s3_bucket = "s3://redfine-data-zone-7266/redfine_transform_zone/redfin_data.parquet"
df_redfin.write.parquet(s3_bucket, mode="overwrite")