# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [None]:
# import libraries
#This code imports various libraries and classes necessary for data analysis and machine learning using Apache Spark.
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, concat, count, desc, explode, lit, min, max, split, stddev, udf
from pyspark.sql.types import IntegerType, StringType
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, PCA, RegexTokenizer, StandardScaler, StopWordsRemover, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier

In [None]:
# Reuse the SparkSession that is already active, or start a new one for this project.
spark = (
    SparkSession.builder
    .appName("Capstone Project")
    .getOrCreate()
)

# Load the Sparkify event log from its JSON file into a Spark DataFrame.
sparkify_data = 'mini_sparkify_event_data.json'
df = spark.read.load(sparkify_data, format="json")

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [None]:
# Print the DataFrame schema as a tree.
df.printSchema()

In [None]:
# Convert the Spark DataFrame to a pandas DataFrame (collects every row to the
# driver) and preview its first rows.
df1 = df.toPandas()
df1.head()

In [None]:
# Summarise the pandas copy: column dtypes and non-null counts.
df1.info()

In [None]:
# Collect the distinct values of each categorical column of df1 so the range of
# values in the dataset can be inspected.
unique_levels, unique_methods, unique_pages, unique_statuses, unique_genders, unique_auths = (
    df1[column].unique()
    for column in ('level', 'method', 'page', 'status', 'gender', 'auth')
)

print(unique_levels, unique_methods, unique_pages, unique_statuses, unique_genders, unique_auths)

In [None]:
# Count the rows of df1 whose 'userId' is an empty string, i.e. events that are
# not attached to any user.
is_blank_user = df1['userId'] == ''
empty_user_ids_count = int(is_blank_user.sum())
print(empty_user_ids_count)

In [None]:
# Count the rows that have no usable 'sessionId'. A field missing from the JSON
# is loaded as null rather than '', so both cases are checked; casting to string
# keeps the empty-string comparison meaningful if the column is numeric.
df.filter(df.sessionId.isNull() | (df.sessionId.cast('string') == '')).count()

In [None]:
# Remove the rows that have no usable 'userId' (null or empty string), then
# count the offending rows that remain as a sanity check -- the result must be 0.
# The result is stored under a dedicated name so the `count` function imported
# from pyspark.sql.functions is not overwritten.
df = df.filter(df.userId.isNotNull() & (df.userId != ''))
remaining_empty_user_ids = df.filter(df.userId.isNull() | (df.userId == '')).count()
print(remaining_empty_user_ids)

In [None]:
# Refresh the pandas copy (df1) from the Spark DataFrame df.
df1 = df.toPandas()

In [None]:
# Gather the distinct values of each categorical column of df1 as plain lists
# (set() removes the duplicates, so the order of each list is arbitrary).
unique_levels, unique_methods, unique_pages, unique_statuses, unique_genders, unique_auths = (
    list(set(df1[column]))
    for column in ('level', 'method', 'page', 'status', 'gender', 'auth')
)

print(unique_levels, unique_methods, unique_pages, unique_statuses, unique_genders, unique_auths)

In [None]:
# Report the shape of the Spark DataFrame as (rows, columns).
column_count = len(df.columns)
row_count = df.count()

shape = (row_count, column_count)
print(shape)

In [None]:
# describe() is lazy and only returns another DataFrame; show() is needed for
# the summary statistics to actually be computed and displayed.
df.describe().show()

In [None]:
# Preview the first two rows of the Spark DataFrame.
df.show(2)

In [None]:
# Basic statistics for the 'userId' column as reported by describe()
# (count, mean, stddev, min, max).
summary_df = df.select('userId').describe()
summary_df.show()

In [None]:
# Statistics for the 'sessionId' column via summary(), which also reports
# approximate quartiles in addition to the describe() statistics.
summary_df = df.select('sessionId').summary()
summary_df.show()

In [None]:
# Count the events logged for each page type. The rows are not sorted, so the
# most and least frequent pages have to be read off the counts.
summary_df = df.groupBy('page').count()
summary_df.show()

In [None]:
# List the distinct page types present in the dataset, i.e. the kinds of
# action or event that are logged.
unique_pages_df = df.select('page').distinct()
unique_pages_df.show()

In [None]:
# Summary statistics for the 'length' column.
# NOTE(review): 'length' looks like a song duration in seconds (it is divided by
# 3600 to get hours further down) -- confirm against the dataset description.
summary_df = df.select('length').summary()
summary_df.show()

