Set up the environment variables for PySpark to run

In [1]:
import pyspark
import os
import sys
from pyspark import SparkContext
# Point both PySpark interpreter settings at the Python executable that is
# running this notebook.
for _var in ("PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON"):
    os.environ[_var] = sys.executable

In [ ]:
from pyspark.sql import SparkSession

Create a SparkSession object, allocating 16 GB of memory to the Spark driver.
Either get the existing SparkSession or create one with the application name `k_means`.

In [ ]:
# Reuse the active session if there is one; otherwise build a session named
# 'k_means' with the driver memory set to 16g.
spark = (
    SparkSession.builder
    .config('spark.driver.memory', '16g')
    .appName('k_means')
    .getOrCreate()
)

# KMeans
KMeans is an unsupervised machine learning algorithm used for clustering data points into groups or clusters based on similarity. 
It aims to minimize the within-cluster variance, also known as inertia or distortion. It's computationally efficient and works well with large datasets.

Read the csv file from the data directory.
Infer the schema of the columns automatically, and specify that the CSV file has no header row.

Define a list of all column names in the dataset and apply these names to the DataFrame.

In [ ]:
# The file has no header row, so columns are named in a later step; column
# types are inferred from the data.
data_without_header = (
    spark.read
    .option('inferSchema', True)
    .option('header', False)
    .csv('data/kddcup.data_10_percent_corrected')
)

# Names for the 41 feature columns followed by the label, in file order.
# Every entry ends with a comma: adjacent string literals without one are
# silently concatenated by Python, which shortens the list and makes toDF fail.
column_names = [
    "duration", "protocol_type", "service", "flag",
    "src_bytes", "dst_bytes", "land", "wrong_fragment", "urgent",
    "hot", "num_failed_logins", "logged_in", "num_compromised",
    "root_shell", "su_attempted", "num_root", "num_file_creations",
    "num_shells", "num_access_files", "num_outbound_cmds",
    "is_host_login", "is_guest_login", "count", "srv_count",
    "serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
    "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
    "dst_host_count", "dst_host_srv_count",
    "dst_host_same_srv_rate", "dst_host_diff_srv_rate",
    "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate",
    "dst_host_serror_rate", "dst_host_srv_serror_rate",
    "dst_host_rerror_rate", "dst_host_srv_rerror_rate",
    "label",
]
# Attach the column names, by position, to the header-less DataFrame.
data = data_without_header.toDF(*column_names)

To refer to dataframe by column names, import the col function
Calculate the frequency of each unique label in the 'label' column of the dataframe and display the top 25 results in descending order

In [ ]:
from pyspark.sql.functions import col 
# Count the rows per label and show the 25 most frequent labels first.
(
    data.select("label")
    .groupBy("label")
    .count()
    .orderBy(col("count").desc())
    .show(25)
)

Import VectorAssembler to assemble the feature columns into a single vector column.
Import the KMeans class to implement the k-means algorithm.
Import the Pipeline class to chain multiple transformations.

Create a new DataFrame, numeric_only, by dropping the non-numeric columns (protocol_type, service and flag). Cache this DataFrame for faster access.

Create a vector assembler specifying input columns as all except the last one

In [ ]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml. clustering import KMeans, KMeansModel
from pyspark.ml import Pipeline
# Drop the three non-numeric columns and cache the result, which later cells
# fit and transform repeatedly.
numeric_only = data.drop("protocol_type", "service", "flag").cache()

# All columns except the last one feed the feature vector.
# NOTE: the output column name literally starts with an apostrophe. The KMeans
# stage in the next cell refers to the same literal, so the two must match.
assembler = (
    VectorAssembler()
    .setInputCols(numeric_only.columns[:-1])
    .setOutputCol("'featureVector")
)


Create a KMeans object and set the features column name to 'featureVector' and the prediction column name to 'cluster'.
Then create a Pipeline object, setting its stages to the assembler and KMeans.
Fit this pipeline on the numeric_only DataFrame created earlier.
Then retrieve the KMeans model from the fitted pipeline model.

Import the pprint function for pretty printing the cluster centers. 

In [ ]:
# Cluster on the assembled vector and write the assigned cluster id to a
# 'cluster' column. The features column name must repeat the assembler's output
# name exactly, including its leading apostrophe.
kmeans = KMeans().setPredictionCol("cluster").setFeaturesCol("'featureVector")
pipeline = Pipeline().setStages([assembler, kmeans])
pipeline_model = pipeline.fit(numeric_only)
# The fitted k-means model is the second stage of the fitted pipeline.
kmeans_model = pipeline_model.stages[1]

from pprint import pprint
# clusterCenters is a method: it must be called to obtain the centers, otherwise
# only the bound-method object is printed.
pprint(kmeans_model.clusterCenters())

Apply the trained Kmeans model to dataset
Calculates the count of each combination of cluster and label in the dataset, ordered by cluster and count

In [ ]:
# Add the 'cluster' prediction column to every row.
with_cluster = pipeline_model.transform(numeric_only)

# Count each (cluster, label) pair; within a cluster, most frequent label first.
(
    with_cluster.select("cluster", "label")
    .groupBy("cluster", "label")
    .count()
    .orderBy(col("cluster"), col("count").desc())
    .show(25)
)

Define a function clustering_score that calculates the training cost of KMeans clustering.
Iterate over k values from 20 to 80 in steps of 20.
Create a VectorAssembler specifying all columns except last from only numeric inputs dataframe. 
Create a KMeans object with random seed and k clusters
Create a pipeline with assembler and Kmeans as stages

Find the training cost, i.e. the sum of squared distances of points to their cluster centers.
Finally print the training_cost

In [ ]:
from pyspark.sql import DataFrame
from random import randint

def clustering_score(input_data, k):
    """Fit k-means with ``k`` clusters and return the model's training cost.

    Args:
        input_data: Spark DataFrame. The "protocol_type", "service" and "flag"
            columns are dropped, and the last remaining column is left out of
            the feature vector.
        k: Number of clusters.

    Returns:
        The ``summary.trainingCost`` of the fitted k-means model.
    """
    input_numeric_only = input_data.drop("protocol_type", "service", "flag")
    # The assembler's output column and the KMeans features column must carry
    # exactly the same name, otherwise the fit cannot find its input.
    assembler = (
        VectorAssembler()
        .setInputCols(input_numeric_only.columns[:-1])
        .setOutputCol("featureVector")
    )
    # A fresh random seed is drawn on every call, so repeated runs can differ.
    kmeans = (
        KMeans()
        .setSeed(randint(100, 100000))
        .setK(k)
        .setPredictionCol("cluster")
        .setFeaturesCol("featureVector")
    )
    pipeline = Pipeline().setStages([assembler, kmeans])
    pipeline_model = pipeline.fit(input_numeric_only)
    kmeans_model = pipeline_model.stages[-1]
    return kmeans_model.summary.trainingCost

# Print the training cost for k = 20, 40, 60 and 80.
for k in range(20, 100, 20):
    print(clustering_score(numeric_only, k))

Prepare the input dataset by only keeping numeric columns
Perform the same steps as in the previous cell, but with one additional value of k (100), and print k alongside each cost.

