# In [None]:
##################
##################


def read_data(spark, customSchema):
    '''
    Load the raw loan CSV from S3 into a Spark DataFrame.

    spark        : active SparkSession used to perform the read
    customSchema : schema applied to the CSV columns (passed as ``schema=``)

    Returns the DataFrame produced by ``spark.read.csv`` (first line treated
    as the header).
    '''
    banner = "--------------------"
    print(banner)
    print("Starting read_data")
    print(banner)

    # Bucket holding the loan input file
    bucket_name = "loan-data926529466287266"
    source_uri = f"s3://{bucket_name}/inputfile/loan_data.csv"

    # Column names come from the header row; column types from customSchema.
    return spark.read.csv(source_uri, header=True, schema=customSchema)


def clean_data(input_df):
    '''
    Remove incomplete, duplicated and placeholder rows from the loan data.

    input_df : DataFrame returned by read_data

    Steps, in order:
      1. drop rows that contain a null in any column,
      2. drop exact duplicate rows,
      3. drop rows whose "purpose" is the literal string "null".
    '''
    banner = "--------------------"
    print(banner)
    print("Starting clean_data")
    print(banner)

    deduped = input_df.dropna().dropDuplicates()
    # The source encodes some missing purposes as the text "null" rather than
    # a real null, so dropna() alone does not remove them.
    return deduped.filter(deduped.purpose != 'null')


def s3_load_data(data, file_name):
    '''
    Write a DataFrame to S3 as a single CSV part file (with header).

    data      : Spark DataFrame to store (e.g. the output of result_1 / result_2)
    file_name : name of the output location under the bucket's "output/"
                prefix; a leading "/" is tolerated and stripped

    Existing output at that location is overwritten. Empty DataFrames are not
    written; a message is printed instead.
    '''
    #Mention the bucket name inside the bucket_name variable
    bucket_name = "loan-data926529466287266"
    # The "/" after "output" was previously missing, which produced keys such
    # as "outputresult_1". Join explicitly; strip a leading "/" so callers that
    # already passed "/name" do not end up with "output//name".
    output_path = "s3://" + bucket_name + "/output/" + file_name.lstrip("/")

    # take(1) needs only one row to decide emptiness, whereas count() scans
    # the whole DataFrame before the write recomputes it.
    if len(data.take(1)) > 0:
        print("Loading the data", output_path)
        # coalesce(1) -> one output part file under output_path
        data.coalesce(1).write.csv(output_path, header=True, mode="overwrite")
    else:
        print("Empty dataframe, hence cannot save the data", output_path)


def result_1(input_df):
    '''
    Derive risk features for educational and small-business loans.

    input_df : cleaned loan DataFrame (output of clean_data)

    Returns input_df restricted to purpose in {"educational", "small_business"},
    with three extra columns:
      income_to_installment_ratio : log_annual_inc / installment
      int_rate_category           : "low" if int_rate < 0.1,
                                    "medium" if 0.1 <= int_rate < 0.15,
                                    otherwise "high"
      high_risk_borrower          : 1 if dti > 20 or fico < 700 or
                                    revol_util > 80, otherwise 0
    '''
    banner = "--------------------------"
    print(banner)
    print("Starting result_1")
    print(banner)

    rate = col("int_rate")
    risky = (col("dti") > 20) | (col("fico") < 700) | (col("revol_util") > 80)

    return (
        input_df
        .filter(col("purpose").isin("educational", "small_business"))
        .withColumn("income_to_installment_ratio",
                    col("log_annual_inc") / col("installment"))
        # when() branches are tried in order, so reaching the second one
        # already implies int_rate >= 0.1. Rows whose int_rate is null match
        # neither condition and fall through to "high".
        .withColumn("int_rate_category",
                    when(rate < 0.1, "low")
                    .when(rate < 0.15, "medium")
                    .otherwise("high"))
        .withColumn("high_risk_borrower", when(risky, 1).otherwise(0))
    )