In [None]:
# Convert the Spark DataFrame df to a pandas DataFrame and print it.
df_pandas = df.toPandas()
print(df_pandas)

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

Define Churn

In [None]:
# Build one "Unique <column>s: <n>" message for each identifier column.
id_columns = ('userId', 'page', 'sessionId')
unique_counts = [
    f'Unique {name}s: {df.select(name).distinct().count()}'
    for name in id_columns
]
print(unique_counts)

In [None]:
# Number of distinct 'userId' values. count() lets Spark do the counting
# instead of collecting every distinct row to the driver just to measure it.
unique_user_ids_count = df.select('userId').distinct().count()
print(unique_user_ids_count)

In [None]:
# Label every event with its user's churn status: flag 'Cancellation
# Confirmation' events with 1, then sum the flags over ALL of the user's events
# (an unbounded window, not a running total) so each row of a user carries the
# same value.
from pyspark.sql.window import Window
from pyspark.sql import functions as F

windowval = (
    Window.partitionBy("userId")
    .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

is_cancellation = F.when(F.col("page") == "Cancellation Confirmation", 1).otherwise(0)
df = df.withColumn("churn", is_cancellation)
df = df.withColumn("churn", F.sum("churn").over(windowval))

In [None]:
# Refresh the pandas copy so that it includes the new 'churn' column.
df1 = df.toPandas()

In [None]:
# Inspect five randomly chosen rows.
df1.sample(5)

Explore Data

In [None]:
# Number of users per 'churn' value: keep one row per user, then count the
# users in each churn group.
one_row_per_user = df1.drop_duplicates(subset='userId')
churn_counts = one_row_per_user.groupby('churn')['userId'].count()
print(churn_counts)

In [None]:
# Distinct users in each churn group.
users_by_churn = df.groupby('churn').agg(F.countDistinct('userId').alias('count'))
users_by_churn.show()

In [None]:
# Distinct users for every (churn, gender) combination.
users_by_churn_gender = df.groupby('churn', 'gender').agg(F.countDistinct('userId').alias('count'))
users_by_churn_gender.show()

In [None]:
# Distinct users for every (churn, level) combination.
users_by_churn_level = df.groupby('churn', 'level').agg(F.countDistinct('userId').alias('count'))
users_by_churn_level.show()

In [None]:
# Bar chart of the number of unique users in each category.
def plot_frequency(subset, group, labels, x_title="Subscription", y_title="users"):
    """Plot how many users fall into each category of ``group``.

    Rows of the global pandas frame ``df1`` are de-duplicated on ``subset`` and
    counted per ``group``; ``labels`` become the x tick labels, in group order.
    """
    unique_rows = df1.drop_duplicates(subset=subset)
    freq_counts = unique_rows.groupby(group)['userId'].count()

    fig, ax = plt.subplots(figsize=(8, 5))
    freq_counts.plot(kind='bar', ax=ax, color=['red', 'black'])
    ax.set(title='Unique users per category', xlabel=x_title, ylabel=y_title)
    ax.set_xticklabels(labels, rotation=0)
    plt.show()


plot_frequency(['userId'], ['churn'], ['Active', 'Cancelled'])

Users with the "cancelled" status account for 32% of all users.

In [None]:
# Unique user count by gender and subscription status.
import numpy as np


def plot_frequency(subset, group, labels, x_title="Subscription", y_title="users"):
    """Draw one coloured bar per (gender, churn) group of unique users.

    Rows of the global pandas frame ``df1`` are de-duplicated on ``subset`` and
    counted per ``group``; the bars are drawn in group order and captioned with
    ``labels``, so ``labels`` must list the groups in that same order.
    """
    unique_rows = df1.drop_duplicates(subset=subset)
    freq_counts = unique_rows.groupby(group)['userId'].count()

    positions = np.arange(len(labels))
    palette = ['yellow', 'green', 'blue', 'gray']

    fig, ax = plt.subplots(figsize=(8, 5))
    ax.bar(positions, freq_counts, color=palette)
    ax.set(title='unique users per category', xlabel=x_title, ylabel=y_title)
    ax.set_xticks(positions)
    ax.set_xticklabels(labels, rotation=0)
    plt.show()


plot_frequency(
    ['userId', 'gender'],
    ['gender', 'churn'],
    ['Active-Female', 'Cancelled-Female', 'Active-Male', 'Cancelled-Male'],
)

Broken down by gender, male users account for the largest number of both active and cancelled subscriptions.

In [None]:
# Unique user count by subscription level and subscription status.
def plot_frequency(subset, group, labels, x_title="Subscription", y_title="users"):
    """Draw one coloured bar per (level, churn) group of unique users.

    Rows of the global pandas frame ``df1`` are de-duplicated on ``subset`` and
    counted per ``group``; the bars are drawn in group order and captioned with
    ``labels``, so ``labels`` must list the groups in that same order.
    """
    unique_rows = df1.drop_duplicates(subset=subset)
    freq_counts = unique_rows.groupby(group)['userId'].count()

    positions = np.arange(len(labels))
    palette = ['pink', 'orange', 'brown', 'purple']

    fig, ax = plt.subplots(figsize=(8, 5))
    ax.bar(positions, freq_counts, color=palette)
    ax.set(title='Unique users per category', xlabel=x_title, ylabel=y_title)
    ax.set_xticks(positions)
    ax.set_xticklabels(labels, rotation=0)
    plt.show()


plot_frequency(
    ['userId', 'level'],
    ['level', 'churn'],
    ['Active-Free', 'Cancelled-Free', 'Active-Paid', 'Cancelled-Paid'],
)

85% of all active Sparkify users hold paid subscriptions, and 77% of all cancelled users held paid subscriptions.

In [None]:
# Compare how cancelled and active users spread their events across pages.
# For each group the events are counted per page ('NextSong' is dropped so it
# does not dwarf everything else) and converted to a percentage of that
# group's remaining events; the two series are then plotted side by side.
users_1 = df1[df1.churn == 1].groupby(['page'])['userId'].count().drop('NextSong')
users_1 = users_1 / users_1.sum() * 100

users_0 = df1[df1.churn == 0].groupby(['page'])['userId'].count().drop('NextSong')
users_0 = users_0 / users_0.sum() * 100

users_df = pd.DataFrame({'Cancelled': users_1, 'Active users': users_0})
fig, ax = plt.subplots(figsize=(8, 10))
users_df.plot(kind='bar', ax=ax, color=['red', 'black'])
# kind='bar' draws vertical bars: pages run along x and percentages along y.
ax.set_xlabel('Page')
ax.set_ylabel('Event occurrence (%)')
ax.set_title('Event occurrence for active and cancelled users')
plt.show()

Activity on the Add Friend and Add to Playlist pages is notable relative to overall Sparkify use; Thumbs Up is the most-used page for both active users and those who have cancelled.

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

In [None]:
# One row per distinct (userId, churn) pair: the label table that the
# per-user feature tables are joined onto later.
churn_users = df.select('userId', 'churn').dropDuplicates()
churn_users.show()

In [None]:
# Encode a categorical column as integers, one row per distinct (userId, value).
def create_supposed_df(col, dictionary):
    """Return the distinct (userId, ``col``) pairs of ``df`` with ``col`` recoded.

    The values of ``col`` are replaced according to ``dictionary`` and the
    column is then cast to int. The schema and contents of the result are
    printed before it is returned.
    """
    distinct_pairs = df.select('userId', col).dropDuplicates()
    recoded = distinct_pairs.replace(dictionary, subset=col)
    col_df = recoded.withColumn(col, recoded[col].cast('int'))

    col_df.printSchema()
    col_df.show()
    return col_df


# Gender as a number: 'M' -> 1, 'F' -> 0.
gender_df = create_supposed_df('gender', {'M': '1', 'F': '0'})

In [None]:
# Encode the subscription level as a number: 'paid' -> 1, 'free' -> 0.
# The table holds one row per distinct (userId, level) pair, so a user who has
# events at both levels appears twice.
level_df = create_supposed_df('level', {'paid': '1', 'free': '0'})

In [None]:
# Number of distinct values in the 'artist' column of the pandas frame df1.
# NOTE(review): unique() reports a missing value as one of the distinct values,
# so the figure may be one higher than the number of real artists -- confirm.
df1.artist.unique().shape[0]

In [None]:
# Listening time per user: total the hours of songs played in each session
# ('NextSong' events only), then take the mean and standard deviation of those
# session totals for every user. Missing statistics are replaced with 0.
from pyspark.sql import functions as sF

songs_only = df.filter(df.page == 'NextSong').select('userId', 'sessionId', 'length')

hours_per_session = (
    songs_only
    .withColumn('hours', sF.col('length') / 3600)
    .groupBy('userId', 'sessionId')
    .agg(sF.sum('hours').alias('total_hours'))
)

song_length = (
    hours_per_session
    .groupBy('userId')
    .agg(
        sF.avg('total_hours').alias('mean_hours'),
        sF.stddev('total_hours').alias('stdev_hours'),
    )
    .fillna(0)
)

song_length.limit(5).toPandas()

In [None]:
# Share of each page in a user's activity: count the events per (user, page),
# drop the two cancellation pages (they encode the churn label itself), and
# express every remaining count as a percentage of that user's remaining events.
# The result has 'userId' plus one column per page, named after the page.
from functools import reduce
from operator import add

page_counts = (
    df.groupby('userId')
    .pivot('page')
    .count()
    .na.fill(0)
    .drop('Cancel', 'Cancellation Confirmation')
)
pages_cols = page_counts.columns[1:]

# Row-wise total over all page columns. The loop variable is deliberately not
# called `col`, which would overwrite the pyspark `col` function at notebook scope.
total_events = reduce(add, (page_counts[page] for page in pages_cols))

# One select builds every normalised column directly under its final name.
user_page_distribution = page_counts.select(
    'userId',
    *[(page_counts[page] / total_events * 100.).alias(page) for page in pages_cols],
)

user_page_distribution.toPandas().head()

In [None]:
# Songs per session: count the 'NextSong' events in each session, then take the
# mean and standard deviation of those counts for every user (missing -> 0).
songs_per_session = (
    df.filter(df.page == 'NextSong')
    .groupBy('userId', 'sessionId')
    .agg(F.count('*').alias('count'))
)

song_user_df = (
    songs_per_session
    .groupBy('userId')
    .agg(
        F.avg('count').alias('mean_songs'),
        F.stddev('count').alias('stdev_songs'),
    )
    .na.fill(0)
)

song_user_df.show()

In [None]:
# Number of distinct artists each user listened to.
# countDistinct ignores nulls, so events without an artist (anything that is
# not a song play) are no longer counted as an extra "artist", while users with
# no song plays still get a row (with 0) and survive the later inner joins.
artists_user_fans = (
    df.groupBy('userId')
    .agg(F.countDistinct('artist').alias('num_artists'))
)

artists_user_fans.show()

In [None]:
# Duration of every user session in hours, taken from the first and last
# timestamp logged in the session.
from pyspark.sql.functions import col, expr

# The divisor assumes 'ts' is expressed in milliseconds.
ticks_per_hour = 1000 * 60 * 60

# A single aggregation yields both bounds; there is no need to group the data
# twice and join the two results back together.
session_bounds = (
    df.groupBy('userId', 'sessionId')
    .agg(
        expr('min(ts)').alias('start'),
        expr('max(ts)').alias('end'),
    )
)

session_df = (
    session_bounds
    .withColumn('session_hours', (col('end') - col('start')) / ticks_per_hour)
    .select('userId', 'sessionId', 'session_hours')
)

session_df.show()

In [None]:
# Mean and standard deviation of the session durations of every user; missing
# statistics are replaced with 0.
from pyspark.sql.functions import avg, stddev, col

sessions_by_user = session_df.groupBy('userId')
session_user_df = sessions_by_user.agg(
    avg('session_hours').alias('mean_session_h'),
    stddev('session_hours').alias('stdev_session_h'),
).fillna(0)

session_user_df.show()

In [None]:
# Number of sessions per user: reduce the log to its distinct
# (userId, sessionId) pairs, then count the pairs of each user.
from pyspark.sql.functions import count

distinct_sessions = df.select('userId', 'sessionId').dropDuplicates()
num_sessions_user_df = (
    distinct_sessions
    .groupby('userId')
    .agg(count('*').alias('num_sessions'))
)

num_sessions_user_df.show()

In [None]:
# Days elapsed between a user's registration and their last logged event.
from pyspark.sql.functions import expr


def days_since_subscription(df, col_name='days_on'):
    """Return a (userId, ``col_name``) table of days from registration to last event.

    ``df`` is the event log to measure; its 'registration' value is the start
    and the largest 'ts' of each user is the end. The divisor used assumes both
    are expressed in milliseconds.
    """
    registrations = (
        df.select('userId', 'registration')
        .dropDuplicates()
        .withColumnRenamed('registration', 'start')
    )
    last_events = df.groupBy('userId').max('ts').withColumnRenamed('max(ts)', 'end')
    elapsed_days = F.expr('(end - start) / (1000 * 60 * 60 * 24)')

    reg_df = (
        registrations.join(last_events, 'userId')
        .withColumn(col_name, elapsed_days)
        .select('userId', col_name)
    )
    return reg_df


reg_df = days_since_subscription(df, col_name='days_total_subscription')
reg_df.show()

In [None]:
# Days from registration to the last event logged at the 'paid' level, per user.
from pyspark.sql.functions import expr

df_paid = df.filter(df.level == 'paid')
paid_df = days_since_subscription(df_paid, col_name='days_paid_subscription')
paid_df.show()

In [None]:
# Days from registration to the last event logged at the 'free' level, per user.
df_free = df.filter(df.level == 'free')
free_df = days_since_subscription(df_free, col_name='days_free_subscription')
free_df.show()

Collect all

In [None]:
# Register the per-user feature tables to be merged, each with the name used
# to identify it. user_features holds the DataFrames and user_features_names
# the matching names, in the same order.
feature_tables = [
    ('gender_df', gender_df),
    ('song_length', song_length),
    ('user_page_distribution', user_page_distribution),
    ('song_user_df', song_user_df),
    ('artists_user_fans', artists_user_fans),
    ('session_user_df', session_user_df),
    ('num_sessions_user_df', num_sessions_user_df),
    ('reg_df', reg_df),
]

user_features = [table for _, table in feature_tables]
user_features_names = [name for name, _ in feature_tables]

In [None]:
# Merge every per-user feature table onto the (userId, churn) label table with
# inner joins on 'userId', then sort the combined table by 'userId'.


def join_features(base, new):
    """Inner-join ``new`` onto ``base`` by 'userId' and drop duplicate rows."""
    joined = base.join(new, 'userId', how='inner')
    return joined.dropDuplicates()


final_df = churn_users
for feature, feature_name in zip(user_features, user_features_names):
    final_df = join_features(final_df, feature)

final_df = final_df.orderBy('userId', ascending=True)

In [None]:
# Save final_df as CSV (Spark writes a directory of part files), replacing
# any previous output at the same path.
# NOTE(review): these imports are no longer used by this cell; they are kept
# only in case another cell relies on them.
import shutil
import os

directory_path = 'user_dataset.CSV'

# mode('overwrite') makes Spark replace existing output itself. Unlike a manual
# shutil.rmtree it also works when the path is on a cluster filesystem rather
# than the local disk, and it does not fail if the path is a plain file.
final_df.write.mode('overwrite').save(directory_path, format='csv', header=True)

In [None]:
# Convert the combined feature table to pandas and preview its first rows.
final_df1 = final_df.toPandas()
final_df1.head()

In [None]:
# Also save the pandas copy as a single CSV file, without the index column.
final_df1.to_csv('user_dataset_definitive.CSV', index=False)

In [None]:
# Reload the saved feature table from CSV into Spark and preview it in pandas.
# No schema is supplied or inferred here, so the columns need explicit casts
# before they can be used as numeric features.
final_df = spark.read.csv('user_dataset.CSV', header = True)
final_1_df = final_df.toPandas()
final_1_df.head()

In [None]:
# Sanitise the column names: spaces become underscores and periods are removed.
sanitized_names = [
    name.replace(' ', '_').replace('.', '')
    for name in final_df.columns
]
final_df = final_df.toDF(*sanitized_names)
print(final_df.columns)

In [None]:
# Give every column of final_df a numeric type. Columns that hold whole numbers
# become IntegerType; all the others become FloatType.
# The integer columns are selected by NAME rather than by position: a positional
# slice also catches fractional features (mean/stdev hours and the per-page
# percentages), whose values cannot be represented as integers.

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, FloatType

integer_columns = {'userId', 'churn', 'gender', 'num_artists', 'num_sessions'}

final_df = final_df.select(
    *[
        col(col_name).cast(IntegerType() if col_name in integer_columns else FloatType())
        for col_name in final_df.columns
    ]
)

In [None]:
# Replace the remaining null values with 0.
final_df = final_df.na.fill(0)

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [None]:
# Prepare the data for modelling: assemble the feature columns into one vector,
# split 70/30 into train and test sets (seed 100 for reproducibility), then
# standardise the features.
# The scaler is fitted on the training rows only, so the mean and standard
# deviation of the test rows never leak into what the model is trained on.

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

# Columns 0 and 1 are expected to be 'userId' and the 'churn' label; every
# column after them is used as a feature.
assembler = VectorAssembler(inputCols=final_df.columns[2:], outputCol='features')
assembled_data = assembler.transform(final_df)

train_raw, test_raw = assembled_data.randomSplit([0.70, 0.30], seed=100)

scaler = StandardScaler(withMean=True, withStd=True, inputCol='features', outputCol='scaled_features')
scaler_model = scaler.fit(train_raw)


def _scale_and_label(frame):
    """Apply the fitted scaler and keep only the (label, features) columns."""
    scaled = scaler_model.transform(frame)
    return scaled.select(scaled.churn.alias('label'), scaled.scaled_features.alias('features'))


# Kept under their original names for any later cell that refers to them.
scaled_data = scaler_model.transform(assembled_data)
ml_data = _scale_and_label(assembled_data)

train = _scale_and_label(train_raw)
test = _scale_and_label(test_raw)

In [None]:
# Fit a model with cross-validated hyper-parameter search and save the best one.

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from datetime import datetime


def model_fitting(data, model_type, param_grid, save_as, num_folds=2, random_seed=100):
    """Cross-validate ``model_type`` over ``param_grid`` and persist the best model.

    Args:
        data: DataFrame with 'label' and 'features' columns to fit on.
        model_type: the estimator to tune.
        param_grid: parameter maps to evaluate (e.g. from ParamGridBuilder).
        save_as: path the best model is written to; existing output is overwritten.
        num_folds: number of cross-validation folds.
        random_seed: seed for the fold assignment.

    Returns:
        The fitted CrossValidatorModel; the winner is its ``bestModel``.
    """
    model_evaluator = CrossValidator(
        estimator=model_type,
        estimatorParamMaps=param_grid,
        # F1 is the evaluator's default metric; it is spelled out here because
        # it decides which parameter combination wins.
        evaluator=MulticlassClassificationEvaluator(metricName='f1'),
        numFolds=num_folds,
        seed=random_seed
    )

    fit_start = datetime.now()
    fitted_model = model_evaluator.fit(data)
    fit_elapsed = datetime.now() - fit_start

    save_start = datetime.now()
    fitted_model.bestModel.write().overwrite().save(save_as)
    save_elapsed = datetime.now() - save_start

    # Report both timings; they were previously measured but never used.
    print(f'Fitting took {fit_elapsed}; saving to {save_as!r} took {save_elapsed}')
    return fitted_model

The Logistic Regression model

In [None]:
# Logistic regression tuned over regularisation strength, elastic-net mixing
# and aggregation depth; the best model is saved by model_fitting.
model = LogisticRegression()

grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(model.regParam, [0.02, 0.2])
grid_builder = grid_builder.addGrid(model.elasticNetParam, [0.1, 0.4])
grid_builder = grid_builder.addGrid(model.aggregationDepth, [3, 6])
param_grid = grid_builder.build()

m = model_fitting(train, model, param_grid, 'LogisticRegression.model')


In [None]:
# Score the fitted model on the held-out test set and print four metrics.
predictions = m.transform(test)

print("test")
# Spark metric name paired with the line printed for it, in report order.
# "Precision" and "Recall" are the class-weighted variants.
report_lines = (
    ("accuracy", "Accuracy: {:.2f}%"),
    ("weightedPrecision", "Precision: {:.2f}%"),
    ("weightedRecall", "Recall: {:.2f}%"),
    ("f1", "F1-score: {:.2f}%"),
)
for metric_name, line_template in report_lines:
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName=metric_name
    )
    print(line_template.format(evaluator.evaluate(predictions) * 100))