In [ ]:
def clustering_score_1(input_data, k):
    """Fit k-means with ``k`` clusters and return the model's training cost.

    Args:
        input_data: Spark DataFrame. The "protocol_type", "service" and "flag"
            columns are dropped, and the last remaining column is left out of
            the feature vector.
        k: Number of clusters.

    Returns:
        The ``summary.trainingCost`` of the fitted k-means model.
    """
    input_numeric_only = input_data.drop("protocol_type", "service", "flag")

    # The intermediate column is named without the stray leading apostrophe;
    # the assembler and KMeans use the same name, so the result is unchanged.
    assembler = (
        VectorAssembler()
        .setInputCols(input_numeric_only.columns[:-1])
        .setOutputCol("featureVector")
    )
    # A fresh random seed is drawn on every call, so repeated runs can differ.
    kmeans = (
        KMeans()
        .setSeed(randint(100, 100000))
        .setK(k)
        .setPredictionCol("cluster")
        .setFeaturesCol("featureVector")
    )
    pipeline = Pipeline().setStages([assembler, kmeans])
    pipeline_model = pipeline.fit(input_numeric_only)
    kmeans_model = pipeline_model.stages[-1]
    return kmeans_model.summary.trainingCost

# Print k and its training cost for k = 20, 40, 60, 80 and 100.
for k in range(20, 101, 20):
    print(k, clustering_score_1(numeric_only, k))

Use StandardScaler to standardize features
Configure the KMeans algorithm with the specified number of clusters and other parameters (maximum iterations and tolerance)
Create a pipeline with VectorAssembler, StandardScaler, and KMeans estimator
Extract the KMeans model from the fitted pipeline model
Calculate training cost of the KMeans model

In [ ]:
from pyspark.ml. feature import StandardScaler
def clustering_score_2(input_data, k):
    """Fit k-means on standardized features and return its training cost.

    Args:
        input_data: Spark DataFrame. The "protocol_type", "service" and "flag"
            columns are dropped, and the last remaining column is left out of
            the feature vector.
        k: Number of clusters.

    Returns:
        The ``summary.trainingCost`` of the k-means model fitted inside this
        function.
    """
    input_numeric_only = input_data.drop("protocol_type", "service", "flag")
    # VectorAssembler has to be instantiated before its setters can be chained.
    assembler = (
        VectorAssembler()
        .setInputCols(input_numeric_only.columns[:-1])
        .setOutputCol("featureVector")
    )
    # The scaler reads the assembler's output column, so the names must match.
    # Scaling by standard deviation is on, centering on the mean is off.
    scaler = (
        StandardScaler()
        .setInputCol("featureVector")
        .setOutputCol("scaledFeatureVector")
        .setWithStd(True)
        .setWithMean(False)
    )
    # A fresh random seed is drawn on every call, so repeated runs can differ.
    kmeans = (
        KMeans()
        .setSeed(randint(100, 100000))
        .setK(k)
        .setMaxIter(40)
        .setTol(1.0e-5)
        .setPredictionCol("cluster")
        .setFeaturesCol("scaledFeatureVector")
    )
    pipeline = Pipeline().setStages([assembler, scaler, kmeans])
    pipeline_model = pipeline.fit(input_numeric_only)
    # Read the cost from the model fitted here, not from a notebook-level
    # variable left over from an earlier cell.
    kmeans_model = pipeline_model.stages[-1]
    return kmeans_model.summary.trainingCost

# Print k and its training cost for k = 60, 90, ..., 270.
for k in range(60, 271, 30):
    print(k, clustering_score_2(numeric_only, k))

In [ ]:
from math import log

def entropy(counts):
    """Return the entropy (natural logarithm) of a collection of counts.

    Counts that are not strictly positive are ignored. Each remaining count is
    turned into a proportion of the total, and ``-p * log(p)`` is summed over
    the proportions. With no positive counts the result is 0.
    """
    positive = [count for count in counts if count > 0]
    total = sum(positive)
    result = 0
    for count in positive:
        share = count / total
        result += -1 * share * log(share)
    return result

Transform the data using the fitted pipeline model and select the cluster and label columns
Group the data by cluster and label, and count the occurrences of each combination
Define a window partitioned by cluster for later use
Calculate the probability of each label within each cluster
Calculate the entropy and the size-weighted entropy for each cluster
Calculate the sum of weighted cluster entropy across all clusters
Calculate the weighted average cluster entropy by dividing by the total number of data points

In [ ]:
from pyspark.sql import functions as fun
from pyspark.sql import  Window

# Assign a cluster to every row and keep only the cluster id and true label.
cluster_label = pipeline_model.transform(data).select("cluster", "label")

# Number of rows for each (cluster, label) pair.
df = cluster_label.groupBy("cluster", "label").count().orderBy("cluster")

# Share of each label inside its cluster: pair count / total count of the
# cluster. The column is named exactly 'count', with no trailing space.
w = Window.partitionBy("cluster")
p_col = df['count'] / fun.sum(df['count']).over(w)
with_p_col = df.withColumn("p_col", p_col)

# Per cluster: entropy of its label distribution (base 2) and its row count.
result = with_p_col.groupBy('cluster').agg(
    (-fun.sum(col('p_col') * fun.log2(col('p_col')))).alias('entropy'),
    fun.sum(col('count')).alias('cluster_size'),
)

# Weight each cluster's entropy by its size, sum over clusters, then divide by
# the number of rows to obtain the weighted average entropy.
result = result.withColumn(
    'weightedClusterEntropy', fun.col('entropy') * fun.col('cluster_size')
)
weighted_cluster_entropy_avg = result.agg(
    fun.sum(col('weightedClusterEntropy'))
).collect()
weighted_cluster_entropy_avg[0][0] / data.count()

# Entity Resolution

Entity resolution, also known as record linkage or deduplication, is a data integration process that identifies 
and links records that refer to the same real-world entity across diverse data sources. The goal is to 
reconcile and merge information about entities, such as individuals or businesses, even when they are 
represented inconsistently or incompletely in different datasets. Entity resolution involves comparing and 
analyzing attributes like names, addresses, and other identifying information to determine the likelihood 
of a match. This process is crucial in various domains, including customer relationship management, 
healthcare, finance, and law enforcement, where accurate and consolidated data is essential. Advanced 
techniques, such as probabilistic matching and machine learning algorithms, are often employed to 
enhance the accuracy and efficiency of entity resolution in handling large and complex datasets.

### Data Loading and Preparation


We start by creating a SparkSession, the entry point to Spark functionality. Upon loading the data, the schema of the data is inferred and printed. After that, we cache the parsed DataFrame to optimize future operations. 

We display the first five rows of the DataFrame and perform a grouping operation on the 'is_match' column, counting the occurrences of each unique value and displaying the results in descending order. Finally, we create a temporary view named 'linkage' from the parsed DataFrame, which can be used to execute SQL queries on the DataFrame using Spark SQL.

We're performing the same operation as the previous PySpark DataFrame operation but using SQL syntax through the SparkSession. This query groups the DataFrame by the 'is_match' column, counts the occurrences, and orders the results by count in descending order. Finally, it displays the results with the count column aliased as 'cnt'.

