# ST446 Assignment 2
--------------------------------------

# P1: Topic modelling on Wikipedia data

#### Download Wikipedia dump file

This code will download the latest Wikipedia dump into an RDD consisting of string elements, each element corresponding to the text of a Wikipedia page. Precisely, each RDD element will be a string equal to the text enclosed within `<page>` and `</page>` XML tags in the input dataset with carriage-return characters (`\n`) removed. 

In [1]:
# Wikipedia dump
# we use the enwiki-latest-pages-articles1.xml-p1p41242.bz2 dump
#!wget https://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles1.xml-p1p41242.bz2
# decompress the file
#!bzip2 -d enwiki-latest-pages-articles1.xml-p1p41242.bz2
# rename the uncompressed file to an XML format
#!mv enwiki-latest-pages-articles1.xml-p1p41242 enwiki-latest-pages-articles1.xml
# move to Hadoop Namenode
#!hadoop fs -put enwiki-latest-pages-articles1.xml /

In [2]:
# IMPORTANT: adjust to reflect the cluster name and Hadoop masternode (IP port) of your cluster
path = "hdfs://st446-assignment2-cluster-m:8020/enwiki-latest-pages-articles1.xml"

In [3]:
# initial Dataframe
# pages are separated by <page> and </page>
df1 = spark.read.text(path, lineSep="</page>")
df1.limit(10).toPandas()

Unnamed: 0,value
0,"<mediawiki xmlns=""http://www.mediawiki.org/xml..."
1,\n <page>\n <title>Anarchism</title>\n ...
2,\n <page>\n <title>AfghanistanHistory</tit...
3,\n <page>\n <title>AfghanistanGeography</t...
4,\n <page>\n <title>AfghanistanPeople</titl...
5,\n <page>\n <title>AfghanistanCommunicatio...
6,\n <page>\n <title>AfghanistanTransportati...
7,\n <page>\n <title>AfghanistanMilitary</ti...
8,\n <page>\n <title>AfghanistanTransnationa...
9,\n <page>\n <title>AssistiveTechnology</ti...


In [4]:
# accessing RDD element attached to the Dataframe
rdd1 = df1.rdd

In [5]:
# first element
rdd1.take(1)