def result_2(input_df):
    '''
    Compute the default rate for each loan purpose.

    input_df : cleaned loan DataFrame (output of clean_data)

    Returns one row per distinct "purpose" with a "default_rate" column equal
    to sum(not_fully_paid) / number of rows in the group, rounded to 2 decimals.
    '''
    banner = "--------------------------"
    print(banner)
    print("Starting result_2")
    print(banner)

    # NOTE(review): `sum`, `count` and `round` must be the Spark SQL functions
    # (pyspark.sql.functions), not Python builtins -- builtin sum() cannot
    # iterate a Column. Their import is outside this view; confirm.
    rate_expr = (sum(col("not_fully_paid")) / count("*")).alias("default_rate")
    per_purpose = input_df.groupBy("purpose").agg(rate_expr)
    return per_purpose.withColumn("default_rate", round(col("default_rate"), 2))


def redshift_load_data(data, *, table_name="result_2", jdbc_url=None,
                       username=None, password=None):
    '''
    Write a DataFrame to a Redshift table over JDBC using save mode "overwrite".

    data       : Spark DataFrame to load (intended for the output of result_2)
    table_name : target table name (default "result_2")
    jdbc_url   : JDBC URL; if None, read $REDSHIFT_JDBC_URL, else the
                 historical hard-coded cluster URL
    username   : if None, read $REDSHIFT_USER, else the historical default
    password   : if None, read $REDSHIFT_PASSWORD, else the historical default

    Empty DataFrames are not written; a message is printed instead.
    '''
    import os

    # take(1) decides emptiness without scanning the whole DataFrame.
    if len(data.take(1)) == 0:
        print("Empty dataframe, hence cannot load the data")
        return

    print("Loading the data into Redshift...")
    # SECURITY: the literal fallbacks below are kept only so existing callers
    # keep working. Credentials committed to source should be rotated and
    # supplied via the keyword arguments or environment variables instead.
    if jdbc_url is None:
        jdbc_url = os.environ.get(
            "REDSHIFT_JDBC_URL",
            "jdbc:redshift://emr-spark-redshift.cjgnpeot7x5i.us-east-1.redshift.amazonaws.com:5439/dev",
        )
    if username is None:
        username = os.environ.get("REDSHIFT_USER", "awsuser")
    if password is None:
        password = os.environ.get("REDSHIFT_PASSWORD", "Awsuser1")

    (data.write
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", table_name)
        .option("user", username)
        .option("password", password)
        .mode("overwrite")
        .save())




#################
#################


#### Import statements here
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.utils import resample
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split 
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix, classification_report 

import warnings
import boto3
from sagemaker import get_execution_role

warnings.filterwarnings('ignore')


#### Import the dataset from S3
# Assemble the URI in the form s3://<bucket>/<folder>/<file>.csv
bucket = "loan-data602607864436400"
folder_name = "loan_cleaned_data"
data_key = "loan_cleaned_data.csv"
data_location = "s3://" + "/".join((bucket, folder_name, data_key))

# Read the cleaned CSV into a pandas DataFrame and preview the first rows.
data = pd.read_csv(data_location)
data.head()


# Feature Engineering - One-hot Encoding
# Replace the categorical 'purpose' column with one integer (0/1) indicator
# column per distinct value.
encode_cols = ['purpose']
data = pd.get_dummies(data, columns=encode_cols, dtype=int)
data.head()


# Inspect the target column 'not_fully_paid' and identify the count of records belonging to the two classes.
print(data['not_fully_paid'].value_counts())

# Filter out the majority and minority classes and store them separately.
df_majority = data[data['not_fully_paid'] == 0]
df_minority = data[data['not_fully_paid'] == 1]


# Handle the imbalanced data: oversample the minority class (sampling with
# replacement) until it has as many rows as the majority class.
# NOTE(review): resampling happens before the train/test split further down,
# so copies of the same minority row can land in both train and test and
# inflate the evaluation scores. Consider splitting first and upsampling only
# the training portion.
df_minority_upsampled = resample(df_minority,
                                 replace=True,
                                 n_samples=df_majority.shape[0],
                                 random_state=42)


# Concatenate the upsampled data records with the majority class records and
# shuffle the resultant dataframe. (Previously only concatenated, which left
# every class-0 row ahead of every class-1 row despite the stated intent.)
df_balanced = shuffle(pd.concat([df_majority, df_minority_upsampled]),
                      random_state=42)
print(df_balanced['not_fully_paid'].value_counts())


# Create X and y data for train-test split
# Model Training
from sklearn.model_selection import train_test_split
from sklearn.ensemble import  RandomForestClassifier

