**Step 1:** Install Java, Spark, and PySpark
PySpark requires Java and Apache Spark.

- **1.1 Install Java**
  - Spark 3.x runs on Java 8 or 11; Spark 3.3 and later also support Java 17.
  - Check whether Java is installed. Open Command Prompt (cmd) and type:
    - `java -version`
  - If Java is not installed, download and install it from the Oracle JDK download page.
  - After installation, set the JAVA_HOME environment variable:
    - Open Control Panel → System → Advanced System Settings → Environment Variables.
    - Under System Variables, click New and set:
      - Variable Name: JAVA_HOME
      - Variable Value: C:\Program Files\Java\jdk-11 (replace with your installed Java path)
    - Click OK, then open a new Command Prompt (or restart your PC) so the change takes effect.

- **1.2 Install Apache Spark**
  - Download Spark from the Apache Spark download page.
    - Select a Spark 3.x release.
    - For the package type, choose "Pre-built for Apache Hadoop" (Hadoop 3.x).
  - Extract the downloaded file to C:\spark.
  - Set environment variables for Spark:
    - Open Control Panel → System → Advanced System Settings → Environment Variables.
    - Under System Variables, click New and set:
      - Variable Name: SPARK_HOME
      - Variable Value: C:\spark (path to your Spark folder)
    - Find Path in System Variables, click Edit, and add:
      - C:\spark\bin
      - C:\spark\sbin
    - Click OK and restart your PC.
  - Verify the Spark installation. Open Command Prompt (cmd) and run:
    - `spark-shell`
  - If Spark starts without errors, it is installed correctly.
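
The environment variables from steps 1.1 and 1.2 can also be checked from Python before starting Spark. The following is a minimal sketch, not part of the official setup: the helper name `check_spark_environment` is hypothetical, and it only tests that JAVA_HOME and SPARK_HOME point at existing folders and that `java` and `spark-shell` can be found on the Path.

```python
import os
import shutil


def check_spark_environment(env=None):
    """Report whether JAVA_HOME, SPARK_HOME and the launchers on PATH look usable."""
    env = os.environ if env is None else env
    report = {}
    for name in ("JAVA_HOME", "SPARK_HOME"):
        value = env.get(name)
        # A variable only helps if it is set and points at an existing folder.
        report[name] = bool(value) and os.path.isdir(value)
    # shutil.which searches the given PATH string for an executable.
    path = env.get("PATH")
    for command in ("java", "spark-shell"):
        report[command] = shutil.which(command, path=path) is not None
    return report


for item, ok in check_spark_environment().items():
    print(f"{item}: {'OK' if ok else 'missing'}")
```

If any entry is reported as missing, recheck the corresponding variable and open a new Command Prompt before retrying.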

- **1.3 Install PySpark**
  - Install PySpark using pip:
    - `pip install pyspark`
  - Verify the installation by starting the PySpark shell:
    - `pyspark`
  - If the PySpark shell starts, your setup is working.
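
To confirm which PySpark version pip installed without starting the shell, the package metadata can be queried. This is a small sketch using only the standard library; `installed_version` is a hypothetical helper name.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version of a pip package, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


version = installed_version("pyspark")
print("pyspark:", version if version else "not installed")
```

The reported version is the one pip installed, which is useful to compare with the Spark release extracted in step 1.2.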

**Step 2:** Install Jupyter Notebook and Configure PySpark
- **2.1 Install Jupyter and Findspark**
  - `pip install jupyter findspark`
- **2.2 Configure PySpark in Jupyter**
  - Create a new Jupyter kernel for PySpark:
    - `python -m ipykernel install --user --name=pyspark --display-name "Python (PySpark)"`
  - Start Jupyter Notebook:
    - `jupyter notebook`
  - Open a new notebook and run the following code to initialize PySpark:
    - `import findspark`
    - `findspark.init()`
    - `from pyspark.sql import SparkSession`
    - `spark = SparkSession.builder.appName("MyApp").getOrCreate()`
    - `print(spark)`
  - Run the cell. If Spark initializes successfully, your setup is complete!


In [8]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MyApp").getOrCreate()

print(spark)


<pyspark.sql.session.SparkSession object at 0x000001917BFADB50>


# PySpark Foundation

Apache Spark is a powerful open-source distributed computing system, and PySpark is the Python API for Apache Spark, allowing data scientists and engineers to work with large datasets efficiently.

Let’s go step by step through each topic: what it is, why it matters, where it is used, and a real-world example.

### PySpark Introduction

PySpark is the Python interface for Apache Spark, which enables big data processing using Python. It allows us to work with Spark’s features, such as distributed computing, real-time stream processing, and machine learning, without writing Java or Scala.

### Why is PySpark needed?

- Single-machine tools such as pandas load the whole dataset into the memory of one computer, so they struggle once the data no longer fits in RAM.

- PySpark allows processing terabytes or petabytes of data across multiple nodes.

- It integrates well with Hadoop, AWS, and cloud storage solutions.

### Use Case in Data Science

- Used in ETL (Extract, Transform, Load) processes for cleaning and processing large datasets.

- Helpful for handling structured and unstructured data in industries like finance, healthcare, and e-commerce.

- Used in Machine Learning Pipelines to train models on large datasets efficiently.

**Real-world Example**

A recommendation system like Netflix’s has to process petabytes of user data to analyze viewing preferences and suggest movies and shows. This is the kind of workload Spark, and PySpark as its Python API, is designed for.

### What is Spark Configuration?

Spark Configuration refers to how we set up and optimize the Spark environment for distributed processing.

- Configuring Spark properly ensures efficient resource utilization.

- Helps in tuning memory usage and execution speed.

- Avoids performance bottlenecks when handling large datasets.

In [33]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyPySparkApp") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

print(spark)


<pyspark.sql.session.SparkSession object at 0x000001917BFADB50>


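- appName → Name of the Spark application, as shown in the Spark UI.

- spark.executor.memory → Memory allocated to each executor process (executors run on the worker nodes).

- spark.driver.memory → Memory allocated to the driver, the process that runs the application's main program.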
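
Because `getOrCreate()` returns an already running session when one exists, settings passed to the builder may not be the ones in effect (the two session outputs above show the same object address, which suggests the earlier session was reused). A minimal sketch for reading the settings back follows; `effective_settings` is a hypothetical helper, and `spark` is assumed to be the session created in the cell above.

```python
def effective_settings(spark, keys):
    """Return the values the running application reports for the given config keys."""
    conf = spark.sparkContext.getConf()
    # SparkConf.get() returns the supplied default for keys that were never set.
    return {key: conf.get(key, "not set") for key in keys}


# In the notebook, with the session created above:
# effective_settings(spark, ["spark.executor.memory", "spark.driver.memory"])
```

If a value comes back as "not set" or differs from what was requested, stopping the session with `spark.stop()` and rebuilding it is one way to apply the new configuration.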

### Use Case in Data Science

- Used when handling large machine learning datasets to optimize memory.

- Helps in scaling applications for big data processing efficiently.

A retail company analyzing customer purchase history may need to tune executor and driver memory so that Spark can process millions of transactions without running out of memory.

# Resilient Distributed Datasets (RDD) in PySpark

RDDs (Resilient Distributed Datasets) are the fundamental data structure in Apache Spark. They provide a fault-tolerant, distributed way to process large-scale datasets in parallel.

In data science, RDDs are useful when a dataset is too large for the memory of a single machine and must be processed across a cluster.

### Understanding RDDs in Data Science

**Resilient** → RDDs recover automatically from failures.

**Distributed** → Data is split across multiple nodes for parallel processing.

**Dataset** → Collection of records.

### Why use RDDs in Data Science?

- **Handles Large Data** → Can process TBs of data efficiently.

- **Lazy Evaluation** → Executes transformations only when needed.

- **In-Memory Computation** → Faster than traditional MapReduce.

- **Supports Parallel Processing** → Runs tasks in parallel across CPU cores and cluster nodes.

- **Fault Tolerance** → Recovers lost data automatically.

### Creating RDDs in PySpark

To work with RDDs, we need PySpark. Let’s start by setting up Spark in a Jupyter Notebook.

### Set Up PySpark in Jupyter Notebook

In [63]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RDDExample").getOrCreate()

sc = spark.sparkContext  # Get Spark Context

In [65]:
sc

In [67]:
# findspark.init() → Locates the Spark installation and adds PySpark to sys.path so the notebook can import it.

# SparkSession.builder.appName("RDDExample").getOrCreate() → Creates a Spark session, or returns the one already running.

# sc = spark.sparkContext → Gets the SparkContext from the session (needed to work with RDDs).

### Create an RDD (Resilient Distributed Dataset)

RDDs can be created in two ways:

- From a Python list

- From an external data source (file, database, etc.)

**Method 1:** Create RDD from a List

In [None]:
# Sample data
data = [10, 20, 30, 40, 50]

# Convert the list to an RDD
rdd = sc.parallelize(data)

print(rdd.collect())

In [None]:
# sc.parallelize(data) → Converts the Python list into an RDD.

# collect() → Retrieves all elements from the RDD as a Python list on the driver.
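
To see the "distributed" part of an RDD, it can help to look at how the list is split into partitions. The sketch below is illustrative only: `describe_partitions` is a hypothetical helper, `sc` is assumed to be the SparkContext created earlier in the notebook, and the exact contents of each partition depend on how Spark slices the list.

```python
def describe_partitions(sc, data, num_slices):
    """Distribute `data` over `num_slices` partitions and show what landed where."""
    rdd = sc.parallelize(data, num_slices)
    # glom() turns each partition into a list, so collect() returns one list per partition.
    return rdd.getNumPartitions(), rdd.glom().collect()


# In the notebook, with the `sc` created earlier:
# count, chunks = describe_partitions(sc, [10, 20, 30, 40, 50], 2)
# print(count, chunks)
```

Each inner list in `chunks` is the data held by one partition; Spark runs one task per partition when an action is triggered.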

**Method 2:** Create RDD from a Text File

Let's say we have a file "data.txt" that contains one fruit name per line: apple, banana, orange, grape, and mango.

In [99]:
rdd_file=sc.textFile("data.txt")
print(rdd_file.collect())

['apple', 'banana', 'orange', 'grape', 'mango']


In [101]:
# sc.textFile("data.txt") → Reads a text file and creates an RDD with one element per line.

# collect() → Retrieves all lines from the file as a list.
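
To reproduce the cell above, `data.txt` has to exist in the notebook's working directory. A minimal sketch that creates it, assuming the contents shown in the output above (one fruit name per line):

```python
from pathlib import Path

fruits = ["apple", "banana", "orange", "grape", "mango"]

# One record per line, because textFile() turns each line into one RDD element.
Path("data.txt").write_text("\n".join(fruits) + "\n", encoding="utf-8")

# Read the file back to confirm what Spark will see.
print(Path("data.txt").read_text(encoding="utf-8").splitlines())
```

Running this cell before `sc.textFile("data.txt")` ensures the file is present.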

### RDD Transformations (Processing Data)

RDD transformations create new RDDs from existing ones.
They are lazy, meaning they are not executed until an action is triggered.
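
The difference between transformations and actions can be sketched as follows. `lazy_pipeline` is a hypothetical helper, and `sc` is assumed to be the SparkContext created earlier in the notebook.

```python
def lazy_pipeline(sc, values):
    """Build a two-step pipeline and run it with a single action."""
    numbers = sc.parallelize(values)
    # Transformations: Spark only records these steps; no data is processed yet.
    doubled = numbers.map(lambda x: x * 2)
    large = doubled.filter(lambda x: x > 5)
    # Action: collect() triggers the job and returns the results to the driver.
    return large.collect()


# In the notebook, with the `sc` created earlier:
# print(lazy_pipeline(sc, [1, 2, 3, 4, 5]))
```

For the input `[1, 2, 3, 4, 5]`, the values are doubled to 2, 4, 6, 8, 10 and the filter keeps those greater than 5, so the call returns `[6, 8, 10]`. Nothing is computed until `collect()` runs.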

 **Example 1:** Map (Apply Function to Each Element)

In [119]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()


In [129]:
import sys
print(sys.version)


3.12.4 | packaged by Anaconda, Inc. | (main, Jun 18 2024, 15:03:56) [MSC v.1929 64 bit (AMD64)]


In [131]:
print(spark.sparkContext.uiWebUrl)


http://OA-STUDENT:4040


In [133]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.persist()
print(rdd.collect())  # Trigger execution


[1, 2, 3, 4, 5]


In [135]:
# persist() was called with its default storage level; getStorageLevel() reports it.
print(rdd.getStorageLevel())  # Output: Memory Serialized 1x Replicated


Memory Serialized 1x Replicated



In [145]:
print(sc)


<SparkContext master=local[*] appName=MyApp>


In [149]:
# Stop the existing SparkContext if it exists
from pyspark import SparkContext

if 'sc' in locals():
    sc.stop()

# Now create a new SparkContext
sc = SparkContext(appName="MyApp", master="local[*]")


In [8]:
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [None]:
from pyspark import SparkContext

# Stop any existing SparkContext if it's running
if 'sc' in locals():
    sc.stop()

# Create a new SparkContext
sc = SparkContext.getOrCreate()

# Now proceed with your RDD operations
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd_squared = rdd.map(lambda x: x ** 2)
print(rdd_squared.collect())
