In [1]:
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
Collecting py4j==0.10.9.7
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Using legacy setup.py install for pyspark, since package 'wheel' is not installed.
Installing collected packages: py4j, pyspark
    Running setup.py install for pyspark: started
    Running setup.py install for pyspark: finished with status 'done'
Successfully installed py4j-0.10.9.7 pyspark-3.4.1


You should consider upgrading via the 'c:\users\rohit.pandey\onedrive - azure exponentia ai\desktop\devops\pyspark\env\scripts\python.exe -m pip install --upgrade pip' command.


In [2]:
import pyspark 

In [4]:
import pandas as pd

# Load the same small CSV with pandas first, to compare with Spark below.
df = pd.read_csv(filepath_or_buffer='data.csv')
df

Unnamed: 0,name,age
0,rohit,23
1,sachin,28
2,shivam,25


### To work with Spark, we first have to start a Spark session

In [6]:
from pyspark.sql import SparkSession


In [10]:
# Start (or reuse) the SparkSession that all Spark DataFrame operations go through.
spark = (
    SparkSession.builder
    .appName('Practice')
    .getOrCreate()
)

In [11]:
spark  # display the SparkSession object created in the previous cell

In [12]:
# Read the CSV with no header option: Spark treats the first line as data and
# gives the columns default names (_c0, _c1, ... — see the next cell's output).
df_spark = spark.read.format('csv').load('data.csv')

In [14]:
df_spark.show(n=20, truncate=True)  # print up to the first 20 rows

+------+---+
|   _c0|_c1|
+------+---+
| name |age|
| rohit| 23|
|sachin| 28|
|shivam| 25|
+------+---+



In [19]:
# Above, the columns came out as _c0/_c1 instead of name/age: without a header
# option Spark reads the first line as data and generates default column names.

# header=True tells Spark to use the first row as the column names.
df_pyspark = spark.read.csv('data.csv', header=True)

In [20]:
df_pyspark.show(n=20, truncate=True)  # the first row is now used as the header

+------+---+
| name |age|
+------+---+
| rohit| 23|
|sachin| 28|
|shivam| 25|
+------+---+



In [22]:
type(df)  # the pandas DataFrame loaded earlier, for comparison with the Spark one

pandas.core.frame.DataFrame

In [23]:

type(df_pyspark)  # a pyspark.sql DataFrame, a different class from the pandas DataFrame above

pyspark.sql.dataframe.DataFrame

In [27]:
df_pyspark.head(n=2)  # returns (does not print) the first 2 rows as a list of Row objects

[Row(name ='rohit', age='23'), Row(name ='sachin', age='28')]

In [29]:
# Print the schema of df_pyspark: column names, types and nullability
df_pyspark.printSchema()

root
 |-- name : string (nullable = true)
 |-- age: string (nullable = true)



## Let's cover


- PySpark DataFrame
- Reading the dataset
- Checking the data type of each column (schema)
- Selecting columns and indexing
- Checking `describe()`, similar to pandas
- Adding and dropping columns

In [30]:
from pyspark.sql import SparkSession

In [34]:
# getOrCreate() returns the session already started earlier in this notebook,
# so no second session is created; the new appName may not apply to it.
spark = (
    SparkSession.builder
    .appName('Dataframe')
    .getOrCreate()
)

In [35]:
spark  # display the SparkSession object returned by getOrCreate()

In [36]:
# header=True: use the first CSV row as column names (every column is still read as a string).
spark_df = spark.read.csv('data2.csv', header=True)

In [37]:
spark_df.show(n=20, truncate=True)  # print up to the first 20 rows

+------+---+----------+
| name |age|Experience|
+------+---+----------+
| rohit| 23|        10|
|sachin| 28|         8|
|shivam| 25|         4|
+------+---+----------+



In [38]:
# Check the schema, similar to df.info() in pandas

spark_df.printSchema()

root
 |-- name : string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)



In [39]:
# By default Spark reads every CSV column as a string, which is why the schema
# above shows only strings. inferSchema=True makes Spark infer a type for each
# column from the data (e.g. integer for age and Experience).

spark_df = spark.read.csv('data2.csv', header=True, inferSchema=True)

In [40]:
spark_df.printSchema()  # age and Experience are now inferred as integer columns

root
 |-- name : string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)



In [47]:
# header and inferSchema can both be passed directly to csv(), so a separate
# .option('header', 'true') call is not needed.

spark_df = spark.read.csv('data2.csv', header=True, inferSchema=True)
spark_df.printSchema()
spark_df.show()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)