In [None]:
# Score the fitted model on the training set and print four metrics.
predictions = m.transform(train)

print("train")
# Spark metric name paired with the line printed for it, in report order.
report_lines = (
    ("accuracy", "Accuracy: {:.2f}%"),
    ("weightedPrecision", "Precision: {:.2f}%"),
    ("weightedRecall", "Recall: {:.2f}%"),
    ("f1", "F1-score: {:.2f}%"),
)
for metric_name, line_template in report_lines:
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName=metric_name
    )
    print(line_template.format(evaluator.evaluate(predictions) * 100))

The Decision Tree Classifier model

In [None]:
# Decision tree classifier tuned over tree depth and impurity measure; the best
# model is saved by model_fitting.

from pyspark.ml.classification import DecisionTreeClassifier

model = DecisionTreeClassifier()

grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(model.maxDepth, [1, 3, 5])
grid_builder = grid_builder.addGrid(model.impurity, ['entropy', 'gini'])
param_grid = grid_builder.build()

m = model_fitting(train, model, param_grid, 'DecisionTreeClassifier.model')

In [None]:
# Score the fitted model on the held-out test set and print four metrics.
predictions = m.transform(test)

print("test")
# Spark metric name paired with the line printed for it, in report order.
report_lines = (
    ("accuracy", "Accuracy: {:.2f}%"),
    ("weightedPrecision", "Precision: {:.2f}%"),
    ("weightedRecall", "Recall: {:.2f}%"),
    ("f1", "F1-score: {:.2f}%"),
)
for metric_name, line_template in report_lines:
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName=metric_name
    )
    print(line_template.format(evaluator.evaluate(predictions) * 100))

