Use the Data Science Agent
https://docs.databricks.com/aws/en/notebooks/ds-agent

Example machine-learning prompts:
"Perform some data preparation and feature engineering to prepare this dataset for model training."

"Train a classification model on the @customer_data dataset to predict churn. Evaluate the model with accuracy and AUC metrics."

"Perform hyperparameter tuning on a regression model using the @housing_prices dataset to reduce prediction error."

"Build a clustering model on the @sales_leads dataset to identify customer segments and provide a summary of each cluster’s characteristics."

In [0]:
# Load the UK housing dataset from its three-level (catalog.schema.table) name.
housing_table = "caio_moreno.uk_house_advisor_bot.silver_pp_complete"
df = spark.table(housing_table)

# Report the dataset's shape, then its schema.
n_rows = df.count()
n_cols = len(df.columns)
print(f"Dataset shape: {n_rows} rows, {n_cols} columns")
print("\nColumn names and types:")
df.printSchema()

# Render a small sample with the notebook's display() helper.
print("\nFirst 5 rows:")
sample_rows = df.limit(5)
display(sample_rows)

In [0]:
# Simple data exploration: total rows, nulls in key columns and price statistics.
# NOTE(review): importing Spark's `min`/`max` here shadows Python's builtin min/max
# for every cell executed after this one.
from pyspark.sql.functions import col, count, when, min, max, avg, stddev

# Basic counts
total_rows = df.count()
print(f"Total rows: {total_rows:,}")

# Check for nulls in key columns.
# All null counts are computed in ONE aggregation (a single Spark job) instead of
# running a separate filtered count() per column.
print("\nNull counts in key columns:")
key_columns = ['price', 'date_of_transfer', 'postcode', 'property_type', 'county']
null_counts_row = df.select(
    [count(when(col(name).isNull(), 1)).alias(name) for name in key_columns]
).collect()[0]
for col_name in key_columns:
    null_count = null_counts_row[col_name]
    # Guard against an empty table (total_rows == 0) to avoid ZeroDivisionError.
    null_pct = null_count / total_rows * 100 if total_rows else 0.0
    print(f"{col_name}: {null_count} nulls ({null_pct:.2f}%)")


def _format_gbp(value):
    """Format a price as '£1,234.56', or 'n/a' when the aggregate is NULL.

    Spark returns NULL for min/max/avg/stddev when there are no non-null prices,
    and formatting None with ',.2f' would raise TypeError.
    """
    return "n/a" if value is None else f"£{value:,.2f}"


# Price statistics (single aggregation over the whole table)
print("\nPrice statistics:")
price_stats = df.select(
    min('price').alias('min_price'),
    max('price').alias('max_price'),
    avg('price').alias('avg_price'),
    stddev('price').alias('std_price')
).collect()[0]

print(f"Min price: {_format_gbp(price_stats['min_price'])}")
print(f"Max price: {_format_gbp(price_stats['max_price'])}")
print(f"Avg price: {_format_gbp(price_stats['avg_price'])}")
print(f"Std price: {_format_gbp(price_stats['std_price'])}")

In [0]:
# Comprehensive data quality assessment: duplicates, invalid/outlier prices,
# date coverage and missing postcodes.
from pyspark.sql.functions import col, when, isnan, isnull, count, sum as spark_sum
# Import the aggregates this cell needs explicitly (aliased so Python's builtin
# min/max are not shadowed) rather than relying on an import from another cell.
from pyspark.sql.functions import min as spark_min, max as spark_max

# Check for duplicates: rows beyond the number of distinct transaction IDs.
print("=== DUPLICATE ANALYSIS ===")
total_records = df.count()
unique_transactions = df.select('transaction_id').distinct().count()
print(f"Total records: {total_records:,}")
print(f"Unique transaction IDs: {unique_transactions:,}")
print(f"Duplicate transactions: {total_records - unique_transactions:,}")