# Independent variables X: every column except 'sl_no' (presumably a row
# serial number -- confirm) and the target. Dependent variable y: the
# 'not_fully_paid' label.
X, y = (df_balanced.drop(columns=['sl_no', 'not_fully_paid']),
        df_balanced['not_fully_paid'])


# Split the data
# 60% training / 40% test with a fixed seed of 42 for reproducibility.
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.4,
    random_state=42,
)


# Train a Random Forest Classifier model called rf on the training split
# (seed 42). fit() returns the estimator itself, so construction and
# training can be chained.
rf = RandomForestClassifier(random_state=42).fit(X_train, y_train)


# Predict the held-out test split with rf, then compare against y_test and
# print the per-class precision / recall / F1 report.
from sklearn.metrics import classification_report
y_pred = rf.predict(X_test)
print(classification_report(y_test, y_pred))


import tempfile
import joblib

import io

BUCKET_NAME ="loan-data602607864436400"
model_name = "model.pkl"

# Serialize the trained model into an in-memory buffer and stream it to S3.
# The previous NamedTemporaryFile approach re-opened the temp file by name,
# which the tempfile docs say is not portable while the file is still open
# (it fails on Windows). Its tmp.flush() was also a no-op, because joblib
# wrote through its own file handle rather than through tmp.
model_buffer = io.BytesIO()
joblib.dump(rf, model_buffer)
model_buffer.seek(0)  # rewind so the upload reads from the start

s3 = boto3.client('s3')

# Upload the model to the S3 bucket named 'loan-data<digits>' (any random
# integers) and store it under the key 'model.pkl'.
s3.upload_fileobj(model_buffer, BUCKET_NAME, model_name)




#################
#################

def read_data(spark):
    '''
    Load the raw car-details CSV from S3 into a Spark DataFrame.

    spark : active SparkSession used to perform the read

    Only header=True is passed. No schema is supplied and inferSchema is left
    at its default, so Spark reads every column as a string.
    '''
    divider = "-------------------"
    print(divider)
    print("Starting read data")
    print(divider)

    # Bucket holding the car input file
    bucket_name = "car-data843004767089968"
    source_uri = f"s3://{bucket_name}/inputfile/car_data.csv"
    return spark.read.csv(source_uri, header=True)


def clean_data(input_df):
    '''
    Remove incomplete and duplicated rows from the car data.

    input_df : DataFrame returned by read_data

    Rows with a null in any column are dropped first, then exact duplicates.
    '''
    divider = "--------------------"
    print(divider)
    print("Starting clean_data")
    print(divider)
    return input_df.dropna().dropDuplicates()


def s3_load_data(data, file_name):
    '''
    Write a DataFrame to S3 as a single CSV part file (with header).

    data      : Spark DataFrame to store (e.g. the output of clean_data)
    file_name : name of the output location under the bucket's "output/" prefix

    Existing output at that location is overwritten. Empty DataFrames are not
    written; a message is printed instead.
    '''
    bucket_name = "car-data843004767089968"
    prefix = "s3://" + bucket_name + "/output/"
    output_path = prefix + file_name

    if data.count() == 0:
        print("Empty dataframe, hence cannot save the data", output_path)
        return

    print("Loading the data", output_path)
    # coalesce(1) -> one output part file under output_path
    data.coalesce(1).write.csv(output_path, header=True, mode="overwrite")


def result_1(input_df):
    '''
    Summarise selling price per car name.

    input_df : cleaned car DataFrame (output of clean_data)

    Returns one row per distinct "name" with:
      average_selling_price : mean of selling_price
      car_count             : number of rows for that name
    keeping only names with car_count > 2.
    '''
    divider = "--------------------"
    print(divider)
    print("Starting result 1")
    print(divider)

    per_name = input_df.groupBy("name").agg(
        avg("selling_price").alias("average_selling_price"),
        count("*").alias("car_count"),
    )
    # where() is an alias of filter(); the SQL string refers to the alias above.
    return per_name.where("car_count > 2")


def result_2(input_df):
    '''
    Add a price-per-kilometre column and keep the cheap-per-km rows.

    input_df : cleaned car DataFrame (output of clean_data)

    Returns input_df plus "price_per_km" = selling_price / km_driven, keeping
    only rows where the unrounded value is < 10, with the column then rounded
    to 2 decimals.
    '''
    divider = "--------------------"
    print(divider)
    print("Starting result_2")
    print(divider)

    per_km = col("selling_price") / col("km_driven")
    # The filter is applied before rounding, so it compares the exact ratio.
    return (
        input_df
        .withColumn("price_per_km", per_km)
        .where(col("price_per_km") < 10)
        .withColumn("price_per_km", round(col("price_per_km"), 2))
    )