In [None]:
# Score the fitted model on the training set and print four metrics.
predictions = m.transform(train)

print("train")
# Spark metric name paired with the line printed for it, in report order.
report_lines = (
    ("accuracy", "Accuracy: {:.2f}%"),
    ("weightedPrecision", "Precision: {:.2f}%"),
    ("weightedRecall", "Recall: {:.2f}%"),
    ("f1", "F1-score: {:.2f}%"),
)
for metric_name, line_template in report_lines:
    evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName=metric_name
    )
    print(line_template.format(evaluator.evaluate(predictions) * 100))

The Gradient-Boosted Trees (GBTs) model

In [None]:
# Gradient-Boosted Trees: build a 3 x 2 x 2 hyperparameter grid (12 combinations)
# over maxDepth, maxBins and maxIter, then pass it to model_fitting together with
# the training split and the name 'GradientBoostedTrees.model'.
# NOTE(review): model_fitting is defined elsewhere in the notebook; presumably it
# tunes the classifier over the grid and saves the result under that name -- confirm there.

model = GBTClassifier()

# addGrid mutates the builder in place, so the calls can be issued one per statement.
_gbt_grid = ParamGridBuilder()
_gbt_grid.addGrid(model.maxDepth, [1, 3, 5])
_gbt_grid.addGrid(model.maxBins, [4, 2])
_gbt_grid.addGrid(model.maxIter, [6, 2])
param_grid = _gbt_grid.build()