# Check for invalid prices
print("\n=== PRICE QUALITY CHECKS ===")
invalid_prices = df.filter((col('price') <= 0) | col('price').isNull()).count()
print(f"Invalid prices (<=0 or null): {invalid_prices:,}")

# Extreme price outliers using Tukey's IQR fences (Q1 - 1.5*IQR, Q3 + 1.5*IQR).
# approxQuantile drops nulls and returns an empty list when no values remain,
# hence the length check; relative error 0.01 makes the quartiles approximate.
price_quartiles = df.select('price').approxQuantile('price', [0.25, 0.75], 0.01)
if len(price_quartiles) == 2:
    q1, q3 = price_quartiles
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    outliers = df.filter((col('price') < lower_bound) | (col('price') > upper_bound)).count()
    # total_records > 0 here: quartiles exist only if at least one price row exists.
    print(f"Price outliers (IQR method): {outliers:,} ({outliers/total_records*100:.2f}%)")
    print(f"Price range (Q1-Q3): £{q1:,.0f} - £{q3:,.0f}")

# Check date range
print("\n=== DATE QUALITY CHECKS ===")
date_stats = df.select(
    spark_min('date_of_transfer').alias('min_date'),
    spark_max('date_of_transfer').alias('max_date')
).collect()[0]
print(f"Date range: {date_stats['min_date']} to {date_stats['max_date']}")

# Check for missing postcodes (critical for location features)
missing_postcodes = df.filter(col('postcode').isNull() | (col('postcode') == '')).count()
# Guard against an empty table to avoid ZeroDivisionError.
missing_pct = missing_postcodes / total_records * 100 if total_records else 0.0
print(f"Missing postcodes: {missing_postcodes:,} ({missing_pct:.2f}%)")

In [0]:
# Feature Engineering - Date and Time Features
from pyspark.sql.functions import (
    year, month, dayofmonth, dayofweek, quarter,
    when, col, regexp_extract, length, split,
    log, sqrt, abs as spark_abs
)

# Create date-based features from the transfer date.
# A NULL date_of_transfer yields NULL in every derived column.
df_features = (
    df.withColumn('year', year('date_of_transfer'))
      .withColumn('month', month('date_of_transfer'))
      .withColumn('quarter', quarter('date_of_transfer'))
      .withColumn('day_of_week', dayofweek('date_of_transfer'))
      .withColumn('day_of_month', dayofmonth('date_of_transfer'))
)

# Create seasonal features (meteorological seasons).
# Every season is matched explicitly, so a row whose month is NULL gets a NULL
# season instead of falling through to a catch-all 'Autumn' label.
df_features = df_features.withColumn('season',
    when(col('month').isin([12, 1, 2]), 'Winter')
    .when(col('month').isin([3, 4, 5]), 'Spring')
    .when(col('month').isin([6, 7, 8]), 'Summer')
    .when(col('month').isin([9, 10, 11]), 'Autumn')
)

# Weekend indicator: Spark's dayofweek() returns 1 for Sunday through 7 for Saturday.
df_features = df_features.withColumn('is_weekend',
    when(col('day_of_week').isin([1, 7]), 1).otherwise(0)
)

print("Date and time features created:")
print("- year, month, quarter, day_of_week, day_of_month")
print("- season (Winter/Spring/Summer/Autumn)")
print("- is_weekend (0/1)")

In [0]:
# Price-based features
from pyspark.sql.functions import log, when, col

# Log price (compresses the long right tail of the price distribution).
# Spark's log() returns NULL for non-positive input.
df_features = df_features.withColumn('log_price', log(col('price')))

# Price categories based on UK housing market segments.
# Each entry is (exclusive upper bound in £, label), checked in ascending order;
# prices at or above the last bound are 'Ultra-luxury'.
PRICE_SEGMENTS = [
    (150000, 'Budget'),
    (300000, 'Mid-range'),
    (500000, 'Premium'),
    (1000000, 'Luxury'),
]
segment_expr = None
for segment_limit, segment_label in PRICE_SEGMENTS:
    in_segment = col('price') < segment_limit
    segment_expr = (when(in_segment, segment_label) if segment_expr is None
                    else segment_expr.when(in_segment, segment_label))
