I. Theoretical session:
1. Could you list some limitations of MapReduce?
2. Provide a high level comparison of Apache Hadoop and Apache Spark.
3. What are the advantages of Apache Spark?
4. Provide a comparison of RDD and DataFrame in Spark.  

1. Limitations of MapReduce: Writing MapReduce programs is time-consuming and complex, and the low-level API is cumbersome and not user-friendly. The framework is designed for batch processing, so it is not suitable for real-time data processing and cannot serve quick, interactive queries. It is inefficient for iterative tasks because intermediate results are written to disk between jobs, and data shuffling and sorting between the map and reduce phases add significant overhead.
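
As a rough illustration of where the shuffle-and-sort overhead comes from, here is a minimal single-process sketch of the three MapReduce phases for a word count. The function names (`map_phase`, `shuffle_phase`, `reduce_phase`, `word_count`) and the input lines are made up for this example; a real Hadoop job would run these phases on different machines and move the intermediate pairs over disk and network, which this sketch does not model.

```python
from collections import defaultdict
from itertools import chain


def map_phase(line):
    """Map: emit a (word, 1) pair for every word in one input line."""
    return [(word.lower(), 1) for word in line.split()]


def shuffle_phase(pairs):
    """Shuffle and sort: group all values by key and order the keys."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())


def reduce_phase(key, values):
    """Reduce: collapse the values of one key into a single count."""
    return key, sum(values)


def word_count(lines):
    mapped = chain.from_iterable(map_phase(line) for line in lines)
    shuffled = shuffle_phase(mapped)
    return dict(reduce_phase(key, values) for key, values in shuffled)


# Hypothetical input lines, chosen only for illustration.
counts = word_count(["spark is fast", "hadoop is reliable", "spark is popular"])
print(counts)
```

Every pair produced by the map step has to pass through the grouping step before any reducer can start, and an iterative algorithm would repeat the whole cycle once per iteration.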
2.
**Apache Hadoop**:
- Open-source software for managing big data sets.
- Highly scalable and cost-effective.
- Supports advanced analytics.
- Consists of HDFS, YARN, MapReduce, and Hadoop Common.

**Apache Spark**:
- Data processing engine for big data.
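- Usually faster than Hadoop MapReduce because it keeps intermediate data in RAM.
- Ideal for machine learning and large-scale data.
- Ships with built-in libraries for machine learning, SQL, streaming, and graph processing.
- Reported to be up to 100x faster than Hadoop MapReduce when the working set fits in memory.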

**Similarities**:
- Both are distributed systems for processing data at scale.
- Both can recover from node failures.
- Both are composed of several software modules.

3. Advantages of Apache Spark
* Speed:
Spark processes data in memory, which significantly boosts performance, especially for iterative algorithms.
* Ease of Use:
Provides high-level APIs in Java, Scala, Python, and R, along with an interactive shell.
* Unified Engine:
Supports a wide range of workloads, including batch processing, streaming, interactive queries, and machine learning.
* Advanced Analytics:
Includes advanced libraries such as MLlib for machine learning, GraphX for graph processing, and Spark SQL for structured data processing.
* Near-Real-Time Stream Processing:
Spark Streaming processes live data in small micro-batches, which gives near-real-time results with low latency.
* Fault Tolerance:
Resilient Distributed Datasets (RDDs) record the lineage of transformations that produced them, so lost partitions can be recomputed efficiently instead of being restored from a replica.
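
The recompute-on-loss idea behind RDD fault tolerance can be illustrated with a toy model in plain Python. `ToyDataset` and its methods are hypothetical names invented for this sketch; it is not Spark's implementation and ignores partitioning and distribution entirely. It only shows that a dataset which remembers how it was derived can rebuild itself after its in-memory copy is gone.

```python
class ToyDataset:
    """Toy stand-in for an RDD: it remembers how it was derived."""

    def __init__(self, source=None, parent=None, fn=None):
        self.source = source  # raw input, only set on the root dataset
        self.parent = parent  # dataset this one was derived from
        self.fn = fn          # transformation applied to the parent's rows
        self._cache = None    # materialized result, which may be lost

    def map(self, fn):
        # Lazy transformation: only the lineage is recorded here.
        return ToyDataset(parent=self, fn=lambda rows: [fn(r) for r in rows])

    def filter(self, pred):
        return ToyDataset(parent=self, fn=lambda rows: [r for r in rows if pred(r)])

    def collect(self):
        # Action: use the cached result if present, else replay the lineage.
        if self._cache is None:
            if self.parent is None:
                self._cache = list(self.source)
            else:
                self._cache = self.fn(self.parent.collect())
        return self._cache

    def lose(self):
        # Simulate losing the in-memory copy, e.g. when a worker dies.
        self._cache = None


base = ToyDataset(source=range(1, 11))
evens_squared = base.filter(lambda x: x % 2 == 0).map(lambda x: x * x)

first = evens_squared.collect()      # computed from the lineage
evens_squared.lose()                 # in-memory result disappears
recovered = evens_squared.collect()  # rebuilt by replaying the lineage
print(first, recovered)
```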
4.
**RDD**:
- Fundamental data structure in Spark.
- Offers low-level transformations and actions.
- No automatic query optimization; efficient execution is up to the developer.
- Useful for unstructured data.
- Good for complex tasks requiring fine-tuned transformations.

**DataFrame**:
- Distributed collection of data organized into named columns.
- Higher level of abstraction than RDDs.
- More expressive and efficient due to the Catalyst Optimizer.
- Allows imposing a structure onto a distributed collection of data.
- Supports a wide range of operations and transformations.

II. You are given a file `appl_stock.csv`. Please carry out the following tasks:

1. Read this file with PySpark. Print out the schema.
2. Create a new column that combines High, Low, Close and Adj Close as an array: `[High, Low, Close, Adj Close]`.
3. Create a new column which computes the average of the High and Low prices.
4. Create a new column which computes the amount of money based on the formula `Volume * Adj Close`.
5. Use `groupby` and the `year()` function to compute the average closing price per year.


In [4]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=3b30fa375068023aa184639fc0746aa89db81c82556d0df66c985ab2fee27ccb
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [6]:
from google.colab import drive
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, avg, expr

# Initialize a Spark session
spark = SparkSession.builder.appName("StockAnalysis").getOrCreate()

drive.mount('/content/drive')
file_path = '/content/drive/My Drive/Colab Notebooks/Scalable and Distributed Computing/Lab/lab7/appl_stock.csv'
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Print the schema
df.printSchema()

# Combine High, Low, Close, and Adj Close into a new column
df = df.withColumn("Combined_High_Low_Close_AdjClose", expr("array(High, Low, Close, `Adj Close`)"))

# Create a new column for the average price of High and Low prices
df = df.withColumn("Average_High_Low", (col("High") + col("Low")) / 2)

# Create a new column for the amount of money using Volume * Adj Close
df = df.withColumn("Amount_of_Money", col("Volume") * col("Adj Close"))

# Compute the average closing price per year
df_with_year = df.withColumn("Year", year(col("Date")))
avg_close_per_year = df_with_year.groupBy("Year").agg(avg("Close").alias("Average_Closing_Price"))

# Show the results
avg_close_per_year.show()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)

+----+---------------------+
|Year|Average_Closing_Price|
+----+---------------------+
|2015|   120.03999980555547|
|2013|    472.6348802857143|
|2014|    295.4023416507935|
|2012|    576.0497195640002|
|2016|   104.60400786904763|
|2010|    259.8424600000002|
|2011|   364.00432532142867|
+----+---------------------+
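
To make the arithmetic behind the derived columns and the per-year aggregation easy to check by hand, the same steps can be sketched in plain Python. The three rows below are hypothetical values invented for this example and are NOT taken from `appl_stock.csv`; the `Combined` key is likewise just a short illustrative name.

```python
from collections import defaultdict
from datetime import date

# Hypothetical rows, not real data from appl_stock.csv.
rows = [
    {"Date": date(2010, 1, 4), "High": 10.0, "Low": 8.0, "Close": 9.0,
     "Volume": 100, "Adj Close": 4.5},
    {"Date": date(2010, 1, 5), "High": 12.0, "Low": 10.0, "Close": 11.0,
     "Volume": 200, "Adj Close": 5.5},
    {"Date": date(2011, 1, 3), "High": 20.0, "Low": 18.0, "Close": 19.0,
     "Volume": 300, "Adj Close": 9.5},
]

for row in rows:
    # Task 2: combine the four price columns into one list.
    row["Combined"] = [row["High"], row["Low"], row["Close"], row["Adj Close"]]
    # Task 3: average of the High and Low prices.
    row["Average_High_Low"] = (row["High"] + row["Low"]) / 2
    # Task 4: amount of money traded, Volume * Adj Close.
    row["Amount_of_Money"] = row["Volume"] * row["Adj Close"]

# Task 5: group the closing prices by year, then average each group.
closes_by_year = defaultdict(list)
for row in rows:
    closes_by_year[row["Date"].year].append(row["Close"])

avg_close_per_year = {
    year: sum(closes) / len(closes)
    for year, closes in sorted(closes_by_year.items())
}
print(avg_close_per_year)
```

Sorting the years before building the result gives a chronological listing, whereas the Spark output above appears in whatever order the grouped rows are returned.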



III. You are given a dataset `customer_churn.csv`, which describes the churn status of the clients of a marketing agency. As a data scientist, you are required to create a machine learning model **in Spark** that will help predict which customers will churn (stop buying their service). A short description of the data follows:
```
Name : Name of the latest contact at Company
Age: Customer Age
Total_Purchase: Total Ads Purchased
Account_Manager: Binary 0=No manager, 1= Account manager assigned
Years: Total years as a customer
Num_sites: Number of websites that use the service.
Onboard_date: Date that the name of the latest contact was onboarded
Location: Client HQ Address
Company: Name of Client Company
```

1. Read the data, print the schema, and inspect a few rows to get a first look at the data.
2. Assemble the feature columns with `VectorAssembler`, which is provided by MLlib in PySpark.
3. Split the data into train/test sets, and then fit a logistic regression model on the training data.
4. Evaluate the results and compute the AUC.
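
For intuition about what the AUC in step 4 measures, here is a small sketch in plain Python. It uses the pairwise definition: the AUC is the probability that a randomly chosen positive example (a churner) gets a higher score than a randomly chosen negative one, with ties counted as one half. The function name `auc_from_scores` and the labels and scores are hypothetical; this is not how Spark's evaluator is implemented, and the quadratic loop is only suitable for small inputs.

```python
def auc_from_scores(labels, scores):
    """Area under the ROC curve via pairwise comparison of scores."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative label")

    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0   # positive ranked above negative
            elif p == n:
                wins += 0.5   # tie counts as half a win
    return wins / (len(positives) * len(negatives))


# Hypothetical labels (1 = churned) and model scores.
labels = [1, 0, 1, 0, 1]
scores = [0.9, 0.2, 0.4, 0.6, 0.8]
auc_value = auc_from_scores(labels, scores)
print(f"AUC: {auc_value:.4f}")
```

A value of 1.0 means every churner is ranked above every non-churner, while 0.5 is what a scorer with no ranking ability would achieve on average.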

In [14]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

# Initialize a Spark session
spark = SparkSession.builder.appName("CustomerChurnPrediction").getOrCreate()

# Load the CSV data into a Spark DataFrame
file_path = '/content/drive/My Drive/Colab Notebooks/Scalable and Distributed Computing/Lab/lab7/customer_churn.csv'
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Print the schema of the DataFrame
df.printSchema()

# Show the first few rows of the DataFrame
df.show(10)

# Select the relevant features and label for the model
feature_cols = ["Age", "Total_Purchase", "Account_Manager", "Years", "Num_Sites"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Initialize the Logistic Regression model
lr = LogisticRegression(labelCol="Churn", featuresCol="features")

# Create a pipeline with the assembler and logistic regression
pipeline = Pipeline(stages=[assembler, lr])

# Prepare the data for training
data = df.select(feature_cols + ["Churn"])

# Split the data into train and test sets
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

# Fit the model on the training data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model using AUC
evaluator = BinaryClassificationEvaluator(labelCol="Churn", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)

print(f"AUC: {auc}")

# Stop the Spark session
spark.stop()


root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)

+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller|41.0|      1