m = model_fitting(train, model, param_grid, 'GradientBoostedTrees.model')

In [None]:
# Score the model returned by the preceding model_fitting call on the test split.
predictions = m.transform(test)

print("test")

# Every evaluator below reads the same two columns; only metricName varies.
_eval_cols = {"labelCol": "label", "predictionCol": "prediction"}
evaluator = MulticlassClassificationEvaluator(metricName="accuracy", **_eval_cols)
evaluator_precision = MulticlassClassificationEvaluator(metricName="weightedPrecision", **_eval_cols)
evaluator_recall = MulticlassClassificationEvaluator(metricName="weightedRecall", **_eval_cols)
evaluator_f1 = MulticlassClassificationEvaluator(metricName="f1", **_eval_cols)

# Each score is multiplied by 100 and printed with two decimals.
accuracy = evaluator.evaluate(predictions)
print("Accuracy: {:.2f}%".format(100 * accuracy))

precision = evaluator_precision.evaluate(predictions)
print("Precision: {:.2f}%".format(100 * precision))

recall = evaluator_recall.evaluate(predictions)
print("Recall: {:.2f}%".format(100 * recall))

f1 = evaluator_f1.evaluate(predictions)
print("F1-score: {:.2f}%".format(100 * f1))

In [None]:
# Score the same model on the training split, for comparison with the test scores.
predictions = m.transform(train)