# Only categorise known prices: a NULL price would otherwise fail every comparison
# and fall through to 'Ultra-luxury'. The outer when() without otherwise() yields NULL.
df_features = df_features.withColumn('price_category',
    when(col('price').isNotNull(), segment_expr.otherwise('Ultra-luxury'))
)

# Price per square meter proxy (using property type as rough size indicator)
# This is a simplified approach - in real scenarios you'd have actual size data.
# property_type code -> assumed average floor area in sqm; other codes use 75.
ASSUMED_SQM_BY_TYPE = {
    'F': 50,    # Flat
    'T': 80,    # Terraced
    'S': 90,    # Semi-detached
    'D': 120,   # Detached
}
ratio_expr = None
for type_code, assumed_sqm in ASSUMED_SQM_BY_TYPE.items():
    is_type = col('property_type') == type_code
    ratio_expr = (when(is_type, col('price') / assumed_sqm) if ratio_expr is None
                  else ratio_expr.when(is_type, col('price') / assumed_sqm))
df_features = df_features.withColumn('price_type_ratio',
    ratio_expr.otherwise(col('price') / 75)  # Other - assume ~75 sqm avg
)

print("Price features created:")
print("- log_price (log transformation)")
print("- price_category (Budget/Mid-range/Premium/Luxury/Ultra-luxury)")
print("- price_type_ratio (price per estimated sqm)")

In [0]:
# Location-based features
from pyspark.sql.functions import regexp_extract, length, split, col, when, upper, trim

# Extract postcode components.
# UK postcodes are "<outward code> <inward code>", e.g. "SW1A 1AA":
#   area     = 1-2 leading letters                         -> "SW"
#   district = digits following the area; an optional trailing district letter
#              (the "A" in "SW1A") is NOT captured by the pattern below -> "1"
#   sector   = the digit starting the inward code          -> "1"
#   unit     = the final two letters                       -> "AA"
# The postcode is trimmed and upper-cased first so the anchored, upper-case-only
# patterns also match values with stray whitespace or lower-case letters.
clean_postcode = upper(trim(col('postcode')))

df_features = df_features.withColumn('postcode_area',
    regexp_extract(clean_postcode, r'^([A-Z]{1,2})', 1)
)

df_features = df_features.withColumn('postcode_district',
    regexp_extract(clean_postcode, r'^[A-Z]{1,2}([0-9]{1,2})', 1)
)

df_features = df_features.withColumn('postcode_sector',
    regexp_extract(clean_postcode, r'([0-9])[A-Z]{2}$', 1)
)

# London indicator (major price driver), based on the postcode area.
# NOTE(review): only these London postal areas are flagged; outer-London areas
# (e.g. BR, CR, HA, IG, KT) are not — confirm this is the intended definition.
london_areas = ['E', 'EC', 'N', 'NW', 'SE', 'SW', 'W', 'WC']
df_features = df_features.withColumn('is_london',
    when(col('postcode_area').isin(london_areas), 1).otherwise(0)
)

# Major city indicators.
# Matching is case-insensitive: town_city is upper-cased and compared against
# upper-cased names, consistent with the upper-case county matching below.
major_cities = ['Manchester', 'Birmingham', 'Leeds', 'Glasgow', 'Sheffield', 
                'Bradford', 'Liverpool', 'Edinburgh', 'Bristol', 'Cardiff']
major_cities_upper = [city.upper() for city in major_cities]
df_features = df_features.withColumn('is_major_city',
    when(upper(col('town_city')).isin(major_cities_upper), 1).otherwise(0)
)

# County grouping (group smaller counties); county is upper-cased before matching.
county_upper = upper(col('county'))
df_features = df_features.withColumn('county_group',
    when(county_upper.isin(['GREATER LONDON', 'LONDON']), 'London')
    .when(county_upper.isin(['GREATER MANCHESTER', 'MANCHESTER']), 'Manchester')
    .when(county_upper.isin(['WEST MIDLANDS', 'BIRMINGHAM']), 'West Midlands')
    .when(county_upper.isin(['WEST YORKSHIRE', 'SOUTH YORKSHIRE', 'NORTH YORKSHIRE']), 'Yorkshire')
    .otherwise('Other')
)