In [ ]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# First look at the raw file, read without header, null-value or schema options.
prev = spark.read.option("recursiveFileLookup", "true").csv("data/linkage/donation/block_1.csv")

prev.show(3)

# Proper parse: first row is the header, "?" marks a missing value, and column
# types are inferred.
parsed = (
    spark.read
    .option("header", "true")
    .option("nullValue", "?")
    .option("inferSchema", "true")
    .csv("data/linkage/donation/block_1.csv")
)
parsed.printSchema()
# cache() is lazy, so mark the DataFrame first and let count() be the action
# that scans the data and fills the cache.
parsed.cache()
parsed.count()

parsed.show(5)

from pyspark.sql.functions import col
# DataFrame API: number of rows per is_match value, largest group first.
(
    parsed.groupBy("is_match")
    .count()
    .orderBy(col("count").desc())
    .show()
)

# Register the DataFrame under the name "linkage" so it can be queried in SQL.
parsed.createOrReplaceTempView("linkage")

# The same aggregation written in SQL, with the count aliased as cnt.
spark.sql(
    """
    SELECT is_match,count(*) cnt
    FROM linkage
    GROUP BY is_match
    ORDER BY cnt DESC"""
).show()

### Summary Statistics

The code is conducting statistical summaries of specific columns within the DataFrame 'parsed'. 

1. `summary = parsed.describe()`: This line generates summary statistics for all numerical columns in the DataFrame 'parsed'. The resulting DataFrame 'summary' contains statistical summaries such as count, mean, standard deviation, min, and max.

2. `summary.select("summary","cmp_fname_c1","cmp_fname_c2").show()`: This line selects and displays summary statistics for the columns 'cmp_fname_c1' and 'cmp_fname_c2' from the DataFrame 'summary'. The 'summary' column provides information on what type of statistical summary it is (e.g., count, mean, stddev, min, and max).

3. `matches = parsed.where("is_match = true")`: This line filters the DataFrame 'parsed' to include only rows where the column 'is_match' is equal to 'true', thus selecting only the rows where matches occurred.

4. `match_summary = matches.describe()`: This line generates summary statistics for all numerical columns in the DataFrame 'matches', which now contains only the rows where matches occurred.

5. `misses = parsed.filter(col('is_match') == False)`: This line filters the DataFrame 'parsed' to include only rows where the column 'is_match' is equal to 'false', thus selecting only the rows where no matches occurred.

6. `miss_summary = misses.describe()`: This line generates summary statistics for all numerical columns in the DataFrame 'misses', which now contains only the rows where no matches occurred.

Overall, these operations help in understanding the distribution and characteristics of data related to matches and non-matches separately.

In [ ]:
# Summary statistics for the whole data set; show them for two of the fields.
summary = parsed.describe()
summary.select("summary", "cmp_fname_c1", "cmp_fname_c2").show()

# Split into matching and non-matching rows. The first filter is written as a
# SQL string, the second as a Column expression (hence `== False`, not `is`).
matches = parsed.where("is_match = true")
misses = parsed.filter(col('is_match') == False)

# Summary statistics for each subset.
match_summary = matches.describe()
miss_summary = misses.describe()

### Pivoting and Reshaping DataFrames

We begin by converting the PySpark DataFrame `summary` into a Pandas DataFrame `summary_p` to facilitate data manipulation using Pandas functions. Then, we transpose the DataFrame, set the index to 'summary', rename columns, and convert it back to a Spark DataFrame `summaryT`.

Next, we cast the metric columns to DoubleType to ensure numerical consistency and print the schema of the resulting DataFrame.

We define a function `pivot_summary(desc)` that performs similar operations as above but takes a DataFrame `desc` as input. It returns the transposed and type-casted Spark DataFrame `descT`.

We apply the `pivot_summary` function to both `match_summary` and `miss_summary` DataFrames to obtain `match_summaryT` and `miss_summaryT`, respectively.


In [ ]:
from pyspark.sql import DataFrame
from pyspark.sql.types import DoubleType

# Bring the summary to pandas, where transposing is straightforward.
summary_p = summary.toPandas()
summary_p.head()
print(summary_p.shape)

# Transpose so that each original column becomes a row identified by 'field'
# and each statistic becomes a column; drop the leftover axis name.
summary_p = (
    summary_p.set_index('summary')
    .transpose()
    .reset_index()
    .rename(columns={'index': 'field'})
    .rename_axis(None, axis=1)
)
print(summary_p.shape)

summaryT = spark.createDataFrame(summary_p)

from pyspark.sql.types import DoubleType
# Cast every column other than 'field' to double.
for c in summaryT.columns:
    if c != 'field':
        summaryT = summaryT.withColumn(c, summaryT[c].cast(DoubleType()))

summaryT.printSchema()


def pivot_summary(desc):
    """Transpose a summary DataFrame and cast its statistics to double.

    Args:
        desc: Spark DataFrame with a 'summary' column, such as the output of
            ``describe()`` used in this notebook.

    Returns:
        A Spark DataFrame with a 'field' column holding the original column
        names, plus one double-typed column per value of 'summary'.
    """
    # Transpose in pandas: original columns become rows identified by 'field'.
    transposed = (
        desc.toPandas()
        .set_index('summary')
        .transpose()
        .reset_index()
        .rename(columns={'index': 'field'})
        .rename_axis(None, axis=1)
    )
    descT = spark.createDataFrame(transposed)
    # Cast every column other than 'field' to double.
    for name in descT.columns:
        if name != 'field':
            descT = descT.withColumn(name, descT[name].cast(DoubleType()))
    return descT

# Pivot the summaries of the matching and non-matching rows the same way.
match_summaryT, miss_summaryT = (
    pivot_summary(described) for described in (match_summary, miss_summary)
)

### Joining Dataframes and Selecting Good Features

We create temporary views for `match_summaryT` and `miss_summaryT` named "match_desc" and "miss_desc", respectively.

We execute a SQL query to join the two DataFrames on the `field` column, calculate the total count, and compute the difference in mean values between matches and misses for each field. The results are ordered by delta and total count in descending order.


In [ ]:
# Expose both pivoted summaries to Spark SQL.
match_summaryT.createOrReplaceTempView("match_desc")
miss_summaryT.createOrReplaceTempView("miss_desc")

# For every field except the two id columns: the combined count and the
# difference between the mean over matches and the mean over misses, largest
# difference first. show() is required to actually run the query and print
# the rows; without it only a lazy DataFrame is created.
spark.sql("""
    SELECT a.field, a.count + b.count total, a.mean - b.mean delta
    FROM match_desc a INNER JOIN miss_desc b ON a.field = b.field
    WHERE a.field NOT IN ("id_1", "id_2")
    ORDER BY delta DESC, total DESC
""").show()

### Scoring and Model Evaluation 

We define a list of `good_features` and an expression `sum_expression` to calculate the sum of these features.

We start by filling null values with 0 in the columns specified in `good_features` in the DataFrame `parsed`. Then, we calculate a new column `score` based on the sum of values in the columns specified in `good_features`, and select only the `score` and `is_match` columns from the DataFrame `parsed`.