print("train")

# Every evaluator below reads the same two columns; only metricName varies.
_eval_cols = {"labelCol": "label", "predictionCol": "prediction"}
evaluator = MulticlassClassificationEvaluator(metricName="accuracy", **_eval_cols)
evaluator_precision = MulticlassClassificationEvaluator(metricName="weightedPrecision", **_eval_cols)
evaluator_recall = MulticlassClassificationEvaluator(metricName="weightedRecall", **_eval_cols)
evaluator_f1 = MulticlassClassificationEvaluator(metricName="f1", **_eval_cols)

# Each score is multiplied by 100 and printed with two decimals.
accuracy = evaluator.evaluate(predictions)
print("Accuracy: {:.2f}%".format(100 * accuracy))

precision = evaluator_precision.evaluate(predictions)
print("Precision: {:.2f}%".format(100 * precision))

recall = evaluator_recall.evaluate(predictions)
print("Recall: {:.2f}%".format(100 * recall))

f1 = evaluator_f1.evaluate(predictions)
print("F1-score: {:.2f}%".format(100 * f1))

Random Forest model

In [None]:
# Random Forest: build a grid with two candidate values for each of five
# hyperparameters (2^5 = 32 combinations), then pass it to model_fitting together
# with the training split and the name 'RandomForestClassifier.model'.
# NOTE(review): model_fitting is defined elsewhere in the notebook; presumably it
# cross-validates over the grid with its default fold count -- confirm there.

