In [9]:

# Get the active SparkSession, or build a new one named "SparkByExamples.com".
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("SparkByExamples.com")
    .getOrCreate()
)

# Small word list with repeated entries; the next cells pair each word with 1.
data = [
    "Project", "Gutenberg’s", "Alice’s", "Adventures",
    "in", "Wonderland", "Project", "Gutenberg’s", "Adventures",
    "in", "Wonderland", "Project", "Gutenberg’s",
]

# Distribute the words across 3 partitions and display the RDD handle.
rdd = spark.sparkContext.parallelize(data, numSlices=3)
rdd

ParallelCollectionRDD[4] at readRDDFromFile at PythonRDD.scala:274

In [10]:
# Check how many partitions the RDD has; it was created with parallelize(data, 3).
rdd.getNumPartitions()

3

In [11]:
# saveAsTextFile() refuses to write into an output directory that already
# exists, so without this cleanup a second "Restart & Run All" fails here.
# NOTE(review): shutil.rmtree only touches the local filesystem; this assumes
# Spark resolves "./rdd_test" locally (e.g. a local master) — confirm.
import shutil

rdd_output_dir = "./rdd_test"
shutil.rmtree(rdd_output_dir, ignore_errors=True)
# Each element is written as its string representation.
rdd.saveAsTextFile(rdd_output_dir)

                                                                                

In [12]:
# Pair every word with the integer 1.
rdd2 = rdd.map(lambda word: (word, 1))

# collect() brings all pairs back to the driver; fine for this tiny sample.
for pair in rdd2.collect():
    print(pair)

('Project', 1)
('Gutenberg’s', 1)
('Alice’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)
('Adventures', 1)
('in', 1)
('Wonderland', 1)
('Project', 1)
('Gutenberg’s', 1)


### `map()` with a DataFrame

In [14]:
# Sample rows: (firstname, lastname, gender, salary).
data = [
    ("James", "Smith", "M", 30),
    ("Anna", "Rose", "F", 41),
    ("Robert", "Williams", "M", 62),
]
columns = ["firstname", "lastname", "gender", "salary"]

# The schema is a plain list of column names, so Spark infers each column's
# type from the Python values.
df = spark.createDataFrame(data=data, schema=columns)
df.show()

+---------+--------+------+------+
|firstname|lastname|gender|salary|
+---------+--------+------+------+
|    James|   Smith|     M|    30|
|     Anna|    Rose|     F|    41|
|   Robert|Williams|     M|    62|
+---------+--------+------+------+



In [15]:
# df.rdd exposes the DataFrame's rows as an RDD.
rdd2 = df.rdd

# saveAsTextFile() refuses to write into an output directory that already
# exists; clear any previous output so the cell can be re-run.
# NOTE(review): shutil.rmtree only touches the local filesystem; this assumes
# Spark resolves "./df_conver_rdd" locally (e.g. a local master) — confirm.
import shutil

df_rdd_output_dir = "./df_conver_rdd"
shutil.rmtree(df_rdd_output_dir, ignore_errors=True)
rdd2.saveAsTextFile(df_rdd_output_dir)

### After converting the DataFrame to an RDD, its elements look like this:
```
Row(firstname='James', lastname='Smith', gender='M', salary=30)
Row(firstname='Anna', lastname='Rose', gender='F', salary=41)
Row(firstname='Robert', lastname='Williams', gender='M', salary=62)
```

#### Using indexes to get column values

In [19]:
# Positional access into each Row: 0 = firstname, 1 = lastname, 3 = salary.
rdd2 = df.rdd.map(
    lambda row: (row[0] + ", " + row[1], row[3] * 2)
)

df2 = rdd2.toDF(["name", "new_salary"])
df2.show()

+----------------+----------+
|            name|new_salary|
+----------------+----------+
|    James, Smith|        60|
|      Anna, Rose|        82|
|Robert, Williams|       124|
+----------------+----------+



In [20]:
# Same idea, additionally keeping gender (position 2) as its own column.
rdd2 = df.rdd.map(
    lambda row: (
        row[0] + ", " + row[1],
        row[2],
        row[3] * 2,
    )
)

df2 = rdd2.toDF(["name", "gender", "new_salary"])
df2.show()

+----------------+------+----------+
|            name|gender|new_salary|
+----------------+------+----------+
|    James, Smith|     M|        60|
|      Anna, Rose|     F|        82|
|Robert, Williams|     M|       124|
+----------------+------+----------+



#### Using column names to get column values

In [22]:
# Look Row fields up by column name instead of by position.
rdd2 = df.rdd.map(
    lambda row: (
        row["firstname"] + "," + row["lastname"],
        row["gender"],
        row["salary"] * 2,
    )
)
rdd2.toDF(["fullname", "gender", "salary"]).show()

+---------------+------+------+
|       fullname|gender|salary|
+---------------+------+------+
|    James,Smith|     M|    60|
|      Anna,Rose|     F|    82|
|Robert,Williams|     M|   124|
+---------------+------+------+



In [23]:
# Referring to column names as Row attributes (row.firstname, ...).
rdd2 = df.rdd.map(
    lambda row: (row.firstname + "," + row.lastname, row.gender, row.salary * 2)
)
# map() is lazy: without an action the lambda never runs, so this cell would
# neither display a result nor surface a misspelled attribute. show() triggers
# execution and displays the output.
rdd2.toDF(["fullname", "gender", "salary"]).show()

#### Using a custom function

You can also define a custom function to perform the operation. Below, `func1()` is applied to every row of the DataFrame's RDD through `map()`.

In [24]:
def func1(x):
    """Turn one DataFrame row into a ``(name, gender, salary)`` tuple.

    Args:
        x: A row exposing ``firstname``, ``lastname``, ``gender`` and
            ``salary`` attributes (e.g. a ``pyspark.sql.Row``).

    Returns:
        A 3-tuple: ``"firstname,lastname"``, the lower-cased gender, and the
        salary multiplied by 2.
    """
    return (
        x.firstname + "," + x.lastname,
        x.gender.lower(),
        x.salary * 2,
    )

# func1 already takes a single row, so it is passed to map() directly;
# wrapping it as `lambda x: func1(x)` added nothing.
rdd2 = df.rdd.map(func1)
# No column names are given, so Spark falls back to _1, _2, _3.
rdd2.toDF().show()

+---------------+---+---+
|             _1| _2| _3|
+---------------+---+---+
|    James,Smith|  m| 60|
|      Anna,Rose|  f| 82|
|Robert,Williams|  m|124|
+---------------+---+---+