[Row(value='<mediawiki xmlns="http://www.mediawiki.org/xml/export-0.10/" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mediawiki.org/xml/export-0.10/ http://www.mediawiki.org/xml/export-0.10.xsd" version="0.10" xml:lang="en">\n  <siteinfo>\n    <sitename>Wikipedia</sitename>\n    <dbname>enwiki</dbname>\n    <base>https://en.wikipedia.org/wiki/Main_Page</base>\n    <generator>MediaWiki 1.42.0-wmf.22</generator>\n    <case>first-letter</case>\n    <namespaces>\n      <namespace key="-2" case="first-letter">Media</namespace>\n      <namespace key="-1" case="first-letter">Special</namespace>\n      <namespace key="0" case="first-letter" />\n      <namespace key="1" case="first-letter">Talk</namespace>\n      <namespace key="2" case="first-letter">User</namespace>\n      <namespace key="3" case="first-letter">User talk</namespace>\n      <namespace key="4" case="first-letter">Wikipedia</namespace>\n      <namespace key="5" case="first-letter">Wikipedi

---

## P1.1 Creating a document corpus

#### i) Dataframe creation (and pre-processing)

In [6]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql import Row
from pyspark.sql.functions import col
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import size
from pyspark.ml.feature import CountVectorizer
import re

In [7]:
spark = SparkSession.builder.appName("WikipediaTopicModeling").getOrCreate()

# Access the existing SparkContext from the SparkSession
sc = spark.sparkContext


In [8]:
# Load the Wikipedia dump into an RDD
wikiDataRDD = rdd1


In [9]:
#print(wikiDataRDD.take(5))

In [10]:

def data_checks(data):
    # Extract the string from the tuple if necessary
    text = data[0] if isinstance(data, tuple) else data
    
    if not isinstance(text, str):
        return ''
    
    return text

checkedRDD = wikiDataRDD.map(data_checks)


In [11]:
#print(checkedRDD.take(5))

In [12]:

def parse_page_and_preclean_text(xml):
    
    text = xml[0] if isinstance(xml, tuple) else xml
    
    if not isinstance(text, str):
        return None
        #return ""
    
    title_match = re.search('<title>(.*?)</title>', xml)
    title = title_match.group(1) if title_match else None

    id_match = re.search('<id>(.*?)</id>', xml, re.IGNORECASE)
    page_id = id_match.group(1) if id_match else None

    text_match = re.search('<text.*?>(.*?)</text>', xml, re.DOTALL)
    text = text_match.group(1) if text_match else None

    
    if isinstance(text,str):
        categories = re.findall(r'\[\[Category:(.*?)(?:\|.*)?\]\]', text, re.IGNORECASE)
    else:
        categories = ['NONE']
        
    text = re.sub(r'<!--.*?-->', '', text)
    text = re.sub(r"{{.*?}}", "", text)
    text = re.sub(r"\[\[File:.*?\]\]", "", text)
    text = re.sub(r"\[\[Image:.*?\]\]", "", text)
    text = re.sub(r"\n: ''.*?''", "", text)
    text = re.sub(r"\n!.*", "", text)
    text = re.sub(r"^:''.*", "", text)
    text = re.sub(r"&nbsp", "", text)
    text = re.sub(r"http\S+", "", text)
    text = re.sub(r"\d+", "", text)
    text = re.sub(r"\(.*?\)", "", text)
    text = re.sub(r"Category:.*", "", text)
    text = re.sub(r"\| .*", "", text)
    text = re.sub(r"\n\|.*", "", text)
    text = re.sub(r"\n \|.*", "", text)
    text = re.sub(r".* \|\n", "", text)
    text = re.sub(r".*\|\n", "", text)
    text = re.sub(r"{{Infobox.*", "", text)
    text = re.sub(r"{{infobox.*", "", text)
    text = re.sub(r"{{taxobox.*", "", text)
    text = re.sub(r"{{Taxobox.*", "", text)
    text = re.sub(r"{{ Infobox.*", "", text)
    text = re.sub(r"{{ infobox.*", "", text)
    text = re.sub(r"{{ taxobox.*", "", text)
    text = re.sub(r"{{ Taxobox.*", "", text)
    text = re.sub(r"\* .*", "", text)
    text = re.sub(r"<.*?>", "", text)
    text = re.sub(r"\n", "", text)
    text = re.sub(r"\!|\"|\#|\$|\%|\&|\'|\(|\)|\*|\+|\,|\-|\.|\/|\:|\;|\|\?|\@|\[|\\|\]|\^|\_|\`|\{|\||\}|\~", " ", text)
    text = re.sub(r" +", " ", text)
    text = re.sub(r'== See also ==.*', '', text, flags=re.DOTALL)
    text = re.sub(r'<ref>.*?</ref>', '', text)
    text = re.sub(r'&lt;|&gt;|&quot;|&amp;', '', text)
    text = re.sub(r'\[\[Category:.*?\]\]', '', text)
    text = re.sub(r'\{\{.*?\}\}', '', text)
    text = re.sub(r'<ref>.*?</ref>', '', text, flags=re.DOTALL)
    text = re.sub(r'\[http[^\]]*\]', '', text)
    text = re.sub(r'\n+', ' ', text)
    text = re.sub(r"'{2,}", '', text) # Remove bold/italic markup
    text = re.sub(r"\[\[(?:[^\]|]*\|)?([^\]]+)\]\]", r'\1', text) 
    text = re.sub(r"\s{2,}|\n", " ", text).strip()
        
    # Limit text to first 3000 words
    text = " ".join(text.split()[:3000]) if text else None
    
    return Row(title=title, page_id=page_id, text=text, categories=categories)

# Apply the parsing function to each element in the RDD
parsedRDD_raw = checkedRDD.map(parse_page_and_preclean_text).filter(lambda x: x is not None)



As wikipedia pages are quite difficult to clean, the list of predefined regular expressions is used: [Topic Modelling Part 1: Creating Article Corpus from Simple Wikipedia Dump by Abhijeet Kumar](https://appliedmachinelearning.wordpress.com/2017/08/28/topic-modelling-part-1-creating-article-corpus-from-simple-wikipedia-dump/)


In [13]:
pagesWithCategoriesRDD = parsedRDD_raw.filter(lambda row: len(row.categories) > 0)


In [14]:
schema = StructType([
    StructField("title", StringType(), True),
    StructField("page_id", StringType(), True),  
    StructField("text", StringType(), True),
    StructField("categories", ArrayType(StringType()), True)
])

corpusDF = spark.createDataFrame(pagesWithCategoriesRDD, schema=schema)

In [15]:
corpusDF = corpusDF.filter(
    col("title").isNotNull() & 
    (col("text") != "")
)

corpusDF = corpusDF.filter((col("categories").isNotNull()) & (size(col("categories")) > 0))


In [16]:
from pyspark.sql.functions import split

corpusDF = corpusDF.filter(size(split(col("text"), " ")) > 10)


#### ii) Top 20 rows of the corpusDF Dataframe

In [17]:

# Filter out rows where 'text' column starts with "REDIRECT" or contains "disambiguation}}"
corpusDF = corpusDF.filter(~col("text").startswith("REDIRECT") & ~col("text").contains("disambiguation}}"))


# Show the cleaned DataFrame to verify the results
corpusDF.select("page_id", "title", "text", "categories").show(20)


+-------+--------------------+--------------------+--------------------+
|page_id|               title|                text|          categories|
+-------+--------------------+--------------------+--------------------+
|     12|           Anarchism|Anarchism is a po...|[Anarchism, Anti-...|
|     39|              Albedo|The map shows the...|[Land surface eff...|
|    290|                   A|A sharp A or a is...|[ISO basic Latin ...|
|    303|             Alabama|Alabama lt ref gt...|[Alabama, 1819 es...|
|    305|            Achilles|In Greek mytholog...|[Greek mythologic...|
|    307|     Abraham Lincoln|Abraham Lincoln w...|[Abraham Lincoln,...|
|    308|           Aristotle|collapsible list ...|[Aristotle, Arist...|
|    309|An American in Paris|An American in Pa...|[1928 composition...|
|    316|Academy Award for...|The Academy Award...|[Academy Awards, ...|
|    324|      Academy Awards|The Academy Award...|[Academy Awards, ...|
|    330|             Actrius|Actresses is a Ca...|

#### iii) Applying pre-processing steps to parse the data, removing stop words and any other special symbols, and creating feature vectors and vocabulary

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
import string
import nltk
from nltk.stem import WordNetLemmatizer

# Assuming necessary NLTK data is downloaded
# Initialize Spark session
spark = SparkSession.builder.appName("Wikipedia Processing").getOrCreate()

# Define the custom processing function
additional_stop_words = {'lt', 'gt', 'ref', 'quot', 'amp', 'nbsp', 'p', 'pp'}
stop_words = set(nltk.corpus.stopwords.words('english')).union(additional_stop_words)
table = str.maketrans('', '', string.punctuation)
lmtzr = WordNetLemmatizer()

def custom_processing(words):
    cleaned_tokens = [w.lower().translate(table) for w in words]
    filtered_tokens = [w for w in cleaned_tokens if w.isalpha() and w not in stop_words]
    lemmatized_tokens = [lmtzr.lemmatize(w) for w in filtered_tokens]
    return lemmatized_tokens

# Register the UDF
custom_processing_udf = udf(custom_processing, ArrayType(StringType()))

# Tokenize the text
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
tokenized_df = tokenizer.transform(corpusDF)

# Apply the custom UDF to process tokens
processed_tokens_df = tokenized_df.withColumn("processed_tokens", custom_processing_udf(col("tokens")))



In [19]:
from pyspark.sql.functions import col, when
from pyspark.sql.functions import array

# it does not work performance wise with the full data unfortunately. I have not been able to find the mistake but tried several settings for minDF etc.
#processed_tokens_sample = processed_tokens_df.sample(False, 0.1) # also does not finish processing (1h)
processed_tokens_sample = processed_tokens_df.limit(20) # works

# Replace null values in 'filtered_words' with an empty list (assuming it's a list of tokens)
processed_tokens_df = processed_tokens_sample.withColumn("processed_tokens", when(col("processed_tokens").isNull(), array()).otherwise(col("processed_tokens")))



In [20]:

# Setup CountVectorizer
cv = CountVectorizer(inputCol="processed_tokens", outputCol="features", minDF=10)

# Fit and transform the data
cv_model = cv.fit(processed_tokens_sample)
result_df = cv_model.transform(processed_tokens_sample)


#### iv) Showing the first 20 feature vectors and corresponding vocabulary entries

In [21]:
# Show the results
result_df.select("page_id", "features").show(20)

+-------+--------------------+
|page_id|            features|
+-------+--------------------+
|     12|(64,[1,2,3,4,6,7,...|
|     39|(64,[0,2,4,6,7,8,...|
|    290|(64,[0,2,3,4,6,7,...|
|    303|(64,[0,1,2,3,4,5,...|
|    305|(64,[0,1,2,3,4,6,...|
|    307|(64,[0,1,2,3,4,5,...|
|    308|(64,[0,1,2,3,4,6,...|
|    309|(64,[0,1,2,3,4,5,...|
|    316|(64,[0,3,9,10,17,...|
|    324|(64,[0,1,2,3,4,5,...|
|    330|(64,[0,2,3,5,7,13...|
|    332|(64,[4,5,7,8,9,10...|
|    334|(64,[0,1,2,3,4,6,...|
|    336|(64,[0,1,2,3,4,6,...|
|    339|(64,[0,1,2,3,4,5,...|
|    340|(64,[0,1,3,5,7,13...|
|    344|(64,[0,1,2,3,4,5,...|
|    358|(64,[0,1,2,3,4,6,...|
|    359|(64,[0,1,2,3,4,5,...|
|    569|(64,[0,1,2,3,4,5,...|
+-------+--------------------+



In [24]:

# Show the vocabulary
vocabulary = cv_model.vocabulary
print("Vocabulary:", vocabulary[:20])

Vocabulary: ['name', 'state', 'time', 'first', 'one', 'american', 'form', 'also', 'new', 'year', 'th', 'century', 'people', 'work', 'may', 'united', 'many', 'science', 'used', 'study']


---

## P1.2 Perform topic modelling

#### i) Identifying 10 topics across all Wikipedia pages and no less than 6 words describing each topic. Showing the top 6 words and corresponding weights for each topic 

In [27]:
from pyspark.ml.clustering import LDA

# Specify the number of topics
num_topics = 10

# Initialize LDA with the desired number of topics
lda = LDA(k=num_topics, maxIter=10)

# Fit the LDA model to your data
lda_model = lda.fit(result_df)

# Extract topic distributions
topics = lda_model.describeTopics(maxTermsPerTopic=6)

# Show the topics with words
vocab_list = cv_model.vocabulary
topics_rdd = topics.rdd

# Map topic indices to words
topics_words = topics_rdd\
    .map(lambda row: (row[0], list(zip([vocab_list[word_idx] for word_idx in row[1]], row[2]))))\
    .collect()

# Print topics
for topic in topics_words:
    print(f"Topic {topic[0]}:")
    for word, weight in topic[1]:
        print(f"    {word}: {weight}")
    print("\n")


Topic 0:
    form: 0.13405627798106887
    time: 0.06764501697683707
    used: 0.041258257564561036
    science: 0.030883014775732326
    one: 0.030689576588536077
    many: 0.03004463638564091


Topic 1:
    including: 0.019699047808233176
    new: 0.018335378979625106
    word: 0.018203020804433662
    member: 0.018019298726935938
    two: 0.017866489950531896
    several: 0.017751519613775503


Topic 2:
    modern: 0.01957354860870912
    year: 0.01942293264302994
    known: 0.019068745190909484
    became: 0.018838649078161875
    long: 0.018099257566906803
    among: 0.017707667044135084


Topic 3:
    american: 0.020103015804952693
    since: 0.01932282976899238
    th: 0.018929940160454562
    day: 0.018504321269705526
    many: 0.017794612510482158
    world: 0.017579242013252862


Topic 4:
    well: 0.02002803206411524
    link: 0.018451846775392723
    used: 0.018279177154636127
    second: 0.018070472440232715
    origin: 0.01796578771803066
    since: 0.017642693054614775



#### ii) Printing title, topic and corresponding words for the first 10 pages in the Dataframe

In [28]:

# Transform the vectorized DataFrame to get topic distributions
transformed_df = lda_model.transform(result_df)

# Select the first 10 rows, titles, and topic distribution
first_10_pages = transformed_df.select("title", "topicDistribution").take(10)

# Get the topic words from the LDA model
topics_words = lda_model.describeTopics(maxTermsPerTopic=6)
vocab_list = cv_model.vocabulary
topics_words_rdd = topics_words.rdd
topics_descriptions = topics_words_rdd\
    .map(lambda row: (row[0], list(zip([vocab_list[word_idx] for word_idx in row[1]], row[2]))))\
    .collect()

# Dictionary to map topic index to words
topics_dict = {topic[0]: topic[1] for topic in topics_descriptions}

for page in first_10_pages:
    title = page['title']
    # Get the index of the highest topic weight
    dominant_topic_index = page['topicDistribution'].argmax()
    # Get the words for the dominant topic
    topic_words = topics_dict[dominant_topic_index]
    # Print the information
    print(f"Title: {title}")
    print(f"Dominant Topic: {dominant_topic_index}")
    print("Words:", ", ".join([f"{word}: {weight:.4f}" for word, weight in topic_words]))
    print("\n")


Title: Anarchism
Dominant Topic: 0
Words: form: 0.1341, time: 0.0676, used: 0.0413, science: 0.0309, one: 0.0307, many: 0.0300


Title: Albedo
Dominant Topic: 5
Words: name: 0.1536, state: 0.1198, american: 0.0473, first: 0.0396, time: 0.0372, united: 0.0323


Title: A
Dominant Topic: 0
Words: form: 0.1341, time: 0.0676, used: 0.0413, science: 0.0309, one: 0.0307, many: 0.0300


Title: Alabama
Dominant Topic: 5
Words: name: 0.1536, state: 0.1198, american: 0.0473, first: 0.0396, time: 0.0372, united: 0.0323


Title: Achilles
Dominant Topic: 5
Words: name: 0.1536, state: 0.1198, american: 0.0473, first: 0.0396, time: 0.0372, united: 0.0323


Title: Abraham Lincoln
Dominant Topic: 5
Words: name: 0.1536, state: 0.1198, american: 0.0473, first: 0.0396, time: 0.0372, united: 0.0323


Title: Aristotle
Dominant Topic: 0
Words: form: 0.1341, time: 0.0676, used: 0.0413, science: 0.0309, one: 0.0307, many: 0.0300


Title: An American in Paris
Dominant Topic: 5
Words: name: 0.1536, state: 0.1198,

#### iii) Analyse and discuss whether the topic and words actually represent (or approximate) the title and categories of each page.

#### Conclusion
While the LDA model provides an interesting overview of the thematic structure within the Wikipedia dataset, the analysis reveals that the topics and their corresponding words might not always represent or accurately approximate the specific title and categories of each page. This suggests a need for further refinement of the topic modeling process, such as adjusting the number of topics, incorporating more nuanced text processing, or applying domain-specific adjustments to better capture the essence of each Wikipedia page.


# P2: Big data programming (PDF version in GitHub Repo)

## P2.1 MapReduce programming

#### i) Provide a code snippet for a `map()` and `reduce()` functions showing how you would determine the maximum temperature each year, along with a brief explanation of your code.

In [46]:
# Define the map function
def map_temperature(record):
    date, temperature = record.split('; ')
    year = date.split('-')[0]
    return (year, int(temperature))

The `map()` function processes each line of the dataset, extracting the year from the date and emitting a key-value pair where the key is the year and the value is the temperature.

In [47]:
# Define the reduce function to find the maximum temperature for each year
def reduce_max_temperature(group):
    year, temperatures = group
    max_temperature = max(temperatures, key=lambda x: x[1])[1]
    return (year, max_temperature)

The `reduce()` function takes key-value pairs produced by the map() function, groups them by key (year), and then reduces each group to a single key-value pair where the value is the maximum temperature recorded in that year.

**Explanation:**
- The map() function is applied to each record in the dataset. It extracts the year from the date and emits a key-value pair with the year as the key and the observed temperature as the value.

- The framework groups all key-value pairs by key (year), resulting in an intermediate dataset where each key (year) is associated with a list of temperatures recorded in that year.

- The reduce() function is then applied to each group (year and its corresponding list of temperatures). It calculates the maximum temperature for that year and emits a key-value pair with the year as the key and the maximum temperature as the value.

- The result is a dataset where each record represents a year and the highest temperature recorded in that year.

This process allows for efficient processing of large datasets by distributing the computation across multiple nodes in a cluster, where each node can process a subset of the data in parallel.

**Example:**

In [24]:
# Sample temperature data representing (date; temperature) format
data = [
    "2015-07-13; 24",
    "2015-08-15; 30",
    "2015-12-02; 20",
    "2016-07-12; 25",
    "2016-08-19; 27",
    "2016-02-02; 19",
    "2017-07-01; 28",
    "2017-08-22; 33",
    "2017-12-11; 22"
]

# Apply the map function to each record
mapped_temperatures = map(map_temperature, data)

# Sort and group by year to prepare for reduction
from itertools import groupby
mapped_temperatures = sorted(mapped_temperatures, key=lambda x: x[0])
grouped_temperatures = groupby(mapped_temperatures, key=lambda x: x[0])


# Apply the reduce function
max_temperatures_by_year = [reduce_max_temperature(group) for group in grouped_temperatures]

max_temperatures_by_year

[('2015', 30), ('2016', 27), ('2017', 33)]

#### ii) Explain how the input keys of the map phase will influence data movement across the network.

Understanding the influence of input keys in the map phase on data movement across the network is critical in optimizing the performance of MapReduce jobs. When data is processed in a distributed system using MapReduce, the goal is to minimize the amount of data transmitted over the network because network I/O can significantly slow down the computation, especially when dealing with large datasets.

#### The Influence of Input Keys on Data Movement

During the map phase, the input dataset is split into smaller chunks, which are then processed in parallel across different nodes in the cluster. The output of the map phase is a set of intermediate key-value pairs. How these keys are chosen and partitioned will significantly impact the subsequent shuffle and sort phase, where the intermediate data is grouped (shuffled) by key and transferred across the network to the reducers.

If the map phase produces a large number of unique keys, this can lead to a substantial amount of data movement during the shuffle phase, as the system needs to transfer these key-value pairs across the network to ensure that all values associated with the same key are brought to the same reducer for processing. On the other hand, if the keys are well-distributed and the amount of data per key is balanced, the network traffic can be minimized, leading to more efficient processing.

#### Strategies to Minimize Data Movement

1. **Key Choice**: Choosing the right keys is crucial. Keys should be selected in a way that they evenly distribute the data across reducers, avoiding scenarios where a single reducer is overwhelmed with a large portion of the data.
2. **Combiner Function**: A combiner function can be used locally on each mapper to perform a preliminary reduction of the data, which can significantly reduce the amount of data that needs to be sent over the network.
3. **Partitioning**: Custom partitioners can be implemented to control how map output keys are assigned to reducers, ensuring a more even distribution of the data load.

**Conclusion**

The choice of input keys in the map phase and how they are handled can dramatically affect the efficiency of a MapReduce job by influencing the amount of data transferred over the network. Efficient data processing in distributed systems requires careful consideration of how data is partitioned and aggregated to minimize costly network transfers and ensure balanced workloads across the cluster.


## P2.2 Machine learning pipeline

#### i) Would combining different machine learning models in a Spark pipeline make sense to perform multiclass classification? Justify your answer.

Combining different machine learning models in a Spark pipeline to perform multiclass classification can indeed make sense and offer several advantages, depending on the complexity of the task, the diversity of the data, and the specific goals of the classification problem. This approach is often referred to as ensemble learning, where multiple models are used together to improve the overall performance. Here are some reasons why this approach can be beneficial:

1. **Improved Accuracy**: Different models may have different strengths and weaknesses depending on the characteristics of the data. By combining these models, it's possible to leverage their strengths and mitigate their weaknesses, leading to improved overall accuracy in the classification task.

2. **Robustness to Overfitting**: Individual models might overfit to the training data, especially if the model is too complex or the data is noisy. An ensemble of models, especially if they are diverse (e.g., decision trees, logistic regression, support vector machines), can reduce the risk of overfitting by averaging out their predictions, making the final decision more robust.

3. **Handling Data Variety**: Multiclass classification problems often involve datasets with a wide variety of features and data types. Some models might be better at handling certain types of data or features. Combining models allows for a more flexible approach to dealing with different data characteristics.

4. **Increased Model Complexity**: Some problems might be too complex for a single model to capture all the nuances in the data. A combination of models can express a more complex decision boundary than any individual model, potentially leading to better performance on complex problems.

5. **Spark’s Distributed Computing**: Spark’s ability to distribute computation across a cluster makes it an ideal platform for implementing such combined models. Training multiple models in parallel and aggregating their predictions can be efficiently performed in a distributed manner, making the process scalable and faster compared to single-model approaches on large datasets.

#### Implementation Considerations

When implementing a combination of models in a Spark pipeline, it's important to consider:

- **Model Selection and Diversity**: The choice of models to combine should be based on their ability to complement each other. Too similar models might not provide the benefits expected from an ensemble approach.
- **Combination Strategy**: Strategies like voting, averaging, or more complex methods like stacking can be used to combine the predictions from different models. The choice depends on the problem and the models used.
- **Resource and Time Constraints**: Training multiple models and combining their predictions require more computational resources and time. It's crucial to balance the performance gains with the additional costs.

#### Downsides of Combining Different Machine Learning Models in a Spark Pipeline

While combining different machine learning models in a Spark pipeline for multiclass classification offers numerous benefits, it also comes with its own set of challenges and downsides. Here are some potential drawbacks to consider:

1. **Increased Complexity**: Managing multiple models within a pipeline can significantly increase the complexity of the system. This complexity arises not only from the need to tune multiple models but also from the complexity of combining their predictions in a meaningful way.

2. **Higher Resource Demand**: Training and deploying multiple models require more computational resources. This can lead to increased costs and longer training times, especially for large datasets. While Spark efficiently distributes these tasks, the overhead of coordinating multiple models can still be significant.

3. **Difficulty in Interpretation**: One of the trade-offs for improved performance can be the loss of interpretability. Single models, especially simpler ones like decision trees or linear models, can offer clear insights into how predictions are made. Combining models, especially using complex strategies like stacking, can make it harder to interpret how the final predictions are derived.

4. **Model Tuning and Selection**: Finding the right combination of models and tuning their hyperparameters can be a daunting task. The search space for the optimal setup grows exponentially with the addition of each model, making the tuning process more challenging and time-consuming.

5. **Risk of Diminishing Returns**: There's a point of diminishing returns where adding more models to the ensemble does not significantly improve performance and might even degrade it due to overfitting or increased variance in the combined predictions.

6. **Dependency and Failure Risk**: Relying on multiple models means dealing with the dependencies and potential failures of more components. If the performance of one model in the ensemble significantly degrades, it can negatively impact the overall system's reliability and accuracy.

### Balancing Act

It's essential to carefully consider these downsides when deciding to combine multiple machine learning models in a Spark pipeline. The decision should be based on a thorough analysis of the problem complexity, the available resources, the importance of interpretability, and the specific requirements of the application. In many cases, the benefits of improved accuracy and robustness may outweigh the downsides, especially for complex, high-stakes classification tasks. However, for simpler problems or when resources are limited, a well-tuned single model might be a more efficient and interpretable choice.


#### Conclusion: Weighing the Pros and Cons of Combining Models in Spark Pipelines

Combining models in spark pipelines has many advantages balanced by some downsides. The primary benefits of such an approach include enhanced predictive performance, robustness against overfitting, and the ability to capture complex patterns and relationships in the data that a single model might miss. These benefits can be particularly valuable in scenarios where the highest possible accuracy is crucial, and the complexity of the data exceeds the capacity of individual models.

On the other hand, the increased system complexity, higher resource demands, potential interpretability issues, and the complexities involved in model tuning and selection represent challenges that need careful consideration. The key to leveraging the advantages while mitigating the downsides lies in a strategic and informed approach to model selection and integration within the pipeline. This involves:

- **Thorough Evaluation**: Systematically testing different combinations and configurations to find the optimal balance between performance and complexity.
- **Resource Management**: Leveraging Spark's distributed computing capabilities to manage resource demands efficiently.
- **Interpretability Strategies**: Employing techniques like feature importance metrics and model-agnostic interpretation tools to maintain insight into how predictions are made.
- **Regular Monitoring**: Continuously monitoring model performance to quickly identify and address any issues arising from model interactions or external changes in data patterns.

Ultimately, while the decision to combine models introduces complexity and challenges, the potential for significant performance gains makes it a compelling strategy for complex classification tasks. By carefully planning and managing the pipeline, it is possible to harness the strengths of multiple models to achieve superior results, making the most of the scalable and powerful environment that Spark provides for big data and machine learning applications.



#### ii) How would you structure such a pipeline in terms of Transformers and Estimators? Provide a code snippet for your solution and briefly explain it.

When dealing with multiclass classification in Spark, it often makes sense to explore different machine learning models to find the best approach for a given dataset. However, rather than relying on a single model, combining multiple models can lead to better performance by leveraging their individual strengths and mitigating their weaknesses. This is where a machine learning pipeline that incorporates multiple models becomes valuable.

#### Goal and Problem
The goal is to create a robust multiclass classification system that can accurately classify instances into one of several categories. The problem arises from the fact that different models have different biases and variances, and what works well for one part of the data might not work as well for another. By combining models, we can potentially create a system that performs better on average across the entire dataset.

#### How to Structure the Pipeline
In terms of structuring such a pipeline, it involves a series of steps where data is preprocessed (e.g., normalized, tokenized), fed into different models, and then the results of these models are combined to produce a final classification. This process typically involves Transformers and Estimators in Spark:

1. **Transformers** for preprocessing data. These can include feature vectorizers, normalizers, and any other data transformation steps necessary before model training.
2. **Estimators** for model training. These are the machine learning algorithms you want to train, such as logistic regression, random forest, or gradient-boosted trees.
3. A custom **Transformer** or strategy to combine the outputs of your models. This could be as simple as taking the mode of predictions from all models (voting) or more complex strategies like stacking or blending.

Since Spark ML's Pipeline API is designed to handle linear workflows (one set of transformations followed by a single estimator), incorporating multiple models directly into a single pipeline for training requires a custom approach. After training, the models' predictions can be combined using a custom Transformer or by manually implementing a combination strategy outside of the Pipeline.

#### Example Code and Explanation
Below is a simplified example that outlines the main components of such a pipeline:

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Initialize a Spark session
spark = SparkSession.builder.appName("MulticlassClassificationPipeline").getOrCreate()

# Example DataFrame with features and labels
data = spark.createDataFrame([
    (0, "text data example 1", 1.0),
    (1, "text data example 2", 0.0),
    (2, "text data example 3", 1.0)
], ["id", "text", "label"])

# Data preprocessing (Transformers)
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
vector_assembler = VectorAssembler(inputCols=data.columns[:-1], outputCol="features")
data = tokenizer.transform(data)
data = hashingTF.transform(data)
data = vector_assembler.transform(data)

# Split data into train and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)


# Models (Estimators)
lr = LogisticRegression(maxIter=10, featuresCol="features", labelCol="label")
rf = RandomForestClassifier(numTrees=10, featuresCol="features", labelCol="label")
gbt = GBTClassifier(maxIter=10, featuresCol="features", labelCol="label")

# Fit the model
lr_model = lr.fit(train_data)
rf_model = lr.fit(train_data)

# Make predictions
predictions_lr = lr_model.transform(test_data)
predictions_rf = rf_model.transform(test_data)

# Simulated pipeline steps (for illustration)

# Combining model outputs via voting (a conceptual Transformer)
class VotingTransformer(Transformer):
    def __init__(self):
        super(VotingTransformer, self).__init__()
    
    def _transform(self, dataset):
        # Assuming 'predictions_lr' and 'predictions_rf' are columns containing model predictions
        # Create a column 'final_prediction' based on simple majority voting
        dataset = dataset.withColumn('final_prediction', 
                                     when(col('predictions_lr') == col('predictions_rf'), col('predictions_lr'))
                                     .otherwise(when(col('predictions_lr') != col('predictions_rf'), 
                                                     F.lit('Tie'))  # Example handling for ties
                                                )
                                    )
        return dataset

# Example usage within a conceptual pipeline
pipeline_stages = [tokenizer, hashingTF, lr, rf, VotingTransformer()]
pipeline = Pipeline(stages=pipeline_stages)

# This example demonstrates where Transformers and Estimators are in the pipeline,
# with the final step being a custom Transformer to combine predictions.

In this conceptual example, we illustrate how a machine learning pipeline for multiclass classification could be structured within Apache Spark's ML library. The pipeline integrates multiple models and employs a strategy for combining their predictions to enhance classification performance.

#### Explanation:

1. **Data Preparation**: We start by preparing the text data using Transformers like `Tokenizer` and `HashingTF` to convert text into feature vectors. Additionally, a `VectorAssembler` is used to assemble these features into a single vector.

2. **Model Training**: We define three different classifiers: Logistic Regression, Random Forest, and Gradient-Boosted Trees. Each of these models serves as an Estimator in the pipeline, meaning they are responsible for learning from the training data.

3. **Combining Model Outputs**: The most distinctive part of this pipeline is the custom `VotingTransformer`. This conceptual Transformer takes the predictions from the logistic regression and random forest models and combines them through a simple majority voting mechanism. For simplicity, ties are marked as 'Tie', but in practice, more sophisticated tie-breaking strategies or outputting probabilities could be employed.

#### Conclusion:

By leveraging Spark's Pipeline API, we can represent the entire process of data preprocessing, model training, and prediction aggregation in a coherent and scalable manner. This conceptual example demonstrates the flexibility of Spark's machine learning library in accommodating complex workflows such as integrating multiple models. While each model individually contributes to the classification task, their combination through a voting mechanism potentially improves the robustness and accuracy of the final predictions.