+------+---+----------+
|  name|age|Experience|
+------+---+----------+
| rohit| 23|        10|
|sachin| 28|         8|
|shivam| 25|         4|
+------+---+----------+



In [48]:




# A DataFrame is a data structure on which we can perform many kinds of operations




##### Selecting columns and indexing

In [49]:
spark_df.columns  # column names as a plain Python list

['name', 'age', 'Experience']

In [50]:
# Select a single column

spark_df.select('name').show()    # select() returns a new DataFrame; show() prints it


+------+
|  name|
+------+
| rohit|
|sachin|
|shivam|
+------+



In [51]:
# To select multiple columns, pass a list of column names
spark_df.select(['name','age']).show()


+------+---+
|  name|age|
+------+---+
| rohit| 23|
|sachin| 28|
|shivam| 25|
+------+---+



#### Checking the data types of columns using `dtypes`

In [52]:
spark_df.dtypes  # list of (column name, type name) pairs

[('name', 'string'), ('age', 'int'), ('Experience', 'int')]

#### Checking summary statistics with `describe()`

In [53]:
spark_df.describe()  # returns a new DataFrame; only its repr is displayed here — call .show() (next cell) to see the values

DataFrame[summary: string, name: string, age: string, Experience: string]

In [54]:
spark_df.describe().show()  # count/mean/stddev/min/max per column; mean and stddev are null for the string column

+-------+------+------------------+-----------------+
|summary|  name|               age|       Experience|
+-------+------+------------------+-----------------+
|  count|     3|                 3|                3|
|   mean|  null|25.333333333333332|7.333333333333333|
| stddev|  null| 2.516611478423583|3.055050463303893|
|    min| rohit|                23|                4|
|    max|shivam|                28|               10|
+-------+------+------------------+-----------------+



#### Adding and dropping columns

In [55]:
# spark_df.withColumn returns a new DataFrame, adding a column or replacing an existing column that has the same name

In [57]:
# withColumn returns a NEW DataFrame. Its result is only displayed here; spark_df itself is unchanged.
experience_plus_two = spark_df['Experience'] + 2
spark_df.withColumn('Experience after 2 year', experience_plus_two).show()

+------+---+----------+-----------------------+
|  name|age|Experience|Experience after 2 year|
+------+---+----------+-----------------------+
| rohit| 23|        10|                     12|
|sachin| 28|         8|                     10|
|shivam| 25|         4|                      6|
+------+---+----------+-----------------------+



In [59]:
spark_df['Experience']   # returns a Column expression, not the column's data

Column<'Experience'>

In [60]:
# Dropping columns with the drop() function

In [62]:
# drop() returns a new DataFrame (it does not modify spark_df), and the
# 'Experience after 2 year' column was never stored in spark_df: the withColumn
# cell only displayed its result. Add the column and drop it in one chain so the
# drop actually removes something.
(
    spark_df
    .withColumn('Experience after 2 year', spark_df['Experience'] + 2)
    .drop('Experience after 2 year')
    .show()
)

+------+---+----------+
|  name|age|Experience|
+------+---+----------+
| rohit| 23|        10|
|sachin| 28|         8|
|shivam| 25|         4|
+------+---+----------+



In [67]:
# Rename a column: withColumnRenamed(existing, new) returns a new DataFrame

spark_df.withColumnRenamed(existing='name', new='new Name').show()

+--------+---+----------+
|new Name|age|Experience|
+--------+---+----------+
|   rohit| 23|        10|
|  sachin| 28|         8|
|  shivam| 25|         4|
+--------+---+----------+



## PySpark: handling missing values


- Dropping columns
- Dropping rows
- Various parameters of the drop functionality
- Handling missing values with the mean, median, or mode

In [68]:
from pyspark.sql import SparkSession


In [69]:
# getOrCreate() returns the session already started earlier in this notebook,
# so no second session is created; the new appName may not apply to it.
spark = (
    SparkSession.builder
    .appName('Practice3')
    .getOrCreate()
)

In [78]:
# Read data3.csv (it contains nulls) with a header row and inferred column types

spark_df = spark.read.options(header=True, inferSchema=True).csv('data3.csv')
spark_df.show()

+---------+----+----------+-------+
|     name| age|Experience| salary|
+---------+----+----------+-------+
|    rohit|  23|        10|  25000|
|   sachin|  28|         8|  20000|
|   shivam|  25|         4|  20000|
|sudhandhu|  32|         3|1000000|
|    priya|  22|         5| 800000|
|    shyam|  25|         3| 100000|
|     null|  56|        10|   null|
|   mahesh|null|      null|  40000|
|     null|  10|      null|   null|
+---------+----+----------+-------+