print("Location features created:")
print("- postcode_area, postcode_district, postcode_sector")
print("- is_london (0/1)")
print("- is_major_city (0/1)")
print("- county_group (London/Manchester/West Midlands/Yorkshire/Other)")

In [0]:
# Property-specific features
from pyspark.sql.functions import col, when, length, regexp_extract

# 0/1 flags from single-letter code columns:
# output column -> (source column, code that maps to 1; anything else maps to 0).
FLAG_RULES = {
    'is_new_property': ('old_new', 'Y'),   # property age indicator (new vs old)
    'is_freehold': ('duration', 'F'),      # leasehold vs freehold
}
for flag_name, (source_name, positive_code) in FLAG_RULES.items():
    df_features = df_features.withColumn(
        flag_name, when(col(source_name) == positive_code, 1).otherwise(0)
    )

# Property type hierarchy (size/value proxy): Flat < Terraced < Semi-detached < Detached.
# Codes not listed here (including NULL) score 2, i.e. assumed medium.
SIZE_RANK = {'F': 1, 'T': 2, 'S': 3, 'D': 4}
size_expr = None
for type_code, rank in SIZE_RANK.items():
    matches_type = col('property_type') == type_code
    size_expr = (when(matches_type, rank) if size_expr is None
                 else size_expr.when(matches_type, rank))
df_features = df_features.withColumn('property_size_score', size_expr.otherwise(2))

# Address completeness: number of address parts (0-4) that are neither NULL nor empty.
completeness = None
for part in ['paon', 'saon', 'street', 'locality']:
    present = when(col(part).isNotNull() & (col(part) != ''), 1).otherwise(0)
    completeness = present if completeness is None else completeness + present
df_features = df_features.withColumn('address_completeness', completeness)

print("Property features created:")
print("- is_new_property (0/1)")
print("- is_freehold (0/1)")
print("- property_size_score (1-4, size proxy)")
print("- address_completeness (0-4, completeness score)")

In [0]:
# Prepare categorical variables for ML: index each string column, then one-hot encode it.
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

# Define categorical columns to encode.
# NOTE(review): 'price_category' is derived from the target 'price'; its encoded
# column must not be used as an input feature for a price model.
categorical_cols = [
    'property_type', 'county_group', 'season', 'price_category',
    'postcode_area', 'ppd_category_type', 'record_status'
]

# Create StringIndexers; handleInvalid='keep' sends unseen labels and NULLs to an
# extra index instead of failing.
# The loop variable is `cat_col`, not `col`, so the notebook-level pyspark `col`
# function is never rebound to a string.
indexers = [
    StringIndexer(inputCol=cat_col, outputCol=cat_col + '_indexed', handleInvalid='keep')
    for cat_col in categorical_cols
]

# Create OneHotEncoders over the indexed columns
encoders = [
    OneHotEncoder(inputCol=cat_col + '_indexed', outputCol=cat_col + '_encoded')
    for cat_col in categorical_cols
]

# Combine indexers and encoders (all indexers run before any encoder)
stages = indexers + encoders

# Create and fit pipeline.
# NOTE(review): this is fitted on the full dataset, before cleaning and before the
# train/validation/test split, so vocabularies include rows later used for
# evaluation; consider fitting on the training split only.
encoding_pipeline = Pipeline(stages=stages)
encoding_model = encoding_pipeline.fit(df_features)
df_encoded = encoding_model.transform(df_features)

print("Categorical encoding completed for:")
for cat_col in categorical_cols:
    print(f"- {cat_col} -> {cat_col}_indexed -> {cat_col}_encoded")

In [0]:
# Data cleaning and filtering
from pyspark.sql.functions import col, isnan, isnull

