<a href="https://www.kaggle.com/code/prashantsparhad/22-pyspark-create-empty-rdd-in-pyspark?scriptVersionId=133653031" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l- \ | done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l- \ | / - \ | / - \ | / - \ | done
[?25h  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317146 sha256=d80868a0bc1b9ba1569a93c6d5bb088a4dfd94a14b282a719afb58eab7474302
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0
[0m

# 1. Create Empty RDD in PySpark

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Start a Spark session named "Sparkbyexample", or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("Sparkbyexample")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/15 07:46:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Create an RDD that contains no elements, via the session's SparkContext.
emptyRDD=spark.sparkContext.emptyRDD()

In [5]:
# Printing an RDD shows a short description of the RDD object, not its elements.
print(emptyRDD)

EmptyRDD[0] at emptyRDD at NativeMethodAccessorImpl.java:0


# Alternatively, you can also get an empty RDD by using `spark.sparkContext.parallelize([])`.

In [6]:
# Parallelizing an empty Python list is another way to obtain an RDD with no elements.
rdd2=spark.sparkContext.parallelize([])

In [7]:
# Print the RDD's description (the object itself, not its elements).
print(rdd2)

ParallelCollectionRDD[1] at readRDDFromFile at PythonRDD.scala:287


# Note: Some actions fail on an empty RDD. For example, `first()` raises `ValueError("RDD is empty")`, whereas `count()` returns 0 and `collect()` returns `[]`.

# 2. Create Empty DataFrame with Schema (StructType)

In [8]:
from pyspark.sql.types import StructType, StructField, StringType

In [9]:
# All three name columns are nullable strings, so the fields are generated from
# the column names instead of being spelled out one by one.
# NOTE(review): "Firstname" is capitalised unlike the other two names - confirm intended.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("Firstname", "middlename", "lastname")
])

# Now take the empty RDD created above and pass it to `createDataFrame()` of `SparkSession`, along with the schema that defines the column names and data types.

In [10]:
# Combine the empty RDD (no rows) with the explicit schema (column names and types).
df = spark.createDataFrame(data=emptyRDD, schema=schema)

In [11]:
# Show the column names, types and nullability of the (row-less) DataFrame.
df.printSchema()

root
 |-- Firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)



# 3. Convert Empty RDD to DataFrame

# You can also create an empty DataFrame by converting an empty RDD to a DataFrame using `toDF()`.

In [12]:
# toDF() on the RDD is an alternative route from the empty RDD to a DataFrame
# that carries the given schema.
df1 = emptyRDD.toDF(schema=schema)

In [13]:
# Show the column names, types and nullability of the DataFrame built with toDF().
df1.printSchema()

root
 |-- Firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)



# 4. Create Empty DataFrame with Schema.

In [14]:
# No RDD required: an empty Python list plus the schema also yields an empty DataFrame.
df2 = spark.createDataFrame(data=[], schema=schema)
df2.printSchema()

root
 |-- Firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)



# 5. Create Empty DataFrame without Schema (no columns)

# To create an empty DataFrame without a schema (no columns), just create an empty schema and use it when creating the PySpark DataFrame.

In [15]:
# An empty StructType has no fields, so the resulting DataFrame has no columns.
df3 = spark.createDataFrame([], StructType([]))

In [16]:
# With an empty schema only the "root" line is printed, since there are no columns.
df3.printSchema()

root



# Convert PySpark DataFrame to Pandas

# 1. A PySpark DataFrame can be converted to a Python pandas DataFrame using the `toPandas()` function.

# 2. pandas runs operations on a single node, whereas PySpark runs on multiple machines.

# Prepare PySpark DataFrame

In [17]:
import pyspark
from pyspark.sql import SparkSession

# Reuse the session that is already running instead of configuring a new builder.
# Calling builder.appName("spark").getOrCreate() while a session exists returns
# that existing session, does not apply the new app name, and logs
# "Using an existing Spark session; only runtime SQL configurations will take effect".
# The fallback keeps the cell runnable on its own when no session is active yet.
spark = (
    SparkSession.getActiveSession()
    or SparkSession.builder.appName("spark").getOrCreate()
)

# Sample rows in the order given by `columns` below.
# Empty strings stand in for missing text values.
data = [
    ("James", "", "Smith", "36636", "M", 60000),
    ("Michael", "Rose", "", "40288", "M", 70000),
    ("Robert", "", "Williams", "42114", "", 400000),
    ("Maria", "Anne", "Jones", "39192", "F", 500000),
    ("Jen", "Mary", "Brown", "", "F", 0),
]
columns = ["first_name", "middle_name", "last_name", "dob", "gender", "salary"]

# Only column names are supplied, so Spark infers each column's type from the
# data (strings for the text fields, long for salary).
pysparkDF = spark.createDataFrame(data=data, schema=columns)
pysparkDF.printSchema()
pysparkDF.show(truncate=False)

23/06/15 07:46:49 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


root
 |-- first_name: string (nullable = true)
 |-- middle_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)



                                                                                

+----------+-----------+---------+-----+------+------+
|first_name|middle_name|last_name|dob  |gender|salary|
+----------+-----------+---------+-----+------+------+
|James     |           |Smith    |36636|M     |60000 |
|Michael   |Rose       |         |40288|M     |70000 |
|Robert    |           |Williams |42114|      |400000|
|Maria     |Anne       |Jones    |39192|F     |500000|
|Jen       |Mary       |Brown    |     |F     |0     |
+----------+-----------+---------+-----+------+------+



# Convert PySpark Dataframe to Pandas DataFrame

# A PySpark DataFrame provides the method `toPandas()` to convert it to a Python pandas DataFrame.

In [18]:
# Convert the Spark DataFrame into a pandas DataFrame.
pandasDF=pysparkDF.toPandas()
# Plain-text rendering of the converted frame.
print(pandasDF)

  first_name middle_name last_name    dob gender  salary
0      James                 Smith  36636      M   60000
1    Michael        Rose            40288      M   70000
2     Robert              Williams  42114         400000
3      Maria        Anne     Jones  39192      F  500000
4        Jen        Mary     Brown             F       0


In [19]:
import pandas as pd

In [20]:
# pandasDF is already a pandas DataFrame (it is the result of toPandas()), so it is
# displayed directly as the cell's last expression instead of being wrapped in a
# second, redundant pd.DataFrame(...) constructor call.
pandasDF

Unnamed: 0,first_name,middle_name,last_name,dob,gender,salary
0,James,,Smith,36636.0,M,60000
1,Michael,Rose,,40288.0,M,70000
2,Robert,,Williams,42114.0,,400000
3,Maria,Anne,Jones,39192.0,F,500000
4,Jen,Mary,Brown,,F,0
