<a href="https://colab.research.google.com/github/NushratRia/Big-Data/blob/main/Lab_4_PySpark_RDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Resilient Distributed Datasets** (RDDs) are Spark's fundamental data structure: collections of elements that are partitioned across the nodes of a cluster so they can be processed in parallel. RDDs are immutable, so you cannot alter one after it has been created; operations on an RDD produce a new RDD instead.

In [2]:
!pip install PySpark

Collecting PySpark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: PySpark
  Building wheel for PySpark (setup.py) ... [?25l[?25hdone
  Created wheel for PySpark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=ba0832d0726cbcc21adb6d25ddf55ac535e1c82c8964696d908abef2e3f9b1ee
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built PySpark
Installing collected packages: PySpark
Successfully installed PySpark-3.4.1


In [3]:
import pyspark

In [4]:
from pyspark.sql import SparkSession
# Entry point for everything below: obtain a SparkSession through the builder
# (getOrCreate hands back a session rather than requiring explicit setup).
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [5]:
# parallelize() creates an RDD from a local Python collection;
# here each element is a (city, number) tuple.
rdds = spark.sparkContext.parallelize(
    [
        ("Dhaka", 1),
        ("CTG", 2),
        ("SYL", 3),
    ]
)

In [6]:
# Bare expression: the notebook displays the RDD's repr, not its elements.
rdds

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:287

In [7]:
# collect() returns the RDD's elements as a Python list.
rdds.collect()

[('Dhaka', 1), ('CTG', 2), ('SYL', 3)]

In [8]:
rdds.count()  # total number of elements (tuples) in the RDD

3

Creating a DataFrame from an RDD

In [9]:
from datetime import date, datetime

In [10]:
# parallelize() creates an RDD from a local list; each tuple is one record
# holding an int, a float, a string, a date and a datetime.
# NOTE(review): the first record's datetime is 2023-01-12 00:00, while the
# other two follow a "day N at 12:00" pattern -- possibly meant to be
# datetime(2023, 1, 1, 12, 0). Confirm before changing it, because the
# recorded outputs reflect the current value.
rdds1 = spark.sparkContext.parallelize(
    [
        (1, 1.0, "string1", date(2023, 1, 1), datetime(2023, 1, 12, 0)),
        (2, 2.0, "string2", date(2023, 2, 1), datetime(2023, 1, 2, 12, 0)),
        (3, 3.0, "string3", date(2023, 3, 1), datetime(2023, 1, 3, 12, 0)),
    ]
)

In [11]:
# Bare expression: the notebook displays the RDD's repr, not its elements.
rdds1

ParallelCollectionRDD[2] at readRDDFromFile at PythonRDD.scala:287

In [12]:
# collect() returns the records as a Python list of tuples.
rdds1.collect()

[(1,
  1.0,
  'string1',
  datetime.date(2023, 1, 1),
  datetime.datetime(2023, 1, 12, 0, 0)),
 (2,
  2.0,
  'string2',
  datetime.date(2023, 2, 1),
  datetime.datetime(2023, 1, 2, 12, 0)),
 (3,
  3.0,
  'string3',
  datetime.date(2023, 3, 1),
  datetime.datetime(2023, 1, 3, 12, 0))]

In [14]:
# Convert the RDD into a DataFrame. Passing a plain list of strings as
# `schema` only names the columns; the column types are inferred from the data.
df = spark.createDataFrame(
    rdds1,
    schema=["num", "float", "string", "date", "datetime"],
)

In [15]:
# The DataFrame repr lists column names and types only; use show() to see rows.
df

DataFrame[num: bigint, float: double, string: string, date: date, datetime: timestamp]

In [16]:
# show() prints the rows as a text table.
df.show()

+---+-----+-------+----------+-------------------+
|num|float| string|      date|           datetime|
+---+-----+-------+----------+-------------------+
|  1|  1.0|string1|2023-01-01|2023-01-12 00:00:00|
|  2|  2.0|string2|2023-02-01|2023-01-02 12:00:00|
|  3|  3.0|string3|2023-03-01|2023-01-03 12:00:00|
+---+-----+-------+----------+-------------------+



In [17]:
# show(n) prints only the first n rows -- here just one.
df.show(1)

+---+-----+-------+----------+-------------------+
|num|float| string|      date|           datetime|
+---+-----+-------+----------+-------------------+
|  1|  1.0|string1|2023-01-01|2023-01-12 00:00:00|
+---+-----+-------+----------+-------------------+
only showing top 1 row



In [18]:
# printSchema() prints each column's name, type and nullability as a tree.
df.printSchema()

root
 |-- num: long (nullable = true)
 |-- float: double (nullable = true)
 |-- string: string (nullable = true)
 |-- date: date (nullable = true)
 |-- datetime: timestamp (nullable = true)



RDD example 1 — parsing delimited strings in an RDD and converting it to a DataFrame

In [26]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Raw records: one "id|name|city" string per user.
users_list_0 = ["1|John|London", "2|Martin|New York", "3|Sam|Sydney", "4|Alan|Mexico City", "5|Jacob|Florida"]
print(users_list_0)

# Distribute the strings, split each one ONCE on '|', then build a typed
# (int id, name, city) tuple from the resulting fields. The name is bound a
# single time, to the parsed RDD, instead of first holding the raw strings.
users_list_0_rdd = (
    spark.sparkContext.parallelize(users_list_0)
    .map(lambda ele: ele.split('|'))
    .map(lambda fields: (int(fields[0]), fields[1], fields[2]))
)

# Without a schema the columns get the default names _1, _2, _3 and the
# types are inferred (the id column comes out as long).
users_df_0 = spark.createDataFrame(users_list_0_rdd)
users_df_0.show(10)
users_df_0.printSchema()

# An explicit schema names the columns and stores the id as an integer.
users_schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("city", StringType(), True),
    ]
)

users_df_01 = spark.createDataFrame(users_list_0_rdd, schema=users_schema)
users_df_01.show(10)
users_df_01.printSchema()

['1|John|London', '2|Martin|New York', '3|Sam|Sydney', '4|Alan|Mexico City', '5|Jacob|Florida']
+---+------+-----------+
| _1|    _2|         _3|
+---+------+-----------+
|  1|  John|     London|
|  2|Martin|   New York|
|  3|   Sam|     Sydney|
|  4|  Alan|Mexico City|
|  5| Jacob|    Florida|
+---+------+-----------+

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)

+---+------+-----------+
| id|  name|       city|
+---+------+-----------+
|  1|  John|     London|
|  2|Martin|   New York|
|  3|   Sam|     Sydney|
|  4|  Alan|Mexico City|
|  5| Jacob|    Florida|
+---+------+-----------+

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)



RDD example 2 — creating a DataFrame directly from a list of tuples

In [27]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Each user is already a typed (id, name, city) tuple, so no parsing is needed.
users_list_1 = [
    (1, "John", "London"),
    (2, "Martin", "New York"),
    (3, "Sam", "Sydney"),
    (4, "Alan", "Mexico City"),
    (5, "Jacob", "Florida"),
]
print(users_list_1)

# No schema: default column names (_1, _2, _3) and inferred types.
users_df_1 = spark.createDataFrame(users_list_1)
users_df_1.show(10)
users_df_1.printSchema()

# Explicit schema: named columns, with the id stored as an integer.
users_schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("city", StringType(), True),
    ]
)

users_df_11 = spark.createDataFrame(users_list_1, schema=users_schema)
users_df_11.show(10)
users_df_11.printSchema()

[(1, 'John', 'London'), (2, 'Martin', 'New York'), (3, 'Sam', 'Sydney'), (4, 'Alan', 'Mexico City'), (5, 'Jacob', 'Florida')]
+---+------+-----------+
| _1|    _2|         _3|
+---+------+-----------+
|  1|  John|     London|
|  2|Martin|   New York|
|  3|   Sam|     Sydney|
|  4|  Alan|Mexico City|
|  5| Jacob|    Florida|
+---+------+-----------+

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)

+---+------+-----------+
| id|  name|       city|
+---+------+-----------+
|  1|  John|     London|
|  2|Martin|   New York|
|  3|   Sam|     Sydney|
|  4|  Alan|Mexico City|
|  5| Jacob|    Florida|
+---+------+-----------+

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)



RDD example 3 — creating a DataFrame from a list of Row objects

In [28]:
# Option 3: build the DataFrame from pyspark.sql.Row objects.
from pyspark.sql import Row

users_list_2 = [
    Row(1, "John", "London"),
    Row(2, "Martin", "New York"),
    Row(3, "Sam", "Sydney"),
    Row(4, "Alan", "Mexico City"),
    Row(5, "Jacob", "Florida"),
]
print(users_list_2)

# No schema: default column names (_1, _2, _3) and inferred types.
users_df_2 = spark.createDataFrame(users_list_2)
users_df_2.show(10)
users_df_2.printSchema()

# Reuses users_schema (id / name / city) defined in an earlier cell.
users_df_21 = spark.createDataFrame(users_list_2, schema=users_schema)
users_df_21.show(10)
users_df_21.printSchema()

[<Row(1, 'John', 'London')>, <Row(2, 'Martin', 'New York')>, <Row(3, 'Sam', 'Sydney')>, <Row(4, 'Alan', 'Mexico City')>, <Row(5, 'Jacob', 'Florida')>]
+---+------+-----------+
| _1|    _2|         _3|
+---+------+-----------+
|  1|  John|     London|
|  2|Martin|   New York|
|  3|   Sam|     Sydney|
|  4|  Alan|Mexico City|
|  5| Jacob|    Florida|
+---+------+-----------+

root
 |-- _1: long (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)

+---+------+-----------+
| id|  name|       city|
+---+------+-----------+
|  1|  John|     London|
|  2|Martin|   New York|
|  3|   Sam|     Sydney|
|  4|  Alan|Mexico City|
|  5| Jacob|    Florida|
+---+------+-----------+

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)



In [29]:
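# Reading JSON and CSV files (left commented out; the relative paths below require the sample files to be present in the working directory)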
#df_json = spark.read.json("simple_zipcodes.json")
#df_csv = spark.read.csv("simple_zipcodes.csv")