# PYSPARK [DataFrame, transformations and actions, views] NOV 20 2024

# `Transformation and Actions`

### parallelize()
In PySpark, parallelize() is a function used to create an RDD (Resilient Distributed Dataset) from a local Python collection.

This lets you distribute your data across multiple nodes in a cluster, enabling parallel processing that can significantly improve performance for large datasets.
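
To make the idea of distributing a local collection more concrete, here is a minimal plain-Python sketch that cuts a list into contiguous slices, one per partition. It needs no Spark installation. `slice_evenly` is a hypothetical helper written for this note, not a PySpark function, and Spark's real slicing may differ in detail.

```python
# Plain-Python analogy (no Spark needed): cut a local list into contiguous
# slices, similar in spirit to the partitions that parallelize(data, numSlices)
# creates. slice_evenly is a hypothetical helper, not part of PySpark.
def slice_evenly(items, num_slices):
    n = len(items)
    return [items[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]

rows = [('C', 85, 76, 87, 91), ('B', 85, 76, 87, 91),
        ('A', 85, 78, 96, 92), ('A', 92, 76, 89, 96)]

partitions = slice_evenly(rows, 4)
for index, part in enumerate(partitions):
    print(index, part)
```

In PySpark itself, `rdd.getNumPartitions()` and `rdd.glom().collect()` show how many partitions an RDD has and what each one actually holds.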

### collect()
It is an action that retrieves all elements from an RDD or DataFrame and brings them to the driver node as a list. Because the whole dataset is loaded into driver memory, it is best used on small results.

In [0]:

# Create an RDD and a DataFrame
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName('pyspark first program').getOrCreate()

# Create the RDD with 4 partitions
rdd = sc.parallelize([('C',85,76,87,91), ('B',85,76,87,91), ("A", 85,78,96,92), ("A", 92,76,89,96)], 4)
mydata = ['Division','English','Mathematics','Physics','Chemistry']
marks_df = spark.createDataFrame(rdd, schema=mydata)

print(rdd.collect())
print("\n")
print(rdd)  # printing the RDD itself shows only its descriptor, not the data
print("\n")
rdd.collect()  # action: returns the elements as a Python list on the driver

[('C', 85, 76, 87, 91), ('B', 85, 76, 87, 91), ('A', 85, 78, 96, 92), ('A', 92, 76, 89, 96)]


ParallelCollectionRDD[158] at readRDDFromInputStream at PythonRDD.scala:435


Out[44]: [('C', 85, 76, 87, 91),
 ('B', 85, 76, 87, 91),
 ('A', 85, 78, 96, 92),
 ('A', 92, 76, 89, 96)]
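
The difference between printing an RDD and calling collect() on it comes from lazy evaluation: transformations only describe work, and an action triggers it. As a rough plain-Python analogy (not Spark code), the built-in `map` is also lazy, and `list()` plays the role of the action. The `add_ten` function and the `calls` list are names introduced only for this illustration.

```python
# Plain-Python analogy (no Spark needed): a lazy map object behaves like a
# transformation; list() forces evaluation like an action such as collect().
calls = []

def add_ten(x):
    calls.append(x)  # record every time the function actually runs
    return x + 10

lazy = map(add_ten, [1, 2, 3])  # nothing has run yet, like rdd.map(...)
calls_before = len(calls)

result = list(lazy)             # forces evaluation, like .collect()
calls_after = len(calls)

print(calls_before, result, calls_after)
```

The function runs zero times before `list()` is called and three times after, which mirrors why an RDD prints as a descriptor until an action is invoked.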

### first()
It is an action that returns the first element from an RDD or DataFrame.

In [0]:
first_rdd = sc.parallelize([1,2,3,4,5,5,6,7,8,9])
first_rdd.first()  # first() is an action

Out[45]: 1

### count()
It is an action that returns the total number of elements in an RDD or DataFrame.

In [0]:
rdd = sc.parallelize([('C',85,76,87,91), ('B',85,76,87,91), ("A", 85,78,96,92), ("A", 92,76,89,96)], 4)
mydata = ['Division','English','Mathematics','Physics','Chemistry']
marks_df = spark.createDataFrame(rdd, schema=mydata)
print(rdd.count())

4


In [0]:
count_rdd = sc.parallelize([1,2,3,4,5,5,6,7,8,9])
print(count_rdd.count())
count_rdd.count()

10
Out[47]: 10

### take(n) 
It is an action that returns the first n elements from an RDD or DataFrame as a list.

In [0]:
rdd = sc.parallelize([('C',85,76,87,91), ('B',85,76,87,91), ("A", 85,78,96,92), ("A", 92,76,89,96)], 4)
mydata = ['Division','English','Mathematics','Physics','Chemistry']
marks_df = spark.createDataFrame(rdd, schema=mydata)
rdd.take(2)

Out[48]: [('C', 85, 76, 87, 91), ('B', 85, 76, 87, 91)]

In [0]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
count_rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
print(count_rdd.take(2))


[1, 2]


In [0]:
count_rdd = sc.parallelize([1,2,3,4,5,5,6,7,8,9])
print(count_rdd.take(2))
count_rdd.take(6)


[1, 2]
Out[50]: [1, 2, 3, 4, 5, 5]

### Lambda function

#### 1. map()
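It is a transformation that applies a function to each element of an RDD, creating a new RDD with the transformed elements. A PySpark DataFrame has no map() method of its own; convert it with `df.rdd` first, or use column expressions such as select() and withColumn().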

In [0]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
map_rdd = sc.parallelize([1,2,3])
print(map_rdd.map(lambda x:x+10))
print(map_rdd.map(lambda x:x+10).collect())

PythonRDD[193] at RDD at PythonRDD.scala:58
[11, 12, 13]


#### 2. reduce()
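It is an action that aggregates the elements of an RDD into a single value by repeatedly applying a two-argument function. The function should be commutative and associative, because partitions are reduced independently before their results are combined.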

In [0]:
reduce_rdd = sc.parallelize([1,2,3])
print(reduce_rdd.reduce(lambda x,y:x+y))

6
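
A small plain-Python sketch can show why the function given to reduce() should be associative and commutative. It reduces each partition on its own and then reduces the partial results, which is roughly how a distributed reduce proceeds. The partition layouts and the `reduce_partitions` helper are assumptions made for illustration, not Spark internals.

```python
from functools import reduce

# Plain-Python analogy (no Spark needed). reduce_partitions is a hypothetical
# helper: reduce every partition, then reduce the per-partition results.
def reduce_partitions(partitions, func):
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

def add(x, y):
    return x + y

def subtract(x, y):
    return x - y

# Addition gives the same answer however the data is split.
total_a = reduce_partitions([[1], [2, 3]], add)
total_b = reduce_partitions([[1, 2], [3]], add)

# Subtraction is not associative, so the answer depends on the split.
diff_a = reduce_partitions([[1], [2, 3]], subtract)   # 1 - (2 - 3)
diff_b = reduce_partitions([[1, 2], [3]], subtract)   # (1 - 2) - 3

print(total_a, total_b, diff_a, diff_b)
```

With addition both layouts give 6; with subtraction they give 2 and -4, so the result would depend on how the data happens to be partitioned.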


#### 3. filter()
It is a transformation that creates a new RDD or DataFrame containing only the elements that satisfy a given condition.

In [0]:
filter_rdd = sc.parallelize([1,2,3])
print(filter_rdd.filter(lambda x:x%2==0))
print(filter_rdd.filter(lambda x:x%2==0).collect())

PythonRDD[198] at RDD at PythonRDD.scala:58
[2]


#### 3.a startswith('')
It checks whether a string starts with a specific prefix. On a Python string, as in the RDD example here, it returns True if the string starts with the prefix and False otherwise; on a DataFrame column, `Column.startswith()` returns a boolean column instead.

In [0]:
from pyspark import SparkContext
# the code filters the filter_rdd_2 RDD to keep only strings starting with 'R' 
# and then prints the filtered elements to the console.
sc = SparkContext.getOrCreate()
filter_rdd_2 = sc.parallelize(['Rahul', 'Swati', 'Rohan', 'Shreya', 'Priya'])
print(filter_rdd_2.filter(lambda x: x.startswith('R')).collect())

['Rahul', 'Rohan']
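
Since the lambda passed to filter() receives ordinary Python strings, anything `str.startswith` supports can be used inside it, including a tuple of prefixes. The sketch here uses a plain list instead of an RDD so it runs without Spark; the variable names are chosen for this illustration.

```python
# Plain-Python analogy (no Spark needed): the same predicates could be used
# inside rdd.filter(lambda x: ...).
names = ['Rahul', 'Swati', 'Rohan', 'Shreya', 'Priya']

# Single prefix, as in the notebook cell.
starts_with_r = [name for name in names if name.startswith('R')]

# str.startswith also accepts a tuple of prefixes.
starts_with_r_or_p = [name for name in names if name.startswith(('R', 'P'))]

print(starts_with_r)
print(starts_with_r_or_p)
```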


### saveAsTextFile()
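It is an RDD action that saves the contents of an RDD as text files. The path names an output directory, and Spark writes one part file per partition into it. For a DataFrame, the writer API (`df.write`) is used instead.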

In [0]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
save_rdd = sc.parallelize([1,2,3,4,5,5])
save_rdd.saveAsTextFile('/Users/vsprakash427@gmail.com/file3.txt')

### union()
It is a transformation that combines two or more RDDs or DataFrames with the same schema into a single RDD or DataFrame. Duplicates are kept, as the repeated 6 in the example output shows.

In [0]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
union_inp = sc.parallelize([2,4,5,6,7,8,9,10])
union_rdd_1 = union_inp.filter(lambda x:x % 2 == 0)  # even numbers
union_rdd_2 = union_inp.filter(lambda x:x % 3 == 0)  # multiples of 3
print(union_rdd_1.union(union_rdd_2).collect())

[2, 4, 6, 8, 10, 6, 9]
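
The 6 appears twice in that output because union() simply appends one dataset to the other. A plain-Python sketch of the same behaviour, using list concatenation as a stand-in for union() and needing no Spark:

```python
# Plain-Python analogy (no Spark needed): union behaves like concatenation,
# so values present in both inputs show up twice.
numbers = [2, 4, 5, 6, 7, 8, 9, 10]
evens = [x for x in numbers if x % 2 == 0]
multiples_of_3 = [x for x in numbers if x % 3 == 0]

combined = evens + multiples_of_3

# Dropping duplicates is a separate step (order-preserving here).
deduplicated = list(dict.fromkeys(combined))

print(combined)
print(deduplicated)
```

In PySpark, the separate de-duplication step would be `rdd.distinct()`, whose output order should not be relied on.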


### split() and flatMap()

1. In the example below, split() is Python's built-in string method: it breaks a string into a list of substrings at the given delimiter. PySpark also provides a DataFrame function, `pyspark.sql.functions.split()`, which splits a string column into an array column using a regular-expression pattern.

2. In PySpark, flatMap() is an RDD transformation that applies a function to each element and then flattens the resulting sequences into a single new RDD.

In [0]:
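from pyspark import SparkContext
sc = SparkContext.getOrCreate()
flatmap_rdd = sc.parallelize(["Hey there", "This is PySpark RDD Transformations"])
# Misplaced parenthesis: collect() sits inside the lambda, so it would be called on
# the Python list returned by split(), not on the RDD. No action runs here, so the
# lambda is never executed and only the lazy RDD descriptor is shown.
print(flatmap_rdd.flatMap(lambda x :x.split(" ").collect()))
flatmap_rdd.flatMap(lambda x :x.split(" ").collect())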

PythonRDD[211] at RDD at PythonRDD.scala:58
Out[57]: PythonRDD[212] at RDD at PythonRDD.scala:58

In [0]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
flatmap_rdd = sc.parallelize(["Hey there", "This is PySpark RDD Transformations"])
flatmap_rdd.flatMap(lambda x: x.split(" ")).collect()  # collect() is now called on the RDD

Out[60]: ['Hey', 'there', 'This', 'is', 'PySpark', 'RDD', 'Transformations']
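
To see what the flattening step adds over map(), this plain-Python sketch applies the same split to each line twice: once keeping one list per line (the shape map() would give) and once chaining the lists together (the shape flatMap() gives). It is an analogy using the standard library, not Spark code.

```python
from itertools import chain

# Plain-Python analogy (no Spark needed).
lines = ["Hey there", "This is PySpark RDD Transformations"]

# map-like: one result per input element, so the output is a list of lists.
mapped = [line.split(" ") for line in lines]

# flatMap-like: the per-element lists are flattened into a single sequence.
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)
print(flat_mapped)
```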

# `Pair RDD`

### collect() - action

In [0]:
marks = [('Rahul', 88), ('Swati', 92), ('Shreya', 83), ('Abhay', 93), ('Rohan', 78)]
sc.parallelize(marks).collect()

Out[7]: [('Rahul', 88), ('Swati', 92), ('Shreya', 83), ('Abhay', 93), ('Rohan', 78)]

### reduceByKey() transformation

In [0]:
marks_rdd = sc.parallelize([('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
print(marks_rdd.reduceByKey(lambda x, y: x + y).collect())

[('Shreya', 50), ('Swati', 45), ('Rahul', 48), ('Abhay', 55), ('Rohan', 44)]
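
As a rough mental model, reduceByKey() merges all values that share a key using the supplied two-argument function. The plain-Python sketch here reproduces the totals from the cell with a dictionary. `reduce_by_key` is a hypothetical helper for illustration, and unlike Spark it processes everything in one place and keeps first-seen key order.

```python
# Plain-Python analogy (no Spark needed). reduce_by_key is a hypothetical
# helper, not a PySpark function.
def reduce_by_key(pairs, func):
    merged = {}
    for key, value in pairs:
        # First value for a key is stored as is; later ones are merged in.
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())

marks = [('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
         ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)]

totals = reduce_by_key(marks, lambda x, y: x + y)
print(totals)
```

The totals match the Spark output; only the order of the keys differs, since Spark's ordering depends on partitioning.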


### sortByKey() transformation

In [0]:
marks_rdd = sc.parallelize([('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
print(marks_rdd.sortByKey(ascending=True).collect())

[('Abhay', 29), ('Abhay', 26), ('Rahul', 25), ('Rahul', 23), ('Rohan', 22), ('Rohan', 22), ('Shreya', 22), ('Shreya', 28), ('Swati', 26), ('Swati', 19)]


### groupByKey() transformation

In [0]:
marks_rdd = sc.parallelize([('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
grouped = marks_rdd.groupByKey().collect()  # list of (key, iterable of values) pairs
for key, values in grouped:
    print(key, list(values))

Shreya [22, 28]
Swati [26, 19]
Rahul [25, 23]
Abhay [29, 26]
Rohan [22, 22]
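
groupByKey() keeps every value and only gathers them under their key, which is why each name ends up with a list. A plain-Python sketch of that grouping with `collections.defaultdict`, given as an analogy and not as Spark code:

```python
from collections import defaultdict

# Plain-Python analogy (no Spark needed): collect all values for each key.
marks = [('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22),
         ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)]

grouped_marks = defaultdict(list)
for name, mark in marks:
    grouped_marks[name].append(mark)

for name, values in grouped_marks.items():
    print(name, values)
```

When only an aggregate per key is needed, reduceByKey() is usually the better fit in Spark, since it can combine values within each partition before data is moved between nodes.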


### countByKey() action

In [0]:
marks_rdd = sc.parallelize([('Rahul', 25), ('Swati', 26), ('Rohan', 22), ('Rahul', 23), ('Swati', 19),
('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
key_counts = marks_rdd.countByKey().items()  # countByKey() returns a dict-like result on the driver
for key, count in key_counts:
    print(key, count)

Rahul 2
Swati 2
Rohan 2
Shreya 1
Abhay 1
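
countByKey() ignores the values and counts how often each key occurs. The same tally can be sketched in plain Python with `collections.Counter`, shown here as an analogy that runs without Spark:

```python
from collections import Counter

# Plain-Python analogy (no Spark needed): count occurrences of each key.
marks = [('Rahul', 25), ('Swati', 26), ('Rohan', 22), ('Rahul', 23), ('Swati', 19),
         ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)]

key_tally = Counter(name for name, _ in marks)

for name, count in key_tally.items():
    print(name, count)
```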


# `Select, Rename, Filter Data in a PySpark DataFrame`

### withColumnRenamed()

In [0]:
# Importing necessary libraries
from pyspark.sql import SparkSession
 
# Create a spark session
spark = SparkSession.builder.appName('pyspark - example join').getOrCreate()
 
# Create data in dataframe
data = [('Ram', '1991-04-01', 'M', 3000),
        ('Mike', '2000-05-19', 'M', 4000),
        ('Rohini', '1978-09-05', 'M', 4000),
        ('Maria', '1967-12-01', 'F', 4000),
        ('Jenis', '1980-02-17', 'F', 1200)]
 
# Column names in dataframe
columns = ["Name", "DOB", "Gender", "salary"]
 
# Create the spark dataframe
df = spark.createDataFrame(data=data,
                           schema=columns)
df.withColumnRenamed("DOB","date of birth").show()
df.withColumnRenamed("DOB","date of birth").withColumnRenamed("Name","personname").show()

+------+-------------+------+------+
|  Name|date of birth|Gender|salary|
+------+-------------+------+------+
|   Ram|   1991-04-01|     M|  3000|
|  Mike|   2000-05-19|     M|  4000|
|Rohini|   1978-09-05|     M|  4000|
| Maria|   1967-12-01|     F|  4000|
| Jenis|   1980-02-17|     F|  1200|
+------+-------------+------+------+

+----------+-------------+------+------+
|personname|date of birth|Gender|salary|
+----------+-------------+------+------+
|       Ram|   1991-04-01|     M|  3000|
|      Mike|   2000-05-19|     M|  4000|
|    Rohini|   1978-09-05|     M|  4000|
|     Maria|   1967-12-01|     F|  4000|
|     Jenis|   1980-02-17|     F|  1200|
+----------+-------------+------+------+



### selectExpr()

In [0]:
# Import the necessary libraries for the selectExpr() example
from pyspark.sql import SparkSession
 
# Create a spark session
spark = SparkSession.builder.appName('pyspark - example join').getOrCreate()
 
# Create data in dataframe
data = [('Ram', '1991-04-01', 'M', 3000),
        ('Mike', '2000-05-19', 'M', 4000),
        ('Rohini', '1978-09-05', 'M', 4000),
        ('Maria', '1967-12-01', 'F', 4000),
        ('Jenis', '1980-02-17', 'F', 1200)]
 
# Column names in dataframe
columns = ["Name", "DOB", "Gender", "salary"]
 
# Create the spark dataframe
df = spark.createDataFrame(data=data, schema=columns)


data = df.selectExpr("Gender as category","DOB","Name as name","salary")
 
data.show()

+--------+----------+------+------+
|category|       DOB|  name|salary|
+--------+----------+------+------+
|       M|1991-04-01|   Ram|  3000|
|       M|2000-05-19|  Mike|  4000|
|       M|1978-09-05|Rohini|  4000|
|       F|1967-12-01| Maria|  4000|
|       F|1980-02-17| Jenis|  1200|
+--------+----------+------+------+



### alias() for column

In [0]:
from pyspark.sql.functions import col
 
# Select the 'salary' as 'Amount' using aliasing
# Select remaining with their original name
data = df.select(col("Name"),col("DOB"),
                 col("Gender"),
                 col("salary").alias('Amount'))
data.show()

+------+----------+------+------+
|  Name|       DOB|Gender|Amount|
+------+----------+------+------+
|   Ram|1991-04-01|     M|  3000|
|  Mike|2000-05-19|     M|  4000|
|Rohini|1978-09-05|     M|  4000|
| Maria|1967-12-01|     F|  4000|
| Jenis|1980-02-17|     F|  1200|
+------+----------+------+------+



# `PySpark view and temp view`

### create temporary view

In [0]:
from pyspark.sql import SparkSession
# Create spark session
spark = SparkSession \
    .builder \
    .appName("SparkByExamples.com") \
    .enableHiveSupport() \
    .getOrCreate()
data = [("James","Smith","USA","CA"),
        ("Michael","Rose","USA","NY"),
        ("Robert","Williams","USA","CA"),
        ("Maria","Jones","USA","FL")]
columns = ["firstname","lastname","country","state"]
# Create dataframe
sampleDF = spark.sparkContext.parallelize(data).toDF(columns)
sampleDF.createOrReplaceTempView("Person")
sampleDF.createOrReplaceTempView("mydata")
sampleDF.show()

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+



### SQL queries

In [0]:
# View names are case-insensitive by default, so "person" matches the view "Person"
spark.sql("select * from person").show()
spark.sql("select * from mydata").show()

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+

+---------+--------+-------+-----+
|firstname|lastname|country|state|
+---------+--------+-------+-----+
|    James|   Smith|    USA|   CA|
|  Michael|    Rose|    USA|   NY|
|   Robert|Williams|    USA|   CA|
|    Maria|   Jones|    USA|   FL|
+---------+--------+-------+-----+

