<a href="https://colab.research.google.com/github/abphilip-resources/PY-Spark-1/blob/master/1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Exploring Apache Spark : 1**

## **Dependencies**

#### Check existing versions

In [1]:
!java -version

openjdk version "11.0.16" 2022-07-19
OpenJDK Runtime Environment (build 11.0.16+8-post-Ubuntu-0ubuntu118.04)
OpenJDK 64-Bit Server VM (build 11.0.16+8-post-Ubuntu-0ubuntu118.04, mixed mode, sharing)


In [2]:
!python --version

Python 3.7.15


#### Install PySpark


If you have an older Spark installation that set environment variables in your .bash_profile, clear those variables first, as they can interfere with this new PySpark installation.
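A quick way to check for such leftovers is to inspect the process environment. The sketch below is only an illustration: the variable names it looks for (SPARK_HOME, PYSPARK_PYTHON, PYSPARK_DRIVER_PYTHON) are ones Spark setups commonly use, but which ones an old installation actually set is an assumption, and leftover_spark_vars is a hypothetical helper name.

```python
import os

# Variable names commonly associated with a manual Spark setup (an assumption;
# an old installation may have used others).
SPARK_ENV_VARS = ("SPARK_HOME", "PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON")


def leftover_spark_vars(env=None):
    """Return the Spark-related variables that are set in the given environment."""
    env = os.environ if env is None else env
    return {name: env[name] for name in SPARK_ENV_VARS if name in env}


found = leftover_spark_vars()
if found:
    print("Spark-related variables still set:", found)
else:
    print("No Spark-related variables found.")
```

If anything is reported, removing the corresponding export lines from .bash_profile and restarting the shell or runtime should clear it.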

In [3]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
     |████████████████████████████████| 281.4 MB 43 kB/s 
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 48.6 MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845513 sha256=2d3bc1e25aef7608306b66dee1cc968477669d76760af7ace3b8e63aa613d136
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


## **Introduction to RDDs**

#### Create Session

* Creating RDDs and DataFrames using SparkContext
* Interoperability between RDDs and DataFrames
* Multiple rows and multiple column specifications for DataFrames
* Creating DataFrames using SQLContext
* Selecting, editing and renaming columns in dataframes
* Interoperability between Pandas and Spark dataframes

In [4]:
import pyspark

In [5]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

In [6]:
sc = SparkContext()
spark = SparkSession(sc)

In [7]:
from pyspark.sql import Row
from datetime import datetime

#### Creating RDDs using sc.parallelize()

We pass in a flat list describing one car in a dealership. Note that sc.parallelize() treats each item of the list as a separate RDD element, so this is not yet a single row:
* the first element is the index
* the second element is the name of the car
* the third element is the quantity of that car in our inventory

In [8]:
simple_data = sc.parallelize([1, "Nissan Versa", 12])
simple_data

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

#### The count() function

This returns the number of elements in the RDD. Each item of the list is now an element of the RDD.

In [9]:
simple_data.count()

3

#### The first() function

This returns the first element of the RDD

In [10]:
simple_data.first()

1

#### The take() function

This will return the first n elements of the RDD where n is the argument passed to the function

In [11]:
simple_data.take(2)

[1, 'Nissan Versa']

#### The collect() function

Returns a list containing all the elements in the RDD. We get a 1D list with the 3 elements

In [14]:
simple_data.collect()

[1, 'Nissan Versa', 12]

#### Convert the RDD to a DataFrame

Calling toDF() directly on the current RDD results in an error, as it is 1-dimensional. Each element must first be wrapped in a tuple so that it forms a one-column row:

* This RDD does not have "columns", so it cannot be represented as a tabular DataFrame as it stands
* DataFrames are structured datasets

In [21]:
df = simple_data.map(lambda x: (x, )).toDF()
df.show()

+----+
|  _1|
+----+
|   1|
|null|
|  12|
+----+



## **Managing Records with RDDs**

#### RDDs with records using sc.parallelize()

We effectively create two rows of data by passing in two lists to sc.parallelize()

In [24]:
records = sc.parallelize([[1, "Nissan Versa", 12],
                          [2, "Ford Fiesta", 9]])
records.map(lambda x: (x, )).toDF().show()

+-------------+
|           _1|
+-------------+
|[1, null, 12]|
| [2, null, 9]|
+-------------+



#### The number of records corresponds to the number of rows


Even though this RDD holds twice as much data as the previous one, count() returns 2, because each inner list is a single element.

In [25]:
records.count()

2

#### The RDD is now effectively a list of lists

While the previous RDD contained a 1D list, this one contains two elements each of which is a 1D list - giving a 2D list.

In [26]:
records.collect()

[[1, 'Nissan Versa', 12], [2, 'Ford Fiesta', 9]]

#### The first element is a list

Effectively the first "row" in the RDD

In [27]:
records.first()

[1, 'Nissan Versa', 12]

In [28]:
records.take(2)

[[1, 'Nissan Versa', 12], [2, 'Ford Fiesta', 9]]

#### This RDD can be converted to a DataFrame

This RDD does have "columns", so it can be represented as a tabular DataFrame.

In [35]:
df = records.toDF()

#### The DataFrame has automatically detected the types of the data


In [32]:
df

DataFrame[_1: bigint, _2: string, _3: bigint]

In [33]:
type(df)

pyspark.sql.dataframe.DataFrame

#### The show() function for a DataFrame

By default it prints the first 20 rows of the DataFrame. We can pass in the number of rows to print as an argument.

In [34]:
df.show()

+---+------------+---+
| _1|          _2| _3|
+---+------------+---+
|  1|Nissan Versa| 12|
|  2| Ford Fiesta|  9|
+---+------------+---+



#### Creating dataframes using sc.parallelize() and Row() functions

Row objects allow specifying column names for DataFrames.

In [42]:
data = sc.parallelize([Row(index = 1,
                           vehicle_name = "Nissan Versa",
                           vehicle_stock = 12)]
)
data

ParallelCollectionRDD[75] at readRDDFromFile at PythonRDD.scala:274

#### This RDD contains a single Row element

In [37]:
data.count()

1

In [39]:
data.collect()

[Row(index=1, vehicle_name='Nissan Versa', vehicle_stock=12)]

In [40]:
df = data.toDF()
df.show()

+-----+------------+-------------+
|index|vehicle_name|vehicle_stock|
+-----+------------+-------------+
|    1|Nissan Versa|           12|
+-----+------------+-------------+



#### Working with multiple rows

In [43]:
data = sc.parallelize([Row(index = 1,
                       vehicle_name = "Nissan Versa",
                       vehicle_stock = 12
                       ),
                       Row(index = 2,
                       vehicle_name = "Ford Fiesta",
                       vehicle_stock = 9
                       ),
                       Row(index = 3,
                       vehicle_name = "Hyundai Accent",
                       vehicle_stock = 8)]
)
data

ParallelCollectionRDD[76] at readRDDFromFile at PythonRDD.scala:274

In [44]:
df = data.toDF()
df.show()

+-----+--------------+-------------+
|index|  vehicle_name|vehicle_stock|
+-----+--------------+-------------+
|    1|  Nissan Versa|           12|
|    2|   Ford Fiesta|            9|
|    3|Hyundai Accent|            8|
+-----+--------------+-------------+



#### Multiple columns with complex data types

Here, we highlight many of the different data types which we can store in an RDD. We start with a string, float and integer

In [45]:
complex_data = sc.parallelize([Row(
                               col_string = "Alice",
                               col_double = 3.14,
                               col_integer = 20)
])

In [46]:
complex_data_df = complex_data.toDF()
complex_data_df.show()

+----------+----------+-----------+
|col_string|col_double|col_integer|
+----------+----------+-----------+
|     Alice|      3.14|         20|
+----------+----------+-----------+



In [47]:
complex_data_df

DataFrame[col_string: string, col_double: double, col_integer: bigint]

In [48]:
complex_data = sc.parallelize([Row(
                               col_string = "Alice",
                               col_double = 3.14,
                               col_integer = 20,
                               col_boolean = True,
                               col_list = [2,4,6])
])

In [49]:
complex_data_df = complex_data.toDF()
complex_data_df.show()

+----------+----------+-----------+-----------+---------+
|col_string|col_double|col_integer|col_boolean| col_list|
+----------+----------+-----------+-----------+---------+
|     Alice|      3.14|         20|       true|[2, 4, 6]|
+----------+----------+-----------+-----------+---------+



In [50]:
complex_data_df

DataFrame[col_string: string, col_double: double, col_integer: bigint, col_boolean: boolean, col_list: array<bigint>]

#### Multiple rows with complex data types

Our RDDs can also contain lists, dictionaries, Row types and timestamps

In [53]:
complex_data = sc.parallelize([Row(
                                col_list = [2,4,6],
                                col_dict = {"a1": 0},
                                col_row = Row(x=15, y=25, z=35),
                                col_time = datetime(2018, 7, 1, 14, 1, 2)
                            ),              
                            Row(
                                col_list = [2,4,6,8,10], 
                                col_dict = {"a1": 0,"a2": 1 }, 
                                col_row = Row(x=45, y=55, z=65),
                                col_time = datetime(2018, 7, 2, 14, 1, 3)
                            ),
                            Row(
                                col_list = [2,4,6,8,10,12,14], 
                                col_dict = {"a1": 0, "a2": 1, "a3": 2 }, 
                                col_row = Row(x=75, y=85, z=95),
                                col_time = datetime(2018, 7, 3, 14, 1, 4)
                            )
]) 

In [54]:
complex_data_df = complex_data.toDF()
complex_data_df.show()

+--------------------+--------------------+------------+-------------------+
|            col_list|            col_dict|     col_row|           col_time|
+--------------------+--------------------+------------+-------------------+
|           [2, 4, 6]|           {a1 -> 0}|{15, 25, 35}|2018-07-01 14:01:02|
|    [2, 4, 6, 8, 10]|  {a1 -> 0, a2 -> 1}|{45, 55, 65}|2018-07-02 14:01:03|
|[2, 4, 6, 8, 10, ...|{a1 -> 0, a2 -> 1...|{75, 85, 95}|2018-07-03 14:01:04|
+--------------------+--------------------+------------+-------------------+



In [55]:
complex_data_df

DataFrame[col_list: array<bigint>, col_dict: map<string,bigint>, col_row: struct<x:bigint,y:bigint,z:bigint>, col_time: timestamp]

## **SQLContext**

#### Creating DataFrames using SQLContext

SQLContext can create dataframes directly from raw data

In [56]:
sqlContext = SQLContext(sc)



In [57]:
sqlContext

<pyspark.sql.context.SQLContext at 0x7f25212bcc50>

#### The SQLContext range() function returns a DataFrame

This creates a column of 4 integers from 0-3. The name of this column is set to "id"

In [61]:
df = sqlContext.range(4)
df

DataFrame[id: bigint]

In [62]:
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
+---+



In [63]:
df.count()

4

#### Rows specified in tuples

The rows of a SQLContext DataFrame are specified in the form of a list of tuples.

In [64]:
data = [('Nissan Versa', 12),
        ('Ford Fiesta', 9),
        ('Hyundai Accent', 8)]

#### The DataFrame is created with default column headers

In [65]:
sqlContext.createDataFrame(data).show()

+--------------+---+
|            _1| _2|
+--------------+---+
|  Nissan Versa| 12|
|   Ford Fiesta|  9|
|Hyundai Accent|  8|
+--------------+---+



#### Set the column names when creating the DataFrame

In [66]:
sqlContext.createDataFrame(data, ['vehicle_name', 'vehicle_stock']).show()

+--------------+-------------+
|  vehicle_name|vehicle_stock|
+--------------+-------------+
|  Nissan Versa|           12|
|   Ford Fiesta|            9|
|Hyundai Accent|            8|
+--------------+-------------+



#### Adding more data to the DataFrame

In [68]:
complex_data = [
  (1.0, 12, "Nissan Versa", True, [2,4,6], {"a1": 0}, Row(x=1, y=2, z=3), datetime(2018, 7, 1, 14, 1, 2)),
  (2.0, 13, "Ford Fiesta", True, [2,4,6,8,10], {"a1": 0,"a2": 1 }, Row(x=1, y=2, z=3), datetime(2018, 7, 2, 14, 1, 3)),
  (3.0, 15, "Hyundai Accent", True, [2,4,6,8,10,12,14], {"a1": 0, "a2": 1, "a3": 2 }, Row(x=1, y=2, z=3), datetime(2018, 7, 3, 14, 1, 4))
] 

In [69]:
sqlContext.createDataFrame(complex_data).show()

+---+---+--------------+----+--------------------+--------------------+---------+-------------------+
| _1| _2|            _3|  _4|                  _5|                  _6|       _7|                 _8|
+---+---+--------------+----+--------------------+--------------------+---------+-------------------+
|1.0| 12|  Nissan Versa|true|           [2, 4, 6]|           {a1 -> 0}|{1, 2, 3}|2018-07-01 14:01:02|
|2.0| 13|   Ford Fiesta|true|    [2, 4, 6, 8, 10]|  {a1 -> 0, a2 -> 1}|{1, 2, 3}|2018-07-02 14:01:03|
|3.0| 15|Hyundai Accent|true|[2, 4, 6, 8, 10, ...|{a1 -> 0, a2 -> 1...|{1, 2, 3}|2018-07-03 14:01:04|
+---+---+--------------+----+--------------------+--------------------+---------+-------------------+



In [70]:
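# Note: names are matched by position, so col_int labels the float values
# (1.0, 2.0, 3.0) and col_double labels the integer values (12, 13, 15).
complex_data_df = sqlContext.createDataFrame(complex_data, 
  ['col_int', 'col_double', 'col_string', 'col_bool', 'col_array', 'col_dictionary', 'col_row', 'col_date_time']
)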
complex_data_df.show()

+-------+----------+--------------+--------+--------------------+--------------------+---------+-------------------+
|col_int|col_double|    col_string|col_bool|           col_array|      col_dictionary|  col_row|      col_date_time|
+-------+----------+--------------+--------+--------------------+--------------------+---------+-------------------+
|    1.0|        12|  Nissan Versa|    true|           [2, 4, 6]|           {a1 -> 0}|{1, 2, 3}|2018-07-01 14:01:02|
|    2.0|        13|   Ford Fiesta|    true|    [2, 4, 6, 8, 10]|  {a1 -> 0, a2 -> 1}|{1, 2, 3}|2018-07-02 14:01:03|
|    3.0|        15|Hyundai Accent|    true|[2, 4, 6, 8, 10, ...|{a1 -> 0, a2 -> 1...|{1, 2, 3}|2018-07-03 14:01:04|
+-------+----------+--------------+--------+--------------------+--------------------+---------+-------------------+



#### Creating dataframes using SQL Context and the Row function

Row objects can also be created from positional values alone, without specifying column names.

In [71]:
data = sc.parallelize([Row(1, "Nissan Versa", 12), Row(2, "Ford Fiesta", 9), Row(3, "Hyundai Accent", 8)])

In [72]:
data.toDF().show()

+---+--------------+---+
| _1|            _2| _3|
+---+--------------+---+
|  1|  Nissan Versa| 12|
|  2|   Ford Fiesta|  9|
|  3|Hyundai Accent|  8|
+---+--------------+---+



#### Create a new Row with the column names

In [73]:
column_names = Row('index', 'vehicle_name', 'vehicle_stock')

## **Data Extraction**

#### The RDD's map() function

map() returns a new RDD by applying a function to each element of this RDD. Here, we call column_names for every row of the RDD, with the row's elements passed as positional arguments.

In [74]:
cars = data.map(lambda r: column_names(*r))
cars

PythonRDD[176] at RDD at PythonRDD.scala:53

#### This has the effect of setting column headers

In [75]:
cars.collect()

[Row(index=1, vehicle_name='Nissan Versa', vehicle_stock=12),
 Row(index=2, vehicle_name='Ford Fiesta', vehicle_stock=9),
 Row(index=3, vehicle_name='Hyundai Accent', vehicle_stock=8)]

In [76]:
cars_df = sqlContext.createDataFrame(cars)
cars_df

DataFrame[index: bigint, vehicle_name: string, vehicle_stock: bigint]

In [77]:
cars_df.show()

+-----+--------------+-------------+
|index|  vehicle_name|vehicle_stock|
+-----+--------------+-------------+
|    1|  Nissan Versa|           12|
|    2|   Ford Fiesta|            9|
|    3|Hyundai Accent|            8|
+-----+--------------+-------------+



#### Extracting specific rows from dataframes

In [78]:
complex_data_df.first()

Row(col_int=1.0, col_double=12, col_string='Nissan Versa', col_bool=True, col_array=[2, 4, 6], col_dictionary={'a1': 0}, col_row=Row(x=1, y=2, z=3), col_date_time=datetime.datetime(2018, 7, 1, 14, 1, 2))

In [79]:
complex_data_df.take(2)

[Row(col_int=1.0, col_double=12, col_string='Nissan Versa', col_bool=True, col_array=[2, 4, 6], col_dictionary={'a1': 0}, col_row=Row(x=1, y=2, z=3), col_date_time=datetime.datetime(2018, 7, 1, 14, 1, 2)),
 Row(col_int=2.0, col_double=13, col_string='Ford Fiesta', col_bool=True, col_array=[2, 4, 6, 8, 10], col_dictionary={'a1': 0, 'a2': 1}, col_row=Row(x=1, y=2, z=3), col_date_time=datetime.datetime(2018, 7, 2, 14, 1, 3))]

#### Extracting specific cells from dataframes

In [82]:
complex_data_df.collect()[0][2]

'Nissan Versa'

In [83]:
complex_data_df.collect()[0][4]

[2, 4, 6]

In [None]:
complex_data_df.show()

+-------+----------+--------------+--------+--------------------+--------------------+---------+-------------------+
|col_int|col_double|    col_string|col_bool|           col_array|      col_dictionary|  col_row|      col_date_time|
+-------+----------+--------------+--------+--------------------+--------------------+---------+-------------------+
|    1.0|        12|  Nissan Versa|    true|           [2, 4, 6]|           {a1 -> 0}|{1, 2, 3}|2018-07-01 14:01:02|
|    2.0|        13|   Ford Fiesta|    true|    [2, 4, 6, 8, 10]|  {a1 -> 0, a2 -> 1}|{1, 2, 3}|2018-07-02 14:01:03|
|    3.0|        15|Hyundai Accent|    true|[2, 4, 6, 8, 10, ...|{a1 -> 0, a2 -> 1...|{1, 2, 3}|2018-07-03 14:01:04|
+-------+----------+--------------+--------+--------------------+--------------------+---------+-------------------+



## **Data Manipulation**

#### One way to select columns is the rdd.map() operation

In [84]:
complex_data_df.rdd.map(lambda x: (x.col_string, x.col_dictionary)).collect()

[('Nissan Versa', {'a1': 0}),
 ('Ford Fiesta', {'a1': 0, 'a2': 1}),
 ('Hyundai Accent', {'a1': 0, 'a2': 1, 'a3': 2})]

#### The DataFrame select() function helps retrieve data

In [85]:
complex_data_df.select(
                        'col_string',
                        'col_array',
                        'col_date_time'
                      ).show()

+--------------+--------------------+-------------------+
|    col_string|           col_array|      col_date_time|
+--------------+--------------------+-------------------+
|  Nissan Versa|           [2, 4, 6]|2018-07-01 14:01:02|
|   Ford Fiesta|    [2, 4, 6, 8, 10]|2018-07-02 14:01:03|
|Hyundai Accent|[2, 4, 6, 8, 10, ...|2018-07-03 14:01:04|
+--------------+--------------------+-------------------+



#### Editing column data
The rdd.map() function can be used to perform an operation on every element in a column

In [None]:
complex_data_df.rdd\
               .map(lambda x: ("2018 " + x.col_string))\
               .collect()

['2018 Nissan Versa', '2018 Ford Fiesta', '2018 Hyundai Accent']

#### Adding a column
Add a new column which sums up the contents of the int and double columns. We do this using the withColumn() function

In [None]:
complex_data_df.select(
                       'col_int',
                       'col_double'
                      )\
               .withColumn(
                       "col_sum",
                        complex_data_df.col_int + complex_data_df.col_double
                      )\
               .show()

+-------+----------+-------+
|col_int|col_double|col_sum|
+-------+----------+-------+
|    1.0|        12|   13.0|
|    2.0|        13|   15.0|
|    3.0|        15|   18.0|
+-------+----------+-------+



In [None]:
complex_data_df.select('col_bool')\
               .withColumn(
                           "col_opposite",
                           ~complex_data_df.col_bool)\
               .show()

+--------+------------+
|col_bool|col_opposite|
+--------+------------+
|    true|       false|
|    true|       false|
|    true|       false|
+--------+------------+



#### Editing a column name
Make use of the withColumnRenamed() function

In [None]:
complex_data_df.withColumnRenamed("col_dictionary","col_map").show()

+-------+----------+--------------+--------+--------------------+--------------------+---------+-------------------+
|col_int|col_double|    col_string|col_bool|           col_array|             col_map|  col_row|      col_date_time|
+-------+----------+--------------+--------+--------------------+--------------------+---------+-------------------+
|    1.0|        12|  Nissan Versa|    true|           [2, 4, 6]|           {a1 -> 0}|{1, 2, 3}|2018-07-01 14:01:02|
|    2.0|        13|   Ford Fiesta|    true|    [2, 4, 6, 8, 10]|  {a1 -> 0, a2 -> 1}|{1, 2, 3}|2018-07-02 14:01:03|
|    3.0|        15|Hyundai Accent|    true|[2, 4, 6, 8, 10, ...|{a1 -> 0, a2 -> 1...|{1, 2, 3}|2018-07-03 14:01:04|
+-------+----------+--------------+--------+--------------------+--------------------+---------+-------------------+



In [None]:
complex_data_df.select(complex_data_df.col_string.alias("vehicle_name")).show()

+--------------+
|  vehicle_name|
+--------------+
|  Nissan Versa|
|   Ford Fiesta|
|Hyundai Accent|
+--------------+



## **Pandas**

### Interoperability between Pandas dataframe and Spark dataframe

In [None]:
import pandas

In [None]:
df_pandas = complex_data_df.toPandas()
df_pandas

| | col_int | col_double | col_string | col_bool | col_array | col_dictionary | col_row | col_date_time |
|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 12 | Nissan Versa | True | [2, 4, 6] | {'a1': 0} | (1, 2, 3) | 2018-07-01 14:01:02 |
| 1 | 2.0 | 13 | Ford Fiesta | True | [2, 4, 6, 8, 10] | {'a1': 0, 'a2': 1} | (1, 2, 3) | 2018-07-02 14:01:03 |
| 2 | 3.0 | 15 | Hyundai Accent | True | [2, 4, 6, 8, 10, 12, 14] | {'a1': 0, 'a2': 1, 'a3': 2} | (1, 2, 3) | 2018-07-03 14:01:04 |


#### Create a Spark DataFrame from a Pandas DataFrame

In [None]:
# show() returns None, so keep the DataFrame in df_spark and display it separately
df_spark = sqlContext.createDataFrame(df_pandas)
df_spark.show()

+-------+----------+--------------+--------+--------------------+--------------------+---------+-------------------+
|col_int|col_double|    col_string|col_bool|           col_array|      col_dictionary|  col_row|      col_date_time|
+-------+----------+--------------+--------+--------------------+--------------------+---------+-------------------+
|    1.0|        12|  Nissan Versa|    true|           [2, 4, 6]|           {a1 -> 0}|{1, 2, 3}|2018-07-01 14:01:02|
|    2.0|        13|   Ford Fiesta|    true|    [2, 4, 6, 8, 10]|  {a1 -> 0, a2 -> 1}|{1, 2, 3}|2018-07-02 14:01:03|
|    3.0|        15|Hyundai Accent|    true|[2, 4, 6, 8, 10, ...|{a1 -> 0, a2 -> 1...|{1, 2, 3}|2018-07-03 14:01:04|
+-------+----------+--------------+--------+--------------------+--------------------+---------+-------------------+