Next, we define a function named `crossTabs` that takes a DataFrame `scored` and a threshold `t` as input and returns a DataFrame.

Then, we compute cross-tabulations by grouping the DataFrame `scored` based on whether the `score` is above or equal to the threshold `t`, pivot the `is_match` column, and count the occurrences of each combination.

We calculate cross-tabulations using the function `crossTabs` for the threshold `4.0` and store the result in `cm1`, and similarly for the threshold `2.0`, storing the result in `cm2`.

Finally, we display cross-tabulations using the function `crossTabs` for the threshold `4.0`. 

In [ ]:
from pyspark.sql.functions import expr

# Fields whose values are added together to form the score.
good_features = ["cmp_lname_c1", "cmp_plz", "cmp_by", "cmp_bd", "cmp_bm"]
# SQL expression "a + b + ..." over those fields.
sum_expression = " + ".join(good_features)

# Replace nulls in the scored fields with 0, add the score, and keep only the
# score and the is_match flag.
scored = (
    parsed.fillna(0, subset=good_features)
    .withColumn('score', expr(sum_expression))
    .select('score', 'is_match')
)
scored.show()

def crossTabs(scored: DataFrame, t: float) -> DataFrame:
    """Cross-tabulate "score >= t" against ``is_match``.

    Args:
        scored: DataFrame with a ``score`` column and an ``is_match`` column.
        t: Score threshold; a plain Python number interpolated into the SQL
            expression.

    Returns:
        A DataFrame with one row per value of ``above`` (whether the score
        reaches the threshold) and the row counts in columns "true" and
        "false".
    """
    return (
        scored.selectExpr(f"score>={t} as above", "is_match")
        .groupBy("above")
        # Pivot values are given as a list, as the pivot API documents.
        .pivot("is_match", ["true", "false"])
        .count()
    )

# Cross-tabulations at thresholds 4.0 (cm1) and 2.0 (cm2).
cm1, cm2 = (crossTabs(scored, threshold) for threshold in (4.0, 2.0))
# Display the table for threshold 4.0.
crossTabs(scored, 4.0).show()

# Decision Tree
Decision trees are a fundamental and widely used machine learning algorithm for both classification and regression tasks. They are intuitive, easy to understand, and can handle both numerical and categorical data.

The construction of a decision tree involves recursively splitting the dataset into subsets based on the value of the most informative feature at each step. This process continues until all instances in a node belong to the same class (in the case of classification) or until a stopping criterion is met (in the case of regression).

In [ ]:
from pyspark.sql import SparkSession
# Reuse the active session if there is one; otherwise build one with the driver
# memory set to 16g. The application name is the same one used for the k-means
# section.
spark = (
    SparkSession.builder
    .config('spark.driver.memory', '16g')
    .appName('k_means')
    .getOrCreate()
)

Read the dataset.
The dataset does not have a header row

In [ ]:
# The file has no header row; column types are inferred from the data.
data_without_header = (
    spark.read
    .option('inferSchema', True)
    .option('header', False)
    .csv('data/covtype.csv')
)

data_without_header.printSchema()

Define column names in colnames
Then Cast the Cover_Type column to Double

In [ ]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col

# Define column names as a list: 10 numeric features, 4 wilderness-area flags,
# 40 soil-type flags and the target. Every part is a list, because a tuple
# cannot be concatenated with a list.
colnames = (
    [
        "Elevation", "Aspect", "Slope",
        "Horizontal_Distance_To_Hydrology",
        "Vertical_Distance_To_Hydrology",
        "Horizontal_Distance_To_Roadways",
        "Hillshade_9am", "Hillshade_Noon",
        "Hillshade_3pm",
        "Horizontal_Distance_To_Fire_Points",
    ]
    + [f"Wilderness_Area_{i}" for i in range(4)]
    + [f"Soil_Type_{i}" for i in range(40)]
    + ["Cover_Type"]
)

# Name the columns by position, then store the target as a double.
data = data_without_header.toDF(*colnames)
data = data.withColumn("Cover_Type", col("Cover_Type").cast(DoubleType()))
data.head()


Define column names in colnames
Then Cast the Cover_Type column to Double

In [ ]:
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col

# Define column names as a list: 10 numeric features, 4 wilderness-area flags,
# 40 soil-type flags and the target. Every part is a list, because a tuple
# cannot be concatenated with a list.
colnames = (
    [
        "Elevation", "Aspect", "Slope",
        "Horizontal_Distance_To_Hydrology",
        "Vertical_Distance_To_Hydrology",
        "Horizontal_Distance_To_Roadways",
        "Hillshade_9am", "Hillshade_Noon",
        "Hillshade_3pm",
        "Horizontal_Distance_To_Fire_Points",
    ]
    + [f"Wilderness_Area_{i}" for i in range(4)]
    + [f"Soil_Type_{i}" for i in range(40)]
    + ["Cover_Type"]
)

# Name the columns by position, then store the target as a double.
data = data_without_header.toDF(*colnames)
data = data.withColumn("Cover_Type", col("Cover_Type").cast(DoubleType()))
data.head()


Split the data to train and test in the ratio 0.9 to 0.1
Cache both the train and test data

In [ ]:
# Random split with weights 0.9 (training) and 0.1 (test). No seed is given,
# so the split differs between runs.
train_data, test_data = data.randomSplit([0.9, 0.1])
# Both parts are read several times below, so mark them for caching.
train_data.cache()
test_data.cache()

Define input columns as all columns except the last one
Create a VectorAssembler instance specifying input and output columns
Transform the train_data DataFrame using the VectorAssembler
Display the assembled feature vector column

In [ ]:
from pyspark.ml. feature import VectorAssembler
# Every column except the last one (the target) is an input feature.
input_cols = colnames[:-1]
vector_assembler = (
    VectorAssembler()
    .setInputCols(input_cols)
    .setOutputCol("featureVector")
)
assembled_train_data = vector_assembler.transform(train_data)
# Show the assembled vectors without truncating them.
assembled_train_data.select("featureVector").show(truncate=False)

Create a DecisionTreeClassifier instance
Fit the classifier to the assembled training data
Print the decision tree model's debug string

In [ ]:
from pyspark.ml. classification import DecisionTreeClassifier
# Decision tree with a fixed seed, trained on the assembled feature vector to
# predict Cover_Type. Keyword arguments must be separated by commas.
classifier = DecisionTreeClassifier(
    seed=1234,
    labelCol="Cover_Type",
    featuresCol="featureVector",
    predictionCol="prediction",
)
model = classifier.fit(assembled_train_data)
# Text rendering of the learned tree.
print(model.toDebugString)

Convert feature importances to a Pandas DataFrame
Then sort feature importances by importance in descending order
Finally print the Dataframe

In [ ]:
import pandas as pd
# One row per input feature, most important first. The column is created and
# sorted under the same name, 'importance'.
pd.DataFrame(
    model.featureImportances.toArray(),
    index=input_cols,
    columns=['importance'],
).sort_values(by='importance', ascending=False)

Apply the trained model to the assembled training data to make predictions
Then Select and display Cover_Type , prediction and probability

In [ ]:
# Score the training data with the fitted tree.
predictions = model.transform(assembled_train_data)
# Compare the true class with the predicted class and the class probabilities.
(
    predictions
    .select('Cover_Type', 'prediction', 'probability')
    .show(10, truncate=False)
)

Create a MulticlassClassificationEvaluator instance with specified parameters
Set the metric name to evaluate the accuracy
Set the metric name to evaluate the F1 score

In [ ]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# The keyword is spelled predictionCol (capital C).
evaluator = MulticlassClassificationEvaluator(
    labelCol="Cover_Type", predictionCol="prediction"
)
# Print both metrics explicitly: a notebook cell only displays its last
# expression, so the accuracy would otherwise be computed and discarded.
print("accuracy:", evaluator.setMetricName("accuracy").evaluate(predictions))
print("f1:", evaluator.setMetricName("f1").evaluate(predictions))

Group predictions by actual cover type and pivot on the predicted cover types
Then display the confusion matrix

In [ ]:
# Rows: actual Cover_Type. Columns: predicted class 1..7. Cells: row counts,
# with missing combinations filled with 0.
confusion_matrix = (
    predictions.groupBy("Cover_Type")
    .pivot("prediction", list(range(1, 8)))
    .count()
    .na.fill(0.0)
    .orderBy("Cover_Type")
)

confusion_matrix.show()

Define a class_probabilities function and count the total number of instances in the data
Group data by "Cover_Type", count instances, and calculate proportions
Return the probabilities as a list of tuples
Calculate class probabilities for train_data and test_data

In [ ]:
from pyspark.sql import DataFrame
def class_probabilities(data):
    """Return the proportion of rows belonging to each Cover_Type.

    Args:
        data: Spark DataFrame with a "Cover_Type" column.

    Returns:
        The collected rows, ordered by Cover_Type, each holding a single
        ``count_proportion`` value (class count divided by the total row
        count).
    """
    # count is a method; it must be called to obtain the number of rows.
    total = data.count()
    return (
        data.groupBy("Cover_Type")
        .count()
        .orderBy("Cover_Type")
        .select(col("count").cast(DoubleType()))
        .withColumn("count_proportion", col("count") / total)
        .select("count_proportion")
        .collect()
    )

# Class proportions of the training split and of the test split.
train_prior_probabilites, test_prior_probabilites = (
    class_probabilities(split) for split in (train_data, test_data)
)
test_prior_probabilites

Calculate the weighted average of prior probabilities by multiplying the prior probabilities of each class in the training data by the corresponding prior probabilities of each class in the testing data and summing up the results.

In [ ]:
# Unwrap the single value held by each collected row.
train_prior_probabilites = [row[0] for row in train_prior_probabilites]
test_prior_probabilites = [row[0] for row in test_prior_probabilites]
# Multiply the two proportions class by class and add up the products.
sum(
    train_p * cv_p
    for train_p, cv_p in zip(train_prior_probabilites, test_prior_probabilites)
)

Define a VectorAssembler to assemble input features into a single vector column
Define a DecisionTreeClassifier with specified parameters
Create a Pipeline with the VectorAssembler and DecisionTreeClassifier

In [ ]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

# Stage 1: collect the input columns into a single "featureVector" column.
assembler = (
    VectorAssembler()
    .setInputCols(input_cols)
    .setOutputCol("featureVector")
)

# Stage 2: decision tree with a fixed seed, predicting Cover_Type.
classifier = DecisionTreeClassifier(
    seed=1234,
    labelCol="Cover_Type",
    featuresCol="featureVector",
    predictionCol="prediction",
)

pipeline = Pipeline().setStages([assembler, classifier])


Define parameter grid for hyperparameter tuning
Create a MulticlassClassificationEvaluator for evaluation named multiclassEval

In [ ]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Two candidate values for each of four hyperparameters; build() produces
# every combination.
paramGrid = (
    ParamGridBuilder()
    .addGrid(classifier.impurity, ["gini", "entropy"])
    .addGrid(classifier.maxDepth, [1, 20])
    .addGrid(classifier.maxBins, [40, 300])
    .addGrid(classifier.minInfoGain, [0.0, 0.05])
    .build()
)

# Candidates are compared by accuracy on the Cover_Type label.
multiclassEval = MulticlassClassificationEvaluator(
    labelCol="Cover_Type",
    predictionCol="prediction",
    metricName="accuracy",
)


Create a TrainValidationSplit instance for hyperparameter tuning
Fit the TrainValidationSplit on the training data

In [ ]:
from pyspark.ml.tuning import TrainValidationSplit

# Try every parameter combination with the pipeline, training on 90% of the
# rows passed to fit() and scoring on the remaining 10%.
validator = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=multiclassEval,
    trainRatio=0.9,
    seed=1234,
)
validator_model = validator.fit(train_data)

Extract the best model from the TrainValidationSplit model
Extract and pprint the parameter map of the classifier stage

In [ ]:
from pprint import pprint
# Best pipeline found by the validator.
best_model = validator_model.bestModel
# The pipeline has two stages, so the last one is the fitted classifier; print
# the parameters it ended up with.
pprint(best_model.stages[-1].extractParamMap())

Fit the TrainValidationSplit on the training data again
Get the validation metrics and estimator param maps from the fitted validator model
Combine metrics and params into a list of tuples
Sort the list of tuples based on validation metrics in descending order
Display the sorted list of tuples containing validation metrics and corresponding parameter maps


In [ ]:
validator_model = validator.fit(train_data)

# Validation metric and parameter map of every candidate, in matching order.
metrics = validator_model.validationMetrics
params = validator_model.getEstimatorParamMaps()
# Pair them up and rank the pairs from best to worst metric.
metrics_and_params = sorted(
    zip(metrics, params), key=lambda pair: pair[0], reverse=True
)
metrics_and_params

Sort the metrics list in reverse order and print the highest validation metric achieved

In [ ]:
# Highest validation metric. max() is used instead of an in-place sort because
# `metrics` is the list held by the validator model: sorting it would reorder
# the model's metrics and break their alignment with the parameter maps.
print(max(metrics))

Evaluate the best model on the test data and print the accuracy

In [ ]:
# Score the held-out test split with the best pipeline and evaluate it with the
# accuracy evaluator.
multiclassEval.evaluate(
    best_model.transform(test_data)
)

# Recommendation System

A recommendation system is a software application that suggests items or content to users based on their preferences, behaviors, and historical interactions. It leverages algorithms to analyze user data and identify patterns, aiming to provide personalized and relevant recommendations. There are two primary types of recommendation systems: collaborative filtering, which recommends items based on the preferences of users with similar tastes, and content-based filtering, which suggests items similar to those the user has previously liked. Hybrid approaches combine these methods for more robust and 
accurate suggestions. Recommendation systems are widely used in e-commerce platforms, streaming services, social media, and other online applications to enhance user experience, engagement, and satisfaction by delivering tailored content or product suggestions.

### Data Loading and Preparation

The first step in building a model is to understand the data that is available, and parse or transform it into forms that are useful for analysis in Spark.

Spark MLlib’s ALS implementation does not strictly require numeric IDs for users and items, but is more efficient when the IDs are in fact representable as 32-bit integers. It’s advantageous to use Int to represent IDs, but this would mean that the IDs can’t exceed Int.MaxValue, or 2147483647. 

Code explanation:\
Each line of the file contains a user ID, a product ID, and a score. The JSON file is read and converted to a Spark DataFrame, and only the first 10,000 rows are kept.

In [ ]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
import findspark

findspark.init()
spark = SparkSession.builder.getOrCreate()

# Keep the three columns of interest and only the first 10,000 rows. limit()
# truncates inside Spark, so the rows are not collected to the driver and
# turned back into a DataFrame, and only the truncated data is cached.
ratings = (
    spark.read.json("movies 1.json")
    .select("user_id", "product_id", "score")
    .limit(10000)
    .cache()
)

ratings.show(5)

### Splitting and Training the Pre-processed data

From pyspark.ml we import the following libraries: 
ALS: This module implements the Alternating Least Squares (ALS) algorithm, which is commonly used for collaborative filtering. 
RegressionEvaluator: It evaluates the performance of regression models.
StringIndexer: Used to convert categorical variables into numerical indices.
Pipeline: Allows you to chain multiple transformers and estimators into a single pipeline.

The indexers will convert string columns to numerical indices. A Pipeline object is created to execute all the stages (indexers) in order. The fit() method of the pipeline is called with the original ratings DataFrame to fit the indexers, and then the transform() method is called to transform the data. This results in a new DataFrame ratings_indexed where the "user_id" and "product_id" columns are replaced with their respective numerical indices.

The randomSplit() method is used to split the indexed ratings data into training and validation sets in an 8:2 ratio. An ALS object is created with specified parameters. A RegressionEvaluator object is created to evaluate the model's performance. It will calculate the RMSE (Root Mean Squared Error) between the predicted ratings and the actual ratings. The ALS model is trained using the fit() method with the training data.

The trained model is used to generate predictions on the validation data using the transform() method. The predictions DataFrame is then displayed, showing the first 10 rows.

In [ ]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

# One fitted StringIndexer per string key column; each adds "<column>_index".
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index").fit(ratings)
    for column in ["user_id", "product_id"]
]

pipeline = Pipeline(stages=indexers)
ratings_indexed = pipeline.fit(ratings).transform(ratings)

# A fixed seed makes the 8:2 split, and therefore the metrics computed from
# it, repeatable from run to run.
split_seed = 42
training_data, validation_data = ratings_indexed.randomSplit([8.0, 2.0], seed=split_seed)

# coldStartStrategy="drop" removes rows whose prediction is NaN (users or
# products that did not appear in the training split) so they cannot turn the
# evaluation metrics into NaN. The seed fixes the random factor initialisation.
als = ALS(
    userCol="user_id_index",
    itemCol="product_id_index",
    ratingCol="score",
    rank=10,
    maxIter=5,
    regParam=0.01,
    coldStartStrategy="drop",
    seed=split_seed,
)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="score", predictionCol="prediction")

model = als.fit(training_data)
predictions = model.transform(validation_data)
predictions.show(10, False)

### ALS algorithm for collaborative filtering

Collaborative filtering is a popular technique used in recommendation systems to generate personalized recommendations for users based on their interactions with items (e.g., products, movies, articles) and similarities with other users. It operates on the principle of leveraging the collective wisdom of users to make predictions about preferences. There are two main types of collaborative filtering:

1. **User-based Collaborative Filtering:** This approach recommends items to a user based on the preferences of similar users. It identifies users with similar behaviors or tastes and suggests items that they have liked or interacted with.

2. **Item-based Collaborative Filtering:** In this method, recommendations are made by identifying similar items to the ones a user has interacted with. It looks for items that have been liked or rated similarly by other users and suggests them to the target user.

Collaborative filtering does not require explicit knowledge of item characteristics or user preferences but relies on observed interactions between users and items. It is widely used in e-commerce platforms, streaming services, and content recommendation systems to enhance user experience and engagement.


We first filter the `validation_data` DataFrame down to the rows of one specific user, the one whose `user_id_index` equals 1.0. We then apply the ALS model to those rows to predict the score this user would give each of these products. The resulting predictions are sorted in descending order of predicted score and displayed.

In [ ]:
# Validation rows that belong to the user whose index is 1.0, reduced to the
# identifier columns (raw ids and their numeric indices).
is_target_user = validation_data['user_id_index'] == 1.0
user1 = validation_data.filter(is_target_user).select(
    'product_id', 'user_id', 'user_id_index', 'product_id_index'
)
user1.show()

# Score those user/product pairs with the trained model and list the
# highest predictions first.
recommendations = model.transform(user1)
recommendations.orderBy(recommendations['prediction'].desc()).show()

### Evaluate the performance of the recommendation model using RMSE and MAE

We calculate the Root Mean Squared Error (RMSE) to evaluate the model's performance in predicting user ratings. The RMSE measures the differences between predicted and actual ratings, providing an overall assessment of model accuracy. Additionally, we compute the Mean Absolute Error (MAE) as an alternative evaluation metric, which represents the average absolute difference between predicted and actual ratings.

Root Mean Squared Error (RMSE):
RMSE = sqrt(Σ(predicted_rating - actual_rating)^2 / n)

Mean Absolute Error (MAE):
MAE = Σ|predicted_rating - actual_rating| / n

Here:
- predicted_rating: the predicted rating by the recommendation model.
- actual_rating: the actual rating provided by the user.
- n: the total number of predictions.

In [ ]:
# Additional evaluation metric: a Mean Absolute Error evaluator reading the
# "score" label and the "prediction" column.
evaluator_mae = RegressionEvaluator(metricName="mae", labelCol="score", predictionCol="prediction")

# Root Mean Squared Error of the validation predictions.
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) = {rmse}")

# Mean Absolute Error of the same predictions.
mae = evaluator_mae.evaluate(predictions)
print(f"Mean Absolute Error (MAE) = {mae}")

# Monte Carlo Simulation

Financial stock risk analysis involves evaluating the potential uncertainties and volatility associated with investing in a particular stock or the overall market. Investors and analysts use various tools and methodologies to assess factors that may impact stock prices, such as market trends, economic indicators, company performance, and geopolitical events. Quantitative measures, including standard deviation, beta, and value at risk (VaR), are often employed to quantify and analyze the level of risk in a stock or portfolio. Fundamental analysis examines a company's financial health, earnings, and growth potential, while technical analysis studies historical price and trading volume patterns. Additionally, sentiment analysis considers market sentiment and news to gauge investor behavior. By conducting comprehensive risk analysis, investors can make more informed decisions, develop risk management strategies, and optimize their investment portfolios in response to changing market conditions.

In [ ]:
import pyspark
from pyspark.sql import SparkSession
import warnings
# Silence Python warnings from here on.
warnings.filterwarnings("ignore")