In [76]:
# The DataFrame above clearly contains null values; let's handle them

In [80]:


# Drop a column: drop() returns a new DataFrame, and spark_df itself still has 'name'

spark_df.drop('name').show()

+----+----------+-------+
| age|Experience| salary|
+----+----------+-------+
|  23|        10|  25000|
|  28|         8|  20000|
|  25|         4|  20000|
|  32|         3|1000000|
|  22|         5| 800000|
|  25|         3| 100000|
|  56|        10|   null|
|null|      null|  40000|
|  10|      null|   null|
+----+----------+-------+



In [81]:
# Drop rows that contain nulls. spark_df.na groups the missing-value helpers
# (drop, fill, replace); with no arguments, drop() removes every row that has at least one null.

spark_df.na.drop().show()

+---------+---+----------+-------+
|     name|age|Experience| salary|
+---------+---+----------+-------+
|    rohit| 23|        10|  25000|
|   sachin| 28|         8|  20000|
|   shivam| 25|         4|  20000|
|sudhandhu| 32|         3|1000000|
|    priya| 22|         5| 800000|
|    shyam| 25|         3| 100000|
+---------+---+----------+-------+



In [89]:
# na.drop(how='any', thresh=None, subset=None)
#   how='any' (default): drop a row if ANY of its values is null
#   how='all': drop a row only if ALL of its values are null
spark_df.na.drop(how='all').show()

# thresh=n: keep only rows that have at least n non-null values. When thresh is
# given it overrides `how`, so `how` is omitted here to avoid implying it matters.
spark_df.na.drop(thresh=2).show()

# subset: look for nulls only in the listed columns
spark_df.na.drop(how='any', subset=['age','Experience','salary']).show()
spark_df.na.drop(how='any', subset=['salary']).show()

+---------+----+----------+-------+
|     name| age|Experience| salary|
+---------+----+----------+-------+
|    rohit|  23|        10|  25000|
|   sachin|  28|         8|  20000|
|   shivam|  25|         4|  20000|
|sudhandhu|  32|         3|1000000|
|    priya|  22|         5| 800000|
|    shyam|  25|         3| 100000|
|     null|  56|        10|   null|
|   mahesh|null|      null|  40000|
|     null|  10|      null|   null|
+---------+----+----------+-------+

+---------+----+----------+-------+
|     name| age|Experience| salary|
+---------+----+----------+-------+
|    rohit|  23|        10|  25000|
|   sachin|  28|         8|  20000|
|   shivam|  25|         4|  20000|
|sudhandhu|  32|         3|1000000|
|    priya|  22|         5| 800000|
|    shyam|  25|         3| 100000|
|     null|  56|        10|   null|
|   mahesh|null|      null|  40000|
+---------+----+----------+-------+

+---------+---+----------+-------+
|     name|age|Experience| salary|
+---------+---+----------+--

In [99]:
# na.fill(value, subset): replace nulls in the `subset` columns with `value`.
# Only subset columns whose type matches `value` are filled (a number fills
# numeric columns, a string fills string columns), so read with
# inferSchema=True to get numeric columns.

spark_df.na.fill(value=0, subset=['Experience','age','salary']).show()

spark_df.na.fill(value='no name', subset=['name']).show()


+---------+---+----------+-------+
|     name|age|Experience| salary|
+---------+---+----------+-------+
|    rohit| 23|        10|  25000|
|   sachin| 28|         8|  20000|
|   shivam| 25|         4|  20000|
|sudhandhu| 32|         3|1000000|
|    priya| 22|         5| 800000|
|    shyam| 25|         3| 100000|
|     null| 56|        10|      0|
|   mahesh|  0|         0|  40000|
|     null| 10|         0|      0|
+---------+---+----------+-------+

+---------+----+----------+-------+
|     name| age|Experience| salary|
+---------+----+----------+-------+
|    rohit|  23|        10|  25000|
|   sachin|  28|         8|  20000|
|   shivam|  25|         4|  20000|
|sudhandhu|  32|         3|1000000|
|    priya|  22|         5| 800000|
|    shyam|  25|         3| 100000|
|  no name|  56|        10|   null|
|   mahesh|null|      null|  40000|
|  no name|  10|      null|   null|
+---------+----+----------+-------+