########################
########################


import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn. compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import boto3
import pandas as pd
from sagemaker import get_execution_role
import numpy as np
import warnings


### Create S3 path for the dataset
# These assignments were previously commented out, so `data_location` below
# silently reused whatever an earlier cell had left in the kernel (in this
# notebook, the loan dataset path) or raised NameError in a fresh kernel.
# Define the path explicitly for the car dataset.
bucket = "car-data843004767089968"  # same bucket the car Spark job uses
# TODO(review): confirm the object key of the cleaned car dataset in the bucket.
data_key = "car_cleaned_data.csv"
data_location = f's3://{bucket}/{data_key}'

### Load the dataset
data = pd.read_csv(data_location)


### Analyze the dataset
# Preview the leading rows to understand the column layout.
data.head()


### Create new feature: age of the car
# Age in years, measured against the fixed reference year 2024.
reference_year = 2024
data['car_age'] = reference_year - data['year']


### Drop the columns
# 'name' and 'year' are removed from the dataset ('year' is now captured by
# car_age). Done in place so `data` keeps referring to the same frame.
data.drop(columns=['name', 'year'], inplace=True)


### Define the features and target variable
# Target y: the selling price. Features X: every other column.
y = data['selling_price']
X = data.drop(columns=['selling_price'])


# Numerical vs categorical feature columns used by the preprocessor
numerical_features = ['km_driven', 'seats', 'car_age']
categorical_features = ['fuel', 'seller_type', 'transmission', 'owner']

### Log transformation for skewed numerical features
# log1p(x) = log(1 + x) stays finite at 0. Predictions are mapped back with
# expm1 after inference.
X['km_driven'] = np.log1p(X['km_driven'])
y = np.log1p(y)


# Preprocessing: standardise the numerical columns, and one-hot encode the
# categorical ones while ignoring categories unseen during fit.
# Columns listed in neither group are dropped (ColumnTransformer's default
# remainder='drop').
numeric_step = ('num', StandardScaler(), numerical_features)
categorical_step = ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)
preprocessor = ColumnTransformer(transformers=[numeric_step, categorical_step])


# Full model: preprocessing followed by a RandomForestRegressor (seed 8).
# The step name 'regressor' is what the 'regressor__*' grid keys refer to.
model = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('regressor', RandomForestRegressor(random_state=8)),
])


### Split the data into training and test sets
# 80% training / 20% test using `train_test_split` with random state 8.
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    random_state=8,
)


# Hyperparameter grid for the pipeline's 'regressor' step. Keys use the
# '<step>__<param>' convention:
#   regressor__n_estimators      : [100, 200, 300]
#   regressor__max_depth         : [None, 10, 20, 30]
#   regressor__min_samples_split : [2, 5, 10]
param_grid = dict(
    regressor__n_estimators=[100, 200, 300],
    regressor__max_depth=[None, 10, 20, 30],
    regressor__min_samples_split=[2, 5, 10],
)

### Create the model
# Exhaustive search over param_grid with 5-fold CV, scored by R^2 and run in
# a single process (n_jobs=1).
grid_search = GridSearchCV(model, param_grid, cv=5, scoring='r2', n_jobs=1)


# fit() returns the search object itself; best_estimator_ is the pipeline
# refit on the whole training split with the best parameters.
best_model = grid_search.fit(X_train, y_train).best_estimator_


### Make predictions and transform both series back to the original scale
# The model was trained on log1p(selling_price); expm1 undoes that.
y_pred = np.expm1(best_model.predict(X_test))
y_test = np.expm1(y_test)


### Calculate evaluation metrics (MAE, MSE, RMSE, R2) on the original scale
mae, mse, r2 = (
    mean_absolute_error(y_test, y_pred),
    mean_squared_error(y_test, y_pred),
    r2_score(y_test, y_pred),
)
rmse = np.sqrt(mse)

### Print the metrics and the winning hyperparameters
for line in (
    f'MAE: {mae}',
    f'MSE: {mse}',
    f'RMSE: {rmse} ',
    f'R2: {r2} ',
    f'Best Parameters: {grid_search.best_params_}',
):
    print(line)