# Session for the Monte Carlo simulation: application name 'MCS', 4 GB
# requested for the driver.
# NOTE(review): getOrCreate() hands back an already-running session when one
# exists, so the driver-memory setting may not take effect if earlier cells
# have started Spark - confirm.
session_builder = SparkSession.builder.appName('MCS')
session_builder = session_builder.config("spark.driver.memory", "4g")
spark = session_builder.getOrCreate()

### Data Loading and Preparation

We start by loading the stock and factor data into Spark DataFrames. The `stocks` DataFrame contains information about various stocks, while the `factors` DataFrame contains additional factors that may influence stock prices. Both DataFrames are read from CSV files located in the specified directories.

We then perform some preprocessing steps on the `stocks` and `factors` DataFrames, including extracting the stock symbols from file paths, filtering the data based on date ranges, and converting date formats.

After preparing the data, we convert the Spark DataFrames to Pandas DataFrames for further analysis.

In [ ]:
from pyspark.sql import functions as fun
from pyspark.sql import Window
from datetime import datetime

# Both ends of the study period are inclusive.
_PERIOD_START = datetime(2009, 10, 23)
_PERIOD_END = datetime(2014, 10, 23)


def _with_symbol(df):
    """Return *df* with a ``Symbol`` column: the input file's base name up to its first dot."""
    file_name = fun.element_at(fun.split(fun.input_file_name(), "/"), -1)
    # Raw string: the pattern is a regular expression and "\." is not a valid
    # Python escape sequence in a normal string literal.
    return df.withColumn("Symbol", fun.element_at(fun.split(file_name, r"\."), 1))


def _with_parsed_date(df):
    """Return *df* with its ``Date`` column parsed from 'dd-MMM-yy' text into a date."""
    return df.withColumn('Date', fun.to_date(fun.to_timestamp(fun.col('Date'), 'dd-MMM-yy')))


def _within_study_period(df):
    """Keep only the rows whose ``Date`` lies inside the study period."""
    return df.filter(fun.col('Date') >= _PERIOD_START).filter(fun.col('Date') <= _PERIOD_END)


stocks = spark.read.csv(
    ["data/stocksA/ABAX.csv", "data/stocksA/AAME.csv", "data/stocksA/AEPI.csv"],
    header='true',
    inferSchema='true',
)
stocks = _with_symbol(stocks)
stocks.show(2)

factors = spark.read.csv(
    ["data/stocks/ABAX.csv", "data/stocks/AAME.csv", "data/stocks/AEPI.csv"],
    header='true',
    inferSchema='true',
)
factors = _with_symbol(factors)

# Keep only symbols with more than 260*5 + 10 rows in total (counted before
# the date filter below is applied).
stocks = stocks.withColumn('count', fun.count('Symbol').over(Window.partitionBy('Symbol'))) \
               .filter(fun.col('count') > 260*5 + 10)

# The legacy time parser is enabled before the 'dd-MMM-yy' dates are parsed.
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
stocks = _with_parsed_date(stocks)
stocks.printSchema()

stocks = _within_study_period(stocks)
factors = _within_study_period(_with_parsed_date(factors))
factors.show(2)

# Bring both (now small) frames to the driver as pandas DataFrames.
stocks_pd_df = stocks.toPandas()
factors_pd_df = factors.toPandas()
factors_pd_df.head(5)

### Determining the Factor Weights

Here, we determine the factor weights for each stock using linear regression. We calculate the returns of stocks and factors over a rolling window of 10 steps, then apply linear regression to estimate the coefficients of factors contributing to stock returns.


1. **Defining Variables and Functions:**
      - Here, we define variables and a custom function necessary for our calculations.


2. **Calculating Rolling Returns:**
      - Rolling returns for both stocks and factors are calculated using a rolling window approach.


3. **Combining Returns with DataFrames:**
      - Rolling returns are combined with their respective DataFrames for further analysis.


4. **Merging DataFrames and Preprocessing:**
      - The DataFrames are merged, and preprocessing steps, such as handling missing values, are performed.


5. **Linear Regression and Coefficients Calculation:**
     - Finally, linear regression is performed, and coefficients are calculated for each stock.


Finally, the columns are renamed and the DataFrame is reorganized for clarity and readability.
The cell ends by displaying the DataFrame that contains the OLS coefficients per stock.


In [ ]:
import pandas as pd
from sklearn.linear_model import LinearRegression
import warnings
# Silence Python warnings for the cells below.
warnings.filterwarnings("ignore")

# Number of rows in each rolling window.
n_steps = 10


def my_fun(x):
    """Return the relative change from the first to the last value of window *x*."""
    first_value = x.iloc[0]
    last_value = x.iloc[-1]
    return (last_value - first_value) / first_value

# Rolling n_steps-row return of Close, computed separately for every symbol.
stock_returns = stocks_pd_df.groupby('Symbol').Close.rolling(window=n_steps).apply(my_fun)
factors_returns = factors_pd_df.groupby('Symbol').Close.rolling(window=n_steps).apply(my_fun)
# The grouped result is indexed by (Symbol, original row label). Sorting on
# the row label ('level_1') restores the source row order, and the second
# reset_index() gives a positional index used for alignment in assign() below
# (assumes the source frames carry a default 0..n-1 index).
stock_returns = stock_returns.reset_index().sort_values('level_1').reset_index()
factors_returns = factors_returns.reset_index().sort_values('level_1').reset_index()

stocks_pd_df_with_returns = stocks_pd_df.assign(stock_returns=stock_returns['Close'])
factors_pd_df_with_returns = factors_pd_df.assign(
    factors_returns=factors_returns['Close'],
    factors_returns_squared=factors_returns['Close'] ** 2,
)
# One row per Date, one column per (value, Symbol) pair.
factors_pd_df_with_returns = factors_pd_df_with_returns.pivot(
    index='Date', columns='Symbol', values=['factors_returns', 'factors_returns_squared']
)
# Flatten the two-level column labels into "<value>_<Symbol>" names.
factors_pd_df_with_returns.columns = [
    '_'.join(pair) for pair in factors_pd_df_with_returns.columns
]
factors_pd_df_with_returns = factors_pd_df_with_returns.reset_index()
print(factors_pd_df_with_returns.head(1))
print(factors_pd_df_with_returns.columns)

# For each stock, create input DF for linear regression training
stocks_factors_combined_df = pd.merge(
    stocks_pd_df_with_returns, factors_pd_df_with_returns, how="left", on="Date"
)
# Every factor column is a regression feature. Deriving the list from the
# factor frame avoids hard-coding how many factor columns the merge appended.
feature_columns = [name for name in factors_pd_df_with_returns.columns if name != 'Date']

# Drop rows whose features or target are missing or infinite. Infinities are
# mapped to NaN explicitly because the 'mode.use_inf_as_na' option is
# deprecated in pandas.
required_columns = feature_columns + ['stock_returns']
usable_rows = (
    stocks_factors_combined_df[required_columns]
    .replace([float('inf'), float('-inf')], float('nan'))
    .notna()
    .all(axis=1)
)
stocks_factors_combined_df = stocks_factors_combined_df[usable_rows]

