Create SparkContext

SparkContext:

It is the entry point to Spark's functionality. It is responsible for connecting to a Spark cluster, initializing the cluster resources, and coordinating the execution of Spark jobs.

Role: Manages the Spark application’s environment and communicates with the cluster manager. It provides the fundamental methods for creating RDDs (Resilient Distributed Datasets) and executing jobs.

In [1]:
from pyspark import SparkContext

# Create a SparkContext object
sc = SparkContext(appName="MySparkApplication")

In [2]:
sc

In [4]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("MySparkApplication") \
    .getOrCreate()

In [5]:
# Get the SparkContext from the SparkSession
sc = spark.sparkContext

In [6]:
sc

How to create RDDs

In [7]:
from pyspark.sql import SparkSession

In [8]:
# Create a SparkSession
spark = SparkSession.builder.appName("RDD-Demo").getOrCreate()

In [10]:
numbers = [1, 2, 3, 4, 5]
rdd = spark.sparkContext.parallelize(numbers)

In [11]:
# Collect action: Retrieve all elements of the RDD

rdd.collect()

[1, 2, 3, 4, 5]

# Create an RDD from a list of tuples

In [12]:
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35), ("Alice", 40)]
rdd = spark.sparkContext.parallelize(data)

In [13]:
# Collect action: Retrieve all elements of the RDD
print("All elements of the rdd: ", rdd.collect())

All elements of the rdd:  [('Alice', 25), ('Bob', 30), ('Charlie', 35), ('Alice', 40)]


RDD Operations: Actions

In [14]:
# Count action: Count the number of elements in the RDD
count = rdd.count()
print("The total number of elements in rdd: ", count)

The total number of elements in rdd:  4


In [15]:
# First action: Retrieve the first element of the RDD
first_element = rdd.first()
print("The first element of the rdd: ", first_element)

The first element of the rdd:  ('Alice', 25)


In [16]:
# Take action: Retrieve the first n elements of the RDD
taken_elements = rdd.take(2)
print("The first two elements of the rdd: ", taken_elements)

The first two elements of the rdd:  [('Alice', 25), ('Bob', 30)]


In [22]:
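# Foreach action: Print each element of the RDD
# (the prints run in the worker processes, so they may appear in the console or logs rather than in the notebook output)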
rdd.foreach(lambda x: print(x))


In [23]:
# Collect the mapped RDD (mapped_rdd is defined in cell In [18] under Transformations)
result = mapped_rdd.collect()
print("rdd with uppercase name: ", result)

rdd with uppercase name:  [('ALICE', 25), ('BOB', 30), ('CHARLIE', 35), ('ALICE', 40)]


RDD Operations: Transformations

In [18]:
# Map transformation: Convert name to uppercase
mapped_rdd = rdd.map(lambda x: (x[0].upper(), x[1]))


In [19]:
mapped_rdd

PythonRDD[8] at RDD at PythonRDD.scala:53
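
The cell above prints an RDD description rather than data because a transformation such as map is lazy: nothing is computed until an action such as collect() runs. The following is a minimal plain-Python sketch of that idea, using the built-in lazy map() as a stand-in. It is not the Spark API, and the names calls and upper_name are made up for illustration.

```python
# Plain-Python model of lazy evaluation; this is not the Spark API.
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35), ("Alice", 40)]
calls = []  # records every time the mapping function actually runs


def upper_name(pair):
    calls.append(pair)
    return (pair[0].upper(), pair[1])


# "Transformation": builds a lazy iterator, nothing is computed yet.
mapped = map(upper_name, data)
calls_before_action = len(calls)

# "Action": consuming the iterator forces the computation.
result = list(mapped)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, result)
```

The function runs zero times before the result is requested and once per element afterwards, which mirrors why mapped_rdd only yields data once collect() is called.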

PYSPARK DATAFRAMES

SparkSession:

Introduced in Spark 2.0, SparkSession is a unified entry point for reading data, working with DataFrames, and executing SQL queries.

Role: It encapsulates SparkContext and provides a high-level API for data processing. It includes functionality for working with structured data and allows you to use DataFrame and SQL APIs.

In [2]:
import pyspark

In [3]:
print('hello')

hello


In [4]:
import pyspark

# Print PySpark version
print(f"PySpark Version: {pyspark.__version__}")


PySpark Version: 3.5.1


In [5]:
import subprocess

# Get the Java version
def get_java_version():
    try:
        version_info = subprocess.check_output(['java', '-version'], stderr=subprocess.STDOUT)
        version_info = version_info.decode('utf-8').split('\n')[0]  # Extract the version line
        print(f"Java Version: {version_info}")
    except FileNotFoundError:
        print("Java is not installed or not found in the PATH.")

get_java_version()


Java Version: java version "1.8.0_391"


In [6]:
from pyspark.sql import SparkSession

In [7]:
# Create a Spark session
spark = SparkSession.builder.appName('Dataframe').getOrCreate()


https://github.com/krishnaik06/Pyspark-With-Python/blob/main/Tutorial%204-%20Pyspark%20Dataframes-%20Filter%20operation.ipynb

In [8]:
# Display the session to check that it was created
spark

In [9]:
## Create a DataFrame from in-memory data (alternative, not used here)
#Data =[....]
#df_pyspark = spark.createDataFrame(Data)

## Read the dataset
df_pyspark=spark.read.option('header','true').csv('C:/Users/rammo/OneDrive/Desktop/Book1.csv',inferSchema=True)

In [10]:
### Check the schema
df_pyspark.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [11]:
df_pyspark.show()

+---+---------+----+----------+------+
|_c0|     Name| age|Experience|Salary|
+---+---------+----+----------+------+
|  1|     John|  32|         5| 20000|
|  2|    Krish|  31|        10| 30000|
|  3|Sudhanshu|  30|         8| 25000|
|  4|    Sunny|  29|         4| 20000|
|  5|     Paul|  24|         3| 20000|
|  6|   Harsha|  21|         1| 15000|
|  7|  Shubham|  23|         2| 18000|
|  8|   Mahesh|NULL|      NULL| 40000|
|  9|     NULL|  34|        10| 38000|
| 10|     NULL|  36|      NULL|  NULL|
+---+---------+----+----------+------+



In [12]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

In [13]:
df_pyspark.head(1)

[Row(_c0=1, Name='John', age=32, Experience=5, Salary=20000)]

In [14]:
df_pyspark.select(['NAME','EXPERIENCE']).show()

+---------+----------+
|     NAME|EXPERIENCE|
+---------+----------+
|     John|         5|
|    Krish|        10|
|Sudhanshu|         8|
|    Sunny|         4|
|     Paul|         3|
|   Harsha|         1|
|  Shubham|         2|
|   Mahesh|      NULL|
|     NULL|        10|
|     NULL|      NULL|
+---------+----------+



In [15]:
df_pyspark['NAME']  # check that the column is available

Column<'NAME'>

In [16]:
df_pyspark.dtypes

[('_c0', 'int'),
 ('Name', 'string'),
 ('age', 'int'),
 ('Experience', 'int'),
 ('Salary', 'int')]

In [17]:
df_pyspark.describe().show()

+-------+------------------+------+-----------------+-----------------+-----------------+
|summary|               _c0|  Name|              age|       Experience|           Salary|
+-------+------------------+------+-----------------+-----------------+-----------------+
|  count|                10|     8|                9|                8|                9|
|   mean|               5.5|  NULL|28.88888888888889|            5.375|25111.11111111111|
| stddev|3.0276503540974917|  NULL|5.158595846847387|3.543101950067402|8964.435905906801|
|    min|                 1|Harsha|               21|                1|            15000|
|    max|                10| Sunny|               36|               10|            40000|
+-------+------------------+------+-----------------+-----------------+-----------------+
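
The age column of the describe() output can be reproduced by hand, which makes two details visible: nulls are excluded from every statistic, and stddev is the sample standard deviation (divisor n - 1). This is a minimal sketch using Python's statistics module on the values copied from the sample table. It is not how Spark computes the summary internally.

```python
import statistics

# 'age' column from the sample table; None stands for NULL.
age = [32, 31, 30, 29, 24, 21, 23, None, 34, 36]

# Nulls are ignored, so drop them before computing anything.
non_null = [a for a in age if a is not None]

count = len(non_null)
mean = statistics.mean(non_null)
stddev = statistics.stdev(non_null)  # sample standard deviation (n - 1)

print(count, mean, stddev, min(non_null), max(non_null))
```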



In [18]:
### Adding Columns in data frame
df_pyspark=df_pyspark.withColumn('Experience After 2 year',df_pyspark['EXPERIENCE']+2)
df_pyspark.show()

+---+---------+----+----------+------+-----------------------+
|_c0|     Name| age|Experience|Salary|Experience After 2 year|
+---+---------+----+----------+------+-----------------------+
|  1|     John|  32|         5| 20000|                      7|
|  2|    Krish|  31|        10| 30000|                     12|
|  3|Sudhanshu|  30|         8| 25000|                     10|
|  4|    Sunny|  29|         4| 20000|                      6|
|  5|     Paul|  24|         3| 20000|                      5|
|  6|   Harsha|  21|         1| 15000|                      3|
|  7|  Shubham|  23|         2| 18000|                      4|
|  8|   Mahesh|NULL|      NULL| 40000|                   NULL|
|  9|     NULL|  34|        10| 38000|                     12|
| 10|     NULL|  36|      NULL|  NULL|                   NULL|
+---+---------+----+----------+------+-----------------------+



PySpark Handling Missing Values
*Dropping Columns.
*Dropping Rows.
*Various Parameters in the Dropping Functionality.
*Handling Missing Values by Mean, Median and Mode.
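
Before the Spark cells, here is a minimal plain-Python sketch of the row-dropping rules for how="any", how="all" and subset. It models the behaviour on rows 7 to 10 of the sample table and is not the Spark implementation. The helper name drop_nulls is hypothetical.

```python
# Rows 7-10 of the sample table; None stands for NULL.
rows = [
    {"_c0": 7, "Name": "Shubham", "age": 23, "Experience": 2, "Salary": 18000},
    {"_c0": 8, "Name": "Mahesh", "age": None, "Experience": None, "Salary": 40000},
    {"_c0": 9, "Name": None, "age": 34, "Experience": 10, "Salary": 38000},
    {"_c0": 10, "Name": None, "age": 36, "Experience": None, "Salary": None},
]


def drop_nulls(rows, how="any", subset=None):
    """Drop a row if any (or all) of the checked columns are null."""
    kept = []
    for row in rows:
        columns = subset if subset is not None else list(row)
        nulls = [row[c] is None for c in columns]
        drop = any(nulls) if how == "any" else all(nulls)
        if not drop:
            kept.append(row["_c0"])
    return kept


any_ids = drop_nulls(rows, how="any")
all_ids = drop_nulls(rows, how="all")
subset_ids = drop_nulls(rows, how="any", subset=["Experience"])
print(any_ids, all_ids, subset_ids)
```

With how="all" nothing is dropped here, because the _c0 index column is never null. Restricting the check with subset is what lets row 9 survive even though its Name is null.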

In [19]:
### Drop the columns
df_pyspark=df_pyspark.drop('Experience After 2 year')
df_pyspark.show()

+---+---------+----+----------+------+
|_c0|     Name| age|Experience|Salary|
+---+---------+----+----------+------+
|  1|     John|  32|         5| 20000|
|  2|    Krish|  31|        10| 30000|
|  3|Sudhanshu|  30|         8| 25000|
|  4|    Sunny|  29|         4| 20000|
|  5|     Paul|  24|         3| 20000|
|  6|   Harsha|  21|         1| 15000|
|  7|  Shubham|  23|         2| 18000|
|  8|   Mahesh|NULL|      NULL| 40000|
|  9|     NULL|  34|        10| 38000|
| 10|     NULL|  36|      NULL|  NULL|
+---+---------+----+----------+------+



In [20]:
### Rename the columns
df_pyspark.withColumnRenamed('Name','New Name').show()


+---+---------+----+----------+------+
|_c0| New Name| age|Experience|Salary|
+---+---------+----+----------+------+
|  1|     John|  32|         5| 20000|
|  2|    Krish|  31|        10| 30000|
|  3|Sudhanshu|  30|         8| 25000|
|  4|    Sunny|  29|         4| 20000|
|  5|     Paul|  24|         3| 20000|
|  6|   Harsha|  21|         1| 15000|
|  7|  Shubham|  23|         2| 18000|
|  8|   Mahesh|NULL|      NULL| 40000|
|  9|     NULL|  34|        10| 38000|
| 10|     NULL|  36|      NULL|  NULL|
+---+---------+----+----------+------+



In [21]:
# Drop every row that contains at least one null value
df_pyspark.na.drop().show()

+---+---------+---+----------+------+
|_c0|     Name|age|Experience|Salary|
+---+---------+---+----------+------+
|  1|     John| 32|         5| 20000|
|  2|    Krish| 31|        10| 30000|
|  3|Sudhanshu| 30|         8| 25000|
|  4|    Sunny| 29|         4| 20000|
|  5|     Paul| 24|         3| 20000|
|  6|   Harsha| 21|         1| 15000|
|  7|  Shubham| 23|         2| 18000|
+---+---------+---+----------+------+



In [22]:
### how="any" (the default): drop a row if any of its columns is null
df_pyspark.na.drop(how="any").show()

+---+---------+---+----------+------+
|_c0|     Name|age|Experience|Salary|
+---+---------+---+----------+------+
|  1|     John| 32|         5| 20000|
|  2|    Krish| 31|        10| 30000|
|  3|Sudhanshu| 30|         8| 25000|
|  4|    Sunny| 29|         4| 20000|
|  5|     Paul| 24|         3| 20000|
|  6|   Harsha| 21|         1| 15000|
|  7|  Shubham| 23|         2| 18000|
+---+---------+---+----------+------+



In [23]:
## Threshold
# thresh=2 drops rows with fewer than 2 non-null values. When thresh is set,
# it overrides `how`, so how="any" has no effect in this call.
df_pyspark.na.drop(how="any",thresh=2).show()

+---+---------+----+----------+------+
|_c0|     Name| age|Experience|Salary|
+---+---------+----+----------+------+
|  1|     John|  32|         5| 20000|
|  2|    Krish|  31|        10| 30000|
|  3|Sudhanshu|  30|         8| 25000|
|  4|    Sunny|  29|         4| 20000|
|  5|     Paul|  24|         3| 20000|
|  6|   Harsha|  21|         1| 15000|
|  7|  Shubham|  23|         2| 18000|
|  8|   Mahesh|NULL|      NULL| 40000|
|  9|     NULL|  34|        10| 38000|
| 10|     NULL|  36|      NULL|  NULL|
+---+---------+----+----------+------+
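
The thresh rule (a row is dropped if it has fewer than thresh non-null values) can be checked by hand on the three rows that contain nulls. This is a minimal plain-Python sketch of that counting rule, not Spark code. The helper name drop_with_thresh is hypothetical.

```python
# Rows 8-10 of the sample table; None stands for NULL.
rows = [
    {"_c0": 8, "Name": "Mahesh", "age": None, "Experience": None, "Salary": 40000},
    {"_c0": 9, "Name": None, "age": 34, "Experience": 10, "Salary": 38000},
    {"_c0": 10, "Name": None, "age": 36, "Experience": None, "Salary": None},
]


def drop_with_thresh(rows, thresh):
    """Keep rows that have at least `thresh` non-null values."""
    return [r for r in rows if sum(v is not None for v in r.values()) >= thresh]


# Ids of the rows that survive for a few threshold values.
kept_ids = {t: [r["_c0"] for r in drop_with_thresh(rows, t)] for t in (2, 3, 4)}
print(kept_ids)
```

Row 10 still has two non-null values (_c0 and age), which is why thresh=2 removes nothing from this dataset. A threshold of 3 would be needed to drop it.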

In [29]:
## Subset
df_pyspark.na.drop(how="any",subset=['EXPERIENCE']).show()  # drop rows that have a null in any of the subset columns


+---+---------+---+----------+------+
|_c0|     Name|age|Experience|Salary|
+---+---------+---+----------+------+
|  1|     John| 32|         5| 20000|
|  2|    Krish| 31|        10| 30000|
|  3|Sudhanshu| 30|         8| 25000|
|  4|    Sunny| 29|         4| 20000|
|  5|     Paul| 24|         3| 20000|
|  6|   Harsha| 21|         1| 15000|
|  7|  Shubham| 23|         2| 18000|
|  9|     NULL| 34|        10| 38000|
+---+---------+---+----------+------+



In [30]:
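### Filling the Missing Value
# Note: a string fill value is applied only to string columns. 'age' and
# 'Experience' are integer columns, so this call leaves them unchanged.
df_pyspark.na.fill('Missing Values',['age','Experience'])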


DataFrame[_c0: int, Name: string, age: int, Experience: int, Salary: int]

In [31]:
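df_pyspark.show()
# Attempt to replace nulls in 'age' and 'Experience' with 'Missing Values'.
# Both columns are integers, so the string fill is ignored for them; use a
# numeric fill value (or cast the columns to string first) to fill them.
df_pyspark = df_pyspark.na.fill('Missing Values', subset=['age', 'Experience'])

# Show updated DataFrame
#df_pyspark.show()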


+---+---------+----+----------+------+
|_c0|     Name| age|Experience|Salary|
+---+---------+----+----------+------+
|  1|     John|  32|         5| 20000|
|  2|    Krish|  31|        10| 30000|
|  3|Sudhanshu|  30|         8| 25000|
|  4|    Sunny|  29|         4| 20000|
|  5|     Paul|  24|         3| 20000|
|  6|   Harsha|  21|         1| 15000|
|  7|  Shubham|  23|         2| 18000|
|  8|   Mahesh|NULL|      NULL| 40000|
|  9|     NULL|  34|        10| 38000|
| 10|     NULL|  36|      NULL|  NULL|
+---+---------+----+----------+------+
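
The nulls in age and Experience are still present because a string fill value is skipped for non-string columns. This is a minimal plain-Python sketch of that type-matching rule on row 10 of the table. The schema dictionary and the fill helper are hypothetical, and the model is simplified: it only distinguishes str from int.

```python
# Hypothetical schema for illustration: column name -> Python type.
schema = {"Name": str, "age": int, "Experience": int}
row = {"Name": None, "age": 36, "Experience": None}


def fill(row, value, subset):
    """Fill nulls in `subset`, skipping columns whose type differs from `value`."""
    filled = dict(row)
    for column in subset:
        if filled[column] is None and schema[column] is type(value):
            filled[column] = value
    return filled


string_fill = fill(row, "Missing Values", ["age", "Experience"])  # no effect
numeric_fill = fill(row, 0, ["age", "Experience"])                # fills Experience
name_fill = fill(row, "Missing Values", ["Name"])                 # fills Name
print(string_fill, numeric_fill, name_fill, sep="\n")
```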



IMPUTATION
Imputation fills in missing values.
Imputation requires numeric data:
The Imputer class in PySpark is designed for numeric columns. It computes a summary statistic of each column, such as the mean or median, and uses it to fill in that column's missing values.
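
Median imputation can be sketched in plain Python: compute the median of the observed values in a column, then substitute it for each null. This is only an illustration on the age and Experience values from the sample table. Spark's Imputer may compute the median differently, and statistics.median_low is chosen here as an assumption so that the fill value is always one of the observed integers.

```python
import statistics

# Columns copied from the sample table; None stands for NULL.
columns = {
    "age": [32, 31, 30, 29, 24, 21, 23, None, 34, 36],
    "Experience": [5, 10, 8, 4, 3, 1, 2, None, 10, None],
}


def impute_median(values):
    """Replace None with the (low) median of the observed values."""
    observed = [v for v in values if v is not None]
    fill_value = statistics.median_low(observed)
    return [fill_value if v is None else v for v in values]


imputed = {name: impute_median(vals) for name, vals in columns.items()}

# Row 8 (index 7) had nulls in both columns.
print(imputed["age"][7], imputed["Experience"][7])
```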



In [32]:
df_pyspark.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [35]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import col


# Define the Imputer
imputer = Imputer(
    inputCols=['age', 'Experience', 'Salary'], 
    outputCols=["{}_imputed".format(c) for c in ['age', 'Experience', 'Salary']]
).setStrategy("median")

# Fit the imputer model and transform the DataFrame
imputer_model = imputer.fit(df_pyspark)
df_imputed = imputer_model.transform(df_pyspark)

# Show the DataFrame with the imputed columns
df_imputed.show(truncate=False)

# Display the imputation strategy
print("Imputation Strategy: ", imputer_model.getStrategy())
# The fitted fill values can be inspected with: imputer_model.surrogateDF.show()


+---+---------+----+----------+------+-----------+------------------+--------------+
|_c0|Name     |age |Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---+---------+----+----------+------+-----------+------------------+--------------+
|1  |John     |32  |5         |20000 |32         |5                 |20000         |
|2  |Krish    |31  |10        |30000 |31         |10                |30000         |
|3  |Sudhanshu|30  |8         |25000 |30         |8                 |25000         |
|4  |Sunny    |29  |4         |20000 |29         |4                 |20000         |
|5  |Paul     |24  |3         |20000 |24         |3                 |20000         |
|6  |Harsha   |21  |1         |15000 |21         |1                 |15000         |
|7  |Shubham  |23  |2         |18000 |23         |2                 |18000         |
|8  |Mahesh   |NULL|NULL      |40000 |30         |4                 |40000         |
|9  |NULL     |34  |10        |38000 |34         |10             

PYSPARK DATAFRAMES FILTERING

In [36]:
### Salary greater than or equal to 100000 (no row matches, so the result is empty)
from pyspark.sql.functions import col

# Filter rows using Column expression
filtered_df = df_pyspark.filter(col("SALARY") >= 100000)
filtered_df.show()

+---+----+---+----------+------+
|_c0|Name|age|Experience|Salary|
+---+----+---+----------+------+
+---+----+---+----------+------+



In [37]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("DataFrame Filtering") \
    .getOrCreate()


In [38]:
from pyspark.sql import Row

# Sample data
data = [
    Row(id=1, name="Alice", age=30),
    Row(id=2, name="Bob", age=25),
    Row(id=3, name="Cathy", age=29),
    Row(id=4, name="David", age=35)
]

# Create DataFrame
df = spark.createDataFrame(data)
df.show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 30|
|  2|  Bob| 25|
|  3|Cathy| 29|
|  4|David| 35|
+---+-----+---+



In [39]:
# Filter rows where age > 29
filtered_df = df.filter(df['age'] > 29)

# Show the result
filtered_df.show()


+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 30|
|  4|David| 35|
+---+-----+---+



In [40]:
# Filter rows where age < 30
filtered_df = df.where(df.age < 30)

# Show the result
filtered_df.show()


+---+-----+---+
| id| name|age|
+---+-----+---+
|  2|  Bob| 25|
|  3|Cathy| 29|
+---+-----+---+



In [41]:
# Filter using SQL expression
filtered_df = df.filter("age >= 30")

# Show the result
filtered_df.show()


+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 30|
|  4|David| 35|
+---+-----+---+



PySpark functions

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html