- Create a spark session

In [25]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Dataframe_session_name').getOrCreate()

In [26]:
spark

- Read a CSV file

In [27]:
df = spark.read.csv('BasicDataframe.csv')
df

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string]

In [28]:
df.show()

+-------+---+------+----------+----------+
|    _c0|_c1|   _c2|       _c3|       _c4|
+-------+---+------+----------+----------+
|   Name|Age|Salary|Department|Experience|
|  Pawan| 26| 50000| Furniture|         4|
|Chandar| 55| 70000| Furniture|        20|
| Sanjay| 22| 35000|Operations|         1|
|  Anand| 25| 20000| Furniture|         1|
+-------+---+------+----------+----------+



## Dataframes

- Making top row as header

In [29]:
df = spark.read.option('header','true').csv('BasicDataframe.csv')
df.show()

+-------+---+------+----------+----------+
|   Name|Age|Salary|Department|Experience|
+-------+---+------+----------+----------+
|  Pawan| 26| 50000| Furniture|         4|
|Chandar| 55| 70000| Furniture|        20|
| Sanjay| 22| 35000|Operations|         1|
|  Anand| 25| 20000| Furniture|         1|
+-------+---+------+----------+----------+



In [30]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Salary: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Experience: string (nullable = true)



In [31]:
from pyspark.sql.functions import col
df = df.withColumn('Salary', col('Salary').cast('integer'))
df = df.withColumn('Age', col('Age').cast('integer'))
df = df.withColumn('Experience', col('Experience').cast('integer'))
df.show()

+-------+---+------+----------+----------+
|   Name|Age|Salary|Department|Experience|
+-------+---+------+----------+----------+
|  Pawan| 26| 50000| Furniture|         4|
|Chandar| 55| 70000| Furniture|        20|
| Sanjay| 22| 35000|Operations|         1|
|  Anand| 25| 20000| Furniture|         1|
+-------+---+------+----------+----------+



In [32]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- Experience: integer (nullable = true)



In [33]:
df = spark.read.csv(header=True, path='BasicDataframe.csv', inferSchema=True)
df.show()

+-------+---+------+----------+----------+
|   Name|Age|Salary|Department|Experience|
+-------+---+------+----------+----------+
|  Pawan| 26| 50000| Furniture|       4.0|
|Chandar| 55| 70000| Furniture|      20.0|
| Sanjay| 22| 35000|Operations|       1.0|
|  Anand| 25| 20000| Furniture|       1.0|
+-------+---+------+----------+----------+



In [34]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- Experience: double (nullable = true)



In [35]:
type(df)

pyspark.sql.dataframe.DataFrame

In [36]:
df.head()

Row(Name='Pawan', Age=26, Salary=50000, Department='Furniture', Experience=4.0)

In [37]:
df.head(3)

[Row(Name='Pawan', Age=26, Salary=50000, Department='Furniture', Experience=4.0),
 Row(Name='Chandar', Age=55, Salary=70000, Department='Furniture', Experience=20.0),
 Row(Name='Sanjay', Age=22, Salary=35000, Department='Operations', Experience=1.0)]

In [38]:
df.tail(2)

[Row(Name='Sanjay', Age=22, Salary=35000, Department='Operations', Experience=1.0),
 Row(Name='Anand', Age=25, Salary=20000, Department='Furniture', Experience=1.0)]

In [39]:
df.isEmpty()

False

In [40]:
df.explain()