# Remove records with invalid prices: non-positive, NULL, or extreme outliers
# (prices of £10M or more are dropped).
df_clean = df_encoded.filter(
    (col('price') > 0) & 
    col('price').isNotNull() & 
    (col('price') < 10000000)
)

# Remove records with missing critical information
df_clean = df_clean.filter(
    col('postcode').isNotNull() & 
    (col('postcode') != '') &
    col('property_type').isNotNull() &
    col('date_of_transfer').isNotNull()
)

# Remove duplicate transactions.
# NOTE: Spark's dropDuplicates keeps ONE row per transaction_id but does not
# guarantee which one (there is no "first occurrence" ordering). If a specific
# row must win, rank rows with a window function and keep rank 1 instead.
df_clean = df_clean.dropDuplicates(['transaction_id'])

print("Data cleaning completed:")
original_count = df_encoded.count()
cleaned_count = df_clean.count()
removed_count = original_count - cleaned_count
# Guard against an empty input to avoid ZeroDivisionError.
removed_pct = removed_count / original_count * 100 if original_count else 0.0
print(f"Original records: {original_count:,}")
print(f"Cleaned records: {cleaned_count:,}")
print(f"Removed: {removed_count:,} ({removed_pct:.2f}%)")

In [0]:
# Feature scaling and final dataset preparation
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Define numerical features for scaling.
# 'log_price' and 'price_type_ratio' are deliberately EXCLUDED: both are computed
# directly from 'price', which is the prediction target, so putting them in the
# feature vector would leak the label into the model's inputs.
numerical_features = [
    'year', 'month', 'quarter', 'day_of_week',
    'property_size_score', 'address_completeness'
]

# Define binary features (no scaling needed)
binary_features = [
    'is_weekend', 'is_new_property', 'is_freehold', 'is_london', 'is_major_city'
]

# Define encoded categorical features (price_category_encoded is intentionally
# absent because it is derived from the target)
encoded_features = [
    'property_type_encoded', 'county_group_encoded', 'season_encoded',
    'postcode_area_encoded', 'ppd_category_type_encoded'
]

# Assemble numerical features for scaling.
# VectorAssembler's default handleInvalid='error' fails on NULL inputs; the
# cleaning step is assumed to have removed rows with NULL dates — TODO confirm.
numerical_assembler = VectorAssembler(
    inputCols=numerical_features,
    outputCol='numerical_features_raw'
)

# Scale numerical features to zero mean and unit standard deviation
scaler = StandardScaler(
    inputCol='numerical_features_raw',
    outputCol='numerical_features_scaled',
    withStd=True,
    withMean=True
)

# Create scaling pipeline.
# NOTE(review): the scaler is fitted on all cleaned rows, before the
# train/validation/test split, so its mean/std include evaluation data.
scaling_pipeline = Pipeline(stages=[numerical_assembler, scaler])
scaling_model = scaling_pipeline.fit(df_clean)
df_scaled = scaling_model.transform(df_clean)

print("Feature scaling completed for numerical features:")
for feat in numerical_features:
    print(f"- {feat}")

In [0]:
# Assemble final feature vector for ML
from pyspark.ml.feature import VectorAssembler

# Combine all features into final feature vector:
# binary flags + one-hot vectors + the scaled numerical vector.
all_feature_cols = (
    binary_features + 
    encoded_features + 
    ['numerical_features_scaled']
)

final_assembler = VectorAssembler(
    inputCols=all_feature_cols,
    outputCol='features'
)

# Create final ML-ready dataset
df_ml_ready = final_assembler.transform(df_scaled)

# Select only necessary columns for ML
ml_dataset = df_ml_ready.select(
    'transaction_id',
    'price',  # target variable
    'log_price',  # alternative target (for regression)
    'features',  # feature vector
    'date_of_transfer',
    'postcode',
    'property_type',
    'county',
    'town_city'
)