In [100]:
# Replacing missing values with the column mean using the Imputer class


from pyspark.ml.feature import Imputer

In [101]:
# Impute these columns into new "<column>_imputed" output columns.
impute_columns = ['Experience', 'age', 'salary']
imputer = Imputer(
    inputCols=impute_columns,
    outputCols=[f"{column}_imputed" for column in impute_columns],
).setStrategy("mean")  # other strategies: "median", "mode"

In [102]:
# fit() computes each input column's mean; transform() adds the *_imputed columns with nulls replaced by it.
# NOTE: the input columns are integers, so the imputed means appear cast back to integers
# in the output (e.g. salary_imputed 286428, age_imputed 27).
imputer.fit(spark_df).transform(spark_df).show()  # using this we have replaced the na values with means 

+---------+----+----------+-------+------------------+-----------+--------------+
|     name| age|Experience| salary|Experience_imputed|age_imputed|salary_imputed|
+---------+----+----------+-------+------------------+-----------+--------------+
|    rohit|  23|        10|  25000|                10|         23|         25000|
|   sachin|  28|         8|  20000|                 8|         28|         20000|
|   shivam|  25|         4|  20000|                 4|         25|         20000|
|sudhandhu|  32|         3|1000000|                 3|         32|       1000000|
|    priya|  22|         5| 800000|                 5|         22|        800000|
|    shyam|  25|         3| 100000|                 3|         25|        100000|
|     null|  56|        10|   null|                10|         56|        286428|
|   mahesh|null|      null|  40000|                 6|         27|         40000|
|     null|  10|      null|   null|                 6|         10|        286428|
+---------+----+

## PySpark DataFrame
- Filter operation
- &, |, ==
- `~` (negation)


In [114]:
from pyspark.sql import SparkSession

In [115]:
# getOrCreate() returns the session already started earlier in this notebook,
# so no second session is created; the new appName may not apply to it.
spark = (
    SparkSession.builder
    .appName('practice4')
    .getOrCreate()
)

In [116]:
# Read data4.csv with a header row and inferred column types
spark_df = spark.read.csv(path='data4.csv', header=True, inferSchema=True)

In [117]:
spark_df.show()  # inspect the data before filtering

+---------+---+----------+-------+
|     name|age|Experience| salary|
+---------+---+----------+-------+
|    rohit| 23|        10|  25000|
|   sachin| 28|         8|  20000|
|   shivam| 25|         4|  20000|
|sudhandhu| 32|         3|1000000|
|    priya| 22|         5| 800000|
|    shyam| 25|         3| 100000|
+---------+---+----------+-------+



### Filter operation


In [118]:
# Rows where salary is less than or equal to 25000 (condition given as a SQL expression string)

spark_df.filter("salary<=25000").show()

+------+---+----------+------+
|  name|age|Experience|salary|
+------+---+----------+------+
| rohit| 23|        10| 25000|
|sachin| 28|         8| 20000|
|shivam| 25|         4| 20000|
+------+---+----------+------+



In [119]:
spark_df.filter("salary==25000").show()  # SQL-string condition: salary equal to 25000

+-----+---+----------+------+
| name|age|Experience|salary|
+-----+---+----------+------+
|rohit| 23|        10| 25000|
+-----+---+----------+------+



In [120]:
# where() is an alias of filter(); select() also accepts column names as separate arguments
spark_df.where("salary<=25000").select('name', 'salary').show()

+------+------+
|  name|salary|
+------+------+
| rohit| 25000|
|sachin| 20000|
|shivam| 20000|
+------+------+



In [123]:
# Column-expression filters. Wrap each comparison in parentheses when combining:
# in Python, & (and) and | (or) bind more tightly than ==.
salary_is_25k = spark_df['salary'] == 25000
spark_df.filter(salary_is_25k).show()
spark_df.filter(salary_is_25k & (spark_df['name'] == 'rohit')).show()
spark_df.filter(salary_is_25k | (spark_df['name'] == 'sachin')).show()

+-----+---+----------+------+
| name|age|Experience|salary|
+-----+---+----------+------+
|rohit| 23|        10| 25000|
+-----+---+----------+------+

+-----+---+----------+------+
| name|age|Experience|salary|
+-----+---+----------+------+
|rohit| 23|        10| 25000|
+-----+---+----------+------+

+------+---+----------+------+
|  name|age|Experience|salary|
+------+---+----------+------+
| rohit| 23|        10| 25000|
|sachin| 28|         8| 20000|
+------+---+----------+------+