== Physical Plan ==
FileScan csv [Name#410,Age#411,Salary#412,Department#413,Experience#414] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/d:/DEVELOPMENT/Big Data/Pyspark/BasicDataframe.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Name:string,Age:int,Salary:int,Department:string,Experience:double>




In [41]:
df.distinct().count()

4

In [42]:
df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- Experience: double (nullable = true)



In [43]:
type(df)

pyspark.sql.dataframe.DataFrame

In [44]:
df.columns

['Name', 'Age', 'Salary', 'Department', 'Experience']

In [45]:
print(df.count())

4


- Selection

In [46]:
df.select('Name').show()  # dataframe type

+-------+
|   Name|
+-------+
|  Pawan|
|Chandar|
| Sanjay|
|  Anand|
+-------+



In [47]:
df.select('Name').where(df['Salary']>50000).show()

+-------+
|   Name|
+-------+
|Chandar|
+-------+



In [48]:
df.select('Name').where(df['Salary']>50000).count()

1

In [49]:
df.select(['Name', 'Age']).show()  # dataframe type

+-------+---+
|   Name|Age|
+-------+---+
|  Pawan| 26|
|Chandar| 55|
| Sanjay| 22|
|  Anand| 25|
+-------+---+



In [50]:
df.dtypes

[('Name', 'string'),
 ('Age', 'int'),
 ('Salary', 'int'),
 ('Department', 'string'),
 ('Experience', 'double')]

In [51]:
df.describe()

DataFrame[summary: string, Name: string, Age: string, Salary: string, Department: string, Experience: string]

In [52]:
df.describe().show()

+-------+------+------------------+-----------------+----------+---------------+
|summary|  Name|               Age|           Salary|Department|     Experience|
+-------+------+------------------+-----------------+----------+---------------+
|  count|     4|                 4|                4|         4|              4|
|   mean|  NULL|              32.0|          43750.0|      NULL|            6.5|
| stddev|  NULL|15.427248620541512|21360.00936329383|      NULL|9.1104335791443|
|    min| Anand|                22|            20000| Furniture|            1.0|
|    max|Sanjay|                55|            70000|Operations|           20.0|
+-------+------+------------------+-----------------+----------+---------------+



- CRUD columns

In [53]:
df = df.withColumn('5 years later', df['Age']+5)
df.show()

+-------+---+------+----------+----------+-------------+
|   Name|Age|Salary|Department|Experience|5 years later|
+-------+---+------+----------+----------+-------------+
|  Pawan| 26| 50000| Furniture|       4.0|           31|
|Chandar| 55| 70000| Furniture|      20.0|           60|
| Sanjay| 22| 35000|Operations|       1.0|           27|
|  Anand| 25| 20000| Furniture|       1.0|           30|
+-------+---+------+----------+----------+-------------+



In [54]:
df = df.withColumnRenamed('5 years later', 'Age+5')
df.show()

+-------+---+------+----------+----------+-----+
|   Name|Age|Salary|Department|Experience|Age+5|
+-------+---+------+----------+----------+-----+
|  Pawan| 26| 50000| Furniture|       4.0|   31|
|Chandar| 55| 70000| Furniture|      20.0|   60|
| Sanjay| 22| 35000|Operations|       1.0|   27|
|  Anand| 25| 20000| Furniture|       1.0|   30|
+-------+---+------+----------+----------+-----+



In [55]:
df = df.drop('Age+5')
df.show()

+-------+---+------+----------+----------+
|   Name|Age|Salary|Department|Experience|
+-------+---+------+----------+----------+
|  Pawan| 26| 50000| Furniture|       4.0|
|Chandar| 55| 70000| Furniture|      20.0|
| Sanjay| 22| 35000|Operations|       1.0|
|  Anand| 25| 20000| Furniture|       1.0|
+-------+---+------+----------+----------+



## Filter

In [56]:
df.show()

+-------+---+------+----------+----------+
|   Name|Age|Salary|Department|Experience|
+-------+---+------+----------+----------+
|  Pawan| 26| 50000| Furniture|       4.0|
|Chandar| 55| 70000| Furniture|      20.0|
| Sanjay| 22| 35000|Operations|       1.0|
|  Anand| 25| 20000| Furniture|       1.0|
+-------+---+------+----------+----------+



In [57]:
df.filter('Salary<=40000').show()

+------+---+------+----------+----------+
|  Name|Age|Salary|Department|Experience|
+------+---+------+----------+----------+
|Sanjay| 22| 35000|Operations|       1.0|
| Anand| 25| 20000| Furniture|       1.0|
+------+---+------+----------+----------+



In [58]:
df.filter('Salary<=40000').select(['Name','Age']).show()

+------+---+
|  Name|Age|
+------+---+
|Sanjay| 22|
| Anand| 25|
+------+---+



In [59]:
df.filter(df['Salary']<=40000).select(['Name','Age']).show()

+------+---+
|  Name|Age|
+------+---+
|Sanjay| 22|
| Anand| 25|
+------+---+



- AND

In [60]:
df.filter((df['Salary']>40000) & (df['Salary']<60000)).select(['Name','Age']).show()

+-----+---+
| Name|Age|
+-----+---+
|Pawan| 26|
+-----+---+



- NOT

In [61]:
df.filter(~((df['Salary']>40000) & (df['Salary']<60000))).select(['Name','Age']).show()

+-------+---+
|   Name|Age|
+-------+---+
|Chandar| 55|
| Sanjay| 22|
|  Anand| 25|
+-------+---+



- OR

In [62]:
df.filter((df['Salary']<=40000) | (df['Salary']>60000)).select(['Name','Age']).show()

+-------+---+
|   Name|Age|
+-------+---+
|Chandar| 55|
| Sanjay| 22|
|  Anand| 25|
+-------+---+



## Group by and Agg.

In [63]:
df.show()

+-------+---+------+----------+----------+
|   Name|Age|Salary|Department|Experience|
+-------+---+------+----------+----------+
|  Pawan| 26| 50000| Furniture|       4.0|
|Chandar| 55| 70000| Furniture|      20.0|
| Sanjay| 22| 35000|Operations|       1.0|
|  Anand| 25| 20000| Furniture|       1.0|
+-------+---+------+----------+----------+



In [64]:
type(df.groupBy('Department'))

pyspark.sql.group.GroupedData

In [65]:
df.groupBy('Department').count().show()

+----------+-----+
|Department|count|
+----------+-----+
| Furniture|    3|
|Operations|    1|
+----------+-----+



In [66]:
df.groupBy('Department', 'Experience').count().show()

+----------+----------+-----+
|Department|Experience|count|
+----------+----------+-----+
| Furniture|       1.0|    1|
| Furniture|      20.0|    1|
| Furniture|       4.0|    1|
|Operations|       1.0|    1|
+----------+----------+-----+



In [67]:
df.groupBy('Department').min('Salary').show()

+----------+-----------+
|Department|min(Salary)|
+----------+-----------+
| Furniture|      20000|
|Operations|      35000|
+----------+-----------+



In [68]:
df.groupBy('Department').mean().show()

+----------+------------------+------------------+-----------------+
|Department|          avg(Age)|       avg(Salary)|  avg(Experience)|
+----------+------------------+------------------+-----------------+
| Furniture|35.333333333333336|46666.666666666664|8.333333333333334|
|Operations|              22.0|           35000.0|              1.0|
+----------+------------------+------------------+-----------------+



In [69]:
df.groupBy('Department').max('Salary').show()

+----------+-----------+
|Department|max(Salary)|
+----------+-----------+
| Furniture|      70000|
|Operations|      35000|
+----------+-----------+



In [70]:
df.groupBy('Department').sum('Salary').show()

+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
| Furniture|     140000|
|Operations|      35000|
+----------+-----------+



In [71]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf().setAppName('Map_and_filter').setMaster("local[*]")
spark = SparkSession(conf = conf)

n = [1,2,3,4,5]
rdd = spark.parallelize(n)

sq_rdd = rdd.map(lambda x: x * x)

filtered_rdd = sq_rdd.filter(lambda x: x > 16)

res = filtered_rdd.collect()
print(res)

spark.stop()

TypeError: SparkSession.__init__() got an unexpected keyword argument 'conf'

In [75]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("MapFilter").setMaster("local")
sc = SparkContext(conf=conf)

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

mapped_rdd = rdd.map(lambda x: x * 2)
print(mapped_rdd.collect())  # Output: [2, 4, 6, 8, 10]

filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
print(filtered_rdd.collect())  # Output: [2, 4]

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Dataframe_session_name, master=local[*]) created by getOrCreate at C:\Users\adity\AppData\Local\Temp\ipykernel_12400\632769566.py:2 