
In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
     |████████████████████████████████| 281.4 MB 35 kB/s
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 52.2 MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=3814503c8eefd125ae157e16d6b05a366dac415e74f36bfb91be7ae5dbe4dc7a
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [2]:
# files
!wget https://raw.githubusercontent.com/fralfaro/tutorial_pyspark/main/notebooks/learning-apache-spark/data/mtcars.csv

--2022-04-19 20:09:06--  https://raw.githubusercontent.com/fralfaro/tutorial_pyspark/main/notebooks/learning-apache-spark/data/mtcars.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1694 (1.7K) [text/plain]
Saving to: ‘mtcars.csv’


2022-04-19 20:09:06 (24.3 MB/s) - ‘mtcars.csv’ saved [1694/1694]



In [3]:
from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession; its SparkContext is used below for the RDD API
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

## Example DataFrame

In [4]:
mtcars = spark.read.csv(path='mtcars.csv',
                        sep=',',
                        encoding='UTF-8',
                        comment=None,
                        header=True, 
                        inferSchema=True)

## DataFrame to RDD
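A **DataFrame** can be easily converted to an **RDD** through its `pyspark.sql.DataFrame.rdd` property (an attribute, not a method, so it is written without parentheses). Each element in the returned RDD is a **pyspark.sql.Row** object. A Row is a tuple whose fields can also be accessed by name, much like a list of key-value pairs.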

In [5]:
mtcars.rdd.take(2)

[Row(_c0='Mazda RX4', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.62, qsec=16.46, vs=0, am=1, gear=4, carb=4),
 Row(_c0='Mazda RX4 Wag', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.875, qsec=17.02, vs=0, am=1, gear=4, carb=4)]

With an RDD object, we can apply the usual RDD transformations such as **map** and **flatMap**, the pair-RDD transformations **mapValues** and **flatMapValues** (for RDDs of `(key, value)` tuples), and many other methods of the RDD API.

In [6]:
mtcars_map = mtcars.rdd.map(lambda x: (x['_c0'], x['mpg']))
mtcars_map.take(5)

[('Mazda RX4', 21.0),
 ('Mazda RX4 Wag', 21.0),
 ('Datsun 710', 22.8),
 ('Hornet 4 Drive', 21.4),
 ('Hornet Sportabout', 18.7)]

In [7]:
mtcars_mapvalues = mtcars_map.mapValues(lambda x: [x, x * 10])
mtcars_mapvalues.take(5)

[('Mazda RX4', [21.0, 210.0]),
 ('Mazda RX4 Wag', [21.0, 210.0]),
 ('Datsun 710', [22.8, 228.0]),
 ('Hornet 4 Drive', [21.4, 214.0]),
 ('Hornet Sportabout', [18.7, 187.0])]

## RDD to DataFrame

To convert an RDD to a DataFrame, we can use the `SparkSession.createDataFrame()` function. In this example every element in the RDD is first turned into a **Row object**, so that Spark can take the column names from the Row fields.

#### Create an RDD

In [9]:
rdd_raw = sc.textFile('mtcars.csv')
rdd_raw.take(5)

[',mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',
 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4',
 'Mazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4',
 'Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1',
 'Hornet 4 Drive,21.4,6,258,110,3.08,3.215,19.44,1,0,3,1']

#### Save the first row to a variable

In [10]:
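# Split each line on commas and keep the line whose second field is 'mpg' (the header)
header = rdd_raw.map(lambda x: x.split(',')).filter(lambda x: x[1] == 'mpg').collect()[0]
# The first CSV column has no name, so give it one
header[0] = 'model'
header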

['model',
 'mpg',
 'cyl',
 'disp',
 'hp',
 'drat',
 'wt',
 'qsec',
 'vs',
 'am',
 'gear',
 'carb']

#### Save the rest to a new RDD

In [11]:
rdd = rdd_raw.map(lambda x: x.split(',')).filter(lambda x: x[1] != 'mpg')
rdd.take(2)

[['Mazda RX4',
  '21',
  '6',
  '160',
  '110',
  '3.9',
  '2.62',
  '16.46',
  '0',
  '1',
  '4',
  '4'],
 ['Mazda RX4 Wag',
  '21',
  '6',
  '160',
  '110',
  '3.9',
  '2.875',
  '17.02',
  '0',
  '1',
  '4',
  '4']]

#### Convert RDD elements to RDD Row objects

First we define a function which takes a list of column names and a list of values and creates a Row of key-value pairs. **Since the keys of a Row object are given as keyword-argument names, we can’t simply pass a dictionary to the Row() function**. Instead, we treat the dictionary as an argument list and use the `**` operator to unpack it into keyword arguments.

See an example.

In [12]:
from pyspark.sql import Row
my_dict = dict(zip(['a', 'b', 'c'], range(1, 4)))
Row(**my_dict)

Row(a=1, b=2, c=3)

#### Let’s define the function.

In [13]:
def list_to_row(keys, values):
    row_dict = dict(zip(keys, values))
    return Row(**row_dict)

#### Convert the elements to Row objects

In [14]:
rdd_rows = rdd.map(lambda x: list_to_row(header, x))
rdd_rows.take(3)

[Row(model='Mazda RX4', mpg='21', cyl='6', disp='160', hp='110', drat='3.9', wt='2.62', qsec='16.46', vs='0', am='1', gear='4', carb='4'),
 Row(model='Mazda RX4 Wag', mpg='21', cyl='6', disp='160', hp='110', drat='3.9', wt='2.875', qsec='17.02', vs='0', am='1', gear='4', carb='4'),
 Row(model='Datsun 710', mpg='22.8', cyl='4', disp='108', hp='93', drat='3.85', wt='2.32', qsec='18.61', vs='1', am='1', gear='4', carb='1')]

Now we can convert the RDD to a DataFrame.

In [15]:
df = spark.createDataFrame(rdd_rows)
df.show(5)

+-----------------+----+---+----+---+----+-----+-----+---+---+----+----+
|            model| mpg|cyl|disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|  21|  6| 160|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|  21|  6| 160|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4| 108| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6| 258|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8| 360|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows

