# Iteration in the cloud

RDDs:
https://spark.apache.org/docs/latest/rdd-programming-guide.html

Iterations:
https://sparkbyexamples.com/pyspark/pyspark-loop-iterate-through-rows-in-dataframe/

https://sparkbyexamples.com/pyspark/pyspark-map-transformation/

https://sparkbyexamples.com/spark/spark-map-vs-mappartitions-transformation/

PySpark map() is an RDD transformation that applies a function (often a lambda) to every element of an RDD and returns a new RDD. To use it on a DataFrame, call it on the DataFrame's underlying RDD (df.rdd).

map() and foreach() are the most commonly used functions for iterating over an RDD in PySpark.

For simple computations, instead of iterating with map() or foreach(), prefer DataFrame select() or withColumn() combined with PySpark SQL functions. These run inside Spark's engine and avoid calling Python code for every row.

The RDD map() transformation can apply complex operations such as adding a column, updating a column, or otherwise transforming the data. Its output always has the same number of records as its input.

Note 1: DataFrame has no map() transformation, so you need to convert the DataFrame to an RDD first (df.rdd).

Note 2: If each call needs heavy initialization, use mapPartitions() instead of map(): with mapPartitions() the initialization runs once per partition instead of once per record.

### map() examples

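```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("map-examples").getOrCreate()
# Illustrative sample data: firstname, lastname, gender, salary
df = spark.createDataFrame(
    [("James", "Smith", "M", 30), ("Anna", "Rose", "F", 41)],
    ["firstname", "lastname", "gender", "salary"])

# Referring to columns by index.
rdd = df.rdd.map(lambda x: (x[0] + "," + x[1], x[2], x[3] * 2))
df2 = rdd.toDF(["name", "gender", "new_salary"])
df2.show()
```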

The example above iterates through every row of the DataFrame and transforms the data. Because a DataFrame is needed afterwards, the resulting RDD is converted back to a DataFrame with new column names. Here the column values are accessed by index; alternatively, you can refer to the DataFrame column names while iterating.

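```python
# Referring to column names (same result as the index-based example)
rdd2 = df.rdd.map(lambda x:
    (x["firstname"] + "," + x["lastname"], x["gender"], x["salary"] * 2)
)
df3 = rdd2.toDF(["name", "gender", "new_salary"])
df3.show()
```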

### foreach() examples

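```python
# foreach() is an action: it runs the function on the executors and returns None.
# print() output goes to the executors' stdout (the console or logs where Spark
# runs), so it may not appear in the notebook cell.

# Access fields by index.
df.rdd.foreach(lambda x:
              print(x[0], x[1], x[2], x[3]))

# Access fields by column name.
df.rdd.foreach(lambda x: print((x.firstname, x.gender)))

# Pass a named function; DataFrame.foreach() also exists, so .rdd is optional.
def f(row):
    print(row.firstname)

df.foreach(f)
```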

# Create functions to iterate through rows

https://www.geeksforgeeks.org/how-to-loop-through-each-row-of-dataframe-in-pyspark/

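PySpark provides map() and mapPartitions() to loop through the rows of an RDD (or of a DataFrame via df.rdd) and perform complex transformations. map() always returns exactly one record per input record, although the number of columns can change (after adding or updating fields). mapPartitions() returns whatever its function yields for each partition, which is often, but not necessarily, the same number of records.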

PySpark also provides the foreach() and foreachPartition() actions to loop through each row of a DataFrame, but these return nothing; they are used only for side effects such as printing or writing to an external system. The methods below show how to get DataFrame column values and process them.


### Method 1: Using collect()

collect() is an action that retrieves all rows of the DataFrame to the driver as a list; you can then loop through that list with an ordinary for loop. Because every row is pulled into driver memory, use it only for small results.

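```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("iterate-rows").getOrCreate()

# Employee DataFrame (first rows of the data used in Method 4); Methods 2 and 3 reuse it.
df = spark.createDataFrame(
    [(1, "Shivansh", "Data Scientist", 2000000, "Noida"),
     (2, "Rishabh", "Software Developer", 1500000, "Banglore"),
     (3, "Swati", "Data Analyst", 1000000, "Hyderabad")],
    ["Id", "Name", "Job Profile", "Salary", "City"])

# Retrieve all rows of the DataFrame to the driver as a list of Row objects
data_collect = df.collect()

# Loop through each row and print Id, Name and City
for row in data_collect:
    print(row["Id"], row["Name"], "  ", row["City"])
```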

### Method 2: Using toLocalIterator()

toLocalIterator() is similar to collect(); the only difference is that collect() returns a list, whereas toLocalIterator() returns an iterator over all the rows of the DataFrame. Because rows are fetched one partition at a time, the driver only needs enough memory for the largest partition rather than for the whole DataFrame.

```python
# Uses the employee df created in Method 1.
# toLocalIterator() returns an iterator instead of a list
data_itr = df.rdd.toLocalIterator()

# Loop through each row and print Id, Job Profile and City
for row in data_itr:
    print(row["Id"], " ", row["Job Profile"], "  ", row["City"])
```

### Method 3: Using iterrows() (ONLY PANDAS)

iterrows() is a pandas function, not a PySpark one, so first convert the PySpark DataFrame to a pandas DataFrame with toPandas(), then loop through it with a for loop. toPandas() collects all the data to the driver, so this only suits small DataFrames.

```python
# Uses the employee df created in Method 1; requires pandas to be installed.
pd_df = df.toPandas()

# iterrows() yields (index, Series) pairs, one per row
for index, row in pd_df.iterrows():
    # Print Id, Name and Salary by position instead of by column name;
    # .iloc makes positional access explicit (plain row[0] is deprecated
    # in recent pandas versions)
    print(row.iloc[0], row.iloc[1], " ", row.iloc[3])
```

### Method 4: Using map()

map() can be combined with a lambda function to iterate through each row of a DataFrame. Because map() is only available on RDDs, first convert the PySpark DataFrame to an RDD, then apply map() with a lambda that processes each row and store the resulting RDD in a variable. Finally, convert that RDD back to a DataFrame with toDF(), passing the schema (here, a list of column names).

```python
from pyspark.sql import SparkSession


# Create (or reuse) a local SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("employee_profile.com") \
        .getOrCreate()
    return spk


# Build a DataFrame from a list of tuples and a list of column names
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1


if __name__ == "__main__":

    spark = create_session()

    input_data = [(1, "Shivansh", "Data Scientist", 2000000, "Noida"),
                  (2, "Rishabh", "Software Developer", 1500000, "Banglore"),
                  (3, "Swati", "Data Analyst", 1000000, "Hyderabad"),
                  (4, "Amar", "Data Analyst", 950000, "Noida"),
                  (5, "Arpit", "Android Developer", 1600000, "Pune"),
                  (6, "Ranjeet", "Python Developer", 1800000, "Gurugram"),
                  (7, "Priyanka", "Full Stack Developer", 2200000, "Banglore")]

    schema = ["Id", "Name", "Job Profile", "Salary", "City"]

    df = create_df(spark, input_data, schema)

    # map() is only available on RDDs, so convert the DataFrame with df.rdd;
    # the lambda keeps four of the five fields of each row
    rdd = df.rdd.map(lambda row: (
        row["Id"], row["Name"], row["Salary"], row["City"])
    )

    # Convert the resulting RDD back to a DataFrame
    df2 = rdd.toDF(["Id", "Name", "Salary", "City"])

    # Show the new DataFrame
    df2.show()
```

### Method 5: Using select()

select() picks a subset of the columns. After selecting them, collect() returns a list of rows that contain only the data of the selected columns, which you can then loop through.

```python
from pyspark.sql import SparkSession


# Create (or reuse) a local SparkSession
def create_session():
    spk = SparkSession.builder \
        .master("local") \
        .appName("employee_profile.com") \
        .getOrCreate()
    return spk


# Build a DataFrame from a list of tuples and a list of column names
def create_df(spark, data, schema):
    df1 = spark.createDataFrame(data, schema)
    return df1


if __name__ == "__main__":

    spark = create_session()

    input_data = [(1, "Shivansh", "Data Scientist", 2000000, "Noida"),
                  (2, "Rishabh", "Software Developer", 1500000, "Banglore"),
                  (3, "Swati", "Data Analyst", 1000000, "Hyderabad"),
                  (4, "Amar", "Data Analyst", 950000, "Noida"),
                  (5, "Arpit", "Android Developer", 1600000, "Pune"),
                  (6, "Ranjeet", "Python Developer", 1800000, "Gurugram"),
                  (7, "Priyanka", "Full Stack Developer", 2200000, "Banglore")]

    schema = ["Id", "Name", "Job Profile", "Salary", "City"]

    df = create_df(spark, input_data, schema)

    # Keep only the 'Name' and 'Salary' columns, then collect()
    # the resulting rows to the driver as a list
    rows_looped = df.select("Name", "Salary").collect()

    # Print the data of each row
    for rows in rows_looped:
        # Index 0 is the 'Name' column and index 1 is the 'Salary' column
        print(rows[0], rows[1])
```