def find_ols_coef(df):
    """Fit an ordinary least-squares model of ``stock_returns`` on the feature columns.

    Returns a flat list: the ``Symbol`` value of the first row of *df*,
    followed by one fitted coefficient per entry of ``feature_columns``.
    """
    features = df[feature_columns]
    # Two-dimensional target, so coef_ comes back with one row per target column.
    target = df[['stock_returns']].values
    fitted = LinearRegression().fit(features, target)
    symbol_part = list(df[['Symbol']].values[0])
    return symbol_part + list(fitted.coef_[0])

# One [symbol, coef_1, ..., coef_k] list per stock, keyed by Symbol.
per_symbol_coefs = stocks_factors_combined_df.groupby('Symbol').apply(find_ols_coef)

# Move the group key into a column so the rows get a plain positional index.
coef_table = pd.DataFrame(per_symbol_coefs).reset_index()
coef_table.columns = ['symbol', 'factor_coef_list']

# Expand every coefficient list into named columns.
coefs_per_stock = pd.DataFrame(
    coef_table['factor_coef_list'].tolist(),
    index=coef_table.index,
    columns=['Symbol'] + feature_columns,
)
coefs_per_stock

### Sampling

In this section, we sample from factor returns data to analyze return distributions for each factor. Kernel Density Estimation (KDE) plots visualize return distributions for each factor, and we calculate the correlation matrix between sampled factors.

In [ ]:
from numpy.random import multivariate_normal

factor_symbols = factors_returns.Symbol.unique()

# Density plot of the rolling returns of the first factor.
samples = factors_returns.loc[factors_returns.Symbol == factor_symbols[0]]['Close']
samples.plot.kde()

# Rolling returns of the first three factors, one series each.
f_1 = factors_returns.loc[factors_returns.Symbol == factor_symbols[0]]['Close']
f_2 = factors_returns.loc[factors_returns.Symbol == factor_symbols[1]]['Close']
f_3 = factors_returns.loc[factors_returns.Symbol == factor_symbols[2]]['Close']
print(f_1.size,len(f_2),f_3.size)

# NOTE(review): f1 and f2 are cut to elements 1..1039 while f3 is used whole.
# DataFrame construction needs equal-length lists, so this relies on f3
# holding exactly 1039 values - confirm against the data.
factor_samples = pd.DataFrame({
    'f1': list(f_1)[1:1040],
    'f2': list(f_2)[1:1040],
    'f3': list(f_3),
})
factor_samples.corr()

# Mean vector and covariance matrix used to draw correlated factor returns.
factors_returns_cov = factor_samples.cov().to_numpy()
factors_returns_mean = factor_samples.mean()

# One example draw from the fitted multivariate normal distribution.
multivariate_normal(factors_returns_mean, factors_returns_cov)

### Running the Trials

Finally, we conduct Monte Carlo simulations to generate trial returns for each stock. We use a multivariate normal distribution to generate random factor returns based on the mean and covariance of the observed factor returns. We then apply the coefficients obtained from the linear regression to calculate the expected return of each stock under each simulated scenario. The seeds that drive the trials are spread across Spark partitions, so the trials run in parallel.


In [ ]:
from pyspark.sql.types import IntegerType
import random
from numpy.random import seed
from pyspark.sql.types import LongType, ArrayType, DoubleType
from pyspark.sql.functions import udf
from pyspark.sql.functions import col, explode

# Broadcast the read-only inputs of the simulation so the UDF can read them
# through the broadcast handles.
sc = spark.sparkContext
b_coefs_per_stock = sc.broadcast(coefs_per_stock)
b_feature_columns = sc.broadcast(feature_columns)
b_factors_returns_mean = sc.broadcast(factors_returns_mean)
b_factors_returns_cov = sc.broadcast(factors_returns_cov)

# Simulation size: num_trials in total, driven by `parallelism` distinct seeds.
parallelism = 1000
num_trials = 1_000_000
base_seed = 1496

# Consecutive integer seeds, stored in a single-column DataFrame that is
# repartitioned into `parallelism` partitions.
seeds = list(range(base_seed, base_seed + parallelism))
seedsDF = spark.createDataFrame(seeds, IntegerType()).repartition(parallelism)

def calculate_trial_return(x):
    """Run the Monte Carlo trials that belong to seed *x*.

    Each trial draws one vector of factor returns from the multivariate normal
    distribution described by the broadcast mean and covariance, multiplies
    the per-stock regression coefficients by the drawn factors and their
    squares, and records the summed result divided by the size of the
    coefficient table.

    Args:
        x: Integer seed for NumPy's global random generator.

    Returns:
        A list of ``int(num_trials / parallelism)`` floats, one per trial.
    """
    # Seed once, before the loop. Re-seeding inside the loop would reset the
    # generator every iteration and make every trial draw the same sample.
    seed(x)

    # Loop-invariant inputs, read from the broadcast variables a single time.
    coefs_per_stock_df = b_coefs_per_stock.value
    factor_coefs = coefs_per_stock_df[b_feature_columns.value]
    factors_mean = b_factors_returns_mean.value
    factors_cov = b_factors_returns_cov.value
    # NOTE(review): DataFrame.size counts every cell (rows x columns, including
    # the Symbol column), not the number of stocks - confirm this is the
    # intended normalisation.
    divisor = coefs_per_stock_df.size

    trial_return_list = []
    for _ in range(int(num_trials / parallelism)):
        random_factors = multivariate_normal(factors_mean, factors_cov)
        # Feature order is the factor returns followed by their squares.
        factor_terms = list(random_factors) + list(random_factors ** 2)
        returns_per_stock = factor_coefs * factor_terms
        # Exactly one entry per trial.
        trial_return_list.append(float(returns_per_stock.sum(axis=1).sum()) / divisor)
    return trial_return_list


udf_return = udf(calculate_trial_return, ArrayType(DoubleType()))

# One row per seed, carrying the array of trial returns simulated for it.
per_seed_returns = seedsDF.withColumn("trial_return", udf_return(seedsDF["value"]))

# Flatten to one row per individual trial, keeping the seed next to it.
trials = per_seed_returns.select(
    'value',
    explode(per_seed_returns['trial_return']).alias('trial_return'),
)
trials.cache()

### Evaluation of Trial Returns

In this section, we evaluate the trial returns generated from Monte Carlo simulations. We calculate the 5th percentile of trial returns as a measure of downside risk and compute the average trial return for further analysis.

Concretely, `approxQuantile` is called with a relative error of 0.0, so it returns the exact 5th percentile of `trial_return`; this value serves as the 5% Value at Risk (VaR). The second statement sorts the trials in ascending order, keeps the worst 5% of them (one twentieth of the rows) and averages their returns, which gives the conditional VaR (expected shortfall).

In [ ]:
# 5th percentile of the simulated returns; a relative error of 0.0 requests
# the exact quantile.
trials.approxQuantile('trial_return', [0.05], 0.0)

# Average of the worst 5% of trials: sort ascending, keep one twentieth of
# the rows, then take the mean of their returns.
worst_count = int(trials.count() / 20)
worst_trials = trials.orderBy(col('trial_return').asc()).limit(worst_count)
worst_trials.agg(fun.avg(col("trial_return"))).show()