print("Final ML dataset created with columns:")
# Use a dedicated loop variable: a top-level `for col in ...` would rebind the
# imported pyspark `col` function to a string and break later `col(...)` calls.
for column_name in ml_dataset.columns:
    print(f"- {column_name}")

print(f"\nFinal dataset shape: {ml_dataset.count():,} rows")

In [0]:
# Create train/validation/test splits
from pyspark.sql.functions import rand

# Create splits: 70% train, 15% validation, 15% test.
# randomSplit derives all three splits from one seeded sampling pass, and Spark
# sorts each input partition before sampling so the splits stay disjoint and
# reproducible. Filtering a rand() column separately for each split would
# re-evaluate the lineage (which includes a dropDuplicates shuffle) per split; a
# different row order in each pass can give rows different random values, which
# produces overlapping or missing rows.
train_data, val_data, test_data = ml_dataset.randomSplit([0.7, 0.15, 0.15], seed=42)

# Count each split once (each count() is a separate Spark job); the splits
# partition ml_dataset, so their sum is the total.
split_counts = [
    ("Training set", train_data.count()),
    ("Validation set", val_data.count()),
    ("Test set", test_data.count()),
]
split_total = sum(n for _, n in split_counts)

print("=== DATA PREPARATION SUMMARY ===")
for split_label, split_size in split_counts:
    # Guard against an empty dataset to avoid ZeroDivisionError.
    split_pct = split_size / split_total * 100 if split_total else 0.0
    print(f"{split_label}: {split_size:,} records ({split_pct:.1f}%)")

print("\n=== FEATURES CREATED ===")
print("Date Features: year, month, quarter, season, day_of_week, is_weekend")
print("Price Features: log_price, price_category, price_type_ratio")
print("Location Features: postcode components, is_london, is_major_city, county_group")
print("Property Features: is_new_property, is_freehold, property_size_score, address_completeness")
print("Categorical Encoding: One-hot encoded for all categorical variables")
print("Numerical Scaling: StandardScaler applied to continuous features")

# Save the prepared datasets (optional)
# Uncomment the lines below to save the datasets
# train_data.write.mode('overwrite').saveAsTable('caio_moreno.uk_house_advisor_bot.ml_train_data')
# val_data.write.mode('overwrite').saveAsTable('caio_moreno.uk_house_advisor_bot.ml_val_data')
# test_data.write.mode('overwrite').saveAsTable('caio_moreno.uk_house_advisor_bot.ml_test_data')

print("\n✅ Data preparation completed! Ready for model training.")

In [0]:
# Get dataset statistics
print(f"Total records: {df.count():,}")
print(f"Number of columns: {len(df.columns)}")

# Check data types and null counts
from pyspark.sql.functions import col, count, when, isnan, isnull
from pyspark.sql.types import DoubleType, FloatType
from functools import reduce  # retained import; not used by this cell any more


def _null_count_expr(field):
    """Return an aggregate counting missing values in one schema field.

    NULL always counts as missing; for DoubleType/FloatType columns NaN counts
    too. isnan() is applied only to those types because it does not accept
    arbitrary column types.
    """
    is_missing = col(field.name).isNull()
    if isinstance(field.dataType, (DoubleType, FloatType)):
        is_missing = is_missing | isnan(col(field.name))
    return count(when(is_missing, field.name)).alias(field.name)


# One aggregation yields a single row with one named count per column.
# (Unioning per-column single-column DataFrames is positional, so the result
# kept only the first column's name and the counts could not be told apart.)
null_summary = df.select([_null_count_expr(field) for field in df.schema.fields])
print("\nNull counts by column:")
# vertical=True prints one "column | count" line per column, which stays readable
# for wide tables.
null_summary.show(vertical=True)

# Basic statistics for numerical columns
print("\nBasic statistics for price column:")
df.select("price").describe().show()

# Check unique values for categorical columns
print("\nUnique values in key categorical columns:")
print(f"Property types: {df.select('property_type').distinct().count()}")
print(f"Counties: {df.select('county').distinct().count()}")
print(f"Towns/Cities: {df.select('town_city').distinct().count()}")