

# **PySpark Introduction**

PySpark is a Python API for Apache Spark, a distributed computing system. The name "PySpark" is a combination of "Python" and "Spark." It allows us to write Spark applications using Python, leveraging the capabilities of Spark's distributed computing framework.


PySpark can interact with various data sources, including SQL databases, through JDBC (Java Database Connectivity) drivers. This allows us to read data from and write data to SQL databases using PySpark.

In [99]:
!pip install pyspark



In [100]:
import pyspark

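# Two ways of creating the PySpark entry point:
In PySpark, an application can be started in two ways:

A) Using SparkSession.builder: introduced in Spark 2.0 and the recommended approach in Spark 2.0 and later.

B) Using SparkContext directly (plus SQLContext for DataFrames): the approach used before Spark 2.0. SparkContext itself is not deprecated, since every SparkSession still wraps one, but SQLContext has been superseded by SparkSession.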


# Conclusion:

Using SparkSession.builder is recommended. A SparkSession replaces the older SQLContext and HiveContext and gives access to the underlying SparkContext (as `sparkSession.sparkContext`), so a single object serves as the entry point for DataFrames, Spark SQL, and RDDs. It is the standard way of starting a session in Spark 2.0 and later.

In [104]:
from pyspark.sql import SparkSession

In [137]:
# Create (or reuse) a SparkSession, the entry point to the DataFrame and Spark SQL APIs.
# getOrCreate() returns the already-running session if one exists instead of starting a second one.
sparkSession = SparkSession.builder.appName('Exercise').getOrCreate()

In [48]:
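# Alternative: the same builder chain split across lines with backslashes.
# Assign the result to a new name such as spark; assigning it to SparkSession
# would overwrite the imported class.
# spark = SparkSession.builder \
#     .appName("PySparkOperations") \
#     .getOrCreate()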

In [106]:
sparkSession

In [107]:
df_pyspark=sparkSession.read.csv('test1.csv')
df_pyspark

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string]

# Reading CSV column names with and without a header:

By default, `read.csv` treats the first line of the file as data and names the columns `_c0`, `_c1`, and so on, as in the output above. To take the column names from the header line instead, pass `.option('header', 'true')` when reading the file. The cells below show the difference.

Note: only one of the two reading methods is used for the rest of this session; the headerless version is left commented out.

In [108]:
# Read the CSV into a DataFrame, taking the column names from the header row

df_pyspark = sparkSession.read.option('header', 'true').csv('test1.csv')  # retains original column names

In [None]:
# # Without the header option, columns get default names (_c0, _c1, ...)
# df_no_header = sparkSession.read.csv('test1.csv')

#   # Select columns by their default names
# df_selected_col = df_no_header.select('_c0', '_c1')

#   # Show the selected columns
# df_selected_col.show()

In [109]:
df_pyspark

DataFrame[Name: string, age: string, Experience: string, Salary: string]

In [110]:
type(df_pyspark)

In [111]:
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 31|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|       Jo| 23|         5| 25000|
|      Sam| 21|         4| 28000|
|       Li| 29|         1| 15000|
+---------+---+----------+------+



In [112]:
# displaying the schema of the DataFrame in PySpark. The schema defines the structure of the DataFrame, including column names and data types.
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



In [113]:
# Convert the PySpark DataFrame df_pyspark to a pandas DataFrame df_pandas (this collects all rows onto the driver)
df_pandas = df_pyspark.toPandas()
df_pandas.head()

        Name  age  Experience  Salary
0      Krish   31          10   30000
1  Sudhanshu   30           8   25000
2      Sunny   29           4   20000
3       Paul   31           3   20000
4     Harsha   21           1   15000


In [114]:
type(df_pandas)

# **pyspark.sql.functions as F**

Importing pyspark.sql.functions as F is a common convention in PySpark for accessing various SQL functions provided by Spark SQL. pyspark.sql.functions contains a wide range of functions that one can use for data manipulation, transformation, aggregation, and more when working with PySpark DataFrames.

In [115]:
import pyspark.sql.functions as F

In [116]:
# Show DataFrame Content
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 31|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|       Jo| 23|         5| 25000|
|      Sam| 21|         4| 28000|
|       Li| 29|         1| 15000|
+---------+---+----------+------+



In [117]:
# Selecting Columns
df_selected = df_pyspark.select('Name', 'age', 'Salary')
df_selected.show()

+---------+---+------+
|     Name|age|Salary|
+---------+---+------+
|    Krish| 31| 30000|
|Sudhanshu| 30| 25000|
|    Sunny| 29| 20000|
|     Paul| 31| 20000|
|   Harsha| 21| 15000|
|  Shubham| 23| 18000|
|       Jo| 23| 25000|
|      Sam| 21| 28000|
|       Li| 29| 15000|
+---------+---+------+



In [118]:
# Filtering rows (age is still a string column here; Spark casts it implicitly for the comparison with 29)
df_filtered = df_pyspark.filter(df_pyspark['age'] > 29)
df_filtered.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|     Paul| 31|         3| 20000|
+---------+---+----------+------+



In [119]:
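# Grouping and aggregating: total Salary per Experience level.
# Salary is still a string column here, so Spark casts it to double before summing (hence 20000.0).
df_grouped = df_pyspark.groupBy('Experience').agg({'Salary': 'sum'})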
df_grouped.show()

+----------+-----------+
|Experience|sum(Salary)|
+----------+-----------+
|         3|    20000.0|
|         8|    25000.0|
|         5|    25000.0|
|         1|    30000.0|
|        10|    30000.0|
|         4|    48000.0|
|         2|    18000.0|
+----------+-----------+




In [123]:
# Sort the result in descending order based on the sum of 'Salary'
df_grouped = df_grouped.orderBy(df_grouped['sum(Salary)'].desc())

# Show the grouped and aggregated DataFrame
df_grouped.show()

+----------+-----------+
|Experience|sum(Salary)|
+----------+-----------+
|         4|    48000.0|
|         1|    30000.0|
|        10|    30000.0|
|         8|    25000.0|
|         5|    25000.0|
|         3|    20000.0|
|         2|    18000.0|
+----------+-----------+



In [124]:
# Sorting (Experience is still a string, so the order is lexicographic: '10' sorts before '2')
df_sorted = df_pyspark.orderBy('Experience')
df_sorted.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|   Harsha| 21|         1| 15000|
|       Li| 29|         1| 15000|
|    Krish| 31|        10| 30000|
|  Shubham| 23|         2| 18000|
|     Paul| 31|         3| 20000|
|    Sunny| 29|         4| 20000|
|      Sam| 21|         4| 28000|
|       Jo| 23|         5| 25000|
|Sudhanshu| 30|         8| 25000|
+---------+---+----------+------+



In [125]:
df_pyspark.dtypes

[('Name', 'string'),
 ('age', 'string'),
 ('Experience', 'string'),
 ('Salary', 'string')]

In [126]:
from pyspark.sql.functions import col

# Cast the numeric columns from string to integer
df_pyspark = df_pyspark.withColumn('age', col('age').cast('int'))
df_pyspark = df_pyspark.withColumn('Experience', col('Experience').cast('int'))
df_pyspark = df_pyspark.withColumn('Salary', col('Salary').cast('int'))

In [127]:
df_pyspark.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [128]:
# Sorting again: Experience is now an integer, so the order is numeric
df_sorted = df_pyspark.orderBy('Experience')
df_sorted.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|   Harsha| 21|         1| 15000|
|       Li| 29|         1| 15000|
|  Shubham| 23|         2| 18000|
|     Paul| 31|         3| 20000|
|    Sunny| 29|         4| 20000|
|      Sam| 21|         4| 28000|
|       Jo| 23|         5| 25000|
|Sudhanshu| 30|         8| 25000|
|    Krish| 31|        10| 30000|
+---------+---+----------+------+



In [134]:
# Joins: build a second DataFrame that shares the 'age' column
other_df = sparkSession.createDataFrame(
    [(21, 'Adi'), (22, 'Bob'), (23, 'Charli'), (21, 'Jay'), (62, 'Mike')],
    ['age', 'Name'],
)
other_df.show()

+---+------+
|age|  Name|
+---+------+
| 21|   Adi|
| 22|   Bob|
| 23|Charli|
| 21|   Jay|
| 62|  Mike|
+---+------+



In [135]:
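# Inner join (the default) on 'age'. Both DataFrames have a 'Name' column, so the result contains two columns named Name.
df_joined = df_pyspark.join(other_df, 'age')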
df_joined.show()

+---+-------+----------+------+------+
|age|   Name|Experience|Salary|  Name|
+---+-------+----------+------+------+
| 21|    Sam|         4| 28000|   Adi|
| 21| Harsha|         1| 15000|   Adi|
| 23|     Jo|         5| 25000|Charli|
| 23|Shubham|         2| 18000|Charli|
| 21|    Sam|         4| 28000|   Jay|
| 21| Harsha|         1| 15000|   Jay|
+---+-------+----------+------+------+



In [136]:
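# Left join: keeps every row of df_pyspark and fills other_df's columns with NULL where no age matches
df_joined = df_pyspark.join(other_df, 'age', how='left')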
df_joined.show()

+---+---------+----------+------+------+
|age|     Name|Experience|Salary|  Name|
+---+---------+----------+------+------+
| 29|    Sunny|         4| 20000|  NULL|
| 29|       Li|         1| 15000|  NULL|
| 31|    Krish|        10| 30000|  NULL|
| 31|     Paul|         3| 20000|  NULL|
| 21|   Harsha|         1| 15000|   Jay|
| 21|   Harsha|         1| 15000|   Adi|
| 21|      Sam|         4| 28000|   Jay|
| 21|      Sam|         4| 28000|   Adi|
| 30|Sudhanshu|         8| 25000|  NULL|
| 23|  Shubham|         2| 18000|Charli|
| 23|       Jo|         5| 25000|Charli|
+---+---------+----------+------+------+



In [139]:
# Adding Columns
from pyspark.sql.functions import when, lit

# Add a new column 'Department' with values based on conditions
df_with_department = df_pyspark.withColumn('Department',
                                           when(df_pyspark['Name'] == 'Krish', lit('Sales'))
                                           .when(df_pyspark['Name'] == 'Paul', lit('Marketing'))
                                           .when((df_pyspark['Name'] == 'Sam') |
                                                 (df_pyspark['Name'] == 'Sunny') |
                                                 (df_pyspark['Name'] == 'Jo'), lit('IT'))
                                           .otherwise(lit('Other')))
# Add a new column 'Education' with values based on conditions
df_new = df_with_department.withColumn('Education',
                                           when(df_with_department['Name'] == 'Krish', lit('MBA'))
                                           .when(df_with_department['Name'] == 'Paul', lit('PhD'))
                                           .when((df_with_department['Name'] == 'Sam') |
                                                 (df_with_department['Name'] == 'Sunny') |
                                                 (df_with_department['Name'] == 'Jo'), lit('Bachelor'))
                                           .otherwise(lit('Unknown')))


df_new.show()

+---------+---+----------+------+----------+---------+
|     Name|age|Experience|Salary|Department|Education|
+---------+---+----------+------+----------+---------+
|    Krish| 31|        10| 30000|     Sales|      MBA|
|Sudhanshu| 30|         8| 25000|     Other|  Unknown|
|    Sunny| 29|         4| 20000|        IT| Bachelor|
|     Paul| 31|         3| 20000| Marketing|      PhD|
|   Harsha| 21|         1| 15000|     Other|  Unknown|
|  Shubham| 23|         2| 18000|     Other|  Unknown|
|       Jo| 23|         5| 25000|        IT| Bachelor|
|      Sam| 21|         4| 28000|        IT| Bachelor|
|       Li| 29|         1| 15000|     Other|  Unknown|
+---------+---+----------+------+----------+---------+



In [140]:
# Drop the 'age' column; drop() returns a new DataFrame and leaves df_new unchanged
df_modified = df_new.drop('age')
df_modified.show()

+---------+----------+------+----------+---------+
|     Name|Experience|Salary|Department|Education|
+---------+----------+------+----------+---------+
|    Krish|        10| 30000|     Sales|      MBA|
|Sudhanshu|         8| 25000|     Other|  Unknown|
|    Sunny|         4| 20000|        IT| Bachelor|
|     Paul|         3| 20000| Marketing|      PhD|
|   Harsha|         1| 15000|     Other|  Unknown|
|  Shubham|         2| 18000|     Other|  Unknown|
|       Jo|         5| 25000|        IT| Bachelor|
|      Sam|         4| 28000|        IT| Bachelor|
|       Li|         1| 15000|     Other|  Unknown|
+---------+----------+------+----------+---------+



In [141]:
# Aggregating Functions
row_count = df_modified.count()
print("Row count:", row_count)

Row count: 9


In [142]:
# Applying an aggregate function to a column with agg() returns a new one-row DataFrame; collect()[0][0] extracts the single value
from pyspark.sql import functions as F

# Row count
row_count = df_modified.count()
print("Row count:", row_count)

# Other aggregation functions
# Maximum value of a column
max_salary = df_modified.agg(F.max('Salary')).collect()[0][0]
print("Maximum salary:", max_salary)

# Minimum value of a column
min_value = df_modified.agg(F.min('Salary')).collect()[0][0]
print("Minimum value:", min_value)

# Sum of values in a column
sum_value = df_modified.agg(F.sum('Salary')).collect()[0][0]
print("Sum of values:", sum_value)

# Average value of a column
avg_value = df_modified.agg(F.avg('Salary')).collect()[0][0]
print("Average value:", avg_value)

# Count of non-null values in a column
count_non_null = df_modified.agg(F.count('Salary')).collect()[0][0]
print("Count of non-null values:", count_non_null)

# Count distinct values in a column
count_distinct = df_modified.agg(F.countDistinct('Salary')).collect()[0][0]
print("Count of distinct values:", count_distinct)

# Standard deviation of values in a column
std_dev = df_modified.agg(F.stddev('Salary')).collect()[0][0]
print("Standard deviation:", std_dev)

Row count: 9
Maximum salary: 30000
Minimum value: 15000
Sum of values: 196000
Average value: 21777.777777777777
Count of non-null values: 9
Count of distinct values: 6
Standard deviation: 5472.151719794001


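# How does `df_modified.agg(F.max('Salary')).collect()[0][0]` work?

`agg()` returns a one-row DataFrame, `collect()` brings its rows to the driver as a list of `Row` objects, the first `[0]` selects that single `Row`, and the second `[0]` selects the Row's first field, the maximum value. The cells below show each step.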

In [145]:
df_modified.agg(F.max('Salary')).show()

+-----------+
|max(Salary)|
+-----------+
|      30000|
+-----------+



In [149]:
max_row_format = df_modified.agg(F.max('Salary')).collect()[0]
max_row_format  # collect() returns a list of Row objects; [0] is the first (and only) Row

Row(max(Salary)=30000)

In [148]:
max_row_format[0] # It just gives a single max value.

30000

In [150]:
# Approximate 25th, 50th and 75th percentiles of Salary, with a relative error of 0.05
quantiles = df_pyspark.approxQuantile('Salary', [0.25, 0.5, 0.75], 0.05)
print("Quantiles:", quantiles)

Quantiles: [18000.0, 20000.0, 25000.0]


In [151]:
# Writing DataFrames
df_pyspark.write.format('csv').save('output_folder_csv')
df_pyspark.write.format('parquet').save('output_folder_parquet') #store and process large amounts of data by organizing data into columns rather than rows.

In [152]:
# Stop SparkSession
sparkSession.stop()