model = RandomForestClassifier()

# addGrid mutates the builder in place, so the calls can be issued one per statement.
_rf_grid = ParamGridBuilder()
_rf_grid.addGrid(model.maxDepth, [1, 3])
_rf_grid.addGrid(model.impurity, ['entropy', 'gini'])
_rf_grid.addGrid(model.maxBins, [4, 2])
_rf_grid.addGrid(model.numTrees, [6, 2])
_rf_grid.addGrid(model.featureSubsetStrategy, ['sqrt', 'onethird'])
param_grid = _rf_grid.build()

m = model_fitting(train, model, param_grid, 'RandomForestClassifier.model')

In [None]:
# Score the model returned by the preceding model_fitting call on the test split.
predictions = m.transform(test)

print("test")

# Every evaluator below reads the same two columns; only metricName varies.
_eval_cols = {"labelCol": "label", "predictionCol": "prediction"}
evaluator = MulticlassClassificationEvaluator(metricName="accuracy", **_eval_cols)
evaluator_precision = MulticlassClassificationEvaluator(metricName="weightedPrecision", **_eval_cols)
evaluator_recall = MulticlassClassificationEvaluator(metricName="weightedRecall", **_eval_cols)
evaluator_f1 = MulticlassClassificationEvaluator(metricName="f1", **_eval_cols)

# Each score is multiplied by 100 and printed with two decimals.
accuracy = evaluator.evaluate(predictions)
print("Accuracy: {:.2f}%".format(100 * accuracy))

precision = evaluator_precision.evaluate(predictions)
print("Precision: {:.2f}%".format(100 * precision))

recall = evaluator_recall.evaluate(predictions)
print("Recall: {:.2f}%".format(100 * recall))

f1 = evaluator_f1.evaluate(predictions)
print("F1-score: {:.2f}%".format(100 * f1))

In [None]:
# Score the same model on the training split, for comparison with the test scores.
predictions = m.transform(train)

print("train")

# Every evaluator below reads the same two columns; only metricName varies.
_eval_cols = {"labelCol": "label", "predictionCol": "prediction"}
evaluator = MulticlassClassificationEvaluator(metricName="accuracy", **_eval_cols)
evaluator_precision = MulticlassClassificationEvaluator(metricName="weightedPrecision", **_eval_cols)
evaluator_recall = MulticlassClassificationEvaluator(metricName="weightedRecall", **_eval_cols)
evaluator_f1 = MulticlassClassificationEvaluator(metricName="f1", **_eval_cols)

# Each score is multiplied by 100 and printed with two decimals.
accuracy = evaluator.evaluate(predictions)
print("Accuracy: {:.2f}%".format(100 * accuracy))

precision = evaluator_precision.evaluate(predictions)
print("Precision: {:.2f}%".format(100 * precision))

recall = evaluator_recall.evaluate(predictions)
print("Recall: {:.2f}%".format(100 * recall))

f1 = evaluator_f1.evaluate(predictions)
print("F1-score: {:.2f}%".format(100 * f1))

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and the Data Scientist Capstone Project Rubric to make sure you include all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a GitHub repository, as well as a web app or blog post.